This is an automated email from the ASF dual-hosted git repository.
benedict pushed a commit to branch trunk
in repository https://gitbox.apache.org/repos/asf/cassandra-accord.git
The following commit(s) were added to refs/heads/trunk by this push:
new f3782e2a More follow-up to CASSANDRA-19967 and CASSANDRA-19869
f3782e2a is described below
commit f3782e2a98004843cc3384a6983478c1128a1d6a
Author: Benedict Elliott Smith <[email protected]>
AuthorDate: Wed Oct 2 12:42:11 2024 +0100
More follow-up to CASSANDRA-19967 and CASSANDRA-19869
---
.../java/accord/impl/InMemoryCommandStore.java | 4 +-
.../main/java/accord/impl/TimestampsForKeys.java | 9 +-
.../src/main/java/accord/local/Cleanup.java | 11 +-
.../src/main/java/accord/local/Command.java | 2 +-
.../src/main/java/accord/local/CommandStore.java | 175 +--------------------
.../src/main/java/accord/local/CommandStores.java | 5 -
.../src/main/java/accord/local/Commands.java | 20 +--
.../main/java/accord/local/RedundantBefore.java | 146 +++++++++++++++++
.../main/java/accord/local/SafeCommandStore.java | 34 +++-
.../main/java/accord/local/StoreParticipants.java | 2 +-
.../java/accord/local/cfk/SafeCommandsForKey.java | 2 +-
.../src/main/java/accord/messages/PreAccept.java | 2 +-
.../src/main/java/accord/messages/Propagate.java | 8 +-
.../java/accord/messages/QueryDurableBefore.java | 2 +-
.../src/main/java/accord/messages/ReadData.java | 2 +-
.../java/accord/messages/SetGloballyDurable.java | 2 +-
.../src/test/java/accord/impl/basic/Journal.java | 3 +-
.../src/test/java/accord/impl/list/ListRead.java | 2 +-
18 files changed, 222 insertions(+), 209 deletions(-)
diff --git a/accord-core/src/main/java/accord/impl/InMemoryCommandStore.java
b/accord-core/src/main/java/accord/impl/InMemoryCommandStore.java
index 915ecec7..deee1947 100644
--- a/accord-core/src/main/java/accord/impl/InMemoryCommandStore.java
+++ b/accord-core/src/main/java/accord/impl/InMemoryCommandStore.java
@@ -397,7 +397,7 @@ public abstract class InMemoryCommandStore extends
CommandStore
boolean done = command.hasBeen(Truncated);
if (!done)
{
- if (redundantBefore().status(txnId, command.route()) ==
RedundantStatus.PRE_BOOTSTRAP_OR_STALE)
+ if (unsafeGetRedundantBefore().status(txnId,
command.route()) == RedundantStatus.PRE_BOOTSTRAP_OR_STALE)
return;
Route<?> route = command.route().slice(allRanges);
@@ -759,7 +759,7 @@ public abstract class InMemoryCommandStore extends
CommandStore
return;
Ranges slice = ranges(txnId, updated.executeAtOrTxnId());
- slice = commandStore.redundantBefore().removeShardRedundant(txnId,
updated.executeAtOrTxnId(), slice);
+ slice =
commandStore.unsafeGetRedundantBefore().removeShardRedundant(txnId,
updated.executeAtOrTxnId(), slice);
commandStore.rangeCommands.computeIfAbsent(txnId, ignore -> new
RangeCommand(commandStore.commands.get(txnId)))
.update(((AbstractRanges)updated.participants().touches()).toRanges().slice(slice,
Minimal));
}
diff --git a/accord-core/src/main/java/accord/impl/TimestampsForKeys.java
b/accord-core/src/main/java/accord/impl/TimestampsForKeys.java
index bd0e0ef7..baea052c 100644
--- a/accord-core/src/main/java/accord/impl/TimestampsForKeys.java
+++ b/accord-core/src/main/java/accord/impl/TimestampsForKeys.java
@@ -21,6 +21,7 @@ package accord.impl;
import accord.api.RoutingKey;
import accord.api.VisibleForImplementation;
import accord.local.CommandStore;
+import accord.local.SafeCommandStore;
import accord.primitives.RoutingKeys;
import accord.primitives.Timestamp;
import accord.primitives.TxnId;
@@ -38,7 +39,7 @@ public class TimestampsForKeys
private TimestampsForKeys() {}
- public static TimestampsForKey updateLastExecutionTimestamps(CommandStore
commandStore, SafeTimestampsForKey tfk, TxnId txnId, Timestamp executeAt,
boolean isForWriteTxn)
+ public static TimestampsForKey
updateLastExecutionTimestamps(SafeCommandStore safeStore, SafeTimestampsForKey
tfk, TxnId txnId, Timestamp executeAt, boolean isForWriteTxn)
{
TimestampsForKey current = tfk.current();
@@ -46,7 +47,7 @@ public class TimestampsForKeys
if (executeAt.compareTo(lastWrite) < 0)
{
- if
(commandStore.redundantBefore().preBootstrapOrStale(TxnId.min(txnId,
current.lastWriteId()), RoutingKeys.of(tfk.key().toUnseekable())) == FULLY)
+ if
(safeStore.redundantBefore().preBootstrapOrStale(TxnId.min(txnId,
current.lastWriteId()), RoutingKeys.of(tfk.key().toUnseekable())) == FULLY)
return current;
throw illegalState("%s is less than the most recent write
timestamp %s", executeAt, lastWrite);
}
@@ -59,7 +60,7 @@ public class TimestampsForKeys
if (cmp < 0)
{
- if
(!commandStore.safeToReadAt(executeAt).contains(tfk.key().toUnseekable()))
+ if
(!safeStore.safeToReadAt(executeAt).contains(tfk.key().toUnseekable()))
return current;
throw illegalState("%s is less than the most recent executed
timestamp %s", executeAt, lastExecuted);
}
@@ -83,6 +84,6 @@ public class TimestampsForKeys
@VisibleForImplementation
public static <D> TimestampsForKey
updateLastExecutionTimestamps(AbstractSafeCommandStore<?,?,?> safeStore,
RoutingKey key, TxnId txnId, Timestamp executeAt, boolean isForWriteTxn)
{
- return updateLastExecutionTimestamps(safeStore.commandStore(),
safeStore.timestampsForKey(key), txnId, executeAt, isForWriteTxn);
+ return updateLastExecutionTimestamps(safeStore,
safeStore.timestampsForKey(key), txnId, executeAt, isForWriteTxn);
}
}
diff --git a/accord-core/src/main/java/accord/local/Cleanup.java
b/accord-core/src/main/java/accord/local/Cleanup.java
index 78dc341c..b07c8a09 100644
--- a/accord-core/src/main/java/accord/local/Cleanup.java
+++ b/accord-core/src/main/java/accord/local/Cleanup.java
@@ -76,18 +76,19 @@ public enum Cleanup
public static Cleanup shouldCleanup(SafeCommandStore safeStore, Command
command)
{
- return shouldCleanup(safeStore.commandStore(), command,
command.participants());
+ return shouldCleanup(safeStore, command, command.participants());
}
public static Cleanup shouldCleanup(SafeCommandStore safeStore, Command
command, @Nonnull StoreParticipants participants)
{
- return shouldCleanup(safeStore.commandStore(), command, participants);
+ return shouldCleanup(command.txnId(), command.saveStatus(),
command.durability(), participants,
+ safeStore.redundantBefore(),
safeStore.durableBefore());
}
- public static Cleanup shouldCleanup(CommandStore commandStore, Command
command, @Nonnull StoreParticipants participants)
+ public static Cleanup shouldCleanup(Command command, RedundantBefore
redundantBefore, DurableBefore durableBefore)
{
- return shouldCleanup(command.txnId(), command.saveStatus(),
command.durability(), participants,
- commandStore.redundantBefore(),
commandStore.durableBefore());
+ return shouldCleanup(command.txnId(), command.saveStatus(),
command.durability(), command.participants(),
+ redundantBefore, durableBefore);
}
public static Cleanup shouldCleanup(TxnId txnId, SaveStatus status,
Durability durability, StoreParticipants participants, RedundantBefore
redundantBefore, DurableBefore durableBefore)
diff --git a/accord-core/src/main/java/accord/local/Command.java
b/accord-core/src/main/java/accord/local/Command.java
index c3e2c851..8fad6318 100644
--- a/accord-core/src/main/java/accord/local/Command.java
+++ b/accord-core/src/main/java/accord/local/Command.java
@@ -1368,7 +1368,7 @@ public abstract class Command implements CommonAttributes
long maxEpoch = prevEpoch;
long epoch = rangesForEpoch.epochs[i];
Ranges ranges = rangesForEpoch.ranges[i];
- ranges =
safeStore.commandStore().redundantBefore().removePreBootstrap(txnId, ranges);
+ ranges =
safeStore.redundantBefore().removePreBootstrap(txnId, ranges);
if (!ranges.isEmpty())
{
Unseekables<?> executionParticipants =
participants.route.slice(ranges, Minimal);
diff --git a/accord-core/src/main/java/accord/local/CommandStore.java
b/accord-core/src/main/java/accord/local/CommandStore.java
index 44d3ca56..d2125ddb 100644
--- a/accord-core/src/main/java/accord/local/CommandStore.java
+++ b/accord-core/src/main/java/accord/local/CommandStore.java
@@ -22,18 +22,13 @@ import accord.api.LocalListeners;
import accord.api.ProgressLog;
import accord.api.DataStore;
import accord.coordinate.CollectCalculatedDeps;
-import accord.local.Command.WaitingOn;
import javax.annotation.Nullable;
import accord.api.Agent;
import accord.local.CommandStores.RangesForEpoch;
-import accord.primitives.KeyDeps;
import accord.primitives.Participants;
-import accord.primitives.Range;
import accord.primitives.Routables;
-import accord.primitives.RoutingKeys;
-import accord.primitives.Status;
import accord.primitives.Unseekables;
import accord.utils.async.AsyncChain;
@@ -44,7 +39,6 @@ import accord.utils.async.AsyncResult;
import java.util.AbstractMap.SimpleImmutableEntry;
import java.util.Collections;
-import java.util.HashMap;
import java.util.Map;
import java.util.NavigableMap;
import java.util.Set;
@@ -62,12 +56,10 @@ import org.slf4j.LoggerFactory;
import accord.primitives.Deps;
import accord.primitives.FullRoute;
-import accord.primitives.RangeDeps;
import accord.primitives.Ranges;
import accord.primitives.Timestamp;
import accord.primitives.TxnId;
import accord.utils.async.AsyncResults;
-import org.agrona.collections.Int2ObjectHashMap;
import static accord.api.ConfigurationService.EpochReady.DONE;
import static accord.local.KeyHistory.COMMANDS;
@@ -637,180 +629,29 @@ public abstract class CommandStore implements
AgentExecutor
};
}
- public final Ranges safeToReadAt(Timestamp at)
+ public final boolean isRejectedIfNotPreAccepted(TxnId txnId,
Unseekables<?> participants)
{
- return safeToRead.lowerEntry(at).getValue();
- }
+ if (rejectBefore == null)
+ return false;
- // TODO (desired): Commands.durability() can use this to upgrade to
Majority without further info
- public final Status.Durability globalDurability(TxnId txnId)
- {
- return durableBefore.min(txnId);
+ return rejectBefore.rejects(txnId, participants);
}
- public final RedundantBefore redundantBefore()
+ public final RedundantBefore unsafeGetRedundantBefore()
{
return redundantBefore;
}
- public DurableBefore durableBefore()
+ public DurableBefore unsafeGetDurableBefore()
{
return durableBefore;
}
@VisibleForTesting
- public final NavigableMap<TxnId, Ranges> bootstrapBeganAt() { return
bootstrapBeganAt; }
+ public final NavigableMap<TxnId, Ranges> unsafeGetBootstrapBeganAt() {
return bootstrapBeganAt; }
@VisibleForTesting
- public NavigableMap<Timestamp, Ranges> safeToRead() { return safeToRead; }
-
- public final boolean isRejectedIfNotPreAccepted(TxnId txnId,
Unseekables<?> participants)
- {
- if (rejectBefore == null)
- return false;
-
- return rejectBefore.rejects(txnId, participants);
- }
-
- public final void removeRedundantDependencies(Unseekables<?> participants,
WaitingOn.Update builder)
- {
- // Note: we do not need to track the bootstraps we implicitly depend
upon, because we will not serve any read requests until this has completed
- // and since we are a timestamp store, and we write only this will
sort itself out naturally
- // TODO (required): make sure we have no races on HLC around SyncPoint
else this resolution may not work (we need to know the micros equivalent
timestamp of the snapshot)
- class KeyState
- {
- Int2ObjectHashMap<RoutingKeys> partiallyBootstrapping;
-
- /**
- * Are the participating ranges for the txn fully covered by
bootstrapping ranges for this command store
- */
- boolean isFullyBootstrapping(WaitingOn.Update builder, Range
range, int txnIdx)
- {
- if (builder.directKeyDeps.foldEachKey(txnIdx, range, true,
(r0, k, p) -> p && r0.contains(k)))
- return true;
-
- if (partiallyBootstrapping == null)
- partiallyBootstrapping = new Int2ObjectHashMap<>();
- RoutingKeys prev = partiallyBootstrapping.get(txnIdx);
- RoutingKeys remaining = prev;
- if (remaining == null) remaining =
builder.directKeyDeps.participatingKeys(txnIdx);
- else Invariants.checkState(!remaining.isEmpty());
- remaining = remaining.without(range);
- if (prev == null) Invariants.checkState(!remaining.isEmpty());
- partiallyBootstrapping.put(txnIdx, remaining);
- return remaining.isEmpty();
- }
- }
-
- KeyDeps directKeyDeps = builder.directKeyDeps;
- if (!directKeyDeps.isEmpty())
- {
- redundantBefore().foldl(directKeyDeps.keys(), (e, s, d, b) -> {
- // TODO (desired, efficiency): foldlInt so we can track the
lower rangeidx bound and not revisit unnecessarily
- // find the txnIdx below which we are known to be fully
redundant locally due to having been applied or invalidated
- int bootstrapIdx = d.txnIds().find(e.bootstrappedAt);
- if (bootstrapIdx < 0) bootstrapIdx = -1 - bootstrapIdx;
- int appliedIdx =
d.txnIds().find(e.locallyAppliedOrInvalidatedBefore);
- if (appliedIdx < 0) appliedIdx = -1 - appliedIdx;
-
- // remove intersecting transactions with known redundant txnId
- // note that we must exclude all transactions that are
pre-bootstrap, and perform the more complicated dance below,
- // as these transactions may be only partially applied, and we
may need to wait for them on another key.
- if (appliedIdx > bootstrapIdx)
- {
- d.forEach(e.range, bootstrapIdx, appliedIdx, b, s, (b0,
s0, txnIdx) -> {
- b0.removeWaitingOnDirectKeyTxnId(txnIdx);
- });
- }
-
- if (bootstrapIdx > 0)
- {
- d.forEach(e.range, 0, bootstrapIdx, b, s, e.range, (b0,
s0, r, txnIdx) -> {
- if (b0.isWaitingOnDirectKeyTxnIdx(txnIdx) &&
s0.isFullyBootstrapping(b0, r, txnIdx))
- b0.removeWaitingOnDirectKeyTxnId(txnIdx);
- });
- }
- return s;
- }, new KeyState(), directKeyDeps, builder, ignore -> false);
- }
-
- /**
- * If we have to handle bootstrapping ranges for range transactions,
these may only partially cover the
- * transaction, in which case we should not remove the transaction as
a dependency. But if it is fully
- * covered by bootstrapping ranges then we *must* remove it as a
dependency.
- */
- class RangeState
- {
- Range range;
- int bootstrapIdx, appliedIdx;
- Map<Integer, Ranges> partiallyBootstrapping;
-
- /**
- * Are the participating ranges for the txn fully covered by
bootstrapping ranges for this command store
- */
- boolean isFullyBootstrapping(int rangeTxnIdx)
- {
- // if all deps for the txnIdx are contained in the range,
don't inflate any shared object state
- if (builder.directRangeDeps.foldEachRange(rangeTxnIdx, range,
true, (r1, r2, p) -> p && r1.contains(r2)))
- return true;
-
- if (partiallyBootstrapping == null)
- partiallyBootstrapping = new HashMap<>();
- Ranges prev = partiallyBootstrapping.get(rangeTxnIdx);
- Ranges remaining = prev;
- if (remaining == null) remaining =
builder.directRangeDeps.ranges(rangeTxnIdx);
- else Invariants.checkState(!remaining.isEmpty());
- remaining = remaining.without(Ranges.of(range));
- if (prev == null) Invariants.checkState(!remaining.isEmpty());
- partiallyBootstrapping.put(rangeTxnIdx, remaining);
- return remaining.isEmpty();
- }
- }
-
- RangeDeps rangeDeps = builder.directRangeDeps;
- // TODO (required, consider): slice to only those ranges we own, maybe
don't even construct rangeDeps.covering()
- redundantBefore().foldl(participants, (e, s, d, b) -> {
- int bootstrapIdx = d.txnIds().find(e.bootstrappedAt);
- if (bootstrapIdx < 0) bootstrapIdx = -1 - bootstrapIdx;
- s.bootstrapIdx = bootstrapIdx;
-
- int appliedIdx =
d.txnIds().find(e.locallyAppliedOrInvalidatedBefore);
- if (appliedIdx < 0) appliedIdx = -1 - appliedIdx;
- s.appliedIdx = appliedIdx;
-
- // remove intersecting transactions with known redundant txnId
- if (appliedIdx > bootstrapIdx)
- {
- // TODO (desired):
- // TODO (desired): move the bounds check into forEach,
matching structure used for keys
- d.forEach(e.range, b, s, (b0, s0, txnIdx) -> {
- if (txnIdx >= s0.bootstrapIdx && txnIdx < s0.appliedIdx)
- b0.removeWaitingOnDirectRangeTxnId(txnIdx);
- });
- }
-
- if (bootstrapIdx > 0)
- {
- // if we have any ranges where bootstrap is involved, we have
to do a more complicated dance since
- // this may imply only partial redundancy (we may still depend
on the transaction for some other range)
- s.range = e.range;
- // TODO (desired): move the bounds check into forEach,
matching structure used for keys
- d.forEach(e.range, b, s, (b0, s0, txnIdx) -> {
- if (txnIdx < s0.bootstrapIdx &&
b0.isWaitingOnDirectRangeTxnIdx(txnIdx) && s0.isFullyBootstrapping(txnIdx))
- b0.removeWaitingOnDirectRangeTxnId(txnIdx);
- });
- }
- return s;
- }, new RangeState(), rangeDeps, builder, ignore -> false);
- }
-
- public final boolean hasLocallyRedundantDependencies(TxnId
minimumDependencyId, Timestamp executeAt, Participants<?>
participantsOfWaitingTxn)
- {
- // TODO (required): consider race conditions when bootstrapping into
an active command store, that may have seen a higher txnId than this?
- // might benefit from maintaining a per-CommandStore largest TxnId
register to ensure we allocate a higher TxnId for our ExclSync,
- // or from using whatever summary records we have for the range,
once we maintain them
- return redundantBefore.status(minimumDependencyId,
participantsOfWaitingTxn).compareTo(RedundantStatus.PARTIALLY_PRE_BOOTSTRAP_OR_STALE)
>= 0;
- }
+ public NavigableMap<Timestamp, Ranges> unsafeGetSafeToRead() { return
safeToRead; }
final void markUnsafeToRead(Ranges ranges)
{
diff --git a/accord-core/src/main/java/accord/local/CommandStores.java
b/accord-core/src/main/java/accord/local/CommandStores.java
index 53f619d5..2e366661 100644
--- a/accord-core/src/main/java/accord/local/CommandStores.java
+++ b/accord-core/src/main/java/accord/local/CommandStores.java
@@ -208,11 +208,6 @@ public abstract class CommandStores
return allAt(txnId);
}
- public @Nonnull Ranges unsafeToReadAt(Timestamp at)
- {
- return allAt(at).without(store.safeToReadAt(at));
- }
-
public @Nonnull Ranges allAt(Timestamp at)
{
return allAt(at.epoch());
diff --git a/accord-core/src/main/java/accord/local/Commands.java
b/accord-core/src/main/java/accord/local/Commands.java
index a85ad650..76c74a86 100644
--- a/accord-core/src/main/java/accord/local/Commands.java
+++ b/accord-core/src/main/java/accord/local/Commands.java
@@ -634,10 +634,10 @@ public class Commands
protected static WaitingOn.Update updateWaitingOn(SafeCommandStore
safeStore, CommonAttributes waiting, Timestamp executeAt, WaitingOn.Update
update, Participants<?> participants)
{
- CommandStore commandStore = safeStore.commandStore();
+ RedundantBefore redundantBefore = safeStore.redundantBefore();
TxnId minWaitingOnTxnId = update.minWaitingOnTxnId();
- if (minWaitingOnTxnId != null &&
commandStore.hasLocallyRedundantDependencies(update.minWaitingOnTxnId(),
executeAt, participants))
- safeStore.commandStore().removeRedundantDependencies(participants,
update);
+ if (minWaitingOnTxnId != null &&
redundantBefore.hasLocallyRedundantDependencies(update.minWaitingOnTxnId(),
executeAt, participants))
+ redundantBefore.removeRedundantDependencies(participants, update);
update.forEachWaitingOnId(safeStore, update, waiting, executeAt,
(store, upd, w, exec, i) -> {
SafeCommand dep = store.ifLoadedAndInitialised(upd.txnId(i));
@@ -859,7 +859,7 @@ public class Commands
{
TxnId txnId = command.txnId();
participants = command.participants().supplement(participants);
- RedundantStatus status =
safeStore.commandStore().redundantBefore().status(txnId, participants.owns());
+ RedundantStatus status = safeStore.redundantBefore().status(txnId,
participants.owns());
switch (status)
{
default: throw new AssertionError("Unhandled RedundantStatus: " +
status);
@@ -937,7 +937,7 @@ public class Commands
depSafe = safeStore.ifInitialised(loadDepId);
if (depSafe == null)
{
- RedundantStatus redundantStatus =
safeStore.commandStore().redundantBefore().status(waitingId,
waiting.partialDeps().participants(loadDepId));
+ RedundantStatus redundantStatus =
safeStore.redundantBefore().status(waitingId,
waiting.partialDeps().participants(loadDepId));
switch (redundantStatus)
{
default: throw new AssertionError("Unexpected
redundant status: " + redundantStatus);
@@ -1013,7 +1013,7 @@ public class Commands
// TODO (desired): slightly costly to invert a large
partialDeps collection
Participants<?> participants =
waiting.partialDeps().participants(dep.txnId());
participants =
waiting.participants().dependencyExecutesAtLeast(safeStore, participants,
waitingId, waiting.executeAt());
- RedundantStatus redundantStatus =
safeStore.commandStore().redundantBefore().status(dep.txnId(), participants);
+ RedundantStatus redundantStatus =
safeStore.redundantBefore().status(dep.txnId(), participants);
switch (redundantStatus)
{
default: throw new AssertionError("Unknown
redundant status: " + redundantStatus);
@@ -1101,13 +1101,13 @@ public class Commands
static Command removeRedundantDependencies(SafeCommandStore safeStore,
SafeCommand safeCommand, @Nullable TxnId redundant)
{
- CommandStore commandStore = safeStore.commandStore();
Command.Committed current = safeCommand.current().asCommitted();
+ RedundantBefore redundantBefore = safeStore.redundantBefore();
WaitingOn.Update update = new WaitingOn.Update(current.waitingOn);
TxnId minWaitingOnTxnId = update.minWaitingOnTxnId();
- if (minWaitingOnTxnId != null &&
commandStore.hasLocallyRedundantDependencies(update.minWaitingOnTxnId(),
current.executeAt(), current.participants().owns))
-
safeStore.commandStore().removeRedundantDependencies(current.participants().owns,
update);
+ if (minWaitingOnTxnId != null &&
redundantBefore.hasLocallyRedundantDependencies(update.minWaitingOnTxnId(),
current.executeAt(), current.participants().owns))
+
redundantBefore.removeRedundantDependencies(current.participants().owns,
update);
// if we are a range transaction, being redundant for this transaction
does not imply we are redundant for all transactions
if (redundant != null)
@@ -1248,7 +1248,7 @@ public class Commands
{
// TODO (required, later): in the event we are depending on a
stale key for an insert into a non-stale key, we cannot proceed and must mark
the new key stale
// I think today this is unsupported in practice, but must be
addressed before we improve efficiency of result handling
- Ranges staleRanges =
permitStaleMissing.commandStore().redundantBefore().staleRanges();
+ Ranges staleRanges =
permitStaleMissing.redundantBefore().staleRanges();
required = required.without(staleRanges);
return adding == null ? required.isEmpty() : covers.test(adding,
required);
}
diff --git a/accord-core/src/main/java/accord/local/RedundantBefore.java
b/accord-core/src/main/java/accord/local/RedundantBefore.java
index a0d7ede8..41b84ae8 100644
--- a/accord-core/src/main/java/accord/local/RedundantBefore.java
+++ b/accord-core/src/main/java/accord/local/RedundantBefore.java
@@ -18,6 +18,8 @@
package accord.local;
+import java.util.HashMap;
+import java.util.Map;
import java.util.Objects;
import javax.annotation.Nonnull;
import javax.annotation.Nullable;
@@ -27,16 +29,20 @@ import accord.api.VisibleForImplementation;
import accord.primitives.AbstractRanges;
import accord.primitives.Deps;
import accord.primitives.EpochSupplier;
+import accord.primitives.KeyDeps;
import accord.primitives.Participants;
import accord.primitives.Range;
+import accord.primitives.RangeDeps;
import accord.primitives.Ranges;
import accord.primitives.Routables;
+import accord.primitives.RoutingKeys;
import accord.primitives.Timestamp;
import accord.primitives.TxnId;
import accord.primitives.Unseekables;
import accord.utils.Invariants;
import accord.utils.ReducingIntervalMap;
import accord.utils.ReducingRangeMap;
+import org.agrona.collections.Int2ObjectHashMap;
import static accord.local.RedundantBefore.PreBootstrapOrStale.FULLY;
import static accord.local.RedundantBefore.PreBootstrapOrStale.POST_BOOTSTRAP;
@@ -675,4 +681,144 @@ public class RedundantBefore extends
ReducingRangeMap<RedundantBefore.Entry>
}
}
}
+
+ public final void removeRedundantDependencies(Unseekables<?> participants,
Command.WaitingOn.Update builder)
+ {
+ // Note: we do not need to track the bootstraps we implicitly depend
upon, because we will not serve any read requests until this has completed
+ // and since we are a timestamp store, and we write only this will
sort itself out naturally
+ // TODO (required): make sure we have no races on HLC around SyncPoint
else this resolution may not work (we need to know the micros equivalent
timestamp of the snapshot)
+ class KeyState
+ {
+ Int2ObjectHashMap<RoutingKeys> partiallyBootstrapping;
+
+ /**
+ * Are the participating ranges for the txn fully covered by
bootstrapping ranges for this command store
+ */
+ boolean isFullyBootstrapping(Command.WaitingOn.Update builder,
Range range, int txnIdx)
+ {
+ if (builder.directKeyDeps.foldEachKey(txnIdx, range, true,
(r0, k, p) -> p && r0.contains(k)))
+ return true;
+
+ if (partiallyBootstrapping == null)
+ partiallyBootstrapping = new Int2ObjectHashMap<>();
+ RoutingKeys prev = partiallyBootstrapping.get(txnIdx);
+ RoutingKeys remaining = prev;
+ if (remaining == null) remaining =
builder.directKeyDeps.participatingKeys(txnIdx);
+ else Invariants.checkState(!remaining.isEmpty());
+ remaining = remaining.without(range);
+ if (prev == null) Invariants.checkState(!remaining.isEmpty());
+ partiallyBootstrapping.put(txnIdx, remaining);
+ return remaining.isEmpty();
+ }
+ }
+
+ KeyDeps directKeyDeps = builder.directKeyDeps;
+ if (!directKeyDeps.isEmpty())
+ {
+ foldl(directKeyDeps.keys(), (e, s, d, b) -> {
+ // TODO (desired, efficiency): foldlInt so we can track the
lower rangeidx bound and not revisit unnecessarily
+ // find the txnIdx below which we are known to be fully
redundant locally due to having been applied or invalidated
+ int bootstrapIdx = d.txnIds().find(e.bootstrappedAt);
+ if (bootstrapIdx < 0) bootstrapIdx = -1 - bootstrapIdx;
+ int appliedIdx =
d.txnIds().find(e.locallyAppliedOrInvalidatedBefore);
+ if (appliedIdx < 0) appliedIdx = -1 - appliedIdx;
+
+ // remove intersecting transactions with known redundant txnId
+ // note that we must exclude all transactions that are
pre-bootstrap, and perform the more complicated dance below,
+ // as these transactions may be only partially applied, and we
may need to wait for them on another key.
+ if (appliedIdx > bootstrapIdx)
+ {
+ d.forEach(e.range, bootstrapIdx, appliedIdx, b, s, (b0,
s0, txnIdx) -> {
+ b0.removeWaitingOnDirectKeyTxnId(txnIdx);
+ });
+ }
+
+ if (bootstrapIdx > 0)
+ {
+ d.forEach(e.range, 0, bootstrapIdx, b, s, e.range, (b0,
s0, r, txnIdx) -> {
+ if (b0.isWaitingOnDirectKeyTxnIdx(txnIdx) &&
s0.isFullyBootstrapping(b0, r, txnIdx))
+ b0.removeWaitingOnDirectKeyTxnId(txnIdx);
+ });
+ }
+ return s;
+ }, new KeyState(), directKeyDeps, builder, ignore -> false);
+ }
+
+ /**
+ * If we have to handle bootstrapping ranges for range transactions,
these may only partially cover the
+ * transaction, in which case we should not remove the transaction as
a dependency. But if it is fully
+ * covered by bootstrapping ranges then we *must* remove it as a
dependency.
+ */
+ class RangeState
+ {
+ Range range;
+ int bootstrapIdx, appliedIdx;
+ Map<Integer, Ranges> partiallyBootstrapping;
+
+ /**
+ * Are the participating ranges for the txn fully covered by
bootstrapping ranges for this command store
+ */
+ boolean isFullyBootstrapping(int rangeTxnIdx)
+ {
+ // if all deps for the txnIdx are contained in the range,
don't inflate any shared object state
+ if (builder.directRangeDeps.foldEachRange(rangeTxnIdx, range,
true, (r1, r2, p) -> p && r1.contains(r2)))
+ return true;
+
+ if (partiallyBootstrapping == null)
+ partiallyBootstrapping = new HashMap<>();
+ Ranges prev = partiallyBootstrapping.get(rangeTxnIdx);
+ Ranges remaining = prev;
+ if (remaining == null) remaining =
builder.directRangeDeps.ranges(rangeTxnIdx);
+ else Invariants.checkState(!remaining.isEmpty());
+ remaining = remaining.without(Ranges.of(range));
+ if (prev == null) Invariants.checkState(!remaining.isEmpty());
+ partiallyBootstrapping.put(rangeTxnIdx, remaining);
+ return remaining.isEmpty();
+ }
+ }
+
+ RangeDeps rangeDeps = builder.directRangeDeps;
+ // TODO (required, consider): slice to only those ranges we own, maybe
don't even construct rangeDeps.covering()
+ foldl(participants, (e, s, d, b) -> {
+ int bootstrapIdx = d.txnIds().find(e.bootstrappedAt);
+ if (bootstrapIdx < 0) bootstrapIdx = -1 - bootstrapIdx;
+ s.bootstrapIdx = bootstrapIdx;
+
+ int appliedIdx =
d.txnIds().find(e.locallyAppliedOrInvalidatedBefore);
+ if (appliedIdx < 0) appliedIdx = -1 - appliedIdx;
+ s.appliedIdx = appliedIdx;
+
+ // remove intersecting transactions with known redundant txnId
+ if (appliedIdx > bootstrapIdx)
+ {
+ // TODO (desired):
+ // TODO (desired): move the bounds check into forEach,
matching structure used for keys
+ d.forEach(e.range, b, s, (b0, s0, txnIdx) -> {
+ if (txnIdx >= s0.bootstrapIdx && txnIdx < s0.appliedIdx)
+ b0.removeWaitingOnDirectRangeTxnId(txnIdx);
+ });
+ }
+
+ if (bootstrapIdx > 0)
+ {
+ // if we have any ranges where bootstrap is involved, we have
to do a more complicated dance since
+ // this may imply only partial redundancy (we may still depend
on the transaction for some other range)
+ s.range = e.range;
+ // TODO (desired): move the bounds check into forEach,
matching structure used for keys
+ d.forEach(e.range, b, s, (b0, s0, txnIdx) -> {
+ if (txnIdx < s0.bootstrapIdx &&
b0.isWaitingOnDirectRangeTxnIdx(txnIdx) && s0.isFullyBootstrapping(txnIdx))
+ b0.removeWaitingOnDirectRangeTxnId(txnIdx);
+ });
+ }
+ return s;
+ }, new RangeState(), rangeDeps, builder, ignore -> false);
+ }
+
+ public final boolean hasLocallyRedundantDependencies(TxnId
minimumDependencyId, Timestamp executeAt, Participants<?>
participantsOfWaitingTxn)
+ {
+ // TODO (required): consider race conditions when bootstrapping into
an active command store, that may have seen a higher txnId than this?
+ // might benefit from maintaining a per-CommandStore largest TxnId
register to ensure we allocate a higher TxnId for our ExclSync,
+ // or from using whatever summary records we have for the range,
once we maintain them
+ return status(minimumDependencyId,
participantsOfWaitingTxn).compareTo(RedundantStatus.PARTIALLY_PRE_BOOTSTRAP_OR_STALE)
>= 0;
+ }
}
diff --git a/accord-core/src/main/java/accord/local/SafeCommandStore.java
b/accord-core/src/main/java/accord/local/SafeCommandStore.java
index dd25ad3e..bb2bf7e8 100644
--- a/accord-core/src/main/java/accord/local/SafeCommandStore.java
+++ b/accord-core/src/main/java/accord/local/SafeCommandStore.java
@@ -153,7 +153,7 @@ public abstract class SafeCommandStore
protected SafeCommandsForKey maybeCleanup(SafeCommandsForKey safeCfk)
{
- RedundantBefore.Entry entry =
commandStore().redundantBefore().get(safeCfk.key().toUnseekable());
+ RedundantBefore.Entry entry =
redundantBefore().get(safeCfk.key().toUnseekable());
if (entry != null)
safeCfk.updateRedundantBefore(this, entry);
return safeCfk;
@@ -356,6 +356,26 @@ public abstract class SafeCommandStore
public abstract NodeTimeService time();
public abstract CommandStores.RangesForEpoch ranges();
+ protected NavigableMap<TxnId, Ranges> bootstrapBeganAt()
+ {
+ return commandStore().unsafeGetBootstrapBeganAt();
+ }
+
+ protected NavigableMap<Timestamp, Ranges> safeToReadAt()
+ {
+ return commandStore().unsafeGetSafeToRead();
+ }
+
+ public RedundantBefore redundantBefore()
+ {
+ return commandStore().unsafeGetRedundantBefore();
+ }
+
+ public DurableBefore durableBefore()
+ {
+ return commandStore().unsafeGetDurableBefore();
+ }
+
public Ranges futureRanges(TxnId txnId)
{
return ranges().allBefore(txnId.epoch());
@@ -376,11 +396,21 @@ public abstract class SafeCommandStore
return ranges().allBetween(txnId.epoch(), untilLocalEpoch);
}
+ public final Ranges safeToReadAt(Timestamp at)
+ {
+ return safeToReadAt().lowerEntry(at).getValue();
+ }
+
+ public @Nonnull Ranges unsafeToReadAt(Timestamp at)
+ {
+ return ranges().allAt(at).without(safeToReadAt(at));
+ }
+
// if we have to re-bootstrap (due to failed bootstrap or catching up on a
range) then we may
// have dangling redundant commands; these can safely be executed locally
because we are a timestamp store
final boolean isFullyPreBootstrapOrStale(Command command, Participants<?>
forKeys)
{
- return
commandStore().redundantBefore().preBootstrapOrStale(command.txnId(), forKeys)
== FULLY;
+ return redundantBefore().preBootstrapOrStale(command.txnId(), forKeys)
== FULLY;
}
public void registerListener(SafeCommand listeningTo, SaveStatus await,
TxnId waiting)
diff --git a/accord-core/src/main/java/accord/local/StoreParticipants.java
b/accord-core/src/main/java/accord/local/StoreParticipants.java
index 03753722..0ba6f6a3 100644
--- a/accord-core/src/main/java/accord/local/StoreParticipants.java
+++ b/accord-core/src/main/java/accord/local/StoreParticipants.java
@@ -173,7 +173,7 @@ public class StoreParticipants
? safeStore.ranges().all()
: safeStore.ranges().allAt(executeAt.epoch());
- return
safeStore.commandStore().redundantBefore().removePreBootstrap(txnId, ranges);
+ return safeStore.redundantBefore().removePreBootstrap(txnId, ranges);
}
public Participants<?> executes(SafeCommandStore safeStore, TxnId txnId,
Timestamp executeAt)
diff --git a/accord-core/src/main/java/accord/local/cfk/SafeCommandsForKey.java
b/accord-core/src/main/java/accord/local/cfk/SafeCommandsForKey.java
index 8207dd6e..18e24219 100644
--- a/accord-core/src/main/java/accord/local/cfk/SafeCommandsForKey.java
+++ b/accord-core/src/main/java/accord/local/cfk/SafeCommandsForKey.java
@@ -113,6 +113,6 @@ public abstract class SafeCommandsForKey implements
SafeState<CommandsForKey>
public void refresh(SafeCommandStore safeStore)
{
- updateRedundantBefore(safeStore,
safeStore.commandStore().redundantBefore().get(key));
+ updateRedundantBefore(safeStore, safeStore.redundantBefore().get(key));
}
}
diff --git a/accord-core/src/main/java/accord/messages/PreAccept.java
b/accord-core/src/main/java/accord/messages/PreAccept.java
index 1b901e8f..bcb01261 100644
--- a/accord-core/src/main/java/accord/messages/PreAccept.java
+++ b/accord-core/src/main/java/accord/messages/PreAccept.java
@@ -263,7 +263,7 @@ public class PreAccept extends
WithUnsynced<PreAccept.PreAcceptReply>
}, executeAt.equals(txnId) ? null :
txnId, builder);
// TODO (required): make sure any sync point is in the past
- Deps redundant =
safeStore.commandStore().redundantBefore().collectDeps(participants.touches(),
redundantBuilder, minEpoch, executeAt).build();
+ Deps redundant =
safeStore.redundantBefore().collectDeps(participants.touches(),
redundantBuilder, minEpoch, executeAt).build();
Deps result = builder.build().with(redundant);
Invariants.checkState(!result.contains(txnId));
return result;
diff --git a/accord-core/src/main/java/accord/messages/Propagate.java
b/accord-core/src/main/java/accord/messages/Propagate.java
index f819dadc..a50f7deb 100644
--- a/accord-core/src/main/java/accord/messages/Propagate.java
+++ b/accord-core/src/main/java/accord/messages/Propagate.java
@@ -354,7 +354,7 @@ public class Propagate implements PreLoadContext,
MapReduceConsume<SafeCommandSt
if (participants.owns().isEmpty())
return known.knownForAny();
- RedundantStatus status =
safeStore.commandStore().redundantBefore().status(txnId, participants.owns());
+ RedundantStatus status = safeStore.redundantBefore().status(txnId,
participants.owns());
// if our peers have truncated this command, then either:
// 1) we have already applied it locally; 2) the command doesn't apply
locally; 3) we are stale; or 4) the command is invalidated
@@ -367,7 +367,7 @@ public class Propagate implements PreLoadContext,
MapReduceConsume<SafeCommandSt
}
Ranges ranges = safeStore.ranges().allSince(txnId.epoch());
- ranges =
safeStore.commandStore().redundantBefore().everExpectToExecute(txnId, ranges);
+ ranges = safeStore.redundantBefore().everExpectToExecute(txnId,
ranges);
if (!ranges.isEmpty())
{
// even though command stores only lose ranges, we still adopt
ranges as of some epoch, and re-bootstrap.
@@ -384,14 +384,14 @@ public class Propagate implements PreLoadContext,
MapReduceConsume<SafeCommandSt
return null;
Participants<?> executes = participants.executes(safeStore, txnId,
executeAtIfKnown);
- status = safeStore.commandStore().redundantBefore().status(txnId,
executes);
+ status = safeStore.redundantBefore().status(txnId, executes);
if (tryPurge(safeStore, safeCommand, status))
return null;
// compute the ranges we expect to execute - i.e. those we own, and
are not stale or pre-bootstrap
// TODO (required): use StoreParticipants.executes
Ranges ranges = safeStore.ranges().allAt(executeAtIfKnown.epoch());
- ranges =
safeStore.commandStore().redundantBefore().expectToExecute(txnId,
executeAtIfKnown, ranges);
+ ranges = safeStore.redundantBefore().expectToExecute(txnId,
executeAtIfKnown, ranges);
if (ranges.isEmpty() || (executes = executes.slice(ranges,
Minimal)).isEmpty())
{
// TODO (expected): we might prefer to adopt Redundant status, and
permit ourselves to later accept the result of the execution and/or definition
diff --git a/accord-core/src/main/java/accord/messages/QueryDurableBefore.java
b/accord-core/src/main/java/accord/messages/QueryDurableBefore.java
index 057e9a38..5c9d694c 100644
--- a/accord-core/src/main/java/accord/messages/QueryDurableBefore.java
+++ b/accord-core/src/main/java/accord/messages/QueryDurableBefore.java
@@ -42,7 +42,7 @@ public class QueryDurableBefore extends
AbstractEpochRequest<QueryDurableBefore.
@Override
public DurableBeforeReply apply(SafeCommandStore safeStore)
{
- return new
DurableBeforeReply(safeStore.commandStore().durableBefore());
+ return new DurableBeforeReply(safeStore.durableBefore());
}
@Override
diff --git a/accord-core/src/main/java/accord/messages/ReadData.java
b/accord-core/src/main/java/accord/messages/ReadData.java
index 7dfa12d0..5d75f0af 100644
--- a/accord-core/src/main/java/accord/messages/ReadData.java
+++ b/accord-core/src/main/java/accord/messages/ReadData.java
@@ -285,7 +285,7 @@ public abstract class ReadData extends
AbstractEpochRequest<ReadData.CommitOrRea
{
Timestamp executeAt = command.executesAtLeast();
// TODO (required): for awaitsOnlyDeps commands, if we cannot infer an
actual executeAtLeast we should confirm no situation where txnId is not an
adequately conservative value for unavailable/unsafeToRead
- return safeStore.ranges().unsafeToReadAt(executeAt);
+ return safeStore.unsafeToReadAt(executeAt);
}
void read(SafeCommandStore safeStore, Command command)
diff --git a/accord-core/src/main/java/accord/messages/SetGloballyDurable.java
b/accord-core/src/main/java/accord/messages/SetGloballyDurable.java
index 3c396ec7..56093563 100644
--- a/accord-core/src/main/java/accord/messages/SetGloballyDurable.java
+++ b/accord-core/src/main/java/accord/messages/SetGloballyDurable.java
@@ -48,7 +48,7 @@ public class SetGloballyDurable extends
AbstractEpochRequest<SimpleReply>
@Override
public SimpleReply apply(SafeCommandStore safeStore)
{
- DurableBefore cur = safeStore.commandStore().durableBefore();
+ DurableBefore cur = safeStore.durableBefore();
DurableBefore upd = DurableBefore.merge(durableBefore, cur);
// This is done asynchronously
safeStore.upsertDurableBefore(upd);
diff --git a/accord-core/src/test/java/accord/impl/basic/Journal.java
b/accord-core/src/test/java/accord/impl/basic/Journal.java
index 1a53472b..6d0b04da 100644
--- a/accord-core/src/test/java/accord/impl/basic/Journal.java
+++ b/accord-core/src/test/java/accord/impl/basic/Journal.java
@@ -89,8 +89,7 @@ public class Journal
Command command = reconstruct(diffs, Reconstruct.Last).get(0);
if (command.status() == Truncated || command.status() ==
Invalidated)
continue; // Already truncated
- StoreParticipants participants =
Invariants.nonNull(command.participants());
- Cleanup cleanup = Cleanup.shouldCleanup(store, command,
participants);
+ Cleanup cleanup = Cleanup.shouldCleanup(command,
store.unsafeGetRedundantBefore(), store.unsafeGetDurableBefore());
switch (cleanup)
{
case NO:
diff --git a/accord-core/src/test/java/accord/impl/list/ListRead.java
b/accord-core/src/test/java/accord/impl/list/ListRead.java
index eb765767..915442ad 100644
--- a/accord-core/src/test/java/accord/impl/list/ListRead.java
+++ b/accord-core/src/test/java/accord/impl/list/ListRead.java
@@ -70,7 +70,7 @@ public class ListRead implements Read
ListStore s = (ListStore)store;
logger.trace("submitting READ on {} at {} key:{}", s.node, executeAt,
key);
return executor.apply(safeStore.commandStore()).submit(() -> {
- Ranges unavailable = safeStore.ranges().unsafeToReadAt(executeAt);
+ Ranges unavailable = safeStore.unsafeToReadAt(executeAt);
ListData result = new ListData();
switch (key.domain())
{
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]