This is an automated email from the ASF dual-hosted git repository.
benedict pushed a commit to branch trunk
in repository https://gitbox.apache.org/repos/asf/cassandra-accord.git
The following commit(s) were added to refs/heads/trunk by this push:
new 0a5446a3 Fix: - StoreParticipants.touches behaviour for RX was
erroneously modified; should touch all non-redundant ranges including those no
longer owned - Support improved partial compaction - SetShardDurable should
correctly set DurableBefore Majority/Universal based on the Durability
parameter - fix erroneous prunedBefore invariant - unset superseding rather
than set null, so can omit empty - Don't save updates to ERASED - Simplify
CommandChange.getFlags - fix handlin [...]
0a5446a3 is described below
commit 0a5446a365fe7eb1a15b6a2d0d72f9475c51bc47
Author: Benedict Elliott Smith <[email protected]>
AuthorDate: Thu Mar 13 12:39:09 2025 +0000
Fix:
- StoreParticipants.touches behaviour for RX was erroneously modified;
should touch all non-redundant ranges including those no longer owned
- Support improved partial compaction
- SetShardDurable should correctly set DurableBefore Majority/Universal
based on the Durability parameter
- fix erroneous prunedBefore invariant
- unset superseding rather than set null, so can omit empty
- Don't save updates to ERASED
- Simplify CommandChange.getFlags
- fix handling of Durability for Invalidated
- Don't use ApplyAt for GC_BEFORE with partial input, as might be a
saveStatus >= ApplyAtKnown but with executeAt < ApplyAtKnown
patch by Benedict; reviewed by Alex Petrov for CASSANDRA-20441
---
accord-core/src/main/java/accord/api/Journal.java | 3 +
.../src/main/java/accord/impl/AbstractLoader.java | 18 +-
.../src/main/java/accord/impl/CommandChange.java | 354 +++++++++++++++------
.../java/accord/impl/InMemoryCommandStore.java | 2 +-
.../impl/progresslog/DefaultProgressLog.java | 3 +-
.../src/main/java/accord/local/Cleanup.java | 114 ++++---
.../src/main/java/accord/local/CommandStore.java | 4 +-
.../src/main/java/accord/local/Commands.java | 7 +-
.../main/java/accord/local/RedundantBefore.java | 29 +-
.../main/java/accord/local/StoreParticipants.java | 5 +-
.../main/java/accord/local/cfk/CommandsForKey.java | 2 +-
.../main/java/accord/messages/SetShardDurable.java | 5 +-
.../src/main/java/accord/primitives/Known.java | 6 +
.../src/main/java/accord/utils/Invariants.java | 12 +
accord-core/src/test/java/accord/Utils.java | 2 +-
.../src/test/java/accord/burn/BurnTestBase.java | 4 +-
.../src/test/java/accord/impl/basic/Cluster.java | 10 +-
.../java/accord/impl/basic/InMemoryJournal.java | 190 +++++++----
.../java/accord/impl/basic/LoggingJournal.java | 8 +
.../test/java/accord/impl/mock/MockCluster.java | 3 +-
.../java/accord/local/ImmutableCommandTest.java | 2 +-
.../src/main/java/accord/maelstrom/Cluster.java | 1 +
22 files changed, 538 insertions(+), 246 deletions(-)
diff --git a/accord-core/src/main/java/accord/api/Journal.java
b/accord-core/src/main/java/accord/api/Journal.java
index 00e3ffb4..a1246f76 100644
--- a/accord-core/src/main/java/accord/api/Journal.java
+++ b/accord-core/src/main/java/accord/api/Journal.java
@@ -27,6 +27,7 @@ import javax.annotation.Nullable;
import accord.local.Command;
import accord.local.CommandStores;
import accord.local.DurableBefore;
+import accord.local.Node;
import accord.local.RedundantBefore;
import accord.primitives.EpochSupplier;
import accord.primitives.Ranges;
@@ -42,6 +43,8 @@ import org.agrona.collections.Int2ObjectHashMap;
*/
public interface Journal
{
+ Journal start(Node node);
+
Command loadCommand(int store, TxnId txnId, RedundantBefore
redundantBefore, DurableBefore durableBefore);
Command.Minimal loadMinimal(int store, TxnId txnId, Load load,
RedundantBefore redundantBefore, DurableBefore durableBefore);
diff --git a/accord-core/src/main/java/accord/impl/AbstractLoader.java
b/accord-core/src/main/java/accord/impl/AbstractLoader.java
index c55ac6c8..9020b96a 100644
--- a/accord-core/src/main/java/accord/impl/AbstractLoader.java
+++ b/accord-core/src/main/java/accord/impl/AbstractLoader.java
@@ -31,7 +31,6 @@ import accord.primitives.TxnId;
import static accord.local.Cleanup.Input.FULL;
import static accord.primitives.SaveStatus.PreApplied;
-import static accord.primitives.Status.Invalidated;
import static accord.primitives.Status.Stable;
import static accord.primitives.Status.Truncated;
import static accord.primitives.Txn.Kind.Write;
@@ -41,20 +40,9 @@ public abstract class AbstractLoader implements
Journal.Loader
protected Command loadInternal(Command command, SafeCommandStore safeStore)
{
TxnId txnId = command.txnId();
- if (command.status() != Truncated && command.status() != Invalidated)
- {
- Cleanup cleanup = Cleanup.shouldCleanup(FULL, safeStore, command,
command.participants());
- switch (cleanup)
- {
- case NO:
- break;
- case INVALIDATE:
- case TRUNCATE_WITH_OUTCOME:
- case TRUNCATE:
- case ERASE:
- command = Commands.purge(safeStore, command, cleanup);
- }
- }
+ Cleanup cleanup = Cleanup.shouldCleanup(FULL, safeStore, command,
command.participants());
+ if (cleanup != Cleanup.NO)
+ command = Commands.purge(safeStore, command, cleanup);
return safeStore.unsafeGetNoCleanup(txnId).update(safeStore, command);
}
diff --git a/accord-core/src/main/java/accord/impl/CommandChange.java
b/accord-core/src/main/java/accord/impl/CommandChange.java
index 51c56332..c45dd411 100644
--- a/accord-core/src/main/java/accord/impl/CommandChange.java
+++ b/accord-core/src/main/java/accord/impl/CommandChange.java
@@ -18,14 +18,15 @@
package accord.impl;
+import java.util.Objects;
import java.util.function.BiPredicate;
-import java.util.function.Function;
import java.util.function.Predicate;
-import java.util.function.ToLongFunction;
+
+import javax.annotation.Nonnull;
+import javax.annotation.Nullable;
import com.google.common.annotations.VisibleForTesting;
-import accord.api.Agent;
import accord.api.Result;
import accord.local.Cleanup;
import accord.local.Cleanup.Input;
@@ -90,18 +91,18 @@ public class CommandChange
{
PARTICIPANTS, // stored first so we can index it
SAVE_STATUS,
- PARTIAL_DEPS,
- EXECUTE_AT,
- EXECUTES_AT_LEAST,
- MIN_UNIQUE_HLC,
DURABILITY,
- ACCEPTED,
+ EXECUTE_AT,
PROMISED,
- WAITING_ON,
+ ACCEPTED,
PARTIAL_TXN,
+ PARTIAL_DEPS,
+ WAITING_ON,
+ MIN_UNIQUE_HLC,
+ EXECUTES_AT_LEAST,
WRITES,
- CLEANUP,
RESULT,
+ CLEANUP,
;
public static final Field[] FIELDS = values();
@@ -160,25 +161,27 @@ public class CommandChange
protected TxnId txnId;
- protected Timestamp executeAt;
- protected Timestamp executesAtLeast;
- protected long minUniqueHlc;
+ protected StoreParticipants participants;
protected SaveStatus saveStatus;
protected Durability durability;
+ protected Timestamp executeAt;
- protected Ballot acceptedOrCommitted;
protected Ballot promised;
+ protected Ballot acceptedOrCommitted;
- protected StoreParticipants participants;
protected PartialTxn partialTxn;
protected PartialDeps partialDeps;
protected CommandChange.WaitingOnProvider waitingOn;
+ protected long minUniqueHlc;
+ protected Timestamp executesAtLeast;
+
protected Writes writes;
protected Result result;
+
protected Cleanup cleanup;
- protected boolean nextCalled;
+ protected boolean hasUpdate;
protected int count;
public Builder(TxnId txnId, Load load)
@@ -226,17 +229,17 @@ public class CommandChange
{
switch (field)
{
- case EXECUTE_AT: return executeAt;
- case EXECUTES_AT_LEAST: return executesAtLeast;
- case MIN_UNIQUE_HLC: return minUniqueHlc;
+ case PARTICIPANTS: return participants;
case SAVE_STATUS: return saveStatus;
case DURABILITY: return durability;
- case ACCEPTED: return acceptedOrCommitted;
+ case EXECUTE_AT: return executeAt;
case PROMISED: return promised;
- case PARTICIPANTS: return participants;
+ case ACCEPTED: return acceptedOrCommitted;
case PARTIAL_TXN: return partialTxn;
case PARTIAL_DEPS: return partialDeps;
case WAITING_ON: return waitingOn;
+ case MIN_UNIQUE_HLC: return minUniqueHlc;
+ case EXECUTES_AT_LEAST: return executesAtLeast;
case WRITES: return writes;
case RESULT: return result;
default: throw new UnhandledEnum(field);
@@ -248,25 +251,26 @@ public class CommandChange
flags = 0;
txnId = null;
- executeAt = null;
- executesAtLeast = null;
- minUniqueHlc = 0;
+ participants = null;
saveStatus = null;
durability = null;
+ executeAt = null;
- acceptedOrCommitted = null;
promised = null;
+ acceptedOrCommitted = null;
- participants = null;
partialTxn = null;
partialDeps = null;
waitingOn = null;
+ minUniqueHlc = 0;
+ executesAtLeast = null;
+
writes = null;
result = null;
cleanup = null;
- nextCalled = false;
+ hasUpdate = false;
count = 0;
}
@@ -287,7 +291,12 @@ public class CommandChange
public boolean isEmpty()
{
- return !nextCalled;
+ return !hasUpdate;
+ }
+
+ public int flags()
+ {
+ return flags;
}
public int count()
@@ -295,31 +304,38 @@ public class CommandChange
return count;
}
- public Cleanup shouldCleanup(Input input, Agent agent, RedundantBefore
redundantBefore, DurableBefore durableBefore)
+ public Cleanup shouldCleanup(Input input, RedundantBefore
redundantBefore, DurableBefore durableBefore)
{
- if (!nextCalled)
+ if (!hasUpdate)
return NO;
Durability durability = this.durability;
if (durability == null) durability = NotDurable;
- Cleanup cleanup = Cleanup.shouldCleanup(input, agent, txnId,
executeAt, saveStatus, durability, participants, redundantBefore,
durableBefore);
+ StoreParticipants participants = this.participants;
+ // TODO (expected): we need to filter participants to correctly
compute doesStillExecute in Cleanup.shouldCleanup;
+ // would be better to break this dependency, or otherwise encode
it better.
+ // In particular it would be nice to avoid doing this twice for
each command on load, as we also do this in SafeCommandStore.
+ // Perhaps we can special-case loading, and simply update the
participants here so we can avoid doing it again on access
+ if (input == Input.FULL && participants != null)
+ participants = participants.filter(LOAD, redundantBefore,
txnId, saveStatus != null && saveStatus.known.isExecuteAtKnown() ? executeAt :
null);
+ Cleanup cleanup = Cleanup.shouldCleanup(input, txnId, executeAt,
saveStatus, durability, participants, redundantBefore, durableBefore);
if (this.cleanup != null && this.cleanup.compareTo(cleanup) > 0)
cleanup = this.cleanup;
return cleanup;
}
- public Cleanup maybeCleanup(Input input, Agent agent, RedundantBefore
redundantBefore, DurableBefore durableBefore)
+ public Cleanup maybeCleanup(boolean clearFields, Input input,
RedundantBefore redundantBefore, DurableBefore durableBefore)
{
- Cleanup cleanup = shouldCleanup(input, agent, redundantBefore,
durableBefore);
- return maybeCleanup(input, cleanup);
+ Cleanup cleanup = shouldCleanup(input, redundantBefore,
durableBefore);
+ return maybeCleanup(clearFields, input, cleanup);
}
- public Cleanup maybeCleanup(Input input, Cleanup cleanup)
+ public Cleanup maybeCleanup(boolean clearFields, Input input, Cleanup
cleanup)
{
if (cleanup == NO || cleanup == EXPUNGE)
return cleanup;
- SaveStatus newSaveStatus = cleanup.appliesIfNot;
+ SaveStatus newSaveStatus = cleanup.newStatus;
if (saveStatus == null || saveStatus.compareTo(newSaveStatus) < 0)
{
if (input == Input.FULL)
@@ -329,23 +345,38 @@ public class CommandChange
newSaveStatus = SaveStatus.TruncatedUnapplied;
saveStatus = newSaveStatus;
}
- forceSetNulls(eraseKnownFieldsMask[newSaveStatus.ordinal()]);
+ forceSetNulls(clearFields,
eraseKnownFieldsMask[newSaveStatus.ordinal()]);
}
+ this.cleanup = cleanup;
return cleanup;
}
- protected void setNulls(int mask)
+ protected void setNulls(boolean clearFields, int newFlags)
{
- mask &= ~(flags >>> 16); // limit ourselves to those fields that
have not already been set (high 16 bits are those already-set fields)
- forceSetNulls(mask);
+ newFlags &= ~(flags >>> 16); // limit ourselves to those fields
that have not already been set (high 16 bits are those already-set fields)
+ forceSetNulls(clearFields, newFlags);
}
- protected void forceSetNulls(int mask)
+ protected boolean forceSetNulls(boolean clearFields, int newFlags)
{
- mask &= ~nullMask(flags); // limit ourselves to those fields that
are not already null
- mask = nullMask(mask); // limit ourselves to those fields that are
now being set to null
- int iterable = toIterableSetFields(mask);
+ newFlags &= ~nulls(flags); // limit ourselves to those fields that
are not already null
+ newFlags = nulls(newFlags); // limit ourselves to those fields
that are now being set to null
+ if (newFlags == 0)
+ return false;
+
+ if (clearFields)
+ clearFields(newFlags);
+
+ flags |= newFlags;
+ return true;
+ }
+
+ // clears any field with a CHANGED flag NOT limited only to NULL
+ private void clearFields(int newFlags)
+ {
+ newFlags &= notNulls(flags); // limit ourselves to those fields
that are not already null
+ int iterable = toIterableSetFields(newFlags);
for (Field next = nextSetField(iterable); next != null; iterable =
unsetIterable(next, iterable), next = nextSetField(iterable))
{
switch (next)
@@ -367,12 +398,112 @@ public class CommandChange
case RESULT: result = null;
break;
}
}
- flags |= mask;
}
- static int nullMask(int mask)
+ // only populate regular fields that are not already set, but apply
any Cleanup if it is stronger than any already present
+ public boolean fillInMissingOrCleanup(boolean clearFields, Builder add)
+ {
+ hasUpdate = true;
+ count++;
+
+ int addFlags = notAlreadySet(not(CLEANUP, add.flags), flags);
+ if (addFlags == 0)
+ return addCleanup(clearFields, add.cleanup);
+
+ setNulls(false, addFlags);
+ int iterable = toIterableSetFields(notNulls(addFlags));
+ for (Field next = nextSetField(iterable) ; next != null; next =
nextSetField(iterable = unsetIterable(next, iterable)))
+ {
+ switch (next)
+ {
+ default: throw new UnhandledEnum(next);
+ case PARTICIPANTS: participants = add.participants;
break;
+ case SAVE_STATUS: saveStatus = add.saveStatus;
break;
+ case DURABILITY: durability = add.durability;
break;
+ case EXECUTE_AT: executeAt = add.executeAt;
break;
+ case PROMISED: promised = add.promised;
break;
+ case ACCEPTED: acceptedOrCommitted =
add.acceptedOrCommitted; break;
+ case PARTIAL_TXN: partialTxn = add.partialTxn;
break;
+ case PARTIAL_DEPS: partialDeps = add.partialDeps;
break;
+ case WAITING_ON: waitingOn = add.waitingOn;
break;
+ case MIN_UNIQUE_HLC: minUniqueHlc = add.minUniqueHlc;
break;
+ case EXECUTES_AT_LEAST: executesAtLeast =
add.executesAtLeast; break;
+ case WRITES: writes = add.writes;
break;
+ case RESULT: result = add.result;
break;
+ }
+ }
+ flags |= addFlags;
+ addCleanup(clearFields, add.cleanup);
+ return true;
+ }
+
+ // returns true if we made a material update to the Builder;
+ // that is, if we cleared a non-null field or if we are already
mask-only
+ public boolean clearSuperseded(boolean clearFields, Builder
superseding)
+ {
+ int unset = flags & setFieldsMask(superseding.flags);
+ if (notNulls(unset) == 0 && notNulls(flags) != 0)
+ return false;
+
+ if (clearFields)
+ clearFields(unset);
+ flags ^= unset;
+ return true;
+ }
+
+ public boolean addCleanup(boolean clearFields, Cleanup addCleanup)
+ {
+ if (addCleanup == null || addCleanup == NO)
+ return false;
+
+ if (cleanup != null && addCleanup.compareTo(cleanup) <= 0)
+ return false;
+
+ cleanup = addCleanup;
+ if (!cleanup.appliesTo(saveStatus))
+ return false;
+ return forceSetNulls(clearFields,
eraseKnownFieldsMask[cleanup.newStatus.ordinal()]);
+ }
+
+ public boolean cleanup(boolean clearFields, Cleanup apply)
{
- return (mask & 0xffff) | (mask << 16);
+ int unsetFields = eraseKnownFieldsMask[apply.newStatus.ordinal()];
+ unsetFields &= flags;
+ if (unsetFields == 0)
+ return false;
+
+ if (clearFields)
+ clearFields(unsetFields);
+ flags ^= unsetFields;
+ return true;
+ }
+
+ protected static int nulls(int flags)
+ {
+ return (flags & 0xffff) | (flags << 16);
+ }
+
+ protected static int notNulls(int flags)
+ {
+ return flags & (~flags << 16);
+ }
+
+ protected static int not(Field field, int flags)
+ {
+ return flags & ~(0x10001 << field.ordinal());
+ }
+
+ protected static int notAlreadySet(int newFlags, int oldFlags)
+ {
+ return newFlags & ~setFieldsMask(oldFlags);
+ }
+
+ // result has both null and changed flag bits set for any changed
field;
+ // can be used to limit another flags to those that were set bu these
flags, or if inverted to those not set by these flags
+ protected static int setFieldsMask(int flags)
+ {
+ int mask = flags & 0xffff0000;
+ return mask | (mask >>> 16);
}
public Command.Minimal asMinimal()
@@ -388,7 +519,7 @@ public class CommandChange
// TODO (expected): we shouldn't need to filter participants here, we
will do it anyway before using in SafeCommandStore
public Command construct(RedundantBefore redundantBefore)
{
- if (!nextCalled)
+ if (!hasUpdate)
return null;
Invariants.require(txnId != null);
@@ -453,17 +584,17 @@ public class CommandChange
{
return "Builder {" +
"txnId=" + txnId
- + (isChanged(EXECUTE_AT, flags) ? ", executeAt=" +
executeAt : "")
- + (isChanged(EXECUTES_AT_LEAST, flags) ? ",
executesAtLeast=" + executesAtLeast : "")
- + (isChanged(MIN_UNIQUE_HLC, flags) ? ", minUniqueHlc="
+ minUniqueHlc : "")
+ + (isChanged(PARTICIPANTS, flags) ? ", participants="
+ participants : "")
+ (isChanged(SAVE_STATUS, flags) ? ", saveStatus=" +
saveStatus : "")
+ (isChanged(DURABILITY, flags) ? ", durability=" +
durability : "")
- + (isChanged(ACCEPTED, flags) ? ",
acceptedOrCommitted=" + acceptedOrCommitted : "")
+ + (isChanged(EXECUTE_AT, flags) ? ", executeAt=" +
executeAt : "")
+ (isChanged(PROMISED, flags) ? ", promised=" +
promised : "")
- + (isChanged(PARTICIPANTS, flags) ? ", participants="
+ participants : "")
- + (isChanged(PARTIAL_DEPS, flags) ? ", partialTxn=" +
partialTxn : "")
+ + (isChanged(ACCEPTED, flags) ? ",
acceptedOrCommitted=" + acceptedOrCommitted : "")
+ + (isChanged(PARTIAL_TXN, flags) ? ", partialTxn=" +
partialTxn : "")
+ (isChanged(PARTIAL_DEPS, flags) ? ", partialDeps=" +
partialDeps : "")
+ (isChanged(WAITING_ON, flags) ? ", waitingOn=" +
waitingOn : "")
+ + (isChanged(MIN_UNIQUE_HLC, flags) ? ", minUniqueHlc="
+ minUniqueHlc : "")
+ + (isChanged(EXECUTES_AT_LEAST, flags) ? ",
executesAtLeast=" + executesAtLeast : "")
+ (isChanged(WRITES, flags) ? ", writes=" +
writes : "")
+ (isChanged(RESULT, flags) ? ", result=" +
result : "")
+ (isChanged(CLEANUP, flags) ? ", cleanup=" +
cleanup : "") +
@@ -514,73 +645,86 @@ public class CommandChange
*/
@VisibleForTesting
- public static int getFlags(Command before, Command after)
+ public static int getFlags(@Nullable Command before, @Nonnull Command
after)
{
int flags = 0;
- if (before == null && after == null)
- return flags;
-
- // TODO (expected): derive this from precomputed bit masking on Known,
only testing equality of objects we can't infer directly
- flags = collectFlags(before, after, Command::executeAt,
Timestamp::equalsStrict, true, EXECUTE_AT, flags);
- flags = collectFlags(before, after, Command::executesAtLeast, true,
EXECUTES_AT_LEAST, flags);
- flags = collectFlags(before, after, CommandChange::getMinUniqueHlc,
MIN_UNIQUE_HLC, flags);
-
- flags = collectFlags(before, after, Command::saveStatus, false,
SAVE_STATUS, flags);
- flags = collectFlags(before, after, Command::durability, false,
DURABILITY, flags);
-
- flags = collectFlags(before, after, Command::acceptedOrCommitted,
false, ACCEPTED, flags);
- flags = collectFlags(before, after, Command::promised, false,
PROMISED, flags);
-
- flags = collectFlags(before, after, Command::participants, true,
PARTICIPANTS, flags);
- flags = collectFlags(before, after, Command::partialTxn, false,
PARTIAL_TXN, flags);
- flags = collectFlags(before, after, Command::partialDeps, false,
PARTIAL_DEPS, flags);
- flags = collectFlags(before, after, Command::waitingOn,
WaitingOn::equalBitSets, true, WAITING_ON, flags);
-
- flags = collectFlags(before, after, Command::writes, false, WRITES,
flags);
- flags = collectFlags(before, after, Command::result, false, RESULT,
flags);
+ SaveStatus saveStatus = after.saveStatus();
+ if (before == null)
+ {
+ flags |= addIdentityFlags(null, after.participants(),
PARTICIPANTS);
+ flags |= addIdentityFlags(null, saveStatus, SAVE_STATUS);
+ flags |= addIdentityFlags(null, after.durability(), DURABILITY);
+ flags |= addIdentityFlags(null, after.executeAt(), EXECUTE_AT);
+ flags |= addIdentityFlags(null, after.promised(), PROMISED);
+ flags |= addIdentityFlags(null, after.acceptedOrCommitted(),
ACCEPTED);
+ flags |= addIdentityFlags(null, after.partialTxn(), PARTIAL_TXN);
+ flags |= addIdentityFlags(null, after.partialDeps(), PARTIAL_DEPS);
+ if (after.waitingOn() != null)
+ {
+ flags |= setChanged(WAITING_ON, 0);
+ flags |= addIdentityFlags(0, getMinUniqueHlc(after),
MIN_UNIQUE_HLC);
+ }
+ flags |= addIdentityFlags(null, after.executesAtLeast(),
EXECUTES_AT_LEAST);
+ flags |= addIdentityFlags(null, after.writes(), WRITES);
+ flags |= addIdentityFlags(null, after.result(), RESULT);
+ }
+ else
+ {
+ flags |= addEqualityFlags(before.participants(),
after.participants(), PARTICIPANTS);
+ flags |= addIdentityFlags(before.saveStatus(), saveStatus,
SAVE_STATUS);
+ flags |= addIdentityFlags(before.durability(), after.durability(),
DURABILITY);
+ flags |= addFlags(before.executeAt(), after.executeAt(),
Timestamp::equalsStrict, EXECUTE_AT);
+ flags |= addEqualityFlags(before.promised(), after.promised(),
PROMISED);
+ flags |= addEqualityFlags(before.acceptedOrCommitted(),
after.acceptedOrCommitted(), ACCEPTED);
+ flags |= addIdentityFlags(before.partialTxn(), after.partialTxn(),
PARTIAL_TXN);
+ flags |= addIdentityFlags(before.partialDeps(),
after.partialDeps(), PARTIAL_DEPS);
+ if (before.waitingOn() != after.waitingOn())
+ {
+ flags |= setChanged(WAITING_ON);
+ flags |= addIdentityFlags(getMinUniqueHlc(before),
getMinUniqueHlc(after), MIN_UNIQUE_HLC);
+ }
+ flags |= addEqualityFlags(before.executesAtLeast(),
after.executesAtLeast(), EXECUTES_AT_LEAST);
+ flags |= addIdentityFlags(before.writes(), after.writes(), WRITES);
+ flags |= addIdentityFlags(before.result(), after.result(), RESULT);
+ }
// make sure we have enough information to decide whether to expunge
timestamps (for unique ApplyAt HLC guarantees)
- if (after.saveStatus().known.is(ApplyAtKnown) && (before == null ||
!before.saveStatus().known.is(ApplyAtKnown)))
+ if (saveStatus.known.is(ApplyAtKnown) && (before == null ||
!before.saveStatus().known.is(ApplyAtKnown)))
{
flags = setChanged(EXECUTE_AT, flags);
flags = setChanged(PARTICIPANTS, flags);
flags = setChanged(SAVE_STATUS, flags);
}
- flags |= eraseKnownFieldsMask[after.saveStatus().ordinal()];
-
+ flags |= eraseKnownFieldsMask[saveStatus.ordinal()];
return flags;
}
- private static <OBJ, VAL> int collectFlags(OBJ lo, OBJ ro, Function<OBJ,
VAL> convert, boolean allowClassMismatch, Field field, int flags)
+ private static int addIdentityFlags(Object l, Object r, Field field)
{
- return collectFlags(lo, ro, convert, Object::equals,
allowClassMismatch, field, flags);
+ if (l == r) return 0;
+ if (r == null) return setIsNullAndChanged(field);
+ return setChanged(field);
}
- private static <OBJ, VAL> int collectFlags(OBJ lo, OBJ ro, Function<OBJ,
VAL> convert, BiPredicate<VAL, VAL> equals, boolean allowClassMismatch, Field
field, int flags)
+ private static <T> int addEqualityFlags(T l, T r, Field field)
{
- VAL l = null;
- VAL r = null;
- if (lo != null) l = convert.apply(lo);
- if (ro != null) r = convert.apply(ro);
-
- if (l == r) return flags; // no change
- if (r == null) return setIsNullAndChanged(field, flags);
- if (l == null) return setChanged(field, flags);
- Invariants.require(allowClassMismatch || l.getClass() == r.getClass(),
"%s != %s", l.getClass(), r.getClass());
- if (equals.test(l, r)) return flags; // no change
- return setChanged(field, flags);
+ return addFlags(l, r, Objects::equals, field);
}
- private static <OBJ> int collectFlags(OBJ lo, OBJ ro, ToLongFunction<OBJ>
convert, Field field, int flags)
+ private static <T> int addFlags(T l, T r, BiPredicate<T, T> equality,
Field field)
{
- long l = 0, r = 0;
- if (lo != null) l = convert.applyAsLong(lo);
- if (ro != null) r = convert.applyAsLong(ro);
+ if (l == r) return 0;
+ if (r == null) return setIsNullAndChanged(field);
+ if (l == null) return setChanged(field);
+ if (equality.test(l, r)) return 0;
+ return setChanged(field);
+ }
- return l == r ? flags:
- r == 0 ? setIsNullAndChanged(field, flags)
- : setChanged(field, flags);
+ private static int addIdentityFlags(long l, long r, Field field)
+ {
+ if (l == r) return 0;
+ return setChanged(field);
}
public static boolean anyFieldChanged(int flags)
@@ -599,6 +743,16 @@ public class CommandChange
return oldFlags | (0x10000 << field.ordinal());
}
+ public static int setChanged(Field field)
+ {
+ return 0x10000 << field.ordinal();
+ }
+
+ public static int setIsNullAndChanged(Field field)
+ {
+ return 0x10001 << field.ordinal();
+ }
+
@VisibleForTesting
public static boolean isChanged(Field field, int oldFlags)
{
diff --git a/accord-core/src/main/java/accord/impl/InMemoryCommandStore.java
b/accord-core/src/main/java/accord/impl/InMemoryCommandStore.java
index 9d03b7de..64103a3d 100644
--- a/accord-core/src/main/java/accord/impl/InMemoryCommandStore.java
+++ b/accord-core/src/main/java/accord/impl/InMemoryCommandStore.java
@@ -289,7 +289,7 @@ public abstract class InMemoryCommandStore extends
CommandStore
Invariants.require(globalCommand != null &&
!globalCommand.isEmpty());
Command command = globalCommand.value();
StoreParticipants participants =
command.participants().filter(LOAD, safeStore, txnId,
command.executeAtIfKnown());
- Cleanup cleanup = Cleanup.shouldCleanup(FULL, agent, txnId,
command.executeAtIfKnown(), command.saveStatus(), command.durability(),
participants, unsafeGetRedundantBefore(), durableBefore());
+ Cleanup cleanup = Cleanup.shouldCleanup(FULL, txnId,
command.executeAtIfKnown(), command.saveStatus(), command.durability(),
participants, unsafeGetRedundantBefore(), durableBefore());
Invariants.require(command.hasBeen(Applied)
|| cleanup.compareTo(Cleanup.TRUNCATE) >= 0
|| (durableBefore().min(txnId) == NotDurable &&
diff --git
a/accord-core/src/main/java/accord/impl/progresslog/DefaultProgressLog.java
b/accord-core/src/main/java/accord/impl/progresslog/DefaultProgressLog.java
index d7da2ce7..d95f4af5 100644
--- a/accord-core/src/main/java/accord/impl/progresslog/DefaultProgressLog.java
+++ b/accord-core/src/main/java/accord/impl/progresslog/DefaultProgressLog.java
@@ -306,8 +306,7 @@ public class DefaultProgressLog implements ProgressLog,
Consumer<SafeCommandStor
@Override
public void clearBefore(SafeCommandStore safeStore, TxnId
clearWaitingBefore, TxnId clearAllBefore)
{
- if (clearAllBefore.compareTo(clearWaitingBefore) >= 0)
- clearWaitingBefore = clearAllBefore;
+ Invariants.require(clearAllBefore.compareTo(clearWaitingBefore) <= 0);
int index = 0;
while (index < BTree.size(stateMap))
diff --git a/accord-core/src/main/java/accord/local/Cleanup.java
b/accord-core/src/main/java/accord/local/Cleanup.java
index 75454675..a1ee2477 100644
--- a/accord-core/src/main/java/accord/local/Cleanup.java
+++ b/accord-core/src/main/java/accord/local/Cleanup.java
@@ -21,7 +21,6 @@ package accord.local;
import javax.annotation.Nonnull;
import javax.annotation.Nullable;
-import accord.api.Agent;
import accord.primitives.FullRoute;
import accord.primitives.Participants;
import accord.primitives.Route;
@@ -33,6 +32,7 @@ import accord.utils.Invariants;
import accord.utils.UnhandledEnum;
import static accord.api.ProtocolModifiers.Toggles.requiresUniqueHlcs;
+import static accord.local.Cleanup.Input.FULL;
import static accord.local.Cleanup.Input.PARTIAL;
import static accord.local.RedundantStatus.Property.GC_BEFORE;
import static accord.local.RedundantStatus.Property.LOCALLY_APPLIED;
@@ -48,6 +48,7 @@ import static accord.primitives.SaveStatus.TruncatedApply;
import static accord.primitives.SaveStatus.TruncatedApplyWithOutcome;
import static accord.primitives.SaveStatus.Uninitialised;
import static accord.primitives.SaveStatus.Vestigial;
+import static accord.primitives.Status.Durability.NotDurable;
import static accord.primitives.Status.Durability.UniversalOrInvalidated;
import static accord.primitives.Status.PreCommitted;
import static accord.primitives.Status.Truncated;
@@ -73,21 +74,38 @@ public enum Cleanup
private static final Cleanup[] VALUES = values();
- public final SaveStatus appliesIfNot;
+ public final SaveStatus newStatus;
- Cleanup(SaveStatus appliesIfNot)
+ Cleanup(SaveStatus newStatus)
{
- this.appliesIfNot = appliesIfNot;
+ this.newStatus = newStatus;
+ }
+
+ public final Cleanup atLeast(Cleanup that)
+ {
+ return compareTo(that) >= 0 ? this : that;
+ }
+
+ public final boolean appliesTo(SaveStatus saveStatus)
+ {
+ if (saveStatus == null)
+ return this != NO;
+
+ switch (this)
+ {
+ case EXPUNGE: return true;
+ case ERASE: return saveStatus != Erased;
+ default: return saveStatus.compareTo(newStatus) < 0;
+ }
}
public final Cleanup filter(SaveStatus saveStatus)
{
- return saveStatus != null && saveStatus.compareTo(appliesIfNot) >= 0
&& this != EXPUNGE ? NO : this;
+ return appliesTo(saveStatus) ? this : NO;
}
public enum Input { PARTIAL, FULL }
- // TODO (required): simulate compaction of log records in burn test
public static Cleanup shouldCleanup(Input input, SafeCommandStore
safeStore, Command command)
{
return shouldCleanup(input, safeStore, command,
command.participants());
@@ -95,19 +113,19 @@ public enum Cleanup
public static Cleanup shouldCleanup(Input input, SafeCommandStore
safeStore, Command command, @Nonnull StoreParticipants participants)
{
- return shouldCleanup(input, safeStore.agent(), command.txnId(),
command.executeAt(), command.saveStatus(), command.durability(), participants,
+ return shouldCleanup(input, command.txnId(), command.executeAt(),
command.saveStatus(), command.durability(), participants,
safeStore.redundantBefore(),
safeStore.durableBefore());
}
- public static Cleanup shouldCleanup(Input input, Agent agent, Command
command, RedundantBefore redundantBefore, DurableBefore durableBefore)
+ public static Cleanup shouldCleanup(Input input, Command command,
RedundantBefore redundantBefore, DurableBefore durableBefore)
{
- return shouldCleanup(input, agent, command.txnId(),
command.executeAt(), command.saveStatus(), command.durability(),
command.participants(),
+ return shouldCleanup(input, command.txnId(), command.executeAt(),
command.saveStatus(), command.durability(), command.participants(),
redundantBefore, durableBefore);
}
- public static Cleanup shouldCleanup(Input input, Agent agent, TxnId txnId,
Timestamp executeAt, SaveStatus status, Durability durability,
StoreParticipants participants, RedundantBefore redundantBefore, DurableBefore
durableBefore)
+ public static Cleanup shouldCleanup(Input input, TxnId txnId, Timestamp
executeAt, SaveStatus status, Durability durability, StoreParticipants
participants, RedundantBefore redundantBefore, DurableBefore durableBefore)
{
- Cleanup cleanup = shouldCleanupInternal(input, agent, txnId,
executeAt, status, durability, participants, redundantBefore, durableBefore);
+ Cleanup cleanup = shouldCleanupInternal(input, txnId, executeAt,
status, durability, participants, redundantBefore, durableBefore);
return cleanup.filter(status);
}
@@ -137,47 +155,57 @@ public enum Cleanup
* we pessimistically assume the whole cluster may need to see its outcome
* TODO (expected): we should be able to rely on replicas to infer
Invalidated from an Erased record
*/
- private static Cleanup shouldCleanupInternal(Input input, Agent agent,
TxnId txnId, @Nullable Timestamp executeAt, @Nullable SaveStatus saveStatus,
Durability durability, @Nullable StoreParticipants participants,
RedundantBefore redundantBefore, DurableBefore durableBefore)
+ private static Cleanup shouldCleanupInternal(Input input, TxnId txnId,
@Nullable Timestamp executeAt, @Nullable SaveStatus saveStatus, Durability
durability, @Nullable StoreParticipants participants, RedundantBefore
redundantBefore, DurableBefore durableBefore)
{
if (txnId.kind() == EphemeralRead)
return NO;
if (expunge(txnId, executeAt, saveStatus, participants,
redundantBefore, durableBefore))
- return expunge();
+ return expunge(txnId);
if (saveStatus == null || participants == null)
return NO;
if (participants.hasFullRoute())
- return cleanupWithFullRoute(input, agent, participants, txnId,
executeAt, saveStatus, durability, redundantBefore, durableBefore);
+ return cleanupWithFullRoute(input, participants, txnId, executeAt,
saveStatus, durability, redundantBefore, durableBefore);
return cleanupWithoutFullRoute(input, txnId, saveStatus, participants,
redundantBefore, durableBefore);
}
- private static Cleanup cleanupWithFullRoute(Input input, Agent agent,
StoreParticipants participants, TxnId txnId, Timestamp executeAt, SaveStatus
saveStatus, Durability durability, RedundantBefore redundantBefore,
DurableBefore durableBefore)
+ private static Cleanup cleanupWithFullRoute(Input input, StoreParticipants
participants, TxnId txnId, Timestamp executeAt, SaveStatus saveStatus,
Durability durability, RedundantBefore redundantBefore, DurableBefore
durableBefore)
{
// We first check if the command is redundant locally, i.e. whether it
has been applied to all non-faulty replicas of the local shard
// If not, we don't want to truncate its state else we may make
catching up for these other replicas much harder
FullRoute<?> route = Route.castToFullRoute(participants.route());
- RedundantStatus redundant = redundantBefore.status(txnId,
saveStatus.known.is(ApplyAtKnown) ? executeAt : null, route);
- Invariants.require(redundant.none(NOT_OWNED),"Command " + txnId + "
that is being loaded is not owned by this shard on route " + route);
+ // we must not use executeAt here if input == PARTIAL because with
partial compaction we might be merging the combination of some old executeAt
with a later partial status
+ RedundantStatus redundant = redundantBefore.status(txnId, input ==
FULL && saveStatus.known.is(ApplyAtKnown) ? executeAt : null, route);
+ Invariants.require(redundant.none(NOT_OWNED),"Command %s that is being
loaded is not owned by this shard on route %s", txnId, route);
if (redundant.none(LOCALLY_REDUNDANT))
return NO;
- Cleanup ifUndecided = cleanupIfUndecidedWithFullRoute(input, txnId,
saveStatus, redundant);
- if (ifUndecided != null)
- return ifUndecided;
-
+ Cleanup min = cleanupIfUndecidedWithFullRoute(input, txnId,
saveStatus, redundant);
if (!redundant.all(TRUNCATE_BEFORE))
{
// TODO (expected): see if we can improve our invariants so we can
remove this special-case
if (input != PARTIAL && redundant.all(SHARD_APPLIED) &&
redundant.all(LOCALLY_DEFUNCT) && !redundant.any(LOCALLY_APPLIED))
- return truncate(txnId);
- return NO;
+ return truncate(txnId, min);
+ return min;
}
Invariants.paranoid(redundant.all(SHARD_APPLIED));
- Durability test = Durability.max(durability, durableBefore.min(txnId,
participants.route()));
+
+ if (saveStatus.compareTo(Vestigial) >= 0)
+ {
+ // we can't use durability from an Invalidated record to decide if
we can erase/expunge,
+ // as we don't know that this has been persisted at all shards.
+ // Similarly, vestigial/erased records don't know their etymology,
and may derive from Invalidate
+ durability = NotDurable;
+ }
+
+ Durability test;
+ if (durability.compareTo(UniversalOrInvalidated) >= 0) test =
durability;
+ else test = Durability.max(durability, durableBefore.min(txnId,
participants.route()));
+
switch (test)
{
default: throw new UnhandledEnum(durability);
@@ -187,17 +215,17 @@ public enum Cleanup
// TODO (required): consider how we guarantee not to break
recovery of other shards if a majority on this shard are PRE_BOOTSTRAP
// (if the condition is false and we fall through to
removing Outcome)
if (participants.doesStillExecute())
- return truncateWithOutcome(txnId);
+ return min.atLeast(truncateWithOutcome(txnId, min));
case MajorityOrInvalidated:
case Majority:
- return truncate(txnId);
+ return min.atLeast(truncate(txnId, min));
case UniversalOrInvalidated:
case Universal:
if (redundant.all(GC_BEFORE))
- return erase();
- return truncate(txnId);
+ return erase(txnId, min);
+ return truncate(txnId, min);
}
}
@@ -226,10 +254,7 @@ public enum Cleanup
private static Cleanup cleanupIfUndecidedWithFullRoute(Input input, TxnId
txnId, SaveStatus saveStatus, RedundantStatus redundantStatus)
{
- if (saveStatus.hasBeen(PreCommitted))
- return null;
-
- if (input == PARTIAL)
+ if (input == PARTIAL || saveStatus.hasBeen(PreCommitted))
return NO;
return cleanupUndecided(txnId, redundantStatus);
@@ -258,6 +283,8 @@ public enum Cleanup
if (!requiresUniqueHlcs() || !txnId.is(Write)) return true;
if (saveStatus == null || !saveStatus.known.is(ApplyAtKnown)) return
true;
+ // note, it is safe to use ApplyAtKnown even with PARTIAL input here,
because we are only discarding information,
+ // and we can safely discard any stale executeAt
if (executeAt == null) return true;
if (minGcBefore.is(HLC_BOUND) && executeAt.uniqueHlc() <
minGcBefore.hlc()) return true;
if (participants == null)
@@ -276,18 +303,29 @@ public enum Cleanup
{
return INVALIDATE;
}
- private static Cleanup truncateWithOutcome(TxnId txnId)
+
+ private static Cleanup truncateWithOutcome(TxnId txnId, Cleanup atLeast)
{
- return TRUNCATE_WITH_OUTCOME;
+ return atLeast.compareTo(TRUNCATE_WITH_OUTCOME) > 0 ? atLeast :
TRUNCATE_WITH_OUTCOME;
}
- private static Cleanup truncate(TxnId txnId)
+
+ private static Cleanup truncate(TxnId txnId, Cleanup atLeast)
{
- return TRUNCATE;
+ return atLeast.compareTo(TRUNCATE) > 0 ? atLeast : TRUNCATE;
}
+
private static Cleanup vestigial(TxnId txnId)
{
return VESTIGIAL;
}
- private static Cleanup erase() { return ERASE; }
- private static Cleanup expunge() { return EXPUNGE; }
+
+ private static Cleanup erase(TxnId txnId, Cleanup atLeast)
+ {
+ return atLeast != EXPUNGE ? ERASE : EXPUNGE;
+ }
+
+ private static Cleanup expunge(TxnId txnId)
+ {
+ return EXPUNGE;
+ }
}
diff --git a/accord-core/src/main/java/accord/local/CommandStore.java
b/accord-core/src/main/java/accord/local/CommandStore.java
index f5ee9835..36647fc1 100644
--- a/accord-core/src/main/java/accord/local/CommandStore.java
+++ b/accord-core/src/main/java/accord/local/CommandStore.java
@@ -701,8 +701,8 @@ public abstract class CommandStore implements AgentExecutor
protected void updatedRedundantBefore(SafeCommandStore safeStore, TxnId
syncId, Ranges ranges)
{
TxnId clearWaitingBefore =
redundantBefore.minShardAndLocallyAppliedBefore();
- TxnId clearAnyBefore = durableBefore().min.majorityBefore;
- progressLog.clearBefore(safeStore, clearWaitingBefore, clearAnyBefore);
+ TxnId clearAllBefore = TxnId.min(clearWaitingBefore,
durableBefore().min.majorityBefore);
+ progressLog.clearBefore(safeStore, clearWaitingBefore, clearAllBefore);
listeners.clearBefore(this, clearWaitingBefore);
}
diff --git a/accord-core/src/main/java/accord/local/Commands.java
b/accord-core/src/main/java/accord/local/Commands.java
index 8817a787..a1abe351 100644
--- a/accord-core/src/main/java/accord/local/Commands.java
+++ b/accord-core/src/main/java/accord/local/Commands.java
@@ -925,7 +925,7 @@ public class Commands
case TRUNCATE_WITH_OUTCOME:
Invariants.requireArgument(command.known().is(Apply));
Invariants.requireArgument(command.known().is(ApplyAtKnown));
- result = truncated(command, newParticipants,
cleanup.appliesIfNot);
+ result = truncated(command, newParticipants,
cleanup.newStatus);
break;
case TRUNCATE:
@@ -938,7 +938,8 @@ public class Commands
break;
case ERASE:
- Invariants.require(command.saveStatus().compareTo(Erased) < 0);
+ Invariants.require(command.saveStatus() != Erased);
+
case EXPUNGE:
result = erased(command, newParticipants);
break;
@@ -983,7 +984,7 @@ public class Commands
return true;
}
- Invariants.require(cleanup == EXPUNGE ||
command.saveStatus().compareTo(cleanup.appliesIfNot) < 0);
+ Invariants.require(cleanup.appliesTo(command.saveStatus()));
purge(safeStore, safeCommand, command, cleanupParticipants, cleanup,
true);
return true;
}
diff --git a/accord-core/src/main/java/accord/local/RedundantBefore.java
b/accord-core/src/main/java/accord/local/RedundantBefore.java
index 492dddc2..7efe5509 100644
--- a/accord-core/src/main/java/accord/local/RedundantBefore.java
+++ b/accord-core/src/main/java/accord/local/RedundantBefore.java
@@ -557,7 +557,7 @@ public class RedundantBefore extends
ReducingRangeMap<RedundantBefore.Bounds>
// TODO (required): audit each of these methods
if (bounds.is(txnId, SHARD_APPLIED)
- && ((bounds.endEpoch <= txnId.epoch() &&
(!txnId.awaitsPreviouslyOwned() || bounds.isLocallyRetired()))
+ && ((bounds.endEpoch <= txnId.epoch() &&
bounds.isLocallyRetired())
|| (executeAtIfKnown != null && executeAtIfKnown.epoch() <
bounds.startEpoch)
|| bounds.is(txnId, PRE_BOOTSTRAP_OR_STALE)))
return execute.without(Ranges.of(bounds.range));
@@ -579,17 +579,28 @@ public class RedundantBefore extends
ReducingRangeMap<RedundantBefore.Bounds>
return ownedOrNotRedundant;
}
- static Participants<?> withoutShardAppliedBefore(Bounds bounds,
@Nonnull Participants<?> notShardApplied, TxnId txnId)
+ static Participants<?> withoutShardApplied(Bounds bounds, @Nonnull
Participants<?> notShardApplied, TxnId txnId)
{
if (bounds == null)
return notShardApplied;
- if (txnId.compareTo(bounds.maxBound(SHARD_APPLIED)) <= 0)
+ if (bounds.is(txnId, SHARD_APPLIED))
return notShardApplied.without(Ranges.of(bounds.range));
return notShardApplied;
}
+ static Participants<?> withoutShardAppliedLocallySynced(Bounds bounds,
@Nonnull Participants<?> notShardAppliedLocallyDefunct, TxnId txnId)
+ {
+ if (bounds == null)
+ return notShardAppliedLocallyDefunct;
+
+ if (bounds.is(txnId, SHARD_APPLIED, LOCALLY_SYNCED))
+ return
notShardAppliedLocallyDefunct.without(Ranges.of(bounds.range));
+
+ return notShardAppliedLocallyDefunct;
+ }
+
static Ranges withoutBeforeGc(Bounds entry, @Nonnull Ranges
notGarbage, TxnId txnId, @Nullable Timestamp executeAt)
{
if (entry == null || (executeAt == null ? entry.outOfBounds(txnId)
: entry.outOfBounds(txnId, executeAt)))
@@ -1077,7 +1088,17 @@ public class RedundantBefore extends
ReducingRangeMap<RedundantBefore.Bounds>
{
if (participants.isEmpty() || maxShardAppliedBefore.compareTo(txnId) <
0)
return participants;
- return foldl(participants, Bounds::withoutShardAppliedBefore,
participants, txnId);
+ return foldl(participants, Bounds::withoutShardApplied, participants,
txnId);
+ }
+
+ /**
+ * Subtract any ranges we consider stale, pre-bootstrap, or that were
previously owned and have been retired
+ */
+ public Participants<?> withoutShardAppliedLocallySynced(TxnId txnId,
Participants<?> participants)
+ {
+ if (participants.isEmpty() || maxShardAppliedBefore.compareTo(txnId) <
0)
+ return participants;
+ return foldl(participants, Bounds::withoutShardAppliedLocallySynced,
participants, txnId);
}
public static class Builder extends AbstractIntervalBuilder<RoutingKey,
Bounds, RedundantBefore>
diff --git a/accord-core/src/main/java/accord/local/StoreParticipants.java
b/accord-core/src/main/java/accord/local/StoreParticipants.java
index 67373793..50ea88cb 100644
--- a/accord-core/src/main/java/accord/local/StoreParticipants.java
+++ b/accord-core/src/main/java/accord/local/StoreParticipants.java
@@ -321,7 +321,6 @@ public class StoreParticipants
* Do not invoke this method on a participants we will use to query esp.
e.g. for calculateDeps.
* TODO (desired): create separate Query object that cannot be filtered or
used to update a Command
*/
- static int count = 0;
public StoreParticipants filter(Filter filter, RedundantBefore
redundantBefore, TxnId txnId, @Nullable Timestamp executeAtIfKnown)
{
if (filter == QUERY)
@@ -374,7 +373,9 @@ public class StoreParticipants
Participants<?> extraTouches = touches.without(owns);
if (!extraTouches.isEmpty())
{
- Participants<?> filteredExtra =
redundantBefore.withoutShardApplied(txnId, extraTouches);
+ Participants<?> filteredExtra = txnId.is(ExclusiveSyncPoint)
+ ?
redundantBefore.withoutShardAppliedLocallySynced(txnId, extraTouches)
+ :
redundantBefore.withoutShardApplied(txnId, extraTouches);
if (extraTouches != filteredExtra)
touches = owns.with((Participants)filteredExtra);
}
diff --git a/accord-core/src/main/java/accord/local/cfk/CommandsForKey.java
b/accord-core/src/main/java/accord/local/cfk/CommandsForKey.java
index d6b45dfb..c30a140f 100644
--- a/accord-core/src/main/java/accord/local/cfk/CommandsForKey.java
+++ b/accord-core/src/main/java/accord/local/cfk/CommandsForKey.java
@@ -2002,7 +2002,7 @@ public class CommandsForKey extends CommandsForKeyUpdate
TxnInfo[] newById = removeRedundantById(byId, bounds, newBounds);
int newPrunedBeforeById = prunedBeforeId(newById, prunedBefore(),
redundantBefore(newBounds));
- Invariants.paranoid(newPrunedBeforeById < 0 ? prunedBeforeById < 0 :
newById[newPrunedBeforeById].equals(byId[prunedBeforeById]));
+ Invariants.paranoid(newPrunedBeforeById < 0 ? prunedBeforeById < 0 ||
byId[prunedBeforeById].compareTo(newBounds.gcBefore) < 0 :
newById[newPrunedBeforeById].equals(byId[prunedBeforeById]));
Object[] newLoadingPruned =
Pruning.removeRedundantLoadingPruned(loadingPruned, redundantBefore(newBounds));
long maxUniqueHlc = this.maxUniqueHlc;
diff --git a/accord-core/src/main/java/accord/messages/SetShardDurable.java
b/accord-core/src/main/java/accord/messages/SetShardDurable.java
index eea0bb66..1ead882f 100644
--- a/accord-core/src/main/java/accord/messages/SetShardDurable.java
+++ b/accord-core/src/main/java/accord/messages/SetShardDurable.java
@@ -24,6 +24,7 @@ import accord.primitives.AbstractRanges;
import accord.primitives.Status.Durability;
import accord.primitives.SyncPoint;
import accord.primitives.TxnId;
+import accord.utils.Invariants;
import accord.utils.MapReduceConsume;
import accord.utils.async.Cancellable;
@@ -42,6 +43,7 @@ public class SetShardDurable extends
AbstractRequest<SimpleReply>
super(exclusiveSyncPoint.syncId);
this.exclusiveSyncPoint = exclusiveSyncPoint;
this.durability = durability;
+
Invariants.require(durability.compareTo(Durability.MajorityOrInvalidated) >= 0);
}
private TxnId syncIdWithFlags()
@@ -52,8 +54,9 @@ public class SetShardDurable extends
AbstractRequest<SimpleReply>
@Override
public Cancellable submit()
{
+
Invariants.require(durability.compareTo(Durability.MajorityOrInvalidated) >= 0);
TxnId syncIdWithFlags = syncIdWithFlags();
- node.markDurable(exclusiveSyncPoint.route.toRanges(), syncIdWithFlags,
syncIdWithFlags)
+ node.markDurable(exclusiveSyncPoint.route.toRanges(), syncIdWithFlags,
durability.compareTo(Durability.UniversalOrInvalidated) >= 0 ? syncIdWithFlags
: TxnId.NONE)
.addCallback((success, fail) -> {
if (fail != null) node.reply(replyTo, replyContext, null, fail);
else node.mapReduceConsumeLocal(this, exclusiveSyncPoint.route,
waitForEpoch(), waitForEpoch(), this);
diff --git a/accord-core/src/main/java/accord/primitives/Known.java
b/accord-core/src/main/java/accord/primitives/Known.java
index 4bea0ae4..4e8b7fd6 100644
--- a/accord-core/src/main/java/accord/primitives/Known.java
+++ b/accord-core/src/main/java/accord/primitives/Known.java
@@ -30,6 +30,7 @@ import accord.utils.UnhandledEnum;
import static accord.primitives.Known.KnownExecuteAt.ApplyAtKnown;
import static accord.primitives.Known.KnownExecuteAt.IS_EXECUTE_AT_KNOWN;
+import static accord.primitives.Known.Outcome.WasApply;
import static accord.primitives.Known.PrivilegedVote.VotePreAccept;
import static accord.primitives.Known.PrivilegedVote.NoVote;
import static accord.primitives.Known.Definition.DefinitionErased;
@@ -535,6 +536,11 @@ public class Known
return Outcome.VALUES[outcomeOrdinal()];
}
+ public boolean isOrWasApply()
+ {
+ return is(Outcome.Apply) || is(WasApply);
+ }
+
public boolean is(Outcome outcome)
{
return outcomeOrdinal() == outcome.ordinal();
diff --git a/accord-core/src/main/java/accord/utils/Invariants.java
b/accord-core/src/main/java/accord/utils/Invariants.java
index 0851c720..fe6cae97 100644
--- a/accord-core/src/main/java/accord/utils/Invariants.java
+++ b/accord-core/src/main/java/accord/utils/Invariants.java
@@ -380,6 +380,18 @@ public class Invariants
illegalArgument(format(fmt, p1, p2));
}
+ public static void requireArgument(boolean condition, String fmt,
@Nullable Object p1, @Nullable Object p2, @Nullable Object p3)
+ {
+ if (!condition)
+ illegalArgument(format(fmt, p1, p2, p3));
+ }
+
+ public static void requireArgument(boolean condition, String fmt,
@Nullable Object p1, @Nullable Object p2, @Nullable Object p3, @Nullable Object
p4)
+ {
+ if (!condition)
+ illegalArgument(format(fmt, p1, p2, p3, p4));
+ }
+
public static void requireArgument(boolean condition, String fmt,
Object... args)
{
if (!condition)
diff --git a/accord-core/src/test/java/accord/Utils.java
b/accord-core/src/test/java/accord/Utils.java
index c7de395f..a1be0008 100644
--- a/accord-core/src/test/java/accord/Utils.java
+++ b/accord-core/src/test/java/accord/Utils.java
@@ -201,7 +201,7 @@ public class Utils
InMemoryCommandStores.Synchronized::new,
new CoordinationAdapter.DefaultFactory(),
DurableBefore.NOOP_PERSISTER,
- new InMemoryJournal(nodeId, agent, randomSource));
+ new InMemoryJournal(nodeId, randomSource));
awaitUninterruptibly(node.unsafeStart());
return node;
}
diff --git a/accord-core/src/test/java/accord/burn/BurnTestBase.java
b/accord-core/src/test/java/accord/burn/BurnTestBase.java
index 984f782d..35759349 100644
--- a/accord-core/src/test/java/accord/burn/BurnTestBase.java
+++ b/accord-core/src/test/java/accord/burn/BurnTestBase.java
@@ -52,7 +52,6 @@ import java.util.zip.CRC32;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
-import accord.api.Agent;
import accord.api.Journal;
import accord.api.Key;
import accord.api.ProtocolModifiers.Toggles;
@@ -98,7 +97,6 @@ import accord.utils.DefaultRandom;
import accord.utils.Gen;
import accord.utils.Gens;
import accord.utils.RandomSource;
-import accord.utils.TriFunction;
import accord.utils.UnhandledEnum;
import accord.utils.Utils;
import accord.utils.async.AsyncExecutor;
@@ -369,7 +367,7 @@ public class BurnTestBase
f2.get();
}
- public static void burn(RandomSource random, TopologyFactory
topologyFactory, List<Id> clients, List<Id> nodes, int keyCount, int
prefixCount, int operations, int concurrency, PendingQueue pendingQueue,
TriFunction<Id, Agent, RandomSource, Journal> journalFactory)
+ public static void burn(RandomSource random, TopologyFactory
topologyFactory, List<Id> clients, List<Id> nodes, int keyCount, int
prefixCount, int operations, int concurrency, PendingQueue pendingQueue,
BiFunction<Node.Id, RandomSource, Journal> journalFactory)
{
List<Throwable> failures = Collections.synchronizedList(new
ArrayList<>());
Thread.currentThread().setUncaughtExceptionHandler((th, fail) -> {
diff --git a/accord-core/src/test/java/accord/impl/basic/Cluster.java
b/accord-core/src/test/java/accord/impl/basic/Cluster.java
index 1cd03eeb..6a6a8f9e 100644
--- a/accord-core/src/test/java/accord/impl/basic/Cluster.java
+++ b/accord-core/src/test/java/accord/impl/basic/Cluster.java
@@ -52,7 +52,6 @@ import com.google.common.collect.Iterables;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
-import accord.api.Agent;
import accord.api.Journal;
import accord.api.MessageSink;
import accord.api.RoutingKey;
@@ -616,7 +615,7 @@ public class Cluster
Supplier<RandomSource>
randomSupplier,
Supplier<TimeService>
timeServiceSupplier,
TopologyFactory topologyFactory,
Supplier<Packet> in, Consumer<Runnable> noMoreWorkSignal,
- Consumer<Map<Id, Node>>
readySignal, TriFunction<Node.Id, Agent, RandomSource, Journal> journalFactory)
+ Consumer<Map<Id, Node>>
readySignal, BiFunction<Node.Id, RandomSource, Journal> journalFactory)
{
Topology topology = topologyFactory.toTopology(nodes);
Map<Id, Node> nodeMap = new LinkedHashMap<>();
@@ -677,7 +676,7 @@ public class Cluster
BiConsumer<Timestamp, Ranges> onStale = (sinceAtLeast, ranges)
-> configRandomizer.onStale(id, sinceAtLeast, ranges);
AgentExecutor nodeExecutor = nodeExecutorSupplier.apply(id,
onStale, timeouts);
executorMap.put(id, nodeExecutor);
- Journal journal = journalFactory.apply(id,
nodeExecutor.agent(), random);
+ Journal journal = journalFactory.apply(id, random);
journalMap.put(id, journal);
BurnTestConfigurationService configService = new
BurnTestConfigurationService(id, nodeExecutor, randomSupplier, topology,
nodeMap::get, topologyUpdates);
DelayedCommandStores.CacheLoading cacheLoading = new
RandomLoader(random).newLoader(journal);
@@ -687,6 +686,7 @@ public class Cluster
randomSupplier.get(), scheduler,
SizeOfIntersectionSorter.SUPPLIER, DefaultRemoteListeners::new,
DefaultTimeouts::new,
DefaultProgressLogs::new,
DefaultLocalListeners.Factory::new, DelayedCommandStores.factory(sinks.pending,
cacheLoading), new CoordinationAdapter.DefaultFactory(),
DurableBefore.NOOP_PERSISTER, journal);
+ journal.start(node);
DurabilityService durability = node.durability();
// TODO (desired): randomise
durability.shards().setShardCycleTime(30, SECONDS);
@@ -922,7 +922,7 @@ public class Cluster
{
if (afterCommand.is(Status.Invalidated))
Invariants.require(beforeCommand.hasBeen(Status.Truncated) ||
(!beforeCommand.hasBeen(Status.PreCommitted)
- && Cleanup.shouldCleanup(FULL, s.agent(),
e.getKey(), beforeCommand.executeAtIfKnown(), beforeCommand.saveStatus(),
beforeCommand.durability(), beforeCommand.participants(),
store.unsafeGetRedundantBefore(), store.durableBefore()).compareTo(INVALIDATE)
>= 0));
+ && Cleanup.shouldCleanup(FULL, e.getKey(),
beforeCommand.executeAtIfKnown(), beforeCommand.saveStatus(),
beforeCommand.durability(), afterCommand.participants(),
store.unsafeGetRedundantBefore(), store.durableBefore()).compareTo(INVALIDATE)
>= 0));
continue;
}
if (beforeCommand.hasBeen(Status.Truncated))
@@ -954,7 +954,7 @@ public class Cluster
if (beforeCommand.saveStatus() == SaveStatus.Erased)
continue;
- if (Cleanup.shouldCleanup(FULL, s.agent(),
beforeCommand, store.unsafeGetRedundantBefore(), store.durableBefore()) ==
EXPUNGE)
+ if (Cleanup.shouldCleanup(FULL, beforeCommand,
store.unsafeGetRedundantBefore(), store.durableBefore()) == EXPUNGE)
continue;
if
(store.unsafeGetRedundantBefore().min(beforeCommand.participants().owns(),
RedundantBefore.Bounds::shardAndLocallyRedundantBefore).compareTo(txnId) > 0)
diff --git a/accord-core/src/test/java/accord/impl/basic/InMemoryJournal.java
b/accord-core/src/test/java/accord/impl/basic/InMemoryJournal.java
index 00f00094..32303e91 100644
--- a/accord-core/src/test/java/accord/impl/basic/InMemoryJournal.java
+++ b/accord-core/src/test/java/accord/impl/basic/InMemoryJournal.java
@@ -27,6 +27,8 @@ import java.util.Map;
import java.util.NavigableMap;
import java.util.TreeMap;
+import javax.annotation.Nonnull;
+
import com.google.common.collect.ImmutableSortedMap;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -101,7 +103,7 @@ import static accord.local.Cleanup.NO;
import static accord.local.Cleanup.TRUNCATE;
import static accord.local.Cleanup.TRUNCATE_WITH_OUTCOME;
import static accord.primitives.SaveStatus.Erased;
-import static accord.primitives.Status.Invalidated;
+import static accord.primitives.SaveStatus.Uninitialised;
import static accord.primitives.Status.Truncated;
import static accord.utils.Invariants.illegalState;
@@ -112,17 +114,22 @@ public class InMemoryJournal implements Journal
private final List<TopologyUpdate> topologyUpdates = new ArrayList<>();
private final Int2ObjectHashMap<FieldUpdates> fieldStates = new
Int2ObjectHashMap<>();
- private final Node.Id id;
- private final Agent agent;
+ private Node node;
+ private Agent agent;
private final RandomSource random;
- public InMemoryJournal(Node.Id id, Agent agent, RandomSource random)
+ public InMemoryJournal(Node.Id id, RandomSource random)
{
- this.id = id;
- this.agent = agent;
this.random = random;
}
+ public Journal start(Node node)
+ {
+ this.node = node;
+ this.agent = node.agent();
+ return this;
+ }
+
@Override
public Command loadCommand(int commandStoreId, TxnId txnId,
RedundantBefore redundantBefore, DurableBefore durableBefore)
{
@@ -139,7 +146,7 @@ public class InMemoryJournal implements Journal
if (builder == null)
return null;
- Cleanup cleanup = builder.maybeCleanup(FULL, agent, redundantBefore,
durableBefore);
+ Cleanup cleanup = builder.maybeCleanup(true, FULL, redundantBefore,
durableBefore);
if (cleanup == EXPUNGE)
return null;
return builder.construct(redundantBefore);
@@ -152,7 +159,7 @@ public class InMemoryJournal implements Journal
if (builder == null || builder.isEmpty())
return null;
- Cleanup cleanup = builder.shouldCleanup(FULL, agent, redundantBefore,
durableBefore);
+ Cleanup cleanup = builder.shouldCleanup(FULL, redundantBefore,
durableBefore);
switch (cleanup)
{
case VESTIGIAL:
@@ -184,6 +191,8 @@ public class InMemoryJournal implements Journal
for (int i = saved.size() - 1; i >= 0; i--)
{
Diff diff = saved.get(i);
+ if (diff == null)
+ continue;
if (builder == null)
builder = new Builder(diff.txnId, load);
builder.apply(diff);
@@ -331,83 +340,91 @@ public class InMemoryJournal implements Journal
for (Map.Entry<TxnId, List<Diff>> e2 : localJournal.entrySet())
{
List<Diff> diffs = e2.getValue();
- int from;
+
if (diffs.isEmpty()) continue;
List<Diff> subset = diffs;
- if (isPartialCompaction)
+ if (diffs.size() > 1 && isPartialCompaction)
{
- subset = new ArrayList<>();
- int tmp1 = random.nextInt(diffs.size());
- int tmp2 = random.nextInt(diffs.size());
- from = Math.min(tmp1, tmp2);
- int max = Math.max(tmp1, tmp2);
- for (int i = from; i < max; i++)
- subset.add(diffs.get(i));
+ int removeCount = 1 + random.nextInt(diffs.size() - 1);
+ int count = diffs.size();
+ subset = new ArrayList<>(diffs);
+ while (removeCount-- > 0)
+ {
+ int removeIndex = random.nextInt(diffs.size());
+ if (subset.get(removeIndex) == null)
+ continue;
+ subset.set(removeIndex, null);
+ --count;
+ }
+
+ if (count == 0)
+ continue;
}
- InMemoryJournal.Builder builder = reconstruct(subset, ALL);
- if (builder == null || builder.saveStatus() == null)
- continue; // Partial compaction, no save status present
+ Builder[] builders = new Builder[diffs.size()];
+ for (int i = 0 ; i < subset.size() ; ++i)
+ {
+ if (subset.get(i) == null) continue;
+ Builder builder = new Builder(e2.getKey(), ALL);
+ builder.apply(subset.get(i));
+ builders[i] = builder;
+ }
- if (builder.saveStatus().status == Truncated ||
builder.saveStatus().status == Invalidated)
- continue; // Already truncated
+ Builder builder = new Builder(e2.getKey(), ALL);
+ for (int i = builders.length - 1; i >= 0 ; --i)
+ {
+ Builder current = builders[i];
+ if (current == null)
+ continue;
+ current.clearSuperseded(false, builder);
+ builder.fillInMissingOrCleanup(false, current);
+ }
Input input = isPartialCompaction ? PARTIAL : FULL;
++counter;
- Cleanup cleanup = builder.shouldCleanup(input,
store.agent(),store.unsafeGetRedundantBefore(), store.durableBefore());
- cleanup = builder.maybeCleanup(input, cleanup);
+ Cleanup cleanup = builder.shouldCleanup(input,
store.unsafeGetRedundantBefore(), store.durableBefore());
+ cleanup = builder.maybeCleanup(true, input, cleanup);
if (cleanup != NO)
{
if (cleanup == EXPUNGE)
{
- if (input == FULL) e2.setValue(new PurgedList());
+ if (input == FULL || subset == diffs) e2.setValue(new
PurgedList());
else diffs.removeAll(subset);
+ continue;
}
else
{
- int flags =
CommandChange.eraseKnownFieldsMask(cleanup.appliesIfNot);
- EnumMap<Field, Object> values = new
EnumMap<>(Field.class);
- int iterator = toIterableSetFields(~flags) & 0x3FFF;
// limit ourselves to 14 bits
- for (Field field = nextSetField(iterator); field !=
null; iterator = unsetIterable(field, iterator), field = nextSetField(iterator))
- {
- if (field == CLEANUP || field == SAVE_STATUS)
- continue;
- Object v = builder.get(field);
- if (v != null)
- {
- flags = setChanged(field, flags);
- values.put(field, v);
- }
- }
- values.put(SAVE_STATUS, cleanup.appliesIfNot);
- values.put(CLEANUP, cleanup);
- flags = setChanged(SAVE_STATUS, flags);
- flags = setChanged(CLEANUP, flags);
-
- Diff diff = new Diff(builder.txnId(), flags, values);
- // During partial compaction, we can only append_to
the existing list, removing items that got compacted away
if (isPartialCompaction)
{
- int i = 0, j = 0;
- while (i < diffs.size() && j < subset.size())
+ boolean saveCleanup = true;
+ for (int i = builders.length - 1; i >= 0 ; --i)
{
- if (subset.get(j) == diffs.get(i))
- ++j;
- ++i;
+ if (builders[i] != null)
+ {
+ if (saveCleanup)
builders[i].addCleanup(false, cleanup);
+ else builders[i].cleanup(false, cleanup);
+ saveCleanup = false;
+ }
}
-
- List<Diff> newDiffs = new
ArrayList<>(diffs.subList(0, i));
- newDiffs.add(diff);
- newDiffs.addAll(diffs.subList(i, diffs.size()));
- e2.setValue(newDiffs);
}
// During full compaction, we erase all previous
records, replacing them with new image
else
{
+ Diff diff = builder.toDiff();
e2.setValue(cleanup == ERASE ? new
ErasedList(diff) : new TruncatedList(diff));
+ continue;
}
}
}
+
+ for (int i = 0 ; i < builders.length ; ++i)
+ {
+ if (builders[i] != null)
+ {
+ Diff diff = builders[i].toDiff();
+ diffs.set(i, diff.flags == 0 ? null : diff);
+ }
+ }
}
}
}
@@ -440,11 +457,11 @@ public class InMemoryJournal implements Journal
private static class ErasedList extends AbstractList<Diff>
{
- final Diff erased;
+ private Diff erased;
ErasedList(Diff erased)
{
- Invariants.requireArgument(erased.changes.get(SAVE_STATUS) ==
Erased);
+ Invariants.requireArgument(erased.changes.get(SAVE_STATUS) ==
Erased || erased.changes.get(CLEANUP) == ERASE);
this.erased = erased;
}
@@ -466,10 +483,21 @@ public class InMemoryJournal implements Journal
public boolean add(Diff diff)
{
// TODO (expected): we shouldn't really be saving updates (such as
durability updates) to Erased commands
- if (diff.changes.get(SAVE_STATUS) == Erased ||
diff.changes.get(SAVE_STATUS) == null)
+ if (diff.changes.get(SAVE_STATUS) == Erased ||
diff.changes.get(SAVE_STATUS) == null || diff.changes.get(CLEANUP) == ERASE)
return false;
throw illegalState();
}
+
+ @Override
+ public Diff set(int index, Diff diff)
+ {
+ if (diff.changes.get(SAVE_STATUS) == Erased ||
diff.changes.get(CLEANUP) == ERASE)
+ {
+ erased = diff;
+ return erased;
+ }
+ return super.set(index, diff);
+ }
}
static class TruncatedList extends ArrayList<Diff>
@@ -504,13 +532,19 @@ public class InMemoryJournal implements Journal
}
}
- private static Diff toDiff(CommandUpdate update)
+ private static Diff toDiff(@Nonnull CommandUpdate update)
{
- if (update == null
- || update.before == update.after
- || update.after == null
- || update.after.saveStatus() == SaveStatus.Uninitialised)
- return null;
+ if (update.before == null)
+ {
+ if (update.after.saveStatus() == Uninitialised)
+ return null;
+ }
+ else
+ {
+ Invariants.require(update.after.saveStatus() != Uninitialised);
+ if (update.before.saveStatus() == Erased)
+ return null;
+ }
int flags = validateFlags(getFlags(update.before, update.after));
if (!anyFieldChanged(flags))
@@ -650,11 +684,35 @@ public class InMemoryJournal implements Journal
return executeAt;
}
+ Diff toDiff()
+ {
+ int flags = this.flags;
+ EnumMap<Field, Object> values = new EnumMap<>(Field.class);
+
+ int iterator = toIterableSetFields(notNulls(flags)); // limit
ourselves to 14 bits
+ for (Field field = nextSetField(iterator); field != null; iterator
= unsetIterable(field, iterator), field = nextSetField(iterator))
+ {
+ if (field == CLEANUP)
+ continue;
+ Object v = Invariants.nonNull(get(field));
+ values.put(field, v);
+ }
+
+ if (cleanup != null)
+ {
+ flags |= CommandChange.eraseKnownFieldsMask(cleanup.newStatus);
+ values.put(CLEANUP, cleanup);
+ flags = setChanged(CLEANUP, flags);
+ }
+
+ return new Diff(txnId(), flags, values);
+ }
+
private void apply(Diff diff)
{
Invariants.require(diff.txnId != null);
Invariants.require(diff.flags != 0);
- nextCalled = true;
+ hasUpdate = true;
count++;
int iterable = toIterableSetFields(diff.flags);
diff --git a/accord-core/src/test/java/accord/impl/basic/LoggingJournal.java
b/accord-core/src/test/java/accord/impl/basic/LoggingJournal.java
index 1469bc47..3df406ec 100644
--- a/accord-core/src/test/java/accord/impl/basic/LoggingJournal.java
+++ b/accord-core/src/test/java/accord/impl/basic/LoggingJournal.java
@@ -31,6 +31,7 @@ import accord.api.Journal;
import accord.local.Command;
import accord.local.CommandStores;
import accord.local.DurableBefore;
+import accord.local.Node;
import accord.local.RedundantBefore;
import accord.primitives.EpochSupplier;
import accord.primitives.Ranges;
@@ -73,6 +74,13 @@ public class LoggingJournal implements Journal
}
}
+ @Override
+
+ public Journal start(Node node)
+ {
+ return this;
+ }
+
@Override
public Command loadCommand(int commandStoreId, TxnId txnId,
RedundantBefore redundantBefore, DurableBefore durableBefore)
{
diff --git a/accord-core/src/test/java/accord/impl/mock/MockCluster.java
b/accord-core/src/test/java/accord/impl/mock/MockCluster.java
index 28d7dfac..53b0117a 100644
--- a/accord-core/src/test/java/accord/impl/mock/MockCluster.java
+++ b/accord-core/src/test/java/accord/impl/mock/MockCluster.java
@@ -133,7 +133,7 @@ public class MockCluster implements Network, AutoCloseable,
Iterable<Node>
MessageSink messageSink = messageSinkFactory.apply(id, this);
MockConfigurationService configurationService = new
MockConfigurationService(messageSink, onFetchTopology, topology);
Agent agent = new TestAgent();
- Journal journal = new InMemoryJournal(id, agent, random.fork());
+ Journal journal = new InMemoryJournal(id, random.fork());
ThreadPoolScheduler scheduler = new ThreadPoolScheduler();
Node node = new Node(id,
messageSink,
@@ -153,6 +153,7 @@ public class MockCluster implements Network, AutoCloseable,
Iterable<Node>
new CoordinationAdapter.DefaultFactory(),
DurableBefore.NOOP_PERSISTER,
journal);
+ journal.start(node);
awaitUninterruptibly(node.unsafeStart());
node.onTopologyUpdate(topology, false, true);
return node;
diff --git a/accord-core/src/test/java/accord/local/ImmutableCommandTest.java
b/accord-core/src/test/java/accord/local/ImmutableCommandTest.java
index ec7b63a5..34cd2463 100644
--- a/accord-core/src/test/java/accord/local/ImmutableCommandTest.java
+++ b/accord-core/src/test/java/accord/local/ImmutableCommandTest.java
@@ -112,7 +112,7 @@ public class ImmutableCommandTest
InMemoryCommandStores.Synchronized::new,
new CoordinationAdapter.DefaultFactory(),
DurableBefore.NOOP_PERSISTER,
- new InMemoryJournal(id, agent, random.fork()));
+ new InMemoryJournal(id, random.fork()));
awaitUninterruptibly(node.unsafeStart());
node.onTopologyUpdate(storeSupport.local.get(), false, true);
return node;
diff --git a/accord-maelstrom/src/main/java/accord/maelstrom/Cluster.java
b/accord-maelstrom/src/main/java/accord/maelstrom/Cluster.java
index 01161363..ffc0d3ef 100644
--- a/accord-maelstrom/src/main/java/accord/maelstrom/Cluster.java
+++ b/accord-maelstrom/src/main/java/accord/maelstrom/Cluster.java
@@ -388,6 +388,7 @@ public class Cluster implements Scheduler
public static class NoOpJournal implements Journal
{
+ @Override public Journal start(Node node) { return null; }
@Override public Command loadCommand(int store, TxnId txnId,
RedundantBefore redundantBefore, DurableBefore durableBefore) { throw new
IllegalStateException("Not impelemented"); }
@Override public Command.Minimal loadMinimal(int store, TxnId txnId,
Load load, RedundantBefore redundantBefore, DurableBefore durableBefore) {
throw new IllegalStateException("Not impelemented"); }
@Override public void saveCommand(int store, CommandUpdate value,
Runnable onFlush) { throw new IllegalStateException("Not impelemented"); }
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]