This is an automated email from the ASF dual-hosted git repository.
benedict pushed a commit to branch cep-15-accord
in repository https://gitbox.apache.org/repos/asf/cassandra.git
The following commit(s) were added to refs/heads/cep-15-accord by this push:
new 33918934c7 (Accord) Fix recovery when one shard is committed/stable
and another is preaccepted. Also fix: - Topology slicing must declare whether
we share/slice node ownership (to assist above) - CFK.visit removes transitive
dependencies too eagerly across epoch change - apply cleanup to builder
consistently, and construct the same value we would produce by purge (so that
replay is idempotent) - Invoke ExecuteTxn.LocalExecute callbacks on
originating CommandStore - misc other [...]
33918934c7 is described below
commit 33918934c702d4d3e1d846b236aacf6b9c795e8c
Author: Benedict Elliott Smith <[email protected]>
AuthorDate: Thu Feb 13 00:36:59 2025 +0000
(Accord) Fix recovery when one shard is committed/stable and another is
preaccepted
Also fix:
- Topology slicing must declare whether we share/slice node ownership (to
assist above)
- CFK.visit removes transitive dependencies too eagerly across epoch change
- apply cleanup to builder consistently, and construct the same value we
would produce by purge (so that replay is idempotent)
- Invoke ExecuteTxn.LocalExecute callbacks on originating CommandStore
- misc other minor issues
patch by Benedict; reviewed by Alex Petrov for CASSANDRA-20325
---
modules/accord | 2 +-
.../db/compaction/CompactionIterator.java | 41 +++---
src/java/org/apache/cassandra/dht/Token.java | 2 +-
.../cassandra/index/accord/RouteJournalIndex.java | 4 +-
.../service/accord/AccordCommandStore.java | 2 +-
.../cassandra/service/accord/AccordJournal.java | 15 +--
.../service/accord/AccordJournalTable.java | 2 +-
.../service/accord/AccordMessageSink.java | 29 ++--
.../cassandra/service/accord/AccordService.java | 5 +-
.../service/accord/RouteInMemoryIndex.java | 6 +-
.../cassandra/service/accord/api/AccordAgent.java | 54 +++++++-
.../service/accord/api/AccordRoutingKey.java | 3 +-
.../accord/interop/AccordInteropAdapter.java | 29 ++--
.../service/accord/interop/AccordInteropApply.java | 7 +-
.../accord/interop/AccordInteropExecution.java | 8 +-
.../accord/interop/AccordInteropPersist.java | 2 +-
.../accord/repair/RepairSyncPointAdapter.java | 4 +-
.../accord/serializers/AcceptSerializers.java | 120 +++++++----------
.../serializers/BeginInvalidationSerializers.java | 6 +-
.../accord/serializers/CommandSerializers.java | 70 +++++++++-
.../service/accord/serializers/KeySerializers.java | 24 ++--
.../accord/serializers/LatestDepsSerializers.java | 5 +-
.../accord/serializers/RecoverySerializers.java | 6 +-
.../cassandra/service/accord/txn/TxnNamedRead.java | 28 ++++
.../cassandra/service/accord/txn/TxnRead.java | 146 +++++++++++++++++----
.../cassandra/service/accord/txn/TxnWrite.java | 3 -
.../accord/AccordMigrationWriteRaceTestBase.java | 4 +-
.../distributed/test/accord/AccordTestBase.java | 4 +-
.../cassandra/simulator/paxos/PaxosSimulation.java | 1 -
test/unit/org/apache/cassandra/Util.java | 3 +-
.../service/accord/AccordCommandTest.java | 2 +-
.../service/accord/AccordJournalOrderTest.java | 6 +-
.../service/accord/AccordMessageSinkTest.java | 4 +-
.../accord/SimulatedAccordCommandStore.java | 2 +-
.../serializers/CommandsForKeySerializerTest.java | 3 +-
35 files changed, 435 insertions(+), 217 deletions(-)
diff --git a/modules/accord b/modules/accord
index 74a3b81ca9..1542da5d8a 160000
--- a/modules/accord
+++ b/modules/accord
@@ -1 +1 @@
-Subproject commit 74a3b81ca9d9e1ce7ddfd117682fc7c310f0cd99
+Subproject commit 1542da5d8acf08b8226227c91d4f81c64ed9762c
diff --git
a/src/java/org/apache/cassandra/db/compaction/CompactionIterator.java
b/src/java/org/apache/cassandra/db/compaction/CompactionIterator.java
index 198f1941d2..dbee4e8893 100644
--- a/src/java/org/apache/cassandra/db/compaction/CompactionIterator.java
+++ b/src/java/org/apache/cassandra/db/compaction/CompactionIterator.java
@@ -40,6 +40,7 @@ import accord.local.CommandStores;
import accord.local.DurableBefore;
import accord.local.RedundantBefore;
import accord.utils.Invariants;
+import accord.utils.UnhandledEnum;
import org.agrona.collections.Int2ObjectHashMap;
import org.apache.cassandra.config.DatabaseDescriptor;
import org.apache.cassandra.cql3.ColumnIdentifier;
@@ -98,7 +99,9 @@ import
org.apache.cassandra.service.paxos.uncommitted.PaxosRows;
import org.apache.cassandra.utils.TimeUUID;
import static accord.local.Cleanup.ERASE;
+import static accord.local.Cleanup.EXPUNGE;
import static accord.local.Cleanup.Input.PARTIAL;
+import static accord.local.Cleanup.NO;
import static com.google.common.base.Preconditions.checkState;
import static java.util.concurrent.TimeUnit.MICROSECONDS;
import static org.apache.cassandra.config.Config.PaxosStatePurging.legacy;
@@ -912,22 +915,30 @@ public class CompactionIterator extends
CompactionInfo.Holder implements Unfilte
RedundantBefore redundantBefore =
redundantBefores.get(key.commandStoreId);
DurableBefore durableBefore =
durableBefores.get(key.commandStoreId);
Cleanup cleanup = commandBuilder.shouldCleanup(PARTIAL, agent,
redundantBefore, durableBefore);
- if (cleanup == ERASE)
- return PartitionUpdate.fullPartitionDelete(metadata(),
partition.partitionKey(), Long.MAX_VALUE, nowInSec).unfilteredIterator();
-
- commandBuilder = commandBuilder.maybeCleanup(cleanup);
- if (commandBuilder != builder)
+ if (cleanup != NO)
{
- if (commandBuilder == null)
- return null;
-
- PartitionUpdate.SimpleBuilder newVersion =
PartitionUpdate.simpleBuilder(AccordKeyspace.Journal, partition.partitionKey());
-
- Row.SimpleBuilder rowBuilder =
newVersion.row(firstClustering);
- rowBuilder.add("record",
commandBuilder.asByteBuffer(redundantBefore, userVersion))
- .add("user_version", userVersion);
-
- return newVersion.build().unfilteredIterator();
+ switch (cleanup)
+ {
+ default: throw new UnhandledEnum(cleanup);
+ case EXPUNGE:
+ return null;
+ case ERASE:
+ return
PartitionUpdate.fullPartitionDelete(metadata(), partition.partitionKey(),
Long.MAX_VALUE, nowInSec).unfilteredIterator();
+ case TRUNCATE:
+ case INVALIDATE:
+ case TRUNCATE_WITH_OUTCOME:
+ case VESTIGIAL:
+ if (commandBuilder.maybeCleanup(PARTIAL, cleanup))
+ {
+ PartitionUpdate.SimpleBuilder newVersion =
PartitionUpdate.simpleBuilder(AccordKeyspace.Journal, partition.partitionKey());
+
+ Row.SimpleBuilder rowBuilder =
newVersion.row(firstClustering);
+ rowBuilder.add("record",
commandBuilder.asByteBuffer(redundantBefore, userVersion))
+ .add("user_version", userVersion);
+
+ return newVersion.build().unfilteredIterator();
+ }
+ }
}
return PartitionUpdate.multiRowUpdate(AccordKeyspace.Journal,
partition.partitionKey(), rows)
diff --git a/src/java/org/apache/cassandra/dht/Token.java
b/src/java/org/apache/cassandra/dht/Token.java
index 048d1a0394..f76300e80d 100644
--- a/src/java/org/apache/cassandra/dht/Token.java
+++ b/src/java/org/apache/cassandra/dht/Token.java
@@ -187,7 +187,7 @@ public abstract class Token implements RingPosition<Token>,
Serializable
}
}
- public static volatile boolean logPartitioner = false;
+ public static boolean logPartitioner = false;
public static final Set<Class<? extends IPartitioner>>
serializePartitioners = Sets.newSetFromMap(new ConcurrentHashMap<>());
public static final Set<Class<? extends IPartitioner>>
deserializePartitioners = Sets.newSetFromMap(new ConcurrentHashMap<>());
diff --git a/src/java/org/apache/cassandra/index/accord/RouteJournalIndex.java
b/src/java/org/apache/cassandra/index/accord/RouteJournalIndex.java
index c03f840533..d668f3f658 100644
--- a/src/java/org/apache/cassandra/index/accord/RouteJournalIndex.java
+++ b/src/java/org/apache/cassandra/index/accord/RouteJournalIndex.java
@@ -90,6 +90,8 @@ import org.apache.cassandra.utils.AbstractIterator;
import org.apache.cassandra.utils.concurrent.Future;
import org.apache.cassandra.utils.concurrent.FutureCombiner;
+import static accord.primitives.Routable.Domain.Range;
+
public class RouteJournalIndex implements Index, INotificationConsumer
{
public enum RegisterStatus
@@ -131,7 +133,7 @@ public class RouteJournalIndex implements Index,
INotificationConsumer
public static boolean allowed(TxnId id)
{
- return id.domain().isRange();
+ return id.is(Range);
}
private static void validateTargets(ColumnFamilyStore baseCfs,
IndexMetadata indexMetadata)
diff --git
a/src/java/org/apache/cassandra/service/accord/AccordCommandStore.java
b/src/java/org/apache/cassandra/service/accord/AccordCommandStore.java
index 8086cecdbc..4c723fa104 100644
--- a/src/java/org/apache/cassandra/service/accord/AccordCommandStore.java
+++ b/src/java/org/apache/cassandra/service/accord/AccordCommandStore.java
@@ -308,7 +308,7 @@ public class AccordCommandStore extends CommandStore
}
@Override
- public <T> AsyncChain<T> submit(Callable<T> task)
+ public <T> AsyncChain<T> build(Callable<T> task)
{
return AsyncChains.ofCallable(taskExecutor(), task);
}
diff --git a/src/java/org/apache/cassandra/service/accord/AccordJournal.java
b/src/java/org/apache/cassandra/service/accord/AccordJournal.java
index 9a266788e2..4614ba24ba 100644
--- a/src/java/org/apache/cassandra/service/accord/AccordJournal.java
+++ b/src/java/org/apache/cassandra/service/accord/AccordJournal.java
@@ -544,7 +544,6 @@ public class AccordJournal implements accord.api.Journal,
RangeSearcher.Supplier
continue;
}
-
switch (field)
{
case EXECUTE_AT:
@@ -557,7 +556,7 @@ public class AccordJournal implements accord.api.Journal,
RangeSearcher.Supplier
out.writeUnsignedVInt(command.waitingOn().minUniqueHlc());
break;
case SAVE_STATUS:
- out.writeShort(command.saveStatus().ordinal());
+ out.writeByte(command.saveStatus().ordinal());
break;
case DURABILITY:
out.writeByte(command.durability().ordinal());
@@ -631,12 +630,6 @@ public class AccordJournal implements accord.api.Journal,
RangeSearcher.Supplier
}
}
- public Builder maybeCleanup(Cleanup cleanup)
- {
- super.maybeCleanup(cleanup);
- return this;
- }
-
public void serialize(DataOutputPlus out, RedundantBefore
redundantBefore, int userVersion) throws IOException
{
Invariants.require(mask == 0);
@@ -697,7 +690,7 @@ public class AccordJournal implements accord.api.Journal,
RangeSearcher.Supplier
minUniqueHlc = in.readUnsignedVInt();
break;
case SAVE_STATUS:
- saveStatus = SaveStatus.values()[in.readShort()];
+ saveStatus = SaveStatus.values()[in.readByte()];
break;
case DURABILITY:
durability = Durability.values()[in.readByte()];
@@ -748,14 +741,14 @@ public class AccordJournal implements accord.api.Journal,
RangeSearcher.Supplier
in.readUnsignedVInt();
break;
case SAVE_STATUS:
- in.readShort();
+ in.readByte();
break;
case DURABILITY:
in.readByte();
break;
case ACCEPTED:
case PROMISED:
- CommandSerializers.ballot.skip(in, userVersion);
+ CommandSerializers.ballot.skip(in);
break;
case PARTICIPANTS:
// TODO (expected): skip
diff --git
a/src/java/org/apache/cassandra/service/accord/AccordJournalTable.java
b/src/java/org/apache/cassandra/service/accord/AccordJournalTable.java
index 5b16752e32..8eb144c4f5 100644
--- a/src/java/org/apache/cassandra/service/accord/AccordJournalTable.java
+++ b/src/java/org/apache/cassandra/service/accord/AccordJournalTable.java
@@ -231,7 +231,7 @@ public class AccordJournalTable<K extends JournalKey, V>
implements RangeSearche
@Override
public void accept(long segment, int position, K key, ByteBuffer
buffer, int userVersion)
{
- if (!tableRecordConsumer.visited(segment)) //TODO: don't need this
anymore
+ if (!tableRecordConsumer.visited(segment)) //TODO (required):
don't need this anymore
delegate.accept(segment, position, key, buffer, userVersion);
}
}
diff --git
a/src/java/org/apache/cassandra/service/accord/AccordMessageSink.java
b/src/java/org/apache/cassandra/service/accord/AccordMessageSink.java
index cf8f8c7515..bdb9f03584 100644
--- a/src/java/org/apache/cassandra/service/accord/AccordMessageSink.java
+++ b/src/java/org/apache/cassandra/service/accord/AccordMessageSink.java
@@ -33,7 +33,6 @@ import com.google.common.collect.Iterables;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
-import accord.api.Agent;
import accord.api.MessageSink;
import accord.impl.RequestCallbacks;
import accord.local.AgentExecutor;
@@ -46,19 +45,16 @@ import accord.messages.ReplyContext;
import accord.messages.Request;
import accord.messages.TxnRequest;
import accord.primitives.TxnId;
-import org.apache.cassandra.config.AccordSpec;
import org.apache.cassandra.config.DatabaseDescriptor;
import org.apache.cassandra.exceptions.RequestFailureReason;
import org.apache.cassandra.locator.InetAddressAndPort;
-import org.apache.cassandra.metrics.ClientRequestsMetricsHolder;
import org.apache.cassandra.net.Message;
import org.apache.cassandra.net.MessageDelivery;
import org.apache.cassandra.net.MessageFlag;
import org.apache.cassandra.net.MessagingService;
import org.apache.cassandra.net.ResponseContext;
import org.apache.cassandra.net.Verb;
-import org.apache.cassandra.service.TimeoutStrategy;
-import org.apache.cassandra.service.TimeoutStrategy.LatencySourceFactory;
+import org.apache.cassandra.service.accord.api.AccordAgent;
import org.apache.cassandra.utils.Clock;
import static accord.messages.MessageType.Kind.REMOTE;
@@ -211,29 +207,20 @@ public class AccordMessageSink implements MessageSink
return null;
}
- private final Agent agent;
+ private final AccordAgent agent;
private final MessageDelivery messaging;
private final AccordEndpointMapper endpointMapper;
private final RequestCallbacks callbacks;
- // TODO (required): make hot property
- private TimeoutStrategy slowPreaccept, slowRead;
- public AccordMessageSink(Agent agent, MessageDelivery messaging,
AccordEndpointMapper endpointMapper, RequestCallbacks callbacks)
+ public AccordMessageSink(AccordAgent agent, MessageDelivery messaging,
AccordEndpointMapper endpointMapper, RequestCallbacks callbacks)
{
- AccordSpec config = DatabaseDescriptor.getAccord();
- if (config != null)
- {
- // TODO (expected): introduce better metrics, esp. for preaccept,
but also to disambiguate DC latencies
- slowPreaccept = new TimeoutStrategy(config.slowPreAccept,
LatencySourceFactory.of(ClientRequestsMetricsHolder.accordReadMetrics));
- slowRead = new TimeoutStrategy(config.slowRead,
LatencySourceFactory.of(ClientRequestsMetricsHolder.accordReadMetrics));
- }
this.agent = agent;
this.messaging = messaging;
this.endpointMapper = endpointMapper;
this.callbacks = callbacks;
}
- public AccordMessageSink(Agent agent, AccordConfigurationService
endpointMapper, RequestCallbacks callbacks)
+ public AccordMessageSink(AccordAgent agent, AccordConfigurationService
endpointMapper, RequestCallbacks callbacks)
{
this(agent, MessagingService.instance(), endpointMapper, callbacks);
}
@@ -285,17 +272,17 @@ public class AccordMessageSink implements MessageSink
{
case ACCORD_READ_REQ:
case ACCORD_STABLE_THEN_READ_REQ:
- if (slowRead == null || isRangeBarrier(request))
+ if (agent.slowRead() == null || isRangeBarrier(request))
break;
case ACCORD_CHECK_STATUS_REQ:
- delayedAtNanos = nowNanos + slowRead.computeWait(1,
NANOSECONDS);
+ delayedAtNanos = nowNanos + agent.slowRead().computeWait(1,
NANOSECONDS);
break;
case ACCORD_PRE_ACCEPT_REQ:
- if (slowPreaccept == null || isRangeBarrier(request))
+ if (agent.slowPreaccept() == null || isRangeBarrier(request))
break;
- delayedAtNanos = nowNanos + slowPreaccept.computeWait(1,
NANOSECONDS);
+ delayedAtNanos = nowNanos +
agent.slowPreaccept().computeWait(1, NANOSECONDS);
}
Message<Request> message = Message.out(verb, request, expiresAtNanos);
diff --git a/src/java/org/apache/cassandra/service/accord/AccordService.java
b/src/java/org/apache/cassandra/service/accord/AccordService.java
index 81f992eec1..931a8ecead 100644
--- a/src/java/org/apache/cassandra/service/accord/AccordService.java
+++ b/src/java/org/apache/cassandra/service/accord/AccordService.java
@@ -176,6 +176,7 @@ import static accord.messages.SimpleReply.Ok;
import static accord.primitives.Routable.Domain.Key;
import static accord.primitives.Routable.Domain.Range;
import static accord.primitives.TxnId.Cardinality.cardinality;
+import static accord.topology.Topologies.SelectNodeOwnership.SHARE;
import static accord.utils.Invariants.require;
import static java.util.concurrent.TimeUnit.MILLISECONDS;
import static java.util.concurrent.TimeUnit.NANOSECONDS;
@@ -957,8 +958,6 @@ public class AccordService implements IAccordService,
Shutdownable
}
else if (cause instanceof RuntimeException)
throw (RuntimeException) cause;
- else if (cause instanceof InvalidRequestException)
- throw ((InvalidRequestException)cause);
else
throw new RuntimeException(cause);
}
@@ -1486,7 +1485,7 @@ public class AccordService implements IAccordService,
Shutdownable
private Await(Node node, SyncPoint<?> exclusiveSyncPoint)
{
- Topologies topologies =
node.topology().forEpoch(exclusiveSyncPoint.route,
exclusiveSyncPoint.syncId.epoch());
+ Topologies topologies =
node.topology().forEpoch(exclusiveSyncPoint.route,
exclusiveSyncPoint.syncId.epoch(), SHARE);
this.node = node;
this.tracker = new AllTracker(topologies);
this.exclusiveSyncPoint = exclusiveSyncPoint;
diff --git
a/src/java/org/apache/cassandra/service/accord/RouteInMemoryIndex.java
b/src/java/org/apache/cassandra/service/accord/RouteInMemoryIndex.java
index 722d7d6f0c..e0861d661e 100644
--- a/src/java/org/apache/cassandra/service/accord/RouteInMemoryIndex.java
+++ b/src/java/org/apache/cassandra/service/accord/RouteInMemoryIndex.java
@@ -26,7 +26,6 @@ import java.util.NavigableSet;
import java.util.TreeSet;
import java.util.function.Consumer;
-import accord.primitives.Routable;
import accord.primitives.Route;
import accord.primitives.Timestamp;
import accord.primitives.TxnId;
@@ -45,6 +44,8 @@ import org.apache.cassandra.utils.FastByteOperations;
import org.apache.cassandra.utils.RTree;
import org.apache.cassandra.utils.RangeTree;
+import static accord.primitives.Routable.Domain.Range;
+
public class RouteInMemoryIndex<V> implements RangeSearcher
{
private final Long2ObjectHashMap<SegmentIndex> segmentIndexes = new
Long2ObjectHashMap<>();
@@ -144,8 +145,7 @@ public class RouteInMemoryIndex<V> implements RangeSearcher
private void add(TxnId id, Unseekable keyOrRange)
{
- if (keyOrRange.domain() != Routable.Domain.Range)
- throw new IllegalArgumentException("Unexpected domain: " +
keyOrRange.domain());
+ Invariants.require(keyOrRange.domain() == Range);
TokenRange ts = (TokenRange) keyOrRange;
TableId tableId = ts.table();
tableIndex.computeIfAbsent(tableId, i -> new TableIndex()).add(id,
ts);
diff --git a/src/java/org/apache/cassandra/service/accord/api/AccordAgent.java
b/src/java/org/apache/cassandra/service/accord/api/AccordAgent.java
index 466914eae6..b640f47c37 100644
--- a/src/java/org/apache/cassandra/service/accord/api/AccordAgent.java
+++ b/src/java/org/apache/cassandra/service/accord/api/AccordAgent.java
@@ -51,9 +51,13 @@ import accord.utils.DefaultRandom;
import accord.utils.Invariants;
import accord.utils.RandomSource;
import accord.utils.SortedList;
+import accord.utils.UnhandledEnum;
+import org.apache.cassandra.config.AccordSpec;
import org.apache.cassandra.config.DatabaseDescriptor;
import org.apache.cassandra.metrics.AccordMetrics;
+import org.apache.cassandra.metrics.ClientRequestsMetricsHolder;
import org.apache.cassandra.net.ResponseContext;
+import org.apache.cassandra.service.TimeoutStrategy;
import org.apache.cassandra.service.accord.AccordService;
import org.apache.cassandra.service.accord.txn.TxnQuery;
import org.apache.cassandra.service.accord.txn.TxnRead;
@@ -73,12 +77,18 @@ public class AccordAgent implements Agent
{
private static final Logger logger =
LoggerFactory.getLogger(AccordAgent.class);
- protected Node.Id self;
-
// TODO (required): this should be configurable and have exponential
back-off, escaping to operator input past a certain number of retries
private long retryBootstrapDelayMicros = SECONDS.toMicros(1L);
private final RandomSource random = new DefaultRandom();
+ // TODO (required): make hot property
+ private TimeoutStrategy slowPreaccept, slowRead;
+ protected Node.Id self;
+
+ public AccordAgent()
+ {
+ }
+
public void setNodeId(Node.Id id)
{
self = id;
@@ -247,11 +257,23 @@ public class AccordAgent implements Agent
return units.convert((1L << Math.min(retryCount, 4)), SECONDS);
}
+ @Override
+ public long localSlowAt(TxnId txnId, Status.Phase phase, TimeUnit unit)
+ {
+ switch (phase)
+ {
+ default: throw new UnhandledEnum(phase);
+ case PreAccept: return
unit.convert(slowPreaccept().computeWaitUntil(1), NANOSECONDS);
+ case Execute: return
unit.convert(slowRead().computeWaitUntil(1), NANOSECONDS);
+ }
+ }
+
@Override
public long localExpiresAt(TxnId txnId, Status.Phase phase, TimeUnit unit)
{
// TODO (expected): make this configurable
- return txnId.is(Write) ? DatabaseDescriptor.getWriteRpcTimeout(unit) :
DatabaseDescriptor.getReadRpcTimeout(unit);
+ return txnId.is(Write) ? DatabaseDescriptor.getWriteRpcTimeout(unit)
+ : DatabaseDescriptor.getReadRpcTimeout(unit);
}
@Override
@@ -265,4 +287,30 @@ public class AccordAgent implements Agent
{
logger.error(message);
}
+
+ public TimeoutStrategy slowRead()
+ {
+ if (slowRead == null)
+ {
+ synchronized (this)
+ {
+ AccordSpec config = DatabaseDescriptor.getAccord();
+ slowRead = new TimeoutStrategy(config.slowRead,
TimeoutStrategy.LatencySourceFactory.of(ClientRequestsMetricsHolder.accordReadMetrics));
+ }
+ }
+ return slowRead;
+ }
+
+ public TimeoutStrategy slowPreaccept()
+ {
+ if (slowPreaccept == null)
+ {
+ synchronized (this)
+ {
+ AccordSpec config = DatabaseDescriptor.getAccord();
+ slowPreaccept = new TimeoutStrategy(config.slowPreAccept,
TimeoutStrategy.LatencySourceFactory.of(ClientRequestsMetricsHolder.accordReadMetrics));
+ }
+ }
+ return slowPreaccept;
+ }
}
diff --git
a/src/java/org/apache/cassandra/service/accord/api/AccordRoutingKey.java
b/src/java/org/apache/cassandra/service/accord/api/AccordRoutingKey.java
index 61454eab64..f3db1790f3 100644
--- a/src/java/org/apache/cassandra/service/accord/api/AccordRoutingKey.java
+++ b/src/java/org/apache/cassandra/service/accord/api/AccordRoutingKey.java
@@ -33,6 +33,7 @@ import accord.local.ShardDistributor;
import accord.primitives.Range;
import accord.primitives.RangeFactory;
import accord.primitives.Ranges;
+import accord.utils.Invariants;
import org.apache.cassandra.db.TypeSizes;
import org.apache.cassandra.db.marshal.ByteBufferAccessor;
import org.apache.cassandra.dht.IPartitioner;
@@ -257,6 +258,7 @@ public abstract class AccordRoutingKey extends
AccordRoutableKey implements Rout
public void serialize(T key, DataOutputPlus out, int version) throws
IOException
{
key.table.serializeCompact(out);
+ Invariants.require(key.token.getPartitioner() == getPartitioner());
Token.compactSerializer.serialize(key.token, out, version);
}
@@ -264,7 +266,6 @@ public abstract class AccordRoutingKey extends
AccordRoutableKey implements Rout
public void skip(DataInputPlus in, int version) throws IOException
{
TableId.skipCompact(in);
- // TODO (expected): should we be using the TableId partitioner
here?
Token.compactSerializer.skip(in, getPartitioner(), version);
}
diff --git
a/src/java/org/apache/cassandra/service/accord/interop/AccordInteropAdapter.java
b/src/java/org/apache/cassandra/service/accord/interop/AccordInteropAdapter.java
index fb3d8a606c..435c02f26c 100644
---
a/src/java/org/apache/cassandra/service/accord/interop/AccordInteropAdapter.java
+++
b/src/java/org/apache/cassandra/service/accord/interop/AccordInteropAdapter.java
@@ -26,9 +26,8 @@ import org.slf4j.LoggerFactory;
import accord.api.Result;
import accord.api.Update;
import accord.coordinate.CoordinationAdapter;
-import accord.coordinate.CoordinationAdapter.Adapters.AbstractTxnAdapter;
+import accord.coordinate.CoordinationAdapter.Adapters.TxnAdapter;
import accord.coordinate.ExecutePath;
-import accord.coordinate.PersistTxn;
import accord.local.Node;
import accord.messages.Apply;
import accord.primitives.Deps;
@@ -39,6 +38,7 @@ import accord.primitives.Txn;
import accord.primitives.TxnId;
import accord.primitives.Writes;
import accord.topology.Topologies;
+import accord.topology.Topologies.SelectNodeOwnership;
import org.apache.cassandra.db.ConsistencyLevel;
import org.apache.cassandra.service.accord.AccordEndpointMapper;
import org.apache.cassandra.service.accord.api.AccordAgent;
@@ -49,7 +49,7 @@ import org.apache.cassandra.service.accord.txn.TxnRead;
import static accord.messages.Apply.Kind.Maximal;
import static accord.messages.Apply.Kind.Minimal;
-public class AccordInteropAdapter extends AbstractTxnAdapter
+public class AccordInteropAdapter extends TxnAdapter
{
private static final Logger logger =
LoggerFactory.getLogger(AccordInteropAdapter.class);
public static final class AccordInteropFactory implements
CoordinationAdapter.Factory
@@ -76,29 +76,29 @@ public class AccordInteropAdapter extends AbstractTxnAdapter
private AccordInteropAdapter(InteropExecutor executor,
AccordEndpointMapper endpointMapper, Apply.Kind applyKind)
{
+ super(Minimal);
this.executor = executor;
this.endpointMapper = endpointMapper;
this.applyKind = applyKind;
}
@Override
- public void execute(Node node, Topologies all, FullRoute<?> route,
ExecutePath path, TxnId txnId, Txn txn, Timestamp executeAt, Deps deps,
BiConsumer<? super Result, Throwable> callback)
+ public void execute(Node node, Topologies any, FullRoute<?> route,
ExecutePath path, TxnId txnId, Txn txn, Timestamp executeAt, Deps stableDeps,
Deps sendDeps, BiConsumer<? super Result, Throwable> callback)
{
- if (!doInteropExecute(node, route, txnId, txn, executeAt, deps,
callback))
- super.execute(node, all, route, path, txnId, txn, executeAt, deps,
callback);
+ if (!doInteropExecute(node, route, txnId, txn, executeAt, stableDeps,
callback))
+ super.execute(node, any, route, path, txnId, txn, executeAt,
stableDeps, sendDeps, callback);
}
@Override
- public void persist(Node node, Topologies all, FullRoute<?> fullRoute,
Route<?> sendTo, TxnId txnId, Txn txn, Timestamp executeAt, Deps deps, Writes
writes, Result result, BiConsumer<? super Result, Throwable> callback)
+ public void persist(Node node, Topologies any, Route<?> require, Route<?>
sendTo, SelectNodeOwnership selectSendTo, FullRoute<?> route, TxnId txnId, Txn
txn, Timestamp executeAt, Deps deps, Writes writes, Result result, BiConsumer<?
super Result, Throwable> callback)
{
- if (applyKind == Minimal && doInteropPersist(node, all, sendTo, txnId,
txn, executeAt, deps, writes, result, fullRoute, callback))
+ if (applyKind == Minimal && doInteropPersist(node, any, require,
sendTo, selectSendTo, txnId, txn, executeAt, deps, writes, result, route,
callback))
return;
- if (callback != null) callback.accept(result, null);
- new PersistTxn(node, all, txnId, sendTo, txn, executeAt, deps, writes,
result, fullRoute)
- .start(Apply.FACTORY, applyKind, all, writes, result);
+ super.persist(node, any, require, sendTo, selectSendTo, route, txnId,
txn, executeAt, deps, writes, result, callback);
}
+
private boolean doInteropExecute(Node node, FullRoute<?> route, TxnId
txnId, Txn txn, Timestamp executeAt, Deps deps, BiConsumer<? super Result,
Throwable> callback)
{
// Unrecoverable repair always needs to be run by
AccordInteropExecution
@@ -112,15 +112,16 @@ public class AccordInteropAdapter extends
AbstractTxnAdapter
return true;
}
- private static boolean doInteropPersist(Node node, Topologies all,
Route<?> sendTo, TxnId txnId, Txn txn, Timestamp executeAt, Deps deps, Writes
writes, Result result, FullRoute<?> fullRoute, BiConsumer<? super Result,
Throwable> callback)
+ private boolean doInteropPersist(Node node, Topologies any, Route<?>
require, Route<?> sendTo, SelectNodeOwnership selectSendTo, TxnId txnId, Txn
txn, Timestamp executeAt, Deps deps, Writes writes, Result result, FullRoute<?>
fullRoute, BiConsumer<? super Result, Throwable> callback)
{
Update update = txn.update();
ConsistencyLevel consistencyLevel = update instanceof AccordUpdate ?
((AccordUpdate) update).cassandraCommitCL() : null;
if (consistencyLevel == null || consistencyLevel ==
ConsistencyLevel.ANY || writes.isEmpty())
return false;
- new AccordInteropPersist(node, all, txnId, sendTo, txn, executeAt,
deps, writes, result, fullRoute, consistencyLevel, callback)
- .start(AccordInteropApply.FACTORY, Minimal, all, writes, result);
+ Topologies all = execution(node, any, sendTo, selectSendTo, fullRoute,
txnId, executeAt);
+ new AccordInteropPersist(node, all, txnId, require, txn, executeAt,
deps, writes, result, fullRoute, consistencyLevel, callback)
+ .start(Minimal, any, writes, result);
return true;
}
}
diff --git
a/src/java/org/apache/cassandra/service/accord/interop/AccordInteropApply.java
b/src/java/org/apache/cassandra/service/accord/interop/AccordInteropApply.java
index edc5fb9629..241c8acd01 100644
---
a/src/java/org/apache/cassandra/service/accord/interop/AccordInteropApply.java
+++
b/src/java/org/apache/cassandra/service/accord/interop/AccordInteropApply.java
@@ -48,9 +48,11 @@ import org.apache.cassandra.io.IVersionedSerializer;
import org.apache.cassandra.service.accord.AccordMessageSink.AccordMessageType;
import
org.apache.cassandra.service.accord.serializers.ApplySerializers.ApplySerializer;
import org.apache.cassandra.service.accord.txn.AccordUpdate;
+import org.apache.cassandra.tcm.ClusterMetadata;
import static accord.utils.Invariants.requireArgument;
import static com.google.common.base.Preconditions.checkArgument;
+import static com.google.common.base.Preconditions.checkState;
/**
* Apply that waits until the transaction is actually applied before sending a
response
@@ -100,6 +102,8 @@ public class AccordInteropApply extends Apply implements
LocalListeners.ComplexL
@Override
public ApplyReply apply(SafeCommandStore safeStore, StoreParticipants
participants)
{
+ ClusterMetadata cm = ClusterMetadata.current();
+ checkState(cm.epoch.getEpoch() >= minEpoch, "TCM epoch %d is <
minEpoch %d", cm.epoch.getEpoch(), minEpoch);
ApplyReply reply = super.apply(safeStore, participants);
requireArgument(reply == ApplyReply.Redundant || reply ==
ApplyReply.Applied || reply == ApplyReply.Insufficient, "Unexpected
ApplyReply");
@@ -198,7 +202,8 @@ public class AccordInteropApply extends Apply implements
LocalListeners.ComplexL
listeners = this.listeners;
this.listeners = null;
}
- listeners.forEach((i, v) -> v.cancel());
+ if (listeners != null)
+ listeners.forEach((i, v) -> v.cancel());
}
@Override
diff --git
a/src/java/org/apache/cassandra/service/accord/interop/AccordInteropExecution.java
b/src/java/org/apache/cassandra/service/accord/interop/AccordInteropExecution.java
index 0bc6cdbe32..56966df9d3 100644
---
a/src/java/org/apache/cassandra/service/accord/interop/AccordInteropExecution.java
+++
b/src/java/org/apache/cassandra/service/accord/interop/AccordInteropExecution.java
@@ -95,6 +95,7 @@ import org.apache.cassandra.transport.Dispatcher;
import static accord.coordinate.CoordinationAdapter.Factory.Kind.Standard;
import static accord.primitives.Txn.Kind.Write;
+import static accord.topology.Topologies.SelectNodeOwnership.SHARE;
import static accord.utils.Invariants.requireArgument;
import static
org.apache.cassandra.metrics.ClientRequestsMetricsHolder.accordReadMetrics;
import static
org.apache.cassandra.metrics.ClientRequestsMetricsHolder.accordWriteMetrics;
@@ -130,7 +131,7 @@ public class AccordInteropExecution implements
ReadCoordinator, MaximalCommitSen
}
@Override
- public <T> AsyncChain<T> submit(Callable<T> task)
+ public <T> AsyncChain<T> build(Callable<T> task)
{
try
{
@@ -181,9 +182,10 @@ public class AccordInteropExecution implements
ReadCoordinator, MaximalCommitSen
this.consistencyLevel = consistencyLevel;
this.endpointMapper = endpointMapper;
- this.executes = node.topology().forEpoch(route, executeAt.epoch());
+ // TODO (required): compare this to latest logic in Accord, make sure
it makes sense
+ this.executes = node.topology().forEpoch(route, executeAt.epoch(),
SHARE);
this.allTopologies = txnId.epoch() != executeAt.epoch()
- ? node.topology().preciseEpochs(route,
txnId.epoch(), executeAt.epoch())
+ ? node.topology().preciseEpochs(route,
txnId.epoch(), executeAt.epoch(), SHARE)
: executes;
this.executeTopology = executes.getEpoch(executeAt.epoch());
this.coordinateTopology = allTopologies.getEpoch(txnId.epoch());
diff --git
a/src/java/org/apache/cassandra/service/accord/interop/AccordInteropPersist.java
b/src/java/org/apache/cassandra/service/accord/interop/AccordInteropPersist.java
index 1a5d74b28d..e61d3934b5 100644
---
a/src/java/org/apache/cassandra/service/accord/interop/AccordInteropPersist.java
+++
b/src/java/org/apache/cassandra/service/accord/interop/AccordInteropPersist.java
@@ -110,7 +110,7 @@ public class AccordInteropPersist extends Persist
public AccordInteropPersist(Node node, Topologies topologies, TxnId txnId,
Route<?> sendTo, Txn txn, Timestamp executeAt, Deps deps, Writes writes, Result
result, FullRoute<?> fullRoute, ConsistencyLevel consistencyLevel, BiConsumer<?
super Result, Throwable> clientCallback)
{
- super(node, topologies, txnId, sendTo, txn, executeAt, deps, writes,
result, fullRoute);
+ super(node, topologies, txnId, sendTo, txn, executeAt, deps, writes,
result, fullRoute, AccordInteropApply.FACTORY);
Invariants.requireArgument(consistencyLevel == ConsistencyLevel.QUORUM
|| consistencyLevel == ConsistencyLevel.ALL || consistencyLevel ==
ConsistencyLevel.SERIAL || consistencyLevel == ConsistencyLevel.ONE);
this.consistencyLevel = consistencyLevel;
registerClientCallback(result, clientCallback);
diff --git
a/src/java/org/apache/cassandra/service/accord/repair/RepairSyncPointAdapter.java
b/src/java/org/apache/cassandra/service/accord/repair/RepairSyncPointAdapter.java
index c0b1938fd3..c31971883e 100644
---
a/src/java/org/apache/cassandra/service/accord/repair/RepairSyncPointAdapter.java
+++
b/src/java/org/apache/cassandra/service/accord/repair/RepairSyncPointAdapter.java
@@ -58,10 +58,10 @@ public class RepairSyncPointAdapter<U extends Unseekable>
extends CoordinationAd
}
@Override
- public void execute(Node node, Topologies all, FullRoute<?> route,
ExecutePath path, TxnId txnId, Txn txn, Timestamp executeAt, Deps deps,
BiConsumer<? super SyncPoint<U>, Throwable> callback)
+ public void execute(Node node, Topologies all, FullRoute<?> route,
ExecutePath path, TxnId txnId, Txn txn, Timestamp executeAt, Deps stableDeps,
Deps sendDeps, BiConsumer<? super SyncPoint<U>, Throwable> callback)
{
RequiredResponseTracker tracker = new
RequiredResponseTracker(requiredResponses, all);
- ExecuteSyncPoint.ExecuteInclusive<U> execute = new
ExecuteSyncPoint.ExecuteInclusive<>(node, new SyncPoint<>(txnId, executeAt,
deps, (FullRoute<U>) route), tracker, executeAt);
+ ExecuteSyncPoint.ExecuteInclusive<U> execute = new
ExecuteSyncPoint.ExecuteInclusive<>(node, new SyncPoint<>(txnId, executeAt,
stableDeps, (FullRoute<U>) route), tracker, executeAt);
execute.addCallback(callback);
execute.start();
}
diff --git
a/src/java/org/apache/cassandra/service/accord/serializers/AcceptSerializers.java
b/src/java/org/apache/cassandra/service/accord/serializers/AcceptSerializers.java
index 9454cae12c..aa325da041 100644
---
a/src/java/org/apache/cassandra/service/accord/serializers/AcceptSerializers.java
+++
b/src/java/org/apache/cassandra/service/accord/serializers/AcceptSerializers.java
@@ -20,13 +20,15 @@ package org.apache.cassandra.service.accord.serializers;
import java.io.IOException;
+import accord.local.Commands.AcceptOutcome;
import accord.messages.Accept;
import accord.messages.Accept.AcceptReply;
import accord.primitives.Ballot;
+import accord.primitives.Deps;
+import accord.primitives.Participants;
import accord.primitives.Route;
import accord.primitives.Timestamp;
import accord.primitives.TxnId;
-import accord.utils.Invariants;
import org.apache.cassandra.db.TypeSizes;
import org.apache.cassandra.io.IVersionedSerializer;
import org.apache.cassandra.io.util.DataInputPlus;
@@ -34,20 +36,21 @@ import org.apache.cassandra.io.util.DataOutputPlus;
import
org.apache.cassandra.service.accord.serializers.CommandSerializers.ExecuteAtSerializer;
import static accord.messages.Accept.SerializerSupport.create;
-import static accord.utils.Invariants.illegalState;
public class AcceptSerializers
{
private AcceptSerializers() {}
- public static final IVersionedSerializer<Accept> request = new
TxnRequestSerializer.WithUnsyncedSerializer<>()
+ public static final IVersionedSerializer<Accept> request = new
RequestSerializer();
+ public static class RequestSerializer extends
TxnRequestSerializer.WithUnsyncedSerializer<Accept>
{
- final IVersionedSerializer<Accept.Kind> kindSerializer = new
EnumSerializer<>(Accept.Kind.class);
+ private static final Accept.Kind[] kinds = Accept.Kind.values();
+ private static final int IS_PARTIAL = 1;
@Override
public void serializeBody(Accept accept, DataOutputPlus out, int
version) throws IOException
{
- kindSerializer.serialize(accept.kind, out, version);
+ out.writeByte((accept.kind.ordinal() << 1) |
(accept.isPartialAccept ? IS_PARTIAL : 0));
CommandSerializers.ballot.serialize(accept.ballot, out, version);
ExecuteAtSerializer.serialize(accept.txnId, accept.executeAt, out);
DepsSerializers.partialDeps.serialize(accept.partialDeps, out,
version);
@@ -56,17 +59,20 @@ public class AcceptSerializers
@Override
public Accept deserializeBody(DataInputPlus in, int version, TxnId
txnId, Route<?> scope, long waitForEpoch, long minEpoch) throws IOException
{
+ int flags = in.readByte();
+ Accept.Kind kind = kinds[(flags >>> 1) & 1];
return create(txnId, scope, waitForEpoch, minEpoch,
- kindSerializer.deserialize(in, version),
+ kind,
CommandSerializers.ballot.deserialize(in, version),
ExecuteAtSerializer.deserialize(txnId, in),
- DepsSerializers.partialDeps.deserialize(in,
version));
+ DepsSerializers.partialDeps.deserialize(in, version),
+ (flags & IS_PARTIAL) != 0);
}
@Override
public long serializedBodySize(Accept accept, int version)
{
- return kindSerializer.serializedSize(accept.kind, version)
+ return 1
+ CommandSerializers.ballot.serializedSize(accept.ballot,
version)
+ ExecuteAtSerializer.serializedSize(accept.txnId,
accept.executeAt)
+
DepsSerializers.partialDeps.serializedSize(accept.partialDeps, version);
@@ -103,84 +109,58 @@ public class AcceptSerializers
}
};
- public static final IVersionedSerializer<AcceptReply> reply = new
IVersionedSerializer<>()
+ public static final IVersionedSerializer<AcceptReply> reply = new
ReplySerializer();
+ public static class ReplySerializer implements
IVersionedSerializer<AcceptReply>
{
+ private static final int SUPERSEDED_BY = 0x08;
+ private static final int COMMITTED_EXECUTE_AT = 0x10;
+ private static final int SUCCESSFUL = 0x20;
+ private static final int DEPS = 0x40;
@Override
public void serialize(AcceptReply reply, DataOutputPlus out, int
version) throws IOException
{
- switch (reply.outcome())
- {
- default: throw new AssertionError();
- case Retired:
- case Truncated:
- throw illegalState("AcceptReply with invalid
AcceptOutcome: " + reply.outcome);
- case Success:
- if (reply.deps != null)
- {
- out.writeByte(1);
- DepsSerializers.deps.serialize(reply.deps, out,
version);
- }
- else
- {
- Invariants.require(reply == AcceptReply.SUCCESS);
- out.writeByte(2);
- }
- break;
- case RejectedBallot:
- out.writeByte(3);
- CommandSerializers.ballot.serialize(reply.supersededBy,
out, version);
- break;
- case Redundant:
- int flags = 4 | (reply.supersededBy != null ? 0x8 : 0) |
(reply.committedExecuteAt != null ? 0x10 : 0);
- out.writeByte(flags);
- if (reply.supersededBy != null)
-
CommandSerializers.ballot.serialize(reply.supersededBy, out, version);
- if (reply.committedExecuteAt != null)
-
ExecuteAtSerializer.serialize(reply.committedExecuteAt, out);
- }
+ int flags = reply.outcome.ordinal()
+ | (reply.supersededBy != null ? SUPERSEDED_BY
: 0)
+ | (reply.committedExecuteAt != null ?
COMMITTED_EXECUTE_AT : 0)
+ | (reply.successful != null ? SUCCESSFUL
: 0)
+ | (reply.deps != null ? DEPS
: 0);
+
+ out.writeByte(flags);
+ if (reply.supersededBy != null)
+ CommandSerializers.ballot.serialize(reply.supersededBy, out,
version);
+ if (reply.committedExecuteAt != null)
+ ExecuteAtSerializer.serialize(reply.committedExecuteAt, out);
+ if (reply.successful != null)
+ KeySerializers.participants.serialize(reply.successful, out,
version);
+ if (reply.deps != null)
+ DepsSerializers.deps.serialize(reply.deps, out, version);
}
+ private final AcceptOutcome[] outcomes = AcceptOutcome.values();
@Override
public AcceptReply deserialize(DataInputPlus in, int version) throws
IOException
{
int flags = in.readByte();
- switch (flags & 0x7)
- {
- default: throw new IllegalStateException("Unexpected
AcceptNack type: " + (flags & 0x7));
- case 1:
- return new
AcceptReply(DepsSerializers.deps.deserialize(in, version));
- case 2:
- return AcceptReply.SUCCESS;
- case 3:
- return new
AcceptReply(CommandSerializers.ballot.deserialize(in, version));
- case 4:
- Ballot supersededBy = (flags & 0x8) == 0 ? null :
CommandSerializers.ballot.deserialize(in, version);
- Timestamp committedExecuteAt = (flags & 0x10) == 0 ? null
: ExecuteAtSerializer.deserialize(in);
- return new AcceptReply(supersededBy, committedExecuteAt);
- }
+ AcceptOutcome outcome = outcomes[flags & 0x7]; // ordinal uses all three bits below SUPERSEDED_BY (0x08), mirroring serialize()
+ Ballot supersededBy = (flags & SUPERSEDED_BY) == 0 ? null :
CommandSerializers.ballot.deserialize(in, version);
+ Timestamp committedExecuteAt = (flags & COMMITTED_EXECUTE_AT) == 0
? null : ExecuteAtSerializer.deserialize(in);
+ Participants<?> successful = (flags & SUCCESSFUL) == 0 ? null :
KeySerializers.participants.deserialize(in, version);
+ Deps deps = (flags & DEPS) == 0 ? null :
DepsSerializers.deps.deserialize(in, version);
+ return new AcceptReply(outcome, supersededBy, successful, deps,
committedExecuteAt);
}
@Override
public long serializedSize(AcceptReply reply, int version)
{
long size = TypeSizes.BYTE_SIZE;
- switch (reply.outcome())
- {
- default: throw new AssertionError();
- case Retired:
- case Truncated:
- throw illegalState("AcceptReply with invalid
AcceptOutcome: " + reply.outcome);
- case Success:
- if (reply.deps != null)
- size +=
DepsSerializers.deps.serializedSize(reply.deps, version);
- break;
- case RejectedBallot:
- size +=
CommandSerializers.ballot.serializedSize(reply.supersededBy, version);
- break;
- case Redundant:
- if (reply.supersededBy != null) size +=
CommandSerializers.ballot.serializedSize(reply.supersededBy, version);
- if (reply.committedExecuteAt != null) size +=
ExecuteAtSerializer.serializedSize(reply.committedExecuteAt);
- }
+ if (reply.supersededBy != null)
+ size +=
CommandSerializers.ballot.serializedSize(reply.supersededBy, version);
+ if (reply.committedExecuteAt != null)
+ size +=
ExecuteAtSerializer.serializedSize(reply.committedExecuteAt);
+ if (reply.successful != null)
+ size +=
KeySerializers.participants.serializedSize(reply.successful, version);
+ if (reply.deps != null)
+ size += DepsSerializers.deps.serializedSize(reply.deps,
version);
return size;
}
};
diff --git
a/src/java/org/apache/cassandra/service/accord/serializers/BeginInvalidationSerializers.java
b/src/java/org/apache/cassandra/service/accord/serializers/BeginInvalidationSerializers.java
index 44fff87469..8eeed391a7 100644
---
a/src/java/org/apache/cassandra/service/accord/serializers/BeginInvalidationSerializers.java
+++
b/src/java/org/apache/cassandra/service/accord/serializers/BeginInvalidationSerializers.java
@@ -66,7 +66,7 @@ public class BeginInvalidationSerializers
@Override
public void serialize(InvalidateReply reply, DataOutputPlus out, int
version) throws IOException
{
- CommandSerializers.nullableBallot.serialize(reply.supersededBy,
out, version);
+ CommandSerializers.ballot.serialize(reply.supersededBy, out,
version);
CommandSerializers.ballot.serialize(reply.accepted, out, version);
CommandSerializers.saveStatus.serialize(reply.maxStatus, out,
version);
CommandSerializers.saveStatus.serialize(reply.maxKnowledgeStatus,
out, version);
@@ -80,7 +80,7 @@ public class BeginInvalidationSerializers
public InvalidateReply deserialize(DataInputPlus in, int version)
throws IOException
{
// TODO (expected): use headers instead of nullable+bool
serializers
- Ballot supersededBy =
CommandSerializers.nullableBallot.deserialize(in, version);
+ Ballot supersededBy = CommandSerializers.ballot.deserialize(in,
version);
Ballot accepted = CommandSerializers.ballot.deserialize(in,
version);
SaveStatus maxStatus =
CommandSerializers.saveStatus.deserialize(in, version);
SaveStatus maxKnowledgeStatus =
CommandSerializers.saveStatus.deserialize(in, version);
@@ -94,7 +94,7 @@ public class BeginInvalidationSerializers
@Override
public long serializedSize(InvalidateReply reply, int version)
{
- return
CommandSerializers.nullableBallot.serializedSize(reply.supersededBy, version)
+ return
CommandSerializers.ballot.serializedSize(reply.supersededBy, version)
+ CommandSerializers.ballot.serializedSize(reply.accepted,
version)
+
CommandSerializers.saveStatus.serializedSize(reply.maxStatus, version)
+
CommandSerializers.saveStatus.serializedSize(reply.maxKnowledgeStatus, version)
diff --git
a/src/java/org/apache/cassandra/service/accord/serializers/CommandSerializers.java
b/src/java/org/apache/cassandra/service/accord/serializers/CommandSerializers.java
index d5a80d7b26..bf349b988b 100644
---
a/src/java/org/apache/cassandra/service/accord/serializers/CommandSerializers.java
+++
b/src/java/org/apache/cassandra/service/accord/serializers/CommandSerializers.java
@@ -75,8 +75,7 @@ public class CommandSerializers
public static final IVersionedSerializer<TxnId> nullableTxnId =
NullableSerializer.wrap(txnId);
public static final TimestampSerializer<Timestamp> timestamp = new
TimestampSerializer<>(Timestamp::fromBits);
public static final IVersionedSerializer<Timestamp> nullableTimestamp =
NullableSerializer.wrap(timestamp);
- public static final TimestampSerializer<Ballot> ballot = new
TimestampSerializer<>(Ballot::fromBits);
- public static final IVersionedSerializer<Ballot> nullableBallot =
NullableSerializer.wrap(ballot);
+ public static final BallotSerializer ballot = new BallotSerializer(); //
permits null
public static final EnumSerializer<Txn.Kind> kind = new
EnumSerializer<>(Txn.Kind.class);
public static final StoreParticipantsSerializer participants = new
StoreParticipantsSerializer();
@@ -356,8 +355,11 @@ public class CommandSerializers
public void skip(DataInputPlus in, int version) throws IOException
{
int flags = in.readByte();
- if (0 != (flags & HAS_ROUTE))
- KeySerializers.route.deserialize(in, version);
+ if (0 != (flags & HAS_ROUTE)) KeySerializers.route.skip(in,
version);
+ if (0 == (flags & HAS_TOUCHED_EQUALS_ROUTE))
KeySerializers.participants.skip(in, version);
+ if (0 == (flags & TOUCHES_EQUALS_HAS_TOUCHED))
KeySerializers.participants.skip(in, version);
+ if (0 == (flags & OWNS_EQUALS_TOUCHES))
KeySerializers.participants.skip(in, version);
+ if (0 == (flags & (EXECUTES_IS_OWNS | EXECUTES_IS_NULL)))
KeySerializers.participants.skip(in, version);
}
@Override
@@ -446,7 +448,7 @@ public class CommandSerializers
TopologySerializers.nodeId.serialize(ts.node, out);
}
- public void skip(DataInputPlus in, int version) throws IOException
+ public void skip(DataInputPlus in) throws IOException
{
in.skipBytesFully(serializedSize());
}
@@ -500,6 +502,64 @@ public class CommandSerializers
}
}
+ public static class BallotSerializer implements
IVersionedSerializer<Ballot>
+ {
+ final TimestampSerializer<Ballot> wrapped = new
TimestampSerializer<>(Ballot::fromBits);
+
+ @Override
+ public void serialize(Ballot t, DataOutputPlus out, int version)
throws IOException
+ {
+ if (t == null || t.equals(Ballot.ZERO) || t.equals(Ballot.MAX))
+ {
+ out.writeByte(t == null ? 1 : t.equals(Ballot.ZERO) ? 2 : 3);
+ }
+ else
+ {
+ out.writeByte(0);
+ wrapped.serialize(t, out, version);
+ }
+ }
+
+ @Override
+ public Ballot deserialize(DataInputPlus in, int version) throws
IOException
+ {
+ return deserialize(in);
+ }
+
+ public Ballot deserialize(DataInputPlus in) throws IOException
+ {
+ int flags = in.readByte();
+ switch (flags)
+ {
+ default: throw new IOException("Corrupted input: expected
[0..3], received: " + flags);
+ case 0: return wrapped.deserialize(in);
+ case 1: return null;
+ case 2: return Ballot.ZERO;
+ case 3: return Ballot.MAX;
+ }
+ }
+
+ public void skip(DataInputPlus in) throws IOException
+ {
+ int flags = in.readByte();
+ if (flags == 0)
+ wrapped.skip(in);
+ }
+
+ @Override
+ public long serializedSize(Ballot t, int version)
+ {
+ return serializedSize(t);
+ }
+
+ public long serializedSize(Ballot t)
+ {
+ if (t == null || t.equals(Ballot.ZERO) || t.equals(Ballot.MAX))
+ return 1;
+ return 1 + wrapped.serializedSize();
+ }
+ }
+
public static class PartialTxnSerializer extends AbstractWithKeysSerializer
implements IVersionedSerializer<PartialTxn>
{
diff --git
a/src/java/org/apache/cassandra/service/accord/serializers/KeySerializers.java
b/src/java/org/apache/cassandra/service/accord/serializers/KeySerializers.java
index 6eb7a0fe6b..ffb36a2224 100644
---
a/src/java/org/apache/cassandra/service/accord/serializers/KeySerializers.java
+++
b/src/java/org/apache/cassandra/service/accord/serializers/KeySerializers.java
@@ -80,11 +80,11 @@ public class KeySerializers
public static final IVersionedSerializer<Route<?>> nullableRoute;
public static final IVersionedSerializer<PartialRoute<?>> partialRoute;
- public static final IVersionedSerializer<FullRoute<?>> fullRoute;
+ public static final AbstractRoutablesSerializer<FullRoute<?>> fullRoute;
public static final IVersionedSerializer<Seekables<?, ?>> seekables;
public static final IVersionedSerializer<FullRoute<?>> nullableFullRoute;
- public static final IVersionedSerializer<Unseekables<?>> unseekables;
- public static final IVersionedSerializer<Participants<?>> participants;
+ public static final AbstractRoutablesSerializer<Unseekables<?>>
unseekables;
+ public static final AbstractRoutablesSerializer<Participants<?>>
participants;
public static final IVersionedSerializer<Participants<?>>
nullableParticipants;
static
@@ -138,11 +138,11 @@ public class KeySerializers
final IVersionedSerializer<Route<?>> nullableRoute;
final IVersionedSerializer<PartialRoute<?>> partialRoute;
- final IVersionedSerializer<FullRoute<?>> fullRoute;
- final IVersionedSerializer<Seekables<?, ?>> seekables;
+ final AbstractRoutablesSerializer<FullRoute<?>> fullRoute;
+ final AbstractSeekablesSerializer seekables;
final IVersionedSerializer<FullRoute<?>> nullableFullRoute;
- final IVersionedSerializer<Unseekables<?>> unseekables;
- final IVersionedSerializer<Participants<?>> participants;
+ final AbstractRoutablesSerializer<Unseekables<?>> unseekables;
+ final AbstractRoutablesSerializer<Participants<?>> participants;
final IVersionedSerializer<Participants<?>> nullableParticipants;
private Impl()
{
@@ -285,19 +285,19 @@ public class KeySerializers
this.route = (AbstractRoutablesSerializer<Route<?>>)
factory.apply(EnumSet.of(UnseekablesKind.PartialKeyRoute,
UnseekablesKind.FullKeyRoute, UnseekablesKind.PartialRangeRoute,
UnseekablesKind.FullRangeRoute));
this.nullableRoute = NullableSerializer.wrap(route);
- this.partialRoute = (IVersionedSerializer<PartialRoute<?>>)
factory.apply(EnumSet.of(UnseekablesKind.PartialKeyRoute,
UnseekablesKind.PartialRangeRoute));
- this.fullRoute = (IVersionedSerializer<FullRoute<?>>)
factory.apply(EnumSet.of(UnseekablesKind.FullKeyRoute,
UnseekablesKind.FullRangeRoute));
+ this.partialRoute = (AbstractRoutablesSerializer<PartialRoute<?>>)
factory.apply(EnumSet.of(UnseekablesKind.PartialKeyRoute,
UnseekablesKind.PartialRangeRoute));
+ this.fullRoute = (AbstractRoutablesSerializer<FullRoute<?>>)
factory.apply(EnumSet.of(UnseekablesKind.FullKeyRoute,
UnseekablesKind.FullRangeRoute));
this.nullableFullRoute = NullableSerializer.wrap(fullRoute);
- this.unseekables = (IVersionedSerializer<Unseekables<?>>)
factory.apply(EnumSet.allOf(UnseekablesKind.class));
- this.participants = (IVersionedSerializer<Participants<?>>)
factory.apply(EnumSet.allOf(UnseekablesKind.class));
+ this.unseekables = (AbstractRoutablesSerializer<Unseekables<?>>)
factory.apply(EnumSet.allOf(UnseekablesKind.class));
+ this.participants = (AbstractRoutablesSerializer<Participants<?>>)
factory.apply(EnumSet.allOf(UnseekablesKind.class));
this.nullableParticipants = NullableSerializer.wrap(participants);
this.seekables = new AbstractSeekablesSerializer(keys, ranges);
}
}
- static class AbstractRoutablesSerializer<RS extends Unseekables<?>>
implements IVersionedSerializer<RS>
+ public static class AbstractRoutablesSerializer<RS extends Unseekables<?>>
implements IVersionedSerializer<RS>
{
final EnumSet<UnseekablesKind> permitted;
final AbstractKeysSerializer<RoutingKey, RoutingKeys> routingKeys;
diff --git
a/src/java/org/apache/cassandra/service/accord/serializers/LatestDepsSerializers.java
b/src/java/org/apache/cassandra/service/accord/serializers/LatestDepsSerializers.java
index aa0019d211..ccbb40a460 100644
---
a/src/java/org/apache/cassandra/service/accord/serializers/LatestDepsSerializers.java
+++
b/src/java/org/apache/cassandra/service/accord/serializers/LatestDepsSerializers.java
@@ -38,7 +38,7 @@ import
org.apache.cassandra.service.accord.serializers.CommandSerializers.Execut
public class LatestDepsSerializers
{
- public static final IVersionedSerializer<LatestDeps> latestDeps = new
IVersionedSerializer<LatestDeps>()
+ public static final IVersionedSerializer<LatestDeps> latestDeps = new
IVersionedSerializer<>()
{
@Override
public void serialize(LatestDeps t, DataOutputPlus out, int version)
throws IOException
@@ -133,8 +133,9 @@ public class LatestDepsSerializers
@Override
public GetLatestDeps deserializeBody(DataInputPlus in, int version,
TxnId txnId, Route<?> scope, long waitForEpoch, long minEpoch) throws
IOException
{
+ Ballot ballot = CommandSerializers.ballot.deserialize(in);
Timestamp executeAt = ExecuteAtSerializer.deserialize(in);
- return GetLatestDeps.SerializationSupport.create(txnId, scope,
waitForEpoch, minEpoch, executeAt);
+ return GetLatestDeps.SerializationSupport.create(txnId, scope,
waitForEpoch, minEpoch, ballot, executeAt);
}
@Override
diff --git
a/src/java/org/apache/cassandra/service/accord/serializers/RecoverySerializers.java
b/src/java/org/apache/cassandra/service/accord/serializers/RecoverySerializers.java
index ee0000f1a4..55832dfee7 100644
---
a/src/java/org/apache/cassandra/service/accord/serializers/RecoverySerializers.java
+++
b/src/java/org/apache/cassandra/service/accord/serializers/RecoverySerializers.java
@@ -90,7 +90,7 @@ public class RecoverySerializers
final RecoverReply.Kind[] kinds = RecoverReply.Kind.values();
void serializeNack(RecoverNack recoverNack, DataOutputPlus out, int
version) throws IOException
{
-
CommandSerializers.nullableBallot.serialize(recoverNack.supersededBy, out,
version);
+ CommandSerializers.ballot.serialize(recoverNack.supersededBy, out,
version);
}
void serializeOk(RecoverOk recoverOk, DataOutputPlus out, int version)
throws IOException
@@ -132,7 +132,7 @@ public class RecoverySerializers
{
RecoverReply.Kind kind = kinds[in.readByte()];
if (kind != Ok)
- return deserializeNack(kind,
CommandSerializers.nullableBallot.deserialize(in, version), in, version);
+ return deserializeNack(kind,
CommandSerializers.ballot.deserialize(in, version), in, version);
TxnId id = CommandSerializers.txnId.deserialize(in, version);
Status status = CommandSerializers.status.deserialize(in, version);
@@ -160,7 +160,7 @@ public class RecoverySerializers
long serializedNackSize(RecoverNack recoverNack, int version)
{
- return
CommandSerializers.nullableBallot.serializedSize(recoverNack.supersededBy,
version);
+ return
CommandSerializers.ballot.serializedSize(recoverNack.supersededBy, version);
}
long serializedOkSize(RecoverOk recoverOk, int version)
diff --git a/src/java/org/apache/cassandra/service/accord/txn/TxnNamedRead.java
b/src/java/org/apache/cassandra/service/accord/txn/TxnNamedRead.java
index dd9486ca30..3a28cec6b0 100644
--- a/src/java/org/apache/cassandra/service/accord/txn/TxnNamedRead.java
+++ b/src/java/org/apache/cassandra/service/accord/txn/TxnNamedRead.java
@@ -29,9 +29,11 @@ import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import accord.api.Data;
+import accord.api.RoutingKey;
import accord.primitives.Range;
import accord.primitives.Seekable;
import accord.primitives.Timestamp;
+import accord.utils.Invariants;
import accord.utils.async.AsyncChain;
import accord.utils.async.AsyncChains;
import accord.utils.async.AsyncResults;
@@ -66,6 +68,7 @@ import org.apache.cassandra.service.accord.api.PartitionKey;
import org.apache.cassandra.service.accord.serializers.KeySerializers;
import org.apache.cassandra.service.accord.txn.TxnData.TxnDataNameKind;
import org.apache.cassandra.utils.ByteBufferUtil;
+import org.apache.cassandra.utils.Comparables;
import org.apache.cassandra.utils.MonotonicClock;
import org.apache.cassandra.utils.ObjectSizes;
@@ -219,6 +222,31 @@ public class TxnNamedRead extends
AbstractSerialized<ReadCommand>
}
}
+ public TxnNamedRead slice(Range range)
+ {
+ Invariants.require(key.domain().isRange());
+ if (key.equals(range))
+ return this;
+
+ Invariants.require(((Range)key).contains(range));
+ return new TxnNamedRead(txnDataName(), range, bytes());
+ }
+
+ public TxnNamedRead merge(TxnNamedRead with)
+ {
+ Invariants.require(key.domain().isRange());
+ if (key.equals(with.key))
+ return this;
+
+ Range thisRange = key.asRange();
+ Range thatRange = with.key.asRange();
+ Invariants.require(thisRange.compareTouching(thatRange) == 0);
+ RoutingKey start = Comparables.min(thisRange.start(),
thatRange.start());
+ RoutingKey end = Comparables.max(thisRange.end(), thatRange.end());
+ Range range = thisRange.newRange(start, end);
+ return new TxnNamedRead(txnDataName(), range, bytes());
+ }
+
public static boolean readsWithoutReconciliation(ConsistencyLevel
consistencyLevel)
{
boolean withoutReconciliation = consistencyLevel == null ||
consistencyLevel == ConsistencyLevel.ONE;
diff --git a/src/java/org/apache/cassandra/service/accord/txn/TxnRead.java
b/src/java/org/apache/cassandra/service/accord/txn/TxnRead.java
index d52cea531c..aa58aa7875 100644
--- a/src/java/org/apache/cassandra/service/accord/txn/TxnRead.java
+++ b/src/java/org/apache/cassandra/service/accord/txn/TxnRead.java
@@ -20,7 +20,6 @@ package org.apache.cassandra.service.accord.txn;
import java.io.IOException;
import java.util.ArrayList;
-import java.util.Collections;
import java.util.Comparator;
import java.util.List;
import javax.annotation.Nonnull;
@@ -41,6 +40,8 @@ import accord.primitives.Routable.Domain;
import accord.primitives.Seekable;
import accord.primitives.Seekables;
import accord.primitives.Timestamp;
+import accord.utils.Invariants;
+import accord.utils.UnhandledEnum;
import accord.utils.async.AsyncChain;
import accord.utils.async.AsyncChains;
import org.apache.cassandra.db.ConsistencyLevel;
@@ -55,6 +56,7 @@ import org.apache.cassandra.service.accord.api.PartitionKey;
import org.apache.cassandra.tcm.ClusterMetadata;
import org.apache.cassandra.utils.ObjectSizes;
+import static accord.primitives.Routables.Slice.Minimal;
import static com.google.common.base.Preconditions.checkArgument;
import static com.google.common.base.Preconditions.checkState;
import static
org.apache.cassandra.service.accord.AccordSerializers.consistencyLevelSerializer;
@@ -112,6 +114,10 @@ public class TxnRead extends
AbstractKeySorted<TxnNamedRead> implements Read
checkArgument(cassandraConsistencyLevel == null ||
SUPPORTED_READ_CONSISTENCY_LEVELS.contains(cassandraConsistencyLevel),
"Unsupported consistency level for read: %s", cassandraConsistencyLevel);
this.cassandraConsistencyLevel = cassandraConsistencyLevel;
this.domain = items[0].key().domain();
+ // TODO (expected): relax this condition, require only that it holds
for each equal byte[]
+ // right now this means we don't permit two different range queries
in the same transaction touching adjacent ranges
+ // this is a pretty weak restriction and doesn't interfere with
current CQL capabilities, but should be addressed eventually
+ Invariants.require(domain == Domain.Key ||
((Ranges)keys()).mergeTouching() == keys());
}
private TxnRead(@Nonnull List<TxnNamedRead> items, @Nullable
ConsistencyLevel cassandraConsistencyLevel)
@@ -120,6 +126,7 @@ public class TxnRead extends
AbstractKeySorted<TxnNamedRead> implements Read
checkArgument(cassandraConsistencyLevel == null ||
SUPPORTED_READ_CONSISTENCY_LEVELS.contains(cassandraConsistencyLevel),
"Unsupported consistency level for read: %s", cassandraConsistencyLevel);
this.cassandraConsistencyLevel = cassandraConsistencyLevel;
this.domain = items.get(0).key().domain();
+ Invariants.require(domain == Domain.Key ||
((Ranges)keys()).mergeTouching() == keys());
}
private static void sortReads(List<TxnNamedRead> reads)
@@ -213,52 +220,160 @@ public class TxnRead extends
AbstractKeySorted<TxnNamedRead> implements Read
@Override
public Read slice(Ranges ranges)
{
- return intersecting(itemKeys.slice(ranges));
+ return select(itemKeys.slice(ranges, Minimal));
}
@Override
public Read intersecting(Participants<?> participants)
{
- return intersecting(itemKeys.intersecting(participants));
+ return select(itemKeys.intersecting(participants, Minimal));
}
- private Read intersecting(Seekables<?, ?> select)
+ private Read select(Seekables<?, ?> select)
{
- // TODO (review): Why construct this keys at all and not just check
against select?
- Seekables<?, ?> keys = (Seekables<?, ?>)itemKeys.intersecting(select);
- List<TxnNamedRead> reads = new ArrayList<>(keys.size());
+ if (select == keys())
+ return this;
- switch (keys.domain())
+ List<TxnNamedRead> reads = new ArrayList<>(select.size());
+ switch (select.domain())
{
case Key:
- for (TxnNamedRead read : items)
- if (keys.contains((Key)read.key()))
+ {
+ Keys keys = (Keys) select;
+ int i = 0, j = 0;
+ while (i < select.size() && j < items.length)
+ {
+ Key key = keys.get(i);
+ TxnNamedRead read = items[j];
+ int c = key.compareTo((Key)read.key());
+ if (c < 0) ++i;
+ else if (c > 0) ++j;
+ else
+ {
reads.add(read);
+ ++j;
+ }
+ }
break;
+ }
case Range:
- for (TxnNamedRead read : items)
- if (keys.intersects((Range)read.key()))
- reads.add(read);
+ {
+ Ranges ranges = (Ranges) select;
+ int i = 0, j = 0;
+ while (i < select.size() && j < items.length)
+ {
+ Range range = ranges.get(i);
+ TxnNamedRead read = items[j];
+ int c = range.compareIntersecting((Range) read.key());
+ if (c < 0) ++i;
+ else if (c > 0) ++j;
+ else
+ {
+ reads.add(read.slice(range));
+ ++j;
+ }
+ }
break;
+ }
default:
- throw new IllegalStateException("Unhandled domain " +
keys.domain());
+ throw new UnhandledEnum(select.domain());
}
- return createTxnRead(reads, cassandraConsistencyLevel, keys.domain());
+ return createTxnRead(reads, cassandraConsistencyLevel,
select.domain());
}
+ /**
+ * Merge-joins the sorted items of this read with those of {@code read}. Key reads that compare equal
+ * are emitted once; range reads that compare equal are coalesced via {@link TxnNamedRead#merge}.
+ */
@Override
public Read merge(Read read)
{
- TxnRead txnRead = (TxnRead)read;
+ TxnRead that = (TxnRead)read;
List<TxnNamedRead> reads = new ArrayList<>(items.length);
- Collections.addAll(reads, items);
-
- for (TxnNamedRead namedRead : txnRead)
- if (!reads.contains(namedRead))
- reads.add(namedRead);
- return createTxnRead(reads, cassandraConsistencyLevel, txnRead.domain);
+ switch (domain)
+ {
+ default: throw new UnhandledEnum(domain);
+ case Key:
+ {
+ int i = 0, j = 0;
+ while (i < items.length && j < that.items.length)
+ {
+ TxnNamedRead r1 = this.items[i], r2 = that.items[j];
+ int c = compareKey(r1, r2);
+ if (c <= 0)
+ {
+ reads.add(r1);
+ ++i;
+ if (c == 0)
+ ++j;
+ }
+ else
+ {
+ reads.add(r2);
+ ++j;
+ }
+ }
+ // one side is exhausted: carry over whatever remains of the other
+ while (i < items.length) reads.add(items[i++]);
+ while (j < that.items.length) reads.add(that.items[j++]);
+ break;
+ }
+ case Range:
+ {
+ int i = 0, j = 0;
+ TxnNamedRead pending = null;
+ // iterate until BOTH inputs are drained, so trailing reads still pass through the pending step
+ while (i < items.length || j < that.items.length)
+ {
+ TxnNamedRead add;
+ if (j == that.items.length) add = this.items[i++];
+ else if (i == items.length) add = that.items[j++];
+ else
+ {
+ TxnNamedRead r1 = this.items[i], r2 = that.items[j];
+ int c = compareRange(r1, r2);
+ if (c == 0)
+ {
+ add = r1.merge(r2);
+ ++i;
+ ++j;
+ }
+ else if (c < 0)
+ {
+ add = r1;
+ ++i;
+ }
+ else
+ {
+ add = r2;
+ ++j;
+ }
+ }
+
+ if (pending == null) pending = add;
+ else
+ {
+ int c = compareRange(pending, add);
+ if (c < 0)
+ {
+ reads.add(pending);
+ pending = add;
+ }
+ else
+ {
+ Invariants.require(c == 0);
+ pending = pending.merge(add);
+ }
+ }
+ }
+ if (pending != null)
+ reads.add(pending);
+ break;
+ }
+ }
+ return createTxnRead(reads, cassandraConsistencyLevel, that.domain);
}
public void unmemoize()
diff --git a/src/java/org/apache/cassandra/service/accord/txn/TxnWrite.java
b/src/java/org/apache/cassandra/service/accord/txn/TxnWrite.java
index bc8d5d2bb3..8b1611dd7a 100644
--- a/src/java/org/apache/cassandra/service/accord/txn/TxnWrite.java
+++ b/src/java/org/apache/cassandra/service/accord/txn/TxnWrite.java
@@ -60,7 +60,6 @@ import org.apache.cassandra.io.util.DataOutputPlus;
import org.apache.cassandra.schema.ColumnMetadata;
import org.apache.cassandra.service.accord.AccordObjectSizes;
import org.apache.cassandra.service.accord.api.PartitionKey;
-import org.apache.cassandra.tcm.ClusterMetadata;
import org.apache.cassandra.utils.BooleanSerializer;
import org.apache.cassandra.utils.ByteBufferUtil;
import org.apache.cassandra.utils.ObjectSizes;
@@ -381,8 +380,6 @@ public class TxnWrite extends
AbstractKeySorted<TxnWrite.Update> implements Writ
@Override
public AsyncChain<Void> apply(Seekable key, SafeCommandStore safeStore,
TxnId txnId, Timestamp executeAt, DataStore store, PartialTxn txn)
{
- ClusterMetadata cm = ClusterMetadata.current();
- checkState(cm.epoch.getEpoch() >= executeAt.epoch(), "TCM epoch %d is
< executeAt epoch %d", cm.epoch.getEpoch(), executeAt.epoch());
// UnrecoverableRepairUpdate will deserialize as null at other nodes
// Accord should skip the Update for a read transaction, but handle it
here anyways
TxnUpdate txnUpdate = ((TxnUpdate)txn.update());
diff --git
a/test/distributed/org/apache/cassandra/distributed/test/accord/AccordMigrationWriteRaceTestBase.java
b/test/distributed/org/apache/cassandra/distributed/test/accord/AccordMigrationWriteRaceTestBase.java
index 593b1296ce..6984fa92f7 100644
---
a/test/distributed/org/apache/cassandra/distributed/test/accord/AccordMigrationWriteRaceTestBase.java
+++
b/test/distributed/org/apache/cassandra/distributed/test/accord/AccordMigrationWriteRaceTestBase.java
@@ -45,7 +45,7 @@ import org.slf4j.LoggerFactory;
import accord.api.RoutingKey;
import accord.coordinate.Outcome;
import accord.messages.PreAccept;
-import accord.primitives.PartialKeyRoute;
+import accord.primitives.KeyRoute;
import accord.primitives.Ranges;
import accord.primitives.Routable.Domain;
import accord.primitives.Route;
@@ -476,7 +476,7 @@ public abstract class AccordMigrationWriteRaceTestBase
extends AccordTestBase
PreAccept preAccept =
(PreAccept)Instance.deserializeMessage(message).payload;
Route<?> route = preAccept.scope;
if (route.domain() == Domain.Key)
- for (RoutingKey key :
(PartialKeyRoute)route)
+ for (RoutingKey key : (KeyRoute)route)
{
AccordRoutingKey routingKey =
(AccordRoutingKey)key;
ColumnFamilyStore cfs =
ColumnFamilyStore.getIfExists(routingKey.table());
diff --git
a/test/distributed/org/apache/cassandra/distributed/test/accord/AccordTestBase.java
b/test/distributed/org/apache/cassandra/distributed/test/accord/AccordTestBase.java
index a63c79fae6..9a3e81d74c 100644
---
a/test/distributed/org/apache/cassandra/distributed/test/accord/AccordTestBase.java
+++
b/test/distributed/org/apache/cassandra/distributed/test/accord/AccordTestBase.java
@@ -48,7 +48,7 @@ import accord.api.RoutingKey;
import accord.coordinate.Invalidated;
import accord.impl.progresslog.DefaultProgressLogs;
import accord.messages.PreAccept;
-import accord.primitives.PartialKeyRoute;
+import accord.primitives.KeyRoute;
import accord.primitives.Routable.Domain;
import accord.primitives.Route;
import accord.primitives.TxnId;
@@ -656,7 +656,7 @@ public abstract class AccordTestBase extends TestBaseImpl
PreAccept preAccept =
(PreAccept)Instance.deserializeMessage(message).payload;
Route<?> route = preAccept.scope;
if (route.domain() == Domain.Key)
- for (RoutingKey key : (PartialKeyRoute)route)
+ for (RoutingKey key : (KeyRoute)route)
{
AccordRoutingKey routingKey =
(AccordRoutingKey)key;
ColumnFamilyStore cfs =
ColumnFamilyStore.getIfExists(routingKey.table());
diff --git
a/test/simulator/main/org/apache/cassandra/simulator/paxos/PaxosSimulation.java
b/test/simulator/main/org/apache/cassandra/simulator/paxos/PaxosSimulation.java
index 96a393eaf3..299e702a49 100644
---
a/test/simulator/main/org/apache/cassandra/simulator/paxos/PaxosSimulation.java
+++
b/test/simulator/main/org/apache/cassandra/simulator/paxos/PaxosSimulation.java
@@ -39,7 +39,6 @@ import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import accord.coordinate.CoordinationFailed;
-import accord.coordinate.Invalidated;
import org.apache.cassandra.concurrent.ExecutorFactory;
import org.apache.cassandra.concurrent.ScheduledExecutorPlus;
import org.apache.cassandra.distributed.Cluster;
diff --git a/test/unit/org/apache/cassandra/Util.java
b/test/unit/org/apache/cassandra/Util.java
index 458f6e52e4..04689037c1 100644
--- a/test/unit/org/apache/cassandra/Util.java
+++ b/test/unit/org/apache/cassandra/Util.java
@@ -58,6 +58,7 @@ import org.junit.Assume;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
+import accord.utils.Invariants;
import org.apache.cassandra.config.DatabaseDescriptor;
import org.apache.cassandra.cql3.ColumnIdentifier;
import org.apache.cassandra.db.AbstractReadCommandBuilder;
@@ -377,7 +378,7 @@ public class Util
catch (Throwable e)
{
// Use name because in-jvm dtests will have different instances of
the class
- assert e.getClass().getName().equals(exception.getName()) :
e.getClass().getName() + " is not " + exception.getName();
+
Invariants.require(e.getClass().getName().equals(exception.getName()),
e.getClass().getName() + " is not " + exception.getName());
thrown = true;
}
diff --git
a/test/unit/org/apache/cassandra/service/accord/AccordCommandTest.java
b/test/unit/org/apache/cassandra/service/accord/AccordCommandTest.java
index 9baf5f28c9..af94147517 100644
--- a/test/unit/org/apache/cassandra/service/accord/AccordCommandTest.java
+++ b/test/unit/org/apache/cassandra/service/accord/AccordCommandTest.java
@@ -144,7 +144,7 @@ public class AccordCommandTest
builder.add(key.toUnseekable(), txnId2);
deps = builder.build();
}
- Accept accept = Accept.SerializerSupport.create(txnId, route, 1, 1,
SLOW, Ballot.ZERO, executeAt, deps);
+ Accept accept = Accept.SerializerSupport.create(txnId, route, 1, 1,
SLOW, Ballot.ZERO, executeAt, deps, false);
getUninterruptibly(commandStore.execute(accept, safeStore -> {
Command before = safeStore.ifInitialised(txnId).current();
diff --git
a/test/unit/org/apache/cassandra/service/accord/AccordJournalOrderTest.java
b/test/unit/org/apache/cassandra/service/accord/AccordJournalOrderTest.java
index 4e376e4939..c4db93f6cc 100644
--- a/test/unit/org/apache/cassandra/service/accord/AccordJournalOrderTest.java
+++ b/test/unit/org/apache/cassandra/service/accord/AccordJournalOrderTest.java
@@ -41,7 +41,7 @@ import org.apache.cassandra.SchemaLoader;
import org.apache.cassandra.ServerTestUtils;
import org.apache.cassandra.config.CassandraRelevantProperties;
import org.apache.cassandra.config.DatabaseDescriptor;
-import org.apache.cassandra.dht.Murmur3Partitioner;
+import org.apache.cassandra.dht.ByteOrderedPartitioner;
import org.apache.cassandra.io.util.File;
import org.apache.cassandra.journal.TestParams;
import org.apache.cassandra.schema.KeyspaceParams;
@@ -77,7 +77,7 @@ public class AccordJournalOrderTest
ServerTestUtils.cleanupDirectory(DatabaseDescriptor.getAccordJournalDirectory());
AccordJournal accordJournal = new AccordJournal(TestParams.INSTANCE,
new AccordAgent());
accordJournal.start(null);
- RandomSource randomSource = RandomSource.wrap(new Random());
+ RandomSource randomSource = RandomSource.wrap(new Random(0));
TxnId id1 = AccordGens.txnIds().next(randomSource);
TxnId id2 = AccordGens.txnIds().next(randomSource);
@@ -87,7 +87,7 @@ public class AccordJournalOrderTest
TxnId txnId = randomSource.nextBoolean() ? id1 : id2;
JournalKey key = new JournalKey(txnId,
JournalKey.Type.COMMAND_DIFF, randomSource.nextInt(5));
res.compute(key, (k, prev) -> prev == null ? 1 : prev + 1);
- Participants<?> participants = RoutingKeys.of(new
AccordRoutingKey.TokenKey(TableId.generate(), new
Murmur3Partitioner.LongToken(1)));
+ Participants<?> participants = RoutingKeys.of(new
AccordRoutingKey.TokenKey(TableId.generate(), new
ByteOrderedPartitioner.BytesToken(new byte[1])));
Command command = Command.NotDefined.notDefined(txnId,
SaveStatus.NotDefined, Status.Durability.NotDurable,
StoreParticipants.create(null, participants, null, participants, participants),
Ballot.ZERO);
accordJournal.saveCommand(key.commandStoreId,
new Journal.CommandUpdate(null, command),
diff --git
a/test/unit/org/apache/cassandra/service/accord/AccordMessageSinkTest.java
b/test/unit/org/apache/cassandra/service/accord/AccordMessageSinkTest.java
index 4bfbeb8a6f..62196013a9 100644
--- a/test/unit/org/apache/cassandra/service/accord/AccordMessageSinkTest.java
+++ b/test/unit/org/apache/cassandra/service/accord/AccordMessageSinkTest.java
@@ -28,12 +28,12 @@ import accord.messages.ReadData;
import accord.messages.ReadData.CommitOrReadNack;
import accord.topology.TopologyUtils;
import
org.apache.cassandra.service.accord.AccordFetchCoordinator.AccordFetchRequest;
+import org.apache.cassandra.service.accord.api.AccordAgent;
import org.apache.cassandra.service.accord.api.AccordTimeService;
import org.mockito.ArgumentCaptor;
import org.mockito.Mockito;
import accord.Utils;
-import accord.api.Agent;
import accord.impl.AbstractFetchCoordinator;
import accord.impl.IntKey;
import accord.local.Node;
@@ -72,7 +72,7 @@ public class AccordMessageSinkTest
DatabaseDescriptor.clientInitialization();
DatabaseDescriptor.setPartitionerUnsafe(Murmur3Partitioner.instance);
ClusterMetadataService.initializeForClients();
- sink = new AccordMessageSink(Mockito.mock(Agent.class), messaging,
mapping, new RequestCallbacks(new AccordTimeService()));
+ sink = new AccordMessageSink(Mockito.mock(AccordAgent.class),
messaging, mapping, new RequestCallbacks(new AccordTimeService()));
}
@Test
diff --git
a/test/unit/org/apache/cassandra/service/accord/SimulatedAccordCommandStore.java
b/test/unit/org/apache/cassandra/service/accord/SimulatedAccordCommandStore.java
index e0c1d126bf..168fb74b68 100644
---
a/test/unit/org/apache/cassandra/service/accord/SimulatedAccordCommandStore.java
+++
b/test/unit/org/apache/cassandra/service/accord/SimulatedAccordCommandStore.java
@@ -304,7 +304,7 @@ public class SimulatedAccordCommandStore implements
AutoCloseable
}
else if (RoutableKey.class.isAssignableFrom(keyType))
{
- RoutableKey key = (RoutableKey) state.key();
+ RoutingKey key = (RoutingKey) state.key();
if ((keys.contains(key) || ranges.intersects(key))
&& shouldEvict.getAsBoolean())
cache.tryEvict(state);
diff --git
a/test/unit/org/apache/cassandra/service/accord/serializers/CommandsForKeySerializerTest.java
b/test/unit/org/apache/cassandra/service/accord/serializers/CommandsForKeySerializerTest.java
index 65b01668e1..04d2bd6390 100644
---
a/test/unit/org/apache/cassandra/service/accord/serializers/CommandsForKeySerializerTest.java
+++
b/test/unit/org/apache/cassandra/service/accord/serializers/CommandsForKeySerializerTest.java
@@ -654,7 +654,7 @@ public class CommandsForKeySerializerTest
@Override public <T> AsyncChain<T> build(PreLoadContext context,
Function<? super SafeCommandStore, T> apply) { throw new
UnsupportedOperationException(); }
@Override public void shutdown() { }
@Override protected void registerTransitive(SafeCommandStore
safeStore, RangeDeps deps){ }
- @Override public <T> AsyncChain<T> submit(Callable<T> task) { throw
new UnsupportedOperationException(); }
+ @Override public <T> AsyncChain<T> build(Callable<T> task) { throw new
UnsupportedOperationException(); }
@Override public void onRecover(Node node, Result success, Throwable
fail) { throw new UnsupportedOperationException(); }
@Override public void onInconsistentTimestamp(Command command,
Timestamp prev, Timestamp next) { throw new UnsupportedOperationException(); }
@Override public void onFailedBootstrap(String phase, Ranges ranges,
Runnable retry, Throwable failure) { throw new UnsupportedOperationException();
}
@@ -670,6 +670,7 @@ public class CommandsForKeySerializerTest
@Override public long attemptCoordinationDelay(Node node,
SafeCommandStore safeStore, TxnId txnId, TimeUnit units, int retryCount) {
return 0; }
@Override public long seekProgressDelay(Node node, SafeCommandStore
safeStore, TxnId txnId, int retryCount, ProgressLog.BlockedUntil blockedUntil,
TimeUnit units) { return 0; }
@Override public long retryAwaitTimeout(Node node, SafeCommandStore
safeStore, TxnId txnId, int retryCount, ProgressLog.BlockedUntil retrying,
TimeUnit units) { return 0; }
+ @Override public long localSlowAt(TxnId txnId, Status.Phase phase,
TimeUnit unit) { return 0; }
@Override public long localExpiresAt(TxnId txnId, Status.Phase phase,
TimeUnit unit) { return 0; }
@Override public long expiresAt(ReplyContext replyContext, TimeUnit
unit) { return 0; }
}
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]