This is an automated email from the ASF dual-hosted git repository.
benedict pushed a commit to branch trunk
in repository https://gitbox.apache.org/repos/asf/cassandra-accord.git
The following commit(s) were added to refs/heads/trunk by this push:
new bf414d3 Refactor Timestamp/TxnId (#22)
bf414d3 is described below
commit bf414d39ea495f8914f7cb5fb5c4138a9c16760b
Author: Benedict Elliott Smith <[email protected]>
AuthorDate: Wed Jan 11 18:05:22 2023 +0000
Refactor Timestamp/TxnId (#22)
- Combine real and logical into a single 64-bit HLC
- Introduce 16 flag bits
- Pack epoch (48-bits), HLC (64-bits) and flags (16-bits) into two longs in
memory
---
.../src/main/java/accord/coordinate/CheckOn.java | 8 +-
.../main/java/accord/coordinate/CheckShards.java | 4 +-
.../main/java/accord/coordinate/Coordinate.java | 10 +-
.../src/main/java/accord/coordinate/Execute.java | 14 +-
.../src/main/java/accord/coordinate/FetchData.java | 8 +-
.../main/java/accord/coordinate/FindHomeKey.java | 2 +-
.../src/main/java/accord/coordinate/FindRoute.java | 2 +-
.../java/accord/coordinate/InformHomeOfTxn.java | 4 +-
.../main/java/accord/coordinate/Invalidate.java | 4 +-
.../main/java/accord/coordinate/MaybeRecover.java | 2 +-
.../src/main/java/accord/coordinate/Persist.java | 16 +-
.../src/main/java/accord/coordinate/Propose.java | 2 +-
.../src/main/java/accord/coordinate/Recover.java | 16 +-
.../java/accord/coordinate/RecoverWithHomeKey.java | 2 +-
.../java/accord/coordinate/RecoverWithRoute.java | 12 +-
.../src/main/java/accord/impl/InMemoryCommand.java | 14 --
.../java/accord/impl/InMemoryCommandStore.java | 15 +-
.../java/accord/impl/InMemoryCommandsForKey.java | 5 +-
.../main/java/accord/impl/SimpleProgressLog.java | 12 +-
.../src/main/java/accord/local/Command.java | 23 +--
accord-core/src/main/java/accord/local/Node.java | 39 +++--
.../src/main/java/accord/messages/Accept.java | 8 +-
.../src/main/java/accord/messages/Apply.java | 6 +-
.../java/accord/messages/BeginInvalidation.java | 5 +-
.../main/java/accord/messages/BeginRecovery.java | 6 +-
.../src/main/java/accord/messages/Commit.java | 20 +--
.../src/main/java/accord/messages/GetDeps.java | 4 +-
.../main/java/accord/messages/InformDurable.java | 4 +-
.../java/accord/messages/InformHomeDurable.java | 2 +-
.../main/java/accord/messages/InformOfTxnId.java | 7 +-
.../src/main/java/accord/messages/PreAccept.java | 2 +-
.../src/main/java/accord/messages/ReadData.java | 2 +-
.../src/main/java/accord/messages/TxnRequest.java | 12 +-
.../main/java/accord/messages/WaitOnCommit.java | 4 +-
.../main/java/accord/primitives/AbstractKeys.java | 4 +-
.../java/accord/primitives/AbstractRanges.java | 4 +-
.../src/main/java/accord/primitives/Ballot.java | 29 +++-
.../src/main/java/accord/primitives/Range.java | 2 +-
.../src/main/java/accord/primitives/Routable.java | 24 ++-
.../main/java/accord/primitives/RoutableKey.java | 2 +-
.../src/main/java/accord/primitives/Routables.java | 2 +-
.../src/main/java/accord/primitives/Timestamp.java | 182 ++++++++++++++++++---
.../src/main/java/accord/primitives/Txn.java | 24 ++-
.../src/main/java/accord/primitives/TxnId.java | 98 ++++++++++-
.../src/main/java/accord/primitives/Writes.java | 2 +-
.../main/java/accord/topology/TopologyManager.java | 4 +-
.../src/test/java/accord/burn/TopologyUpdates.java | 4 +-
.../java/accord/coordinate/CoordinateTest.java | 10 +-
.../java/accord/coordinate/TopologyChangeTest.java | 21 +--
.../src/test/java/accord/impl/TestAgent.java | 2 +-
.../src/test/java/accord/impl/list/ListAgent.java | 2 +-
.../test/java/accord/impl/list/ListRequest.java | 3 +-
.../test/java/accord/impl/mock/MockCluster.java | 6 +-
.../src/test/java/accord/local/CommandTest.java | 6 +-
.../src/test/java/accord/local/NodeTest.java | 31 ++--
.../test/java/accord/messages/PreAcceptTest.java | 18 +-
accord-core/src/test/java/accord/txn/DepsTest.java | 43 +++--
.../src/main/java/accord/maelstrom/Json.java | 20 +--
.../main/java/accord/maelstrom/MaelstromAgent.java | 2 +-
59 files changed, 550 insertions(+), 291 deletions(-)
diff --git a/accord-core/src/main/java/accord/coordinate/CheckOn.java
b/accord-core/src/main/java/accord/coordinate/CheckOn.java
index d37a827..a7ec61f 100644
--- a/accord-core/src/main/java/accord/coordinate/CheckOn.java
+++ b/accord-core/src/main/java/accord/coordinate/CheckOn.java
@@ -136,7 +136,7 @@ public class CheckOn extends CheckShards
public OnDone()
{
- Ranges localRanges =
node.topology().localRangesForEpochs(txnId.epoch, untilLocalEpoch);
+ Ranges localRanges =
node.topology().localRangesForEpochs(txnId.epoch(), untilLocalEpoch);
PartialRoute<?> selfRoute = route().slice(localRanges);
full = (CheckStatusOkFull) merged;
sufficientFor = full.sufficientFor(selfRoute);
@@ -165,7 +165,7 @@ public class CheckOn extends CheckShards
txnIds = Iterables.concat(txnIds, partialDeps.txnIds());
PreLoadContext loadContext = contextFor(txnIds, keys);
- node.mapReduceConsumeLocal(loadContext, route, txnId.epoch,
untilLocalEpoch, this);
+ node.mapReduceConsumeLocal(loadContext, route, txnId.epoch(),
untilLocalEpoch, this);
}
@Override
@@ -187,7 +187,7 @@ public class CheckOn extends CheckShards
case Applied:
case PreApplied:
- if (untilLocalEpoch >= full.executeAt.epoch)
+ if (untilLocalEpoch >= full.executeAt.epoch())
{
confirm(command.commit(safeStore, maxRoute,
progressKey, partialTxn, full.executeAt, partialDeps));
confirm(command.apply(safeStore, untilLocalEpoch,
maxRoute, full.executeAt, partialDeps, full.writes, full.result));
@@ -216,7 +216,7 @@ public class CheckOn extends CheckShards
if (!merged.durability.isDurable() || homeKey == null)
return null;
- if (!safeStore.ranges().at(txnId.epoch).contains(homeKey))
+ if (!safeStore.ranges().at(txnId.epoch()).contains(homeKey))
return null;
Timestamp executeAt =
merged.saveStatus.known.executeAt.isDecisionKnown() ? merged.executeAt : null;
diff --git a/accord-core/src/main/java/accord/coordinate/CheckShards.java
b/accord-core/src/main/java/accord/coordinate/CheckShards.java
index 9ad05fd..5500d90 100644
--- a/accord-core/src/main/java/accord/coordinate/CheckShards.java
+++ b/accord-core/src/main/java/accord/coordinate/CheckShards.java
@@ -36,13 +36,13 @@ public abstract class CheckShards extends
ReadCoordinator<CheckStatusReply>
private static Topologies topologyFor(Node node, TxnId txnId,
Unseekables<?, ?> contact, long epoch)
{
- return node.topology().preciseEpochs(contact, txnId.epoch, epoch);
+ return node.topology().preciseEpochs(contact, txnId.epoch(), epoch);
}
@Override
protected void contact(Id id)
{
- node.send(id, new CheckStatus(txnId,
contact.slice(topologies().computeRangesForNode(id)), txnId.epoch,
untilRemoteEpoch, includeInfo), this);
+ node.send(id, new CheckStatus(txnId,
contact.slice(topologies().computeRangesForNode(id)), txnId.epoch(),
untilRemoteEpoch, includeInfo), this);
}
protected boolean isSufficient(Id from, CheckStatusOk ok) { return
isSufficient(ok); }
diff --git a/accord-core/src/main/java/accord/coordinate/Coordinate.java
b/accord-core/src/main/java/accord/coordinate/Coordinate.java
index ae65014..dec539c 100644
--- a/accord-core/src/main/java/accord/coordinate/Coordinate.java
+++ b/accord-core/src/main/java/accord/coordinate/Coordinate.java
@@ -155,7 +155,7 @@ public class Coordinate extends AsyncFuture<Result>
implements Callback<PreAccep
// but by sending accept we rule
out hybrid fast-path
// TODO (low priority, efficiency): if we receive an expired
response, perhaps defer to permit at least one other
// node to respond before
invalidating
- if (node.agent().isExpired(txnId, executeAt.real))
+ if (node.agent().isExpired(txnId, executeAt.hlc()))
{
proposeInvalidate(node, Ballot.ZERO, txnId, route.homeKey(),
(success, fail) -> {
if (fail != null)
@@ -164,7 +164,7 @@ public class Coordinate extends AsyncFuture<Result>
implements Callback<PreAccep
}
else
{
- node.withEpoch(executeAt.epoch, () -> {
+ node.withEpoch(executeAt.epoch(), () -> {
commitInvalidate(node, txnId, route, executeAt);
// TODO (required, API consistency): this should
be Invalidated rather than Timeout?
accept(null, new Timeout(txnId, route.homeKey()));
@@ -174,10 +174,10 @@ public class Coordinate extends AsyncFuture<Result>
implements Callback<PreAccep
}
else
{
- node.withEpoch(executeAt.epoch, () -> {
+ node.withEpoch(executeAt.epoch(), () -> {
Topologies topologies = tracker.topologies();
- if (executeAt.epoch > txnId.epoch)
- topologies = node.topology().withUnsyncedEpochs(route,
txnId.epoch, executeAt.epoch);
+ if (executeAt.epoch() > txnId.epoch())
+ topologies = node.topology().withUnsyncedEpochs(route,
txnId.epoch(), executeAt.epoch());
Propose.propose(node, topologies, Ballot.ZERO, txnId, txn,
route, executeAt, deps, this);
});
}
diff --git a/accord-core/src/main/java/accord/coordinate/Execute.java
b/accord-core/src/main/java/accord/coordinate/Execute.java
index 03ee66c..11464e3 100644
--- a/accord-core/src/main/java/accord/coordinate/Execute.java
+++ b/accord-core/src/main/java/accord/coordinate/Execute.java
@@ -50,13 +50,13 @@ class Execute extends ReadCoordinator<ReadReply>
private Execute(Node node, TxnId txnId, Txn txn, FullRoute<?> route,
Seekables<?, ?> readScope, Timestamp executeAt, Deps deps, BiConsumer<Result,
Throwable> callback)
{
- super(node, node.topology().forEpoch(readScope.toUnseekables(),
executeAt.epoch), txnId);
+ super(node, node.topology().forEpoch(readScope.toUnseekables(),
executeAt.epoch()), txnId);
this.txn = txn;
this.route = route;
this.readScope = readScope;
this.executeAt = executeAt;
this.deps = deps;
- this.applyTo = node.topology().forEpoch(route, executeAt.epoch);
+ this.applyTo = node.topology().forEpoch(route, executeAt.epoch());
this.callback = callback;
}
@@ -64,8 +64,8 @@ class Execute extends ReadCoordinator<ReadReply>
{
if (txn.read().keys().isEmpty())
{
- Topologies sendTo = node.topology().preciseEpochs(route,
txnId.epoch, executeAt.epoch);
- Topologies applyTo = node.topology().forEpoch(route,
executeAt.epoch);
+ Topologies sendTo = node.topology().preciseEpochs(route,
txnId.epoch(), executeAt.epoch());
+ Topologies applyTo = node.topology().forEpoch(route,
executeAt.epoch());
Result result = txn.result(txnId, null);
Persist.persist(node, sendTo, applyTo, txnId, route, txn,
executeAt, deps, txn.execute(executeAt, null), result);
callback.accept(result, null);
@@ -112,8 +112,8 @@ class Execute extends ReadCoordinator<ReadReply>
return Action.Abort;
case NotCommitted:
// the replica may be missing the original commit, or the
additional commit, so send everything
- Topologies topology = node.topology().preciseEpochs(route,
txnId.epoch, executeAt.epoch);
- Topology coordinateTopology = topology.forEpoch(txnId.epoch);
+ Topologies topology = node.topology().preciseEpochs(route,
txnId.epoch(), executeAt.epoch());
+ Topology coordinateTopology = topology.forEpoch(txnId.epoch());
node.send(from, new Commit(Maximal, from, coordinateTopology,
topology, txnId, txn, route, readScope, executeAt, deps, false));
// also try sending a read command to another replica, in case
they're ready to serve a response
return Action.TryAlternative;
@@ -132,7 +132,7 @@ class Execute extends ReadCoordinator<ReadReply>
Result result = txn.result(txnId, data);
callback.accept(result, null);
// avoid re-calculating topologies if it is unchanged
- Topologies sendTo = txnId.epoch == executeAt.epoch ? applyTo :
node.topology().preciseEpochs(route, txnId.epoch, executeAt.epoch);
+ Topologies sendTo = txnId.epoch() == executeAt.epoch() ? applyTo :
node.topology().preciseEpochs(route, txnId.epoch(), executeAt.epoch());
Persist.persist(node, sendTo, applyTo, txnId, route, txn,
executeAt, deps, txn.execute(executeAt, data), result);
}
else
diff --git a/accord-core/src/main/java/accord/coordinate/FetchData.java
b/accord-core/src/main/java/accord/coordinate/FetchData.java
index 1bdba70..688e8eb 100644
--- a/accord-core/src/main/java/accord/coordinate/FetchData.java
+++ b/accord-core/src/main/java/accord/coordinate/FetchData.java
@@ -36,7 +36,7 @@ public class FetchData
public static Object fetch(Known fetch, Node node, TxnId txnId, Route<?>
route, @Nullable Timestamp executeAt, long untilLocalEpoch, BiConsumer<Known,
Throwable> callback)
{
Invariants.checkArgument(node.topology().hasEpoch(untilLocalEpoch));
- Ranges ranges = node.topology().localRangesForEpochs(txnId.epoch,
untilLocalEpoch);
+ Ranges ranges = node.topology().localRangesForEpochs(txnId.epoch(),
untilLocalEpoch);
if (!route.covers(ranges))
{
return fetchWithHomeKey(fetch, node, txnId, route.homeKey(),
untilLocalEpoch, callback);
@@ -69,13 +69,13 @@ public class FetchData
public static Object fetch(Known fetch, Node node, TxnId txnId,
FullRoute<?> route, @Nullable Timestamp executeAt, long untilLocalEpoch,
BiConsumer<Known, Throwable> callback)
{
- Ranges ranges = node.topology().localRangesForEpochs(txnId.epoch,
untilLocalEpoch);
+ Ranges ranges = node.topology().localRangesForEpochs(txnId.epoch(),
untilLocalEpoch);
return fetchInternal(ranges, fetch, node, txnId,
route.sliceStrict(ranges), executeAt, untilLocalEpoch, callback);
}
private static Object fetchInternal(Ranges ranges, Known target, Node
node, TxnId txnId, PartialRoute<?> route, @Nullable Timestamp executeAt, long
untilLocalEpoch, BiConsumer<Known, Throwable> callback)
{
- long srcEpoch = executeAt == null || target.epoch() == Coordination ?
txnId.epoch : executeAt.epoch;
+ long srcEpoch = executeAt == null || target.epoch() == Coordination ?
txnId.epoch() : executeAt.epoch();
if (!node.topology().hasEpoch(srcEpoch))
return node.topology().awaitEpoch(srcEpoch).map(ignore ->
fetchInternal(ranges, target, node, txnId, route, executeAt, untilLocalEpoch,
callback));
@@ -90,7 +90,7 @@ public class FetchData
Known sufficientFor = ok.sufficientFor(fetch);
// if we discover the executeAt as part of this action, use
that to decide if we requested enough info
Timestamp exec = executeAt != null ? executeAt :
ok.saveStatus.known.executeAt.isDecisionKnown() ? ok.executeAt : null;
- if (sufficientFor.outcome == OutcomeKnown && (exec == null ||
untilLocalEpoch < exec.epoch))
+ if (sufficientFor.outcome == OutcomeKnown && (exec == null ||
untilLocalEpoch < exec.epoch()))
sufficientFor = sufficientFor.with(OutcomeUnknown);
callback.accept(sufficientFor, null);
}
diff --git a/accord-core/src/main/java/accord/coordinate/FindHomeKey.java
b/accord-core/src/main/java/accord/coordinate/FindHomeKey.java
index 1f938c3..4d93a18 100644
--- a/accord-core/src/main/java/accord/coordinate/FindHomeKey.java
+++ b/accord-core/src/main/java/accord/coordinate/FindHomeKey.java
@@ -17,7 +17,7 @@ public class FindHomeKey extends CheckShards
final BiConsumer<RoutingKey, Throwable> callback;
FindHomeKey(Node node, TxnId txnId, Unseekables<?, ?> unseekables,
BiConsumer<RoutingKey, Throwable> callback)
{
- super(node, txnId, unseekables, txnId.epoch, IncludeInfo.No);
+ super(node, txnId, unseekables, txnId.epoch(), IncludeInfo.No);
this.callback = callback;
}
diff --git a/accord-core/src/main/java/accord/coordinate/FindRoute.java
b/accord-core/src/main/java/accord/coordinate/FindRoute.java
index 82b116e..8ad9e37 100644
--- a/accord-core/src/main/java/accord/coordinate/FindRoute.java
+++ b/accord-core/src/main/java/accord/coordinate/FindRoute.java
@@ -37,7 +37,7 @@ public class FindRoute extends CheckShards
final BiConsumer<Result, Throwable> callback;
FindRoute(Node node, TxnId txnId, RoutingKey homeKey, BiConsumer<Result,
Throwable> callback)
{
- super(node, txnId, RoutingKeys.of(homeKey), txnId.epoch,
IncludeInfo.Route);
+ super(node, txnId, RoutingKeys.of(homeKey), txnId.epoch(),
IncludeInfo.Route);
this.callback = callback;
}
diff --git a/accord-core/src/main/java/accord/coordinate/InformHomeOfTxn.java
b/accord-core/src/main/java/accord/coordinate/InformHomeOfTxn.java
index 79fac8b..4e4275b 100644
--- a/accord-core/src/main/java/accord/coordinate/InformHomeOfTxn.java
+++ b/accord-core/src/main/java/accord/coordinate/InformHomeOfTxn.java
@@ -49,8 +49,8 @@ public class InformHomeOfTxn extends AsyncFuture<Void>
implements Callback<Simpl
public static Future<Void> inform(Node node, TxnId txnId, RoutingKey
homeKey)
{
- return node.withEpoch(txnId.epoch, () -> {
- Shard homeShard = node.topology().forEpoch(homeKey, txnId.epoch);
+ return node.withEpoch(txnId.epoch(), () -> {
+ Shard homeShard = node.topology().forEpoch(homeKey, txnId.epoch());
InformHomeOfTxn inform = new InformHomeOfTxn(txnId, homeKey,
homeShard);
node.send(homeShard.nodes, new InformOfTxnId(txnId, homeKey),
inform);
return inform;
diff --git a/accord-core/src/main/java/accord/coordinate/Invalidate.java
b/accord-core/src/main/java/accord/coordinate/Invalidate.java
index 1ed1b75..e7c8cc8 100644
--- a/accord-core/src/main/java/accord/coordinate/Invalidate.java
+++ b/accord-core/src/main/java/accord/coordinate/Invalidate.java
@@ -68,7 +68,7 @@ public class Invalidate implements Callback<InvalidateReply>
this.txnId = txnId;
this.transitivelyInvokedByPriorInvalidation =
transitivelyInvokedByPriorInvalidation;
this.invalidateWith = invalidateWith;
- Topologies topologies = node.topology().forEpoch(invalidateWith,
txnId.epoch);
+ Topologies topologies = node.topology().forEpoch(invalidateWith,
txnId.epoch());
this.tracker = new InvalidationTracker(topologies);
}
@@ -195,7 +195,7 @@ public class Invalidate implements Callback<InvalidateReply>
Preconditions.checkState(maxReply.status.hasBeen(Accepted) ||
tracker.all(InvalidationShardTracker::isPromised));
// if we included the home shard, and we have either a
recoverable status OR have not rejected the fast path,
// we must have at least one response that should
contain the Route
- if (invalidateWith.contains(homeKey) &&
tracker.isPromisedForKey(homeKey, txnId.epoch))
+ if (invalidateWith.contains(homeKey) &&
tracker.isPromisedForKey(homeKey, txnId.epoch()))
throw new IllegalStateException("Received replies
from a node that must have known the route, but that did not include it");
// if < Accepted, we should have short-circuited to
invalidation above. This guarantees no Invalidate/Recover loop, as any later
status will forbid invoking Invalidate
diff --git a/accord-core/src/main/java/accord/coordinate/MaybeRecover.java
b/accord-core/src/main/java/accord/coordinate/MaybeRecover.java
index 1522ec4..634d588 100644
--- a/accord-core/src/main/java/accord/coordinate/MaybeRecover.java
+++ b/accord-core/src/main/java/accord/coordinate/MaybeRecover.java
@@ -47,7 +47,7 @@ public class MaybeRecover extends CheckShards
MaybeRecover(Node node, TxnId txnId, RoutingKey homeKey, @Nullable
Route<?> route, ProgressToken prevProgress, BiConsumer<Outcome, Throwable>
callback)
{
// we only want to enquire with the home shard, but we prefer maximal
route information for running Invalidation against, if necessary
- super(node, txnId, RoutingKeys.of(homeKey), txnId.epoch,
IncludeInfo.Route);
+ super(node, txnId, RoutingKeys.of(homeKey), txnId.epoch(),
IncludeInfo.Route);
this.homeKey = homeKey;
this.route = route;
this.prevProgress = prevProgress;
diff --git a/accord-core/src/main/java/accord/coordinate/Persist.java
b/accord-core/src/main/java/accord/coordinate/Persist.java
index a7daec9..63b84ee 100644
--- a/accord-core/src/main/java/accord/coordinate/Persist.java
+++ b/accord-core/src/main/java/accord/coordinate/Persist.java
@@ -56,15 +56,15 @@ public class Persist implements Callback<ApplyReply>
public static void persist(Node node, Topologies sendTo, Topologies
applyTo, TxnId txnId, FullRoute<?> route, Txn txn, Timestamp executeAt, Deps
deps, Writes writes, Result result)
{
Persist persist = new Persist(node, applyTo, txnId, route, txn,
executeAt, deps);
- node.send(sendTo.nodes(), to -> new Apply(to, sendTo, applyTo,
executeAt.epoch, txnId, route, executeAt, deps, writes, result), persist);
+ node.send(sendTo.nodes(), to -> new Apply(to, sendTo, applyTo,
executeAt.epoch(), txnId, route, executeAt, deps, writes, result), persist);
}
public static void persistAndCommit(Node node, TxnId txnId, FullRoute<?>
route, Txn txn, Timestamp executeAt, Deps deps, Writes writes, Result result)
{
- Topologies sendTo = node.topology().preciseEpochs(route, txnId.epoch,
executeAt.epoch);
- Topologies applyTo = node.topology().forEpoch(route, executeAt.epoch);
+ Topologies sendTo = node.topology().preciseEpochs(route,
txnId.epoch(), executeAt.epoch());
+ Topologies applyTo = node.topology().forEpoch(route,
executeAt.epoch());
Persist persist = new Persist(node, sendTo, txnId, route, txn,
executeAt, deps);
- node.send(sendTo.nodes(), to -> new Apply(to, sendTo, applyTo,
executeAt.epoch, txnId, route, executeAt, deps, writes, result), persist);
+ node.send(sendTo.nodes(), to -> new Apply(to, sendTo, applyTo,
executeAt.epoch(), txnId, route, executeAt, deps, writes, result), persist);
}
private Persist(Node node, Topologies topologies, TxnId txnId,
FullRoute<?> route, Txn txn, Timestamp executeAt, Deps deps)
@@ -93,21 +93,21 @@ public class Persist implements Callback<ApplyReply>
if (!isDone)
{
// TODO (low priority, consider, efficiency): send to
non-home replicas also, so they may clear their log more easily?
- Shard homeShard =
node.topology().forEpochIfKnown(route.homeKey(), txnId.epoch);
+ Shard homeShard =
node.topology().forEpochIfKnown(route.homeKey(), txnId.epoch());
node.send(homeShard, new InformHomeDurable(txnId,
route.homeKey(), executeAt, Durable, persistedOn));
isDone = true;
}
else if (!tracker.hasInFlight() && !tracker.hasFailures())
{
- Shard homeShard =
node.topology().forEpochIfKnown(route.homeKey(), txnId.epoch);
+ Shard homeShard =
node.topology().forEpochIfKnown(route.homeKey(), txnId.epoch());
node.send(homeShard, new InformHomeDurable(txnId,
route.homeKey(), executeAt, Universal, persistedOn));
}
}
break;
case Insufficient:
- Topologies topologies = node.topology().preciseEpochs(route,
txnId.epoch, executeAt.epoch);
+ Topologies topologies = node.topology().preciseEpochs(route,
txnId.epoch(), executeAt.epoch());
// TODO (easy, cleanup): use static method in Commit
- node.send(from, new Commit(Kind.Maximal, from,
topologies.forEpoch(txnId.epoch), topologies, txnId, txn, route, null,
executeAt, deps, false));
+ node.send(from, new Commit(Kind.Maximal, from,
topologies.forEpoch(txnId.epoch()), topologies, txnId, txn, route, null,
executeAt, deps, false));
}
}
diff --git a/accord-core/src/main/java/accord/coordinate/Propose.java
b/accord-core/src/main/java/accord/coordinate/Propose.java
index 6209f9d..3b75512 100644
--- a/accord-core/src/main/java/accord/coordinate/Propose.java
+++ b/accord-core/src/main/java/accord/coordinate/Propose.java
@@ -152,7 +152,7 @@ class Propose implements Callback<AcceptReply>
public static Invalidate proposeInvalidate(Node node, Ballot ballot,
TxnId txnId, RoutingKey invalidateWithKey, BiConsumer<Void, Throwable> callback)
{
- Shard shard = node.topology().forEpochIfKnown(invalidateWithKey,
txnId.epoch);
+ Shard shard = node.topology().forEpochIfKnown(invalidateWithKey,
txnId.epoch());
Invalidate invalidate = new Invalidate(node, shard, ballot, txnId,
invalidateWithKey, callback);
node.send(shard.nodes, to -> new Accept.Invalidate(ballot, txnId,
invalidateWithKey), invalidate);
return invalidate;
diff --git a/accord-core/src/main/java/accord/coordinate/Recover.java
b/accord-core/src/main/java/accord/coordinate/Recover.java
index 9f8e80c..3ad5d67 100644
--- a/accord-core/src/main/java/accord/coordinate/Recover.java
+++ b/accord-core/src/main/java/accord/coordinate/Recover.java
@@ -63,7 +63,7 @@ public class Recover implements Callback<RecoverReply>,
BiConsumer<Result, Throw
AwaitCommit(Node node, TxnId txnId, Unseekables<?, ?> unseekables)
{
- Topology topology =
node.topology().globalForEpoch(txnId.epoch).forSelection(unseekables);
+ Topology topology =
node.topology().globalForEpoch(txnId.epoch()).forSelection(unseekables);
this.tracker = new QuorumTracker(new
Topologies.Single(node.topology().sorter(), topology));
node.send(topology.nodes(), to -> new WaitOnCommit(to, topology,
txnId, unseekables), this);
}
@@ -132,7 +132,7 @@ public class Recover implements Callback<RecoverReply>,
BiConsumer<Result, Throw
this.txn = txn;
this.route = route;
this.callback = callback;
- assert topologies.oldestEpoch() == topologies.currentEpoch() &&
topologies.currentEpoch() == txnId.epoch;
+ assert topologies.oldestEpoch() == topologies.currentEpoch() &&
topologies.currentEpoch() == txnId.epoch();
this.tracker = new RecoveryTracker(topologies);
}
@@ -147,7 +147,7 @@ public class Recover implements Callback<RecoverReply>,
BiConsumer<Result, Throw
public static Recover recover(Node node, TxnId txnId, Txn txn,
FullRoute<?> route, BiConsumer<Outcome, Throwable> callback)
{
- return recover(node, txnId, txn, route, callback,
node.topology().forEpoch(route, txnId.epoch));
+ return recover(node, txnId, txn, route, callback,
node.topology().forEpoch(route, txnId.epoch()));
}
public static Recover recover(Node node, TxnId txnId, Txn txn,
FullRoute<?> route, BiConsumer<Outcome, Throwable> callback, Topologies
topologies)
@@ -158,7 +158,7 @@ public class Recover implements Callback<RecoverReply>,
BiConsumer<Result, Throw
public static Recover recover(Node node, Ballot ballot, TxnId txnId, Txn
txn, FullRoute<?> route, BiConsumer<Outcome, Throwable> callback)
{
- return recover(node, ballot, txnId, txn, route, callback,
node.topology().forEpoch(route, txnId.epoch));
+ return recover(node, ballot, txnId, txn, route, callback,
node.topology().forEpoch(route, txnId.epoch()));
}
public static Recover recover(Node node, Ballot ballot, TxnId txnId, Txn
txn, FullRoute<?> route, BiConsumer<Outcome, Throwable> callback, Topologies
topologies)
@@ -212,7 +212,7 @@ public class Recover implements Callback<RecoverReply>,
BiConsumer<Result, Throw
case Applied:
case PreApplied:
// TODO (desired, efficiency): in some cases we can use
the deps we already have (e.g. if we have a quorum of Committed responses)
- node.withEpoch(executeAt.epoch, () -> {
+ node.withEpoch(executeAt.epoch(), () -> {
CollectDeps.withDeps(node, txnId, route, txn,
acceptOrCommit.executeAt, (deps, fail) -> {
if (fail != null)
{
@@ -232,7 +232,7 @@ public class Recover implements Callback<RecoverReply>,
BiConsumer<Result, Throw
case PreCommitted:
case Committed:
// TODO (desired, efficiency): in some cases we can use
the deps we already have (e.g. if we have a quorum of Committed responses)
- node.withEpoch(executeAt.epoch, () -> {
+ node.withEpoch(executeAt.epoch(), () -> {
CollectDeps.withDeps(node, txnId, route, txn,
executeAt, (deps, fail) -> {
if (fail != null) accept(null, fail);
else Execute.execute(node, txnId, txn, route,
acceptOrCommit.executeAt, deps, this);
@@ -294,14 +294,14 @@ public class Recover implements Callback<RecoverReply>,
BiConsumer<Result, Throw
private void commitInvalidate()
{
Timestamp invalidateUntil = recoverOks.stream().map(ok ->
ok.executeAt).reduce(txnId, Timestamp::max);
- node.withEpoch(invalidateUntil.epoch, () ->
Commit.Invalidate.commitInvalidate(node, txnId, route, invalidateUntil));
+ node.withEpoch(invalidateUntil.epoch(), () ->
Commit.Invalidate.commitInvalidate(node, txnId, route, invalidateUntil));
isDone = true;
callback.accept(ProgressToken.INVALIDATED, null);
}
private void propose(Timestamp executeAt, Deps deps)
{
- node.withEpoch(executeAt.epoch, () -> Propose.propose(node, ballot,
txnId, txn, route, executeAt, deps, this));
+ node.withEpoch(executeAt.epoch(), () -> Propose.propose(node, ballot,
txnId, txn, route, executeAt, deps, this));
}
private Deps mergeDeps()
diff --git
a/accord-core/src/main/java/accord/coordinate/RecoverWithHomeKey.java
b/accord-core/src/main/java/accord/coordinate/RecoverWithHomeKey.java
index 1ea6856..dabd86e 100644
--- a/accord-core/src/main/java/accord/coordinate/RecoverWithHomeKey.java
+++ b/accord-core/src/main/java/accord/coordinate/RecoverWithHomeKey.java
@@ -26,7 +26,7 @@ public class RecoverWithHomeKey extends CheckShards
implements BiConsumer<Object
RecoverWithHomeKey(Node node, TxnId txnId, RoutingKey homeKey, Status
witnessedByInvalidation, BiConsumer<Outcome, Throwable> callback)
{
- super(node, txnId, RoutingKeys.of(homeKey), txnId.epoch,
IncludeInfo.Route);
+ super(node, txnId, RoutingKeys.of(homeKey), txnId.epoch(),
IncludeInfo.Route);
this.witnessedByInvalidation = witnessedByInvalidation;
// if witnessedByInvalidation == AcceptedInvalidate then we cannot
assume its definition was known, and our comparison with the status is invalid
Preconditions.checkState(witnessedByInvalidation !=
Status.AcceptedInvalidate);
diff --git a/accord-core/src/main/java/accord/coordinate/RecoverWithRoute.java
b/accord-core/src/main/java/accord/coordinate/RecoverWithRoute.java
index 775f1d7..9dfb0b1 100644
--- a/accord-core/src/main/java/accord/coordinate/RecoverWithRoute.java
+++ b/accord-core/src/main/java/accord/coordinate/RecoverWithRoute.java
@@ -31,7 +31,7 @@ public class RecoverWithRoute extends CheckShards
private RecoverWithRoute(Node node, Topologies topologies, @Nullable
Ballot promisedBallot, TxnId txnId, FullRoute<?> route, Status
witnessedByInvalidation, BiConsumer<Outcome, Throwable> callback)
{
- super(node, txnId, route, txnId.epoch, IncludeInfo.All);
+ super(node, txnId, route, txnId.epoch(), IncludeInfo.All);
// if witnessedByInvalidation == AcceptedInvalidate then we cannot
assume its definition was known, and our comparison with the status is invalid
Invariants.checkState(witnessedByInvalidation !=
Status.AcceptedInvalidate);
// if witnessedByInvalidation == Invalidated we should anyway not be
recovering
@@ -40,12 +40,12 @@ public class RecoverWithRoute extends CheckShards
this.route = route;
this.callback = callback;
this.witnessedByInvalidation = witnessedByInvalidation;
- assert topologies.oldestEpoch() == topologies.currentEpoch() &&
topologies.currentEpoch() == txnId.epoch;
+ assert topologies.oldestEpoch() == topologies.currentEpoch() &&
topologies.currentEpoch() == txnId.epoch();
}
public static RecoverWithRoute recover(Node node, TxnId txnId,
FullRoute<?> route, @Nullable Status witnessedByInvalidation,
BiConsumer<Outcome, Throwable> callback)
{
- return recover(node, node.topology().forEpoch(route, txnId.epoch),
txnId, route, witnessedByInvalidation, callback);
+ return recover(node, node.topology().forEpoch(route, txnId.epoch()),
txnId, route, witnessedByInvalidation, callback);
}
public static RecoverWithRoute recover(Node node, Topologies topologies,
TxnId txnId, FullRoute<?> route, @Nullable Status witnessedByInvalidation,
BiConsumer<Outcome, Throwable> callback)
@@ -55,7 +55,7 @@ public class RecoverWithRoute extends CheckShards
public static RecoverWithRoute recover(Node node, @Nullable Ballot
promisedBallot, TxnId txnId, FullRoute<?> route, @Nullable Status
witnessedByInvalidation, BiConsumer<Outcome, Throwable> callback)
{
- return recover(node, node.topology().forEpoch(route, txnId.epoch),
promisedBallot, txnId, route, witnessedByInvalidation, callback);
+ return recover(node, node.topology().forEpoch(route, txnId.epoch()),
promisedBallot, txnId, route, witnessedByInvalidation, callback);
}
public static RecoverWithRoute recover(Node node, Topologies topologies,
Ballot ballot, TxnId txnId, FullRoute<?> route, Status witnessedByInvalidation,
BiConsumer<Outcome, Throwable> callback)
@@ -79,7 +79,7 @@ public class RecoverWithRoute extends CheckShards
@Override
protected boolean isSufficient(Id from, CheckStatusOk ok)
{
- Ranges rangesForNode =
topologies().forEpoch(txnId.epoch).rangesForNode(from);
+ Ranges rangesForNode =
topologies().forEpoch(txnId.epoch()).rangesForNode(from);
PartialRoute<?> route = this.route.slice(rangesForNode);
return isSufficient(route, ok);
}
@@ -141,7 +141,7 @@ public class RecoverWithRoute extends CheckShards
if (known.deps.isDecisionKnown())
{
Deps deps = merged.committedDeps.reconstitute(route());
- node.withEpoch(merged.executeAt.epoch, () -> {
+ node.withEpoch(merged.executeAt.epoch(), () -> {
Persist.persistAndCommit(node, txnId, route(), txn,
merged.executeAt, deps, merged.writes, merged.result);
});
callback.accept(APPLIED, null);
diff --git a/accord-core/src/main/java/accord/impl/InMemoryCommand.java
b/accord-core/src/main/java/accord/impl/InMemoryCommand.java
index 79a5ec7..d72b574 100644
--- a/accord-core/src/main/java/accord/impl/InMemoryCommand.java
+++ b/accord-core/src/main/java/accord/impl/InMemoryCommand.java
@@ -24,7 +24,6 @@ import accord.local.*;
import accord.local.Status.Durability;
import accord.local.Status.Known;
import accord.primitives.*;
-import accord.primitives.Txn.Kind;
import javax.annotation.Nullable;
import java.util.*;
@@ -40,7 +39,6 @@ public class InMemoryCommand extends Command
private Route<?> route;
private RoutingKey homeKey, progressKey;
private PartialTxn partialTxn;
- private Kind kind;
private Ballot promised = Ballot.ZERO, accepted = Ballot.ZERO;
private Timestamp executeAt;
private @Nullable PartialDeps partialDeps = null;
@@ -104,18 +102,6 @@ public class InMemoryCommand extends Command
return homeKey;
}
- @Override
- public Kind kind()
- {
- return kind;
- }
-
- @Override
- public void setKind(Kind kind)
- {
- this.kind = kind;
- }
-
@Override
protected void setHomeKey(RoutingKey key)
{
diff --git a/accord-core/src/main/java/accord/impl/InMemoryCommandStore.java
b/accord-core/src/main/java/accord/impl/InMemoryCommandStore.java
index f112d0c..7977d2c 100644
--- a/accord-core/src/main/java/accord/impl/InMemoryCommandStore.java
+++ b/accord-core/src/main/java/accord/impl/InMemoryCommandStore.java
@@ -29,7 +29,6 @@ import
accord.impl.InMemoryCommandStore.Synchronized.SynchronizedState;
import accord.local.Command;
import accord.local.CommandsForKey;
import accord.local.CommandListener;
-import accord.local.Node;
import accord.local.NodeTimeService;
import accord.local.PreLoadContext;
import accord.local.SafeCommandStore;
@@ -161,9 +160,9 @@ public class InMemoryCommandStore
@Override
public Timestamp preaccept(TxnId txnId, Seekables<?, ?> keys)
{
- Timestamp max = maxConflict(keys, ranges().at(txnId.epoch));
+ Timestamp max = maxConflict(keys, ranges().at(txnId.epoch()));
long epoch = latestEpoch();
- if (txnId.compareTo(max) > 0 && txnId.epoch >= epoch &&
!agent.isExpired(txnId, time.now()))
+ if (txnId.compareTo(max) > 0 && txnId.epoch() >= epoch &&
!agent.isExpired(txnId, time.now()))
return txnId;
return time.uniqueNow(max);
@@ -187,8 +186,8 @@ public class InMemoryCommandStore
public void forEpochCommands(Ranges ranges, long epoch,
Consumer<Command> consumer)
{
- Timestamp minTimestamp = new Timestamp(epoch, Long.MIN_VALUE,
Integer.MIN_VALUE, Node.Id.NONE);
- Timestamp maxTimestamp = new Timestamp(epoch, Long.MAX_VALUE,
Integer.MAX_VALUE, Node.Id.MAX);
+ Timestamp minTimestamp = Timestamp.minForEpoch(epoch);
+ Timestamp maxTimestamp = Timestamp.maxForEpoch(epoch);
for (Range range : ranges)
{
Iterable<InMemoryCommandsForKey> rangeCommands =
commandsForKey.subMap(
@@ -205,8 +204,8 @@ public class InMemoryCommandStore
public void forCommittedInEpoch(Ranges ranges, long epoch,
Consumer<Command> consumer)
{
- Timestamp minTimestamp = new Timestamp(epoch, Long.MIN_VALUE,
Integer.MIN_VALUE, Node.Id.NONE);
- Timestamp maxTimestamp = new Timestamp(epoch, Long.MAX_VALUE,
Integer.MAX_VALUE, Node.Id.MAX);
+ Timestamp minTimestamp = Timestamp.minForEpoch(epoch);
+ Timestamp maxTimestamp = Timestamp.maxForEpoch(epoch);
for (Range range : ranges)
{
Iterable<InMemoryCommandsForKey> rangeCommands =
commandsForKey.subMap(range.start(),
@@ -262,7 +261,7 @@ public class InMemoryCommandStore
public void forEach(Routable keyOrRange, Ranges slice,
Consumer<CommandsForKey> forEach)
{
- switch (keyOrRange.kind())
+ switch (keyOrRange.domain())
{
default: throw new AssertionError();
case Key:
diff --git a/accord-core/src/main/java/accord/impl/InMemoryCommandsForKey.java
b/accord-core/src/main/java/accord/impl/InMemoryCommandsForKey.java
index 11bffff..8287f45 100644
--- a/accord-core/src/main/java/accord/impl/InMemoryCommandsForKey.java
+++ b/accord-core/src/main/java/accord/impl/InMemoryCommandsForKey.java
@@ -37,7 +37,6 @@ import static
accord.local.CommandsForKey.CommandTimeseries.TestDep.*;
import static accord.local.CommandsForKey.CommandTimeseries.TestKind.RorWs;
import static accord.local.Status.KnownDeps.DepsUnknown;
import static accord.local.Status.PreAccepted;
-import static accord.primitives.Txn.Kind.WRITE;
public class InMemoryCommandsForKey extends CommandsForKey
{
@@ -78,7 +77,7 @@ public class InMemoryCommandsForKey extends CommandsForKey
public Stream<T> before(@Nonnull Timestamp timestamp, @Nonnull
TestKind testKind, @Nonnull TestDep testDep, @Nullable TxnId depId, @Nonnull
TestStatus testStatus, @Nullable Status status)
{
return commands.headMap(timestamp, false).values().stream()
- .filter(cmd -> testKind == RorWs || cmd.kind() == WRITE)
+ .filter(cmd -> testKind == RorWs || cmd.txnId().isWrite())
// If we don't have any dependencies, we treat a
dependency filter as a mismatch
.filter(cmd -> testDep == ANY_DEPS || (cmd.known().deps !=
DepsUnknown && (cmd.partialDeps().contains(depId) ^ (testDep == WITHOUT))))
.filter(cmd -> TestStatus.test(cmd.status(), testStatus,
status))
@@ -89,7 +88,7 @@ public class InMemoryCommandsForKey extends CommandsForKey
public Stream<T> after(@Nonnull Timestamp timestamp, @Nonnull TestKind
testKind, @Nonnull TestDep testDep, @Nullable TxnId depId, @Nonnull TestStatus
testStatus, @Nullable Status status)
{
return commands.tailMap(timestamp, false).values().stream()
- .filter(cmd -> testKind == RorWs || cmd.kind() == WRITE)
+ .filter(cmd -> testKind == RorWs || cmd.txnId().isWrite())
// If we don't have any dependencies, we treat a
dependency filter as a mismatch
.filter(cmd -> testDep == ANY_DEPS || (cmd.known().deps !=
DepsUnknown && (cmd.partialDeps().contains(depId) ^ (testDep == WITHOUT))))
.filter(cmd -> TestStatus.test(cmd.status(), testStatus,
status))
diff --git a/accord-core/src/main/java/accord/impl/SimpleProgressLog.java
b/accord-core/src/main/java/accord/impl/SimpleProgressLog.java
index 060af15..7fff500 100644
--- a/accord-core/src/main/java/accord/impl/SimpleProgressLog.java
+++ b/accord-core/src/main/java/accord/impl/SimpleProgressLog.java
@@ -223,7 +223,7 @@ public class SimpleProgressLog implements
ProgressLog.Factory
// must also be committed, as at the time of
writing we do not guarantee dissemination of Commit
// records to the home shard, so we only know
the executeAt shards will have witnessed this
// if the home shard is at an earlier phase,
it must run recovery
- long epoch = command.executeAt().epoch;
+ long epoch = command.executeAt().epoch();
node.withEpoch(epoch, () -> debugInvestigating
= FetchData.fetch(PreApplied.minKnown, node, txnId, command.route(), epoch,
(success, fail) -> {
commandStore.execute(PreLoadContext.empty(), ignore -> {
// should have found enough
information to apply the result, but in case we did not reset progress
@@ -235,7 +235,7 @@ public class SimpleProgressLog implements
ProgressLog.Factory
else
{
RoutingKey homeKey = command.homeKey();
- node.withEpoch(txnId.epoch, () -> {
+ node.withEpoch(txnId.epoch(), () -> {
Future<? extends Outcome> recover =
node.maybeRecover(txnId, homeKey, command.route(), token);
recover.addCallback((success, fail) -> {
@@ -349,10 +349,10 @@ public class SimpleProgressLog implements
ProgressLog.Factory
if (!isFullRoute(command.route()))
return false;
- if (!node.topology().hasEpoch(command.executeAt().epoch))
+ if (!node.topology().hasEpoch(command.executeAt().epoch()))
return false;
- Topologies topology =
node.topology().preciseEpochs(command.route(), command.txnId().epoch,
command.executeAt().epoch);
+ Topologies topology =
node.topology().preciseEpochs(command.route(), command.txnId().epoch(),
command.executeAt().epoch());
notAwareOfDurability = topology.copyOfNodes();
notPersisted = topology.copyOfNodes();
if (whenReady != null)
@@ -432,7 +432,7 @@ public class SimpleProgressLog implements
ProgressLog.Factory
FullRoute<?> route =
Route.castToFullRoute(command.route());
Timestamp executeAt = command.executeAt();
investigating = new CoordinateAwareness();
- Topologies topologies =
node.topology().preciseEpochs(route, txnId.epoch, executeAt.epoch);
+ Topologies topologies =
node.topology().preciseEpochs(route, txnId.epoch(), executeAt.epoch());
node.send(notAwareOfDurability, to -> new
InformDurable(to, topologies, route, txnId, executeAt, Durable), investigating);
}
@@ -481,7 +481,7 @@ public class SimpleProgressLog implements
ProgressLog.Factory
setProgress(Investigating);
// first make sure we have enough information to obtain
the command locally
Timestamp executeAt = command.hasBeen(PreCommitted) ?
command.executeAt() : null;
- long srcEpoch = (executeAt != null ? executeAt :
txnId).epoch;
+ long srcEpoch = (executeAt != null ? executeAt :
txnId).epoch();
// TODO (desired, consider): compute fromEpoch, the epoch
we already have this txn replicated until
long toEpoch = Math.max(srcEpoch, node.topology().epoch());
Unseekables<?, ?> someKeys = unseekables(command);
diff --git a/accord-core/src/main/java/accord/local/Command.java
b/accord-core/src/main/java/accord/local/Command.java
index 3915411..4f7697f 100644
--- a/accord-core/src/main/java/accord/local/Command.java
+++ b/accord-core/src/main/java/accord/local/Command.java
@@ -72,9 +72,6 @@ public abstract class Command implements CommandListener,
BiConsumer<SafeCommand
public abstract TxnId txnId();
- public abstract Kind kind();
- public abstract void setKind(Kind kind);
-
// TODO (desirable, API consistency): should any of these calls be
replaced by corresponding known() registers?
public boolean hasBeen(Status status)
{
@@ -295,12 +292,12 @@ public abstract class Command implements CommandListener,
BiConsumer<SafeCommand
return AcceptOutcome.Redundant;
}
- if (known().isDefinitionKnown() && !kind().equals(kind))
+ if (known().isDefinitionKnown() && txnId().rw() != kind)
throw new IllegalArgumentException("Transaction kind is different
to the definition we have already received");
TxnId txnId = txnId();
Ranges coordinateRanges = coordinateRanges(safeStore);
- Ranges acceptRanges = txnId.epoch == executeAt.epoch ?
coordinateRanges : safeStore.ranges().between(txnId.epoch, executeAt.epoch);
+ Ranges acceptRanges = txnId.epoch() == executeAt.epoch() ?
coordinateRanges : safeStore.ranges().between(txnId.epoch(), executeAt.epoch());
ProgressShard shard = progressShard(safeStore, route, progressKey,
coordinateRanges);
if (!validate(coordinateRanges, Ranges.EMPTY, shard, route, Ignore,
null, Ignore, partialDeps, Set))
@@ -312,7 +309,6 @@ public abstract class Command implements CommandListener,
BiConsumer<SafeCommand
setExecuteAt(executeAt);
setPromised(ballot);
setAccepted(ballot);
- setKind(kind);
set(safeStore, coordinateRanges, Ranges.EMPTY, shard, route, null,
Ignore, partialDeps, Set);
switch (status())
{
@@ -410,7 +406,7 @@ public abstract class Command implements CommandListener,
BiConsumer<SafeCommand
protected void populateWaitingOn(SafeCommandStore safeStore)
{
- Ranges ranges = safeStore.ranges().since(executeAt().epoch);
+ Ranges ranges = safeStore.ranges().since(executeAt().epoch());
if (ranges != null) {
partialDeps().forEachOn(ranges, txnId -> {
Command command = safeStore.ifLoaded(txnId);
@@ -492,7 +488,7 @@ public abstract class Command implements CommandListener,
BiConsumer<SafeCommand
Ranges executeRanges = executeRanges(safeStore, executeAt);
if (untilEpoch < safeStore.latestEpoch())
{
- Ranges expectedRanges =
safeStore.ranges().between(executeAt.epoch, untilEpoch);
+ Ranges expectedRanges =
safeStore.ranges().between(executeAt.epoch(), untilEpoch);
Invariants.checkState(expectedRanges.containsAll(executeRanges));
}
ProgressShard shard = progressShard(safeStore, route,
coordinateRanges);
@@ -830,7 +826,7 @@ public abstract class Command implements CommandListener,
BiConsumer<SafeCommand
setHomeKey(homeKey);
// TODO (low priority, safety): if we're processed on a node that
does not know the latest epoch,
// do we guarantee the home key calculation is unchanged
since the prior epoch?
- if (progressKey() == null && owns(safeStore, txnId().epoch,
homeKey))
+ if (progressKey() == null && owns(safeStore, txnId().epoch(),
homeKey))
progressKey(homeKey);
}
else if (!this.homeKey().equals(homeKey))
@@ -892,7 +888,7 @@ public abstract class Command implements CommandListener,
BiConsumer<SafeCommand
if (progressKey == NO_PROGRESS_KEY)
return No;
- Ranges coordinateRanges = safeStore.ranges().at(txnId().epoch);
+ Ranges coordinateRanges = safeStore.ranges().at(txnId().epoch());
if (!coordinateRanges.contains(progressKey))
return No;
@@ -901,12 +897,12 @@ public abstract class Command implements CommandListener,
BiConsumer<SafeCommand
private Ranges coordinateRanges(SafeCommandStore safeStore)
{
- return safeStore.ranges().at(txnId().epoch);
+ return safeStore.ranges().at(txnId().epoch());
}
private Ranges executeRanges(SafeCommandStore safeStore, Timestamp
executeAt)
{
- return safeStore.ranges().since(executeAt.epoch);
+ return safeStore.ranges().since(executeAt.epoch());
}
enum EnsureAction { Ignore, Check, Add, TrySet, Set }
@@ -977,7 +973,7 @@ public abstract class Command implements CommandListener,
BiConsumer<SafeCommand
if (!validate(ensurePartialTxn, existingRanges, additionalRanges,
covers(partialTxn()), covers(partialTxn), "txn", partialTxn))
return false;
- if (partialTxn != null && kind() != null &&
!kind().equals(partialTxn.kind()))
+ if (partialTxn != null && txnId().rw() != partialTxn.kind())
throw new IllegalArgumentException("Transaction has different kind
to the definition we previously received");
if (shard.isHome() && ensurePartialTxn != Ignore)
@@ -1019,7 +1015,6 @@ public abstract class Command implements CommandListener,
BiConsumer<SafeCommand
case Set:
case TrySet:
- setKind(partialTxn.kind());
setPartialTxn(partialTxn = partialTxn.slice(allRanges,
shard.isHome()));
// TODO (expected, efficiency): we may register the same
ranges more than once
safeStore.forEach(partialTxn.keys(), allRanges, forKey -> {
diff --git a/accord-core/src/main/java/accord/local/Node.java
b/accord-core/src/main/java/accord/local/Node.java
index 6b1ed4e..9047034 100644
--- a/accord-core/src/main/java/accord/local/Node.java
+++ b/accord-core/src/main/java/accord/local/Node.java
@@ -30,6 +30,7 @@ import java.util.function.Supplier;
import accord.coordinate.*;
import accord.messages.*;
import accord.primitives.*;
+import accord.primitives.Routable.Domain;
import accord.utils.MapReduceConsume;
import com.google.common.annotations.VisibleForTesting;
@@ -56,6 +57,8 @@ import net.nicoulaj.compilecommand.annotations.Inline;
import org.apache.cassandra.utils.concurrent.AsyncFuture;
import org.apache.cassandra.utils.concurrent.Future;
+import static accord.primitives.Routable.Domain.Key;
+
public class Node implements ConfigurationService.Listener, NodeTimeService
{
public static class Id implements Comparable<Id>
@@ -131,7 +134,7 @@ public class Node implements ConfigurationService.Listener,
NodeTimeService
this.topology = new TopologyManager(topologySorter, id);
this.nowSupplier = nowSupplier;
Topology topology = configService.currentTopology();
- this.now = new AtomicReference<>(new Timestamp(topology.epoch(),
nowSupplier.getAsLong(), 0, id));
+ this.now = new
AtomicReference<>(Timestamp.fromValues(topology.epoch(),
nowSupplier.getAsLong(), id));
this.agent = agent;
this.random = random;
this.scheduler = scheduler;
@@ -225,10 +228,10 @@ public class Node implements
ConfigurationService.Listener, NodeTimeService
return now.updateAndGet(cur -> {
// TODO (low priority, proof): this diverges from proof; either
show isomorphism or make consistent
long now = nowSupplier.getAsLong();
- long epoch = Math.max(cur.epoch, topology.epoch());
- return (now > cur.real)
- ? new Timestamp(epoch, now, 0, id)
- : new Timestamp(epoch, cur.real, cur.logical + 1, id);
+ long epoch = Math.max(cur.epoch(), topology.epoch());
+ return now > cur.hlc()
+ ? Timestamp.fromValues(epoch, now, id)
+ : Timestamp.fromValues(epoch, cur.hlc() + 1, id);
});
}
@@ -237,8 +240,8 @@ public class Node implements ConfigurationService.Listener,
NodeTimeService
if (now.get().compareTo(atLeast) < 0)
now.accumulateAndGet(atLeast, (current, proposed) -> {
long minEpoch = topology.epoch();
- current = current.withMinEpoch(minEpoch);
- proposed = proposed.withMinEpoch(minEpoch);
+ current = current.withEpochAtLeast(minEpoch);
+ proposed = proposed.withEpochAtLeast(minEpoch);
return proposed.compareTo(current) <= 0 ?
current.logicalNext(id) : proposed;
});
return uniqueNow();
@@ -256,7 +259,7 @@ public class Node implements ConfigurationService.Listener,
NodeTimeService
public Future<Void> forEachLocalSince(PreLoadContext context,
Unseekables<?, ?> unseekables, Timestamp since, Consumer<SafeCommandStore>
forEach)
{
- return commandStores.forEach(context, unseekables, since.epoch,
Long.MAX_VALUE, forEach);
+ return commandStores.forEach(context, unseekables, since.epoch(),
Long.MAX_VALUE, forEach);
}
public Future<Void> ifLocal(PreLoadContext context, RoutingKey key, long
epoch, Consumer<SafeCommandStore> ifLocal)
@@ -266,7 +269,7 @@ public class Node implements ConfigurationService.Listener,
NodeTimeService
public Future<Void> ifLocalSince(PreLoadContext context, RoutingKey key,
Timestamp since, Consumer<SafeCommandStore> ifLocal)
{
- return commandStores.ifLocal(context, key, since.epoch,
Long.MAX_VALUE, ifLocal);
+ return commandStores.ifLocal(context, key, since.epoch(),
Long.MAX_VALUE, ifLocal);
}
public <T> void mapReduceConsumeLocal(TxnRequest<?> request, long
minEpoch, long maxEpoch, MapReduceConsume<SafeCommandStore, T> mapReduceConsume)
@@ -353,14 +356,14 @@ public class Node implements
ConfigurationService.Listener, NodeTimeService
messageSink.reply(replyingToNode, replyContext, send);
}
- public TxnId nextTxnId()
+ public TxnId nextTxnId(Txn.Kind rw, Domain domain)
{
- return new TxnId(uniqueNow());
+ return new TxnId(uniqueNow(), rw, domain);
}
public Future<Result> coordinate(Txn txn)
{
- return coordinate(nextTxnId(), txn);
+ return coordinate(nextTxnId(txn.kind(), Key), txn);
}
public Future<Result> coordinate(TxnId txnId, Txn txn)
@@ -368,7 +371,7 @@ public class Node implements ConfigurationService.Listener,
NodeTimeService
// TODO (desirable, consider): The combination of updating the epoch
of the next timestamp with epochs we don't have topologies for,
// and requiring preaccept to talk to its topology epoch means that
learning of a new epoch via timestamp
// (ie not via config service) will halt any new txns from a node
until it receives this topology
- Future<Result> result = withEpoch(txnId.epoch, () ->
initiateCoordination(txnId, txn));
+ Future<Result> result = withEpoch(txnId.epoch(), () ->
initiateCoordination(txnId, txn));
coordinating.putIfAbsent(txnId, result);
result.addCallback((success, fail) -> coordinating.remove(txnId,
result));
return result;
@@ -390,13 +393,13 @@ public class Node implements
ConfigurationService.Listener, NodeTimeService
private @Nullable RoutingKey trySelectHomeKey(TxnId txnId, Seekables<?, ?>
keysOrRanges)
{
- int i = (int)keysOrRanges.findNextIntersection(0,
topology().localForEpoch(txnId.epoch).ranges(), 0);
+ int i = (int)keysOrRanges.findNextIntersection(0,
topology().localForEpoch(txnId.epoch()).ranges(), 0);
return i >= 0 ? keysOrRanges.get(i).someIntersectingRoutingKey() :
null;
}
public RoutingKey selectProgressKey(TxnId txnId, Route<?> route,
RoutingKey homeKey)
{
- return selectProgressKey(txnId.epoch, route, homeKey);
+ return selectProgressKey(txnId.epoch(), route, homeKey);
}
public RoutingKey selectProgressKey(long epoch, Route<?> route, RoutingKey
homeKey)
@@ -414,7 +417,7 @@ public class Node implements ConfigurationService.Listener,
NodeTimeService
public RoutingKey trySelectProgressKey(TxnId txnId, Route<?> route,
RoutingKey homeKey)
{
- return trySelectProgressKey(txnId.epoch, route, homeKey);
+ return trySelectProgressKey(txnId.epoch(), route, homeKey);
}
public RoutingKey trySelectProgressKey(long epoch, Route<?> route,
RoutingKey homeKey)
@@ -435,7 +438,7 @@ public class Node implements ConfigurationService.Listener,
NodeTimeService
public RoutingKey selectRandomHomeKey(TxnId txnId)
{
- Ranges ranges = topology().localForEpoch(txnId.epoch).ranges();
+ Ranges ranges = topology().localForEpoch(txnId.epoch()).ranges();
Range range = ranges.get(ranges.size() == 1 ? 0 :
random.nextInt(ranges.size()));
return range.someIntersectingRoutingKey();
}
@@ -458,7 +461,7 @@ public class Node implements ConfigurationService.Listener,
NodeTimeService
return result;
}
- Future<Outcome> result = withEpoch(txnId.epoch, () -> {
+ Future<Outcome> result = withEpoch(txnId.epoch(), () -> {
RecoverFuture<Outcome> future = new RecoverFuture<>();
RecoverWithRoute.recover(this, txnId, route, null, future);
return future;
diff --git a/accord-core/src/main/java/accord/messages/Accept.java
b/accord-core/src/main/java/accord/messages/Accept.java
index 22d2a87..7ea2631 100644
--- a/accord-core/src/main/java/accord/messages/Accept.java
+++ b/accord-core/src/main/java/accord/messages/Accept.java
@@ -92,7 +92,7 @@ public class Accept extends
TxnRequest.WithUnsynced<Accept.AcceptReply>
return new AcceptReply(command.promised());
case Success:
// TODO (desirable, efficiency): we don't need to calculate
deps if executeAt == txnId
- return new AcceptReply(calculatePartialDeps(safeStore, txnId,
keys, kind, executeAt, safeStore.ranges().between(minEpoch, executeAt.epoch)));
+ return new AcceptReply(calculatePartialDeps(safeStore, txnId,
keys, kind, executeAt, safeStore.ranges().between(minEpoch,
executeAt.epoch())));
}
}
@@ -116,7 +116,7 @@ public class Accept extends
TxnRequest.WithUnsynced<Accept.AcceptReply>
public void process()
{
- node.mapReduceConsumeLocal(this, minEpoch, executeAt.epoch, this);
+ node.mapReduceConsumeLocal(this, minEpoch, executeAt.epoch(), this);
}
@Override
@@ -222,7 +222,7 @@ public class Accept extends
TxnRequest.WithUnsynced<Accept.AcceptReply>
public void process()
{
- node.mapReduceConsumeLocal(this, someKey, txnId.epoch, this);
+ node.mapReduceConsumeLocal(this, someKey, txnId.epoch(), this);
}
@Override
@@ -256,7 +256,7 @@ public class Accept extends
TxnRequest.WithUnsynced<Accept.AcceptReply>
@Override
public long waitForEpoch()
{
- return txnId.epoch;
+ return txnId.epoch();
}
}
}
diff --git a/accord-core/src/main/java/accord/messages/Apply.java
b/accord-core/src/main/java/accord/messages/Apply.java
index 8554af2..c692d8d 100644
--- a/accord-core/src/main/java/accord/messages/Apply.java
+++ b/accord-core/src/main/java/accord/messages/Apply.java
@@ -73,7 +73,7 @@ public class Apply extends TxnRequest<ApplyReply>
public void process()
{
// note, we do not also commit here if txnId.epoch != executeAt.epoch,
as the scope() for a commit would be different
- node.mapReduceConsumeLocal(this, txnId.epoch, untilEpoch, this);
+ node.mapReduceConsumeLocal(this, txnId.epoch(), untilEpoch, this);
}
public ApplyReply apply(SafeCommandStore safeStore)
@@ -101,8 +101,8 @@ public class Apply extends TxnRequest<ApplyReply>
{
if (reply == ApplyReply.Applied)
{
- node.ifLocal(empty(), scope.homeKey(), txnId.epoch, instance -> {
- node.withEpoch(executeAt.epoch, () ->
instance.progressLog().durableLocal(txnId));
+ node.ifLocal(empty(), scope.homeKey(), txnId.epoch(), instance -> {
+ node.withEpoch(executeAt.epoch(), () ->
instance.progressLog().durableLocal(txnId));
}).addCallback(node.agent());
}
node.reply(replyTo, replyContext, reply);
diff --git a/accord-core/src/main/java/accord/messages/BeginInvalidation.java
b/accord-core/src/main/java/accord/messages/BeginInvalidation.java
index d4face3..57f48f7 100644
--- a/accord-core/src/main/java/accord/messages/BeginInvalidation.java
+++ b/accord-core/src/main/java/accord/messages/BeginInvalidation.java
@@ -33,9 +33,10 @@ public class BeginInvalidation extends
AbstractEpochRequest<BeginInvalidation.In
this.ballot = ballot;
}
+ @Override
public void process()
{
- node.mapReduceConsumeLocal(this, someUnseekables, txnId.epoch,
txnId.epoch, this);
+ node.mapReduceConsumeLocal(this, someUnseekables, txnId.epoch(),
txnId.epoch(), this);
}
@Override
@@ -79,7 +80,7 @@ public class BeginInvalidation extends
AbstractEpochRequest<BeginInvalidation.In
@Override
public long waitForEpoch()
{
- return txnId.epoch;
+ return txnId.epoch();
}
@Override
diff --git a/accord-core/src/main/java/accord/messages/BeginRecovery.java
b/accord-core/src/main/java/accord/messages/BeginRecovery.java
index db8958c..dad40c5 100644
--- a/accord-core/src/main/java/accord/messages/BeginRecovery.java
+++ b/accord-core/src/main/java/accord/messages/BeginRecovery.java
@@ -79,7 +79,7 @@ public class BeginRecovery extends
TxnRequest<BeginRecovery.RecoverReply>
@Override
protected void process()
{
- node.mapReduceConsumeLocal(this, txnId.epoch, txnId.epoch, this);
+ node.mapReduceConsumeLocal(this, txnId.epoch(), txnId.epoch(), this);
}
@Override
@@ -105,7 +105,7 @@ public class BeginRecovery extends
TxnRequest<BeginRecovery.RecoverReply>
PartialDeps deps = command.partialDeps();
if (!command.known().deps.isProposalKnown())
{
- deps = calculatePartialDeps(safeStore, txnId, partialTxn.keys(),
partialTxn.kind(), txnId, safeStore.ranges().at(txnId.epoch));
+ deps = calculatePartialDeps(safeStore, txnId, partialTxn.keys(),
partialTxn.kind(), txnId, safeStore.ranges().at(txnId.epoch()));
}
boolean rejectsFastPath;
@@ -118,7 +118,7 @@ public class BeginRecovery extends
TxnRequest<BeginRecovery.RecoverReply>
}
else
{
- Ranges ranges = safeStore.ranges().at(txnId.epoch);
+ Ranges ranges = safeStore.ranges().at(txnId.epoch());
rejectsFastPath = acceptedStartedAfterWithoutWitnessing(safeStore,
txnId, ranges, partialTxn.keys()).anyMatch(ignore -> true);
if (!rejectsFastPath)
rejectsFastPath =
committedExecutesAfterWithoutWitnessing(safeStore, txnId, ranges,
partialTxn.keys()).anyMatch(ignore -> true);
diff --git a/accord-core/src/main/java/accord/messages/Commit.java
b/accord-core/src/main/java/accord/messages/Commit.java
index 1d9f47a..55c1c88 100644
--- a/accord-core/src/main/java/accord/messages/Commit.java
+++ b/accord-core/src/main/java/accord/messages/Commit.java
@@ -73,7 +73,7 @@ public class Commit extends TxnRequest<ReadNack>
if (isHome)
sendRoute = route;
}
- else if (executeAt.epoch != txnId.epoch)
+ else if (executeAt.epoch() != txnId.epoch())
{
Ranges coordinateRanges = coordinateTopology.rangesForNode(to);
Ranges executeRanges = topologies.computeRangesForNode(to);
@@ -104,11 +104,11 @@ public class Commit extends TxnRequest<ReadNack>
public static void commitMinimalAndRead(Node node, Topologies
executeTopologies, TxnId txnId, Txn txn, FullRoute<?> route, Seekables<?, ?>
readScope, Timestamp executeAt, Deps deps, Set<Id> readSet, Callback<ReadReply>
callback)
{
Topologies allTopologies = executeTopologies;
- if (txnId.epoch != executeAt.epoch)
- allTopologies = node.topology().preciseEpochs(route, txnId.epoch,
executeAt.epoch);
+ if (txnId.epoch() != executeAt.epoch())
+ allTopologies = node.topology().preciseEpochs(route,
txnId.epoch(), executeAt.epoch());
- Topology executeTopology = executeTopologies.forEpoch(executeAt.epoch);
- Topology coordinateTopology = allTopologies.forEpoch(txnId.epoch);
+ Topology executeTopology =
executeTopologies.forEpoch(executeAt.epoch());
+ Topology coordinateTopology = allTopologies.forEpoch(txnId.epoch());
for (Node.Id to : executeTopology.nodes())
{
boolean read = readSet.contains(to);
@@ -140,7 +140,7 @@ public class Commit extends TxnRequest<ReadNack>
public void process()
{
- node.mapReduceConsumeLocal(this, txnId.epoch, executeAt.epoch, this);
+ node.mapReduceConsumeLocal(this, txnId.epoch(), executeAt.epoch(),
this);
}
// TODO (expected, efficiency, clarity): do not guard with synchronized;
let mapReduceLocal decide how to enforce mutual exclusivity
@@ -207,15 +207,15 @@ public class Commit extends TxnRequest<ReadNack>
public static void commitInvalidate(Node node, TxnId txnId,
Unseekables<?, ?> inform, Timestamp until)
{
- commitInvalidate(node, txnId, inform, until.epoch);
+ commitInvalidate(node, txnId, inform, until.epoch());
}
public static void commitInvalidate(Node node, TxnId txnId,
Unseekables<?, ?> inform, long untilEpoch)
{
// TODO (expected, safety): this kind of check needs to be
inserted in all equivalent methods
- Invariants.checkState(untilEpoch >= txnId.epoch);
+ Invariants.checkState(untilEpoch >= txnId.epoch());
Invariants.checkState(node.topology().hasEpoch(untilEpoch));
- Topologies commitTo = node.topology().preciseEpochs(inform,
txnId.epoch, untilEpoch);
+ Topologies commitTo = node.topology().preciseEpochs(inform,
txnId.epoch(), untilEpoch);
commitInvalidate(node, commitTo, txnId, inform);
}
@@ -270,7 +270,7 @@ public class Commit extends TxnRequest<ReadNack>
public void process(Node node, Id from, ReplyContext replyContext)
{
- node.forEachLocal(this, scope, txnId.epoch, invalidateUntilEpoch,
+ node.forEachLocal(this, scope, txnId.epoch(), invalidateUntilEpoch,
safeStore ->
safeStore.command(txnId).commitInvalidate(safeStore))
.addCallback(node.agent());
}
diff --git a/accord-core/src/main/java/accord/messages/GetDeps.java
b/accord-core/src/main/java/accord/messages/GetDeps.java
index b2f296d..de4d518 100644
--- a/accord-core/src/main/java/accord/messages/GetDeps.java
+++ b/accord-core/src/main/java/accord/messages/GetDeps.java
@@ -44,13 +44,13 @@ public class GetDeps extends
TxnRequest.WithUnsynced<PartialDeps>
public void process()
{
- node.mapReduceConsumeLocal(this, minEpoch, executeAt.epoch, this);
+ node.mapReduceConsumeLocal(this, minEpoch, executeAt.epoch(), this);
}
@Override
public PartialDeps apply(SafeCommandStore instance)
{
- Ranges ranges = instance.ranges().between(minEpoch, executeAt.epoch);
+ Ranges ranges = instance.ranges().between(minEpoch, executeAt.epoch());
return calculatePartialDeps(instance, txnId, keys, kind, executeAt,
ranges);
}
diff --git a/accord-core/src/main/java/accord/messages/InformDurable.java
b/accord-core/src/main/java/accord/messages/InformDurable.java
index da6123a..5586c22 100644
--- a/accord-core/src/main/java/accord/messages/InformDurable.java
+++ b/accord-core/src/main/java/accord/messages/InformDurable.java
@@ -55,7 +55,7 @@ public class InformDurable extends TxnRequest<Reply>
implements PreLoadContext
// TODO (required, consider): We might not replicate either
txnId.epoch OR executeAt.epoch, but some inbetween.
// Do we need to receive this message
in that case? If so, we need to account for this when selecting a progress key
at = executeAt;
- progressKey = node.selectProgressKey(executeAt.epoch, scope,
scope.homeKey());
+ progressKey = node.selectProgressKey(executeAt.epoch(), scope,
scope.homeKey());
shard = Adhoc;
}
else
@@ -64,7 +64,7 @@ public class InformDurable extends TxnRequest<Reply>
implements PreLoadContext
}
// TODO (expected, efficiency): do not load from disk to perform this
update
- node.mapReduceConsumeLocal(contextFor(txnId), progressKey, at.epoch,
this);
+ node.mapReduceConsumeLocal(contextFor(txnId), progressKey, at.epoch(),
this);
}
@Override
diff --git a/accord-core/src/main/java/accord/messages/InformHomeDurable.java
b/accord-core/src/main/java/accord/messages/InformHomeDurable.java
index a263cc7..1f91153 100644
--- a/accord-core/src/main/java/accord/messages/InformHomeDurable.java
+++ b/accord-core/src/main/java/accord/messages/InformHomeDurable.java
@@ -32,7 +32,7 @@ public class InformHomeDurable implements Request
public void process(Node node, Id replyToNode, ReplyContext replyContext)
{
// TODO (expected, efficiency): do not load txnId first
- node.ifLocal(contextFor(txnId), homeKey, txnId.epoch, safeStore -> {
+ node.ifLocal(contextFor(txnId), homeKey, txnId.epoch(), safeStore -> {
Command command = safeStore.command(txnId);
command.setDurability(safeStore, durability, homeKey, executeAt);
safeStore.progressLog().durable(command, persistedOn);
diff --git a/accord-core/src/main/java/accord/messages/InformOfTxnId.java
b/accord-core/src/main/java/accord/messages/InformOfTxnId.java
index a11b0db..946dbda 100644
--- a/accord-core/src/main/java/accord/messages/InformOfTxnId.java
+++ b/accord-core/src/main/java/accord/messages/InformOfTxnId.java
@@ -2,8 +2,6 @@ package accord.messages;
import accord.api.RoutingKey;
import accord.local.*;
-import accord.primitives.Keys;
-import accord.primitives.Seekables;
import accord.primitives.TxnId;
import java.util.Collections;
@@ -22,10 +20,11 @@ public class InformOfTxnId extends
AbstractEpochRequest<Reply> implements Reques
this.homeKey = homeKey;
}
+ @Override
public void process()
{
// TODO (expected, efficiency): do not first load txnId
- node.mapReduceConsumeLocal(this, homeKey, txnId.epoch, this);
+ node.mapReduceConsumeLocal(this, homeKey, txnId.epoch(), this);
}
@Override
@@ -64,7 +63,7 @@ public class InformOfTxnId extends
AbstractEpochRequest<Reply> implements Reques
@Override
public long waitForEpoch()
{
- return txnId.epoch;
+ return txnId.epoch();
}
@Override
diff --git a/accord-core/src/main/java/accord/messages/PreAccept.java
b/accord-core/src/main/java/accord/messages/PreAccept.java
index 576797e..310c0a0 100644
--- a/accord-core/src/main/java/accord/messages/PreAccept.java
+++ b/accord-core/src/main/java/accord/messages/PreAccept.java
@@ -99,7 +99,7 @@ public class PreAccept extends
WithUnsynced<PreAccept.PreAcceptReply>
default:
case Success:
case Redundant:
- return new PreAcceptOk(txnId, command.executeAt(),
calculatePartialDeps(safeStore, txnId, partialTxn.keys(), partialTxn.kind(),
txnId, safeStore.ranges().between(minEpoch, txnId.epoch)));
+ return new PreAcceptOk(txnId, command.executeAt(),
calculatePartialDeps(safeStore, txnId, partialTxn.keys(), partialTxn.kind(),
txnId, safeStore.ranges().between(minEpoch, txnId.epoch())));
case RejectedBallot:
return PreAcceptNack.INSTANCE;
diff --git a/accord-core/src/main/java/accord/messages/ReadData.java
b/accord-core/src/main/java/accord/messages/ReadData.java
index 9471c8d..d204de2 100644
--- a/accord-core/src/main/java/accord/messages/ReadData.java
+++ b/accord-core/src/main/java/accord/messages/ReadData.java
@@ -65,7 +65,7 @@ public class ReadData extends
AbstractEpochRequest<ReadData.ReadNack> implements
public ReadData(Node.Id to, Topologies topologies, TxnId txnId,
Seekables<?, ?> readScope, Timestamp executeAt)
{
super(txnId);
- this.executeAtEpoch = executeAt.epoch;
+ this.executeAtEpoch = executeAt.epoch();
int startIndex = latestRelevantEpochIndex(to, topologies, readScope);
this.readScope = computeScope(to, topologies, (Seekables)readScope,
startIndex, Seekables::slice, Seekables::union);
this.waitForEpoch = computeWaitForEpoch(to, topologies, startIndex);
diff --git a/accord-core/src/main/java/accord/messages/TxnRequest.java
b/accord-core/src/main/java/accord/messages/TxnRequest.java
index b80968d..58bc8d4 100644
--- a/accord-core/src/main/java/accord/messages/TxnRequest.java
+++ b/accord-core/src/main/java/accord/messages/TxnRequest.java
@@ -56,15 +56,15 @@ public abstract class TxnRequest<R> implements Request,
PreLoadContext, MapReduc
this.minEpoch = topologies.oldestEpoch();
this.doNotComputeProgressKey = doNotComputeProgressKey(topologies,
startIndex, txnId, waitForEpoch());
- Ranges ranges = topologies.forEpoch(txnId.epoch).rangesForNode(to);
+ Ranges ranges =
topologies.forEpoch(txnId.epoch()).rangesForNode(to);
if (doNotComputeProgressKey)
{
Invariants.checkState(!route.intersects(ranges)); // confirm
dest is not a replica on txnId.epoch
}
else if (Invariants.isParanoid())
{
- // check that the destination's newer topology does not yield
different ranges
- long progressEpoch = Math.min(waitForEpoch(), txnId.epoch);
+ boolean intersects = route.intersects(ranges);
+ long progressEpoch = Math.min(waitForEpoch(), txnId.epoch());
Ranges computesRangesOn =
topologies.forEpoch(progressEpoch).rangesForNode(to);
if (computesRangesOn == null)
Invariants.checkState(!route.intersects(ranges));
@@ -147,7 +147,7 @@ public abstract class TxnRequest<R> implements Request,
PreLoadContext, MapReduc
RoutingKey progressKey(Node node)
{
// if waitForEpoch < txnId.epoch, then this replica's ownership is
unchanged
- long progressEpoch = min(waitForEpoch(), txnId.epoch);
+ long progressEpoch = min(waitForEpoch(), txnId.epoch());
return node.trySelectProgressKey(progressEpoch, scope,
scope.homeKey());
}
@@ -276,7 +276,7 @@ public abstract class TxnRequest<R> implements Request,
PreLoadContext, MapReduc
// (as it might be done so with stale ring information)
// TODO (low priority, clarity): this would be better defined as
"hasProgressKey"
- return waitForEpoch < txnId.epoch && startIndex > 0
- && topologies.get(startIndex).epoch() < txnId.epoch;
+ return waitForEpoch < txnId.epoch() && startIndex > 0
+ && topologies.get(startIndex).epoch() < txnId.epoch();
}
}
diff --git a/accord-core/src/main/java/accord/messages/WaitOnCommit.java
b/accord-core/src/main/java/accord/messages/WaitOnCommit.java
index 3a77406..490bc65 100644
--- a/accord-core/src/main/java/accord/messages/WaitOnCommit.java
+++ b/accord-core/src/main/java/accord/messages/WaitOnCommit.java
@@ -70,7 +70,7 @@ public class WaitOnCommit implements Request,
MapReduceConsume<SafeCommandStore,
this.node = node;
this.replyTo = replyToNode;
this.replyContext = replyContext;
- node.mapReduceConsumeLocal(this, scope, txnId.epoch, txnId.epoch,
this);
+ node.mapReduceConsumeLocal(this, scope, txnId.epoch(), txnId.epoch(),
this);
}
@Override
@@ -189,6 +189,6 @@ public class WaitOnCommit implements Request,
MapReduceConsume<SafeCommandStore,
@Override
public long waitForEpoch()
{
- return txnId.epoch;
+ return txnId.epoch();
}
}
diff --git a/accord-core/src/main/java/accord/primitives/AbstractKeys.java
b/accord-core/src/main/java/accord/primitives/AbstractKeys.java
index b777696..9b11cc4 100644
--- a/accord-core/src/main/java/accord/primitives/AbstractKeys.java
+++ b/accord-core/src/main/java/accord/primitives/AbstractKeys.java
@@ -13,7 +13,7 @@ import accord.api.RoutingKey;
import accord.utils.*;
import net.nicoulaj.compilecommand.annotations.Inline;
-import static accord.primitives.Routable.Kind.Key;
+import static accord.primitives.Routable.Domain.Key;
@SuppressWarnings("rawtypes")
// TODO (desired, efficiency): check that foldl call-sites are inlined and
optimised by HotSpot
@@ -46,7 +46,7 @@ public abstract class AbstractKeys<K extends RoutableKey, KS
extends Routables<K
}
@Override
- public final Unseekable.Kind kindOfContents()
+ public final Routable.Domain kindOfContents()
{
return Key;
}
diff --git a/accord-core/src/main/java/accord/primitives/AbstractRanges.java
b/accord-core/src/main/java/accord/primitives/AbstractRanges.java
index a87adce..9d5a4fa 100644
--- a/accord-core/src/main/java/accord/primitives/AbstractRanges.java
+++ b/accord-core/src/main/java/accord/primitives/AbstractRanges.java
@@ -96,9 +96,9 @@ public abstract class AbstractRanges<RS extends
Routables<Range, ?>> implements
}
@Override
- public final Unseekable.Kind kindOfContents()
+ public final Routable.Domain kindOfContents()
{
- return Unseekable.Kind.Range;
+ return Routable.Domain.Range;
}
@Override
diff --git a/accord-core/src/main/java/accord/primitives/Ballot.java
b/accord-core/src/main/java/accord/primitives/Ballot.java
index 50d5a87..7e66668 100644
--- a/accord-core/src/main/java/accord/primitives/Ballot.java
+++ b/accord-core/src/main/java/accord/primitives/Ballot.java
@@ -22,6 +22,21 @@ import accord.local.Node.Id;
public class Ballot extends Timestamp
{
+ public static Ballot fromBits(long msb, long lsb, Id node)
+ {
+ return new Ballot(msb, lsb, node);
+ }
+
+ public static Ballot fromValues(long epoch, long hlc, Id node)
+ {
+ return fromValues(epoch, hlc, 0, node);
+ }
+
+ public static Ballot fromValues(long epoch, long hlc, int flags, Id node)
+ {
+ return new Ballot(epoch, hlc, flags, node);
+ }
+
public static final Ballot ZERO = new Ballot(Timestamp.NONE);
public Ballot(Timestamp from)
@@ -29,8 +44,18 @@ public class Ballot extends Timestamp
super(from);
}
- public Ballot(long epoch, long real, int logical, Id node)
+ Ballot(long epoch, long hlc, int flags, Id node)
+ {
+ super(epoch, hlc, flags, node);
+ }
+
+ Ballot(long msb, long lsb, Id node)
+ {
+ super(msb, lsb, node);
+ }
+
+ public Ballot merge(Timestamp that)
{
- super(epoch, real, logical, node);
+ return merge(this, that, Ballot::fromBits);
}
}
diff --git a/accord-core/src/main/java/accord/primitives/Range.java
b/accord-core/src/main/java/accord/primitives/Range.java
index 1a52d32..463a910 100644
--- a/accord-core/src/main/java/accord/primitives/Range.java
+++ b/accord-core/src/main/java/accord/primitives/Range.java
@@ -178,7 +178,7 @@ public abstract class Range implements
Comparable<RoutableKey>, Unseekable, Seek
return end;
}
- public final Kind kind() { return Kind.Range; }
+ public final Domain domain() { return Domain.Range; }
public abstract boolean startInclusive();
public abstract boolean endInclusive();
diff --git a/accord-core/src/main/java/accord/primitives/Routable.java
b/accord-core/src/main/java/accord/primitives/Routable.java
index 575f1e8..a94132b 100644
--- a/accord-core/src/main/java/accord/primitives/Routable.java
+++ b/accord-core/src/main/java/accord/primitives/Routable.java
@@ -7,8 +7,28 @@ import accord.api.RoutingKey;
*/
public interface Routable
{
- enum Kind { Key, Range }
- Kind kind();
+ enum Domain
+ {
+ Key, Range;
+ private static final Domain[] VALUES = Domain.values();
+
+ public boolean isKey()
+ {
+ return this == Key;
+ }
+
+ public boolean isRange()
+ {
+ return this == Range;
+ }
+
+ public static Routable.Domain ofOrdinal(int ordinal)
+ {
+ return VALUES[ordinal];
+ }
+ }
+
+ Domain domain();
Unseekable toUnseekable();
RoutingKey someIntersectingRoutingKey();
}
diff --git a/accord-core/src/main/java/accord/primitives/RoutableKey.java
b/accord-core/src/main/java/accord/primitives/RoutableKey.java
index 4e90fe0..f8de9d6 100644
--- a/accord-core/src/main/java/accord/primitives/RoutableKey.java
+++ b/accord-core/src/main/java/accord/primitives/RoutableKey.java
@@ -46,7 +46,7 @@ public interface RoutableKey extends Routable,
Comparable<RoutableKey>
*/
int compareTo(@Nonnull RoutableKey that);
- default Kind kind() { return Kind.Key; }
+ default Domain domain() { return Domain.Key; }
RoutingKey toUnseekable();
diff --git a/accord-core/src/main/java/accord/primitives/Routables.java
b/accord-core/src/main/java/accord/primitives/Routables.java
index 94d4baa..88eb4f5 100644
--- a/accord-core/src/main/java/accord/primitives/Routables.java
+++ b/accord-core/src/main/java/accord/primitives/Routables.java
@@ -57,7 +57,7 @@ public interface Routables<K extends Routable, U extends
Routables<K, ?>> extend
*/
int findNext(int thisIndex, K find, SortedArrays.Search search);
- Routable.Kind kindOfContents();
+ Routable.Domain kindOfContents();
@Inline
static <Input extends Routable, T> T foldl(Routables<Input, ?> inputs,
AbstractRanges<?> matching, IndexedFold<? super Input, T> fold, T initialValue)
diff --git a/accord-core/src/main/java/accord/primitives/Timestamp.java
b/accord-core/src/main/java/accord/primitives/Timestamp.java
index 80c2660..5ac5e15 100644
--- a/accord-core/src/main/java/accord/primitives/Timestamp.java
+++ b/accord-core/src/main/java/accord/primitives/Timestamp.java
@@ -19,49 +19,124 @@
package accord.primitives;
import accord.local.Node.Id;
+import accord.utils.Invariants;
+
+import javax.annotation.Nonnull;
public class Timestamp implements Comparable<Timestamp>
{
+ public static Timestamp fromBits(long msb, long lsb, Id node)
+ {
+ return new Timestamp(msb, lsb, node);
+ }
+
+ public static Timestamp fromValues(long epoch, long hlc, Id node)
+ {
+ return new Timestamp(epoch, hlc, 0, node);
+ }
+
+ public static Timestamp fromValues(long epoch, long hlc, int flags, Id
node)
+ {
+ return new Timestamp(epoch, hlc, flags, node);
+ }
+
+ public static Timestamp maxForEpoch(long epoch)
+ {
+ return new Timestamp(epochMsb(epoch) | 0xffff, Long.MAX_VALUE, Id.MAX);
+ }
+
+ public static Timestamp minForEpoch(long epoch)
+ {
+ return new Timestamp(epochMsb(epoch), 0, Id.NONE);
+ }
+
+ private static final long MAX_EPOCH = (1L << 48) - 1;
+ private static final long HLC_INCR = 1L << 16;
+ private static final long MAX_FLAGS = HLC_INCR - 1;
public static final Timestamp NONE = new Timestamp(0, 0, 0, Id.NONE);
- public final long epoch;
- public final long real;
- public final int logical;
+ public final long msb;
+ public final long lsb;
public final Id node;
- public Timestamp(long epoch, long real, int logical, Id node)
+ Timestamp(long epoch, long hlc, int flags, Id node)
+ {
+ Invariants.checkArgument(epoch <= MAX_EPOCH);
+ Invariants.checkArgument(flags <= MAX_FLAGS);
+ this.msb = epochMsb(epoch) | hlcMsb(hlc);
+ this.lsb = hlcLsb(hlc) | flags;
+ this.node = node;
+ }
+
+ Timestamp(long msb, long lsb, Id node)
{
- this.epoch = epoch;
- this.real = real;
- this.logical = logical;
+ this.msb = msb;
+ this.lsb = lsb;
this.node = node;
}
public Timestamp(Timestamp copy)
{
- this.epoch = copy.epoch;
- this.real = copy.real;
- this.logical = copy.logical;
+ this.msb = copy.msb;
+ this.lsb = copy.lsb;
+ this.node = copy.node;
+ }
+
+ Timestamp(Timestamp copy, int flags)
+ {
+ Invariants.checkArgument(flags <= MAX_FLAGS);
+ this.msb = copy.msb;
+ this.lsb = notFlags(copy.lsb) | flags;
this.node = copy.node;
}
- public Timestamp withMinEpoch(long minEpoch)
+ public long epoch()
{
- return minEpoch <= epoch ? this : new Timestamp(minEpoch, real,
logical, node);
+ return epoch(msb);
+ }
+
+ /**
+ * A hybrid logical clock with implementation-defined resolution
+ */
+ public long hlc()
+ {
+ return highHlc(msb) | lowHlc(lsb);
+ }
+
+ public int flags()
+ {
+ return flags(lsb);
+ }
+
+ public Timestamp withEpochAtLeast(long minEpoch)
+ {
+ return minEpoch <= epoch() ? this : new Timestamp(minEpoch, hlc(),
flags(), node);
}
public Timestamp logicalNext(Id node)
{
- return new Timestamp(epoch, real, logical + 1, node);
+ long lsb = this.lsb + HLC_INCR;
+ long msb = this.msb;
+ if (lowHlc(lsb) == 0)
+ ++msb; // overflow of lsb
+ return new Timestamp(msb, lsb, node);
}
@Override
- public int compareTo(Timestamp that)
+ public int compareTo(@Nonnull Timestamp that)
+ {
+ if (this == that) return 0;
+ int c = Long.compareUnsigned(this.msb, that.msb);
+ if (c == 0) c = Long.compare(lowHlc(this.lsb), lowHlc(that.lsb));
+ if (c == 0) c = this.node.compareTo(that.node);
+ return c;
+ }
+
+ public int compareToStrict(@Nonnull Timestamp that)
{
if (this == that) return 0;
- int c = Long.compare(this.epoch, that.epoch);
- if (c == 0) c = Long.compare(this.real, that.real);
- if (c == 0) c = Integer.compare(this.logical, that.logical);
+ int c = Long.compareUnsigned(this.msb, that.msb);
+ if (c == 0) c = Long.compareUnsigned(this.lsb, that.lsb);
if (c == 0) c = this.node.compareTo(that.node);
return c;
}
@@ -69,12 +144,20 @@ public class Timestamp implements Comparable<Timestamp>
@Override
public int hashCode()
{
- return (int) (((((epoch * 31) + real) * 31) + node.hashCode()) * 31 +
logical);
+ return (int) (((msb * 31) + lowHlc(lsb)) * 31) + node.hashCode();
}
public boolean equals(Timestamp that)
{
- return this.epoch == that.epoch && this.real == that.real &&
this.logical == that.logical && this.node.equals(that.node);
+ return this.msb == that.msb && lowHlc(this.lsb) == lowHlc(that.lsb) &&
this.node.equals(that.node);
+ }
+
+ /**
+ * Include flag bits in identity
+ */
+ public boolean equalsStrict(Timestamp that)
+ {
+ return this.msb == that.msb && this.lsb == that.lsb &&
this.node.equals(that.node);
}
@Override
@@ -98,10 +181,67 @@ public class Timestamp implements Comparable<Timestamp>
return a.compareTo(b) <= 0 ? a : b;
}
+ private static long epoch(long msb)
+ {
+ return msb >>> 16;
+ }
+
+ private static long epochMsb(long epoch)
+ {
+ return epoch << 16;
+ }
+
+ private static long hlcMsb(long hlc)
+ {
+ return hlc >>> 48;
+ }
+
+ private static long hlcLsb(long hlc)
+ {
+ return hlc << 16;
+ }
+
+ private static long highHlc(long msb)
+ {
+ return msb << 48;
+ }
+
+ private static long lowHlc(long lsb)
+ {
+ return lsb >>> 16;
+ }
+
+ private static int flags(long lsb)
+ {
+ return (int) (lsb & MAX_FLAGS);
+ }
+
+ private static long notFlags(long lsb)
+ {
+ return lsb & ~MAX_FLAGS;
+ }
+
+ public Timestamp merge(Timestamp that)
+ {
+ return merge(this, that, Timestamp::fromBits);
+ }
+
+ interface Constructor<T>
+ {
+ T construct(long msb, long lsb, Id node);
+ }
+
+ static <T extends Timestamp> T merge(Timestamp a, Timestamp b,
Constructor<T> constructor)
+ {
+ Invariants.checkArgument(a.msb == b.msb);
+ Invariants.checkArgument(lowHlc(a.lsb) == lowHlc(b.lsb));
+ Invariants.checkArgument(a.node.equals(b.node));
+ return constructor.construct(a.msb, a.lsb | b.lsb, a.node);
+ }
+
@Override
public String toString()
{
- return "[" + epoch + ',' + real + ',' + logical + ',' + node + ']';
+ return "[" + epoch() + ',' + hlc() + ',' + flags() + ',' + node + ']';
}
-
}
diff --git a/accord-core/src/main/java/accord/primitives/Txn.java
b/accord-core/src/main/java/accord/primitives/Txn.java
index 8703c04..2feead3 100644
--- a/accord-core/src/main/java/accord/primitives/Txn.java
+++ b/accord-core/src/main/java/accord/primitives/Txn.java
@@ -23,7 +23,6 @@ import java.util.List;
import java.util.Objects;
import accord.local.Command;
-import accord.local.CommandStore;
import accord.api.*;
import accord.local.SafeCommandStore;
@@ -37,11 +36,24 @@ public interface Txn
{
enum Kind
{
- READ, WRITE;
+ Read, Write;
+ // in future: BlindWrite, Interactive?
+
+ private static final Kind[] VALUES = Kind.values();
public boolean isWrite()
{
- return this == WRITE;
+ return this == Write;
+ }
+
+ public boolean isRead()
+ {
+ return this == Read;
+ }
+
+ public static Kind ofOrdinal(int ordinal)
+ {
+ return VALUES[ordinal];
}
}
@@ -55,7 +67,7 @@ public interface Txn
public InMemory(@Nonnull Seekables<?, ?> keys, @Nonnull Read read,
@Nonnull Query query)
{
- this.kind = Kind.READ;
+ this.kind = Kind.Read;
this.keys = keys;
this.read = read;
this.query = query;
@@ -64,7 +76,7 @@ public interface Txn
public InMemory(@Nonnull Keys keys, @Nonnull Read read, @Nonnull Query
query, @Nullable Update update)
{
- this.kind = Kind.WRITE;
+ this.kind = Kind.Write;
this.keys = keys;
this.read = read;
this.update = update;
@@ -174,7 +186,7 @@ public interface Txn
default Future<Data> read(SafeCommandStore safeStore, Command command)
{
- Ranges ranges = safeStore.ranges().at(command.executeAt().epoch);
+ Ranges ranges = safeStore.ranges().at(command.executeAt().epoch());
List<Future<Data>> futures = read().keys().foldl(ranges, (key,
accumulate, index) -> {
Future<Data> result = read().read(key, kind(), safeStore,
command.executeAt(), safeStore.dataStore());
accumulate.add(result);
diff --git a/accord-core/src/main/java/accord/primitives/TxnId.java
b/accord-core/src/main/java/accord/primitives/TxnId.java
index bff1e5c..e2b40bb 100644
--- a/accord-core/src/main/java/accord/primitives/TxnId.java
+++ b/accord-core/src/main/java/accord/primitives/TxnId.java
@@ -19,16 +19,106 @@
package accord.primitives;
import accord.local.Node.Id;
+import accord.primitives.Routable.Domain;
+import accord.primitives.Txn.Kind;
+
+import static accord.primitives.Txn.Kind.Read;
+import static accord.primitives.Txn.Kind.Write;
public class TxnId extends Timestamp
{
- public TxnId(Timestamp timestamp)
+ public static TxnId fromBits(long msb, long lsb, Id node)
+ {
+ return new TxnId(msb, lsb, node);
+ }
+
+ public static TxnId fromValues(long epoch, long hlc, Id node)
+ {
+ return new TxnId(epoch, hlc, 0, node);
+ }
+
+ public static TxnId fromValues(long epoch, long hlc, int flags, Id node)
+ {
+ return new TxnId(epoch, hlc, flags, node);
+ }
+
+ public TxnId(Timestamp timestamp, Kind rw, Domain domain)
+ {
+ super(timestamp, flags(rw, domain));
+ }
+
+ public TxnId(long epoch, long hlc, Kind rw, Domain domain, Id node)
+ {
+ this(epoch, hlc, flags(rw, domain), node);
+ }
+
+ private TxnId(long epoch, long hlc, int flags, Id node)
+ {
+ super(epoch, hlc, flags, node);
+ }
+
+ private TxnId(long msb, long lsb, Id node)
+ {
+ super(msb, lsb, node);
+ }
+
+ public boolean isWrite()
+ {
+ return rwOrdinal(flags()) == Write.ordinal();
+ }
+
+ public boolean isRead()
+ {
+ return rwOrdinal(flags()) == Read.ordinal();
+ }
+
+ public Kind rw()
+ {
+ return rw(flags());
+ }
+
+ public Domain domain()
+ {
+ return domain(flags());
+ }
+
+ public TxnId merge(Timestamp that)
+ {
+ return merge(this, that, TxnId::fromBits);
+ }
+
+ private static int flags(Kind rw, Domain domain)
+ {
+ return flags(rw) | flags(domain);
+ }
+
+ private static int flags(Kind rw)
+ {
+ return rw.ordinal() << 1;
+ }
+
+ private static int flags(Domain domain)
+ {
+ return domain.ordinal();
+ }
+
+ private static Kind rw(int flags)
+ {
+ return Kind.ofOrdinal(rwOrdinal(flags));
+ }
+
+ private static Domain domain(int flags)
+ {
+ return Domain.ofOrdinal(domainOrdinal(flags));
+ }
+
+ private static int rwOrdinal(int flags)
{
- super(timestamp);
+ return (flags >> 1) & 1;
}
- public TxnId(long epoch, long real, int logical, Id node)
+ private static int domainOrdinal(int flags)
{
- super(epoch, real, logical, node);
+ return flags & 1;
}
}
diff --git a/accord-core/src/main/java/accord/primitives/Writes.java
b/accord-core/src/main/java/accord/primitives/Writes.java
index f8d90b5..c3f7635 100644
--- a/accord-core/src/main/java/accord/primitives/Writes.java
+++ b/accord-core/src/main/java/accord/primitives/Writes.java
@@ -67,7 +67,7 @@ public class Writes
if (write == null)
return SUCCESS;
- Ranges ranges = safeStore.ranges().since(executeAt.epoch);
+ Ranges ranges = safeStore.ranges().since(executeAt.epoch());
if (ranges == null)
return SUCCESS;
diff --git a/accord-core/src/main/java/accord/topology/TopologyManager.java
b/accord-core/src/main/java/accord/topology/TopologyManager.java
index 38d6589..6665ef5 100644
--- a/accord-core/src/main/java/accord/topology/TopologyManager.java
+++ b/accord-core/src/main/java/accord/topology/TopologyManager.java
@@ -294,7 +294,7 @@ public class TopologyManager implements
ConfigurationService.Listener
public Topologies withUnsyncedEpochs(Unseekables<?, ?> select, Timestamp
at)
{
- return withUnsyncedEpochs(select, at.epoch);
+ return withUnsyncedEpochs(select, at.epoch());
}
public Topologies withUnsyncedEpochs(Unseekables<?, ?> select, long epoch)
@@ -304,7 +304,7 @@ public class TopologyManager implements
ConfigurationService.Listener
public Topologies withUnsyncedEpochs(Unseekables<?, ?> select, Timestamp
min, Timestamp max)
{
- return withUnsyncedEpochs(select, min.epoch, max.epoch);
+ return withUnsyncedEpochs(select, min.epoch(), max.epoch());
}
public Topologies withUnsyncedEpochs(Unseekables<?, ?> select, long
minEpoch, long maxEpoch)
diff --git a/accord-core/src/test/java/accord/burn/TopologyUpdates.java
b/accord-core/src/test/java/accord/burn/TopologyUpdates.java
index 61aa6b9..cc8645a 100644
--- a/accord-core/src/test/java/accord/burn/TopologyUpdates.java
+++ b/accord-core/src/test/java/accord/burn/TopologyUpdates.java
@@ -118,12 +118,12 @@ public class TopologyUpdates
break;
case PreApplied:
case Applied:
- node.withEpoch(Math.max(executeAt.epoch, toEpoch), () -> {
+ node.withEpoch(Math.max(executeAt.epoch(), toEpoch), () ->
{
FetchData.fetch(PreApplied.minKnown, node, txnId,
route, executeAt, toEpoch, callback);
});
break;
case Invalidated:
- node.forEachLocal(contextFor(txnId), route, txnId.epoch,
toEpoch, safeStore -> {
+ node.forEachLocal(contextFor(txnId), route, txnId.epoch(),
toEpoch, safeStore -> {
safeStore.command(txnId).commitInvalidate(safeStore);
});
}
diff --git a/accord-core/src/test/java/accord/coordinate/CoordinateTest.java
b/accord-core/src/test/java/accord/coordinate/CoordinateTest.java
index 3b94f26..8017b21 100644
--- a/accord-core/src/test/java/accord/coordinate/CoordinateTest.java
+++ b/accord-core/src/test/java/accord/coordinate/CoordinateTest.java
@@ -33,6 +33,8 @@ import static accord.Utils.id;
import static accord.Utils.ids;
import static accord.Utils.writeTxn;
import static accord.impl.IntKey.keys;
+import static accord.primitives.Routable.Domain.Key;
+import static accord.primitives.Txn.Kind.Write;
public class CoordinateTest
{
@@ -44,7 +46,7 @@ public class CoordinateTest
Node node = cluster.get(1);
Assertions.assertNotNull(node);
- TxnId txnId = node.nextTxnId();
+ TxnId txnId = node.nextTxnId(Write, Key);
Keys keys = keys(10);
Txn txn = writeTxn(keys);
FullKeyRoute route = keys.toRoute(keys.get(0).toUnseekable());
@@ -71,8 +73,8 @@ public class CoordinateTest
private TxnId coordinate(Node node, long clock, Keys keys) throws Throwable
{
- TxnId txnId = node.nextTxnId();
- txnId = new TxnId(txnId.epoch, txnId.real + clock, 0, txnId.node);
+ TxnId txnId = node.nextTxnId(Write, Key);
+ txnId = new TxnId(txnId.epoch(), txnId.hlc() + clock, Write, Key,
txnId.node);
Txn txn = writeTxn(keys);
Result result = Coordinate.coordinate(node, txnId, txn,
node.computeRoute(txnId, txn.keys())).get();
Assertions.assertEquals(MockStore.RESULT, result);
@@ -135,7 +137,7 @@ public class CoordinateTest
Node node = cluster.get(1);
Assertions.assertNotNull(node);
- TxnId txnId = node.nextTxnId();
+ TxnId txnId = node.nextTxnId(Write, Key);
Keys oneKey = keys(10);
Keys twoKeys = keys(10, 20);
Txn txn = new Txn.InMemory(oneKey, MockStore.read(oneKey),
MockStore.QUERY, MockStore.update(twoKeys));
diff --git
a/accord-core/src/test/java/accord/coordinate/TopologyChangeTest.java
b/accord-core/src/test/java/accord/coordinate/TopologyChangeTest.java
index efba8ef..dbde032 100644
--- a/accord-core/src/test/java/accord/coordinate/TopologyChangeTest.java
+++ b/accord-core/src/test/java/accord/coordinate/TopologyChangeTest.java
@@ -18,14 +18,11 @@
package accord.coordinate;
-import accord.impl.mock.EpochSync;
import accord.impl.mock.MockCluster;
import accord.impl.mock.MockConfigurationService;
-import accord.impl.mock.RecordingMessageSink;
import accord.local.Command;
import accord.local.Node;
import accord.local.Status;
-import accord.messages.Accept;
import accord.primitives.Range;
import accord.topology.Topology;
import accord.primitives.Keys;
@@ -35,25 +32,15 @@ import accord.utils.EpochFunction;
import org.junit.jupiter.api.Assertions;
import org.junit.jupiter.api.Test;
-import java.util.ArrayList;
-import java.util.Set;
-import java.util.stream.Collectors;
-
import static accord.Utils.*;
import static accord.impl.IntKey.keys;
import static accord.impl.IntKey.range;
import static accord.local.PreLoadContext.empty;
+import static accord.primitives.Routable.Domain.Key;
+import static accord.primitives.Txn.Kind.Write;
public class TopologyChangeTest
{
- private static TxnId coordinate(Node node, Keys keys) throws Throwable
- {
- TxnId txnId = node.nextTxnId();
- Txn txn = writeTxn(keys);
- node.coordinate(txnId, txn).get();
- return txnId;
- }
-
@Test
void disjointElectorate() throws Throwable
{
@@ -72,7 +59,7 @@ public class TopologyChangeTest
.build())
{
Node node1 = cluster.get(1);
- TxnId txnId1 = node1.nextTxnId();
+ TxnId txnId1 = node1.nextTxnId(Write, Key);
Txn txn1 = writeTxn(keys);
node1.coordinate(txnId1, txn1).get();
node1.commandStores().forEach(empty(), keys, 1, 1, commands -> {
@@ -83,7 +70,7 @@ public class TopologyChangeTest
cluster.configServices(4, 5, 6).forEach(config ->
config.reportTopology(topology2));
Node node4 = cluster.get(4);
- TxnId txnId2 = node4.nextTxnId();
+ TxnId txnId2 = node4.nextTxnId(Write, Key);
Txn txn2 = writeTxn(keys);
node4.coordinate(txnId2, txn2).get();
diff --git a/accord-core/src/test/java/accord/impl/TestAgent.java
b/accord-core/src/test/java/accord/impl/TestAgent.java
index b417dbc..53ff4f6 100644
--- a/accord-core/src/test/java/accord/impl/TestAgent.java
+++ b/accord-core/src/test/java/accord/impl/TestAgent.java
@@ -56,6 +56,6 @@ public class TestAgent implements Agent
@Override
public boolean isExpired(TxnId initiated, long now)
{
- return TimeUnit.SECONDS.convert(now - initiated.real,
TimeUnit.MICROSECONDS) >= 10;
+ return TimeUnit.SECONDS.convert(now - initiated.hlc(),
TimeUnit.MICROSECONDS) >= 10;
}
}
diff --git a/accord-core/src/test/java/accord/impl/list/ListAgent.java
b/accord-core/src/test/java/accord/impl/list/ListAgent.java
index 158c42b..2d6bb1f 100644
--- a/accord-core/src/test/java/accord/impl/list/ListAgent.java
+++ b/accord-core/src/test/java/accord/impl/list/ListAgent.java
@@ -70,6 +70,6 @@ public class ListAgent implements Agent
@Override
public boolean isExpired(TxnId initiated, long now)
{
- return now - initiated.real >= timeout;
+ return now - initiated.hlc() >= timeout;
}
}
diff --git a/accord-core/src/test/java/accord/impl/list/ListRequest.java
b/accord-core/src/test/java/accord/impl/list/ListRequest.java
index fa8a29e..cdef9b0 100644
--- a/accord-core/src/test/java/accord/impl/list/ListRequest.java
+++ b/accord-core/src/test/java/accord/impl/list/ListRequest.java
@@ -34,7 +34,6 @@ import accord.messages.CheckStatus.IncludeInfo;
import accord.messages.MessageType;
import accord.messages.ReplyContext;
import accord.primitives.RoutingKeys;
-import accord.primitives.RoutingKeys;
import accord.primitives.Txn;
import accord.messages.Request;
import accord.primitives.TxnId;
@@ -51,7 +50,7 @@ public class ListRequest implements Request
int count = 0;
protected CheckOnResult(Node node, TxnId txnId, RoutingKey homeKey,
BiConsumer<Outcome, Throwable> callback)
{
- super(node, txnId, RoutingKeys.of(homeKey), txnId.epoch,
IncludeInfo.All);
+ super(node, txnId, RoutingKeys.of(homeKey), txnId.epoch(),
IncludeInfo.All);
this.callback = callback;
}
diff --git a/accord-core/src/test/java/accord/impl/mock/MockCluster.java
b/accord-core/src/test/java/accord/impl/mock/MockCluster.java
index 7fe7aed..520d359 100644
--- a/accord-core/src/test/java/accord/impl/mock/MockCluster.java
+++ b/accord-core/src/test/java/accord/impl/mock/MockCluster.java
@@ -26,6 +26,8 @@ import accord.local.Node;
import accord.local.Node.Id;
import accord.local.ShardDistributor;
import accord.primitives.Ranges;
+import accord.primitives.Routable;
+import accord.primitives.Txn;
import accord.utils.EpochFunction;
import accord.utils.ThreadPoolScheduler;
import accord.primitives.TxnId;
@@ -45,6 +47,8 @@ import java.util.function.BiFunction;
import java.util.function.LongSupplier;
import static accord.Utils.*;
+import static accord.primitives.Routable.Domain.Key;
+import static accord.primitives.Txn.Kind.Write;
public class MockCluster implements Network, AutoCloseable, Iterable<Node>
{
@@ -382,7 +386,7 @@ public class MockCluster implements Network, AutoCloseable,
Iterable<Node>
public TxnId idForNode(long epoch, Id id)
{
- return new TxnId(epoch, now.get(), 0, id);
+ return new TxnId(epoch, now.get(), Write, Key, id);
}
public TxnId idForNode(long epoch, int id)
diff --git a/accord-core/src/test/java/accord/local/CommandTest.java
b/accord-core/src/test/java/accord/local/CommandTest.java
index a666294..145c094 100644
--- a/accord-core/src/test/java/accord/local/CommandTest.java
+++ b/accord-core/src/test/java/accord/local/CommandTest.java
@@ -47,6 +47,8 @@ import java.util.function.Consumer;
import static accord.Utils.id;
import static accord.Utils.writeTxn;
import static accord.impl.InMemoryCommandStore.inMemory;
+import static accord.primitives.Routable.Domain.Key;
+import static accord.primitives.Txn.Kind.Write;
public class CommandTest
{
@@ -166,7 +168,7 @@ public class CommandTest
CommandStoreSupport support = new CommandStoreSupport();
Node node = createNode(ID1, support);
CommandStore commands = node.unsafeByIndex(0);
- TxnId txnId = node.nextTxnId();
+ TxnId txnId = node.nextTxnId(Write, Key);
((MockCluster.Clock)node.unsafeGetNowSupplier()).increment(10);
Txn txn = writeTxn(Keys.of(KEY));
@@ -176,7 +178,7 @@ public class CommandTest
setTopologyEpoch(support.local, 2);
((TestableConfigurationService)node.configService()).reportTopology(support.local.get().withEpoch(2));
- Timestamp expectedTimestamp = new Timestamp(2, 110, 0, ID1);
+ Timestamp expectedTimestamp = Timestamp.fromValues(2, 110, ID1);
commands.execute(null, (Consumer<? super SafeCommandStore>) store ->
command.preaccept(store, txn.slice(FULL_RANGES, true), ROUTE,
HOME_KEY)).syncUninterruptibly();
Assertions.assertEquals(Status.PreAccepted, command.status());
Assertions.assertEquals(expectedTimestamp, command.executeAt());
diff --git a/accord-core/src/test/java/accord/local/NodeTest.java
b/accord-core/src/test/java/accord/local/NodeTest.java
index 4303b93..8d02d30 100644
--- a/accord-core/src/test/java/accord/local/NodeTest.java
+++ b/accord-core/src/test/java/accord/local/NodeTest.java
@@ -26,9 +26,9 @@ import org.junit.jupiter.api.Test;
public class NodeTest
{
- private static Timestamp ts(long epoch, long real, int logical, int node)
+ private static Timestamp ts(long epoch, long hlc, int node)
{
- return new Timestamp(epoch, real, logical, new Node.Id(node));
+ return Timestamp.fromValues(epoch, hlc, new Node.Id(node));
}
@Test
@@ -43,13 +43,14 @@ public class NodeTest
Timestamp timestamp1 = node.uniqueNow();
Timestamp timestamp2 = node.uniqueNow();
+ clock.increment();
+ clock.increment();
clock.increment();
Timestamp timestamp3 = node.uniqueNow();
- Assertions.assertEquals(ts(1, 101, 0, 1), timestamp1);
- Assertions.assertEquals(ts(1, 101, 1, 1), timestamp2);
-
- Assertions.assertEquals(ts(1, 102, 0, 1), timestamp3);
+ Assertions.assertEquals(ts(1, 101, 1), timestamp1);
+ Assertions.assertEquals(ts(1, 102, 1), timestamp2);
+ Assertions.assertEquals(ts(1, 104, 1), timestamp3);
}
}
@@ -64,11 +65,11 @@ public class NodeTest
clock.increment();
Timestamp timestamp1 = node.uniqueNow();
- Assertions.assertEquals(ts(1, 101, 0, 1), timestamp1);
+ Assertions.assertEquals(ts(1, 101, 1), timestamp1);
configService.reportTopology(node.topology().current().withEpoch(2));
Timestamp timestamp2 = node.uniqueNow();
- Assertions.assertEquals(ts(2, 101, 1, 1), timestamp2);
+ Assertions.assertEquals(ts(2, 102, 1), timestamp2);
}
}
@@ -82,19 +83,15 @@ public class NodeTest
clock.increment();
Timestamp timestamp1 = node.uniqueNow();
- Assertions.assertEquals(ts(1, 101, 0, 1), timestamp1);
+ Assertions.assertEquals(ts(1, 101, 1), timestamp1);
- // atLeast equal to most recent ts, logical should increment
- Assertions.assertEquals(ts(1, 101, 1, 1),
+ // atLeast equal to most recent ts, should simply increment
+ Assertions.assertEquals(ts(1, 102, 1),
node.uniqueNow(timestamp1));
- // atLeast less than most recent ts
- Assertions.assertEquals(ts(1, 101, 2, 1),
- node.uniqueNow(ts(1, 99, 0, 1)));
-
// atLeast greater than most recent ts
- Assertions.assertEquals(ts(1, 110, 1, 1),
- node.uniqueNow(ts(1, 110, 0, 2)));
+ Assertions.assertEquals(ts(1, 111, 1),
+ node.uniqueNow(ts(1, 110, 2)));
}
}
}
diff --git a/accord-core/src/test/java/accord/messages/PreAcceptTest.java
b/accord-core/src/test/java/accord/messages/PreAcceptTest.java
index 2899ef6..afba33d 100644
--- a/accord-core/src/test/java/accord/messages/PreAcceptTest.java
+++ b/accord-core/src/test/java/accord/messages/PreAcceptTest.java
@@ -40,11 +40,13 @@ import org.junit.jupiter.api.Test;
import java.util.List;
import java.util.Random;
-import static accord.Utils.id;
-import static accord.Utils.writeTxn;
+import static accord.Utils.*;
import static accord.impl.InMemoryCommandStore.inMemory;
+import static accord.impl.IntKey.range;
import static accord.impl.IntKey.routing;
import static accord.impl.mock.MockCluster.configService;
+import static accord.primitives.Routable.Domain.Key;
+import static accord.primitives.Txn.Kind.Write;
import static accord.utils.Utils.listOf;
public class PreAcceptTest
@@ -79,7 +81,7 @@ public class PreAcceptTest
private static PreAccept preAccept(TxnId txnId, Txn txn, RoutingKey
homeKey)
{
FullRoute<?> route = txn.keys().toRoute(homeKey);
- return PreAccept.SerializerSupport.create(txnId,
route.slice(FULL_RANGE), txnId.epoch, txnId.epoch, false, txnId.epoch,
txn.slice(FULL_RANGE, true), route);
+ return PreAccept.SerializerSupport.create(txnId,
route.slice(FULL_RANGE), txnId.epoch(), txnId.epoch(), false, txnId.epoch(),
txn.slice(FULL_RANGE, true), route);
}
@Test
@@ -159,15 +161,15 @@ public class PreAcceptTest
messageSink.clearHistory();
Raw key2 = IntKey.key(11);
Keys keys = Keys.of(key1, key2);
- TxnId txnId2 = new TxnId(1, 50, 0, ID3);
+ TxnId txnId2 = new TxnId(1, 50, Write, Key, ID3);
PreAccept preAccept2 = preAccept(txnId2, writeTxn(keys),
key2.toUnseekable());
clock.increment(10);
preAccept2.process(node, ID3, REPLY_CONTEXT);
messageSink.assertHistorySizes(0, 1);
Assertions.assertEquals(ID3, messageSink.responses.get(0).to);
- PartialDeps expectedDeps = PartialDeps.NONE;
- Assertions.assertEquals(new PreAccept.PreAcceptOk(txnId2, new
TxnId(1, 110, 0, ID1), expectedDeps),
+ PartialDeps expectedDeps = Deps.NONE.slice(ranges(range(0, 12)));
+ Assertions.assertEquals(new PreAccept.PreAcceptOk(txnId2,
Timestamp.fromValues(1, 110, ID1), expectedDeps),
messageSink.responses.get(0).payload);
}
finally
@@ -191,7 +193,7 @@ public class PreAcceptTest
try
{
Keys keys = Keys.of(key);
- TxnId txnId = new TxnId(1, 110, 0, ID2);
+ TxnId txnId = new TxnId(1, 110, Write, Key, ID2);
PreAccept preAccept = preAccept(txnId, writeTxn(keys),
key.toUnseekable());
preAccept.process(node, ID2, REPLY_CONTEXT);
@@ -235,7 +237,7 @@ public class PreAcceptTest
messageSink.assertHistorySizes(0, 1);
Assertions.assertEquals(ID2, messageSink.responses.get(0).to);
- Assertions.assertEquals(new PreAccept.PreAcceptOk(txnId, new
TxnId(2, 110, 0, ID1), PartialDeps.NONE),
+ Assertions.assertEquals(new PreAccept.PreAcceptOk(txnId,
Timestamp.fromValues(2, 110, ID1), Deps.NONE.slice(ranges(range(0, 12)))),
messageSink.responses.get(0).payload);
}
finally
diff --git a/accord-core/src/test/java/accord/txn/DepsTest.java
b/accord-core/src/test/java/accord/txn/DepsTest.java
index 9bd8d52..9c480ad 100644
--- a/accord-core/src/test/java/accord/txn/DepsTest.java
+++ b/accord-core/src/test/java/accord/txn/DepsTest.java
@@ -65,23 +65,23 @@ public class DepsTest
@Test
public void testRandom()
{
- testOneRandom(seed(), 1000, 3, 50, 10, 4, 100, 10, 200, 1000);
- testOneRandom(seed(), 1000, 3, 50, 10, 4, 10, 2, 200, 1000);
- testOneRandom(seed(), 100, 3, 50, 10, 4, 10, 2, 200, 100);
+ testOneRandom(seed(), 1000, 3, 500, 4, 100, 10, 200, 1000);
+ testOneRandom(seed(), 1000, 3, 500, 4, 10, 2, 200, 1000);
+ testOneRandom(seed(), 100, 3, 500, 4, 10, 2, 200, 100);
}
@Test
public void testMerge()
{
- testMerge(seed(), 100, 3, 50, 10, 4, 10, 5, 200, 100, 10);
- testMerge(seed(), 1000, 3, 50, 10, 4, 100, 10, 200, 1000, 10);
+ testMerge(seed(), 100, 3, 500, 4, 10, 5, 200, 100, 10);
+ testMerge(seed(), 1000, 3, 500, 4, 100, 10, 200, 1000, 10);
}
- private static void testMerge(long seed, int uniqueTxnIdsRange, int
epochRange, int realRange, int logicalRange, int nodeRange,
+ private static void testMerge(long seed, int uniqueTxnIdsRange, int
epochRange, int hlcRange, int nodeRange,
int uniqueKeysRange, int emptyKeysRange, int
keyRange, int totalCountRange, int mergeCountRange)
{
Random random = random(seed);
- Supplier<Deps> supplier = supplier(random, uniqueTxnIdsRange,
epochRange, realRange, logicalRange, nodeRange,
+ Supplier<Deps> supplier = supplier(random, uniqueTxnIdsRange,
epochRange, hlcRange, 0, nodeRange,
uniqueKeysRange, emptyKeysRange,
keyRange, totalCountRange);
int count = 1 + random.nextInt(mergeCountRange);
List<Deps> deps = new ArrayList<>(count);
@@ -93,14 +93,14 @@ public class DepsTest
@Test
public void testWith()
{
- testWith(seed(), 1000, 3, 50, 10, 4, 100, 10, 200, 1000, 10);
+ testWith(seed(), 1000, 3, 500, 4, 100, 10, 200, 1000, 10);
}
- private static void testWith(long seed, int uniqueTxnIdsRange, int
epochRange, int realRange, int logicalRange, int nodeRange,
+ private static void testWith(long seed, int uniqueTxnIdsRange, int
epochRange, int hlcRange, int nodeRange,
int uniqueKeysRange, int emptyKeysRange, int
keyRange, int totalCountRange, int mergeCountRange)
{
Random random = random(seed);
- Supplier<Deps> supplier = supplier(random, uniqueTxnIdsRange,
epochRange, realRange, logicalRange, nodeRange,
+ Supplier<Deps> supplier = supplier(random, uniqueTxnIdsRange,
epochRange, hlcRange, 0, nodeRange,
uniqueKeysRange, emptyKeysRange,
keyRange, totalCountRange);
Deps cur = supplier.get();
int count = 1 + random.nextInt(mergeCountRange);
@@ -311,22 +311,21 @@ public class DepsTest
static Deps generate(Gen.Random random)
{
int epochRange = 3;
- int realRange = 50;
- int logicalRange = 10;
+ int hlcRange = 500;
double uniqueTxnIdsPercentage = 0.66D;
- int uniqueTxnIds = random.nextPositive((int) ((realRange *
logicalRange * epochRange) * uniqueTxnIdsPercentage));
+ int uniqueTxnIds = random.nextPositive((int) ((hlcRange *
epochRange) * uniqueTxnIdsPercentage));
int nodeRange = random.nextPositive(4);
int uniqueKeys = random.nextInt(2, 200);
int emptyKeys = random.nextInt(0, 10);
int keyRange = random.nextInt(uniqueKeys + emptyKeys, 400);
int totalCount = random.nextPositive(1000);
- Deps deps = generate(random, uniqueTxnIds, epochRange, realRange,
logicalRange, nodeRange, uniqueKeys, emptyKeys, keyRange, totalCount);
+ Deps deps = generate(random, uniqueTxnIds, epochRange, hlcRange,
0, nodeRange, uniqueKeys, emptyKeys, keyRange, totalCount);
deps.testSimpleEquality();
return deps;
}
- static Deps generate(Random random, int uniqueTxnIds, int epochRange,
int realRange, int logicalRange, int nodeRange,
+ static Deps generate(Random random, int uniqueTxnIds, int epochRange,
int hlcRange, int flagsRange, int nodeRange,
int uniqueKeys, int emptyKeys, int keyRange, int
totalCount)
{
// populateKeys is a subset of keys
@@ -344,7 +343,7 @@ public class DepsTest
List<TxnId> txnIds; {
TreeSet<TxnId> tmp = new TreeSet<>();
while (tmp.size() < uniqueTxnIds)
- tmp.add(new TxnId(random.nextInt(epochRange),
random.nextInt(realRange), random.nextInt(logicalRange), new
Id(random.nextInt(nodeRange))));
+ tmp.add(TxnId.fromValues(random.nextInt(epochRange),
random.nextInt(hlcRange), flagsRange == 0 ? 0 : random.nextInt(flagsRange), new
Id(random.nextInt(nodeRange))));
txnIds = new ArrayList<>(tmp);
}
@@ -469,17 +468,17 @@ public class DepsTest
return Ranges.ofSortedAndDeoverlapped(ranges);
}
- private static void testOneRandom(long seed, int uniqueTxnIds, int
epochRange, int realRange, int logicalRange, int nodeRange,
+ private static void testOneRandom(long seed, int uniqueTxnIds, int
epochRange, int hlcRange, int nodeRange,
int uniqueKeys, int emptyKeys, int
keyRange, int totalCountRange)
{
Random random = random(seed);
int totalCount = 1 + random.nextInt(totalCountRange - 1);
testOneDeps(random,
- DepsTest.Deps.generate(random, uniqueTxnIds, epochRange,
realRange, logicalRange, nodeRange, uniqueKeys, emptyKeys, keyRange,
totalCount),
+ DepsTest.Deps.generate(random, uniqueTxnIds, epochRange,
hlcRange, 0, nodeRange, uniqueKeys, emptyKeys, keyRange, totalCount),
keyRange);
}
- private static Supplier<Deps> supplier(Random random, int
uniqueTxnIdsRange, int epochRange, int realRange, int logicalRange, int
nodeRange,
+ private static Supplier<Deps> supplier(Random random, int
uniqueTxnIdsRange, int epochRange, int hlcRange, int flagRange, int nodeRange,
int uniqueKeysRange, int
emptyKeysRange, int keyRange, int totalCountRange)
{
return () -> {
@@ -491,7 +490,7 @@ public class DepsTest
int emptyKeys = 1 + random.nextInt(emptyKeysRange - 1);
int totalCount = random.nextInt(Math.min(totalCountRange,
uniqueKeys * uniqueTxnIds));
return DepsTest.Deps.generate(random, uniqueTxnIds,
- epochRange, realRange,
logicalRange, nodeRange,
+ epochRange, hlcRange,
flagRange, nodeRange,
uniqueKeys, emptyKeys,
keyRange, totalCount);
};
}
@@ -574,10 +573,10 @@ public class DepsTest
public static void main(String[] args)
{
for (long seed = 0 ; seed < 10000 ; ++seed)
- testMerge(seed, 100, 3, 50, 10, 4, 4, 2, 100, 10, 4);
+ testMerge(seed, 100, 3, 500, 4, 4, 2, 100, 10, 4);
for (long seed = 0 ; seed < 10000 ; ++seed)
- testMerge(seed, 1000, 3, 50, 10, 4, 20, 5, 200, 100, 4);
+ testMerge(seed, 1000, 3, 500, 4, 20, 5, 200, 100, 4);
}
}
diff --git a/accord-maelstrom/src/main/java/accord/maelstrom/Json.java
b/accord-maelstrom/src/main/java/accord/maelstrom/Json.java
index e374f01..98a8345 100644
--- a/accord-maelstrom/src/main/java/accord/maelstrom/Json.java
+++ b/accord-maelstrom/src/main/java/accord/maelstrom/Json.java
@@ -113,13 +113,13 @@ public class Json
in.nextNull();
return null;
}
- return readTimestamp(in, Timestamp::new);
+ return readTimestamp(in, Timestamp::fromBits);
}
};
private interface TimestampFactory<T>
{
- T create(long epoch, long real, int logical, Id node);
+ T create(long msb, long lsb, Id node);
}
private static <T> T readTimestamp(JsonReader in, TimestampFactory<T>
factory) throws IOException
@@ -130,12 +130,11 @@ public class Json
return null;
}
in.beginArray();
- long epoch = in.nextLong();
- long real = in.nextLong();
- int logical = in.nextInt();
+ long msb = in.nextLong();
+ long lsb = in.nextLong();
Id node = ID_ADAPTER.read(in);
in.endArray();
- return factory.create(epoch, real, logical, node);
+ return factory.create(msb, lsb, node);
}
private static void writeTimestamp(JsonWriter out, Timestamp timestamp)
throws IOException
@@ -146,9 +145,8 @@ public class Json
return;
}
out.beginArray();
- out.value(timestamp.epoch);
- out.value(timestamp.real);
- out.value(timestamp.logical);
+ out.value(timestamp.msb);
+ out.value(timestamp.lsb);
ID_ADAPTER.write(out, timestamp.node);
out.endArray();
}
@@ -164,7 +162,7 @@ public class Json
@Override
public TxnId read(JsonReader in) throws IOException
{
- return readTimestamp(in, TxnId::new);
+ return readTimestamp(in, TxnId::fromBits);
}
};
@@ -179,7 +177,7 @@ public class Json
@Override
public Ballot read(JsonReader in) throws IOException
{
- return readTimestamp(in, Ballot::new);
+ return readTimestamp(in, Ballot::fromBits);
}
};
diff --git
a/accord-maelstrom/src/main/java/accord/maelstrom/MaelstromAgent.java
b/accord-maelstrom/src/main/java/accord/maelstrom/MaelstromAgent.java
index 729fa89..598c632 100644
--- a/accord-maelstrom/src/main/java/accord/maelstrom/MaelstromAgent.java
+++ b/accord-maelstrom/src/main/java/accord/maelstrom/MaelstromAgent.java
@@ -60,6 +60,6 @@ public class MaelstromAgent implements Agent
@Override
public boolean isExpired(TxnId initiated, long now)
{
- return TimeUnit.SECONDS.convert(now - initiated.real,
TimeUnit.MICROSECONDS) >= 10;
+ return TimeUnit.SECONDS.convert(now - initiated.hlc(),
TimeUnit.MICROSECONDS) >= 10;
}
}
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]