This is an automated email from the ASF dual-hosted git repository.
benedict pushed a commit to branch trunk
in repository https://gitbox.apache.org/repos/asf/cassandra-accord.git
The following commit(s) were added to refs/heads/trunk by this push:
new 58f10762 Accord: Serialization Improvements
58f10762 is described below
commit 58f107625a183d77b154221efd2f6f0623214027
Author: Benedict Elliott Smith <[email protected]>
AuthorDate: Sat Feb 15 15:13:40 2025 +0000
Accord: Serialization Improvements
- Introduce pre/accept fast execution flags
patch by Benedict; reviewed by Alex Petrov for CASSANDRA-20349
---
.../main/java/accord/api/ProtocolModifiers.java | 5 -
.../src/main/java/accord/api/RoutingKey.java | 4 +-
.../accord/coordinate/CoordinateSyncPoint.java | 3 +-
.../accord/coordinate/CoordinateTransaction.java | 38 ++++---
.../accord/coordinate/CoordinationAdapter.java | 19 ++--
.../main/java/accord/coordinate/ExecuteFlag.java | 47 ++++++++
.../src/main/java/accord/coordinate/Propose.java | 4 +-
.../src/main/java/accord/coordinate/Recover.java | 3 +-
.../src/main/java/accord/coordinate/Stabilise.java | 3 +-
.../src/main/java/accord/impl/CommandChange.java | 39 ++++---
.../src/main/java/accord/local/CommandStore.java | 6 +-
.../src/main/java/accord/local/CommandStores.java | 5 +-
.../main/java/accord/local/CommandSummaries.java | 5 +-
.../src/main/java/accord/local/Commands.java | 15 ++-
.../src/main/java/accord/local/DepsCalculator.java | 118 +++++++++++++++++++++
.../main/java/accord/local/cfk/CommandsForKey.java | 10 +-
.../main/java/accord/local/cfk/PostProcess.java | 9 +-
.../src/main/java/accord/messages/Accept.java | 46 +++++---
.../main/java/accord/messages/BeginRecovery.java | 6 +-
.../java/accord/messages/GetEphemeralReadDeps.java | 26 +++--
.../main/java/accord/messages/GetLatestDeps.java | 6 +-
.../src/main/java/accord/messages/PreAccept.java | 69 ++++--------
.../src/main/java/accord/messages/Propagate.java | 2 +-
.../src/main/java/accord/messages/ReadData.java | 14 +++
.../src/main/java/accord/primitives/Range.java | 14 ++-
.../src/main/java/accord/primitives/Routable.java | 1 +
.../main/java/accord/primitives/RoutableKey.java | 8 +-
.../src/main/java/accord/utils/Functions.java | 8 ++
.../src/main/java/accord/utils/TinyEnumSet.java | 10 ++
.../accord/coordinate/CoordinateSyncPointTest.java | 3 +-
.../coordinate/CoordinateTransactionTest.java | 4 +-
.../test/java/accord/impl/PrefixedIntHashKey.java | 6 ++
.../java/accord/impl/basic/InMemoryJournal.java | 2 +-
.../test/java/accord/local/CheckedCommands.java | 2 +-
.../java/accord/local/ImmutableCommandTest.java | 4 +-
.../test/java/accord/messages/PreAcceptTest.java | 9 +-
accord-core/src/test/java/accord/utils/Gen.java | 3 +-
37 files changed, 396 insertions(+), 180 deletions(-)
diff --git a/accord-core/src/main/java/accord/api/ProtocolModifiers.java
b/accord-core/src/main/java/accord/api/ProtocolModifiers.java
index a7affba1..88d41403 100644
--- a/accord-core/src/main/java/accord/api/ProtocolModifiers.java
+++ b/accord-core/src/main/java/accord/api/ProtocolModifiers.java
@@ -217,11 +217,6 @@ public class ProtocolModifiers
public static boolean requiresUniqueHlcs() { return
requiresUniqueHlcs; }
public static void setRequiresUniqueHlcs(boolean
newRequiresUniqueHlcs) { requiresUniqueHlcs = newRequiresUniqueHlcs; }
- // TODO (required): this is a temporary measure to not break migration
- private static boolean temporaryPermitUnsafeBlindWrites = true;
- public static boolean temporaryPermitUnsafeBlindWrites() { return
temporaryPermitUnsafeBlindWrites; }
- public static void setTemporaryPermitUnsafeBlindWrites(boolean
newTemporaryPermitUnsafeBlindWrites) { temporaryPermitUnsafeBlindWrites =
newTemporaryPermitUnsafeBlindWrites; }
-
private static int markStaleIfCannotExecute =
TinyEnumSet.encode(Txn.Kind.Write);
public static boolean markStaleIfCannotExecute(TxnId txnId) { return
TinyEnumSet.contains(markStaleIfCannotExecute, txnId.kindOrdinal()); }
public static boolean markStaleIfCannotExecute(Txn.Kind kind) { return
TinyEnumSet.contains(markStaleIfCannotExecute, kind); }
diff --git a/accord-core/src/main/java/accord/api/RoutingKey.java
b/accord-core/src/main/java/accord/api/RoutingKey.java
index 7bceb52c..d1e46e39 100644
--- a/accord-core/src/main/java/accord/api/RoutingKey.java
+++ b/accord-core/src/main/java/accord/api/RoutingKey.java
@@ -26,9 +26,9 @@ import accord.primitives.Unseekable;
public interface RoutingKey extends Unseekable, RoutableKey
{
@Override default RoutingKey toUnseekable() { return this; }
-
+ @Override default RoutingKey asRoutingKey() { return this; }
@Override default Kind kind() { return Kind.UnseekableKey; }
-
+
Range asRange();
RangeFactory rangeFactory();
}
diff --git
a/accord-core/src/main/java/accord/coordinate/CoordinateSyncPoint.java
b/accord-core/src/main/java/accord/coordinate/CoordinateSyncPoint.java
index 1dad27d4..36783b60 100644
--- a/accord-core/src/main/java/accord/coordinate/CoordinateSyncPoint.java
+++ b/accord-core/src/main/java/accord/coordinate/CoordinateSyncPoint.java
@@ -28,6 +28,7 @@ import org.slf4j.LoggerFactory;
import accord.api.Result;
import accord.coordinate.CoordinationAdapter.Adapters;
import accord.coordinate.CoordinationAdapter.Adapters.SyncPointAdapter;
+import accord.coordinate.ExecuteFlag.ExecuteFlags;
import accord.local.Node;
import accord.messages.Accept;
import accord.messages.Apply;
@@ -177,7 +178,7 @@ public class CoordinateSyncPoint<R> extends
CoordinatePreAccept<R>
withFlags = txnId.addFlag(HLC_BOUND);
Deps deps = Deps.merge(oks.valuesAsNullableList(),
oks.domainSize(), List::get, ok -> ok.deps);
if (tracker.hasFastPathAccepted())
- adapter.execute(node, topologies, route, FAST, txnId, txn,
withFlags, deps, deps, this);
+ adapter.execute(node, topologies, route, FAST,
ExecuteFlags.none(), txnId, txn, withFlags, deps, deps, this);
else if (tracker.hasMediumPathAccepted())
adapter.propose(node, topologies, route, Accept.Kind.MEDIUM,
Ballot.ZERO, txnId, txn, withFlags, deps, this);
else
diff --git
a/accord-core/src/main/java/accord/coordinate/CoordinateTransaction.java
b/accord-core/src/main/java/accord/coordinate/CoordinateTransaction.java
index 50c74469..b87538c5 100644
--- a/accord-core/src/main/java/accord/coordinate/CoordinateTransaction.java
+++ b/accord-core/src/main/java/accord/coordinate/CoordinateTransaction.java
@@ -26,9 +26,11 @@ import accord.api.Result;
import accord.api.Timeouts;
import accord.api.Timeouts.RegisteredTimeout;
import accord.coordinate.CoordinationAdapter.Adapters;
+import accord.coordinate.ExecuteFlag.ExecuteFlags;
import accord.local.Command;
import accord.local.CommandStore;
import accord.local.Commands;
+import accord.local.DepsCalculator;
import accord.local.KeyHistory;
import accord.local.PreLoadContext;
import accord.local.SafeCommand;
@@ -38,7 +40,6 @@ import accord.messages.Accept;
import accord.messages.PreAccept;
import accord.messages.PreAccept.PreAcceptNack;
import accord.messages.PreAccept.PreAcceptReply;
-import accord.primitives.EpochSupplier;
import accord.primitives.Status;
import accord.primitives.Unseekables;
import accord.topology.Topologies;
@@ -50,6 +51,7 @@ import accord.primitives.FullRoute;
import accord.primitives.Timestamp;
import accord.primitives.Txn;
import accord.primitives.TxnId;
+import accord.utils.Functions;
import accord.utils.MapReduceConsume;
import accord.utils.SortedListMap;
import accord.utils.async.AsyncResult;
@@ -103,10 +105,11 @@ public class CoordinateTransaction extends
CoordinatePreAccept<Result>
if (tracker.hasFastPathAccepted())
{
Deps deps = Deps.merge(oks.valuesAsNullableList(),
oks.domainSize(), List::get, ok -> ok.deps);
+ ExecuteFlags executeFlags =
Functions.foldl(oks.valuesAsNullableList(), (ok, v) -> ok == null ? v :
v.and(ok.flags), ExecuteFlags.all());
// note: we merge all Deps regardless of witnessedAt. While we
only need fast path votes,
// we must include Deps from fast path votes from earlier epochs
that may have witnessed later transactions
// TODO (desired): we might mask some bugs by merging more
responses than we strictly need, so optimise this to optionally merge minimal
deps
- executeAdapter().execute(node, topologies, route, FAST, txnId,
txn, txnId, deps, deps, settingCallback());
+ executeAdapter().execute(node, topologies, route, FAST,
executeFlags, txnId, txn, txnId, deps, deps, settingCallback());
node.agent().metricsEventsListener().onFastPathTaken(txnId, deps);
}
else if (tracker.hasMediumPathAccepted() && txnId.hasMediumPath())
@@ -219,34 +222,43 @@ public class CoordinateTransaction extends
CoordinatePreAccept<Result>
@Override
public PreAcceptReply apply(SafeCommandStore safeStore)
{
- EpochSupplier minEpoch = topologies.size() == 1 ? txnId :
EpochSupplier.constant(topologies.oldestEpoch());
- StoreParticipants participants =
StoreParticipants.update(safeStore, route, minEpoch.epoch(), txnId,
txnId.epoch());
+ long minEpoch = topologies.oldestEpoch();
+ StoreParticipants participants =
StoreParticipants.update(safeStore, route, minEpoch, txnId, txnId.epoch());
SafeCommand safeCommand = safeStore.get(txnId, participants);
+ ExecuteFlags flags;
Deps deps;
if (txnId.is(PrivilegedCoordinatorWithDeps))
{
- deps = PreAccept.calculateDeps(safeStore, txnId, participants,
minEpoch, txnId, true);
- if (deps == null)
- return PreAcceptNack.INSTANCE;
+ try (DepsCalculator calculator = new DepsCalculator())
+ {
+ deps = calculator.calculate(safeStore, txnId,
participants, minEpoch, txnId, true);
+ if (deps == null)
+ return PreAcceptNack.INSTANCE;
+ flags = calculator.executeFlags(txnId);
+ }
- Commands.AcceptOutcome outcome = Commands.preaccept(safeStore,
safeCommand, participants, txnId, txn, deps, true, route);
+ Commands.AcceptOutcome outcome = Commands.preaccept(safeStore,
safeCommand, participants, txnId, txn, deps, true);
if (outcome != Success)
return PreAcceptNack.INSTANCE;
}
else
{
- Commands.AcceptOutcome outcome = Commands.preaccept(safeStore,
safeCommand, participants, txnId, txn, null, true, route);
+ Commands.AcceptOutcome outcome = Commands.preaccept(safeStore,
safeCommand, participants, txnId, txn, null, true);
if (outcome != Success)
return PreAcceptNack.INSTANCE;
- deps = PreAccept.calculateDeps(safeStore, txnId, participants,
minEpoch, txnId, true);
- if (deps == null)
- return PreAcceptNack.INSTANCE;
+ try (DepsCalculator calculator = new DepsCalculator())
+ {
+ deps = calculator.calculate(safeStore, txnId,
participants, minEpoch, txnId, true);
+ if (deps == null)
+ return PreAcceptNack.INSTANCE;
+ flags = calculator.executeFlags(txnId);
+ }
}
Command command = safeCommand.current();
- return new PreAcceptOk(txnId, command.executeAt(), deps);
+ return new PreAcceptOk(txnId, command.executeAt(), deps, flags);
}
@Override
diff --git
a/accord-core/src/main/java/accord/coordinate/CoordinationAdapter.java
b/accord-core/src/main/java/accord/coordinate/CoordinationAdapter.java
index f5562e32..dbe637e5 100644
--- a/accord-core/src/main/java/accord/coordinate/CoordinationAdapter.java
+++ b/accord-core/src/main/java/accord/coordinate/CoordinationAdapter.java
@@ -24,6 +24,7 @@ import javax.annotation.Nullable;
import accord.api.ProtocolModifiers;
import accord.api.Result;
+import accord.coordinate.ExecuteFlag.ExecuteFlags;
import accord.coordinate.ExecuteSyncPoint.ExecuteInclusive;
import accord.coordinate.tracking.FastPathTracker;
import accord.coordinate.tracking.PreAcceptExclusiveSyncPointTracker;
@@ -48,8 +49,8 @@ import accord.utils.UnhandledEnum;
import static accord.api.ProtocolModifiers.QuorumEpochIntersections;
import static accord.api.ProtocolModifiers.Toggles.requiresUniqueHlcs;
-import static
accord.api.ProtocolModifiers.Toggles.temporaryPermitUnsafeBlindWrites;
import static accord.coordinate.CoordinationAdapter.Factory.Kind.Recovery;
+import static accord.coordinate.ExecuteFlag.HAS_UNIQUE_HLC;
import static accord.coordinate.ExecutePath.FAST;
import static accord.coordinate.ExecutePath.SLOW;
import static accord.messages.Apply.Kind.Maximal;
@@ -68,7 +69,7 @@ public interface CoordinationAdapter<R>
void proposeOnly(Node node, Route<?> require, Route<?> sendTo,
SelectNodeOwnership selectNodeOwnership, FullRoute<?> route, Accept.Kind kind,
Ballot ballot, TxnId txnId, Txn txn, Timestamp executeAt, Deps deps,
BiConsumer<? super Deps, Throwable> callback);
void stabilise(Node node, @Nullable Topologies any, FullRoute<?> route,
Ballot ballot, TxnId txnId, Txn txn, Timestamp executeAt, Deps deps,
BiConsumer<? super R, Throwable> callback);
void stabiliseOnly(Node node, Route<?> require, Route<?> sendTo,
SelectNodeOwnership selectNodeOwnership, FullRoute<?> route, Ballot ballot,
TxnId txnId, Txn txn, Timestamp executeAt, Deps deps, BiConsumer<? super Deps,
Throwable> callback);
- void execute(Node node, @Nullable Topologies any, FullRoute<?> route,
ExecutePath path, TxnId txnId, Txn txn, Timestamp executeAt, Deps stableDeps,
Deps sendDeps, BiConsumer<? super R, Throwable> callback);
+ void execute(Node node, @Nullable Topologies any, FullRoute<?> route,
ExecutePath path, ExecuteFlags executeFlags, TxnId txnId, Txn txn, Timestamp
executeAt, Deps stableDeps, Deps sendDeps, BiConsumer<? super R, Throwable>
callback);
void persist(Node node, @Nullable Topologies any, Route<?> require,
Route<?> sendTo, SelectNodeOwnership selectNodeOwnership, FullRoute<?> route,
TxnId txnId, Txn txn, Timestamp executeAt, Deps deps, Writes writes, Result
result, BiConsumer<? super R, Throwable> callback);
default void persist(Node node, @Nullable Topologies any, FullRoute<?>
route, TxnId txnId, Txn txn, Timestamp executeAt, Deps deps, Writes writes,
Result result, BiConsumer<? super R, Throwable> callback)
{
@@ -181,7 +182,7 @@ public interface CoordinationAdapter<R>
route, txnId,
executeAt, SHARE, QuorumEpochIntersections.commit);
Topologies coordinates = all.size() == 1 ? all :
accept.forEpoch(txnId.epoch());
- if (ProtocolModifiers.Faults.txnInstability) execute(node,
all, route, SLOW, txnId, txn, executeAt, deps, deps, callback);
+ if (ProtocolModifiers.Faults.txnInstability) execute(node,
all, route, SLOW, ExecuteFlags.none(), txnId, txn, executeAt, deps, deps,
callback);
else new StabiliseTxn(node, coordinates, all, route, ballot,
txnId, txn, executeAt, deps, callback).start();
}
@@ -205,11 +206,11 @@ public interface CoordinationAdapter<R>
}
@Override
- public void execute(Node node, Topologies any, FullRoute<?> route,
ExecutePath path, TxnId txnId, Txn txn, Timestamp executeAt, Deps stableDeps,
Deps sendDeps, BiConsumer<? super Result, Throwable> callback)
+ public void execute(Node node, Topologies any, FullRoute<?> route,
ExecutePath path, ExecuteFlags executeFlags, TxnId txnId, Txn txn, Timestamp
executeAt, Deps stableDeps, Deps sendDeps, BiConsumer<? super Result,
Throwable> callback)
{
Topologies all = execution(node, any, route, SHARE, route,
txnId, executeAt);
- if ((temporaryPermitUnsafeBlindWrites() ||
!requiresUniqueHlcs()) && txn.read().keys().isEmpty())
+ if ((executeFlags.contains(HAS_UNIQUE_HLC) ||
!requiresUniqueHlcs()) && txn.read().keys().isEmpty())
{
Writes writes = txnId.is(Txn.Kind.Write) ?
txn.execute(txnId, executeAt, null) : null;
Result result = txn.result(txnId, executeAt, null);
@@ -283,7 +284,7 @@ public interface CoordinationAdapter<R>
}
@Override
- public void execute(Node node, Topologies any, FullRoute<?> route,
ExecutePath path, TxnId txnId, Txn txn, Timestamp executeAt, Deps stableDeps,
Deps sendDeps, BiConsumer<? super R, Throwable> callback)
+ public void execute(Node node, Topologies any, FullRoute<?> route,
ExecutePath path, ExecuteFlags executeFlags, TxnId txnId, Txn txn, Timestamp
executeAt, Deps stableDeps, Deps sendDeps, BiConsumer<? super R, Throwable>
callback)
{
persist(node, null, route, txnId, txn, executeAt, stableDeps,
null, txn.result(txnId, executeAt, null), callback);
}
@@ -319,12 +320,12 @@ public interface CoordinationAdapter<R>
}
@Override
- public void execute(Node node, Topologies any, FullRoute<?> route,
ExecutePath path, TxnId txnId, Txn txn, Timestamp executeAt, Deps stableDeps,
Deps sendDeps, BiConsumer<? super R, Throwable> callback)
+ public void execute(Node node, Topologies any, FullRoute<?> route,
ExecutePath path, ExecuteFlags executeFlags, TxnId txnId, Txn txn, Timestamp
executeAt, Deps stableDeps, Deps sendDeps, BiConsumer<? super R, Throwable>
callback)
{
// We cannot use the fast path for sync points as their
visibility is asymmetric wrt other transactions,
// so we could recover to include different transactions than
those we fast path committed with.
Invariants.require(path != FAST);
- super.execute(node, any, route, path, txnId, txn, executeAt,
stableDeps, sendDeps, callback);
+ super.execute(node, any, route, path, executeFlags, txnId,
txn, executeAt, stableDeps, sendDeps, callback);
}
}
@@ -405,7 +406,7 @@ public interface CoordinationAdapter<R>
}
@Override
- public void execute(Node node, Topologies any, FullRoute<?> route,
ExecutePath path, TxnId txnId, Txn txn, Timestamp executeAt, Deps stableDeps,
Deps sendDeps, BiConsumer<? super SyncPoint<U>, Throwable> callback)
+ public void execute(Node node, Topologies any, FullRoute<?> route,
ExecutePath path, ExecuteFlags executeFlags, TxnId txnId, Txn txn, Timestamp
executeAt, Deps stableDeps, Deps sendDeps, BiConsumer<? super SyncPoint<U>,
Throwable> callback)
{
Topologies all = forExecution(node, route, SHARE, txnId,
executeAt, stableDeps);
diff --git a/accord-core/src/main/java/accord/coordinate/ExecuteFlag.java
b/accord-core/src/main/java/accord/coordinate/ExecuteFlag.java
new file mode 100644
index 00000000..393a4372
--- /dev/null
+++ b/accord-core/src/main/java/accord/coordinate/ExecuteFlag.java
@@ -0,0 +1,47 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package accord.coordinate;
+
+import accord.utils.TinyEnumSet;
+
+public enum ExecuteFlag
+{
+ READY_TO_EXECUTE, HAS_UNIQUE_HLC;
+
+ public static final class ExecuteFlags extends TinyEnumSet<ExecuteFlag>
+ {
+ private static final ExecuteFlags[] LOOKUP = new ExecuteFlags[1 <<
ExecuteFlag.values().length];
+ static
+ {
+ for (int i = 0 ; i < LOOKUP.length ; ++i)
+ LOOKUP[i] = new ExecuteFlags(i);
+ }
+ public static ExecuteFlags none() { return LOOKUP[0]; }
+ public static ExecuteFlags all() { return LOOKUP[LOOKUP.length - 1]; }
+ public static ExecuteFlags get(int bits) { return LOOKUP[bits]; }
+ public static ExecuteFlags get(ExecuteFlag a) { return
LOOKUP[encode(a)]; }
+ public static ExecuteFlags get(ExecuteFlag a, ExecuteFlag b) { return
LOOKUP[encode(a) | encode(b)]; }
+ public ExecuteFlags with(ExecuteFlag a) { return LOOKUP[bitset |
encode(a)]; }
+ public ExecuteFlags or(ExecuteFlags that) { return LOOKUP[this.bitset
| that.bitset]; }
+ public ExecuteFlags and(ExecuteFlags that) { return LOOKUP[this.bitset
& that.bitset]; }
+ public boolean isEmpty() { return bitset == 0; }
+ public int bits() { return bitset; }
+ private ExecuteFlags(int bits) { super(bits); }
+ }
+}
diff --git a/accord-core/src/main/java/accord/coordinate/Propose.java
b/accord-core/src/main/java/accord/coordinate/Propose.java
index f474ae9a..3ebe52c4 100644
--- a/accord-core/src/main/java/accord/coordinate/Propose.java
+++ b/accord-core/src/main/java/accord/coordinate/Propose.java
@@ -23,6 +23,7 @@ import java.util.function.BiConsumer;
import accord.api.ProtocolModifiers.Faults;
import accord.api.RoutingKey;
+import accord.coordinate.ExecuteFlag.ExecuteFlags;
import accord.coordinate.tracking.QuorumTracker;
import accord.coordinate.tracking.SimpleTracker;
import accord.local.Commands.AcceptOutcome;
@@ -76,6 +77,7 @@ abstract class Propose<R> implements Callback<AcceptReply>
private Throwable failure;
private boolean isDone;
+ private int executeFlags;
Propose(Node node, Topologies topologies, Kind kind, Ballot ballot, TxnId
txnId, Txn txn, Route<?> require, FullRoute<?> route, Timestamp executeAt, Deps
deps, BiConsumer<? super R, Throwable> callback)
{
@@ -194,7 +196,7 @@ abstract class Propose<R> implements Callback<AcceptReply>
// Or we must pick it up as an Unstable dependency here.
Deps newDeps = mergeNewDeps();
Deps stableDeps = mergeDeps(newDeps);
- if (kind == Kind.MEDIUM) adapter().execute(node,
acceptTracker.topologies(), route, MEDIUM, txnId, txn, executeAt, stableDeps,
newDeps, callback);
+ if (kind == Kind.MEDIUM) adapter().execute(node,
acceptTracker.topologies(), route, MEDIUM, ExecuteFlags.none(), txnId, txn,
executeAt, stableDeps, newDeps, callback);
else adapter().stabilise(node, acceptTracker.topologies(), route,
ballot, txnId, txn, executeAt, stableDeps, callback);
if (!Invariants.debug()) acceptOks.clear();
}
diff --git a/accord-core/src/main/java/accord/coordinate/Recover.java
b/accord-core/src/main/java/accord/coordinate/Recover.java
index eaee4f66..9abf7058 100644
--- a/accord-core/src/main/java/accord/coordinate/Recover.java
+++ b/accord-core/src/main/java/accord/coordinate/Recover.java
@@ -30,6 +30,7 @@ import javax.annotation.Nullable;
import accord.api.ProgressLog.BlockedUntil;
import accord.api.Result;
import accord.api.RoutingKey;
+import accord.coordinate.ExecuteFlag.ExecuteFlags;
import accord.coordinate.tracking.RecoveryTracker;
import accord.local.Node;
import accord.local.Node.Id;
@@ -284,7 +285,7 @@ public class Recover implements Callback<RecoverReply>,
BiConsumer<Result, Throw
case Stable:
{
withStableDeps(recoverOkList, executeAt, this, stableDeps
-> {
- adapter.execute(node, tracker.topologies(), route,
RECOVER, txnId, txn, executeAt, stableDeps, stableDeps, this);
+ adapter.execute(node, tracker.topologies(), route,
RECOVER, ExecuteFlags.none(), txnId, txn, executeAt, stableDeps, stableDeps,
this);
});
return;
}
diff --git a/accord-core/src/main/java/accord/coordinate/Stabilise.java
b/accord-core/src/main/java/accord/coordinate/Stabilise.java
index a7cb15e3..3fa3fa99 100644
--- a/accord-core/src/main/java/accord/coordinate/Stabilise.java
+++ b/accord-core/src/main/java/accord/coordinate/Stabilise.java
@@ -21,6 +21,7 @@ package accord.coordinate;
import java.util.Map;
import java.util.function.BiConsumer;
+import accord.coordinate.ExecuteFlag.ExecuteFlags;
import accord.coordinate.tracking.QuorumTracker;
import accord.coordinate.tracking.RequestStatus;
import accord.local.Node;
@@ -157,7 +158,7 @@ public abstract class Stabilise<R> implements
Callback<ReadReply>
protected void onStabilised()
{
- adapter().execute(node, allTopologies, route, ballot == Ballot.ZERO ?
SLOW : RECOVER, txnId, txn, executeAt, stabiliseDeps, stabiliseDeps, callback);
+ adapter().execute(node, allTopologies, route, ballot == Ballot.ZERO ?
SLOW : RECOVER, ExecuteFlags.none(), txnId, txn, executeAt, stabiliseDeps,
stabiliseDeps, callback);
}
protected abstract CoordinationAdapter<R> adapter();
diff --git a/accord-core/src/main/java/accord/impl/CommandChange.java
b/accord-core/src/main/java/accord/impl/CommandChange.java
index 10306182..e38ff635 100644
--- a/accord-core/src/main/java/accord/impl/CommandChange.java
+++ b/accord-core/src/main/java/accord/impl/CommandChange.java
@@ -152,7 +152,7 @@ public class CommandChange
protected TxnId txnId;
protected Timestamp executeAt;
- protected Timestamp executeAtLeast;
+ protected Timestamp executesAtLeast;
protected long minUniqueHlc;
protected SaveStatus saveStatus;
protected Status.Durability durability;
@@ -164,7 +164,6 @@ public class CommandChange
protected PartialTxn partialTxn;
protected PartialDeps partialDeps;
- protected byte[] waitingOnBytes;
protected CommandChange.WaitingOnProvider waitingOn;
protected Writes writes;
protected Result result;
@@ -215,7 +214,7 @@ public class CommandChange
txnId = null;
executeAt = null;
- executeAtLeast = null;
+ executesAtLeast = null;
minUniqueHlc = 0;
saveStatus = null;
durability = null;
@@ -227,7 +226,6 @@ public class CommandChange
partialTxn = null;
partialDeps = null;
- waitingOnBytes = null;
waitingOn = null;
writes = null;
result = null;
@@ -246,10 +244,10 @@ public class CommandChange
public void init(TxnId txnId)
{
this.txnId = txnId;
- durability = NotDurable;
- acceptedOrCommitted = promised = Ballot.ZERO;
- waitingOn = (txn, deps, executeAtLeast, uniqueHlc) -> null;
- result = null;
+ this.durability = NotDurable;
+ this.acceptedOrCommitted = promised = Ballot.ZERO;
+ this.waitingOn = (txn, deps, executeAtLeast, uniqueHlc) -> null;
+ this.result = null;
}
public boolean isEmpty()
@@ -292,7 +290,7 @@ public class CommandChange
return false;
SaveStatus newSaveStatus = cleanup.appliesIfNot;
- truncate(saveStatusMasks[newSaveStatus.ordinal()]);
+ setNulls(saveStatusMasks[newSaveStatus.ordinal()]);
if (input == Input.FULL)
{
if (newSaveStatus == SaveStatus.TruncatedApply &&
!saveStatus.known.is(ApplyAtKnown))
@@ -302,9 +300,11 @@ public class CommandChange
return true;
}
- protected void truncate(int mask)
+ protected void setNulls(int mask)
{
- // low flag bits represent fields already nulled out, so no need
to visit them again
+ // limit ourselves to those fields that have been changed to null
+ mask &= 0xffff | (mask << 16);
+ // low bits of flags represent fields already nulled out, so no
need to visit them again
int iterable = toIterableSetFields(mask) & ~flags;
for (Field next = nextSetField(iterable); next != null; iterable =
unsetIterable(next, iterable), next = nextSetField(iterable))
{
@@ -315,12 +315,12 @@ public class CommandChange
case SAVE_STATUS: saveStatus = null;
break;
case PARTIAL_DEPS: partialDeps = null;
break;
case EXECUTE_AT: executeAt = null;
break;
- case EXECUTES_AT_LEAST: executeAtLeast = null;
break;
+ case EXECUTES_AT_LEAST: executesAtLeast = null;
break;
case MIN_UNIQUE_HLC: minUniqueHlc = 0;
break;
case DURABILITY: durability = null;
break;
case ACCEPTED: acceptedOrCommitted = null;
break;
case PROMISED: promised = null;
break;
- case WAITING_ON: waitingOnBytes = null; waitingOn =
null; break;
+ case WAITING_ON: waitingOn = null;
break;
case PARTIAL_TXN: partialTxn = null;
break;
case WRITES: writes = null;
break;
case CLEANUP: cleanup = null;
break;
@@ -354,7 +354,7 @@ public class CommandChange
WaitingOn waitingOn = null;
if (this.waitingOn != null)
- waitingOn = this.waitingOn.provide(txnId, partialDeps,
executeAtLeast, minUniqueHlc);
+ waitingOn = this.waitingOn.provide(txnId, partialDeps,
executesAtLeast, minUniqueHlc);
switch (saveStatus.status)
{
@@ -378,7 +378,7 @@ public class CommandChange
return executed(txnId, saveStatus, durability,
participants, promised, executeAt, partialTxn, partialDeps,
acceptedOrCommitted, waitingOn, writes, result);
case Truncated:
case Invalidated:
- return truncated(txnId, saveStatus, durability,
participants, executeAt, executeAtLeast, writes, result);
+ return truncated(txnId, saveStatus, durability,
participants, executeAt, executesAtLeast, writes, result);
default:
throw new UnhandledEnum(saveStatus.status);
}
@@ -408,7 +408,7 @@ public class CommandChange
return "Builder {" +
"txnId=" + txnId +
", executeAt=" + executeAt +
- ", executeAtLeast=" + executeAtLeast +
+ ", executeAtLeast=" + executesAtLeast +
", uniqueHlc=" + minUniqueHlc +
", saveStatus=" + saveStatus +
", durability=" + durability +
@@ -562,6 +562,11 @@ public class CommandChange
return flags >>> 16;
}
+ public static int toIterableNonNullFields(int flags)
+ {
+ return toIterableSetFields(flags) & ~flags;
+ }
+
public static Field nextSetField(int iterable)
{
int i = Integer.numberOfTrailingZeros(Integer.lowestOneBit(iterable));
@@ -577,7 +582,7 @@ public class CommandChange
{
int iterable = toIterableSetFields(flags);
StringBuilder builder = new StringBuilder("[");
- for (Field next = nextSetField(iterable) ; next != null; iterable =
unsetIterable(next, iterable), next = nextSetField(iterable))
+ for (Field next = nextSetField(iterable) ; next != null; next =
nextSetField(iterable = unsetIterable(next, iterable)))
{
if (builder.length() > 1)
builder.append(',');
diff --git a/accord-core/src/main/java/accord/local/CommandStore.java
b/accord-core/src/main/java/accord/local/CommandStore.java
index b6116af7..a459d430 100644
--- a/accord-core/src/main/java/accord/local/CommandStore.java
+++ b/accord-core/src/main/java/accord/local/CommandStore.java
@@ -79,10 +79,10 @@ public abstract class CommandStore implements AgentExecutor
{
private static final Logger logger =
LoggerFactory.getLogger(CommandStore.class);
- static class EpochUpdate
+ public static class EpochUpdate
{
- final RangesForEpoch newRangesForEpoch;
- final RedundantBefore addRedundantBefore;
+ public final RangesForEpoch newRangesForEpoch;
+ public final RedundantBefore addRedundantBefore;
EpochUpdate(RangesForEpoch newRangesForEpoch, RedundantBefore
addRedundantBefore)
{
diff --git a/accord-core/src/main/java/accord/local/CommandStores.java
b/accord-core/src/main/java/accord/local/CommandStores.java
index 9662c096..5e110d3d 100644
--- a/accord-core/src/main/java/accord/local/CommandStores.java
+++ b/accord-core/src/main/java/accord/local/CommandStores.java
@@ -514,9 +514,10 @@ public abstract class CommandStores
for (Ranges addRanges : shardDistributor.split(added))
{
EpochUpdateHolder updateHolder = new EpochUpdateHolder();
+ RangesForEpoch rangesForEpoch = new RangesForEpoch(epoch,
addRanges);
+ updateHolder.add(epoch, rangesForEpoch, addRanges);
ShardHolder shard = new ShardHolder(supplier.create(nextId++,
updateHolder));
- shard.ranges = new RangesForEpoch(epoch, addRanges);
- shard.store.epochUpdateHolder.add(epoch, shard.ranges,
addRanges);
+ shard.ranges = rangesForEpoch;
Map<Boolean, Ranges> partitioned =
addRanges.partitioningBy(range -> shouldBootstrap(node, prev.global,
newLocalTopology, range));
if (partitioned.containsKey(false))
diff --git a/accord-core/src/main/java/accord/local/CommandSummaries.java
b/accord-core/src/main/java/accord/local/CommandSummaries.java
index b04b1804..40478479 100644
--- a/accord-core/src/main/java/accord/local/CommandSummaries.java
+++ b/accord-core/src/main/java/accord/local/CommandSummaries.java
@@ -246,7 +246,8 @@ public interface CommandSummaries
interface ActiveCommandVisitor<P1, P2>
{
- void visit(P1 p1, P2 p2, Unseekable keyOrRange, TxnId txnId);
+ void visit(P1 p1, P2 p2, SummaryStatus status, Unseekable keyOrRange,
TxnId txnId);
+ default void visitMaxAppliedHlc(long maxAppliedHlc) {}
}
interface AllCommandVisitor
@@ -328,7 +329,7 @@ public interface CommandSummaries
continue;
for (Unseekable keyOrRange :
value.participants.intersecting(keysOrRanges, Minimal))
- visit.visit(p1, p2, keyOrRange, txnId);
+ visit.visit(p1, p2, status, keyOrRange, txnId);
}
}
}
diff --git a/accord-core/src/main/java/accord/local/Commands.java
b/accord-core/src/main/java/accord/local/Commands.java
index af8bc16f..e50c29d2 100644
--- a/accord-core/src/main/java/accord/local/Commands.java
+++ b/accord-core/src/main/java/accord/local/Commands.java
@@ -39,7 +39,6 @@ import accord.messages.Commit;
import accord.primitives.AbstractUnseekableKeys;
import accord.primitives.Ballot;
import accord.primitives.Deps;
-import accord.primitives.FullRoute;
import accord.primitives.Known;
import accord.primitives.Known.KnownExecuteAt;
import accord.primitives.PartialDeps;
@@ -132,7 +131,7 @@ public class Commands
public enum AcceptOutcome { Success, Redundant, RejectedBallot,
Insufficient, Retired, Truncated }
- public static AcceptOutcome preaccept(SafeCommandStore safeStore,
SafeCommand safeCommand, StoreParticipants participants, TxnId txnId, Txn
partialTxn, @Nullable Deps partialDeps, boolean hasCoordinatorVote,
FullRoute<?> route)
+ public static AcceptOutcome preaccept(SafeCommandStore safeStore,
SafeCommand safeCommand, StoreParticipants participants, TxnId txnId, Txn
partialTxn, @Nullable Deps partialDeps, boolean hasCoordinatorVote)
{
Invariants.require(partialDeps == null ||
txnId.is(PrivilegedCoordinatorWithDeps));
Invariants.require(!hasCoordinatorVote ||
txnId.hasPrivilegedCoordinator());
@@ -141,16 +140,16 @@ public class Commands
else if (hasCoordinatorVote) newSaveStatus = PreAcceptedWithVote;
else newSaveStatus = PreAccepted;
- return preacceptOrRecover(safeStore, safeCommand, participants,
newSaveStatus, txnId, partialTxn, partialDeps, route, Ballot.ZERO);
+ return preacceptOrRecover(safeStore, safeCommand, participants,
newSaveStatus, txnId, partialTxn, partialDeps, Ballot.ZERO);
}
- public static AcceptOutcome recover(SafeCommandStore safeStore,
SafeCommand safeCommand, StoreParticipants participants, TxnId txnId,
PartialTxn partialTxn, FullRoute<?> route, Ballot ballot)
+ public static AcceptOutcome recover(SafeCommandStore safeStore,
SafeCommand safeCommand, StoreParticipants participants, TxnId txnId,
PartialTxn partialTxn, Ballot ballot)
{
// for recovery we only ever propose either the original epoch or an
Accept that we witness; otherwise we invalidate
- return preacceptOrRecover(safeStore, safeCommand, participants,
SaveStatus.PreAccepted, txnId, partialTxn, null, route, ballot);
+ return preacceptOrRecover(safeStore, safeCommand, participants,
SaveStatus.PreAccepted, txnId, partialTxn, null, ballot);
}
- private static AcceptOutcome preacceptOrRecover(SafeCommandStore
safeStore, SafeCommand safeCommand, StoreParticipants participants, SaveStatus
newSaveStatus, TxnId txnId, Txn txn, @Nullable Deps deps, FullRoute<?> route,
Ballot ballot)
+ private static AcceptOutcome preacceptOrRecover(SafeCommandStore
safeStore, SafeCommand safeCommand, StoreParticipants participants, SaveStatus
newSaveStatus, TxnId txnId, Txn txn, @Nullable Deps deps, Ballot ballot)
{
final Command command = safeCommand.current();
if (command.hasBeen(Truncated))
@@ -180,7 +179,7 @@ public class Commands
if (command.known().deps().hasProposedOrDecidedDeps()) participants =
command.participants().supplement(participants);
else participants = participants.filter(UPDATE, safeStore, txnId,
null);
- Validated validated = validate(ballot, newSaveStatus, command,
participants, route, txn, deps);
+ Validated validated = validate(ballot, newSaveStatus, command,
participants, participants.route(), txn, deps);
Invariants.require(validated != INSUFFICIENT);
if (command.executeAt() == null)
@@ -191,7 +190,7 @@ public class Commands
// if we are performing recovery (i.e. non-zero ballot), do not
permit a fast path decision as we want to
// invalidate any transactions that were not completed by their
initial coordinator
// TODO (desired): limit preaccept to keys we include, to avoid
inflating unnecessary state
- Timestamp executeAt = safeStore.commandStore().preaccept(txnId,
route, safeStore, ballot.equals(Ballot.ZERO));
+ Timestamp executeAt = safeStore.commandStore().preaccept(txnId,
participants.route(), safeStore, ballot.equals(Ballot.ZERO));
if (txnId != executeAt || !command.is(NotDefined))
{
newSaveStatus = PreAccepted;
diff --git a/accord-core/src/main/java/accord/local/DepsCalculator.java
b/accord-core/src/main/java/accord/local/DepsCalculator.java
new file mode 100644
index 00000000..e31232d4
--- /dev/null
+++ b/accord-core/src/main/java/accord/local/DepsCalculator.java
@@ -0,0 +1,118 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package accord.local;
+
+import accord.coordinate.ExecuteFlag.ExecuteFlags;
+import accord.local.CommandSummaries.SummaryStatus;
+import accord.primitives.Deps;
+import accord.primitives.EpochSupplier;
+import accord.primitives.Participants;
+import accord.primitives.RangeDeps;
+import accord.primitives.Timestamp;
+import accord.primitives.TxnId;
+import accord.primitives.Unseekable;
+import accord.utils.Invariants;
+
+import static accord.coordinate.ExecuteFlag.HAS_UNIQUE_HLC;
+import static accord.coordinate.ExecuteFlag.READY_TO_EXECUTE;
+import static accord.local.CommandSummaries.SummaryStatus.APPLIED;
+import static accord.primitives.Txn.Kind.EphemeralRead;
+import static accord.primitives.Txn.Kind.ExclusiveSyncPoint;
+
+public class DepsCalculator extends Deps.Builder implements
CommandSummaries.ActiveCommandVisitor<TxnId, Object> // a Deps builder that also derives ExecuteFlags from the commands it is shown while visiting
+{
+ private boolean hasUnappliedDependency; // becomes true once any visited command has a SummaryStatus ordered before APPLIED
+ private long maxAppliedHlc; // largest value passed to visitMaxAppliedHlc so far; remains 0 if it is never called
+
+ public DepsCalculator()
+ {
+ super(true);
+ }
+
+ @Override
+ public void visit(TxnId self, Object o, SummaryStatus status, Unseekable
keyOrRange, TxnId txnId) // self: id to leave out of the deps, or null to leave nothing out; o is not used
+ {
+ if (self == null || !self.equals(txnId))
+ add(keyOrRange, txnId);
+ if (status.compareTo(APPLIED) < 0) // NOTE(review): also evaluated when txnId equals self - confirm a command is meant to count itself as unapplied
+ hasUnappliedDependency = true;
+ }
+
+ @Override
+ public void visitMaxAppliedHlc(long maxAppliedHlc)
+ {
+ if (maxAppliedHlc > this.maxAppliedHlc) // keep a running maximum across every call
+ this.maxAppliedHlc = maxAppliedHlc;
+ }
+
+ public ExecuteFlags executeFlags(TxnId txnId) // summarises what visit()/visitMaxAppliedHlc() recorded; only meaningful after calculate() ran on this instance
+ {
+ ExecuteFlags flags = ExecuteFlags.none();
+ if (!hasUnappliedDependency) // no visited command was before APPLIED
+ flags = flags.with(READY_TO_EXECUTE);
+ if (maxAppliedHlc < txnId.hlc()) // NOTE(review): also true when visitMaxAppliedHlc was never called (field still 0) - confirm every visited summary reports it
+ flags = flags.with(HAS_UNIQUE_HLC);
+ return flags;
+ }
+
+ public Deps calculate(SafeCommandStore safeStore, TxnId txnId,
StoreParticipants participants, long minEpoch, Timestamp executeAt, boolean
nullIfRedundant) // convenience overload: delegates using participants.touches()
+ {
+ return calculate(safeStore, txnId, participants.touches(), minEpoch,
executeAt, nullIfRedundant);
+ }
+
+ public Deps calculate(SafeCommandStore safeStore, TxnId txnId,
Participants<?> touches, long minEpoch, Timestamp executeAt, boolean
nullIfRedundant) // builds the Deps for touches; may return null, see the nullIfRedundant branch below
+ {
+ RangeDeps redundant;
+ try (RangeDeps.BuilderByRange redundantBuilder =
RangeDeps.builderByRange())
+ {
+ redundant = safeStore.redundantBefore().collectDeps(touches,
redundantBuilder, EpochSupplier.constant(minEpoch), executeAt)
+ .build(); // range deps contributed by redundantBefore(), merged into the result at the end
+ }
+
+ if (nullIfRedundant && !txnId.is(EphemeralRead)) // EphemeralRead never takes the null-return path
+ {
+ TxnId maxRedundantBefore = redundant.maxTxnId(null);
+ if (maxRedundantBefore != null &&
maxRedundantBefore.compareTo(executeAt) >= 0) // a redundant-before id at or after executeAt: report null instead of deps
+ {
+ Invariants.require(maxRedundantBefore.is(ExclusiveSyncPoint));
+ return null;
+ }
+ }
+
+ // NOTE: ExclusiveSyncPoint *relies* on STARTED_BEFORE to ensure it
reports a dependency on *every* earlier TxnId that may execute (before or after
it).
+ safeStore.visit(touches, executeAt, txnId.witnesses(), this,
executeAt.equals(txnId) ? null : txnId, null); // this instance is the visitor; p1 (self) is null when executeAt equals txnId, p2 is always null
+ Deps result = super.build();
+ result = new Deps(result.keyDeps, result.rangeDeps.with(redundant),
result.directKeyDeps); // same key deps, range deps extended with the redundant ones
+ Invariants.require(!result.contains(txnId)); // the result must never list txnId itself
+ return result;
+ }
+
+ public static Deps calculateDeps(SafeCommandStore safeStore, TxnId txnId,
StoreParticipants participants, long minEpoch, Timestamp executeAt, boolean
nullIfRedundant) // convenience overload: delegates using participants.touches()
+ {
+ return calculateDeps(safeStore, txnId, participants.touches(),
minEpoch, executeAt, nullIfRedundant);
+ }
+
+ public static Deps calculateDeps(SafeCommandStore safeStore, TxnId txnId,
Participants<?> touches, long minEpoch, Timestamp executeAt, boolean
nullIfRedundant) // deps-only entry point: uses a private calculator, so its execute flags are not available to the caller
+ {
+ try (DepsCalculator calculator = new DepsCalculator())
+ {
+ return calculator.calculate(safeStore, txnId, touches, minEpoch,
executeAt, nullIfRedundant);
+ }
+ }
+}
diff --git a/accord-core/src/main/java/accord/local/cfk/CommandsForKey.java
b/accord-core/src/main/java/accord/local/cfk/CommandsForKey.java
index 4a844575..4487f4d7 100644
--- a/accord-core/src/main/java/accord/local/cfk/CommandsForKey.java
+++ b/accord-core/src/main/java/accord/local/cfk/CommandsForKey.java
@@ -70,6 +70,7 @@ import static
accord.local.CommandSummaries.IsDep.IS_NOT_COORD_DEP;
import static accord.local.CommandSummaries.IsDep.IS_NOT_STABLE_DEP;
import static accord.local.CommandSummaries.IsDep.NOT_ELIGIBLE;
import static accord.local.CommandSummaries.SummaryStatus.APPLIED;
+import static
accord.local.CommandSummaries.SummaryStatus.NOT_DIRECTLY_WITNESSED;
import static accord.local.CommandSummaries.SummaryStatus.PREACCEPTED;
import static accord.local.cfk.CommandsForKey.InternalStatus.ACCEPTED;
import static accord.local.cfk.CommandsForKey.InternalStatus.APPLIED_DURABLE;
@@ -1343,6 +1344,8 @@ public class CommandsForKey extends CommandsForKeyUpdate
ActiveCommandVisitor<P1, P2> visitor,
P1 p1, P2 p2)
{
+ visitor.visitMaxAppliedHlc(maxUniqueHlc);
+
TxnId prunedBefore = prunedBefore();
int end = insertPos(startedBefore);
// We only filter durable transactions less than BOTH the txnId and
executeAt of our max preceding write.
@@ -1396,7 +1399,7 @@ public class CommandsForKey extends CommandsForKeyUpdate
while (nextPruned != null && nextPruned.compareTo(txn) < 0)
{
if (nextPruned.isVisible && nextPruned.is(testKind))
- visitor.visit(p1, p2, key, nextPruned.plainTxnId());
+ visitor.visit(p1, p2, NOT_DIRECTLY_WITNESSED, key,
nextPruned.plainTxnId());
if (loadingPruned.hasNext()) nextPruned = loadingPruned.next();
else nextPruned = null;
@@ -1464,8 +1467,7 @@ public class CommandsForKey extends CommandsForKeyUpdate
}
}
-
- visitor.visit(p1, p2, key, txn.plainTxnId());
+ visitor.visit(p1, p2, txn.summaryStatus(), key, txn.plainTxnId());
}
if (startedBefore.compareTo(prunedBefore) <= 0)
@@ -1483,7 +1485,7 @@ public class CommandsForKey extends CommandsForKeyUpdate
if (txn.is(Write))
{
- visitor.visit(p1, p2, key,
committedByExecuteAt[i].plainTxnId());
+ visitor.visit(p1, p2, txn.summaryStatus(), key,
committedByExecuteAt[i].plainTxnId());
if (txn.compareTo(startedBefore) > 0)
break;
}
diff --git a/accord-core/src/main/java/accord/local/cfk/PostProcess.java
b/accord-core/src/main/java/accord/local/cfk/PostProcess.java
index b938746b..c450a2c0 100644
--- a/accord-core/src/main/java/accord/local/cfk/PostProcess.java
+++ b/accord-core/src/main/java/accord/local/cfk/PostProcess.java
@@ -27,7 +27,6 @@ import javax.annotation.Nullable;
import accord.api.RoutingKey;
import accord.local.Command;
-import accord.local.PreLoadContext;
import accord.local.RedundantBefore;
import accord.local.SafeCommand;
import accord.local.SafeCommandStore;
@@ -46,6 +45,7 @@ import accord.utils.btree.BTree;
import static accord.local.CommandSummaries.SummaryStatus.APPLIED;
import static accord.local.KeyHistory.SYNC;
+import static accord.local.PreLoadContext.contextFor;
import static accord.local.cfk.CommandsForKey.InternalStatus.INVALIDATED;
import static accord.local.cfk.CommandsForKey.InternalStatus.STABLE;
import static accord.local.cfk.CommandsForKey.Unmanaged.Pending.APPLY;
@@ -98,10 +98,9 @@ abstract class PostProcess
safeStore = safeStore; // make it unsafe for use in lambda
SafeCommand safeCommand =
safeStore.ifLoadedAndInitialised(txnId);
if (safeCommand != null) load(safeStore, safeCommand, safeCfk,
notifySink);
- else
-
safeStore.commandStore().execute(PreLoadContext.contextFor(txnId,
RoutingKeys.of(key), SYNC), safeStore0 -> {
- load(safeStore0, safeStore0.unsafeGet(txnId),
safeStore0.get(key), notifySink);
- }, safeStore.agent());
+ else safeStore.commandStore().execute(contextFor(txnId,
RoutingKeys.of(key), SYNC), safeStore0 -> {
+ load(safeStore0, safeStore0.unsafeGet(txnId),
safeStore0.get(key), notifySink);
+ }, safeStore.agent());
}
}
diff --git a/accord-core/src/main/java/accord/messages/Accept.java
b/accord-core/src/main/java/accord/messages/Accept.java
index 35968ec7..b1a58f79 100644
--- a/accord-core/src/main/java/accord/messages/Accept.java
+++ b/accord-core/src/main/java/accord/messages/Accept.java
@@ -21,9 +21,11 @@ package accord.messages;
import javax.annotation.Nonnull;
import javax.annotation.Nullable;
+import accord.coordinate.ExecuteFlag.ExecuteFlags;
import accord.local.Command;
import accord.local.Commands;
import accord.local.Commands.AcceptOutcome;
+import accord.local.DepsCalculator;
import accord.local.KeyHistory;
import accord.local.Node.Id;
import accord.local.SafeCommand;
@@ -31,7 +33,6 @@ import accord.local.SafeCommandStore;
import accord.local.StoreParticipants;
import accord.primitives.Ballot;
import accord.primitives.Deps;
-import accord.primitives.EpochSupplier;
import accord.primitives.FullRoute;
import accord.primitives.PartialDeps;
import accord.primitives.Participants;
@@ -136,7 +137,7 @@ public class Accept extends
TxnRequest.WithUnsynced<Accept.AcceptReply>
if (!calculate.isEmpty())
{
- Deps calculatedDeps = calculateDeps(safeStore,
calculate);
+ Deps calculatedDeps =
DepsCalculator.calculateDeps(safeStore, txnId, calculate, minEpoch, executeAt,
true);
if (calculatedDeps == null)
return AcceptReply.inThePast(ballot, participants,
command);
@@ -158,29 +159,25 @@ public class Accept extends
TxnRequest.WithUnsynced<Accept.AcceptReply>
// if we're Retired, participants.owns() is empty, so we're
just fetching deps
// TODO (desired): optimise deps calculation; for some keys we
only need to return the last RX
case Success:
- Deps deps = calculateDeps(safeStore, participants);
- if (deps == null)
- return AcceptReply.inThePast(ballot, participants,
safeCommand.current());
+ ExecuteFlags flags;
+ Deps deps;
+ try (DepsCalculator calculator = new DepsCalculator())
+ {
+ deps = calculator.calculate(safeStore, txnId,
participants, minEpoch, executeAt, true);
+ if (deps == null)
+ return AcceptReply.inThePast(ballot, participants,
safeCommand.current());
+ flags = calculator.executeFlags(txnId);
+ }
Invariants.require(deps.maxTxnId(txnId).epoch() <=
executeAt.epoch());
if (filterDuplicateDependenciesFromAcceptReply())
deps = deps.without(this.partialDeps);
Participants<?> successful = isPartialAccept ?
participants.touches() : null;
- return new AcceptReply(successful, deps);
+ return new AcceptReply(successful, deps, flags);
}
}
- private Deps calculateDeps(SafeCommandStore safeStore, StoreParticipants
participants)
- {
- return PreAccept.calculateDeps(safeStore, txnId, participants,
EpochSupplier.constant(minEpoch), executeAt, true);
- }
-
- private Deps calculateDeps(SafeCommandStore safeStore, Participants<?>
participants)
- {
- return PreAccept.calculateDeps(safeStore, txnId, participants,
EpochSupplier.constant(minEpoch), executeAt, true);
- }
-
@Override
public AcceptReply reduce(AcceptReply r1, AcceptReply r2)
{
@@ -217,6 +214,7 @@ public class Accept extends
TxnRequest.WithUnsynced<Accept.AcceptReply>
public static final class AcceptReply implements Reply
{
+ public enum Flag { READY_TO_EXECUTE, HAS_UNIQUE_HLC }
public static final AcceptReply SUCCESS = new AcceptReply(Success);
public final AcceptOutcome outcome;
@@ -224,6 +222,7 @@ public class Accept extends
TxnRequest.WithUnsynced<Accept.AcceptReply>
public final @Nullable Participants<?> successful;
public final @Nullable Deps deps;
public final @Nullable Timestamp committedExecuteAt;
+ public final ExecuteFlags flags;
private AcceptReply(AcceptOutcome outcome)
{
@@ -232,6 +231,7 @@ public class Accept extends
TxnRequest.WithUnsynced<Accept.AcceptReply>
this.successful = null;
this.deps = null;
this.committedExecuteAt = null;
+ this.flags = ExecuteFlags.none();
}
public AcceptReply(Ballot supersededBy)
@@ -241,15 +241,22 @@ public class Accept extends
TxnRequest.WithUnsynced<Accept.AcceptReply>
this.successful = null;
this.deps = null;
this.committedExecuteAt = null;
+ this.flags = ExecuteFlags.none();
}
public AcceptReply(@Nullable Participants<?> successful, @Nonnull Deps
deps)
+ {
+ this(successful, deps, ExecuteFlags.none());
+ }
+
+ public AcceptReply(@Nullable Participants<?> successful, @Nonnull Deps
deps, ExecuteFlags flags)
{
this.outcome = Success;
this.supersededBy = null;
this.successful = successful;
this.deps = deps;
this.committedExecuteAt = null;
+ this.flags = flags;
}
public AcceptReply(AcceptOutcome outcome, Ballot supersededBy,
@Nullable Timestamp committedExecuteAt)
@@ -259,15 +266,22 @@ public class Accept extends
TxnRequest.WithUnsynced<Accept.AcceptReply>
this.successful = null;
this.deps = null;
this.committedExecuteAt = committedExecuteAt;
+ this.flags = ExecuteFlags.none();
}
public AcceptReply(AcceptOutcome outcome, Ballot supersededBy,
@Nullable Participants<?> successful, @Nullable Deps deps, @Nullable Timestamp
committedExecuteAt)
+ {
+ this(outcome, supersededBy, successful, deps, committedExecuteAt,
ExecuteFlags.none());
+ }
+
+ public AcceptReply(AcceptOutcome outcome, Ballot supersededBy,
@Nullable Participants<?> successful, @Nullable Deps deps, @Nullable Timestamp
committedExecuteAt, ExecuteFlags flags)
{
this.outcome = outcome;
this.supersededBy = supersededBy;
this.successful = successful;
this.deps = deps;
this.committedExecuteAt = committedExecuteAt;
+ this.flags = flags;
}
static AcceptReply redundant(AcceptOutcome outcome, Ballot ballot,
Command command)
diff --git a/accord-core/src/main/java/accord/messages/BeginRecovery.java
b/accord-core/src/main/java/accord/messages/BeginRecovery.java
index d4cd79f2..64900ba5 100644
--- a/accord-core/src/main/java/accord/messages/BeginRecovery.java
+++ b/accord-core/src/main/java/accord/messages/BeginRecovery.java
@@ -54,8 +54,6 @@ import static
accord.messages.BeginRecovery.RecoverReply.Kind.Ok;
import static accord.messages.BeginRecovery.RecoverReply.Kind.Reject;
import static accord.messages.BeginRecovery.RecoverReply.Kind.Retired;
import static accord.messages.BeginRecovery.RecoverReply.Kind.Truncated;
-import static accord.messages.PreAccept.calculateDeps;
-import static accord.primitives.EpochSupplier.constant;
import static accord.primitives.Known.KnownDeps.DepsUnknown;
import static accord.primitives.Status.AcceptedMedium;
import static accord.primitives.Status.Phase;
@@ -111,7 +109,7 @@ public class BeginRecovery extends
TxnRequest.WithUnsynced<BeginRecovery.Recover
{
StoreParticipants participants = StoreParticipants.update(safeStore,
route, minEpoch, txnId, executeAtOrTxnIdEpoch);
SafeCommand safeCommand = safeStore.get(txnId, participants);
- Commands.AcceptOutcome outcome = Commands.recover(safeStore,
safeCommand, participants, txnId, partialTxn, route, ballot);
+ Commands.AcceptOutcome outcome = Commands.recover(safeStore,
safeCommand, participants, txnId, partialTxn, ballot);
switch (outcome)
{
default: throw UnhandledEnum.unknown(outcome);
@@ -129,7 +127,7 @@ public class BeginRecovery extends
TxnRequest.WithUnsynced<BeginRecovery.Recover
Deps localDeps = null;
if (!command.known().deps().hasCommittedOrDecidedDeps())
{
- localDeps = calculateDeps(safeStore, txnId, participants,
constant(minEpoch), txnId, false);
+ localDeps = DepsCalculator.calculateDeps(safeStore, txnId,
participants, minEpoch, txnId, false);
}
if (localDeps != null && coordinatedDeps != null &&
!participants.touches().equals(coordinatedDeps.covering))
{
diff --git
a/accord-core/src/main/java/accord/messages/GetEphemeralReadDeps.java
b/accord-core/src/main/java/accord/messages/GetEphemeralReadDeps.java
index 755708ff..dbd3c5bf 100644
--- a/accord-core/src/main/java/accord/messages/GetEphemeralReadDeps.java
+++ b/accord-core/src/main/java/accord/messages/GetEphemeralReadDeps.java
@@ -20,6 +20,8 @@ package accord.messages;
import javax.annotation.Nonnull;
+import accord.coordinate.ExecuteFlag.ExecuteFlags;
+import accord.local.DepsCalculator;
import accord.local.KeyHistory;
import accord.local.Node.Id;
import accord.local.SafeCommandStore;
@@ -33,8 +35,7 @@ import accord.topology.Topologies;
import accord.utils.Invariants;
import accord.utils.async.Cancellable;
-import static accord.messages.PreAccept.calculateDeps;
-import static accord.primitives.EpochSupplier.constant;
+import static accord.local.DepsCalculator.calculateDeps;
public class GetEphemeralReadDeps extends
TxnRequest.WithUnsynced<GetEphemeralReadDeps.GetEphemeralReadDepsOk>
{
@@ -70,15 +71,20 @@ public class GetEphemeralReadDeps extends
TxnRequest.WithUnsynced<GetEphemeralRe
public GetEphemeralReadDepsOk apply(SafeCommandStore safeStore)
{
StoreParticipants participants = StoreParticipants.read(safeStore,
scope, txnId, minEpoch, Long.MAX_VALUE);
- Deps deps = calculateDeps(safeStore, txnId, participants,
constant(minEpoch), Timestamp.MAX, false);
-
- return new GetEphemeralReadDepsOk(deps,
Math.max(safeStore.node().epoch(), node.epoch()));
+ Deps deps;
+ ExecuteFlags flags;
+ try (DepsCalculator calculator = new DepsCalculator()) // one calculator yields both the deps and the flags of this reply
+ {
+ deps = calculator.calculate(safeStore, txnId, participants, minEpoch,
Timestamp.MAX, false); // must run on THIS instance: the static calculateDeps() helper uses its own calculator, which would leave this one unvisited
+ flags = calculator.executeFlags(txnId); // reflects the commands visited by the calculate() call above
+ }
+ return new GetEphemeralReadDepsOk(deps,
Math.max(safeStore.node().epoch(), node.epoch()), flags);
}
@Override
- public GetEphemeralReadDepsOk reduce(GetEphemeralReadDepsOk reply1,
GetEphemeralReadDepsOk reply2)
+ public GetEphemeralReadDepsOk reduce(GetEphemeralReadDepsOk r1,
GetEphemeralReadDepsOk r2)
{
- return new GetEphemeralReadDepsOk(reply1.deps.with(reply2.deps),
Math.max(reply1.latestEpoch, reply2.latestEpoch));
+ return new GetEphemeralReadDepsOk(r1.deps.with(r2.deps),
Math.max(r1.latestEpoch, r2.latestEpoch), r1.flags.and(r2.flags));
}
@Override
@@ -104,13 +110,17 @@ public class GetEphemeralReadDeps extends
TxnRequest.WithUnsynced<GetEphemeralRe
public static class GetEphemeralReadDepsOk implements Reply
{
+ public enum Flag { READY_TO_EXECUTE }
+
public final Deps deps;
public final long latestEpoch;
+ public final ExecuteFlags flags;
- public GetEphemeralReadDepsOk(@Nonnull Deps deps, long latestEpoch)
+ public GetEphemeralReadDepsOk(@Nonnull Deps deps, long latestEpoch,
ExecuteFlags flags)
{
this.deps = Invariants.nonNull(deps);
this.latestEpoch = latestEpoch;
+ this.flags = flags;
}
@Override
diff --git a/accord-core/src/main/java/accord/messages/GetLatestDeps.java
b/accord-core/src/main/java/accord/messages/GetLatestDeps.java
index b8d7b2d4..c4382bc2 100644
--- a/accord-core/src/main/java/accord/messages/GetLatestDeps.java
+++ b/accord-core/src/main/java/accord/messages/GetLatestDeps.java
@@ -22,6 +22,7 @@ import javax.annotation.Nonnull;
import javax.annotation.Nullable;
import accord.local.Command;
+import accord.local.DepsCalculator;
import accord.local.KeyHistory;
import accord.local.Node.Id;
import accord.local.SafeCommand;
@@ -39,9 +40,6 @@ import accord.topology.Topologies;
import accord.utils.Invariants;
import accord.utils.async.Cancellable;
-import static accord.messages.PreAccept.calculateDeps;
-import static accord.primitives.EpochSupplier.constant;
-
public class GetLatestDeps extends
TxnRequest.WithUnsynced<GetLatestDeps.GetLatestDepsReply>
{
public static final class SerializationSupport
@@ -91,7 +89,7 @@ public class GetLatestDeps extends
TxnRequest.WithUnsynced<GetLatestDeps.GetLate
Deps localDeps = null;
if (!command.known().deps().hasCommittedOrDecidedDeps() &&
!command.hasBeen(Status.Truncated))
{
- localDeps = calculateDeps(safeStore, txnId, participants,
constant(minEpoch), executeAt, false);
+ localDeps = DepsCalculator.calculateDeps(safeStore, txnId,
participants, minEpoch, txnId, false);
}
LatestDeps deps = LatestDeps.create(participants.owns(),
command.known().deps(), command.acceptedOrCommitted(), coordinatedDeps,
localDeps);
diff --git a/accord-core/src/main/java/accord/messages/PreAccept.java
b/accord-core/src/main/java/accord/messages/PreAccept.java
index cc948373..7a884c93 100644
--- a/accord-core/src/main/java/accord/messages/PreAccept.java
+++ b/accord-core/src/main/java/accord/messages/PreAccept.java
@@ -25,19 +25,18 @@ import javax.annotation.Nullable;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
+import accord.coordinate.ExecuteFlag.ExecuteFlags;
import accord.local.Command;
import accord.local.Commands;
+import accord.local.DepsCalculator;
import accord.local.KeyHistory;
import accord.local.Node.Id;
import accord.local.SafeCommand;
import accord.local.SafeCommandStore;
import accord.primitives.PartialDeps;
-import accord.primitives.Participants;
-import accord.primitives.RangeDeps;
import accord.local.StoreParticipants;
import accord.messages.TxnRequest.WithUnsynced;
import accord.primitives.Deps;
-import accord.primitives.EpochSupplier;
import accord.primitives.FullRoute;
import accord.primitives.PartialTxn;
import accord.primitives.Route;
@@ -52,8 +51,6 @@ import accord.utils.UnhandledEnum;
import accord.utils.async.Cancellable;
import static accord.primitives.Timestamp.Flag.REJECTED;
-import static accord.primitives.Txn.Kind.EphemeralRead;
-import static accord.primitives.Txn.Kind.ExclusiveSyncPoint;
public class PreAccept extends WithUnsynced<PreAccept.PreAcceptReply>
{
@@ -114,7 +111,7 @@ public class PreAccept extends
WithUnsynced<PreAccept.PreAcceptReply>
{
StoreParticipants participants = StoreParticipants.update(safeStore,
route, minEpoch, txnId, acceptEpoch);
SafeCommand safeCommand = safeStore.get(txnId, participants);
- Commands.AcceptOutcome outcome = Commands.preaccept(safeStore,
safeCommand, participants, txnId, partialTxn, partialDeps, hasCoordinatorVote,
route);
+ Commands.AcceptOutcome outcome = Commands.preaccept(safeStore,
safeCommand, participants, txnId, partialTxn, partialDeps, hasCoordinatorVote);
Command command = safeCommand.current();
switch (outcome)
{
@@ -127,12 +124,18 @@ public class PreAccept extends
WithUnsynced<PreAccept.PreAcceptReply>
return PreAcceptNack.INSTANCE;
if (command.executeAt().is(REJECTED))
- return new PreAcceptOk(txnId, command.executeAt(),
Deps.NONE);
+ return new PreAcceptOk(txnId, command.executeAt(),
Deps.NONE, ExecuteFlags.none());
case Retired:
- Deps deps = calculateDeps(safeStore, txnId, participants,
EpochSupplier.constant(minEpoch), txnId, true);
- if (deps == null)
- return PreAcceptNack.INSTANCE;
+ ExecuteFlags flags;
+ Deps deps;
+ try (DepsCalculator calculator = new DepsCalculator())
+ {
+ deps = calculator.calculate(safeStore, txnId,
participants, minEpoch, txnId, true);
+ if (deps == null)
+ return PreAcceptNack.INSTANCE;
+ flags = calculator.executeFlags(txnId);
+ }
// NOTE: we CANNOT test whether we adopt a future dependency
here because it might be that this command
// is guaranteed to not reach agreement, but that this replica
is unaware of that fact and has pruned
@@ -140,7 +143,7 @@ public class PreAccept extends
WithUnsynced<PreAccept.PreAcceptReply>
// We do however prohibit later epochs as dependencies as we
cannot handle those effectively
// when back-filling for execution of the transaction.
Invariants.require(deps.maxTxnId(txnId).epoch() <=
txnId.epoch());
- return new PreAcceptOk(txnId, command.executeAtOrTxnId(),
deps);
+ return new PreAcceptOk(txnId, command.executeAtOrTxnId(),
deps, flags);
case Truncated:
case RejectedBallot:
@@ -181,10 +184,11 @@ public class PreAccept extends
WithUnsynced<PreAccept.PreAcceptReply>
Timestamp witnessedAt =
Timestamp.mergeMaxAndFlags(okMax.witnessedAt, okMin.witnessedAt);
Deps deps = ok1.deps.with(ok2.deps);
+ ExecuteFlags flags = ok1.flags.and(ok2.flags);
- if (deps == okMax.deps && witnessedAt == okMax.witnessedAt)
+ if (deps == okMax.deps && witnessedAt == okMax.witnessedAt &&
okMax.flags == flags)
return okMax;
- return new PreAcceptOk(ok1.txnId, witnessedAt, deps);
+ return new PreAcceptOk(ok1.txnId, witnessedAt, deps, flags);
}
}
@@ -193,12 +197,14 @@ public class PreAccept extends
WithUnsynced<PreAccept.PreAcceptReply>
public final TxnId txnId;
public final Timestamp witnessedAt;
public final Deps deps;
+ public final ExecuteFlags flags;
- public PreAcceptOk(TxnId txnId, Timestamp witnessedAt, Deps deps)
+ public PreAcceptOk(TxnId txnId, Timestamp witnessedAt, Deps deps,
ExecuteFlags flags)
{
this.txnId = txnId;
this.witnessedAt = witnessedAt;
this.deps = deps;
+ this.flags = flags;
}
@Override
@@ -252,41 +258,6 @@ public class PreAccept extends
WithUnsynced<PreAccept.PreAcceptReply>
}
}
- public static Deps calculateDeps(SafeCommandStore safeStore, TxnId txnId,
StoreParticipants participants, EpochSupplier minEpoch, Timestamp executeAt,
boolean nullIfRedundant)
- {
- return calculateDeps(safeStore, txnId, participants.touches(),
minEpoch, executeAt, nullIfRedundant);
- }
-
- public static Deps calculateDeps(SafeCommandStore safeStore, TxnId txnId,
Participants<?> touches, EpochSupplier minEpoch, Timestamp executeAt, boolean
nullIfRedundant)
- {
- // NOTE: ExclusiveSyncPoint *relies* on STARTED_BEFORE to ensure it
reports a dependency on *every* earlier TxnId that may execute (before or after
it).
- try (Deps.AbstractBuilder<Deps> builder = new Deps.Builder(true);
- RangeDeps.BuilderByRange redundantBuilder =
RangeDeps.builderByRange())
- {
- RangeDeps redundant =
safeStore.redundantBefore().collectDeps(touches, redundantBuilder, minEpoch,
executeAt).build();
- if (nullIfRedundant && !txnId.is(EphemeralRead))
- {
- TxnId maxRedundantBefore = redundant.maxTxnId(null);
- if (maxRedundantBefore != null &&
maxRedundantBefore.compareTo(executeAt) >= 0)
- {
-
Invariants.require(maxRedundantBefore.is(ExclusiveSyncPoint));
- return null;
- }
- }
-
- safeStore.visit(touches, executeAt, txnId.witnesses(),
- (p1, in, keyOrRange, testTxnId) -> {
- if (p1 == null || !testTxnId.equals(p1))
- in.add(keyOrRange, testTxnId);
- }, executeAt.equals(txnId) ? null : txnId,
builder);
-
- Deps result = builder.build();
- result = new Deps(result.keyDeps,
result.rangeDeps.with(redundant), result.directKeyDeps);
- Invariants.require(!result.contains(txnId));
- return result;
- }
- }
-
/**
* To simplify the implementation of bootstrap/range movements, we have
coordinators abort transactions
* that span too many topology changes for any given shard. This means
that we can always daisy-chain a replica
diff --git a/accord-core/src/main/java/accord/messages/Propagate.java
b/accord-core/src/main/java/accord/messages/Propagate.java
index a8c2c27e..0e9e62f8 100644
--- a/accord-core/src/main/java/accord/messages/Propagate.java
+++ b/accord-core/src/main/java/accord/messages/Propagate.java
@@ -298,7 +298,7 @@ public class Propagate implements PreLoadContext,
MapReduceConsume<SafeCommandSt
case PreAccepted:
// only preaccept if we coordinate the transaction
if (safeStore.ranges().coordinates(txnId).intersects(route) &&
Route.isFullRoute(route))
- Commands.preaccept(safeStore, safeCommand, participants,
txnId, partialTxn, null, false, Route.castToFullRoute(route));
+ Commands.preaccept(safeStore, safeCommand, participants,
txnId, partialTxn, null, false);
case NotDefined:
if (invalidIf == IfUncommitted)
diff --git a/accord-core/src/main/java/accord/messages/ReadData.java
b/accord-core/src/main/java/accord/messages/ReadData.java
index 8cba1a25..9a46f7a6 100644
--- a/accord-core/src/main/java/accord/messages/ReadData.java
+++ b/accord-core/src/main/java/accord/messages/ReadData.java
@@ -69,6 +69,7 @@ public abstract class ReadData implements PreLoadContext,
Request, MapReduceCons
{
private static final Logger logger =
LoggerFactory.getLogger(ReadData.class);
+ public enum Flag { FAST_READ }
private enum State { PENDING, PENDING_OBSOLETE, RETURNED, OBSOLETE }
protected enum StoreAction { WAIT, EXECUTE, OBSOLETE }
@@ -121,6 +122,7 @@ public abstract class ReadData implements PreLoadContext,
Request, MapReduceCons
public final TxnId txnId;
public final Participants<?> scope;
public final long executeAtEpoch;
+ public final int flags;
private transient State state = State.PENDING; // TODO (low priority,
semantics): respond with the Executed result we have stored?
transient Timestamp executeAt;
@@ -138,18 +140,30 @@ public abstract class ReadData implements PreLoadContext,
Request, MapReduceCons
protected transient ReplyContext replyContext;
public ReadData(Node.Id to, Topologies topologies, TxnId txnId,
Participants<?> scope, long executeAtEpoch)
+ {
+ this(to, topologies, txnId, scope, executeAtEpoch, 0);
+ }
+
+ public ReadData(Node.Id to, Topologies topologies, TxnId txnId,
Participants<?> scope, long executeAtEpoch, int flags)
{
this.txnId = txnId;
+ this.flags = flags;
int startIndex = latestRelevantEpochIndex(to, topologies, scope);
this.scope = TxnRequest.computeScope(to, topologies, scope,
startIndex, Participants::slice, Participants::with);
this.executeAtEpoch = executeAtEpoch;
}
protected ReadData(TxnId txnId, Participants<?> scope, long executeAtEpoch)
+ {
+ this(txnId, scope, executeAtEpoch, 0);
+ }
+
+ protected ReadData(TxnId txnId, Participants<?> scope, long
executeAtEpoch, int flags)
{
this.txnId = txnId;
this.scope = scope;
this.executeAtEpoch = executeAtEpoch;
+ this.flags = flags;
}
protected abstract ExecuteOn executeOn();
diff --git a/accord-core/src/main/java/accord/primitives/Range.java
b/accord-core/src/main/java/accord/primitives/Range.java
index e6a86879..4d5c913d 100644
--- a/accord-core/src/main/java/accord/primitives/Range.java
+++ b/accord-core/src/main/java/accord/primitives/Range.java
@@ -28,7 +28,6 @@ import java.util.Objects;
import javax.annotation.Nullable;
-import static accord.utils.Invariants.illegalState;
import static accord.utils.SortedArrays.Search.CEIL;
import static accord.utils.SortedArrays.Search.FAST;
@@ -208,10 +207,10 @@ public abstract class Range implements
Comparable<RoutableKey>, Unseekable, Seek
private Range(RoutingKey start, RoutingKey end)
{
- if (start.compareTo(end) >= 0)
- throw new IllegalArgumentException(start + " >= " + end);
- if (startInclusive() == endInclusive())
- throw illegalState("Range must have one side inclusive, and the
other exclusive. Range of different types should not be mixed.");
+ // TODO (expected): should we at least relax to permit an empty Range?
+ Invariants.requireArgument(start.compareTo(end) < 0, start + " >= " +
end);
+ Invariants.requireArgument(Objects.equals(start.prefix(),
end.prefix()), "Range bounds must share their prefix: %s vs %s", start, end);
+ Invariants.require(startInclusive() != endInclusive(), "Range must
have one side inclusive, and the other exclusive. Range of different types
should not be mixed.");
this.start = start;
this.end = end;
}
@@ -226,6 +225,11 @@ public abstract class Range implements
Comparable<RoutableKey>, Unseekable, Seek
return end;
}
+ public Object prefix()
+ {
+ return start.prefix();
+ }
+
@Override
public final Domain domain() { return Domain.Range; }
diff --git a/accord-core/src/main/java/accord/primitives/Routable.java
b/accord-core/src/main/java/accord/primitives/Routable.java
index 34534f92..45e40445 100644
--- a/accord-core/src/main/java/accord/primitives/Routable.java
+++ b/accord-core/src/main/java/accord/primitives/Routable.java
@@ -89,6 +89,7 @@ public interface Routable
Kind kind();
Domain domain();
Unseekable toUnseekable();
+ default Object prefix() { return null; }
/**
* Deterministically select a key that intersects this Routable and the provided Ranges
diff --git a/accord-core/src/main/java/accord/primitives/RoutableKey.java
b/accord-core/src/main/java/accord/primitives/RoutableKey.java
index ac7b5840..8814c8df 100644
--- a/accord-core/src/main/java/accord/primitives/RoutableKey.java
+++ b/accord-core/src/main/java/accord/primitives/RoutableKey.java
@@ -37,16 +37,10 @@ public interface RoutableKey extends Routable,
Comparable<RoutableKey>
default int compareAsRoutingKey(@Nonnull RoutableKey that) { return
toUnseekable().compareTo(that.toUnseekable()); }
- /**
- * Some prefix that may be shared by other keys, particularly in a Range or Ranges.
- * Mostly used for pretty printing Range and Ranges.
- */
- default Object prefix() { return null; }
-
/**
* Some suffix that, combined with prefix(), uniquely identifies the Key.
*/
- default String suffix() { return toString(); }
+ default Object suffix() { return toString(); }
@Override default RoutingKey someIntersectingRoutingKey(@Nullable Ranges
ranges)
{
diff --git a/accord-core/src/main/java/accord/utils/Functions.java
b/accord-core/src/main/java/accord/utils/Functions.java
index 34e491e9..58551c41 100644
--- a/accord-core/src/main/java/accord/utils/Functions.java
+++ b/accord-core/src/main/java/accord/utils/Functions.java
@@ -81,6 +81,14 @@ public class Functions
return result;
}
+ public static <I> long foldl(List<I> list, FoldToLong<I> foldl, long
accumulate)
+ {
+ long result = accumulate;
+ for (int i = 0, mi = list.size(); i < mi; ++i)
+ result = foldl.apply(list.get(i), result);
+ return result;
+ }
+
public static <I, O> O foldl(I[] array, BiFunction<I, O, O> foldl, O zero)
{
O result = zero;
diff --git a/accord-core/src/main/java/accord/utils/TinyEnumSet.java
b/accord-core/src/main/java/accord/utils/TinyEnumSet.java
index 0008cef5..3bf7c604 100644
--- a/accord-core/src/main/java/accord/utils/TinyEnumSet.java
+++ b/accord-core/src/main/java/accord/utils/TinyEnumSet.java
@@ -29,6 +29,16 @@ public class TinyEnumSet<E extends Enum<E>>
this.bitset = encode(values);
}
+ public TinyEnumSet(Enum<E> value)
+ {
+ this.bitset = encode(value);
+ }
+
+ public TinyEnumSet()
+ {
+ this.bitset = 0;
+ }
+
protected TinyEnumSet(int bitset)
{
this.bitset = bitset;
diff --git
a/accord-core/src/test/java/accord/coordinate/CoordinateSyncPointTest.java
b/accord-core/src/test/java/accord/coordinate/CoordinateSyncPointTest.java
index fe8d62d9..8876f8c6 100644
--- a/accord-core/src/test/java/accord/coordinate/CoordinateSyncPointTest.java
+++ b/accord-core/src/test/java/accord/coordinate/CoordinateSyncPointTest.java
@@ -20,6 +20,7 @@ package accord.coordinate;
import accord.Utils;
import accord.api.MessageSink;
+import accord.coordinate.ExecuteFlag.ExecuteFlags;
import accord.coordinate.tracking.AllTracker;
import accord.impl.IntKey;
import accord.impl.TestAgent;
@@ -120,7 +121,7 @@ class CoordinateSyncPointTest
if (request instanceof PreAccept)
{
PreAccept preAccept = (PreAccept) request;
- onSuccess(args, new PreAccept.PreAcceptOk(preAccept.txnId,
preAccept.txnId, PartialDeps.NONE));
+ onSuccess(args, new PreAccept.PreAcceptOk(preAccept.txnId,
preAccept.txnId, PartialDeps.NONE, ExecuteFlags.none()));
}
else if (request instanceof Accept)
{
diff --git
a/accord-core/src/test/java/accord/coordinate/CoordinateTransactionTest.java
b/accord-core/src/test/java/accord/coordinate/CoordinateTransactionTest.java
index 6146e131..60e6e795 100644
--- a/accord-core/src/test/java/accord/coordinate/CoordinateTransactionTest.java
+++ b/accord-core/src/test/java/accord/coordinate/CoordinateTransactionTest.java
@@ -236,7 +236,7 @@ public class CoordinateTransactionTest
for (Node n : cluster)
assertEquals(AcceptOutcome.Success,
getUninterruptibly(n.unsafeForKey(key).build(blockingTxnContext, safeStore -> {
StoreParticipants participants =
StoreParticipants.update(safeStore, route, blockingTxnId.epoch(),
blockingTxnId, blockingTxnId.epoch());
- return Commands.preaccept(safeStore,
safeStore.get(blockingTxnId, participants), participants, blockingTxnId,
blockingTxn.slice(safeStore.ranges().allAt(blockingTxnId), true), null, false,
route);
+ return Commands.preaccept(safeStore,
safeStore.get(blockingTxnId, participants), participants, blockingTxnId,
blockingTxn.slice(safeStore.ranges().allAt(blockingTxnId), true), null, false);
})));
// Now create the transaction that should be blocked by the previous one
@@ -245,7 +245,7 @@ public class CoordinateTransactionTest
for (Node n : cluster)
assertEquals(AcceptOutcome.Success,
getUninterruptibly(n.unsafeForKey(key).build(context, safeStore -> {
StoreParticipants participants =
StoreParticipants.update(safeStore, route, txnId.epoch(), txnId, txnId.epoch());
- return Commands.preaccept(safeStore, safeStore.get(txnId,
participants), participants, txnId,
txn.slice(safeStore.ranges().allAt(txnId.epoch()), true), null, false, route);
+ return Commands.preaccept(safeStore, safeStore.get(txnId,
participants), participants, txnId,
txn.slice(safeStore.ranges().allAt(txnId.epoch()), true), null, false);
})));
diff --git a/accord-core/src/test/java/accord/impl/PrefixedIntHashKey.java
b/accord-core/src/test/java/accord/impl/PrefixedIntHashKey.java
index 57e7877b..5663f455 100644
--- a/accord-core/src/test/java/accord/impl/PrefixedIntHashKey.java
+++ b/accord-core/src/test/java/accord/impl/PrefixedIntHashKey.java
@@ -145,6 +145,12 @@ public abstract class PrefixedIntHashKey implements
RoutableKey
}
};
}
+
+ @Override
+ public Object prefix()
+ {
+ return prefix;
+ }
}
public static final class Sentinel extends PrefixedIntRoutingKey
diff --git a/accord-core/src/test/java/accord/impl/basic/InMemoryJournal.java
b/accord-core/src/test/java/accord/impl/basic/InMemoryJournal.java
index 7ac22f89..855c2f83 100644
--- a/accord-core/src/test/java/accord/impl/basic/InMemoryJournal.java
+++ b/accord-core/src/test/java/accord/impl/basic/InMemoryJournal.java
@@ -577,7 +577,7 @@ public class InMemoryJournal implements Journal
executeAt = Invariants.nonNull((Timestamp)
diff.changes.get(EXECUTE_AT));
break;
case EXECUTES_AT_LEAST:
- executeAtLeast = Invariants.nonNull((Timestamp)
diff.changes.get(EXECUTES_AT_LEAST));
+ executesAtLeast = Invariants.nonNull((Timestamp)
diff.changes.get(EXECUTES_AT_LEAST));
break;
case MIN_UNIQUE_HLC:
minUniqueHlc = (Long)diff.changes.get(MIN_UNIQUE_HLC);
diff --git a/accord-core/src/test/java/accord/local/CheckedCommands.java
b/accord-core/src/test/java/accord/local/CheckedCommands.java
index 0ba4a855..63e43095 100644
--- a/accord-core/src/test/java/accord/local/CheckedCommands.java
+++ b/accord-core/src/test/java/accord/local/CheckedCommands.java
@@ -48,7 +48,7 @@ public class CheckedCommands
StoreParticipants participants = StoreParticipants.update(safeStore,
route, txnId.epoch(), txnId, txnId.epoch());
SafeCommand safeCommand = safeStore.get(txnId, participants);
Command before = safeCommand.current();
- Commands.AcceptOutcome result = Commands.preaccept(safeStore,
safeCommand, participants, txnId, partialTxn, null, false, route);
+ Commands.AcceptOutcome result = Commands.preaccept(safeStore,
safeCommand, participants, txnId, partialTxn, null, false);
Command after = safeCommand.current();
if (result != Commands.AcceptOutcome.Success) throw
illegalState("Command mutation rejected: " + result);
consumer.accept(before, after);
diff --git a/accord-core/src/test/java/accord/local/ImmutableCommandTest.java
b/accord-core/src/test/java/accord/local/ImmutableCommandTest.java
index 80e3ee52..cee79a9b 100644
--- a/accord-core/src/test/java/accord/local/ImmutableCommandTest.java
+++ b/accord-core/src/test/java/accord/local/ImmutableCommandTest.java
@@ -139,7 +139,7 @@ public class ImmutableCommandTest
{
StoreParticipants participants =
StoreParticipants.update(safeStore, ROUTE, txnId.epoch(), txnId, txnId.epoch());
SafeCommand safeCommand = safeStore.get(txnId, participants);
- Commands.preaccept(safeStore, safeCommand, participants,
txnId, txn.slice(FULL_RANGES, true), null, false, ROUTE);
+ Commands.preaccept(safeStore, safeCommand, participants,
txnId, txn.slice(FULL_RANGES, true), null, false);
Command command = safeStore.get(txnId).current();
Assertions.assertEquals(Status.PreAccepted, command.status());
Assertions.assertEquals(txnId, command.executeAt());
@@ -174,7 +174,7 @@ public class ImmutableCommandTest
Timestamp expectedTimestamp = Timestamp.fromValues(2, 110, ID1);
getUninterruptibly(commands.build(context, (Consumer<? super
SafeCommandStore>) safeStore -> {
StoreParticipants participants =
StoreParticipants.update(safeStore, ROUTE, txnId.epoch(), txnId, 2);
- Commands.preaccept(safeStore, safeStore.get(txnId, participants),
participants, txnId, txn.slice(FULL_RANGES, true), null, false, ROUTE);
+ Commands.preaccept(safeStore, safeStore.get(txnId, participants),
participants, txnId, txn.slice(FULL_RANGES, true), null, false);
}));
commands.build(PreLoadContext.contextFor(txnId,
txn.keys().toParticipants()), safeStore -> {
Command command = safeStore.get(txnId).current();
diff --git a/accord-core/src/test/java/accord/messages/PreAcceptTest.java
b/accord-core/src/test/java/accord/messages/PreAcceptTest.java
index 326aef37..8e30a76b 100644
--- a/accord-core/src/test/java/accord/messages/PreAcceptTest.java
+++ b/accord-core/src/test/java/accord/messages/PreAcceptTest.java
@@ -25,6 +25,7 @@ import org.junit.jupiter.api.Assertions;
import org.junit.jupiter.api.Test;
import accord.api.RoutingKey;
+import accord.coordinate.ExecuteFlag.ExecuteFlags;
import accord.impl.IntKey;
import accord.impl.IntKey.Raw;
import accord.impl.TestAgent.RethrowAgent;
@@ -114,7 +115,7 @@ public class PreAcceptTest
messageSink.assertHistorySizes(0, 1);
Assertions.assertEquals(ID2, messageSink.responses.get(0).to);
Deps expectedDeps = new Deps(KeyDeps.NONE, RangeDeps.NONE,
KeyDeps.NONE);
- Assertions.assertEquals(new PreAccept.PreAcceptOk(txnId, txnId,
expectedDeps),
+ Assertions.assertEquals(new PreAccept.PreAcceptOk(txnId, txnId,
expectedDeps, ExecuteFlags.none()),
messageSink.responses.get(0).payload);
}
finally
@@ -210,7 +211,7 @@ public class PreAcceptTest
Assertions.assertEquals(ID3, messageSink.responses.get(0).to);
Deps expectedDeps = new Deps(KeyDeps.NONE, RangeDeps.NONE,
KeyDeps.NONE);
Timestamp expectedTs = Timestamp.fromValues(1, 110, ID1);
- Assertions.assertEquals(new PreAccept.PreAcceptOk(txnId2,
expectedTs, expectedDeps),
+ Assertions.assertEquals(new PreAccept.PreAcceptOk(txnId2,
expectedTs, expectedDeps, ExecuteFlags.none()),
messageSink.responses.get(0).payload);
}
finally
@@ -241,7 +242,7 @@ public class PreAcceptTest
messageSink.assertHistorySizes(0, 1);
Assertions.assertEquals(ID2, messageSink.responses.get(0).to);
Deps expectedDeps = new Deps(KeyDeps.NONE, RangeDeps.NONE,
KeyDeps.NONE);
- Assertions.assertEquals(new PreAccept.PreAcceptOk(txnId, txnId,
expectedDeps),
+ Assertions.assertEquals(new PreAccept.PreAcceptOk(txnId, txnId,
expectedDeps, ExecuteFlags.none()),
messageSink.responses.get(0).payload);
}
finally
@@ -283,7 +284,7 @@ public class PreAcceptTest
messageSink.assertHistorySizes(0, 1);
Assertions.assertEquals(ID2, messageSink.responses.get(0).to);
Deps expectedDeps = new Deps(KeyDeps.NONE, RangeDeps.NONE,
KeyDeps.NONE);
- Assertions.assertEquals(new PreAccept.PreAcceptOk(txnId,
Timestamp.fromValues(2, 110, ID1), expectedDeps),
+ Assertions.assertEquals(new PreAccept.PreAcceptOk(txnId,
Timestamp.fromValues(2, 110, ID1), expectedDeps, ExecuteFlags.none()),
messageSink.responses.get(0).payload);
}
finally
diff --git a/accord-core/src/test/java/accord/utils/Gen.java
b/accord-core/src/test/java/accord/utils/Gen.java
index 19eb223b..357c197a 100644
--- a/accord-core/src/test/java/accord/utils/Gen.java
+++ b/accord-core/src/test/java/accord/utils/Gen.java
@@ -34,7 +34,8 @@ import java.util.stream.IntStream;
import java.util.stream.LongStream;
import java.util.stream.Stream;
-public interface Gen<A> {
+public interface Gen<A>
+{
/**
* For cases where method handles isn't able to detect the proper type, this method acts as a cast
* to inform the compiler of the desired type.
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]