This is an automated email from the ASF dual-hosted git repository.
ifesdjeen pushed a commit to branch trunk
in repository https://gitbox.apache.org/repos/asf/cassandra-accord.git
The following commit(s) were added to refs/heads/trunk by this push:
new f7b9bb88 Migrate in memory journal to CommandChange logic shared with
AccordJournal
f7b9bb88 is described below
commit f7b9bb8887ed672185f269ebcbc9d11e6aeafca9
Author: Alex Petrov <[email protected]>
AuthorDate: Tue Nov 26 15:25:32 2024 +0100
Migrate in memory journal to CommandChange logic shared with AccordJournal
Patch by Alex Petrov; reviewed by Benedict Elliott Smith for CASSANDRA-20115
---
accord-core/src/main/java/accord/api/Journal.java | 12 +
.../src/main/java/accord/impl/CommandChange.java | 621 +++++++++++++++++++
.../src/main/java/accord/local/Command.java | 58 +-
.../src/test/java/accord/burn/BurnTestBase.java | 4 +-
.../src/test/java/accord/impl/basic/Cluster.java | 5 +-
.../java/accord/impl/basic/InMemoryJournal.java | 661 ++++++++++-----------
.../java/accord/impl/basic/LoggingJournal.java | 21 +
.../java/accord/impl/basic/VerifyingJournal.java | 110 ----
8 files changed, 1006 insertions(+), 486 deletions(-)
diff --git a/accord-core/src/main/java/accord/api/Journal.java
b/accord-core/src/main/java/accord/api/Journal.java
index 300fc274..2fd70165 100644
--- a/accord-core/src/main/java/accord/api/Journal.java
+++ b/accord-core/src/main/java/accord/api/Journal.java
@@ -29,6 +29,7 @@ import accord.local.RedundantBefore;
import accord.primitives.Ranges;
import accord.primitives.Timestamp;
import accord.primitives.TxnId;
+import accord.utils.PersistentField.Persister;
/**
* Persisted journal for transactional recovery.
@@ -36,6 +37,8 @@ import accord.primitives.TxnId;
public interface Journal
{
Command loadCommand(int commandStoreId, TxnId txnId, RedundantBefore
redundantBefore, DurableBefore durableBefore);
+ Command.Minimal loadMinimal(int commandStoreId, TxnId txnId, Load load,
RedundantBefore redundantBefore, DurableBefore durableBefore);
+
// TODO (required): use OnDone instead of Runnable
void saveCommand(int store, CommandUpdate value, Runnable onFlush);
@@ -47,6 +50,8 @@ public interface Journal
NavigableMap<Timestamp, Ranges> loadSafeToRead(int commandStoreId);
CommandStores.RangesForEpoch loadRangesForEpoch(int commandStoreId);
+ Persister<DurableBefore, DurableBefore> durableBeforePersister();
+
void saveStoreState(int store, FieldUpdates fieldUpdates, Runnable
onFlush);
class CommandUpdate
@@ -82,6 +87,13 @@ public interface Journal
}
}
+ enum Load
+ {
+ ALL,
+ PURGEABLE,
+ MINIMAL
+ }
+
/**
* Helper for CommandStore to restore Command states.
*/
diff --git a/accord-core/src/main/java/accord/impl/CommandChange.java
b/accord-core/src/main/java/accord/impl/CommandChange.java
new file mode 100644
index 00000000..e2c450d9
--- /dev/null
+++ b/accord-core/src/main/java/accord/impl/CommandChange.java
@@ -0,0 +1,621 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package accord.impl;
+
+import java.util.function.Function;
+
+import com.google.common.annotations.VisibleForTesting;
+
+import accord.api.Agent;
+import accord.api.Result;
+import accord.local.Cleanup;
+import accord.local.Command;
+import accord.local.CommonAttributes;
+import accord.local.DurableBefore;
+import accord.local.RedundantBefore;
+import accord.local.StoreParticipants;
+import accord.primitives.Ballot;
+import accord.primitives.PartialDeps;
+import accord.primitives.PartialTxn;
+import accord.primitives.SaveStatus;
+import accord.primitives.Status;
+import accord.primitives.Timestamp;
+import accord.primitives.TxnId;
+import accord.primitives.Writes;
+import accord.utils.Invariants;
+
+import static accord.api.Journal.Load;
+import static accord.api.Journal.Load.ALL;
+import static accord.impl.CommandChange.Fields.ACCEPTED;
+import static accord.impl.CommandChange.Fields.CLEANUP;
+import static accord.impl.CommandChange.Fields.DURABILITY;
+import static accord.impl.CommandChange.Fields.EXECUTES_AT_LEAST;
+import static accord.impl.CommandChange.Fields.EXECUTE_AT;
+import static accord.impl.CommandChange.Fields.FIELDS;
+import static accord.impl.CommandChange.Fields.PARTIAL_DEPS;
+import static accord.impl.CommandChange.Fields.PARTIAL_TXN;
+import static accord.impl.CommandChange.Fields.PARTICIPANTS;
+import static accord.impl.CommandChange.Fields.PROMISED;
+import static accord.impl.CommandChange.Fields.RESULT;
+import static accord.impl.CommandChange.Fields.SAVE_STATUS;
+import static accord.impl.CommandChange.Fields.WAITING_ON;
+import static accord.impl.CommandChange.Fields.WRITES;
+import static accord.local.Cleanup.NO;
+import static accord.local.Cleanup.TRUNCATE_WITH_OUTCOME;
+import static accord.primitives.Known.KnownDeps.DepsErased;
+import static accord.primitives.Known.KnownDeps.DepsUnknown;
+import static accord.primitives.Known.KnownDeps.NoDeps;
+import static accord.primitives.SaveStatus.TruncatedApplyWithOutcome;
+import static accord.primitives.Status.Durability.NotDurable;
+import static accord.utils.Invariants.illegalState;
+
+public class CommandChange
+{
+ // This enum is order-dependent
+ public enum Fields
+ {
+ PARTICIPANTS, // stored first so we can index it
+ SAVE_STATUS,
+ PARTIAL_DEPS,
+ EXECUTE_AT,
+ EXECUTES_AT_LEAST,
+ DURABILITY,
+ ACCEPTED,
+ PROMISED,
+ WAITING_ON,
+ PARTIAL_TXN,
+ WRITES,
+ CLEANUP,
+ RESULT,
+ ;
+
+ public static final Fields[] FIELDS = values();
+ }
+
+ public static class Builder
+ {
+ protected final int mask;
+ protected int flags;
+
+ protected TxnId txnId;
+
+ protected Timestamp executeAt;
+ protected Timestamp executeAtLeast;
+ protected SaveStatus saveStatus;
+ protected Status.Durability durability;
+
+ protected Ballot acceptedOrCommitted;
+ protected Ballot promised;
+
+ protected StoreParticipants participants;
+ protected PartialTxn partialTxn;
+ protected PartialDeps partialDeps;
+
+ protected byte[] waitingOnBytes;
+ protected CommandChange.WaitingOnProvider waitingOn;
+ protected Writes writes;
+ protected Result result;
+ protected Cleanup cleanup;
+
+ protected boolean nextCalled;
+ protected int count;
+
+ public Builder(TxnId txnId, Load load)
+ {
+ this.mask = mask(load);
+ init(txnId);
+ }
+
+ public Builder(TxnId txnId)
+ {
+ this(txnId, ALL);
+ }
+
+ public Builder(Load load)
+ {
+ this.mask = mask(load);
+ }
+
+ public Builder()
+ {
+ this(ALL);
+ }
+
+ public TxnId txnId()
+ {
+ return txnId;
+ }
+
+ public Timestamp executeAt()
+ {
+ return executeAt;
+ }
+
+ // TODO: why is this unused in BurnTest
+ public Timestamp executeAtLeast()
+ {
+ return executeAtLeast;
+ }
+
+ public SaveStatus saveStatus()
+ {
+ return saveStatus;
+ }
+
+ public Status.Durability durability()
+ {
+ return durability;
+ }
+
+ public Ballot acceptedOrCommitted()
+ {
+ return acceptedOrCommitted;
+ }
+
+ public Ballot promised()
+ {
+ return promised;
+ }
+
+ public StoreParticipants participants()
+ {
+ return participants;
+ }
+
+ public PartialTxn partialTxn()
+ {
+ return partialTxn;
+ }
+
+ public PartialDeps partialDeps()
+ {
+ return partialDeps;
+ }
+
+ public CommandChange.WaitingOnProvider waitingOn()
+ {
+ return waitingOn;
+ }
+
+ public Writes writes()
+ {
+ return writes;
+ }
+
+ public Result result()
+ {
+ return result;
+ }
+
+ public void clear()
+ {
+ flags = 0;
+ txnId = null;
+
+ executeAt = null;
+ executeAtLeast = null;
+ saveStatus = null;
+ durability = null;
+
+ acceptedOrCommitted = null;
+ promised = null;
+
+ participants = null;
+ partialTxn = null;
+ partialDeps = null;
+
+ waitingOnBytes = null;
+ waitingOn = null;
+ writes = null;
+ result = null;
+ cleanup = null;
+
+ nextCalled = false;
+ count = 0;
+ }
+
+ public void reset(TxnId txnId)
+ {
+ clear();
+ init(txnId);
+ }
+
+ public void init(TxnId txnId)
+ {
+ this.txnId = txnId;
+ durability = NotDurable;
+ acceptedOrCommitted = promised = Ballot.ZERO;
+ waitingOn = (txn, deps) -> null;
+ result = null;
+ }
+
+ public boolean isEmpty()
+ {
+ return !nextCalled;
+ }
+
+ public int count()
+ {
+ return count;
+ }
+
+ public Cleanup shouldCleanup(Agent agent, RedundantBefore
redundantBefore, DurableBefore durableBefore)
+ {
+ if (!nextCalled)
+ return NO;
+
+ if (saveStatus == null || participants == null)
+ return Cleanup.NO;
+
+ Cleanup cleanup = Cleanup.shouldCleanupPartial(agent, txnId,
saveStatus, durability, participants, redundantBefore, durableBefore);
+ if (this.cleanup != null && this.cleanup.compareTo(cleanup) > 0)
+ cleanup = this.cleanup;
+ return cleanup;
+ }
+
+ public Builder maybeCleanup(Cleanup cleanup)
+ {
+ if (saveStatus() == null)
+ return this;
+
+ switch (cleanup)
+ {
+ case EXPUNGE:
+ case ERASE:
+ return null;
+
+ case EXPUNGE_PARTIAL:
+ return expungePartial(cleanup, saveStatus, true);
+
+ case VESTIGIAL:
+ case INVALIDATE:
+ return saveStatusOnly();
+
+ case TRUNCATE_WITH_OUTCOME:
+ case TRUNCATE:
+ return expungePartial(cleanup, cleanup.appliesIfNot,
cleanup == TRUNCATE_WITH_OUTCOME);
+
+ case NO:
+ return this;
+ default:
+ throw new IllegalStateException("Unknown cleanup: " +
cleanup);}
+ }
+
+ public Builder expungePartial(Cleanup cleanup, SaveStatus saveStatus,
boolean includeOutcome)
+ {
+ Invariants.checkState(txnId != null);
+ Builder builder = new Builder(txnId, ALL);
+
+ builder.count++;
+ builder.nextCalled = true;
+
+ Invariants.checkState(saveStatus != null);
+ builder.flags = setFieldChanged(SAVE_STATUS, builder.flags);
+ builder.saveStatus = saveStatus;
+ builder.flags = setFieldChanged(CLEANUP, builder.flags);
+ builder.cleanup = cleanup;
+ if (executeAt != null)
+ {
+ builder.flags = setFieldChanged(EXECUTE_AT, builder.flags);
+ builder.executeAt = executeAt;
+ }
+ if (durability != null)
+ {
+ builder.flags = setFieldChanged(DURABILITY, builder.flags);
+ builder.durability = durability;
+ }
+ if (participants != null)
+ {
+ builder.flags = setFieldChanged(PARTICIPANTS, builder.flags);
+ builder.participants = participants;
+ }
+ if (includeOutcome && builder.writes != null)
+ {
+ builder.flags = setFieldChanged(WRITES, builder.flags);
+ builder.writes = writes;
+ }
+
+ return builder;
+ }
+
+ public Builder saveStatusOnly()
+ {
+ Invariants.checkState(txnId != null);
+ Builder builder = new Builder(txnId, ALL);
+
+ builder.count++;
+ builder.nextCalled = true;
+
+ if (saveStatus != null)
+ {
+ builder.flags = setFieldChanged(SAVE_STATUS, builder.flags);
+ builder.saveStatus = saveStatus;
+ }
+
+ return builder;
+ }
+
+ public Command.Minimal asMinimal()
+ {
+ return new Command.Minimal(txnId, saveStatus, participants,
durability, executeAt, writes);
+ }
+
+ public void forceResult(Result newValue)
+ {
+ this.result = newValue;
+ }
+
+ public Command construct(RedundantBefore redundantBefore)
+ {
+ if (!nextCalled)
+ return null;
+
+ Invariants.checkState(txnId != null);
+ CommonAttributes.Mutable attrs = new
CommonAttributes.Mutable(txnId);
+ if (partialTxn != null)
+ attrs.partialTxn(partialTxn);
+ if (durability != null)
+ attrs.durability(durability);
+ if (participants != null)
+
attrs.setParticipants(participants.filter(StoreParticipants.Filter.LOAD,
redundantBefore, txnId, saveStatus.known.executeAt.isDecidedAndKnownToExecute()
? executeAt : null));
+ if (participants != null)
+ attrs.setParticipants(participants);
+ if (partialDeps != null &&
+ (saveStatus.known.deps != NoDeps &&
+ saveStatus.known.deps != DepsErased &&
+ saveStatus.known.deps != DepsUnknown))
+ attrs.partialDeps(partialDeps);
+
+ switch (saveStatus.known.outcome)
+ {
+ case Erased:
+ case WasApply:
+ writes = null;
+ result = null;
+ break;
+ }
+
+ Command.WaitingOn waitingOn = null;
+ if (this.waitingOn != null)
+ waitingOn = this.waitingOn.provide(txnId, partialDeps);
+
+ switch (saveStatus.status)
+ {
+ case NotDefined:
+ return saveStatus == SaveStatus.Uninitialised ?
Command.NotDefined.uninitialised(attrs.txnId())
+ :
Command.NotDefined.notDefined(attrs, promised);
+ case PreAccepted:
+ return Command.PreAccepted.preAccepted(attrs, executeAt,
promised);
+ case AcceptedInvalidate:
+ case Accepted:
+ case PreCommitted:
+ if (saveStatus == SaveStatus.AcceptedInvalidate)
+ return
Command.AcceptedInvalidateWithoutDefinition.acceptedInvalidate(attrs, promised,
acceptedOrCommitted);
+ else
+ return Command.Accepted.accepted(attrs, saveStatus,
executeAt, promised, acceptedOrCommitted);
+ case Committed:
+ case Stable:
+ return Command.Committed.committed(attrs, saveStatus,
executeAt, promised, acceptedOrCommitted, waitingOn);
+ case PreApplied:
+ case Applied:
+ return Command.Executed.executed(attrs, saveStatus,
executeAt, promised, acceptedOrCommitted, waitingOn, writes, result);
+ case Truncated:
+ case Invalidated:
+ return truncated(attrs, saveStatus, executeAt,
executeAtLeast, writes, result);
+ default:
+ throw new IllegalStateException();
+ }
+ }
+
+ private static Command.Truncated truncated(CommonAttributes.Mutable
attrs, SaveStatus status, Timestamp executeAt, Timestamp executesAtLeast,
Writes writes, Result result)
+ {
+ switch (status)
+ {
+ default:
+ throw illegalState("Unhandled SaveStatus: " + status);
+ case TruncatedApplyWithOutcome:
+ case TruncatedApplyWithDeps:
+ case TruncatedApply:
+ if (status != TruncatedApplyWithOutcome)
+ result = null;
+ if (attrs.txnId().kind().awaitsOnlyDeps())
+ return Command.Truncated.truncatedApply(attrs, status,
executeAt, writes, result, executesAtLeast);
+ return Command.Truncated.truncatedApply(attrs, status,
executeAt, writes, result, null);
+ case ErasedOrVestigial:
+ return Command.Truncated.erasedOrVestigial(attrs.txnId(),
attrs.participants());
+ case Erased:
+ // TODO (expected): why are we saving Durability here for
erased commands?
+ return Command.Truncated.erased(attrs.txnId(),
attrs.durability(), attrs.participants());
+ case Invalidated:
+ return Command.Truncated.invalidated(attrs.txnId());
+ }
+ }
+
+ public String toString()
+ {
+ return "Builder {" +
+ "txnId=" + txnId +
+ ", executeAt=" + executeAt +
+ ", saveStatus=" + saveStatus +
+ ", durability=" + durability +
+ ", acceptedOrCommitted=" + acceptedOrCommitted +
+ ", promised=" + promised +
+ ", participants=" + participants +
+ ", partialTxn=" + partialTxn +
+ ", partialDeps=" + partialDeps +
+ ", waitingOn=" + waitingOn +
+ ", writes=" + writes +
+ '}';
+ }
+ }
+
+ /**
+ * Helpers
+ */
+
+ public interface WaitingOnProvider
+ {
+ Command.WaitingOn provide(TxnId txnId, PartialDeps deps);
+ }
+
+ public static Command.WaitingOn getWaitingOn(Command command)
+ {
+ if (command instanceof Command.Committed)
+ return command.asCommitted().waitingOn();
+
+ return null;
+ }
+
+ /**
+ * Managing masks
+ */
+
+ public static int mask(Fields... fields)
+ {
+ int mask = -1;
+ for (Fields field : fields)
+ mask &= ~(1 << field.ordinal());
+ return mask;
+ }
+
+ private static final int[] LOAD_MASKS = new int[] {0,
+ mask(SAVE_STATUS,
PARTICIPANTS, DURABILITY, EXECUTE_AT, WRITES),
+ mask(SAVE_STATUS,
PARTICIPANTS, EXECUTE_AT)};
+
+ public static int mask(Load load)
+ {
+ return LOAD_MASKS[load.ordinal()];
+ }
+
+ /**
+ * Managing flags
+ */
+
+ @VisibleForTesting
+ public static int getFlags(Command before, Command after)
+ {
+ int flags = 0;
+
+ flags = collectFlags(before, after, Command::executeAt, true,
EXECUTE_AT, flags);
+ flags = collectFlags(before, after, Command::executesAtLeast, true,
EXECUTES_AT_LEAST, flags);
+ flags = collectFlags(before, after, Command::saveStatus, false,
SAVE_STATUS, flags);
+ flags = collectFlags(before, after, Command::durability, false,
DURABILITY, flags);
+
+ flags = collectFlags(before, after, Command::acceptedOrCommitted,
false, ACCEPTED, flags);
+ flags = collectFlags(before, after, Command::promised, false,
PROMISED, flags);
+
+ flags = collectFlags(before, after, Command::participants, true,
PARTICIPANTS, flags);
+ flags = collectFlags(before, after, Command::partialTxn, false,
PARTIAL_TXN, flags);
+ flags = collectFlags(before, after, Command::partialDeps, false,
PARTIAL_DEPS, flags);
+
+ // TODO: waitingOn vs WaitingOnWithExecutedAt?
+ flags = collectFlags(before, after, CommandChange::getWaitingOn, true,
WAITING_ON, flags);
+
+ flags = collectFlags(before, after, Command::writes, false, WRITES,
flags);
+
+ // Special-cased for Journal BurnTest integration
+ if ((before != null && after.result() != before.result()) ||
+ (before == null && after.result() != null)) //TODO
+ {
+ flags = collectFlags(before, after, Command::writes, false,
RESULT, flags);
+ }
+
+ return flags;
+ }
+
+ private static <OBJ, VAL> int collectFlags(OBJ lo, OBJ ro, Function<OBJ,
VAL> convert, boolean allowClassMismatch, Fields field, int flags)
+ {
+ VAL l = null;
+ VAL r = null;
+ if (lo != null) l = convert.apply(lo);
+ if (ro != null) r = convert.apply(ro);
+
+ if (l == r)
+ return flags; // no change
+
+ if (r == null)
+ flags = setFieldIsNull(field, flags);
+
+ if (l == null || r == null)
+ return setFieldChanged(field, flags);
+
+ assert allowClassMismatch || l.getClass() == r.getClass() :
String.format("%s != %s", l.getClass(), r.getClass());
+
+ if (l.equals(r))
+ return flags; // no change
+
+ return setFieldChanged(field, flags);
+ }
+
+ // TODO (required): calculate flags once
+ public static boolean anyFieldChanged(int flags)
+ {
+ return (flags >>> 16) != 0;
+ }
+
+ public static int validateFlags(int flags)
+ {
+ Invariants.checkState(0 == (~(flags >>> 16) & (flags & 0xffff)));
+ return flags;
+ }
+
+ public static int setFieldChanged(Fields field, int oldFlags)
+ {
+ return oldFlags | (0x10000 << field.ordinal());
+ }
+
+ @VisibleForTesting
+ public static boolean getFieldChanged(Fields field, int oldFlags)
+ {
+ return (oldFlags & (0x10000 << field.ordinal())) != 0;
+ }
+
+ public static int toIterableSetFields(int flags)
+ {
+ return flags >>> 16;
+ }
+
+ public static Fields nextSetField(int iterable)
+ {
+ int i = Integer.numberOfTrailingZeros(Integer.lowestOneBit(iterable));
+ return i == 32 ? null : FIELDS[i];
+ }
+
+ public static int unsetIterableFields(Fields field, int iterable)
+ {
+ return iterable & ~(1 << field.ordinal());
+ }
+
+ @VisibleForTesting
+ public static boolean getFieldIsNull(Fields field, int oldFlags)
+ {
+ return (oldFlags & (1 << field.ordinal())) != 0;
+ }
+
+ public static int unsetFieldIsNull(Fields field, int oldFlags)
+ {
+ return oldFlags & ~(1 << field.ordinal());
+ }
+
+ public static int setFieldIsNull(Fields field, int oldFlags)
+ {
+ return oldFlags | (1 << field.ordinal());
+ }
+
+}
\ No newline at end of file
diff --git a/accord-core/src/main/java/accord/local/Command.java
b/accord-core/src/main/java/accord/local/Command.java
index 14274670..62a85cf9 100644
--- a/accord-core/src/main/java/accord/local/Command.java
+++ b/accord-core/src/main/java/accord/local/Command.java
@@ -22,19 +22,17 @@ import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
import java.util.Objects;
+import javax.annotation.Nonnull;
import javax.annotation.Nullable;
import accord.api.Result;
import accord.api.RoutingKey;
import accord.api.VisibleForImplementation;
-import accord.primitives.SaveStatus;
-import accord.primitives.Status;
-import accord.primitives.Status.Durability;
-import accord.primitives.Known;
import accord.local.cfk.CommandsForKey;
import accord.primitives.Ballot;
import accord.primitives.Deps;
import accord.primitives.KeyDeps;
+import accord.primitives.Known;
import accord.primitives.PartialDeps;
import accord.primitives.PartialTxn;
import accord.primitives.Participants;
@@ -43,6 +41,9 @@ import accord.primitives.Ranges;
import accord.primitives.Routable;
import accord.primitives.Route;
import accord.primitives.RoutingKeys;
+import accord.primitives.SaveStatus;
+import accord.primitives.Status;
+import accord.primitives.Status.Durability;
import accord.primitives.Timestamp;
import accord.primitives.Txn;
import accord.primitives.TxnId;
@@ -54,12 +55,13 @@ import accord.utils.IndexedTriConsumer;
import accord.utils.Invariants;
import accord.utils.SimpleBitSet;
-import javax.annotation.Nonnull;
-
import com.google.common.annotations.VisibleForTesting;
import static accord.local.Command.AbstractCommand.validate;
+import static accord.primitives.Known.KnownExecuteAt.ExecuteAtKnown;
import static accord.primitives.Routable.Domain.Key;
+import static accord.primitives.Routable.Domain.Range;
+import static accord.primitives.Routables.Slice;
import static accord.primitives.SaveStatus.AcceptedInvalidate;
import static accord.primitives.SaveStatus.ErasedOrVestigial;
import static accord.primitives.SaveStatus.Uninitialised;
@@ -68,10 +70,7 @@ import static accord.primitives.Status.Durability.NotDurable;
import static accord.primitives.Status.Durability.ShardUniversal;
import static accord.primitives.Status.Durability.UniversalOrInvalidated;
import static accord.primitives.Status.Invalidated;
-import static accord.primitives.Known.KnownExecuteAt.ExecuteAtKnown;
import static accord.primitives.Status.Stable;
-import static accord.primitives.Routable.Domain.Range;
-import static accord.primitives.Routables.Slice.Minimal;
import static accord.utils.Invariants.Paranoia.LINEAR;
import static accord.utils.Invariants.Paranoia.NONE;
import static accord.utils.Invariants.ParanoiaCostFactor.LOW;
@@ -284,7 +283,7 @@ public abstract class Command implements CommonAttributes
case DefinitionErased:
case DefinitionUnknown:
case NoOp:
- Invariants.checkState(partialTxn == null, "partialTxn
is defined");
+ Invariants.checkState(partialTxn == null, "partialTxn
is defined %s", validate);
break;
case DefinitionKnown:
Invariants.checkState(partialTxn != null ||
validate.participants().owns().isEmpty(), "partialTxn is null");
@@ -1263,6 +1262,41 @@ public abstract class Command implements CommonAttributes
}
}
+ public static class Minimal
+ {
+ public final TxnId txnId;
+ public final SaveStatus saveStatus;
+ public final StoreParticipants participants;
+ public final Status.Durability durability;
+ public final Timestamp executeAt;
+ public final Writes writes;
+
+ public Minimal(TxnId txnId, SaveStatus saveStatus, StoreParticipants
participants, Status.Durability durability, Timestamp executeAt, Writes writes)
+ {
+ this.txnId = txnId;
+ this.saveStatus = saveStatus;
+ this.participants = participants;
+ this.durability = durability;
+ this.executeAt = executeAt;
+ this.writes = writes;
+ }
+
+ @Override
+ public boolean equals(Object object)
+ {
+ if (this == object) return true;
+ if (object == null || getClass() != object.getClass()) return
false;
+ Minimal minimal = (Minimal) object;
+ return Objects.equals(txnId, minimal.txnId) && saveStatus ==
minimal.saveStatus && Objects.equals(participants, minimal.participants) &&
durability == minimal.durability && Objects.equals(executeAt,
minimal.executeAt) && Objects.equals(writes, minimal.writes);
+ }
+
+ @Override
+ public final int hashCode()
+ {
+ throw new UnsupportedOperationException();
+ }
+ }
+
public static class WaitingOn
{
private static final WaitingOn EMPTY_FOR_KEY = new
WaitingOn(RoutingKeys.EMPTY, RangeDeps.NONE, KeyDeps.NONE,
ImmutableBitSet.EMPTY, null);
@@ -1495,7 +1529,7 @@ public abstract class Command implements CommonAttributes
ranges =
safeStore.redundantBefore().removePreBootstrap(txnId, ranges);
if (!ranges.isEmpty())
{
-
deps.rangeDeps.forEach(participants.stillExecutes().slice(ranges, Minimal),
update, (upd, idx) -> {
+
deps.rangeDeps.forEach(participants.stillExecutes().slice(ranges,
Slice.Minimal), update, (upd, idx) -> {
TxnId id = upd.txnId(idx);
// because we use RX as RedundantBefore
bounds, we must not let an RX on a closing range
// get ahead of one that isn't closed but has
overlapping transactions (else we may erroneously treat as redundant)
@@ -1528,7 +1562,7 @@ public abstract class Command implements CommonAttributes
@VisibleForTesting
public static Update unsafeInitialise(TxnId txnId, Ranges
executeRanges, Route<?> route, Deps deps)
{
- Unseekables<?> executionParticipants =
route.slice(executeRanges, Minimal);
+ Unseekables<?> executionParticipants =
route.slice(executeRanges, Slice.Minimal);
Update update = new Update(txnId, deps.keyDeps.keys(),
deps.rangeDeps, deps.directKeyDeps);
// TODO (expected): refactor this to operate only on
participants, not ranges
deps.rangeDeps.forEach(executionParticipants, update,
Update::initialise);
diff --git a/accord-core/src/test/java/accord/burn/BurnTestBase.java
b/accord-core/src/test/java/accord/burn/BurnTestBase.java
index 5ffa596e..2b04755b 100644
--- a/accord-core/src/test/java/accord/burn/BurnTestBase.java
+++ b/accord-core/src/test/java/accord/burn/BurnTestBase.java
@@ -39,6 +39,7 @@ import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.atomic.AtomicLong;
import java.util.concurrent.atomic.AtomicReference;
import java.util.function.BiConsumer;
+import java.util.function.BiFunction;
import java.util.function.Consumer;
import java.util.function.Function;
import java.util.function.IntSupplier;
@@ -50,6 +51,7 @@ import java.util.zip.CRC32;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
+import accord.api.Agent;
import accord.api.Journal;
import accord.api.Key;
import accord.burn.random.FrequentLargeRange;
@@ -301,7 +303,7 @@ public class BurnTestBase
f2.get();
}
- public static void burn(RandomSource random, TopologyFactory
topologyFactory, List<Id> clients, List<Id> nodes, int keyCount, int
prefixCount, int operations, int concurrency, PendingQueue pendingQueue,
Function<Id, Journal> journalFactory)
+ public static void burn(RandomSource random, TopologyFactory
topologyFactory, List<Id> clients, List<Id> nodes, int keyCount, int
prefixCount, int operations, int concurrency, PendingQueue pendingQueue,
BiFunction<Id, Agent, Journal> journalFactory)
{
List<Throwable> failures = Collections.synchronizedList(new
ArrayList<>());
AtomicLong progress = new AtomicLong();
diff --git a/accord-core/src/test/java/accord/impl/basic/Cluster.java
b/accord-core/src/test/java/accord/impl/basic/Cluster.java
index 63c75e74..bc5c4247 100644
--- a/accord-core/src/test/java/accord/impl/basic/Cluster.java
+++ b/accord-core/src/test/java/accord/impl/basic/Cluster.java
@@ -52,6 +52,7 @@ import com.google.common.collect.Iterables;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
+import accord.api.Agent;
import accord.api.BarrierType;
import accord.api.Journal;
import accord.api.LocalConfig;
@@ -607,7 +608,7 @@ public class Cluster
Supplier<RandomSource>
randomSupplier,
Supplier<TimeService>
timeServiceSupplier,
TopologyFactory topologyFactory,
Supplier<Packet> in, Consumer<Runnable> noMoreWorkSignal,
- Consumer<Map<Id, Node>>
readySignal, Function<Node.Id, Journal> journalFactory)
+ Consumer<Map<Id, Node>>
readySignal, BiFunction<Node.Id, Agent, Journal> journalFactory)
{
Topology topology = topologyFactory.toTopology(nodes);
Map<Id, Node> nodeMap = new LinkedHashMap<>();
@@ -666,7 +667,7 @@ public class Cluster
BiConsumer<Timestamp, Ranges> onStale = (sinceAtLeast, ranges)
-> configRandomizer.onStale(id, sinceAtLeast, ranges);
AgentExecutor nodeExecutor = nodeExecutorSupplier.apply(id,
onStale);
executorMap.put(id, nodeExecutor);
- Journal journal = journalFactory.apply(id);
+ Journal journal = journalFactory.apply(id,
nodeExecutor.agent());
journalMap.put(id, journal);
BurnTestConfigurationService configService = new
BurnTestConfigurationService(id, nodeExecutor, randomSupplier, topology,
nodeMap::get, topologyUpdates);
DelayedCommandStores.CacheLoading cacheLoading = new
RandomLoader(random).newLoader(journal);
diff --git a/accord-core/src/test/java/accord/impl/basic/InMemoryJournal.java
b/accord-core/src/test/java/accord/impl/basic/InMemoryJournal.java
index 074e5246..59f16268 100644
--- a/accord-core/src/test/java/accord/impl/basic/InMemoryJournal.java
+++ b/accord-core/src/test/java/accord/impl/basic/InMemoryJournal.java
@@ -20,31 +20,30 @@ package accord.impl.basic;
import java.util.AbstractList;
import java.util.ArrayList;
+import java.util.EnumMap;
import java.util.List;
import java.util.Map;
import java.util.NavigableMap;
-import java.util.Objects;
import java.util.TreeMap;
-import java.util.function.Function;
-import java.util.stream.Collectors;
import com.google.common.collect.ImmutableSortedMap;
+import accord.api.Agent;
import accord.api.Journal;
import accord.api.Result;
+import accord.impl.CommandChange;
+import accord.impl.ErasedSafeCommand;
import accord.impl.InMemoryCommandStore;
import accord.local.Cleanup;
import accord.local.Command;
import accord.local.CommandStore;
import accord.local.CommandStores;
import accord.local.Commands;
-import accord.local.CommonAttributes;
import accord.local.DurableBefore;
import accord.local.Node;
import accord.local.RedundantBefore;
import accord.local.StoreParticipants;
import accord.primitives.Ballot;
-import accord.primitives.Known;
import accord.primitives.PartialDeps;
import accord.primitives.PartialTxn;
import accord.primitives.Ranges;
@@ -54,10 +53,36 @@ import accord.primitives.Timestamp;
import accord.primitives.TxnId;
import accord.primitives.Writes;
import accord.utils.Invariants;
+import accord.utils.PersistentField;
import org.agrona.collections.Int2ObjectHashMap;
-import static accord.local.StoreParticipants.Filter.LOAD;
-import static accord.primitives.SaveStatus.NotDefined;
+import static accord.api.Journal.Load.ALL;
+import static accord.impl.CommandChange.*;
+import static accord.impl.CommandChange.Fields.ACCEPTED;
+import static accord.impl.CommandChange.Fields.DURABILITY;
+import static accord.impl.CommandChange.Fields.EXECUTES_AT_LEAST;
+import static accord.impl.CommandChange.Fields.EXECUTE_AT;
+import static accord.impl.CommandChange.Fields.PARTIAL_DEPS;
+import static accord.impl.CommandChange.Fields.PARTIAL_TXN;
+import static accord.impl.CommandChange.Fields.PARTICIPANTS;
+import static accord.impl.CommandChange.Fields.PROMISED;
+import static accord.impl.CommandChange.Fields.RESULT;
+import static accord.impl.CommandChange.Fields.SAVE_STATUS;
+import static accord.impl.CommandChange.Fields.WAITING_ON;
+import static accord.impl.CommandChange.Fields.WRITES;
+import static accord.impl.CommandChange.anyFieldChanged;
+import static accord.impl.CommandChange.getFieldChanged;
+import static accord.impl.CommandChange.getFieldIsNull;
+import static accord.impl.CommandChange.getFlags;
+import static accord.impl.CommandChange.getWaitingOn;
+import static accord.impl.CommandChange.nextSetField;
+import static accord.impl.CommandChange.setFieldChanged;
+import static accord.impl.CommandChange.setFieldIsNull;
+import static accord.impl.CommandChange.toIterableSetFields;
+import static accord.impl.CommandChange.unsetFieldIsNull;
+import static accord.impl.CommandChange.unsetIterableFields;
+import static accord.impl.CommandChange.validateFlags;
+import static accord.primitives.SaveStatus.ErasedOrVestigial;
import static accord.primitives.SaveStatus.Stable;
import static accord.primitives.Status.Invalidated;
import static accord.primitives.Status.Truncated;
@@ -69,16 +94,15 @@ public class InMemoryJournal implements Journal
private final Int2ObjectHashMap<FieldUpdates> fieldStates = new
Int2ObjectHashMap<>();
private final Node.Id id;
+ private final Agent agent;
- public InMemoryJournal(Node.Id id)
+ public InMemoryJournal(Node.Id id, Agent agent)
{
this.id = id;
+ this.agent = agent;
}
-
@Override
- public Command loadCommand(int commandStoreId, TxnId txnId,
- // TODO: currently unused!
- RedundantBefore redundantBefore, DurableBefore
durableBefore)
+ public Command loadCommand(int commandStoreId, TxnId txnId,
RedundantBefore redundantBefore, DurableBefore durableBefore)
{
NavigableMap<TxnId, List<Diff>> commandStore =
this.diffsPerCommandStore.get(commandStoreId);
@@ -89,17 +113,69 @@ public class InMemoryJournal implements Journal
if (saved == null)
return null;
- return reconstruct(redundantBefore, saved);
+ Builder builder = reconstruct(saved, ALL);
+ Cleanup cleanup = builder.shouldCleanup(agent, redundantBefore,
durableBefore);
+ switch (cleanup)
+ {
+ case EXPUNGE_PARTIAL:
+ case EXPUNGE:
+ case ERASE:
+ return ErasedSafeCommand.erased(txnId, ErasedOrVestigial);
+ }
+
+ return builder.construct(redundantBefore);
+ }
+
+ @Override
+ public Command.Minimal loadMinimal(int commandStoreId, TxnId txnId, Load
load, RedundantBefore redundantBefore, DurableBefore durableBefore)
+ {
+ Builder builder = reconstruct(commandStoreId, txnId, load);
+ if (builder == null || builder.isEmpty())
+ return null;
+
+ Cleanup cleanup = builder.shouldCleanup(agent, redundantBefore,
durableBefore);
+ switch (cleanup)
+ {
+ case EXPUNGE_PARTIAL:
+ case EXPUNGE:
+ case ERASE:
+ return null;
+ }
+
+ Invariants.checkState(builder.saveStatus() != null, "No saveSatus
loaded, but next was called and cleanup was not: %s", builder);
+ return builder.asMinimal();
+ }
+
+ private Builder reconstruct(int commandStoreId, TxnId txnId, Load load)
+ {
+ NavigableMap<TxnId, List<Diff>> commandStore =
this.diffsPerCommandStore.get(commandStoreId);
+
+ if (commandStore == null)
+ return null;
+
+ return
reconstruct(this.diffsPerCommandStore.get(commandStoreId).get(txnId), load);
+ }
+
+ private Builder reconstruct(List<Diff> saved, Load load)
+ {
+ if (saved == null)
+ return null;
+
+ Builder builder = null;
+ for (Diff diff : saved)
+ {
+ if (builder == null)
+ builder = new Builder(diff.txnId, load);
+ builder.apply(diff);
+ }
+ return builder;
}
@Override
public void saveCommand(int store, CommandUpdate update, Runnable onFlush)
{
Diff diff;
- if (update == null
- || update.before == update.after
- || update.after.saveStatus() == SaveStatus.Uninitialised
- || (diff = diff(update.before, update.after)) == null)
+ if ((diff = toDiff(update)) == null)
{
if (onFlush!= null)
onFlush.run();
@@ -150,9 +226,15 @@ public class InMemoryJournal implements Journal
return fieldStates.newRangesForEpoch;
}
- public void saveStoreState(int store, FieldUpdates fieldUpdates, Runnable
onFlush)
+ @Override
+ public PersistentField.Persister<DurableBefore, DurableBefore>
durableBeforePersister()
{
+ throw new UnsupportedOperationException("Not implemented");
+ }
+ @Override
+ public void saveStoreState(int store, FieldUpdates fieldUpdates, Runnable
onFlush)
+ {
FieldUpdates fieldStates = this.fieldStates.computeIfAbsent(store, s
-> {
FieldUpdates init = new FieldUpdates();
init.newRedundantBefore = RedundantBefore.EMPTY;
@@ -160,6 +242,7 @@ public class InMemoryJournal implements Journal
init.newSafeToRead = ImmutableSortedMap.of(Timestamp.NONE,
Ranges.EMPTY);
return init;
});
+
if (fieldUpdates.newRedundantBefore != null)
fieldStates.newRedundantBefore = fieldUpdates.newRedundantBefore;
if (fieldUpdates.newSafeToRead != null)
@@ -180,18 +263,20 @@ public class InMemoryJournal implements Journal
{
int commandStoreId = e.getKey();
Map<TxnId, List<Diff>> localJournal = e.getValue();
- CommandStore commandStore = commandStores.forId(commandStoreId);
- if (commandStore == null)
+ CommandStore store = commandStores.forId(commandStoreId);
+ if (store == null)
continue;
for (Map.Entry<TxnId, List<Diff>> e2 : localJournal.entrySet())
{
List<Diff> diffs = e2.getValue();
if (diffs.isEmpty()) continue;
- Command command =
reconstruct(commandStore.unsafeGetRedundantBefore(), diffs);
- if (command.status() == Truncated || command.status() ==
Invalidated)
+ InMemoryJournal.Builder builder = reconstruct(diffs, ALL);
+ if (builder.saveStatus().status == Truncated ||
builder.saveStatus().status == Invalidated)
continue; // Already truncated
- Cleanup cleanup = Cleanup.shouldCleanup(commandStore.agent(),
command, commandStore.unsafeGetRedundantBefore(), commandStore.durableBefore());
+
+ Command command =
builder.construct(store.unsafeGetRedundantBefore());
+ Cleanup cleanup = Cleanup.shouldCleanup(store.agent(),
command, store.unsafeGetRedundantBefore(), store.durableBefore());
switch (cleanup)
{
case NO:
@@ -200,9 +285,9 @@ public class InMemoryJournal implements Journal
case TRUNCATE_WITH_OUTCOME:
case TRUNCATE:
case ERASE:
- command = Commands.purgeUnsafe(commandStore, command,
cleanup);
+ command = Commands.purgeUnsafe(store, command,
cleanup);
Invariants.checkState(command.saveStatus() !=
SaveStatus.Uninitialised);
- Diff diff = diff(null, command);
+ Diff diff = toDiff(new CommandUpdate(null, command));
e2.setValue(cleanup == Cleanup.ERASE ? new
ErasedList(diff) : new TruncatedList(diff));
break;
@@ -225,6 +310,7 @@ public class InMemoryJournal implements Journal
for (Map.Entry<Integer, NavigableMap<TxnId, List<Diff>>> diffEntry :
diffsPerCommandStore.entrySet())
{
int commandStoreId = diffEntry.getKey();
+
// copy to avoid concurrent modification when appending to journal
Map<TxnId, List<Diff>> diffs = new TreeMap<>(diffEntry.getValue());
@@ -237,7 +323,7 @@ public class InMemoryJournal implements Journal
for (Map.Entry<TxnId, List<Diff>> e : diffs.entrySet())
{
if (e.getValue().isEmpty()) continue;
- Command command =
reconstruct(commandStore.unsafeGetRedundantBefore(), e.getValue());
+ Command command = reconstruct(e.getValue(),
ALL).construct(commandStore.unsafeGetRedundantBefore());
Invariants.checkState(command.saveStatus() !=
SaveStatus.Uninitialised,
"Found uninitialized command in the log:
%s %s", diffEntry.getKey(), e.getValue());
loader.load(command, sync);
@@ -247,13 +333,13 @@ public class InMemoryJournal implements Journal
}
}
- static class ErasedList extends AbstractList<Diff>
+ private static class ErasedList extends AbstractList<Diff>
{
final Diff erased;
ErasedList(Diff erased)
{
- Invariants.checkArgument(erased.saveStatus.value ==
SaveStatus.Erased);
+ Invariants.checkArgument(erased.changes.get(SAVE_STATUS) ==
SaveStatus.Erased);
this.erased = erased;
}
@@ -274,7 +360,7 @@ public class InMemoryJournal implements Journal
@Override
public boolean add(Diff diff)
{
- if (diff.saveStatus != null && diff.saveStatus.value ==
SaveStatus.Erased)
+ if (diff.changes.get(SAVE_STATUS) == SaveStatus.Erased)
return false;
throw illegalState();
}
@@ -288,7 +374,7 @@ public class InMemoryJournal implements Journal
}
}
- static class PurgedList extends AbstractList<Diff>
+ private static class PurgedList extends AbstractList<Diff>
{
final List<Diff> purged;
PurgedList(List<Diff> purged)
@@ -311,370 +397,223 @@ public class InMemoryJournal implements Journal
@Override
public boolean add(Diff diff)
{
- if (diff.saveStatus != null && diff.saveStatus.value ==
SaveStatus.Erased)
+ Object saveStatus = diff.changes.get(SAVE_STATUS);
+ if (saveStatus == SaveStatus.Erased)
return false;
throw illegalState();
}
}
- private Command reconstruct(RedundantBefore redundantBefore, List<Diff>
diffs)
+ private static Diff toDiff(CommandUpdate update)
{
- Invariants.checkState(diffs != null && !diffs.isEmpty());
-
- TxnId txnId = null;
- Timestamp executeAt = null;
- Timestamp executesAtLeast = null;
- SaveStatus saveStatus = NotDefined;
- Status.Durability durability = Status.Durability.NotDurable;
-
- Ballot acceptedOrCommitted = Ballot.ZERO;
- Ballot promised = Ballot.ZERO;
-
- StoreParticipants participants = null;
- PartialTxn partialTxn = null;
- PartialDeps partialDeps = null;
-
- Command.WaitingOn waitingOn = null;
- Writes writes = null;
- Result result = null;
-
- for (int i = 0; i < diffs.size(); i++)
- {
- Diff diff = diffs.get(i);
- if (diff.txnId != null)
- txnId = diff.txnId;
- if (diff.executeAt != null)
- executeAt = diff.executeAt.get();
- if (diff.executesAtLeast != null)
- executesAtLeast = diff.executesAtLeast.get();
- if (diff.saveStatus != null)
- saveStatus = diff.saveStatus.get();
- if (diff.durability != null)
- durability = diff.durability.get();
-
- if (diff.acceptedOrCommitted != null)
- acceptedOrCommitted = diff.acceptedOrCommitted.get();
- if (diff.promised != null)
- promised = diff.promised.get();
-
- if (diff.participants != null)
- participants = diff.participants.get();
- if (diff.partialTxn != null)
- partialTxn = diff.partialTxn.get();
- if (diff.partialDeps != null)
- partialDeps = diff.partialDeps.get();
-
- if (diff.waitingOn != null)
- waitingOn = diff.waitingOn.get();
- if (diff.writes != null)
- writes = diff.writes.get();
- if (diff.result != null)
- result = diff.result.get();
-
- try
- {
- if (!txnId.kind().awaitsOnlyDeps())
- executesAtLeast = null;
- }
- catch (Throwable t)
- {
- t.printStackTrace();
- }
-
- switch (saveStatus.known.outcome)
- {
- case Erased:
- case WasApply:
- writes = null;
- result = null;
- break;
- }
- }
-
- CommonAttributes.Mutable attrs = new CommonAttributes.Mutable(txnId);
- if (partialTxn != null)
- attrs.partialTxn(partialTxn);
- if (durability != null)
- attrs.durability(durability);
- if (participants != null)
attrs.setParticipants(participants.filter(LOAD, redundantBefore, txnId,
saveStatus.known.executeAt.isDecidedAndKnownToExecute() ? executeAt : null));
- else attrs.setParticipants(StoreParticipants.empty(txnId));
-
- // TODO (desired): we can simplify this logic if, instead of diffing,
we will infer the diff from the status
- if (partialDeps != null &&
- (saveStatus.known.deps != Known.KnownDeps.NoDeps &&
- saveStatus.known.deps != Known.KnownDeps.DepsErased &&
- saveStatus.known.deps != Known.KnownDeps.DepsUnknown))
- attrs.partialDeps(partialDeps);
-
- try
- {
-
- Command current;
- switch (saveStatus.status)
- {
- case NotDefined:
- current = saveStatus == SaveStatus.Uninitialised ?
Command.NotDefined.uninitialised(attrs.txnId())
- :
Command.NotDefined.notDefined(attrs, promised);
- break;
- case PreAccepted:
- current = Command.PreAccepted.preAccepted(attrs,
executeAt, promised);
- break;
- case AcceptedInvalidate:
- case Accepted:
- case PreCommitted:
- if (saveStatus == SaveStatus.AcceptedInvalidate)
- current =
Command.AcceptedInvalidateWithoutDefinition.acceptedInvalidate(attrs, promised,
acceptedOrCommitted);
- else
- current = Command.Accepted.accepted(attrs, saveStatus,
executeAt, promised, acceptedOrCommitted);
- break;
- case Committed:
- case Stable:
- current = Command.Committed.committed(attrs, saveStatus,
executeAt, promised, acceptedOrCommitted, waitingOn);
- break;
- case PreApplied:
- case Applied:
- current = Command.Executed.executed(attrs, saveStatus,
executeAt, promised, acceptedOrCommitted, waitingOn, writes, result);
- break;
- case Invalidated:
- case Truncated:
- current = truncated(attrs, saveStatus, executeAt,
executesAtLeast, writes, result);
- break;
- default:
- throw new IllegalStateException("Do not know " +
saveStatus.status + " " + saveStatus);
- }
+ if (update == null
+ || update.before == update.after
+ || update.after == null
+ || update.after.saveStatus() == SaveStatus.Uninitialised)
+ return null;
- return current;
- }
- catch (Throwable t)
- {
- throw new RuntimeException("Can not reconstruct from diff:\n" +
diffs.stream().map(o -> o.toString())
-
.collect(Collectors.joining("\n")),
- t);
- }
- }
+ int flags = validateFlags(getFlags(update.before, update.after));
+ if (!anyFieldChanged(flags))
+ return null;
- private static Command.Truncated truncated(CommonAttributes.Mutable attrs,
SaveStatus status, Timestamp executeAt, Timestamp executesAtLeast, Writes
writes, Result result)
- {
- switch (status)
- {
- default:
- throw illegalState("Unhandled SaveStatus: " + status);
- case TruncatedApplyWithOutcome:
- case TruncatedApplyWithDeps:
- case TruncatedApply:
- return Command.Truncated.truncatedApply(attrs, status,
executeAt, writes, result, executesAtLeast);
- case ErasedOrVestigial:
- return Command.Truncated.erasedOrVestigial(attrs.txnId(),
attrs.participants());
- case Erased:
- // TODO (expected): why are we saving Durability here for
erased commands?
- return Command.Truncated.erased(attrs.txnId(),
attrs.durability(), attrs.participants());
- case Invalidated:
- return Command.Truncated.invalidated(attrs.txnId());
- }
+ return new Diff(flags, update);
}
private static class Diff
{
public final TxnId txnId;
+ public final Map<Fields, Object> changes;
+ public final int flags;
- public final NewValue<Timestamp> executeAt;
- public final NewValue<Timestamp> executesAtLeast;
- public final NewValue<SaveStatus> saveStatus;
- public final NewValue<Status.Durability> durability;
-
- public final NewValue<Ballot> acceptedOrCommitted;
- public final NewValue<Ballot> promised;
-
- public final NewValue<StoreParticipants> participants;
- public final NewValue<PartialTxn> partialTxn;
- public final NewValue<PartialDeps> partialDeps;
-
- public final NewValue<Writes> writes;
- public final NewValue<Command.WaitingOn> waitingOn;
-
- public final NewValue<Result> result; // temporarily here for sakes
for reloads
-
- public Diff(TxnId txnId,
- NewValue<Timestamp> executeAt,
- NewValue<Timestamp> executesAtLeast,
- NewValue<SaveStatus> saveStatus,
- NewValue<Status.Durability> durability,
-
- NewValue<Ballot> acceptedOrCommitted,
- NewValue<Ballot> promised,
-
- NewValue<StoreParticipants> participants,
- NewValue<PartialTxn> partialTxn,
- NewValue<PartialDeps> partialDeps,
- NewValue<Command.WaitingOn> waitingOn,
-
- NewValue<Writes> writes,
- NewValue<Result> result)
- {
- this.txnId = txnId;
- this.executeAt = executeAt;
- this.executesAtLeast = executesAtLeast;
- this.saveStatus = saveStatus;
- this.durability = durability;
-
- this.acceptedOrCommitted = acceptedOrCommitted;
- this.promised = promised;
-
- this.participants = participants;
- this.partialTxn = partialTxn;
- this.partialDeps = partialDeps;
-
- this.writes = writes;
- this.waitingOn = waitingOn;
- this.result = result;
- }
-
- public boolean allNulls()
+ private Diff(int flags, CommandUpdate update)
{
- if (txnId != null) return false;
- if (executeAt != null) return false;
- if (executesAtLeast != null) return false;
- if (saveStatus != null) return false;
- if (durability != null) return false;
- if (acceptedOrCommitted != null) return false;
- if (promised != null) return false;
- if (participants != null) return false;
- if (partialTxn != null) return false;
- if (partialDeps != null) return false;
- if (writes != null) return false;
- if (waitingOn != null) return false;
- if (result != null) return false;
- return true;
- }
-
- @Override
- public String toString()
- {
- StringBuilder builder = new StringBuilder("Diff{");
- if (txnId != null)
- builder.append("txnId = ").append(txnId).append(" ");
- if (executeAt != null)
- builder.append("executeAt = ").append(executeAt).append(" ");
- if (executesAtLeast != null)
- builder.append("executesAtLeast =
").append(executesAtLeast).append(" ");
- if (saveStatus != null)
- builder.append("saveStatus = ").append(saveStatus).append(" ");
- if (durability != null)
- builder.append("durability = ").append(durability).append(" ");
- if (acceptedOrCommitted != null)
- builder.append("acceptedOrCommitted =
").append(acceptedOrCommitted).append(" ");
- if (promised != null)
- builder.append("promised = ").append(promised).append(" ");
- if (participants != null)
- builder.append("participants =
").append(participants).append(" ");
- if (partialTxn != null)
- builder.append("partialTxn = ").append(partialTxn).append(" ");
- if (partialDeps != null)
- builder.append("partialDeps = ").append(partialDeps).append("
");
- if (writes != null)
- builder.append("writes = ").append(writes).append(" ");
- if (waitingOn != null)
- builder.append("waitingOn = ").append(waitingOn).append(" ");
- if (result != null)
- builder.append("result = ").append(result).append(" ");
- builder.append("}");
- return builder.toString();
- }
- }
-
- static Diff diff(Command before, Command after)
- {
- if (Objects.equals(before, after))
- return null;
-
- Diff diff = new Diff(after.txnId(),
- ifNotEqual(before, after, Command::executeAt,
true),
- ifNotEqual(before, after,
Command::executesAtLeast, true),
- ifNotEqual(before, after, Command::saveStatus,
false),
-
- ifNotEqual(before, after, Command::durability,
false),
- ifNotEqual(before, after,
Command::acceptedOrCommitted, false),
- ifNotEqual(before, after, Command::promised,
false),
-
- ifNotEqual(before, after, Command::participants,
true),
- ifNotEqual(before, after, Command::partialTxn,
false),
- ifNotEqual(before, after, Command::partialDeps,
false),
- ifNotEqual(before, after,
InMemoryJournal::getWaitingOn, true),
- ifNotEqual(before, after, Command::writes, false),
- ifNotEqual(before, after, Command::result,
false));
- if (diff.allNulls())
- return null;
-
- return diff;
- }
-
- static Command.WaitingOn getWaitingOn(Command command)
- {
- if (command instanceof Command.Committed)
- return command.asCommitted().waitingOn();
-
- return null;
- }
-
- private static <OBJ, VAL> NewValue<VAL> ifNotEqual(OBJ lo, OBJ ro,
Function<OBJ, VAL> convert, boolean allowClassMismatch)
- {
- VAL l = null;
- VAL r = null;
- if (lo != null) l = convert.apply(lo);
- if (ro != null) r = convert.apply(ro);
-
- if (l == r)
- return null; // null here means there was no change
-
- if (l == null || r == null)
- return NewValue.of(r);
+ this.flags = flags;
+ this.txnId = update.txnId;
+ this.changes = new EnumMap<>(Fields.class);
- assert allowClassMismatch || l.getClass() == r.getClass() :
String.format("%s != %s", l.getClass(), r.getClass());
+ Command after = update.after;
+ int iterable = toIterableSetFields(flags);
+ while (iterable != 0)
+ {
+ Fields field = nextSetField(iterable);
+ if (!getFieldChanged(field, flags) || getFieldIsNull(field,
flags))
+ {
+ iterable = unsetIterableFields(field, iterable);
+ continue;
+ }
- if (l.equals(r))
- return null;
+ switch (field)
+ {
+ case EXECUTE_AT:
+ changes.put(EXECUTE_AT, after.executeAt());
+ break;
+ case EXECUTES_AT_LEAST:
+ changes.put(EXECUTES_AT_LEAST,
after.executesAtLeast());
+ break;
+ case SAVE_STATUS:
+ changes.put(SAVE_STATUS, after.saveStatus());
+ break;
+ case DURABILITY:
+ changes.put(DURABILITY, after.durability());
+ break;
+ case ACCEPTED:
+ changes.put(ACCEPTED, after.acceptedOrCommitted());
+ break;
+ case PROMISED:
+ changes.put(PROMISED, after.promised());
+ break;
+ case PARTICIPANTS:
+ changes.put(PARTICIPANTS, after.participants());
+ break;
+ case PARTIAL_TXN:
+ changes.put(PARTIAL_TXN, after.partialTxn());
+ break;
+ case PARTIAL_DEPS:
+ changes.put(PARTIAL_DEPS, after.partialDeps());
+ break;
+ case WAITING_ON:
+ Command.WaitingOn waitingOn = getWaitingOn(after);
+ changes.put(WAITING_ON, (WaitingOnProvider) (txnId,
deps) -> waitingOn);
+ break;
+ case WRITES:
+ changes.put(WRITES, after.writes());
+ break;
+ case RESULT:
+ changes.put(RESULT, after.result());
+ break;
+ case CLEANUP:
+ }
- // TODO (expected): do we need to wrap this in a value object if we
also have flags?
- return NewValue.of(r);
+ iterable = unsetIterableFields(field, iterable);
+ }
+ }
}
- private static class NewValue<T>
+ private static class Builder extends CommandChange.Builder
{
- final T value;
-
- private NewValue(T value)
+ private Builder(TxnId txnId, Load load)
{
- this.value = value;
+ super(txnId, load);
}
- public T get()
+ private void apply(Diff diff)
{
- return value;
- }
+ Invariants.checkState(diff.txnId != null);
+ Invariants.checkState(diff.flags != 0);
+ nextCalled = true;
+ count++;
- public static <T> NewValue<T> of(T value)
- {
- return new NewValue<>(value);
- }
+ int iterable = toIterableSetFields(diff.flags);
+ while (iterable != 0)
+ {
+ Fields field = nextSetField(iterable);
+ if (getFieldChanged(field, diff.flags))
+ {
+ this.flags = setFieldChanged(field, this.flags);
+ if (getFieldIsNull(field, diff.flags))
+ {
+ this.flags = setFieldIsNull(field, this.flags);
+ setNull(field);
+ }
+ else
+ {
+ this.flags = unsetFieldIsNull(field, this.flags);
+ deserialize(diff, field);
+ }
+ }
- public String toString()
- {
- return "" + value;
+ iterable = unsetIterableFields(field, iterable);
+ }
}
- @Override
- public boolean equals(Object o)
+ private void setNull(Fields field)
{
- if (this == o) return true;
- if (o == null || getClass() != o.getClass()) return false;
- NewValue<?> newValue = (NewValue<?>) o;
- return Objects.equals(value, newValue.value);
+ switch (field)
+ {
+ case EXECUTE_AT:
+ executeAt = null;
+ break;
+ case EXECUTES_AT_LEAST:
+ executeAtLeast = null;
+ break;
+ case SAVE_STATUS:
+ saveStatus = null;
+ break;
+ case DURABILITY:
+ durability = null;
+ break;
+ case ACCEPTED:
+ acceptedOrCommitted = null;
+ break;
+ case PROMISED:
+ promised = null;
+ break;
+ case PARTICIPANTS:
+ participants = null;
+ break;
+ case PARTIAL_TXN:
+ partialTxn = null;
+ break;
+ case PARTIAL_DEPS:
+ partialDeps = null;
+ break;
+ case WAITING_ON:
+ waitingOn = null;
+ break;
+ case WRITES:
+ writes = null;
+ break;
+ case RESULT:
+ result = null;
+ break;
+ case CLEANUP:
+ throw new IllegalStateException();
+ }
}
- @Override
- public int hashCode()
+ private void deserialize(Diff diff, Fields field)
{
- throw new UnsupportedOperationException();
+ switch (field)
+ {
+ case EXECUTE_AT:
+ executeAt = Invariants.nonNull((Timestamp)
diff.changes.get(EXECUTE_AT));
+ break;
+ case EXECUTES_AT_LEAST:
+ executeAtLeast = Invariants.nonNull((Timestamp)
diff.changes.get(EXECUTES_AT_LEAST));
+ break;
+ case SAVE_STATUS:
+ saveStatus = Invariants.nonNull((SaveStatus)
diff.changes.get(SAVE_STATUS));
+ break;
+ case DURABILITY:
+ durability = Invariants.nonNull((Status.Durability)
diff.changes.get(DURABILITY));
+ break;
+ case ACCEPTED:
+ acceptedOrCommitted = Invariants.nonNull((Ballot)
diff.changes.get(ACCEPTED));
+ break;
+ case PROMISED:
+ promised = Invariants.nonNull((Ballot)
diff.changes.get(PROMISED));
+ break;
+ case PARTICIPANTS:
+ participants = Invariants.nonNull((StoreParticipants)
diff.changes.get(PARTICIPANTS));
+ break;
+ case PARTIAL_TXN:
+ partialTxn = Invariants.nonNull((PartialTxn)
diff.changes.get(PARTIAL_TXN));
+ break;
+ case PARTIAL_DEPS:
+ partialDeps = Invariants.nonNull((PartialDeps)
diff.changes.get(PARTIAL_DEPS));
+ break;
+ case WAITING_ON:
+ waitingOn = Invariants.nonNull((WaitingOnProvider)
diff.changes.get(WAITING_ON));
+ break;
+ case WRITES:
+ writes = Invariants.nonNull((Writes)
diff.changes.get(WRITES));
+ break;
+ case RESULT:
+ result = Invariants.nonNull((Result)
diff.changes.get(RESULT));
+ break;
+ case CLEANUP:
+ throw new IllegalStateException();
+ }
}
}
}
\ No newline at end of file
diff --git a/accord-core/src/test/java/accord/impl/basic/LoggingJournal.java
b/accord-core/src/test/java/accord/impl/basic/LoggingJournal.java
index d9cee7af..b40134a3 100644
--- a/accord-core/src/test/java/accord/impl/basic/LoggingJournal.java
+++ b/accord-core/src/test/java/accord/impl/basic/LoggingJournal.java
@@ -34,6 +34,7 @@ import accord.local.RedundantBefore;
import accord.primitives.Ranges;
import accord.primitives.Timestamp;
import accord.primitives.TxnId;
+import accord.utils.PersistentField;
/**
* Logging journal, a wrapper over journal for debugging / inspecting history
purposes.
@@ -70,48 +71,68 @@ public class LoggingJournal implements Journal
}
}
+ @Override
public Command loadCommand(int commandStoreId, TxnId txnId,
RedundantBefore redundantBefore, DurableBefore durableBefore)
{
return delegate.loadCommand(commandStoreId, txnId, redundantBefore,
durableBefore);
}
+ @Override
+ public Command.Minimal loadMinimal(int commandStoreId, TxnId txnId, Load
load, RedundantBefore redundantBefore, DurableBefore durableBefore)
+ {
+ return delegate.loadMinimal(commandStoreId, txnId, load,
redundantBefore, durableBefore);
+ }
+
+ @Override
public void saveCommand(int store, CommandUpdate update, Runnable onFlush)
{
log("%d: %s\n", store, update.after);
delegate.saveCommand(store, update, onFlush);
}
+ @Override
public void purge(CommandStores commandStores)
{
log("PURGE\n");
delegate.purge(commandStores);
}
+ @Override
public void replay(CommandStores commandStores)
{
delegate.replay(commandStores);
}
+ @Override
public RedundantBefore loadRedundantBefore(int commandStoreId)
{
return delegate.loadRedundantBefore(commandStoreId);
}
+ @Override
public NavigableMap<TxnId, Ranges> loadBootstrapBeganAt(int commandStoreId)
{
return delegate.loadBootstrapBeganAt(commandStoreId);
}
+ @Override
public NavigableMap<Timestamp, Ranges> loadSafeToRead(int commandStoreId)
{
return delegate.loadSafeToRead(commandStoreId);
}
+ @Override
public CommandStores.RangesForEpoch loadRangesForEpoch(int commandStoreId)
{
return delegate.loadRangesForEpoch(commandStoreId);
}
+ @Override
+ public PersistentField.Persister<DurableBefore, DurableBefore>
durableBeforePersister()
+ {
+ return delegate.durableBeforePersister();
+ }
+
public void saveStoreState(int store, FieldUpdates fieldUpdates, Runnable
onFlush)
{
log("%d: %s", store, fieldUpdates);
diff --git a/accord-core/src/test/java/accord/impl/basic/VerifyingJournal.java
b/accord-core/src/test/java/accord/impl/basic/VerifyingJournal.java
deleted file mode 100644
index 5776c51f..00000000
--- a/accord-core/src/test/java/accord/impl/basic/VerifyingJournal.java
+++ /dev/null
@@ -1,110 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package accord.impl.basic;
-
-import java.util.NavigableMap;
-
-import accord.api.Journal;
-import accord.local.Command;
-import accord.local.CommandStores;
-import accord.local.DurableBefore;
-import accord.local.RedundantBefore;
-import accord.primitives.Ranges;
-import accord.primitives.Timestamp;
-import accord.primitives.TxnId;
-import accord.utils.Invariants;
-
-/**
- * A simple version of journal that can be useful for debugging issues with an
implementation that checks command loading
- * from SUT vs model.
- */
-public class VerifyingJournal implements Journal
-{
- private final Journal sut;
- private final Journal model;
-
- public VerifyingJournal(Journal model, Journal sut)
- {
- this.model = model;
- this.sut = sut;
- }
-
- public Command loadCommand(int commandStoreId, TxnId txnId,
RedundantBefore redundantBefore, DurableBefore durableBefore)
- {
- Command model = this.model.loadCommand(commandStoreId, txnId,
redundantBefore, durableBefore);
- Command sut = this.sut.loadCommand(commandStoreId, txnId,
redundantBefore, durableBefore);
- Invariants.checkState(sut.equals(model));
- return sut;
- }
-
- public void saveCommand(int store, CommandUpdate update, Runnable onFlush)
- {
- model.saveCommand(store, update, null);
- sut.saveCommand(store, update, onFlush);
- }
-
- public void purge(CommandStores commandStores)
- {
- model.purge(commandStores);
- sut.purge(commandStores);
- }
-
- public void replay(CommandStores commandStores)
- {
- sut.replay(commandStores);
- }
-
- public RedundantBefore loadRedundantBefore(int commandStoreId)
- {
- RedundantBefore model = this.model.loadRedundantBefore(commandStoreId);
- RedundantBefore sut = this.sut.loadRedundantBefore(commandStoreId);
- Invariants.checkState(sut.equals(model), "%s should equal %s", sut,
model);
- return sut;
- }
-
- public NavigableMap<TxnId, Ranges> loadBootstrapBeganAt(int commandStoreId)
- {
- NavigableMap<TxnId, Ranges> model =
this.sut.loadBootstrapBeganAt(commandStoreId);
- NavigableMap<TxnId, Ranges> sut =
this.sut.loadBootstrapBeganAt(commandStoreId);
- Invariants.checkState(sut.equals(model), "%s should equal %s", sut,
model);
- return sut;
- }
-
- public NavigableMap<Timestamp, Ranges> loadSafeToRead(int commandStoreId)
- {
- NavigableMap<Timestamp, Ranges> model =
this.model.loadSafeToRead(commandStoreId);
- NavigableMap<Timestamp, Ranges> sut =
this.sut.loadSafeToRead(commandStoreId);
- Invariants.checkState(sut.equals(model), "%s should equal %s", sut,
model);
- return sut;
- }
-
- public CommandStores.RangesForEpoch loadRangesForEpoch(int commandStoreId)
- {
- CommandStores.RangesForEpoch model =
this.sut.loadRangesForEpoch(commandStoreId);
- CommandStores.RangesForEpoch sut =
this.sut.loadRangesForEpoch(commandStoreId);
- Invariants.checkState(sut.equals(model), "%s should equal %s", sut,
model);
- return sut;
- }
-
- public void saveStoreState(int store, FieldUpdates fieldUpdates, Runnable
onFlush)
- {
- model.saveStoreState(store, fieldUpdates, onFlush);
- sut.saveStoreState(store, fieldUpdates, onFlush);
- }
-}
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]