This is an automated email from the ASF dual-hosted git repository.
ifesdjeen pushed a commit to branch cep-15-accord
in repository https://gitbox.apache.org/repos/asf/cassandra.git
The following commit(s) were added to refs/heads/cep-15-accord by this push:
new 8aeab43d47 Migrate in memory journal to CommandChange logic shared
with AccordJournal
8aeab43d47 is described below
commit 8aeab43d47274b9ff6b00eaf24ea587f92449b54
Author: Alex Petrov <[email protected]>
AuthorDate: Tue Nov 26 15:26:54 2024 +0100
Migrate in memory journal to CommandChange logic shared with AccordJournal
Patch by Alex Petrov; reviewed by Benedict Elliott Smith for CASSANDRA-20115
---
modules/accord | 2 +-
.../db/compaction/CompactionIterator.java | 6 +-
.../cassandra/service/accord/AccordCache.java | 30 +-
.../cassandra/service/accord/AccordCacheEntry.java | 8 +-
.../service/accord/AccordCommandStore.java | 14 +-
.../cassandra/service/accord/AccordJournal.java | 400 +++++++--
.../accord/AccordJournalValueSerializers.java | 24 +-
.../cassandra/service/accord/AccordTask.java | 2 +-
.../service/accord/CommandsForRanges.java | 4 +-
.../apache/cassandra/service/accord/IJournal.java | 40 -
.../cassandra/service/accord/SavedCommand.java | 980 ---------------------
.../distributed/test/accord/AccordLoadTest.java | 4 -
.../service/accord/AccordJournalBurnTest.java | 4 +-
.../service/accord/AccordJournalOrderTest.java | 2 +-
...avedCommandTest.java => CommandChangeTest.java} | 25 +-
.../accord/SimulatedAccordCommandStore.java | 21 +-
16 files changed, 396 insertions(+), 1170 deletions(-)
diff --git a/modules/accord b/modules/accord
index 520cc1072d..f7b9bb8887 160000
--- a/modules/accord
+++ b/modules/accord
@@ -1 +1 @@
-Subproject commit 520cc1072d44a5f7617566b6667e915532b89033
+Subproject commit f7b9bb8887ed672185f269ebcbc9d11e6aeafca9
diff --git
a/src/java/org/apache/cassandra/db/compaction/CompactionIterator.java
b/src/java/org/apache/cassandra/db/compaction/CompactionIterator.java
index e11d154829..90b583235c 100644
--- a/src/java/org/apache/cassandra/db/compaction/CompactionIterator.java
+++ b/src/java/org/apache/cassandra/db/compaction/CompactionIterator.java
@@ -92,6 +92,7 @@ import org.apache.cassandra.schema.Schema;
import org.apache.cassandra.schema.SchemaConstants;
import org.apache.cassandra.schema.TableId;
import org.apache.cassandra.schema.TableMetadata;
+import org.apache.cassandra.service.accord.AccordJournal;
import org.apache.cassandra.service.accord.AccordJournalValueSerializers;
import
org.apache.cassandra.service.accord.AccordJournalValueSerializers.FlyweightSerializer;
import org.apache.cassandra.service.accord.AccordKeyspace;
@@ -101,7 +102,6 @@ import
org.apache.cassandra.service.accord.AccordKeyspace.CommandsForKeyAccessor
import org.apache.cassandra.service.accord.AccordService;
import org.apache.cassandra.service.accord.IAccordService;
import org.apache.cassandra.service.accord.JournalKey;
-import org.apache.cassandra.service.accord.SavedCommand;
import org.apache.cassandra.service.accord.api.AccordAgent;
import org.apache.cassandra.service.accord.api.AccordRoutingKey.TokenKey;
import org.apache.cassandra.service.paxos.PaxosRepairHistory;
@@ -1016,7 +1016,7 @@ public class CompactionIterator extends
CompactionInfo.Holder implements Unfilte
return newVersion.build().unfilteredIterator();
}
- SavedCommand.Builder commandBuilder = (SavedCommand.Builder)
builder;
+ AccordJournal.Builder commandBuilder = (AccordJournal.Builder)
builder;
if (commandBuilder.isEmpty())
{
Invariants.checkState(rows.isEmpty());
@@ -1038,7 +1038,7 @@ public class CompactionIterator extends
CompactionInfo.Holder implements Unfilte
PartitionUpdate.SimpleBuilder newVersion =
PartitionUpdate.simpleBuilder(AccordKeyspace.Journal, partition.partitionKey());
Row.SimpleBuilder rowBuilder =
newVersion.row(firstClustering);
- rowBuilder.add("record",
commandBuilder.asByteBuffer(userVersion))
+ rowBuilder.add("record",
commandBuilder.asByteBuffer(redundantBefore, userVersion))
.add("user_version", userVersion);
return newVersion.build().unfilteredIterator();
diff --git a/src/java/org/apache/cassandra/service/accord/AccordCache.java
b/src/java/org/apache/cassandra/service/accord/AccordCache.java
index 180b827531..cbdba5224f 100644
--- a/src/java/org/apache/cassandra/service/accord/AccordCache.java
+++ b/src/java/org/apache/cassandra/service/accord/AccordCache.java
@@ -104,7 +104,7 @@ public class AccordCache implements CacheSize
@Nullable V quickShrink(V value);
// a result of null means we cannot shrink, and should save/evict as
appropriate
@Nullable Object fullShrink(K key, V value);
- @Nullable V inflate(K key, Object shrunk);
+ @Nullable V inflate(AccordCommandStore commandStore, K key, Object
shrunk);
long estimateHeapSize(V value);
long estimateShrunkHeapSize(Object shrunk);
boolean validate(AccordCommandStore commandStore, K key, V value);
@@ -359,7 +359,7 @@ public class AccordCache implements CacheSize
ToLongFunction<V> heapEstimator,
Function<AccordCacheEntry<K, V>, S> safeRefFactory)
{
- return newType(keyClass, loadFunction, saveFunction, quickShrink, (i,
j) -> j, (i, j) -> (V)j, validateFunction, heapEstimator, i -> 0,
safeRefFactory);
+ return newType(keyClass, loadFunction, saveFunction, quickShrink, (i,
j) -> j, (c, i, j) -> (V)j, validateFunction, heapEstimator, i -> 0,
safeRefFactory);
}
public <K, V, S extends AccordSafeState<K, V>> Type<K, V, S> newType(
@@ -368,7 +368,7 @@ public class AccordCache implements CacheSize
QuadFunction<AccordCommandStore, K, V, Object, Runnable> saveFunction,
Function<V, V> quickShrink,
BiFunction<K, V, Object> fullShrink,
- BiFunction<K, Object, V> inflate,
+ TriFunction<AccordCommandStore, K, Object, V> inflate,
TriFunction<AccordCommandStore, K, V, Boolean> validateFunction,
ToLongFunction<V> heapEstimator,
ToLongFunction<Object> shrunkHeapEstimator,
@@ -583,7 +583,7 @@ public class AccordCache implements CacheSize
{
Object shrunk = state.tryGetShrunk();
if (shrunk != null)
- evicted = adapter.inflate(key, shrunk);
+ evicted = adapter.inflate(commandStore, key,
shrunk);
}
catch (RuntimeException rte)
{
@@ -971,7 +971,7 @@ public class AccordCache implements CacheSize
final QuadFunction<AccordCommandStore, K, V, Object, Runnable> save;
final Function<V, V> quickShrink;
final BiFunction<K, V, Object> shrink;
- final BiFunction<K, Object, V> inflate;
+ final TriFunction<AccordCommandStore, K, Object, V> inflate;
final TriFunction<AccordCommandStore, K, V, Boolean> validate;
final ToLongFunction<V> estimateHeapSize;
final ToLongFunction<Object> estimateShrunkHeapSize;
@@ -981,7 +981,7 @@ public class AccordCache implements CacheSize
FunctionalAdapter(BiFunction<AccordCommandStore, K, V> load,
QuadFunction<AccordCommandStore, K, V, Object,
Runnable> save,
Function<V, V> quickShrink, BiFunction<K, V, Object>
shrink,
- BiFunction<K, Object, V> inflate,
+ TriFunction<AccordCommandStore, K, Object, V>
inflate,
TriFunction<AccordCommandStore, K, V, Boolean>
validate,
ToLongFunction<V> estimateHeapSize,
ToLongFunction<Object> estimateShrunkHeapSize,
@@ -1030,9 +1030,9 @@ public class AccordCache implements CacheSize
}
@Override
- public V inflate(K key, Object shrunk)
+ public V inflate(AccordCommandStore commandStore, K key, Object shrunk)
{
- return inflate.apply(key, shrunk);
+ return inflate.apply(commandStore, key, shrunk);
}
@Override
@@ -1096,7 +1096,7 @@ public class AccordCache implements CacheSize
@Override public Runnable save(AccordCommandStore commandStore, K key,
@Nullable V value, @Nullable Object shrunk) { return null; }
@Override public V quickShrink(V value) { return null; }
@Override public Object fullShrink(K key, V value) { return null; }
- @Override public V inflate(K key, Object shrunk) { return null; }
+ @Override public V inflate(AccordCommandStore commandStore, K key,
Object shrunk) { return null; }
@Override public long estimateHeapSize(V value) { return 0; }
@Override public long estimateShrunkHeapSize(Object shrunk) { return
0; }
@Override public boolean validate(AccordCommandStore commandStore, K
key, V value) { return false; }
@@ -1136,7 +1136,7 @@ public class AccordCache implements CacheSize
}
@Override
- public CommandsForKey inflate(RoutingKey key, Object shrunk)
+ public CommandsForKey inflate(AccordCommandStore commandStore,
RoutingKey key, Object shrunk)
{
return Serialize.fromBytes(key, (ByteBuffer)shrunk);
}
@@ -1186,7 +1186,7 @@ public class AccordCache implements CacheSize
if (value == null)
{
- value = inflate(txnId, serialized);
+ value = inflate(commandStore, txnId, serialized);
if (value == null)
return null;
}
@@ -1212,7 +1212,7 @@ public class AccordCache implements CacheSize
try
{
- return SavedCommand.asSerializedDiff(null, value,
current_version);
+ return AccordJournal.asSerializedChange(null, value,
current_version);
}
catch (IOException e)
{
@@ -1222,15 +1222,15 @@ public class AccordCache implements CacheSize
}
@Override
- public @Nullable Command inflate(TxnId key, Object serialized)
+ public @Nullable Command inflate(AccordCommandStore commandStore,
TxnId key, Object serialized)
{
- SavedCommand.Builder builder = new SavedCommand.Builder(key);
+ AccordJournal.Builder builder = new AccordJournal.Builder(key);
ByteBuffer buffer = (ByteBuffer) serialized;
buffer.mark();
try (DataInputBuffer buf = new DataInputBuffer(buffer, false))
{
builder.deserializeNext(buf, current_version);
- return builder.construct();
+ return
builder.construct(commandStore.unsafeGetRedundantBefore());
}
catch (UnknownTableException e)
{
diff --git a/src/java/org/apache/cassandra/service/accord/AccordCacheEntry.java
b/src/java/org/apache/cassandra/service/accord/AccordCacheEntry.java
index 4bfa4be308..4e152e1650 100644
--- a/src/java/org/apache/cassandra/service/accord/AccordCacheEntry.java
+++ b/src/java/org/apache/cassandra/service/accord/AccordCacheEntry.java
@@ -368,7 +368,7 @@ public class AccordCacheEntry<K, V> extends
IntrusiveLinkedListNode
if (isShrunk())
{
AccordCache.Type<K, V, ?> parent = owner.parent();
- inflate(key, parent.adapter());
+ inflate(owner.commandStore, key, parent.adapter());
updateSize(parent);
}
@@ -538,17 +538,17 @@ public class AccordCacheEntry<K, V> extends
IntrusiveLinkedListNode
return true;
}
- private void inflate(K key, Adapter<K, V, ?> adapter)
+ private void inflate(AccordCommandStore commandStore, K key, Adapter<K, V,
?> adapter)
{
Invariants.checkState(isShrunk());
if (isNested())
{
Nested nested = (Nested) state;
- nested.state = adapter.inflate(key, nested.state);
+ nested.state = adapter.inflate(commandStore, key, nested.state);
}
else
{
- state = adapter.inflate(key, state);
+ state = adapter.inflate(commandStore, key, state);
}
status &= ~SHRUNK;
}
diff --git
a/src/java/org/apache/cassandra/service/accord/AccordCommandStore.java
b/src/java/org/apache/cassandra/service/accord/AccordCommandStore.java
index 35e8dca12a..81ce3ac8a4 100644
--- a/src/java/org/apache/cassandra/service/accord/AccordCommandStore.java
+++ b/src/java/org/apache/cassandra/service/accord/AccordCommandStore.java
@@ -35,6 +35,7 @@ import com.google.common.annotations.VisibleForTesting;
import accord.api.Agent;
import accord.api.DataStore;
+import accord.api.Journal;
import accord.api.LocalListeners;
import accord.api.ProgressLog;
import accord.api.RoutingKey;
@@ -63,7 +64,6 @@ import accord.utils.Invariants;
import accord.utils.async.AsyncChain;
import accord.utils.async.AsyncChains;
import org.apache.cassandra.db.Mutation;
-import org.apache.cassandra.service.accord.SavedCommand.MinimalCommand;
import org.apache.cassandra.service.accord.api.AccordRoutingKey.TokenKey;
import org.apache.cassandra.service.accord.txn.TxnRead;
import org.apache.cassandra.utils.Clock;
@@ -71,12 +71,12 @@ import
org.apache.cassandra.utils.concurrent.UncheckedInterruptedException;
import static accord.api.Journal.CommandUpdate;
import static accord.api.Journal.FieldUpdates;
+import static accord.api.Journal.Load.MINIMAL;
import static accord.api.Journal.Loader;
import static accord.api.Journal.OnDone;
import static accord.local.KeyHistory.SYNC;
import static accord.primitives.Status.Committed;
import static accord.utils.Invariants.checkState;
-import static org.apache.cassandra.service.accord.SavedCommand.Load.MINIMAL;
public class AccordCommandStore extends CommandStore
{
@@ -141,7 +141,7 @@ public class AccordCommandStore extends CommandStore
}
public final String loggingId;
- private final IJournal journal;
+ private final Journal journal;
private final AccordExecutor executor;
private final Executor taskExecutor;
private final ExclusiveCaches caches;
@@ -160,7 +160,7 @@ public class AccordCommandStore extends CommandStore
ProgressLog.Factory progressLogFactory,
LocalListeners.Factory listenerFactory,
EpochUpdateHolder epochUpdateHolder,
- IJournal journal,
+ Journal journal,
AccordExecutor executor)
{
super(id, node, agent, dataStore, progressLogFactory, listenerFactory,
epochUpdateHolder);
@@ -300,9 +300,9 @@ public class AccordCommandStore extends CommandStore
}
@VisibleForTesting
- public void sanityCheckCommand(Command command)
+ public void sanityCheckCommand(RedundantBefore redundantBefore, Command
command)
{
- ((AccordJournal) journal).sanityCheck(id, command);
+ ((AccordJournal) journal).sanityCheck(id, redundantBefore, command);
}
CommandsForKey loadCommandsForKey(RoutableKey key)
@@ -487,7 +487,7 @@ public class AccordCommandStore extends CommandStore
return command;
}
- public MinimalCommand loadMinimal(TxnId txnId)
+ public Command.Minimal loadMinimal(TxnId txnId)
{
return journal.loadMinimal(id, txnId, MINIMAL,
unsafeGetRedundantBefore(), durableBefore());
}
diff --git a/src/java/org/apache/cassandra/service/accord/AccordJournal.java
b/src/java/org/apache/cassandra/service/accord/AccordJournal.java
index 6e2e57fc40..a17c3528f5 100644
--- a/src/java/org/apache/cassandra/service/accord/AccordJournal.java
+++ b/src/java/org/apache/cassandra/service/accord/AccordJournal.java
@@ -18,19 +18,20 @@
package org.apache.cassandra.service.accord;
import java.io.IOException;
-import java.util.ArrayList;
+import java.nio.ByteBuffer;
import java.util.Collections;
-import java.util.List;
import java.util.NavigableMap;
import java.util.Set;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;
import java.util.function.BiConsumer;
+import javax.annotation.Nullable;
import com.google.common.annotations.VisibleForTesting;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
+import accord.impl.CommandChange;
import accord.impl.ErasedSafeCommand;
import accord.local.Cleanup;
import accord.local.Command;
@@ -54,6 +55,7 @@ import org.apache.cassandra.db.ColumnFamilyStore;
import org.apache.cassandra.db.Keyspace;
import org.apache.cassandra.io.util.DataInputBuffer;
import org.apache.cassandra.io.util.DataInputPlus;
+import org.apache.cassandra.io.util.DataOutputBuffer;
import org.apache.cassandra.io.util.DataOutputPlus;
import org.apache.cassandra.io.util.File;
import org.apache.cassandra.journal.Compactor;
@@ -65,14 +67,30 @@ import org.apache.cassandra.net.MessagingService;
import
org.apache.cassandra.service.accord.AccordJournalValueSerializers.IdentityAccumulator;
import org.apache.cassandra.service.accord.JournalKey.JournalKeySupport;
import org.apache.cassandra.service.accord.api.AccordAgent;
+import org.apache.cassandra.service.accord.serializers.CommandSerializers;
+import org.apache.cassandra.service.accord.serializers.DepsSerializers;
+import org.apache.cassandra.service.accord.serializers.ResultSerializers;
+import org.apache.cassandra.service.accord.serializers.WaitingOnSerializer;
import org.apache.cassandra.utils.ExecutorUtils;
+import org.apache.cassandra.utils.Throwables;
import org.apache.cassandra.utils.concurrent.AsyncPromise;
+import static accord.impl.CommandChange.anyFieldChanged;
+import static accord.impl.CommandChange.getFieldChanged;
+import static accord.impl.CommandChange.getFieldIsNull;
+import static accord.impl.CommandChange.getFlags;
+import static accord.impl.CommandChange.getWaitingOn;
+import static accord.impl.CommandChange.nextSetField;
+import static accord.impl.CommandChange.setFieldChanged;
+import static accord.impl.CommandChange.setFieldIsNull;
+import static accord.impl.CommandChange.toIterableSetFields;
+import static accord.impl.CommandChange.unsetIterableFields;
+import static accord.impl.CommandChange.validateFlags;
import static accord.primitives.SaveStatus.ErasedOrVestigial;
import static accord.primitives.Status.Truncated;
import static
org.apache.cassandra.service.accord.AccordJournalValueSerializers.DurableBeforeAccumulator;
-public class AccordJournal implements IJournal, Shutdownable
+public class AccordJournal implements accord.api.Journal, Shutdownable
{
static
{
@@ -188,7 +206,7 @@ public class AccordJournal implements IJournal, Shutdownable
@Override
public Command loadCommand(int commandStoreId, TxnId txnId,
RedundantBefore redundantBefore, DurableBefore durableBefore)
{
- SavedCommand.Builder builder = loadDiffs(commandStoreId, txnId);
+ Builder builder = load(commandStoreId, txnId);
Cleanup cleanup = builder.shouldCleanup(agent, redundantBefore,
durableBefore);
switch (cleanup)
{
@@ -197,14 +215,14 @@ public class AccordJournal implements IJournal,
Shutdownable
case ERASE:
return ErasedSafeCommand.erased(txnId, ErasedOrVestigial);
}
- return builder.construct();
+ return builder.construct(redundantBefore);
}
@Override
- public SavedCommand.MinimalCommand loadMinimal(int commandStoreId, TxnId
txnId, SavedCommand.Load load, RedundantBefore redundantBefore, DurableBefore
durableBefore)
+ public Command.Minimal loadMinimal(int commandStoreId, TxnId txnId, Load
load, RedundantBefore redundantBefore, DurableBefore durableBefore)
{
- SavedCommand.Builder builder = loadDiffs(commandStoreId, txnId, load);
- if (!builder.nextCalled)
+ Builder builder = loadDiffs(commandStoreId, txnId, load);
+ if (builder.isEmpty())
return null;
Cleanup cleanup = builder.shouldCleanup(node.agent(), redundantBefore,
durableBefore);
@@ -215,11 +233,11 @@ public class AccordJournal implements IJournal,
Shutdownable
case ERASE:
return null;
}
- Invariants.checkState(builder.saveStatus != null, "No saveSatus
loaded, but next was called and cleanup was not: %s", builder);
+ Invariants.checkState(builder.saveStatus() != null, "No saveSatus
loaded, but next was called and cleanup was not: %s", builder);
return builder.asMinimal();
}
- @VisibleForTesting
+ @Override
public RedundantBefore loadRedundantBefore(int store)
{
IdentityAccumulator<RedundantBefore> accumulator = readAll(new
JournalKey(TxnId.NONE, JournalKey.Type.REDUNDANT_BEFORE, store));
@@ -250,8 +268,8 @@ public class AccordJournal implements IJournal, Shutdownable
@Override
public void saveCommand(int store, CommandUpdate update, Runnable onFlush)
{
- SavedCommand.Writer diff = SavedCommand.diff(update.before,
update.after);
- if (diff == null || status == Status.REPLAY)
+ Writer diff = Writer.make(update.before, update.after);
+ if (diff == null)
{
if (onFlush != null)
onFlush.run();
@@ -272,9 +290,6 @@ public class AccordJournal implements IJournal, Shutdownable
@Override
public AsyncResult<?> persist(DurableBefore addDurableBefore,
DurableBefore newDurableBefore)
{
- if (status == Status.REPLAY)
- return AsyncResults.success(null);
-
AsyncResult.Settable<Void> result = AsyncResults.settable();
JournalKey key = new JournalKey(TxnId.NONE,
JournalKey.Type.DURABLE_BEFORE, 0);
RecordPointer pointer = appendInternal(key, addDurableBefore);
@@ -315,18 +330,18 @@ public class AccordJournal implements IJournal,
Shutdownable
onFlush.run();
}
- @VisibleForTesting
- public SavedCommand.Builder loadDiffs(int commandStoreId, TxnId txnId,
SavedCommand.Load load)
+ private Builder loadDiffs(int commandStoreId, TxnId txnId, Load load)
{
JournalKey key = new JournalKey(txnId, JournalKey.Type.COMMAND_DIFF,
commandStoreId);
- SavedCommand.Builder builder = new SavedCommand.Builder(txnId, load);
+ Builder builder = new Builder(txnId, load);
journalTable.readAll(key, builder::deserializeNext);
return builder;
}
- public SavedCommand.Builder loadDiffs(int commandStoreId, TxnId txnId)
+ @VisibleForTesting
+ public Builder load(int commandStoreId, TxnId txnId)
{
- return loadDiffs(commandStoreId, txnId, SavedCommand.Load.ALL);
+ return loadDiffs(commandStoreId, txnId, Load.ALL);
}
private <BUILDER> BUILDER readAll(JournalKey key)
@@ -351,17 +366,17 @@ public class AccordJournal implements IJournal,
Shutdownable
journal.closeCurrentSegmentForTestingIfNonEmpty();
}
- public void sanityCheck(int commandStoreId, Command orig)
+ public void sanityCheck(int commandStoreId, RedundantBefore
redundantBefore, Command orig)
{
- SavedCommand.Builder diffs = loadDiffs(commandStoreId, orig.txnId());
- diffs.forceResult(orig.result());
+ Builder builder = load(commandStoreId, orig.txnId());
+ builder.forceResult(orig.result());
// We can only use strict equality if we supply result.
- Command reconstructed = diffs.construct();
+ Command reconstructed = builder.construct(redundantBefore);
Invariants.checkState(orig.equals(reconstructed),
'\n' +
"Original: %s\n" +
"Reconstructed: %s\n" +
- "Diffs: %s", orig, reconstructed, diffs);
+ "Diffs: %s", orig, reconstructed,
builder);
}
@VisibleForTesting
@@ -391,7 +406,7 @@ public class AccordJournal implements IJournal, Shutdownable
try (AccordJournalTable.KeyOrderIterator<JournalKey> iter =
journalTable.readAll())
{
JournalKey key;
- SavedCommand.Builder builder = new SavedCommand.Builder();
+ Builder builder = new Builder();
while ((key = iter.key()) != null)
{
@@ -417,12 +432,12 @@ public class AccordJournal implements IJournal,
Shutdownable
}
});
- if (builder.nextCalled)
+ if (!builder.isEmpty())
{
- Command command = builder.construct();
+ CommandStore commandStore =
commandStores.forId(key.commandStoreId);
+ Command command =
builder.construct(commandStore.unsafeGetRedundantBefore());
Invariants.checkState(command.saveStatus() !=
SaveStatus.Uninitialised,
"Found uninitialized command in the
log: %s %s", command.toString(), builder.toString());
- CommandStore commandStore =
commandStores.forId(key.commandStoreId);
Loader loader = commandStore.loader();
async(loader::load, command).get();
if (command.saveStatus().compareTo(SaveStatus.Stable) >= 0
&& !command.hasBeen(Truncated))
@@ -454,65 +469,306 @@ public class AccordJournal implements IJournal,
Shutdownable
return future;
}
- // TODO: this is here temporarily; for debugging purposes
+ public static @Nullable ByteBuffer asSerializedChange(Command before,
Command after, int userVersion) throws IOException
+ {
+ try (DataOutputBuffer out = new DataOutputBuffer())
+ {
+ Writer writer = Writer.make(before, after);
+ if (writer == null)
+ return null;
+
+ writer.write(out, userVersion);
+ return out.asNewBuffer();
+ }
+ }
+
@VisibleForTesting
- public void checkAllCommands()
+ public void unsafeSetStarted()
{
- try (AccordJournalTable.KeyOrderIterator<JournalKey> iter =
journalTable.readAll())
+ status = Status.STARTED;
+ }
+
+ public static class Writer implements Journal.Writer
+ {
+ private final Command after;
+ private final int flags;
+
+ private Writer(Command after, int flags)
{
- IAccordService.CompactionInfo compactionInfo =
AccordService.instance().getCompactionInfo();
- JournalKey key;
- SavedCommand.Builder builder = new SavedCommand.Builder();
- while ((key = iter.key()) != null)
+ this.after = after;
+ this.flags = flags;
+ }
+
+ public static Writer make(Command before, Command after)
+ {
+ if (before == after
+ || after == null
+ || after.saveStatus() == SaveStatus.Uninitialised)
+ return null;
+
+ int flags = validateFlags(getFlags(before, after));
+ if (!anyFieldChanged(flags))
+ return null;
+
+ return new Writer(after, flags);
+ }
+
+ @Override
+ public void write(DataOutputPlus out, int userVersion) throws
IOException
+ {
+ serialize(after, flags, out, userVersion);
+ }
+
+ private static void serialize(Command command, int flags,
DataOutputPlus out, int userVersion) throws IOException
+ {
+ Invariants.checkState(flags != 0);
+ out.writeInt(flags);
+
+ int iterable = toIterableSetFields(flags);
+ while (iterable != 0)
{
- builder.reset(key.id);
- if (key.type != JournalKey.Type.COMMAND_DIFF)
+ CommandChange.Fields field = nextSetField(iterable);
+ if (getFieldIsNull(field, flags))
{
- // TODO (required): add "skip" for the key to avoid
getting stuck
- iter.readAllForKey(key, (segment, position, key1, buffer,
hosts, userVersion) -> {});
+ iterable = unsetIterableFields(field, iterable);
continue;
}
- JournalKey finalKey = key;
- List<RecordPointer> pointers = new ArrayList<>();
- try
+ switch (field)
+ {
+ case EXECUTE_AT:
+
CommandSerializers.timestamp.serialize(command.executeAt(), out, userVersion);
+ break;
+ case EXECUTES_AT_LEAST:
+
CommandSerializers.timestamp.serialize(command.executesAtLeast(), out,
userVersion);
+ break;
+ case SAVE_STATUS:
+ out.writeShort(command.saveStatus().ordinal());
+ break;
+ case DURABILITY:
+ out.writeByte(command.durability().ordinal());
+ break;
+ case ACCEPTED:
+
CommandSerializers.ballot.serialize(command.acceptedOrCommitted(), out,
userVersion);
+ break;
+ case PROMISED:
+
CommandSerializers.ballot.serialize(command.promised(), out, userVersion);
+ break;
+ case PARTICIPANTS:
+
CommandSerializers.participants.serialize(command.participants(), out,
userVersion);
+ break;
+ case PARTIAL_TXN:
+
CommandSerializers.partialTxn.serialize(command.partialTxn(), out, userVersion);
+ break;
+ case PARTIAL_DEPS:
+
DepsSerializers.partialDeps.serialize(command.partialDeps(), out, userVersion);
+ break;
+ case WAITING_ON:
+ Command.WaitingOn waitingOn = getWaitingOn(command);
+ long size =
WaitingOnSerializer.serializedSize(command.txnId(), waitingOn);
+ ByteBuffer serialized =
WaitingOnSerializer.serialize(command.txnId(), waitingOn);
+ Invariants.checkState(serialized.remaining() == size);
+ out.writeInt((int) size);
+ out.write(serialized);
+ break;
+ case WRITES:
+ CommandSerializers.writes.serialize(command.writes(),
out, userVersion);
+ break;
+ case RESULT:
+ ResultSerializers.result.serialize(command.result(),
out, userVersion);
+ break;
+ case CLEANUP:
+ throw new IllegalStateException();
+ }
+
+ iterable = unsetIterableFields(field, iterable);
+ }
+ }
+ }
+
+ public static class Builder extends CommandChange.Builder
+ {
+ public Builder()
+ {
+ super(null, Load.ALL);
+ }
+
+ public Builder(TxnId txnId)
+ {
+ super(txnId, Load.ALL);
+ }
+
+ public Builder(TxnId txnId, Load load)
+ {
+ super(txnId, load);
+ }
+ public ByteBuffer asByteBuffer(RedundantBefore redundantBefore, int
userVersion) throws IOException
+ {
+ try (DataOutputBuffer out = new DataOutputBuffer())
+ {
+ serialize(out, redundantBefore, userVersion);
+ return out.asNewBuffer();
+ }
+ }
+
+ public Builder maybeCleanup(Cleanup cleanup)
+ {
+ super.maybeCleanup(cleanup);
+ return this;
+ }
+
+ public void serialize(DataOutputPlus out, RedundantBefore
redundantBefore, int userVersion) throws IOException
+ {
+ Invariants.checkState(mask == 0);
+ Invariants.checkState(flags != 0);
+
+ int flags = validateFlags(this.flags);
+ Writer.serialize(construct(redundantBefore), flags, out,
userVersion);
+ }
+
+ public void deserializeNext(DataInputPlus in, int userVersion) throws
IOException
+ {
+ Invariants.checkState(txnId != null);
+ int flags = in.readInt();
+ Invariants.checkState(flags != 0);
+ nextCalled = true;
+ count++;
+
+ int iterable = toIterableSetFields(flags);
+ while (iterable != 0)
+ {
+ CommandChange.Fields field = nextSetField(iterable);
+ if (getFieldChanged(field, this.flags) ||
getFieldIsNull(field, mask))
+ {
+ if (!getFieldIsNull(field, flags))
+ skip(field, in, userVersion);
+
+ iterable = unsetIterableFields(field, iterable);
+ continue;
+ }
+ this.flags = setFieldChanged(field, this.flags);
+
+ if (getFieldIsNull(field, flags))
+ {
+ this.flags = setFieldIsNull(field, this.flags);
+ }
+ else
{
- iter.readAllForKey(key, (segment, position, local, buffer,
hosts, userVersion) -> {
- pointers.add(new RecordPointer(segment, position));
- Invariants.checkState(finalKey.equals(local));
- try (DataInputBuffer in = new DataInputBuffer(buffer,
false))
+ deserialize(field, in, userVersion);
+ }
+
+ iterable = unsetIterableFields(field, iterable);
+ }
+ }
+
+ private void deserialize(CommandChange.Fields field, DataInputPlus in,
int userVersion) throws IOException
+ {
+ switch (field)
+ {
+ case EXECUTE_AT:
+ executeAt = CommandSerializers.timestamp.deserialize(in,
userVersion);
+ break;
+ case EXECUTES_AT_LEAST:
+ executeAtLeast =
CommandSerializers.timestamp.deserialize(in, userVersion);
+ break;
+ case SAVE_STATUS:
+ saveStatus = SaveStatus.values()[in.readShort()];
+ break;
+ case DURABILITY:
+ durability =
accord.primitives.Status.Durability.values()[in.readByte()];
+ break;
+ case ACCEPTED:
+ acceptedOrCommitted =
CommandSerializers.ballot.deserialize(in, userVersion);
+ break;
+ case PROMISED:
+ promised = CommandSerializers.ballot.deserialize(in,
userVersion);
+ break;
+ case PARTICIPANTS:
+ participants =
CommandSerializers.participants.deserialize(in, userVersion);
+ break;
+ case PARTIAL_TXN:
+ partialTxn = CommandSerializers.partialTxn.deserialize(in,
userVersion);
+ break;
+ case PARTIAL_DEPS:
+ partialDeps = DepsSerializers.partialDeps.deserialize(in,
userVersion);
+ break;
+ case WAITING_ON:
+ int size = in.readInt();
+
+ byte[] waitingOnBytes = new byte[size];
+ in.readFully(waitingOnBytes);
+ ByteBuffer buffer = ByteBuffer.wrap(waitingOnBytes);
+ waitingOn = (localTxnId, deps) -> {
+ try
{
- builder.deserializeNext(in, userVersion);
+ Invariants.nonNull(deps);
+ return WaitingOnSerializer.deserialize(localTxnId,
deps.keyDeps.keys(), deps.rangeDeps, deps.directKeyDeps, buffer);
}
catch (IOException e)
{
- // can only throw if serializer is buggy
- throw new RuntimeException(e);
+ throw Throwables.unchecked(e);
}
- });
-
- Cleanup cleanup = builder.shouldCleanup(node.agent(),
compactionInfo.redundantBefores.get(key.commandStoreId),
compactionInfo.durableBefores.get(key.commandStoreId));
- switch (cleanup)
- {
- case ERASE:
- case EXPUNGE:
- case EXPUNGE_PARTIAL:
- case VESTIGIAL:
- continue;
- }
- builder.construct();
- }
- catch (Throwable t)
- {
- throw new RuntimeException(String.format("Caught an
exception after iterating over: %s", pointers),
- t);
- }
+ };
+ break;
+ case WRITES:
+ writes = CommandSerializers.writes.deserialize(in,
userVersion);
+ break;
+ case CLEANUP:
+ Cleanup newCleanup = Cleanup.forOrdinal(in.readByte());
+ if (cleanup == null || newCleanup.compareTo(cleanup) > 0)
+ cleanup = newCleanup;
+ break;
+ case RESULT:
+ result = ResultSerializers.result.deserialize(in,
userVersion);
+ break;
}
}
- }
- public void unsafeSetStarted()
- {
- status = Status.STARTED;
+ private void skip(CommandChange.Fields field, DataInputPlus in, int
userVersion) throws IOException
+ {
+ switch (field)
+ {
+ case EXECUTE_AT:
+ case EXECUTES_AT_LEAST:
+ CommandSerializers.timestamp.skip(in, userVersion);
+ break;
+ case SAVE_STATUS:
+ in.readShort();
+ break;
+ case DURABILITY:
+ in.readByte();
+ break;
+ case ACCEPTED:
+ case PROMISED:
+ CommandSerializers.ballot.skip(in, userVersion);
+ break;
+ case PARTICIPANTS:
+ // TODO (expected): skip
+ CommandSerializers.participants.deserialize(in,
userVersion);
+ break;
+ case PARTIAL_TXN:
+ CommandSerializers.partialTxn.deserialize(in, userVersion);
+ break;
+ case PARTIAL_DEPS:
+ // TODO (expected): skip
+ DepsSerializers.partialDeps.deserialize(in, userVersion);
+ break;
+ case WAITING_ON:
+ int size = in.readInt();
+ in.skipBytesFully(size);
+ break;
+ case WRITES:
+ // TODO (expected): skip
+ CommandSerializers.writes.deserialize(in, userVersion);
+ break;
+ case CLEANUP:
+ in.readByte();
+ break;
+ case RESULT:
+ // TODO (expected): skip
+ result = ResultSerializers.result.deserialize(in,
userVersion);
+ break;
+ }
+ }
}
}
\ No newline at end of file
diff --git
a/src/java/org/apache/cassandra/service/accord/AccordJournalValueSerializers.java
b/src/java/org/apache/cassandra/service/accord/AccordJournalValueSerializers.java
index 7f1c71f351..a11dfb744b 100644
---
a/src/java/org/apache/cassandra/service/accord/AccordJournalValueSerializers.java
+++
b/src/java/org/apache/cassandra/service/accord/AccordJournalValueSerializers.java
@@ -35,8 +35,8 @@ import org.apache.cassandra.net.MessagingService;
import org.apache.cassandra.service.accord.serializers.CommandStoreSerializers;
import org.apache.cassandra.service.accord.serializers.KeySerializers;
+import static accord.api.Journal.Load.ALL;
import static accord.local.CommandStores.RangesForEpoch;
-import static org.apache.cassandra.service.accord.SavedCommand.Load.ALL;
// TODO (required): test with large collection values, and perhaps split out
some fields if they have a tendency to grow larger
// TODO (required): alert on metadata size
@@ -56,16 +56,16 @@ public class AccordJournalValueSerializers
}
public static class CommandDiffSerializer
- implements FlyweightSerializer<SavedCommand.Writer, SavedCommand.Builder>
+ implements FlyweightSerializer<AccordJournal.Writer, AccordJournal.Builder>
{
@Override
- public SavedCommand.Builder mergerFor(JournalKey journalKey)
+ public AccordJournal.Builder mergerFor(JournalKey journalKey)
{
- return new SavedCommand.Builder(journalKey.id, ALL);
+ return new AccordJournal.Builder(journalKey.id, ALL);
}
@Override
- public void serialize(JournalKey key, SavedCommand.Writer writer,
DataOutputPlus out, int userVersion)
+ public void serialize(JournalKey key, AccordJournal.Writer writer,
DataOutputPlus out, int userVersion)
{
try
{
@@ -78,13 +78,17 @@ public class AccordJournalValueSerializers
}
@Override
- public void reserialize(JournalKey key, SavedCommand.Builder from,
DataOutputPlus out, int userVersion) throws IOException
+ public void reserialize(JournalKey key, AccordJournal.Builder from,
DataOutputPlus out, int userVersion) throws IOException
{
- from.serialize(out, userVersion);
+ from.serialize(out,
+ // In CompactionIterator, we are dealing with
relatively recent records, so we do not pass redundant before here.
+ // However, we do on load and during Journal
SSTable compaction.
+ RedundantBefore.EMPTY,
+ userVersion);
}
@Override
- public void deserialize(JournalKey journalKey, SavedCommand.Builder
into, DataInputPlus in, int userVersion) throws IOException
+ public void deserialize(JournalKey journalKey, AccordJournal.Builder
into, DataInputPlus in, int userVersion) throws IOException
{
into.deserializeNext(in, userVersion);
}
@@ -296,8 +300,8 @@ public class AccordJournalValueSerializers
from.forEach((epoch, ranges) -> {
try
{
- KeySerializers.ranges.serialize(ranges, out,
messagingVersion);
out.writeLong(epoch);
+ KeySerializers.ranges.serialize(ranges, out,
messagingVersion);
}
catch (Throwable t)
{
@@ -320,8 +324,8 @@ public class AccordJournalValueSerializers
long[] epochs = new long[size];
for (int i = 0; i < ranges.length; i++)
{
- ranges[i] = KeySerializers.ranges.deserialize(in,
messagingVersion);
epochs[i] = in.readLong();
+ ranges[i] = KeySerializers.ranges.deserialize(in,
messagingVersion);
}
Invariants.checkState(ranges.length == epochs.length);
into.update(new RangesForEpoch(epochs, ranges));
diff --git a/src/java/org/apache/cassandra/service/accord/AccordTask.java
b/src/java/org/apache/cassandra/service/accord/AccordTask.java
index ea4277391a..db36c42562 100644
--- a/src/java/org/apache/cassandra/service/accord/AccordTask.java
+++ b/src/java/org/apache/cassandra/service/accord/AccordTask.java
@@ -618,7 +618,7 @@ public abstract class AccordTask<R> extends Task implements
Runnable, Function<S
condition.awaitUninterruptibly();
for (Command check : sanityCheck)
- this.commandStore.sanityCheckCommand(check);
+
this.commandStore.sanityCheckCommand(commandStore.unsafeGetRedundantBefore(),
check);
if (onFlush != null) onFlush.run();
}
diff --git
a/src/java/org/apache/cassandra/service/accord/CommandsForRanges.java
b/src/java/org/apache/cassandra/service/accord/CommandsForRanges.java
index b0aa948e26..29cdcb5195 100644
--- a/src/java/org/apache/cassandra/service/accord/CommandsForRanges.java
+++ b/src/java/org/apache/cassandra/service/accord/CommandsForRanges.java
@@ -169,7 +169,7 @@ public class CommandsForRanges extends TreeMap<Timestamp,
Summary> implements Co
{
if (findAsDep == null)
{
- SavedCommand.MinimalCommand cmd =
manager.commandStore.loadMinimal(txnId);
+ Command.Minimal cmd = manager.commandStore.loadMinimal(txnId);
if (cmd != null)
return ifRelevant(cmd);
}
@@ -225,7 +225,7 @@ public class CommandsForRanges extends TreeMap<Timestamp,
Summary> implements Co
return ifRelevant(cmd.txnId(), cmd.executeAt(), cmd.saveStatus(),
cmd.participants(), cmd.partialDeps());
}
- public Summary ifRelevant(SavedCommand.MinimalCommand cmd)
+ public Summary ifRelevant(Command.Minimal cmd)
{
Invariants.checkState(findAsDep == null);
return ifRelevant(cmd.txnId, cmd.executeAt, cmd.saveStatus,
cmd.participants, null);
diff --git a/src/java/org/apache/cassandra/service/accord/IJournal.java
b/src/java/org/apache/cassandra/service/accord/IJournal.java
deleted file mode 100644
index 66b1f65ce5..0000000000
--- a/src/java/org/apache/cassandra/service/accord/IJournal.java
+++ /dev/null
@@ -1,40 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.cassandra.service.accord;
-
-import accord.api.Journal;
-import accord.local.Command;
-import accord.local.DurableBefore;
-import accord.local.RedundantBefore;
-import accord.primitives.TxnId;
-import accord.utils.PersistentField.Persister;
-
-public interface IJournal extends Journal
-{
- // TODO (required): migrate to accord.api.Journal
- default SavedCommand.MinimalCommand loadMinimal(int commandStoreId, TxnId
txnId, SavedCommand.Load load, RedundantBefore redundantBefore, DurableBefore
durableBefore)
- {
- Command command = loadCommand(commandStoreId, txnId, redundantBefore,
durableBefore);
- if (command == null)
- return null;
- return new SavedCommand.MinimalCommand(command.txnId(),
command.saveStatus(), command.participants(), command.durability(),
command.executeAt(), command.writes());
- }
-
- Persister<DurableBefore, DurableBefore> durableBeforePersister();
-}
\ No newline at end of file
diff --git a/src/java/org/apache/cassandra/service/accord/SavedCommand.java
b/src/java/org/apache/cassandra/service/accord/SavedCommand.java
deleted file mode 100644
index 9e38e5158c..0000000000
--- a/src/java/org/apache/cassandra/service/accord/SavedCommand.java
+++ /dev/null
@@ -1,980 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.cassandra.service.accord;
-
-import java.io.IOException;
-import java.nio.ByteBuffer;
-import java.util.function.Function;
-import javax.annotation.Nullable;
-
-import com.google.common.annotations.VisibleForTesting;
-
-import accord.api.Agent;
-import accord.api.Result;
-import accord.local.Cleanup;
-import accord.local.Command;
-import accord.local.CommonAttributes;
-import accord.local.DurableBefore;
-import accord.local.RedundantBefore;
-import accord.local.StoreParticipants;
-import accord.primitives.Ballot;
-import accord.primitives.PartialDeps;
-import accord.primitives.PartialTxn;
-import accord.primitives.SaveStatus;
-import accord.primitives.Status;
-import accord.primitives.Timestamp;
-import accord.primitives.TxnId;
-import accord.primitives.Writes;
-import accord.utils.Invariants;
-import org.apache.cassandra.io.util.DataInputPlus;
-import org.apache.cassandra.io.util.DataOutputBuffer;
-import org.apache.cassandra.io.util.DataOutputPlus;
-import org.apache.cassandra.journal.Journal;
-import org.apache.cassandra.service.accord.serializers.CommandSerializers;
-import org.apache.cassandra.service.accord.serializers.DepsSerializers;
-import org.apache.cassandra.service.accord.serializers.ResultSerializers;
-import org.apache.cassandra.service.accord.serializers.WaitingOnSerializer;
-import org.apache.cassandra.utils.Throwables;
-
-import static accord.local.Cleanup.NO;
-import static accord.local.Cleanup.TRUNCATE_WITH_OUTCOME;
-import static accord.primitives.Known.KnownDeps.DepsErased;
-import static accord.primitives.Known.KnownDeps.DepsUnknown;
-import static accord.primitives.Known.KnownDeps.NoDeps;
-import static accord.primitives.SaveStatus.TruncatedApplyWithOutcome;
-import static accord.primitives.Status.Durability.NotDurable;
-import static accord.utils.Invariants.illegalState;
-import static
org.apache.cassandra.service.accord.SavedCommand.Fields.DURABILITY;
-import static
org.apache.cassandra.service.accord.SavedCommand.Fields.EXECUTE_AT;
-import static
org.apache.cassandra.service.accord.SavedCommand.Fields.PARTICIPANTS;
-import static org.apache.cassandra.service.accord.SavedCommand.Fields.RESULT;
-import static
org.apache.cassandra.service.accord.SavedCommand.Fields.SAVE_STATUS;
-import static org.apache.cassandra.service.accord.SavedCommand.Fields.WRITES;
-import static org.apache.cassandra.service.accord.SavedCommand.Load.ALL;
-
-public class SavedCommand
-{
- // This enum is order-dependent
- public enum Fields
- {
- PARTICIPANTS, // stored first so we can index it
- SAVE_STATUS,
- PARTIAL_DEPS,
- EXECUTE_AT,
- EXECUTES_AT_LEAST,
- DURABILITY,
- ACCEPTED,
- PROMISED,
- WAITING_ON,
- PARTIAL_TXN,
- WRITES,
- CLEANUP,
- RESULT,
- ;
-
- public static final Fields[] FIELDS = values();
- }
-
- // TODO: maybe rename this and enclosing classes?
- public static class Writer implements Journal.Writer
- {
- private final Command after;
- private final TxnId txnId;
- private final int flags;
-
- @VisibleForTesting
- public Writer(Command after, int flags)
- {
- this(after.txnId(), after, flags);
- }
-
- @VisibleForTesting
- public Writer(TxnId txnId, Command after, int flags)
- {
- this.txnId = txnId;
- this.after = after;
- this.flags = flags;
- }
-
- @VisibleForTesting
- public Command after()
- {
- return after;
- }
-
- public void write(DataOutputPlus out, int userVersion) throws
IOException
- {
- serialize(after, flags, out, userVersion);
- }
-
- public TxnId key()
- {
- return txnId;
- }
- }
-
- public static @Nullable ByteBuffer asSerializedDiff(Command before,
Command after, int userVersion) throws IOException
- {
- try (DataOutputBuffer out = new DataOutputBuffer())
- {
- Writer writer = diff(before, after);
- if (writer == null)
- return null;
-
- writer.write(out, userVersion);
- return out.asNewBuffer();
- }
- }
-
- @Nullable
- public static Writer diff(Command before, Command after)
- {
- if (before == after
- || after == null
- || after.saveStatus() == SaveStatus.Uninitialised)
- return null;
-
- int flags = validateFlags(getFlags(before, after));
- if (!anyFieldChanged(flags))
- return null;
-
- return new Writer(after, flags);
- }
-
- // TODO (required): calculate flags once
- private static boolean anyFieldChanged(int flags)
- {
- return (flags >>> 16) != 0;
- }
-
- private static int validateFlags(int flags)
- {
- Invariants.checkState(0 == (~(flags >>> 16) & (flags & 0xffff)));
- return flags;
- }
-
- public static void serialize(Command after, int flags, DataOutputPlus out,
int userVersion) throws IOException
- {
- Invariants.checkState(flags != 0);
- out.writeInt(flags);
-
- int iterable = toIterableSetFields(flags);
- while (iterable != 0)
- {
- Fields field = nextSetField(iterable);
- if (getFieldIsNull(field, flags))
- {
- iterable = unsetIterableFields(field, iterable);
- continue;
- }
-
- switch (field)
- {
- case EXECUTE_AT:
- CommandSerializers.timestamp.serialize(after.executeAt(),
out, userVersion);
- break;
- case EXECUTES_AT_LEAST:
-
CommandSerializers.timestamp.serialize(after.executesAtLeast(), out,
userVersion);
- break;
- case SAVE_STATUS:
- out.writeShort(after.saveStatus().ordinal());
- break;
- case DURABILITY:
- out.writeByte(after.durability().ordinal());
- break;
- case ACCEPTED:
-
CommandSerializers.ballot.serialize(after.acceptedOrCommitted(), out,
userVersion);
- break;
- case PROMISED:
- CommandSerializers.ballot.serialize(after.promised(), out,
userVersion);
- break;
- case PARTICIPANTS:
-
CommandSerializers.participants.serialize(after.participants(), out,
userVersion);
- break;
- case PARTIAL_TXN:
-
CommandSerializers.partialTxn.serialize(after.partialTxn(), out, userVersion);
- break;
- case PARTIAL_DEPS:
- DepsSerializers.partialDeps.serialize(after.partialDeps(),
out, userVersion);
- break;
- case WAITING_ON:
- Command.WaitingOn waitingOn = getWaitingOn(after);
- long size =
WaitingOnSerializer.serializedSize(after.txnId(), waitingOn);
- ByteBuffer serialized =
WaitingOnSerializer.serialize(after.txnId(), waitingOn);
- Invariants.checkState(serialized.remaining() == size);
- out.writeInt((int) size);
- out.write(serialized);
- break;
- case WRITES:
- CommandSerializers.writes.serialize(after.writes(), out,
userVersion);
- break;
- case RESULT:
- ResultSerializers.result.serialize(after.result(), out,
userVersion);
- break;
- case CLEANUP:
- throw new IllegalStateException();
- }
-
- iterable = unsetIterableFields(field, iterable);
- }
- }
-
- @VisibleForTesting
- public static int getFlags(Command before, Command after)
- {
- int flags = 0;
-
- flags = collectFlags(before, after, Command::executeAt, true,
Fields.EXECUTE_AT, flags);
- flags = collectFlags(before, after, Command::executesAtLeast, true,
Fields.EXECUTES_AT_LEAST, flags);
- flags = collectFlags(before, after, Command::saveStatus, false,
SAVE_STATUS, flags);
- flags = collectFlags(before, after, Command::durability, false,
DURABILITY, flags);
-
- flags = collectFlags(before, after, Command::acceptedOrCommitted,
false, Fields.ACCEPTED, flags);
- flags = collectFlags(before, after, Command::promised, false,
Fields.PROMISED, flags);
-
- flags = collectFlags(before, after, Command::participants, true,
PARTICIPANTS, flags);
- flags = collectFlags(before, after, Command::partialTxn, false,
Fields.PARTIAL_TXN, flags);
- flags = collectFlags(before, after, Command::partialDeps, false,
Fields.PARTIAL_DEPS, flags);
-
- // TODO: waitingOn vs WaitingOnWithExecutedAt?
- flags = collectFlags(before, after, SavedCommand::getWaitingOn, true,
Fields.WAITING_ON, flags);
-
- flags = collectFlags(before, after, Command::writes, false, WRITES,
flags);
-
- // Special-cased for Journal BurnTest integration
- if ((before != null && before.result() != null && before.result() !=
ResultSerializers.APPLIED) ||
- (after != null && after.result() != null && after.result() !=
ResultSerializers.APPLIED))
- {
- flags = collectFlags(before, after, Command::writes, false,
RESULT, flags);
- }
-
- return flags;
- }
-
- static Command.WaitingOn getWaitingOn(Command command)
- {
- if (command instanceof Command.Committed)
- return command.asCommitted().waitingOn();
-
- return null;
- }
-
- private static <OBJ, VAL> int collectFlags(OBJ lo, OBJ ro, Function<OBJ,
VAL> convert, boolean allowClassMismatch, Fields field, int flags)
- {
- VAL l = null;
- VAL r = null;
- if (lo != null) l = convert.apply(lo);
- if (ro != null) r = convert.apply(ro);
-
- if (l == r)
- return flags; // no change
-
- if (r == null)
- flags = setFieldIsNull(field, flags);
-
- if (l == null || r == null)
- return setFieldChanged(field, flags);
-
- assert allowClassMismatch || l.getClass() == r.getClass() :
String.format("%s != %s", l.getClass(), r.getClass());
-
- if (l.equals(r))
- return flags; // no change
-
- return setFieldChanged(field, flags);
- }
-
- private static int setFieldChanged(Fields field, int oldFlags)
- {
- return oldFlags | (0x10000 << field.ordinal());
- }
-
- @VisibleForTesting
- static boolean getFieldChanged(Fields field, int oldFlags)
- {
- return (oldFlags & (0x10000 << field.ordinal())) != 0;
- }
-
- static int toIterableSetFields(int flags)
- {
- return flags >>> 16;
- }
-
- static Fields nextSetField(int iterable)
- {
- int i = Integer.numberOfTrailingZeros(Integer.lowestOneBit(iterable));
- return i == 32 ? null : Fields.FIELDS[i];
- }
-
- static int unsetIterableFields(Fields field, int iterable)
- {
- return iterable & ~(1 << field.ordinal());
- }
-
- @VisibleForTesting
- static boolean getFieldIsNull(Fields field, int oldFlags)
- {
- return (oldFlags & (1 << field.ordinal())) != 0;
- }
-
- private static int setFieldIsNull(Fields field, int oldFlags)
- {
- return oldFlags | (1 << field.ordinal());
- }
-
- public enum Load
- {
- ALL(0),
- PURGEABLE(SAVE_STATUS, PARTICIPANTS, DURABILITY, EXECUTE_AT, WRITES),
- MINIMAL(SAVE_STATUS, PARTICIPANTS, EXECUTE_AT);
-
- final int mask;
-
- Load(int mask)
- {
- this.mask = mask;
- }
-
- Load(Fields ... fields)
- {
- int mask = -1;
- for (Fields field : fields)
- mask &= ~(1<< field.ordinal());
- this.mask = mask;
- }
- }
-
- public static class MinimalCommand
- {
- public final TxnId txnId;
- public final SaveStatus saveStatus;
- public final StoreParticipants participants;
- public final Status.Durability durability;
- public final Timestamp executeAt;
- public final Writes writes;
-
- public MinimalCommand(TxnId txnId, SaveStatus saveStatus,
StoreParticipants participants, Status.Durability durability, Timestamp
executeAt, Writes writes)
- {
- this.txnId = txnId;
- this.saveStatus = saveStatus;
- this.participants = participants;
- this.durability = durability;
- this.executeAt = executeAt;
- this.writes = writes;
- }
- }
-
- public static class Builder
- {
- final int mask;
- int flags;
-
- TxnId txnId;
-
- Timestamp executeAt;
- Timestamp executeAtLeast;
- SaveStatus saveStatus;
- Status.Durability durability;
-
- Ballot acceptedOrCommitted;
- Ballot promised;
-
- StoreParticipants participants;
- PartialTxn partialTxn;
- PartialDeps partialDeps;
-
- byte[] waitingOnBytes;
- SavedCommand.WaitingOnProvider waitingOn;
- Writes writes;
- Result result;
- Cleanup cleanup;
-
- boolean nextCalled;
- int count;
-
- public Builder(TxnId txnId, Load load)
- {
- this.mask = load.mask;
- init(txnId);
- }
-
- public Builder(TxnId txnId)
- {
- this(txnId, ALL);
- }
-
- public Builder(Load load)
- {
- this.mask = load.mask;
- }
-
- public Builder()
- {
- this(ALL);
- }
-
- public TxnId txnId()
- {
- return txnId;
- }
-
- public Timestamp executeAt()
- {
- return executeAt;
- }
-
- public Timestamp executeAtLeast()
- {
- return executeAtLeast;
- }
-
- public SaveStatus saveStatus()
- {
- return saveStatus;
- }
-
- public Status.Durability durability()
- {
- return durability;
- }
-
- public Ballot acceptedOrCommitted()
- {
- return acceptedOrCommitted;
- }
-
- public Ballot promised()
- {
- return promised;
- }
-
- public StoreParticipants participants()
- {
- return participants;
- }
-
- public PartialTxn partialTxn()
- {
- return partialTxn;
- }
-
- public PartialDeps partialDeps()
- {
- return partialDeps;
- }
-
- public SavedCommand.WaitingOnProvider waitingOn()
- {
- return waitingOn;
- }
-
- public Writes writes()
- {
- return writes;
- }
-
- public Result result()
- {
- return result;
- }
-
- public void clear()
- {
- flags = 0;
- txnId = null;
-
- executeAt = null;
- executeAtLeast = null;
- saveStatus = null;
- durability = null;
-
- acceptedOrCommitted = null;
- promised = null;
-
- participants = null;
- partialTxn = null;
- partialDeps = null;
-
- waitingOnBytes = null;
- waitingOn = null;
- writes = null;
- result = null;
- cleanup = null;
-
- nextCalled = false;
- count = 0;
- }
-
- public void reset(TxnId txnId)
- {
- clear();
- init(txnId);
- }
-
- public void init(TxnId txnId)
- {
- this.txnId = txnId;
- durability = NotDurable;
- acceptedOrCommitted = promised = Ballot.ZERO;
- waitingOn = (txn, deps) -> null;
- result = ResultSerializers.APPLIED;
- }
-
- public boolean isEmpty()
- {
- return !nextCalled;
- }
-
- public int count()
- {
- return count;
- }
-
- public Cleanup shouldCleanup(Agent agent, RedundantBefore
redundantBefore, DurableBefore durableBefore)
- {
- if (!nextCalled)
- return NO;
-
- if (saveStatus == null || participants == null)
- return Cleanup.NO;
-
- Cleanup cleanup = Cleanup.shouldCleanupPartial(agent, txnId,
saveStatus, durability, participants, redundantBefore, durableBefore);
- if (this.cleanup != null && this.cleanup.compareTo(cleanup) > 0)
- cleanup = this.cleanup;
- return cleanup;
- }
-
- // TODO (expected): avoid allocating new builder
- public Builder maybeCleanup(Cleanup cleanup)
- {
- if (saveStatus() == null)
- return this;
-
- switch (cleanup)
- {
- case EXPUNGE:
- case ERASE:
- return null;
-
- case EXPUNGE_PARTIAL:
- return expungePartial(cleanup, saveStatus, true);
-
- case VESTIGIAL:
- case INVALIDATE:
- return saveStatusOnly();
-
- case TRUNCATE_WITH_OUTCOME:
- case TRUNCATE:
- return expungePartial(cleanup, cleanup.appliesIfNot,
cleanup == TRUNCATE_WITH_OUTCOME);
-
- case NO:
- return this;
- default:
- throw new IllegalStateException("Unknown cleanup: " +
cleanup);}
- }
-
- private Builder expungePartial(Cleanup cleanup, SaveStatus saveStatus,
boolean includeOutcome)
- {
- Invariants.checkState(txnId != null);
- Builder builder = new Builder(txnId, ALL);
-
- builder.count++;
- builder.nextCalled = true;
-
- Invariants.checkState(saveStatus != null);
- builder.flags = setFieldChanged(SAVE_STATUS, builder.flags);
- builder.saveStatus = saveStatus;
- builder.flags = setFieldChanged(Fields.CLEANUP, builder.flags);
- builder.cleanup = cleanup;
- if (executeAt != null)
- {
- builder.flags = setFieldChanged(Fields.EXECUTE_AT,
builder.flags);
- builder.executeAt = executeAt;
- }
- if (durability != null)
- {
- builder.flags = setFieldChanged(DURABILITY, builder.flags);
- builder.durability = durability;
- }
- if (participants != null)
- {
- builder.flags = setFieldChanged(PARTICIPANTS, builder.flags);
- builder.participants = participants;
- }
- if (includeOutcome && builder.writes != null)
- {
- builder.flags = setFieldChanged(WRITES, builder.flags);
- builder.writes = writes;
- }
-
- return builder;
- }
-
- private Builder saveStatusOnly()
- {
- Invariants.checkState(txnId != null);
- Builder builder = new Builder(txnId, ALL);
-
- builder.count++;
- builder.nextCalled = true;
-
- // TODO: these accesses can be abstracted away
- if (saveStatus != null)
- {
- builder.flags = setFieldChanged(SAVE_STATUS, builder.flags);
- builder.saveStatus = saveStatus;
- }
-
- return builder;
- }
-
- public ByteBuffer asByteBuffer(int userVersion) throws IOException
- {
- try (DataOutputBuffer out = new DataOutputBuffer())
- {
- serialize(out, userVersion);
- return out.asNewBuffer();
- }
- }
-
- public MinimalCommand asMinimal()
- {
- return new MinimalCommand(txnId, saveStatus, participants,
durability, executeAt, writes);
- }
-
- public void serialize(DataOutputPlus out, int userVersion) throws
IOException
- {
- Invariants.checkState(mask == 0);
- Invariants.checkState(flags != 0);
- out.writeInt(validateFlags(flags));
-
- int iterable = toIterableSetFields(flags);
- while (iterable != 0)
- {
- Fields field = nextSetField(iterable);
- if (getFieldIsNull(field, flags))
- {
- iterable = unsetIterableFields(field, iterable);
- continue;
- }
-
- switch (field)
- {
- case EXECUTE_AT:
- CommandSerializers.timestamp.serialize(executeAt(),
out, userVersion);
- break;
- case EXECUTES_AT_LEAST:
-
CommandSerializers.timestamp.serialize(executeAtLeast(), out, userVersion);
- break;
- case SAVE_STATUS:
- out.writeShort(saveStatus().ordinal());
- break;
- case DURABILITY:
- out.writeByte(durability().ordinal());
- break;
- case ACCEPTED:
-
CommandSerializers.ballot.serialize(acceptedOrCommitted(), out, userVersion);
- break;
- case PROMISED:
- CommandSerializers.ballot.serialize(promised(), out,
userVersion);
- break;
- case PARTICIPANTS:
-
CommandSerializers.participants.serialize(participants(), out, userVersion);
- break;
- case PARTIAL_TXN:
- CommandSerializers.partialTxn.serialize(partialTxn(),
out, userVersion);
- break;
- case PARTIAL_DEPS:
- DepsSerializers.partialDeps.serialize(partialDeps(),
out, userVersion);
- break;
- case WAITING_ON:
- out.writeInt(waitingOnBytes.length);
- out.write(waitingOnBytes);
- break;
- case WRITES:
- CommandSerializers.writes.serialize(writes(), out,
userVersion);
- break;
- case CLEANUP:
- out.writeByte(cleanup.ordinal());
- break;
- case RESULT:
- ResultSerializers.result.serialize(result(), out,
userVersion);
- break;
- }
-
- iterable = unsetIterableFields(field, iterable);
- }
- }
-
- public void deserializeNext(DataInputPlus in, int userVersion) throws
IOException
- {
- Invariants.checkState(txnId != null);
- int flags = in.readInt();
- Invariants.checkState(flags != 0);
- nextCalled = true;
- count++;
-
- int iterable = toIterableSetFields(flags);
- while (iterable != 0)
- {
- Fields field = nextSetField(iterable);
- if (getFieldChanged(field, this.flags) ||
getFieldIsNull(field, mask))
- {
- if (!getFieldIsNull(field, flags))
- skip(field, in, userVersion);
-
- iterable = unsetIterableFields(field, iterable);
- continue;
- }
- this.flags = setFieldChanged(field, this.flags);
-
- if (getFieldIsNull(field, flags))
- {
- this.flags = setFieldIsNull(field, this.flags);
- }
- else
- {
- deserialize(field, in, userVersion);
- }
-
- iterable = unsetIterableFields(field, iterable);
- }
- }
-
- private void deserialize(Fields field, DataInputPlus in, int
userVersion) throws IOException
- {
- switch (field)
- {
- case EXECUTE_AT:
- executeAt = CommandSerializers.timestamp.deserialize(in,
userVersion);
- break;
- case EXECUTES_AT_LEAST:
- executeAtLeast =
CommandSerializers.timestamp.deserialize(in, userVersion);
- break;
- case SAVE_STATUS:
- saveStatus = SaveStatus.values()[in.readShort()];
- break;
- case DURABILITY:
- durability = Status.Durability.values()[in.readByte()];
- break;
- case ACCEPTED:
- acceptedOrCommitted =
CommandSerializers.ballot.deserialize(in, userVersion);
- break;
- case PROMISED:
- promised = CommandSerializers.ballot.deserialize(in,
userVersion);
- break;
- case PARTICIPANTS:
- participants =
CommandSerializers.participants.deserialize(in, userVersion);
- break;
- case PARTIAL_TXN:
- partialTxn = CommandSerializers.partialTxn.deserialize(in,
userVersion);
- break;
- case PARTIAL_DEPS:
- partialDeps = DepsSerializers.partialDeps.deserialize(in,
userVersion);
- break;
- case WAITING_ON:
- int size = in.readInt();
- waitingOnBytes = new byte[size];
- in.readFully(waitingOnBytes);
- ByteBuffer buffer = ByteBuffer.wrap(waitingOnBytes);
- waitingOn = (localTxnId, deps) -> {
- try
- {
- Invariants.nonNull(deps);
- return WaitingOnSerializer.deserialize(localTxnId,
deps.keyDeps.keys(), deps.rangeDeps, deps.directKeyDeps, buffer);
- }
- catch (IOException e)
- {
- throw Throwables.unchecked(e);
- }
- };
- break;
- case WRITES:
- writes = CommandSerializers.writes.deserialize(in,
userVersion);
- break;
- case CLEANUP:
- Cleanup newCleanup = Cleanup.forOrdinal(in.readByte());
- if (cleanup == null || newCleanup.compareTo(cleanup) > 0)
- cleanup = newCleanup;
- break;
- case RESULT:
- result = ResultSerializers.result.deserialize(in,
userVersion);
- break;
- }
- }
-
- private void skip(Fields field, DataInputPlus in, int userVersion)
throws IOException
- {
- switch (field)
- {
- case EXECUTE_AT:
- case EXECUTES_AT_LEAST:
- CommandSerializers.timestamp.skip(in, userVersion);
- break;
- case SAVE_STATUS:
- in.readShort();
- break;
- case DURABILITY:
- in.readByte();
- break;
- case ACCEPTED:
- case PROMISED:
- CommandSerializers.ballot.skip(in, userVersion);
- break;
- case PARTICIPANTS:
- // TODO (expected): skip
- CommandSerializers.participants.deserialize(in,
userVersion);
- break;
- case PARTIAL_TXN:
- CommandSerializers.partialTxn.deserialize(in, userVersion);
- break;
- case PARTIAL_DEPS:
- // TODO (expected): skip
- DepsSerializers.partialDeps.deserialize(in, userVersion);
- break;
- case WAITING_ON:
- int size = in.readInt();
- in.skipBytesFully(size);
- break;
- case WRITES:
- // TODO (expected): skip
- CommandSerializers.writes.deserialize(in, userVersion);
- break;
- case CLEANUP:
- in.readByte();
- break;
- case RESULT:
- // TODO (expected): skip
- result = ResultSerializers.result.deserialize(in,
userVersion);
- break;
- }
- }
-
- public void forceResult(Result newValue)
- {
- this.result = newValue;
- }
-
- public Command construct()
- {
- if (!nextCalled)
- return null;
-
- Invariants.checkState(txnId != null);
- CommonAttributes.Mutable attrs = new
CommonAttributes.Mutable(txnId);
- if (partialTxn != null)
- attrs.partialTxn(partialTxn);
- if (durability != null)
- attrs.durability(durability);
- if (participants != null)
- attrs.setParticipants(participants);
- else
- attrs.setParticipants(StoreParticipants.empty(txnId));
- if (partialDeps != null &&
- (saveStatus.known.deps != NoDeps &&
- saveStatus.known.deps != DepsErased &&
- saveStatus.known.deps != DepsUnknown))
- attrs.partialDeps(partialDeps);
-
- switch (saveStatus.known.outcome)
- {
- case Erased:
- case WasApply:
- writes = null;
- result = null;
- break;
- }
-
- Command.WaitingOn waitingOn = null;
- if (this.waitingOn != null)
- waitingOn = this.waitingOn.provide(txnId, partialDeps);
-
- switch (saveStatus.status)
- {
- case NotDefined:
- return saveStatus == SaveStatus.Uninitialised ?
Command.NotDefined.uninitialised(attrs.txnId())
- :
Command.NotDefined.notDefined(attrs, promised);
- case PreAccepted:
- return Command.PreAccepted.preAccepted(attrs, executeAt,
promised);
- case AcceptedInvalidate:
- case Accepted:
- case PreCommitted:
- if (saveStatus == SaveStatus.AcceptedInvalidate)
- return
Command.AcceptedInvalidateWithoutDefinition.acceptedInvalidate(attrs, promised,
acceptedOrCommitted);
- else
- return Command.Accepted.accepted(attrs, saveStatus,
executeAt, promised, acceptedOrCommitted);
- case Committed:
- case Stable:
- return Command.Committed.committed(attrs, saveStatus,
executeAt, promised, acceptedOrCommitted, waitingOn);
- case PreApplied:
- case Applied:
- return Command.Executed.executed(attrs, saveStatus,
executeAt, promised, acceptedOrCommitted, waitingOn, writes, result);
- case Truncated:
- case Invalidated:
- return truncated(attrs, saveStatus, executeAt,
executeAtLeast, writes, result);
- default:
- throw new IllegalStateException();
- }
- }
-
- private static Command.Truncated truncated(CommonAttributes.Mutable
attrs, SaveStatus status, Timestamp executeAt, Timestamp executesAtLeast,
Writes writes, Result result)
- {
- switch (status)
- {
- default:
- throw illegalState("Unhandled SaveStatus: " + status);
- case TruncatedApplyWithOutcome:
- case TruncatedApplyWithDeps:
- case TruncatedApply:
- if (status != TruncatedApplyWithOutcome)
- result = null;
- if (attrs.txnId().kind().awaitsOnlyDeps())
- return Command.Truncated.truncatedApply(attrs, status,
executeAt, writes, result, executesAtLeast);
- return Command.Truncated.truncatedApply(attrs, status,
executeAt, writes, result, null);
- case ErasedOrVestigial:
- return Command.Truncated.erasedOrVestigial(attrs.txnId(),
attrs.participants());
- case Erased:
- return Command.Truncated.erased(attrs.txnId(),
attrs.durability(), attrs.participants());
- case Invalidated:
- return Command.Truncated.invalidated(attrs.txnId());
- }
- }
-
- public String toString()
- {
- return "Diff {" +
- "txnId=" + txnId +
- ", executeAt=" + executeAt +
- ", saveStatus=" + saveStatus +
- ", durability=" + durability +
- ", acceptedOrCommitted=" + acceptedOrCommitted +
- ", promised=" + promised +
- ", participants=" + participants +
- ", partialTxn=" + partialTxn +
- ", partialDeps=" + partialDeps +
- ", waitingOn=" + waitingOn +
- ", writes=" + writes +
- '}';
- }
- }
-
- public interface WaitingOnProvider
- {
- Command.WaitingOn provide(TxnId txnId, PartialDeps deps);
- }
-}
\ No newline at end of file
diff --git
a/test/distributed/org/apache/cassandra/distributed/test/accord/AccordLoadTest.java
b/test/distributed/org/apache/cassandra/distributed/test/accord/AccordLoadTest.java
index 6ad2f09d2a..2a99a350b5 100644
---
a/test/distributed/org/apache/cassandra/distributed/test/accord/AccordLoadTest.java
+++
b/test/distributed/org/apache/cassandra/distributed/test/accord/AccordLoadTest.java
@@ -186,9 +186,6 @@ public class AccordLoadTest extends AccordTestBase
System.out.println("compacting accord...");
cluster.forEach(i -> {
i.nodetool("compact", "system_accord.journal");
- i.runOnInstance(() -> {
- ((AccordService)
AccordService.instance()).journal().checkAllCommands();
- });
});
}
@@ -198,7 +195,6 @@ public class AccordLoadTest extends AccordTestBase
System.out.println("flushing journal...");
cluster.forEach(i -> i.runOnInstance(() -> {
((AccordService)
AccordService.instance()).journal().closeCurrentSegmentForTestingIfNonEmpty();
- ((AccordService)
AccordService.instance()).journal().checkAllCommands();
}));
}
diff --git
a/test/distributed/org/apache/cassandra/service/accord/AccordJournalBurnTest.java
b/test/distributed/org/apache/cassandra/service/accord/AccordJournalBurnTest.java
index 25f9c61d38..9e41e0add9 100644
---
a/test/distributed/org/apache/cassandra/service/accord/AccordJournalBurnTest.java
+++
b/test/distributed/org/apache/cassandra/service/accord/AccordJournalBurnTest.java
@@ -136,7 +136,7 @@ public class AccordJournalBurnTest extends BurnTestBase
operations,
10 + random.nextInt(30),
new RandomDelayQueue.Factory(random).get(),
- (node) -> {
+ (node, agent) -> {
try
{
File directory = new
File(Files.createTempDirectory(Integer.toString(counter.incrementAndGet())));
@@ -175,6 +175,4 @@ public class AccordJournalBurnTest extends BurnTestBase
throw SimulationException.wrap(seed, t);
}
}
-
-
}
diff --git
a/test/unit/org/apache/cassandra/service/accord/AccordJournalOrderTest.java
b/test/unit/org/apache/cassandra/service/accord/AccordJournalOrderTest.java
index 34bb215c0a..3c655133fa 100644
--- a/test/unit/org/apache/cassandra/service/accord/AccordJournalOrderTest.java
+++ b/test/unit/org/apache/cassandra/service/accord/AccordJournalOrderTest.java
@@ -89,7 +89,7 @@ public class AccordJournalOrderTest
Runnable check = () -> {
for (JournalKey key : res.keySet())
{
- SavedCommand.Builder diffs =
accordJournal.loadDiffs(key.commandStoreId, key.id);
+ AccordJournal.Builder diffs =
accordJournal.load(key.commandStoreId, key.id);
Assert.assertEquals(String.format("%d != %d for key %s",
diffs.count(), res.get(key).intValue(), key),
diffs.count(), res.get(key).intValue());
}
diff --git
a/test/unit/org/apache/cassandra/service/accord/SavedCommandTest.java
b/test/unit/org/apache/cassandra/service/accord/CommandChangeTest.java
similarity index 87%
rename from test/unit/org/apache/cassandra/service/accord/SavedCommandTest.java
rename to test/unit/org/apache/cassandra/service/accord/CommandChangeTest.java
index 1760286ba3..2878c5750a 100644
--- a/test/unit/org/apache/cassandra/service/accord/SavedCommandTest.java
+++ b/test/unit/org/apache/cassandra/service/accord/CommandChangeTest.java
@@ -26,7 +26,9 @@ import org.junit.Assert;
import org.junit.BeforeClass;
import org.junit.Test;
+import accord.impl.CommandChange;
import accord.local.Command;
+import accord.local.RedundantBefore;
import accord.primitives.SaveStatus;
import accord.primitives.TxnId;
import accord.utils.Gen;
@@ -39,17 +41,17 @@ import org.apache.cassandra.schema.KeyspaceParams;
import org.apache.cassandra.schema.Schema;
import org.apache.cassandra.schema.TableMetadata;
import org.apache.cassandra.service.StorageService;
-import org.apache.cassandra.service.accord.SavedCommand.Fields;
-import org.apache.cassandra.service.accord.SavedCommand.Load;
import org.apache.cassandra.service.consensus.TransactionalMode;
import org.apache.cassandra.utils.AccordGenerators;
import org.assertj.core.api.SoftAssertions;
+import static accord.api.Journal.*;
+import static accord.impl.CommandChange.*;
+import static accord.impl.CommandChange.getFlags;
import static accord.utils.Property.qt;
import static
org.apache.cassandra.cql3.statements.schema.CreateTableStatement.parse;
-import static org.apache.cassandra.service.accord.SavedCommand.getFlags;
-public class SavedCommandTest
+public class CommandChangeTest
{
private static final EnumSet<Fields> ALL = EnumSet.allOf(Fields.class);
@@ -97,13 +99,14 @@ public class SavedCommandTest
if (saveStatus == SaveStatus.TruncatedApplyWithDeps)
continue;
out.clear();
Command orig = cmdBuilder.build(saveStatus);
- SavedCommand.serialize(orig, getFlags(null, orig), out,
userVersion);
- SavedCommand.Builder builder = new
SavedCommand.Builder(orig.txnId(), Load.ALL);
+
+ AccordJournal.Writer.make(null, orig).write(out,
userVersion);
+ AccordJournal.Builder builder = new
AccordJournal.Builder(orig.txnId(), Load.ALL);
builder.deserializeNext(new
DataInputBuffer(out.unsafeGetBufferAndFlip(), false), userVersion);
// We are not persisting the result, so force it for
strict equality
builder.forceResult(orig.result());
- Command reconstructed = builder.construct();
+ Command reconstructed =
builder.construct(RedundantBefore.EMPTY);
checks.assertThat(reconstructed)
.describedAs("lhs=expected\nrhs=actual\n%s", new
LazyToString(() -> ReflectionUtils.recursiveEquals(orig,
reconstructed).toString()))
@@ -119,10 +122,10 @@ public class SavedCommandTest
SoftAssertions checks = new SoftAssertions();
for (Fields field : missing)
{
- checks.assertThat(SavedCommand.getFieldChanged(field, flags))
+ checks.assertThat(CommandChange.getFieldChanged(field, flags))
.describedAs("field %s changed", field).
isTrue();
- checks.assertThat(SavedCommand.getFieldIsNull(field, flags))
+ checks.assertThat(CommandChange.getFieldIsNull(field, flags))
.describedAs("field %s not null", field)
.isFalse();
}
@@ -135,11 +138,11 @@ public class SavedCommandTest
for (Fields field : missing)
{
if (field == Fields.CLEANUP) continue;
- checks.assertThat(SavedCommand.getFieldChanged(field, flags))
+ checks.assertThat(CommandChange.getFieldChanged(field, flags))
.describedAs("field %s changed", field)
.isFalse();
// Is null flag can not be set on a field that has not changed
- checks.assertThat(SavedCommand.getFieldIsNull(field, flags))
+ checks.assertThat(CommandChange.getFieldIsNull(field, flags))
.describedAs("field %s not null", field)
.isFalse();
}
diff --git
a/test/unit/org/apache/cassandra/service/accord/SimulatedAccordCommandStore.java
b/test/unit/org/apache/cassandra/service/accord/SimulatedAccordCommandStore.java
index c376c46fbf..65b315a7e2 100644
---
a/test/unit/org/apache/cassandra/service/accord/SimulatedAccordCommandStore.java
+++
b/test/unit/org/apache/cassandra/service/accord/SimulatedAccordCommandStore.java
@@ -30,6 +30,7 @@ import java.util.function.Function;
import java.util.function.Predicate;
import java.util.function.ToLongFunction;
+import accord.api.Journal;
import accord.api.LocalListeners;
import accord.api.ProgressLog;
import accord.api.RemoteListeners;
@@ -39,6 +40,7 @@ import accord.impl.DefaultLocalListeners;
import accord.impl.DefaultTimeouts;
import accord.impl.SizeOfIntersectionSorter;
import accord.impl.TestAgent;
+import accord.impl.basic.InMemoryJournal;
import accord.impl.basic.SimulatedFault;
import accord.local.Command;
import accord.local.CommandStore;
@@ -68,7 +70,6 @@ import accord.primitives.Unseekables;
import accord.topology.Topologies;
import accord.topology.Topology;
import accord.utils.Gens;
-import accord.utils.PersistentField;
import accord.utils.RandomSource;
import accord.utils.async.AsyncChains;
import accord.utils.async.AsyncResult;
@@ -105,7 +106,7 @@ public class SimulatedAccordCommandStore implements
AutoCloseable
public final Node.Id nodeId;
public final Topology topology;
public final Topologies topologies;
- public final IJournal journal;
+ public final Journal journal;
public final ScheduledExecutorPlus unorderedScheduled;
public final List<String> evictions = new ArrayList<>();
public Predicate<Throwable> ignoreExceptions = ignore -> false;
@@ -185,7 +186,6 @@ public class SimulatedAccordCommandStore implements
AutoCloseable
}
};
- this.journal = new InMemoryJournal(nodeId);
TestAgent.RethrowAgent agent = new TestAgent.RethrowAgent()
{
@Override
@@ -201,6 +201,8 @@ public class SimulatedAccordCommandStore implements
AutoCloseable
super.onUncaughtException(t);
}
};
+
+ this.journal = new InMemoryJournal(nodeId, agent);
this.commandStore = new AccordCommandStore(0,
storeService,
agent,
@@ -246,19 +248,6 @@ public class SimulatedAccordCommandStore implements
AutoCloseable
});
}
- private final class InMemoryJournal extends
accord.impl.basic.InMemoryJournal implements IJournal
- {
- public InMemoryJournal(Node.Id id)
- {
- super(id);
- }
-
- public PersistentField.Persister<DurableBefore, DurableBefore>
durableBeforePersister()
- {
- throw new IllegalArgumentException("Not implemented");
- }
- }
-
private <K, V> void updateLoadFunction(AccordCache.Type<K, V, ?> i,
FunctionWrapper wrapper)
{
i.unsafeSetLoadFunction(wrapper.wrap(i.unsafeGetLoadFunction()));
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]