This is an automated email from the ASF dual-hosted git repository.
benedict pushed a commit to branch cep-15-accord
in repository https://gitbox.apache.org/repos/asf/cassandra.git
The following commit(s) were added to refs/heads/cep-15-accord by this push:
new 4c8c0c8b3e Accord: Fix unit tests and improve burn test stability
4c8c0c8b3e is described below
commit 4c8c0c8b3e008c04d5c5f9eef1e8e0b321596c5f
Author: Benedict Elliott Smith <[email protected]>
AuthorDate: Tue Nov 19 20:04:11 2024 +0000
Accord: Fix unit tests and improve burn test stability
patch by Benedict; reviewed by Alex Petrov for CASSANDRA-20113
---
modules/accord | 2 +-
.../service/accord/AccordCommandStore.java | 22 ++-
.../cassandra/service/accord/AccordExecutor.java | 3 +-
.../service/accord/AccordSafeCommandStore.java | 162 ++-------------------
.../cassandra/service/accord/AccordService.java | 6 +-
.../service/accord/CommandsForRanges.java | 7 +-
.../accord/interop/AccordInteropAdapter.java | 13 +-
.../service/accord/interop/AccordInteropApply.java | 10 +-
.../accord/interop/AccordInteropPersist.java | 5 +-
.../accord/serializers/AwaitSerializer.java | 11 +-
.../accord/serializers/ReadDataSerializers.java | 14 +-
.../compaction/CompactionAccordIteratorsTest.java | 15 +-
.../cassandra/service/accord/AccordTaskTest.java | 2 +-
.../serializers/CommandsForKeySerializerTest.java | 13 +-
.../apache/cassandra/utils/AccordGenerators.java | 16 +-
15 files changed, 98 insertions(+), 203 deletions(-)
diff --git a/modules/accord b/modules/accord
index a271897790..61f4ebd21f 160000
--- a/modules/accord
+++ b/modules/accord
@@ -1 +1 @@
-Subproject commit a271897790aa3816c3dea2125b1e374b091bc090
+Subproject commit 61f4ebd21ff34b0807081d9df442236d8c334b52
diff --git
a/src/java/org/apache/cassandra/service/accord/AccordCommandStore.java
b/src/java/org/apache/cassandra/service/accord/AccordCommandStore.java
index d3c59fbd7f..f0b4eb12b5 100644
--- a/src/java/org/apache/cassandra/service/accord/AccordCommandStore.java
+++ b/src/java/org/apache/cassandra/service/accord/AccordCommandStore.java
@@ -38,6 +38,7 @@ import accord.api.DataStore;
import accord.api.LocalListeners;
import accord.api.ProgressLog;
import accord.api.RoutingKey;
+import accord.impl.AbstractSafeCommandStore.CommandStoreCaches;
import accord.impl.TimestampsForKey;
import accord.local.Cleanup;
import accord.local.Command;
@@ -119,7 +120,7 @@ public class AccordCommandStore extends CommandStore
}
}
- public static final class ExclusiveCaches extends Caches implements
AutoCloseable
+ public static final class ExclusiveCaches extends Caches implements
CommandStoreCaches<AccordSafeCommand, AccordSafeTimestampsForKey,
AccordSafeCommandsForKey>
{
private final Lock lock;
@@ -129,6 +130,25 @@ public class AccordCommandStore extends CommandStore
this.lock = lock;
}
+
+ @Override
+ public AccordSafeCommand acquireIfLoaded(TxnId txnId)
+ {
+ return commands().acquireIfLoaded(txnId);
+ }
+
+ @Override
+ public AccordSafeCommandsForKey acquireIfLoaded(RoutingKey key)
+ {
+ return commandsForKeys().acquireIfLoaded(key);
+ }
+
+ @Override
+ public AccordSafeTimestampsForKey acquireTfkIfLoaded(RoutingKey key)
+ {
+ return timestampsForKeys().acquireIfLoaded(key);
+ }
+
@Override
public void close()
{
diff --git a/src/java/org/apache/cassandra/service/accord/AccordExecutor.java
b/src/java/org/apache/cassandra/service/accord/AccordExecutor.java
index ce778dd76a..cc6e96364d 100644
--- a/src/java/org/apache/cassandra/service/accord/AccordExecutor.java
+++ b/src/java/org/apache/cassandra/service/accord/AccordExecutor.java
@@ -561,7 +561,8 @@ public abstract class AccordExecutor implements CacheSize,
AccordCacheEntry.OnLo
private <K, V> void onLoadedExclusive(AccordCacheEntry<K, V> loaded, V
value, Throwable fail, boolean isForRange)
{
--activeLoads;
- --activeRangeLoads;
+ if (isForRange)
+ --activeRangeLoads;
if (loaded.status() != EVICTED)
{
diff --git
a/src/java/org/apache/cassandra/service/accord/AccordSafeCommandStore.java
b/src/java/org/apache/cassandra/service/accord/AccordSafeCommandStore.java
index a1fe99b47c..75afc32625 100644
--- a/src/java/org/apache/cassandra/service/accord/AccordSafeCommandStore.java
+++ b/src/java/org/apache/cassandra/service/accord/AccordSafeCommandStore.java
@@ -18,11 +18,7 @@
package org.apache.cassandra.service.accord;
-import java.util.AbstractCollection;
-import java.util.ArrayList;
import java.util.Collections;
-import java.util.Iterator;
-import java.util.List;
import java.util.Map;
import java.util.NavigableMap;
import java.util.Set;
@@ -36,23 +32,16 @@ import accord.api.DataStore;
import accord.api.Key;
import accord.api.ProgressLog;
import accord.api.RoutingKey;
+import accord.impl.AbstractSafeCommandStore;
import accord.impl.CommandsSummary;
-import accord.impl.SafeTimestampsForKey;
import accord.local.CommandStores;
import accord.local.CommandStores.RangesForEpoch;
-import accord.local.KeyHistory;
import accord.local.NodeCommandStoreService;
-import accord.local.PreLoadContext;
import accord.local.RedundantBefore;
-import accord.local.SafeCommandStore;
import accord.local.cfk.CommandsForKey;
import accord.primitives.AbstractKeys;
-import accord.primitives.AbstractUnseekableKeys;
-import accord.primitives.Participants;
import accord.primitives.Ranges;
-import accord.primitives.Routable;
import accord.primitives.Routables;
-import accord.primitives.RoutingKeys;
import accord.primitives.Timestamp;
import accord.primitives.Txn;
import accord.primitives.TxnId;
@@ -60,14 +49,11 @@ import accord.primitives.Unseekables;
import accord.utils.Invariants;
import org.apache.cassandra.service.accord.AccordCommandStore.ExclusiveCaches;
-import static accord.local.KeyHistory.TIMESTAMPS;
-import static accord.utils.Invariants.illegalArgument;
import static accord.utils.Invariants.illegalState;
-public class AccordSafeCommandStore extends SafeCommandStore
+public class AccordSafeCommandStore extends
AbstractSafeCommandStore<AccordSafeCommand, AccordSafeTimestampsForKey,
AccordSafeCommandsForKey, AccordCommandStore.ExclusiveCaches>
{
final AccordTask<?> task;
- final PreLoadContext context;
private final @Nullable CommandsForRanges commandsForRanges;
private final AccordCommandStore commandStore;
private RangesForEpoch ranges;
@@ -77,7 +63,7 @@ public class AccordSafeCommandStore extends SafeCommandStore
@Nullable CommandsForRanges
commandsForRanges,
AccordCommandStore commandStore)
{
- this.context = task.preLoadContext();
+ super(task.preLoadContext());
this.task = task;
this.commandsForRanges = commandsForRanges;
this.commandStore = commandStore;
@@ -93,91 +79,6 @@ public class AccordSafeCommandStore extends SafeCommandStore
return new AccordSafeCommandStore(operation, commandsForRanges,
commandStore);
}
- @Override
- public PreLoadContext canExecute(PreLoadContext with)
- {
- if (with.isEmpty()) return with;
- if (with.keys().domain() == Routable.Domain.Range)
- return with.isSubsetOf(this.context) ? with : null;
-
- if (!context().keyHistory().satisfies(with.keyHistory()))
- return null;
-
- try (ExclusiveCaches caches = commandStore.tryLockCaches())
- {
- if (caches == null)
- return with.isSubsetOf(this.context) ? with : null;
-
- for (TxnId txnId : with.txnIds())
- {
- if (null != getInternal(txnId))
- continue;
-
- AccordSafeCommand safeCommand =
caches.commands().acquireIfLoaded(txnId);
- if (safeCommand == null)
- return null;
-
- add(safeCommand, caches);
- }
-
- KeyHistory keyHistory = with.keyHistory();
- if (keyHistory == KeyHistory.NONE)
- return with;
-
- List<RoutingKey> unavailable = null;
- AbstractUnseekableKeys keys = (AbstractUnseekableKeys)
context.keys();
- if (keys.size() == 0)
- return with;
-
- for (int i = 0 ; i < keys.size() ; ++i)
- {
- RoutingKey key = keys.get(i);
- if (keyHistory == TIMESTAMPS)
- {
- if (null != timestampsForKeyInternal(key))
- continue; // already in working set
-
- AccordSafeTimestampsForKey safeTfk =
caches.timestampsForKeys().acquireIfLoaded(key);
- if (safeTfk != null)
- {
- add(safeTfk, caches);
- continue;
- }
- }
- else
- {
- if (null != getInternal(key))
- continue; // already in working set
-
- AccordSafeCommandsForKey safeCfk =
caches.commandsForKeys().acquireIfLoaded(key);
- if (safeCfk != null)
- {
- add(safeCfk, caches);
- continue;
- }
- }
- if (unavailable == null)
- unavailable = new ArrayList<>();
- unavailable.add(key);
- }
-
- if (unavailable == null)
- return with;
-
- if (unavailable.size() == keys.size())
- return null;
-
- Participants<RoutingKey> available =
keys.without(RoutingKeys.ofSortedUnique(unavailable));
- return PreLoadContext.contextFor(context.primaryTxnId(),
context.additionalTxnId(), available, keyHistory);
- }
- }
-
- @Override
- public PreLoadContext context()
- {
- return context;
- }
-
@VisibleForTesting
public Set<RoutingKey> commandsForKeysKeys()
{
@@ -196,22 +97,12 @@ public class AccordSafeCommandStore extends
SafeCommandStore
}
@Override
- protected AccordSafeCommand
ifLoadedAndInitialisedAndNotErasedInternal(TxnId txnId)
+ protected ExclusiveCaches tryGetCaches()
{
- try (ExclusiveCaches caches = commandStore.tryLockCaches())
- {
- if (caches == null)
- return null;
-
- AccordSafeCommand command =
caches.commands().acquireIfLoaded(txnId);
- if (command == null)
- return null;
-
- return add(command, caches);
- }
+ return commandStore.tryLockCaches();
}
- private AccordSafeCommand add(AccordSafeCommand safeCommand,
ExclusiveCaches caches)
+ protected AccordSafeCommand add(AccordSafeCommand safeCommand,
ExclusiveCaches caches)
{
Object check = task.ensureCommands().putIfAbsent(safeCommand.txnId(),
safeCommand);
if (check == null)
@@ -226,7 +117,7 @@ public class AccordSafeCommandStore extends SafeCommandStore
}
}
- private AccordSafeCommandsForKey add(AccordSafeCommandsForKey safeCfk,
ExclusiveCaches caches)
+ protected AccordSafeCommandsForKey add(AccordSafeCommandsForKey safeCfk,
ExclusiveCaches caches)
{
Object check = task.ensureCommandsForKey().putIfAbsent(safeCfk.key(),
safeCfk);
if (check == null)
@@ -241,7 +132,7 @@ public class AccordSafeCommandStore extends SafeCommandStore
}
}
- private AccordSafeTimestampsForKey add(AccordSafeTimestampsForKey safeTfk,
ExclusiveCaches caches)
+ protected AccordSafeTimestampsForKey add(AccordSafeTimestampsForKey
safeTfk, ExclusiveCaches caches)
{
Object check =
task.ensureTimestampsForKey().putIfAbsent(safeTfk.key(), safeTfk);
if (check == null)
@@ -265,42 +156,7 @@ public class AccordSafeCommandStore extends
SafeCommandStore
return commandsForKey.get(key);
}
- @Override
- protected AccordSafeCommandsForKey ifLoadedInternal(RoutingKey key)
- {
- try (ExclusiveCaches caches = commandStore.tryLockCaches())
- {
- if (caches == null)
- return null;
-
- AccordSafeCommandsForKey safeCfk =
caches.commandsForKeys().acquireIfLoaded(key);
- if (safeCfk == null)
- return null;
-
- Object check =
task.ensureCommandsForKey().putIfAbsent(safeCfk.key(), safeCfk);
- if (check == null)
- {
- safeCfk.preExecute();
- return safeCfk;
- }
- else
- {
- caches.commandsForKeys().release(safeCfk, task);
- throw illegalState("Attempted to take a duplicate reference to
CFK for %s", key);
- }
- }
- }
-
- @Override
- public SafeTimestampsForKey timestampsForKey(RoutingKey key)
- {
- AccordSafeTimestampsForKey safeTfk = timestampsForKeyInternal(key);
- if (safeTfk == null)
- throw illegalArgument("%s not referenced in %s", key, context);
- return safeTfk;
- }
-
- private AccordSafeTimestampsForKey timestampsForKeyInternal(RoutingKey key)
+ protected AccordSafeTimestampsForKey timestampsForKeyInternal(RoutingKey
key)
{
Map<RoutingKey, AccordSafeTimestampsForKey> timestampsForKey =
task.timestampsForKey();
if (timestampsForKey == null)
diff --git a/src/java/org/apache/cassandra/service/accord/AccordService.java
b/src/java/org/apache/cassandra/service/accord/AccordService.java
index b3bd311c92..ae9534ca3b 100644
--- a/src/java/org/apache/cassandra/service/accord/AccordService.java
+++ b/src/java/org/apache/cassandra/service/accord/AccordService.java
@@ -1037,7 +1037,7 @@ public class AccordService implements IAccordService,
Shutdownable
{
if (state.knows(blockedBy)) continue;
// need to fetch the state
- if (safeStore.ifLoadedAndInitialisedAndNotErased(blockedBy) !=
null)
+ if (safeStore.ifLoadedAndInitialised(blockedBy) != null)
{
AsyncChain<Void> chain = populate(state, safeStore, blockedBy);
if (chain != null)
@@ -1077,7 +1077,7 @@ public class AccordService implements IAccordService,
Shutdownable
blocking = ((CommandsForKey.TxnInfo) blocking).plainTxnId();
state.keys.put(pk, blocking);
if (state.txns.containsKey(blocking)) return null;
- if (safeStore.ifLoadedAndInitialisedAndNotErased(blocking) != null)
return populate(state, safeStore, blocking);
+ if (safeStore.ifLoadedAndInitialised(blocking) != null) return
populate(state, safeStore, blocking);
return populate(state, safeStore.commandStore(), blocking);
}
@@ -1437,7 +1437,7 @@ public class AccordService implements IAccordService,
Shutdownable
{
if (tracker.recordSuccess(from) == RequestStatus.Success)
{
-
node.configService().reportEpochRedundant(exclusiveSyncPoint.route.toRanges(),
exclusiveSyncPoint.syncId.epoch());
+
node.configService().reportEpochRedundant(exclusiveSyncPoint.route.toRanges(),
exclusiveSyncPoint.syncId.epoch() - 1);
trySuccess(exclusiveSyncPoint);
}
}
diff --git
a/src/java/org/apache/cassandra/service/accord/CommandsForRanges.java
b/src/java/org/apache/cassandra/service/accord/CommandsForRanges.java
index 66f7daccab..13b21b5da8 100644
--- a/src/java/org/apache/cassandra/service/accord/CommandsForRanges.java
+++ b/src/java/org/apache/cassandra/service/accord/CommandsForRanges.java
@@ -57,7 +57,7 @@ import org.apache.cassandra.index.accord.RoutesSearcher;
import org.apache.cassandra.service.accord.api.AccordRoutingKey;
import static accord.local.SafeCommandStore.TestDep.ANY_DEPS;
-import static accord.local.SafeCommandStore.TestDep.WITH;
+import static accord.local.SafeCommandStore.TestDep.WITH_OR_INVALIDATED;
import static accord.local.SafeCommandStore.TestStartedAt.STARTED_BEFORE;
import static accord.local.SafeCommandStore.TestStatus.ANY_STATUS;
import static accord.primitives.Routables.Slice.Minimal;
@@ -129,6 +129,9 @@ public class CommandsForRanges extends TreeMap<Timestamp,
CommandsForRanges.Summ
case IS_STABLE:
if (!summary.saveStatus.hasBeen(Stable) ||
summary.saveStatus.hasBeen(Truncated))
return;
+ case IS_STABLE_OR_INVALIDATED:
+ if (!summary.saveStatus.hasBeen(Stable) ||
summary.saveStatus.status == Truncated)
+ return;
}
if (testDep != ANY_DEPS)
@@ -152,7 +155,7 @@ public class CommandsForRanges extends TreeMap<Timestamp,
CommandsForRanges.Summ
// and so it is safe to execute, when in fact it is only a
dependency on a different shard
// (and that other shard, perhaps, does not know that it is a
dependency - and so it is not durably known)
// TODO (required): consider this some more
- if ((testDep == WITH) == !summary.hasAsDep)
+ if ((testDep == WITH_OR_INVALIDATED) == (!summary.hasAsDep ||
summary.saveStatus == SaveStatus.Invalidated))
return;
Invariants.checkState(testTxnId.equals(summary.findAsDep));
diff --git
a/src/java/org/apache/cassandra/service/accord/interop/AccordInteropAdapter.java
b/src/java/org/apache/cassandra/service/accord/interop/AccordInteropAdapter.java
index e4464c44f4..820582d930 100644
---
a/src/java/org/apache/cassandra/service/accord/interop/AccordInteropAdapter.java
+++
b/src/java/org/apache/cassandra/service/accord/interop/AccordInteropAdapter.java
@@ -33,7 +33,7 @@ import accord.local.Node;
import accord.messages.Apply;
import accord.primitives.Deps;
import accord.primitives.FullRoute;
-import accord.primitives.Participants;
+import accord.primitives.Route;
import accord.primitives.Timestamp;
import accord.primitives.Txn;
import accord.primitives.TxnId;
@@ -89,14 +89,13 @@ public class AccordInteropAdapter extends AbstractTxnAdapter
}
@Override
- public void persist(Node node, Topologies all, FullRoute<?> route,
Participants<?> sendTo, TxnId txnId, Txn txn, Timestamp executeAt, Deps deps,
Writes writes, Result result, BiConsumer<? super Result, Throwable> callback)
+ public void persist(Node node, Topologies all, FullRoute<?> fullRoute,
Route<?> sendTo, TxnId txnId, Txn txn, Timestamp executeAt, Deps deps, Writes
writes, Result result, BiConsumer<? super Result, Throwable> callback)
{
- // TODO (required): we aren't using sendTo
- if (applyKind == Minimal && doInteropPersist(node, all, route, txnId,
txn, executeAt, deps, writes, result, callback))
+ if (applyKind == Minimal && doInteropPersist(node, all, sendTo, txnId,
txn, executeAt, deps, writes, result, fullRoute, callback))
return;
if (callback != null) callback.accept(result, null);
- new PersistTxn(node, all, txnId, route, txn, executeAt, deps, writes,
result)
+ new PersistTxn(node, all, txnId, sendTo, txn, executeAt, deps, writes,
result, fullRoute)
.start(Apply.FACTORY, applyKind, all, writes, result);
}
@@ -113,14 +112,14 @@ public class AccordInteropAdapter extends
AbstractTxnAdapter
return true;
}
- private static boolean doInteropPersist(Node node, Topologies all,
FullRoute<?> route, TxnId txnId, Txn txn, Timestamp executeAt, Deps deps,
Writes writes, Result result, BiConsumer<? super Result, Throwable> callback)
+ private static boolean doInteropPersist(Node node, Topologies all,
Route<?> sendTo, TxnId txnId, Txn txn, Timestamp executeAt, Deps deps, Writes
writes, Result result, FullRoute<?> fullRoute, BiConsumer<? super Result,
Throwable> callback)
{
Update update = txn.update();
ConsistencyLevel consistencyLevel = update instanceof AccordUpdate ?
((AccordUpdate) update).cassandraCommitCL() : null;
if (consistencyLevel == null || consistencyLevel ==
ConsistencyLevel.ANY || writes.isEmpty())
return false;
- new AccordInteropPersist(node, all, txnId, route, txn, executeAt,
deps, writes, result, consistencyLevel, callback)
+ new AccordInteropPersist(node, all, txnId, sendTo, txn, executeAt,
deps, writes, result, fullRoute, consistencyLevel, callback)
.start(AccordInteropApply.FACTORY, Minimal, all, writes, result);
return true;
}
diff --git
a/src/java/org/apache/cassandra/service/accord/interop/AccordInteropApply.java
b/src/java/org/apache/cassandra/service/accord/interop/AccordInteropApply.java
index 1cd3a05695..44f54ab51d 100644
---
a/src/java/org/apache/cassandra/service/accord/interop/AccordInteropApply.java
+++
b/src/java/org/apache/cassandra/service/accord/interop/AccordInteropApply.java
@@ -62,14 +62,14 @@ public class AccordInteropApply extends Apply implements
LocalListeners.ComplexL
public static final Apply.Factory FACTORY = new Apply.Factory()
{
@Override
- public Apply create(Kind kind, Id to, Topologies participates, TxnId
txnId, FullRoute<?> route, Txn txn, Timestamp executeAt, Deps deps, Writes
writes, Result result)
+ public Apply create(Kind kind, Id to, Topologies participates, TxnId
txnId, Route<?> route, Txn txn, Timestamp executeAt, Deps deps, Writes writes,
Result result, FullRoute<?> fullRoute)
{
checkArgument(kind != Kind.Maximal, "Shouldn't need to send a
maximal commit with interop support");
ConsistencyLevel commitCL = txn.update() instanceof AccordUpdate ?
((AccordUpdate) txn.update()).cassandraCommitCL() : null;
// Any asynchronous apply option should use the regular Apply that
doesn't wait for writes to complete
if (commitCL == null || commitCL == ConsistencyLevel.ANY)
- return Apply.FACTORY.create(kind, to, participates, txnId,
route, txn, executeAt, deps, writes, result);
- return new AccordInteropApply(kind, to, participates, txnId,
route, txn, executeAt, deps, writes, result);
+ return Apply.FACTORY.create(kind, to, participates, txnId,
route, txn, executeAt, deps, writes, result, fullRoute);
+ return new AccordInteropApply(kind, to, participates, txnId,
route, txn, executeAt, deps, writes, result, fullRoute);
}
};
@@ -91,9 +91,9 @@ public class AccordInteropApply extends Apply implements
LocalListeners.ComplexL
super(kind, txnId, route, waitForEpoch, executeAt, deps, txn,
fullRoute, writes, result);
}
- private AccordInteropApply(Kind kind, Id to, Topologies participates,
TxnId txnId, FullRoute<?> route, Txn txn, Timestamp executeAt, Deps deps,
Writes writes, Result result)
+ private AccordInteropApply(Kind kind, Id to, Topologies participates,
TxnId txnId, Route<?> route, Txn txn, Timestamp executeAt, Deps deps, Writes
writes, Result result, FullRoute<?> fullRoute)
{
- super(kind, to, participates, txnId, route, txn, executeAt, deps,
writes, result);
+ super(kind, to, participates, txnId, route, txn, executeAt, deps,
writes, result, fullRoute);
}
@Override
diff --git
a/src/java/org/apache/cassandra/service/accord/interop/AccordInteropPersist.java
b/src/java/org/apache/cassandra/service/accord/interop/AccordInteropPersist.java
index a0fd42229f..342dc40140 100644
---
a/src/java/org/apache/cassandra/service/accord/interop/AccordInteropPersist.java
+++
b/src/java/org/apache/cassandra/service/accord/interop/AccordInteropPersist.java
@@ -30,6 +30,7 @@ import accord.local.Node;
import accord.messages.Apply;
import accord.primitives.Deps;
import accord.primitives.FullRoute;
+import accord.primitives.Route;
import accord.primitives.Timestamp;
import accord.primitives.Txn;
import accord.primitives.TxnId;
@@ -91,9 +92,9 @@ public class AccordInteropPersist extends Persist
private final ConsistencyLevel consistencyLevel;
private CallbackHolder callback;
- public AccordInteropPersist(Node node, Topologies topologies, TxnId txnId,
FullRoute<?> route, Txn txn, Timestamp executeAt, Deps deps, Writes writes,
Result result, ConsistencyLevel consistencyLevel, BiConsumer<? super Result,
Throwable> clientCallback)
+ public AccordInteropPersist(Node node, Topologies topologies, TxnId txnId,
Route<?> sendTo, Txn txn, Timestamp executeAt, Deps deps, Writes writes, Result
result, FullRoute<?> fullRoute, ConsistencyLevel consistencyLevel, BiConsumer<?
super Result, Throwable> clientCallback)
{
- super(node, topologies, txnId, route, txn, executeAt, deps, writes,
result);
+ super(node, topologies, txnId, sendTo, txn, executeAt, deps, writes,
result, fullRoute);
Invariants.checkArgument(consistencyLevel == ConsistencyLevel.QUORUM
|| consistencyLevel == ConsistencyLevel.ALL || consistencyLevel ==
ConsistencyLevel.SERIAL || consistencyLevel == ConsistencyLevel.ONE);
this.consistencyLevel = consistencyLevel;
registerClientCallback(result, clientCallback);
diff --git
a/src/java/org/apache/cassandra/service/accord/serializers/AwaitSerializer.java
b/src/java/org/apache/cassandra/service/accord/serializers/AwaitSerializer.java
index 54b25a1c87..0006a068ec 100644
---
a/src/java/org/apache/cassandra/service/accord/serializers/AwaitSerializer.java
+++
b/src/java/org/apache/cassandra/service/accord/serializers/AwaitSerializer.java
@@ -45,7 +45,8 @@ public class AwaitSerializer
CommandSerializers.txnId.serialize(await.txnId, out, version);
KeySerializers.participants.serialize(await.scope, out, version);
out.writeByte(await.blockedUntil.ordinal());
- out.writeUnsignedVInt(await.awaitEpoch - await.txnId.epoch());
+ out.writeUnsignedVInt(await.maxAwaitEpoch - await.txnId.epoch());
+ out.writeUnsignedVInt(await.maxAwaitEpoch - await.minAwaitEpoch);
out.writeUnsignedVInt32(await.callbackId + 1);
Invariants.checkState(await.callbackId >= -1);
}
@@ -56,10 +57,11 @@ public class AwaitSerializer
TxnId txnId = CommandSerializers.txnId.deserialize(in, version);
Participants<?> scope =
KeySerializers.participants.deserialize(in, version);
BlockedUntil blockedUntil = BlockedUntil.forOrdinal(in.readByte());
- long awaitEpoch = in.readUnsignedVInt() + txnId.epoch();
+ long maxAwaitEpoch = in.readUnsignedVInt() + txnId.epoch();
+ long minAwaitEpoch = maxAwaitEpoch - in.readUnsignedVInt();
int callbackId = in.readUnsignedVInt32() - 1;
Invariants.checkState(callbackId >= -1);
- return Await.SerializerSupport.create(txnId, scope, blockedUntil,
awaitEpoch, callbackId);
+ return Await.SerializerSupport.create(txnId, scope, blockedUntil,
minAwaitEpoch, maxAwaitEpoch, callbackId);
}
@Override
@@ -68,7 +70,8 @@ public class AwaitSerializer
return CommandSerializers.txnId.serializedSize(await.txnId,
version)
+ KeySerializers.participants.serializedSize(await.scope,
version)
+ TypeSizes.BYTE_SIZE
- + VIntCoding.computeUnsignedVIntSize(await.awaitEpoch -
await.txnId.epoch())
+ + VIntCoding.computeUnsignedVIntSize(await.maxAwaitEpoch -
await.txnId.epoch())
+ + VIntCoding.computeUnsignedVIntSize(await.maxAwaitEpoch -
await.minAwaitEpoch)
+ VIntCoding.computeUnsignedVIntSize(await.callbackId + 1);
}
};
diff --git
a/src/java/org/apache/cassandra/service/accord/serializers/ReadDataSerializers.java
b/src/java/org/apache/cassandra/service/accord/serializers/ReadDataSerializers.java
index 0c86ee7747..947b166619 100644
---
a/src/java/org/apache/cassandra/service/accord/serializers/ReadDataSerializers.java
+++
b/src/java/org/apache/cassandra/service/accord/serializers/ReadDataSerializers.java
@@ -80,6 +80,7 @@ public class ReadDataSerializers
{
CommandSerializers.txnId.serialize(msg.txnId, out, version);
KeySerializers.participants.serialize(msg.readScope, out, version);
+ out.writeUnsignedVInt(msg.minEpoch());
CommandSerializers.timestamp.serialize(msg.executeAt, out,
version);
KeySerializers.fullRoute.serialize(msg.route, out, version);
CommandSerializers.partialTxn.serialize(msg.txn, out, version);
@@ -93,6 +94,7 @@ public class ReadDataSerializers
return ApplyThenWaitUntilApplied.SerializerSupport.create(
CommandSerializers.txnId.deserialize(in, version),
KeySerializers.participants.deserialize(in, version),
+ in.readUnsignedVInt(),
CommandSerializers.timestamp.deserialize(in, version),
KeySerializers.fullRoute.deserialize(in, version),
CommandSerializers.partialTxn.deserialize(in, version),
@@ -106,6 +108,7 @@ public class ReadDataSerializers
{
return CommandSerializers.txnId.serializedSize(msg.txnId, version)
+ KeySerializers.participants.serializedSize(msg.readScope,
version)
+ + TypeSizes.sizeofUnsignedVInt(msg.minEpoch())
+
CommandSerializers.timestamp.serializedSize(msg.executeAt, version)
+ KeySerializers.fullRoute.serializedSize(msg.route,
version)
+ CommandSerializers.partialTxn.serializedSize(msg.txn,
version)
@@ -279,7 +282,8 @@ public class ReadDataSerializers
{
CommandSerializers.txnId.serialize(waitUntilApplied.txnId, out,
version);
KeySerializers.participants.serialize(waitUntilApplied.readScope,
out, version);
- out.writeUnsignedVInt(waitUntilApplied.executeAtEpoch);
+ out.writeUnsignedVInt(waitUntilApplied.minEpoch());
+ out.writeUnsignedVInt(waitUntilApplied.executeAtEpoch -
waitUntilApplied.minEpoch());
}
@Override
@@ -287,8 +291,9 @@ public class ReadDataSerializers
{
TxnId txnId = CommandSerializers.txnId.deserialize(in, version);
Participants<?> readScope =
KeySerializers.participants.deserialize(in, version);
- long executeAtEpoch = in.readUnsignedVInt();
- return WaitUntilApplied.SerializerSupport.create(txnId, readScope,
executeAtEpoch);
+ long minEpoch = in.readUnsignedVInt();
+ long executeAtEpoch = minEpoch + in.readUnsignedVInt();
+ return WaitUntilApplied.SerializerSupport.create(txnId, readScope,
minEpoch, executeAtEpoch);
}
@Override
@@ -296,7 +301,8 @@ public class ReadDataSerializers
{
return
CommandSerializers.txnId.serializedSize(waitUntilApplied.txnId, version)
+
KeySerializers.participants.serializedSize(waitUntilApplied.readScope, version)
- +
TypeSizes.sizeofUnsignedVInt(waitUntilApplied.executeAtEpoch);
+ + TypeSizes.sizeofUnsignedVInt(waitUntilApplied.minEpoch())
+ +
TypeSizes.sizeofUnsignedVInt(waitUntilApplied.executeAtEpoch -
waitUntilApplied.minEpoch());
}
};
}
diff --git
a/test/unit/org/apache/cassandra/db/compaction/CompactionAccordIteratorsTest.java
b/test/unit/org/apache/cassandra/db/compaction/CompactionAccordIteratorsTest.java
index 6bf6849e42..fbcfee12f3 100644
---
a/test/unit/org/apache/cassandra/db/compaction/CompactionAccordIteratorsTest.java
+++
b/test/unit/org/apache/cassandra/db/compaction/CompactionAccordIteratorsTest.java
@@ -207,8 +207,8 @@ public class CompactionAccordIteratorsTest
testAccordCommandsPurger(redundantBefore(GT_TXN_ID),
durableBefore(UNIVERSAL), expectAccordCommandsErase());
// Universally durable but not locally; we're stale, but shouldn't
erase
testAccordCommandsPurger(redundantBefore(LT_TXN_ID),
durableBefore(UNIVERSAL), expectAccordCommandsNoChange());
- // With redundantBefore at the txnId there should be no change because
it is < not <=
- testAccordCommandsPurger(redundantBefore(TXN_ID),
durableBefore(MAJORITY), expectAccordCommandsNoChange());
+ // redundantBefore will be > txnId as must be exclusive sync point, so
will truncate
+ testAccordCommandsPurger(redundantBefore(TXN_ID),
durableBefore(MAJORITY), expectAccordCommandsTruncated());
testAccordCommandsPurger(redundantBefore(LT_TXN_ID),
durableBefore(MAJORITY), expectAccordCommandsNoChange());
// Durable at a majority can be truncated with minimal data preserved,
it must be redundant for this to occur
testAccordCommandsPurger(redundantBefore(GT_TXN_ID),
durableBefore(MAJORITY), expectAccordCommandsTruncated());
@@ -249,10 +249,11 @@ public class CompactionAccordIteratorsTest
testAccordCommandsForKeyPurger(null,
expectedAccordCommandsForKeyNoChange());
testAccordTimestampsForKeyPurger(redundantBefore(LT_TXN_ID),
expectedAccordTimestampsForKeyNoChange());
testAccordCommandsForKeyPurger(redundantBefore(LT_TXN_ID),
expectedAccordCommandsForKeyNoChange());
- testAccordTimestampsForKeyPurger(redundantBefore(TXN_ID),
expectedAccordTimestampsForKeyNoChange());
- testAccordCommandsForKeyPurger(redundantBefore(TXN_ID),
expectedAccordCommandsForKeyNoChange());
- testAccordTimestampsForKeyPurger(redundantBefore(GT_TXN_ID),
expectedAccordTimestampsForKeyEraseOne());
- testAccordCommandsForKeyPurger(redundantBefore(GT_TXN_ID),
expectedAccordCommandsForKeyEraseOne());
+ // will erase one more than expected as converted to
ExclusiveSyncPoint id which is > base id
+ testAccordTimestampsForKeyPurger(redundantBefore(TXN_ID),
expectedAccordTimestampsForKeyEraseOne());
+ testAccordCommandsForKeyPurger(redundantBefore(TXN_ID),
expectedAccordCommandsForKeyEraseOne());
+ testAccordTimestampsForKeyPurger(redundantBefore(GT_TXN_ID),
expectedAccordTimestampsForKeyEraseAll());
+ testAccordCommandsForKeyPurger(redundantBefore(GT_TXN_ID),
expectedAccordCommandsForKeyEraseAll());
testAccordTimestampsForKeyPurger(redundantBefore(GT_SECOND_TXN_ID),
expectedAccordTimestampsForKeyEraseAll());
testAccordCommandsForKeyPurger(redundantBefore(GT_SECOND_TXN_ID),
expectedAccordCommandsForKeyEraseAll());
}
@@ -399,7 +400,7 @@ public class CompactionAccordIteratorsTest
private static RedundantBefore redundantBefore(TxnId txnId)
{
Ranges ranges = AccordTestUtils.fullRange(AccordTestUtils.keys(table,
42));
- txnId = txnId.as(Kind.Read, Range);
+ txnId = txnId.as(Kind.ExclusiveSyncPoint, Range);
return RedundantBefore.create(ranges, Long.MIN_VALUE, Long.MAX_VALUE,
txnId, txnId, txnId, txnId, LT_TXN_ID.as(Range));
}
diff --git a/test/unit/org/apache/cassandra/service/accord/AccordTaskTest.java
b/test/unit/org/apache/cassandra/service/accord/AccordTaskTest.java
index bc476baab5..0207b14d20 100644
--- a/test/unit/org/apache/cassandra/service/accord/AccordTaskTest.java
+++ b/test/unit/org/apache/cassandra/service/accord/AccordTaskTest.java
@@ -133,7 +133,7 @@ public class AccordTaskTest
// TODO review: This change to `ifInitialized` was done in a lot
of places and it doesn't preserve this property
// I fixed this reference to point to `ifLoadedAndInitialised` and
but didn't update other places
Assert.assertNull(instance.ifInitialised(txnId));
-
Assert.assertNull(instance.ifLoadedAndInitialisedAndNotErased(txnId));
+ Assert.assertNull(instance.ifLoadedAndInitialised(txnId));
}));
UntypedResultSet result = AccordKeyspace.loadCommandRow(commandStore,
txnId);
diff --git
a/test/unit/org/apache/cassandra/service/accord/serializers/CommandsForKeySerializerTest.java
b/test/unit/org/apache/cassandra/service/accord/serializers/CommandsForKeySerializerTest.java
index 052736200c..e863661982 100644
---
a/test/unit/org/apache/cassandra/service/accord/serializers/CommandsForKeySerializerTest.java
+++
b/test/unit/org/apache/cassandra/service/accord/serializers/CommandsForKeySerializerTest.java
@@ -23,6 +23,7 @@ import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collections;
import java.util.Comparator;
+import java.util.EnumSet;
import java.util.List;
import java.util.Random;
import java.util.Set;
@@ -202,6 +203,8 @@ public class CommandsForKeySerializerTest
case Erased:
case ErasedOrVestigial:
+ return Command.Truncated.erased(txnId,
Status.Durability.UniversalOrInvalidated, StoreParticipants.empty(txnId));
+
case Invalidated:
return Command.SerializerSupport.invalidated(txnId);
}
@@ -431,7 +434,8 @@ public class CommandsForKeySerializerTest
}
}
- Choices<SaveStatus> saveStatusChoices =
Choices.uniform(SaveStatus.values());
+ // TODO (expected): we currently don't explore TruncatedApply
statuses because we don't transition through all phases and therefore don't
adopt the Applied status
+ Choices<SaveStatus> saveStatusChoices =
Choices.uniform(EnumSet.complementOf(EnumSet.of(SaveStatus.TruncatedApply,
SaveStatus.TruncatedApplyWithOutcome,
SaveStatus.TruncatedApplyWithDeps)).toArray(SaveStatus[]::new));
Supplier<SaveStatus> saveStatusSupplier = () -> {
SaveStatus result = saveStatusChoices.choose(source);
while (result == SaveStatus.TruncatedApplyWithDeps) // not a
real save status
@@ -471,7 +475,8 @@ public class CommandsForKeySerializerTest
while (commands.size() > 0)
{
int next = source.nextInt(commands.size());
- cfk = cfk.update(commands.get(next)).cfk();
+ Command command = commands.get(next);
+ cfk = cfk.update(command).cfk();
commands.set(next, commands.get(commands.size() - 1));
commands.remove(commands.size() - 1);
}
@@ -486,7 +491,7 @@ public class CommandsForKeySerializerTest
}
TxnInfo info = cfk.get(i);
InternalStatus expectStatus =
InternalStatus.from(cmd.saveStatus);
- if (expectStatus == null) expectStatus =
InternalStatus.TRANSITIVELY_KNOWN;
+ if (expectStatus == null) expectStatus =
InternalStatus.TRANSITIVE;
if (expectStatus.hasExecuteAtOrDeps)
Assert.assertEquals(cmd.executeAt, info.executeAt);
Assert.assertEquals(expectStatus, info.status());
@@ -545,7 +550,7 @@ public class CommandsForKeySerializerTest
{
int idx = Arrays.binarySearch(ids, u.txnId);
if (idx < 0)
- missing.add(TxnInfo.create(u.txnId,
InternalStatus.TRANSITIVELY_KNOWN, true, u.txnId, Ballot.ZERO));
+ missing.add(TxnInfo.create(u.txnId,
InternalStatus.TRANSITIVE, true, u.txnId, Ballot.ZERO));
}
if (!missing.isEmpty())
{
diff --git a/test/unit/org/apache/cassandra/utils/AccordGenerators.java
b/test/unit/org/apache/cassandra/utils/AccordGenerators.java
index 99097c064c..e4e08ac180 100644
--- a/test/unit/org/apache/cassandra/utils/AccordGenerators.java
+++ b/test/unit/org/apache/cassandra/utils/AccordGenerators.java
@@ -443,7 +443,7 @@ public class AccordGenerators
public static Gen<RedundantBefore.Entry> redundantBeforeEntry(IPartitioner
partitioner)
{
- return redundantBeforeEntry(Gens.bools().all(), range(partitioner),
AccordGens.txnIds(Gens.pick(Txn.Kind.SyncPoint, Txn.Kind.ExclusiveSyncPoint),
ignore -> Routable.Domain.Range));
+ return redundantBeforeEntry(Gens.bools().all(), range(partitioner),
AccordGens.txnIds(Gens.pick(Txn.Kind.ExclusiveSyncPoint), ignore ->
Routable.Domain.Range));
}
public static Gen<RedundantBefore.Entry> redundantBeforeEntry(Gen<Boolean>
emptyGen, Gen<Range> rangeGen, Gen<TxnId> txnIdGen)
@@ -451,11 +451,11 @@ public class AccordGenerators
return rs -> {
Range range = rangeGen.next(rs);
TxnId locallyWitnessedOrInvalidatedBefore = emptyGen.next(rs) ?
TxnId.NONE : txnIdGen.next(rs); // emptyable or range
- TxnId locallyAppliedOrInvalidatedBefore = emptyGen.next(rs) ?
TxnId.NONE : txnIdGen.next(rs); // emptyable or range
- TxnId locallyDecidedAndAppliedOrInvalidatedBefore =
locallyAppliedOrInvalidatedBefore;
- TxnId shardAppliedOrInvalidatedBefore = emptyGen.next(rs) ?
TxnId.NONE : txnIdGen.next(rs); // emptyable or range
- TxnId shardOnlyAppliedOrInvalidatedBefore =
shardAppliedOrInvalidatedBefore;
- TxnId gcBefore = shardAppliedOrInvalidatedBefore;
+ TxnId locallyAppliedOrInvalidatedBefore =
TxnId.nonNullOrMin(locallyWitnessedOrInvalidatedBefore, emptyGen.next(rs) ?
TxnId.NONE : txnIdGen.next(rs)); // emptyable or range
+ TxnId locallyDecidedAndAppliedOrInvalidatedBefore =
TxnId.nonNullOrMin(locallyAppliedOrInvalidatedBefore, emptyGen.next(rs) ?
TxnId.NONE : txnIdGen.next(rs)); // emptyable or range
+ TxnId shardOnlyAppliedOrInvalidatedBefore = emptyGen.next(rs) ?
TxnId.NONE : txnIdGen.next(rs); // emptyable or range
+ TxnId shardAppliedOrInvalidatedBefore =
TxnId.nonNullOrMin(locallyAppliedOrInvalidatedBefore,
TxnId.nonNullOrMin(shardOnlyAppliedOrInvalidatedBefore, emptyGen.next(rs) ?
TxnId.NONE : txnIdGen.next(rs))); // emptyable or range
+ TxnId gcBefore =
TxnId.nonNullOrMin(shardAppliedOrInvalidatedBefore, emptyGen.next(rs) ?
TxnId.NONE : txnIdGen.next(rs)); // emptyable or range
TxnId bootstrappedAt = txnIdGen.next(rs);
Timestamp staleUntilAtLeast = emptyGen.next(rs) ? null :
txnIdGen.next(rs); // nullable
@@ -469,7 +469,7 @@ public class AccordGenerators
public static Gen<RedundantBefore> redundantBefore(IPartitioner
partitioner)
{
Gen<Ranges> rangeGen = rangesArbitrary(partitioner);
- Gen<TxnId> txnIdGen = AccordGens.txnIds(Gens.pick(Txn.Kind.SyncPoint,
Txn.Kind.ExclusiveSyncPoint), ignore -> Routable.Domain.Range);
+ Gen<TxnId> txnIdGen =
AccordGens.txnIds(Gens.pick(Txn.Kind.ExclusiveSyncPoint), ignore ->
Routable.Domain.Range);
BiFunction<RandomSource, Range, RedundantBefore.Entry> entryGen = (rs,
range) -> redundantBeforeEntry(Gens.bools().all(), i -> range,
txnIdGen).next(rs);
return AccordGens.redundantBefore(rangeGen, entryGen);
}
@@ -477,7 +477,7 @@ public class AccordGenerators
public static Gen<DurableBefore> durableBeforeGen(IPartitioner partitioner)
{
Gen<Ranges> rangeGen = rangesArbitrary(partitioner);
- Gen<TxnId> txnIdGen = AccordGens.txnIds(Gens.pick(Txn.Kind.SyncPoint,
Txn.Kind.ExclusiveSyncPoint), ignore -> Routable.Domain.Range);
+ Gen<TxnId> txnIdGen =
AccordGens.txnIds(Gens.pick(Txn.Kind.ExclusiveSyncPoint), ignore ->
Routable.Domain.Range);
return (rs) -> {
Ranges ranges = rangeGen.next(rs);
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]