This is an automated email from the ASF dual-hosted git repository.
dcapwell pushed a commit to branch cep-15-accord
in repository https://gitbox.apache.org/repos/asf/cassandra.git
The following commit(s) were added to refs/heads/cep-15-accord by this push:
new 72567e668c CEP-15: (Accord) Bootstraps LocalOnly txn can not be recreated from SerializerSupport
72567e668c is described below
commit 72567e668c8ba63c8b8316583542c0c7b4918cc6
Author: David Capwell <[email protected]>
AuthorDate: Fri May 31 10:37:27 2024 -0700
CEP-15: (Accord) Bootstraps LocalOnly txn can not be recreated from SerializerSupport
patch by David Capwell; reviewed by Benedict Elliott Smith for CASSANDRA-19674
---
modules/accord | 2 +-
.../apache/cassandra/schema/SchemaProvider.java | 5 ++
.../cassandra/service/accord/AccordJournal.java | 28 ++++----
.../cassandra/service/accord/AccordKeyspace.java | 2 +-
.../service/accord/AccordSafeCommandStore.java | 6 ++
.../accord/AccordSafeCommandsForRanges.java | 57 ++-------------
.../cassandra/service/accord/AccordService.java | 5 +-
.../service/accord/ImmutableAccordSafeState.java | 84 ++++++++++++++++++++++
.../cassandra/service/accord/api/PartitionKey.java | 5 +-
.../service/accord/async/AsyncLoader.java | 7 +-
.../accord/SimulatedAccordCommandStore.java | 24 ++++++-
.../SimulatedAccordCommandStoreTestBase.java | 71 ++++++++++++++++++
.../accord/async/SimulatedAsyncOperationTest.java | 64 ++++++++++++++---
13 files changed, 278 insertions(+), 82 deletions(-)
diff --git a/modules/accord b/modules/accord
index 84e89bd91c..cf10169067 160000
--- a/modules/accord
+++ b/modules/accord
@@ -1 +1 @@
-Subproject commit 84e89bd91cf1b058fbf314b750336a1ec1096b18
+Subproject commit cf10169067a8cd40fb876789a62439cc03fd2e9b
diff --git a/src/java/org/apache/cassandra/schema/SchemaProvider.java
b/src/java/org/apache/cassandra/schema/SchemaProvider.java
index 45b70ec2e7..a21e9adea3 100644
--- a/src/java/org/apache/cassandra/schema/SchemaProvider.java
+++ b/src/java/org/apache/cassandra/schema/SchemaProvider.java
@@ -141,6 +141,11 @@ public interface SchemaProvider
return metadata == null ? null : metadata.partitioner;
}
+ default IPartitioner getExistingTablePartitioner(TableId id) throws
UnknownTableException
+ {
+ return getExistingTableMetadata(id).partitioner;
+ }
+
@Nullable
default TableMetadataRef getTableMetadataRef(TableId id)
{
diff --git a/src/java/org/apache/cassandra/service/accord/AccordJournal.java
b/src/java/org/apache/cassandra/service/accord/AccordJournal.java
index 0c31afbb4c..80cfdf31ea 100644
--- a/src/java/org/apache/cassandra/service/accord/AccordJournal.java
+++ b/src/java/org/apache/cassandra/service/accord/AccordJournal.java
@@ -40,6 +40,12 @@ import com.google.common.collect.ImmutableMap;
import com.google.common.collect.ListMultimap;
import com.google.common.collect.Multimap;
import com.google.common.primitives.Ints;
+
+import accord.messages.ApplyThenWaitUntilApplied;
+import org.agrona.collections.Long2ObjectHashMap;
+import org.agrona.collections.LongArrayList;
+import org.agrona.collections.ObjectHashSet;
+import org.cliffc.high_scale_lib.NonBlockingHashMap;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -49,7 +55,6 @@ import accord.local.SerializerSupport;
import accord.messages.AbstractEpochRequest;
import accord.messages.Accept;
import accord.messages.Apply;
-import accord.messages.ApplyThenWaitUntilApplied;
import accord.messages.BeginRecovery;
import accord.messages.Commit;
import accord.messages.LocalRequest;
@@ -65,9 +70,6 @@ import accord.primitives.Timestamp;
import accord.primitives.TxnId;
import accord.utils.Invariants;
import accord.utils.MapReduceConsume;
-import org.agrona.collections.Long2ObjectHashMap;
-import org.agrona.collections.LongArrayList;
-import org.agrona.collections.ObjectHashSet;
import org.apache.cassandra.concurrent.Interruptible;
import org.apache.cassandra.concurrent.ManyToOneConcurrentLinkedQueue;
import org.apache.cassandra.concurrent.SequentialExecutorPlus;
@@ -104,7 +106,6 @@ import org.apache.cassandra.utils.ByteArrayUtil;
import org.apache.cassandra.utils.ExecutorUtils;
import org.apache.cassandra.utils.concurrent.Semaphore;
import org.apache.cassandra.utils.vint.VIntCoding;
-import org.cliffc.high_scale_lib.NonBlockingHashMap;
import org.jctools.queues.SpscLinkedQueue;
import static accord.messages.MessageType.ACCEPT_INVALIDATE_REQ;
@@ -256,11 +257,11 @@ public class AccordJournal implements IJournal,
Shutdownable
/**
* Accord protocol messages originating from local node, e.g. Propagate.
*/
- public void appendLocalRequest(LocalRequest<?> request)
+ public <R> void appendLocalRequest(LocalRequest<R> request, BiConsumer<?
super R, Throwable> callback)
{
Type type = Type.fromMessageType(request.type());
Key key = new Key(type.txnId(request), type);
- journal.asyncWrite(key, request, SENTINEL_HOSTS, null);
+ journal.asyncWrite(key, request, SENTINEL_HOSTS, callback);
}
@VisibleForTesting
@@ -313,7 +314,7 @@ public class AccordJournal implements IJournal, Shutdownable
if (key.type.isRemoteRequest())
frameAggregator.onWrite(RemoteRequestContext.create(((Request)
value).waitForEpoch(), (ResponseContext) writeContext, pointer));
else if (key.type.isLocalRequest())
-
frameAggregator.onWrite(LocalRequestContext.create((LocalRequest<?>) value,
pointer));
+
frameAggregator.onWrite(LocalRequestContext.create((LocalRequest<?>) value,
(BiConsumer<?, Throwable>) writeContext, pointer));
else
frameApplicator.onWrite(pointer, size, (FrameContext)
writeContext);
}
@@ -404,9 +405,9 @@ public class AccordJournal implements IJournal, Shutdownable
this.callback = callback;
}
- static LocalRequestContext create(LocalRequest<?> request,
RecordPointer pointer)
+ static LocalRequestContext create(LocalRequest<?> request,
BiConsumer<?, Throwable> callback, RecordPointer pointer)
{
- return new LocalRequestContext(request.waitForEpoch(),
request.callback(), pointer);
+ return new LocalRequestContext(request.waitForEpoch(), callback,
pointer);
}
}
@@ -1182,13 +1183,14 @@ public class AccordJournal implements IJournal,
Shutdownable
private void applyRequest(RecordPointer pointer, RequestContext
context, long preAcceptTimeout)
{
- Request request = (Request) cachedRecords.remove(pointer);
- Type type = Type.fromMessageType(request.type());
+ Message message = (Message) cachedRecords.remove(pointer);
+ Type type = Type.fromMessageType(message.type());
if (type == Type.PRE_ACCEPT || type == Type.BEGIN_RECOVER)
context.preAcceptTimeout(preAcceptTimeout);
if (type.isRemoteRequest())
{
+ Request request = (Request) message;
RemoteRequestContext ctx = (RemoteRequestContext) context;
Id from = endpointMapper.mappedId(ctx.from());
request.process(node, from, ctx);
@@ -1199,7 +1201,7 @@ public class AccordJournal implements IJournal,
Shutdownable
LocalRequestContext ctx = (LocalRequestContext) context;
// TODO (expected): Make Propagate PreAccept receive preAcceptTimeout and timestamps
//noinspection unchecked,rawtypes
- ((LocalRequest) request).process(node, ctx.callback);
+ ((LocalRequest) message).process(node, ctx.callback);
}
}
diff --git a/src/java/org/apache/cassandra/service/accord/AccordKeyspace.java
b/src/java/org/apache/cassandra/service/accord/AccordKeyspace.java
index ab6701d6d6..2dd2b5d28d 100644
--- a/src/java/org/apache/cassandra/service/accord/AccordKeyspace.java
+++ b/src/java/org/apache/cassandra/service/accord/AccordKeyspace.java
@@ -1238,7 +1238,7 @@ public class AccordKeyspace
WaitingOnProvider waitingOn = deserializeWaitingOn(txnId, row);
MessageProvider messages = commandStore.makeMessageProvider(txnId);
- return
SerializerSupport.reconstruct(commandStore.unsafeRangesForEpoch(), attrs,
status, executeAt, executeAtLeast, promised, accepted, waitingOn, messages);
+ return SerializerSupport.reconstruct(commandStore.agent(),
commandStore.unsafeRangesForEpoch(), attrs, status, executeAt, executeAtLeast,
promised, accepted, waitingOn, messages);
}
catch (Throwable t)
{
diff --git
a/src/java/org/apache/cassandra/service/accord/AccordSafeCommandStore.java
b/src/java/org/apache/cassandra/service/accord/AccordSafeCommandStore.java
index 32a6973b07..ca0413b0d3 100644
--- a/src/java/org/apache/cassandra/service/accord/AccordSafeCommandStore.java
+++ b/src/java/org/apache/cassandra/service/accord/AccordSafeCommandStore.java
@@ -344,4 +344,10 @@ public class AccordSafeCommandStore extends
AbstractSafeCommandStore<AccordSafeC
if (commandsForRanges != null)
commandsForRanges.postExecute();
}
+
+ @Override
+ public String toString()
+ {
+ return "AccordSafeCommandStore(id=" + commandStore().id() + ")";
+ }
}
diff --git
a/src/java/org/apache/cassandra/service/accord/AccordSafeCommandsForRanges.java
b/src/java/org/apache/cassandra/service/accord/AccordSafeCommandsForRanges.java
index 42fb0f6ef1..848df1d270 100644
---
a/src/java/org/apache/cassandra/service/accord/AccordSafeCommandsForRanges.java
+++
b/src/java/org/apache/cassandra/service/accord/AccordSafeCommandsForRanges.java
@@ -21,61 +21,25 @@ package org.apache.cassandra.service.accord;
import java.util.NavigableMap;
import java.util.Objects;
-import accord.primitives.Range;
import accord.primitives.Ranges;
import accord.primitives.TxnId;
import accord.utils.async.AsyncChains;
import accord.utils.async.AsyncResult;
import org.apache.cassandra.utils.Pair;
-public class AccordSafeCommandsForRanges implements AccordSafeState<Range,
CommandsForRanges>
+public class AccordSafeCommandsForRanges extends
ImmutableAccordSafeState<Ranges, CommandsForRanges>
{
private final AsyncResult<Pair<CommandsForRangesLoader.Watcher,
NavigableMap<TxnId, CommandsForRangesLoader.Summary>>> chain;
- private final Ranges ranges;
- private boolean invalidated;
- private CommandsForRanges original, current;
public AccordSafeCommandsForRanges(Ranges ranges,
AsyncResult<Pair<CommandsForRangesLoader.Watcher, NavigableMap<TxnId,
CommandsForRangesLoader.Summary>>> chain)
{
- this.ranges = ranges;
+ super(ranges);
this.chain = chain;
}
public Ranges ranges()
{
- return ranges;
- }
-
- @Override
- public CommandsForRanges current()
- {
- checkNotInvalidated();
- return current;
- }
-
- @Override
- public void invalidate()
- {
- invalidated = true;
- }
-
- @Override
- public boolean invalidated()
- {
- return invalidated;
- }
-
- @Override
- public void set(CommandsForRanges update)
- {
- throw new UnsupportedOperationException();
- }
-
- @Override
- public CommandsForRanges original()
- {
- checkNotInvalidated();
- return original;
+ return key();
}
@Override
@@ -85,17 +49,11 @@ public class AccordSafeCommandsForRanges implements
AccordSafeState<Range, Comma
Pair<CommandsForRangesLoader.Watcher, NavigableMap<TxnId,
CommandsForRangesLoader.Summary>> pair = AsyncChains.getUnchecked(chain);
pair.left.close();
pair.left.get().entrySet().forEach(e -> pair.right.put(e.getKey(),
e.getValue()));
- current = original = new CommandsForRanges(ranges, pair.right);
- }
-
- @Override
- public void postExecute()
- {
- checkNotInvalidated();
+ original = new CommandsForRanges(key, pair.right);
}
@Override
- public AccordCachingState<Range, CommandsForRanges> global()
+ public AccordCachingState<Ranges, CommandsForRanges> global()
{
throw new UnsupportedOperationException();
}
@@ -106,13 +64,13 @@ public class AccordSafeCommandsForRanges implements
AccordSafeState<Range, Comma
if (this == o) return true;
if (o == null || getClass() != o.getClass()) return false;
AccordSafeCommandsForRanges that = (AccordSafeCommandsForRanges) o;
- return Objects.equals(original, that.original) &&
Objects.equals(current, that.current);
+ return Objects.equals(original, that.original);
}
@Override
public int hashCode()
{
- return Objects.hash(original, current);
+ return Objects.hash(original);
}
@Override
@@ -122,7 +80,6 @@ public class AccordSafeCommandsForRanges implements
AccordSafeState<Range, Comma
"chain=" + chain +
", invalidated=" + invalidated +
", original=" + original +
- ", current=" + current +
'}';
}
}
diff --git a/src/java/org/apache/cassandra/service/accord/AccordService.java
b/src/java/org/apache/cassandra/service/accord/AccordService.java
index 8e2778f394..8764addbf5 100644
--- a/src/java/org/apache/cassandra/service/accord/AccordService.java
+++ b/src/java/org/apache/cassandra/service/accord/AccordService.java
@@ -25,6 +25,7 @@ import java.util.concurrent.ExecutionException;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;
import java.util.concurrent.atomic.AtomicReference;
+import java.util.function.BiConsumer;
import java.util.function.BiFunction;
import java.util.function.LongSupplier;
import java.util.stream.Collectors;
@@ -562,11 +563,11 @@ public class AccordService implements IAccordService,
Shutdownable
}
}
- private void handleLocalRequest(LocalRequest<?> request, Node node)
+ private <R> void handleLocalRequest(LocalRequest<R> request, BiConsumer<?
super R, Throwable> callback, Node node)
{
// currently, we only create LocalRequests that have side effects and need to be persisted
Invariants.checkState(request.type().hasSideEffects());
- journal.appendLocalRequest(request);
+ journal.appendLocalRequest(request, callback);
}
private static RequestTimeoutException newTimeout(TxnId txnId, Txn txn,
ConsistencyLevel consistencyLevel)
diff --git
a/src/java/org/apache/cassandra/service/accord/ImmutableAccordSafeState.java
b/src/java/org/apache/cassandra/service/accord/ImmutableAccordSafeState.java
new file mode 100644
index 0000000000..850f6f7e8d
--- /dev/null
+++ b/src/java/org/apache/cassandra/service/accord/ImmutableAccordSafeState.java
@@ -0,0 +1,84 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.cassandra.service.accord;
+
+import javax.annotation.Nullable;
+
+public abstract class ImmutableAccordSafeState<K, V> implements
AccordSafeState<K, V>
+{
+ protected final K key;
+ @Nullable
+ protected V original;
+ protected boolean invalidated;
+
+ protected ImmutableAccordSafeState(K key)
+ {
+ this.key = key;
+ }
+
+ @Override
+ public K key()
+ {
+ return key;
+ }
+
+ @Override
+ public V original()
+ {
+ checkNotInvalidated();
+ return original;
+ }
+
+ @Override
+ public V current()
+ {
+ checkNotInvalidated();
+ return original;
+ }
+
+ @Override
+ public void invalidate()
+ {
+ invalidated = true;
+ }
+
+ @Override
+ public boolean invalidated()
+ {
+ return invalidated;
+ }
+
+ @Override
+ public void set(V update)
+ {
+ throw new UnsupportedOperationException();
+ }
+
+ @Override
+ public void revert()
+ {
+ checkNotInvalidated();
+ }
+
+ @Override
+ public void postExecute()
+ {
+ checkNotInvalidated();
+ }
+}
diff --git a/src/java/org/apache/cassandra/service/accord/api/PartitionKey.java
b/src/java/org/apache/cassandra/service/accord/api/PartitionKey.java
index a42fcf57bf..71feb7d88e 100644
--- a/src/java/org/apache/cassandra/service/accord/api/PartitionKey.java
+++ b/src/java/org/apache/cassandra/service/accord/api/PartitionKey.java
@@ -33,6 +33,7 @@ import org.apache.cassandra.db.marshal.ByteBufferAccessor;
import org.apache.cassandra.db.marshal.ValueAccessor;
import org.apache.cassandra.db.partitions.Partition;
import org.apache.cassandra.db.partitions.PartitionUpdate;
+import org.apache.cassandra.dht.IPartitioner;
import org.apache.cassandra.dht.Token;
import org.apache.cassandra.io.IVersionedSerializer;
import org.apache.cassandra.io.util.DataInputPlus;
@@ -147,8 +148,8 @@ public final class PartitionKey extends AccordRoutableKey
implements Key
public PartitionKey deserialize(DataInputPlus in, int version) throws
IOException
{
TableId tableId = TableId.deserialize(in);
- TableMetadata metadata =
Schema.instance.getExistingTableMetadata(tableId);
- DecoratedKey key =
metadata.partitioner.decorateKey(ByteBufferUtil.readWithShortLength(in));
+ IPartitioner partitioner =
Schema.instance.getExistingTablePartitioner(tableId);
+ DecoratedKey key =
partitioner.decorateKey(ByteBufferUtil.readWithShortLength(in));
return new PartitionKey(tableId, key);
}
diff --git
a/src/java/org/apache/cassandra/service/accord/async/AsyncLoader.java
b/src/java/org/apache/cassandra/service/accord/async/AsyncLoader.java
index 9a26bd4be1..740cb3b5d1 100644
--- a/src/java/org/apache/cassandra/service/accord/async/AsyncLoader.java
+++ b/src/java/org/apache/cassandra/service/accord/async/AsyncLoader.java
@@ -22,7 +22,6 @@ import accord.local.CommandsForKey;
import accord.local.KeyHistory;
import accord.local.PreLoadContext;
import accord.primitives.*;
-import accord.utils.Invariants;
import accord.utils.async.AsyncChain;
import accord.utils.async.AsyncChains;
import accord.utils.async.AsyncResult;
@@ -196,7 +195,11 @@ public class AsyncLoader
private AsyncChain<Set<? extends Key>> findOverlappingKeys(Ranges ranges)
{
- Invariants.checkArgument(!ranges.isEmpty());
+ if (ranges.isEmpty())
+ {
+ // During topology changes some shards may be included with empty ranges
+ return AsyncChains.success(Collections.emptySet());
+ }
List<AsyncChain<Set<PartitionKey>>> chains = new
ArrayList<>(ranges.size());
for (Range range : ranges)
diff --git
a/test/unit/org/apache/cassandra/service/accord/SimulatedAccordCommandStore.java
b/test/unit/org/apache/cassandra/service/accord/SimulatedAccordCommandStore.java
index a0bb647c41..f7206d8331 100644
---
a/test/unit/org/apache/cassandra/service/accord/SimulatedAccordCommandStore.java
+++
b/test/unit/org/apache/cassandra/service/accord/SimulatedAccordCommandStore.java
@@ -47,6 +47,7 @@ import accord.primitives.Keys;
import accord.primitives.Ranges;
import accord.primitives.Routable;
import accord.primitives.RoutableKey;
+import accord.primitives.Seekables;
import accord.primitives.Timestamp;
import accord.primitives.Txn;
import accord.primitives.TxnId;
@@ -88,6 +89,7 @@ public class SimulatedAccordCommandStore implements
AutoCloseable
public final AccordCommandStore store;
public final Node.Id nodeId;
public final Topology topology;
+ public final Topologies topologies;
public final MockJournal journal;
public final ScheduledExecutorPlus unorderedScheduled;
public final List<String> evictions = new ArrayList<>();
@@ -189,6 +191,7 @@ public class SimulatedAccordCommandStore implements
AutoCloseable
});
this.topology =
AccordTopology.createAccordTopology(ClusterMetadata.current());
+ this.topologies = new
Topologies.Single(SizeOfIntersectionSorter.SUPPLIER, topology);
var rangesForEpoch = new
CommandStores.RangesForEpoch(topology.epoch(), topology.ranges(), store);
updateHolder.add(topology.epoch(), rangesForEpoch, topology.ranges());
updateHolder.updateGlobal(topology.ranges());
@@ -209,6 +212,21 @@ public class SimulatedAccordCommandStore implements
AutoCloseable
return new TxnId(timeService.epoch(), timeService.now(), kind, domain,
nodeId);
}
+ public void maybeCacheEvict(Seekables<?, ?> keysOrRanges)
+ {
+ switch (keysOrRanges.domain())
+ {
+ case Key:
+ maybeCacheEvict((Keys) keysOrRanges, Ranges.EMPTY);
+ break;
+ case Range:
+ maybeCacheEvict(Keys.EMPTY, (Ranges) keysOrRanges);
+ break;
+ default:
+ throw new UnsupportedOperationException("Unknown domain: " +
keysOrRanges.domain());
+ }
+ }
+
public void maybeCacheEvict(Keys keys, Ranges ranges)
{
AccordStateCache cache = store.cache();
@@ -217,7 +235,7 @@ public class SimulatedAccordCommandStore implements
AutoCloseable
if (TxnId.class.equals(keyType))
{
Command command = (Command) state.state().get();
- if (command.known().definition.isKnown()
+ if (command != null && command.known().definition.isKnown()
&& (command.partialTxn().keys().intersects(keys) ||
ranges.intersects(command.partialTxn().keys()))
&& shouldEvict.getAsBoolean())
cache.maybeEvict(state);
@@ -322,7 +340,7 @@ public class SimulatedAccordCommandStore implements
AutoCloseable
public Pair<TxnId, AsyncResult<PreAccept.PreAcceptOk>>
enqueuePreAccept(Txn txn, FullRoute<?> route)
{
TxnId txnId = nextTxnId(txn.kind(), txn.keys().domain());
- PreAccept preAccept = new PreAccept(nodeId, new
Topologies.Single(SizeOfIntersectionSorter.SUPPLIER, topology), txnId, txn,
route);
+ PreAccept preAccept = new PreAccept(nodeId, topologies, txnId, txn,
route);
return Pair.create(txnId, processAsync(preAccept, safe -> {
var reply = preAccept.apply(safe);
Assertions.assertThat(reply.isOk()).isTrue();
@@ -334,7 +352,7 @@ public class SimulatedAccordCommandStore implements
AutoCloseable
{
TxnId txnId = nextTxnId(txn.kind(), txn.keys().domain());
Ballot ballot = Ballot.fromValues(timeService.epoch(),
timeService.now(), nodeId);
- BeginRecovery br = new BeginRecovery(nodeId, new
Topologies.Single(SizeOfIntersectionSorter.SUPPLIER, topology), txnId, txn,
route, ballot);
+ BeginRecovery br = new BeginRecovery(nodeId, topologies, txnId, txn,
route, ballot);
return Pair.create(txnId, processAsync(br, safe -> {
var reply = br.apply(safe);
diff --git
a/test/unit/org/apache/cassandra/service/accord/SimulatedAccordCommandStoreTestBase.java
b/test/unit/org/apache/cassandra/service/accord/SimulatedAccordCommandStoreTestBase.java
index 5aed34bc79..2c313566df 100644
---
a/test/unit/org/apache/cassandra/service/accord/SimulatedAccordCommandStoreTestBase.java
+++
b/test/unit/org/apache/cassandra/service/accord/SimulatedAccordCommandStoreTestBase.java
@@ -37,14 +37,19 @@ import accord.messages.BeginRecovery;
import accord.messages.PreAccept;
import accord.primitives.Ballot;
import accord.primitives.Deps;
+import accord.primitives.FullRangeRoute;
import accord.primitives.FullRoute;
import accord.primitives.Keys;
import accord.primitives.LatestDeps;
import accord.primitives.Range;
import accord.primitives.Ranges;
+import accord.primitives.Routable;
import accord.primitives.Txn;
import accord.primitives.TxnId;
import accord.topology.Topologies;
+import accord.utils.Gen;
+import accord.utils.Gens;
+import accord.utils.Invariants;
import accord.utils.async.AsyncChains;
import accord.utils.async.AsyncResult;
import org.apache.cassandra.ServerTestUtils;
@@ -57,11 +62,13 @@ import org.apache.cassandra.schema.Schema;
import org.apache.cassandra.schema.TableId;
import org.apache.cassandra.schema.TableMetadata;
import org.apache.cassandra.service.accord.api.AccordRoutingKey;
+import org.apache.cassandra.service.accord.api.PartitionKey;
import org.apache.cassandra.tcm.ClusterMetadata;
import org.apache.cassandra.utils.Pair;
import org.assertj.core.api.Assertions;
import static org.apache.cassandra.schema.SchemaConstants.ACCORD_KEYSPACE_NAME;
+import static org.apache.cassandra.service.accord.AccordTestUtils.createTxn;
public abstract class SimulatedAccordCommandStoreTestBase extends CQLTester
{
@@ -78,6 +85,21 @@ public abstract class SimulatedAccordCommandStoreTestBase
extends CQLTester
protected enum DepsMessage
{PreAccept, BeginRecovery, PreAcceptThenBeginRecovery}
+ protected static final Gen<Gen<Routable.Domain>> mixedDomainGen =
Gens.enums().allMixedDistribution(Routable.Domain.class);
+ protected static final Gen<Gen.LongGen> mixedTokenGen = top -> {
+ switch (top.nextInt(0, 3))
+ {
+ case 0: // all
+ return rs -> rs.nextLong(Long.MIN_VALUE + 1, Long.MAX_VALUE);
+ case 1: // small
+ return rs -> rs.nextLong(0, 100);
+ case 2: // medium
+ return rs -> rs.nextLong(0, Long.MAX_VALUE);
+ default:
+ throw new AssertionError();
+ }
+ };
+
protected static TableMetadata intTbl, reverseTokenTbl;
protected static Node.Id nodeId;
@@ -345,4 +367,53 @@ public abstract class SimulatedAccordCommandStoreTestBase
extends CQLTester
Assertions.assertThat(deps.keyDeps.txnIds(key)).describedAs("Txn %s for key
%s", txnId, key).isEqualTo(keyConflicts.get(key));
}
}
+
+ protected static Gen<Pair<Txn, FullRoute<?>>>
randomTxn(Gen<Routable.Domain> domainGen, Gen.LongGen tokenGen)
+ {
+ TableMetadata tbl = reverseTokenTbl;
+ Invariants.checkArgument(tbl.partitioner ==
Murmur3Partitioner.instance, "Only murmur partitioner is supported; given %s",
tbl.partitioner.getClass());
+ Gen<PartitionKey> keyGen = rs -> new PartitionKey(tbl.id,
tbl.partitioner.decorateKey(Murmur3Partitioner.LongToken.keyForToken(tokenGen.nextLong(rs))));
+ Gen<Range> rangeGen = rs -> {
+ long a = tokenGen.nextLong(rs);
+ long b = tokenGen.nextLong(rs);
+ while (a == b)
+ b = tokenGen.nextLong(rs);
+ if (a > b)
+ {
+ long tmp = a;
+ a = b;
+ b = tmp;
+ }
+ return tokenRange(tbl.id, a, b);
+ };
+ return rs -> {
+ Routable.Domain domain = domainGen.next(rs);
+ switch (domain)
+ {
+ case Key:
+ {
+ Keys keys =
Keys.of(Gens.lists(keyGen).unique().ofSizeBetween(1, 5).next(rs));
+ List<String> inserts = new ArrayList<>(keys.size());
+ List<Object> binds = new ArrayList<>(keys.size());
+ for (int i = 0; i < keys.size(); i++)
+ {
+ inserts.add(String.format("INSERT INTO %s (pk) VALUES
(?)", tbl));
+ binds.add(((PartitionKey)
keys.get(i)).partitionKey().getKey());
+ }
+ Txn txn = createTxn(wrapInTxn(inserts), binds);
+ FullRoute<?> route =
keys.toRoute(keys.get(0).toUnseekable());
+ return Pair.create(txn, route);
+ }
+ case Range:
+ {
+ Ranges ranges = Ranges.of(Gens.arrays(Range.class,
rangeGen).unique().ofSizeBetween(1, 5).next(rs));
+ Txn txn = createTxn(Txn.Kind.ExclusiveSyncPoint, ranges);
+ FullRangeRoute route = ranges.toRoute(ranges.get(0).end());
+ return Pair.create(txn, route);
+ }
+ default:
+ throw new UnsupportedOperationException(domain.name());
+ }
+ };
+ }
}
diff --git
a/test/unit/org/apache/cassandra/service/accord/async/SimulatedAsyncOperationTest.java
b/test/unit/org/apache/cassandra/service/accord/async/SimulatedAsyncOperationTest.java
index 6e216ff56d..9ade467954 100644
---
a/test/unit/org/apache/cassandra/service/accord/async/SimulatedAsyncOperationTest.java
+++
b/test/unit/org/apache/cassandra/service/accord/async/SimulatedAsyncOperationTest.java
@@ -30,10 +30,14 @@ import accord.api.Key;
import accord.impl.basic.SimulatedFault;
import accord.local.PreLoadContext;
import accord.local.SafeCommandStore;
+import accord.messages.PreAccept;
+import accord.primitives.FullRoute;
import accord.primitives.Keys;
import accord.primitives.Range;
import accord.primitives.Ranges;
import accord.primitives.Seekables;
+import accord.primitives.Txn;
+import accord.primitives.TxnId;
import accord.utils.Gen;
import accord.utils.Gens;
import accord.utils.RandomSource;
@@ -48,6 +52,7 @@ import
org.apache.cassandra.service.accord.SimulatedAccordCommandStoreTestBase;
import org.apache.cassandra.service.accord.TokenRange;
import org.apache.cassandra.service.accord.api.AccordRoutingKey.TokenKey;
import org.apache.cassandra.service.accord.api.PartitionKey;
+import org.apache.cassandra.utils.Pair;
import org.assertj.core.api.Assertions;
import static accord.utils.Property.qt;
@@ -58,6 +63,7 @@ public class SimulatedAsyncOperationTest extends
SimulatedAccordCommandStoreTest
public void precondition()
{
Assertions.assertThat(intTbl.partitioner).isEqualTo(Murmur3Partitioner.instance);
+
Assertions.assertThat(reverseTokenTbl.partitioner).isEqualTo(Murmur3Partitioner.instance);
}
@Test
@@ -73,20 +79,22 @@ public class SimulatedAsyncOperationTest extends
SimulatedAccordCommandStoreTest
qt().withExamples(100).check(rs -> test(rs, 100, intTbl, actionGen));
}
+ enum Operation { Task, PreAccept }
+
private static void test(RandomSource rs, int numSamples, TableMetadata
tbl, Gen<Action> actionGen) throws Exception
{
AccordKeyspace.unsafeClear();
+ Gen<Operation> operationGen = Gens.enums().all(Operation.class);
int numKeys = rs.nextInt(20, 1000);
long minToken = 0;
long maxToken = numKeys;
Gen<Key> keyGen = Gens.longs().between(minToken + 1, maxToken).map(t
-> new PartitionKey(tbl.id,
tbl.partitioner.decorateKey(LongToken.keyForToken(t))));
-
-
Gen<Keys> keysGen = Gens.lists(keyGen).unique().ofSizeBetween(1,
10).map(l -> Keys.of(l));
Gen<Ranges> rangesGen = Gens.lists(rangeInsideRange(tbl.id, minToken,
maxToken)).uniqueBestEffort().ofSizeBetween(1, 10).map(l ->
Ranges.of(l.toArray(Range[]::new)));
Gen<Seekables<?, ?>> seekablesGen = Gens.oneOf(keysGen, rangesGen);
+ Gen<Pair<Txn, FullRoute<?>>> txnGen =
randomTxn(mixedDomainGen.next(rs), mixedTokenGen.next(rs));
try (var instance = new SimulatedAccordCommandStore(rs))
{
@@ -94,14 +102,46 @@ public class SimulatedAsyncOperationTest extends
SimulatedAccordCommandStoreTest
Counter counter = new Counter();
for (int i = 0; i < numSamples; i++)
{
- PreLoadContext ctx =
PreLoadContext.contextFor(seekablesGen.next(rs));
- operation(instance, ctx, actionGen.next(rs),
rs::nextBoolean).begin((ignore, failure) -> {
- counter.counter++;
- if (failure != null && !(failure instanceof
SimulatedFault)) throw new AssertionError("Unexpected error", failure);
- });
+ Operation op = operationGen.next(rs);
+ switch (op)
+ {
+ case Task:
+ {
+ PreLoadContext ctx =
PreLoadContext.contextFor(seekablesGen.next(rs));
+ instance.maybeCacheEvict(ctx.keys());
+ operation(instance, ctx, actionGen.next(rs),
rs::nextBoolean).begin(counter);
+ }
+ break;
+ case PreAccept:
+ {
+ Pair<Txn, FullRoute<?>> txnWithRoute = txnGen.next(rs);
+ Txn txn = txnWithRoute.left;
+ Action action = actionGen.next(rs);
+ TxnId txnId = instance.nextTxnId(txn.kind(),
txn.keys().domain());
+ FullRoute<?> route = txnWithRoute.right;
+ PreAccept preAccept = new PreAccept(nodeId,
instance.topologies, txnId, txn, route) {
+ @Override
+ public PreAcceptReply apply(SafeCommandStore
safeStore)
+ {
+ PreAcceptReply result = super.apply(safeStore);
+ if (action == Action.FAILURE)
+ throw new SimulatedFault("PreAccept failed
for keys " + keys());
+ return result;
+ }
+ };
+ instance.maybeCacheEvict(txn.keys());
+ instance.processAsync(preAccept).begin(counter);
+ }
+ break;
+ default:
+ throw new UnsupportedOperationException(op.name());
+ }
}
instance.processAll();
Assertions.assertThat(counter.counter).isEqualTo(numSamples);
+ instance.store.cache().stream().forEach(e -> {
+ Assertions.assertThat(e.referenceCount()).isEqualTo(0);
+ });
}
}
@@ -146,9 +186,17 @@ public class SimulatedAsyncOperationTest extends
SimulatedAccordCommandStoreTest
};
}
- private static class Counter
+ private static class Counter implements BiConsumer<Object, Throwable>
{
int counter = 0;
+
+ @Override
+ public void accept(Object o, Throwable failure)
+ {
+ counter++;
+ if (failure != null && !(failure instanceof SimulatedFault))
+ throw new AssertionError("Unexpected error", failure);
+ }
}
private static class SimulatedOperation extends AsyncOperation<Void>
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]