This is an automated email from the ASF dual-hosted git repository.

dcapwell pushed a commit to branch cep-15-accord
in repository https://gitbox.apache.org/repos/asf/cassandra.git


The following commit(s) were added to refs/heads/cep-15-accord by this push:
     new 72567e668c CEP-15: (Accord) Bootstraps LocalOnly txn can not be recreated from SerializerSupport
72567e668c is described below

commit 72567e668c8ba63c8b8316583542c0c7b4918cc6
Author: David Capwell <[email protected]>
AuthorDate: Fri May 31 10:37:27 2024 -0700

    CEP-15: (Accord) Bootstraps LocalOnly txn can not be recreated from SerializerSupport
    
    patch by David Capwell; reviewed by Benedict Elliott Smith for CASSANDRA-19674
---
 modules/accord                                     |  2 +-
 .../apache/cassandra/schema/SchemaProvider.java    |  5 ++
 .../cassandra/service/accord/AccordJournal.java    | 28 ++++----
 .../cassandra/service/accord/AccordKeyspace.java   |  2 +-
 .../service/accord/AccordSafeCommandStore.java     |  6 ++
 .../accord/AccordSafeCommandsForRanges.java        | 57 ++-------------
 .../cassandra/service/accord/AccordService.java    |  5 +-
 .../service/accord/ImmutableAccordSafeState.java   | 84 ++++++++++++++++++++++
 .../cassandra/service/accord/api/PartitionKey.java |  5 +-
 .../service/accord/async/AsyncLoader.java          |  7 +-
 .../accord/SimulatedAccordCommandStore.java        | 24 ++++++-
 .../SimulatedAccordCommandStoreTestBase.java       | 71 ++++++++++++++++++
 .../accord/async/SimulatedAsyncOperationTest.java  | 64 ++++++++++++++---
 13 files changed, 278 insertions(+), 82 deletions(-)

diff --git a/modules/accord b/modules/accord
index 84e89bd91c..cf10169067 160000
--- a/modules/accord
+++ b/modules/accord
@@ -1 +1 @@
-Subproject commit 84e89bd91cf1b058fbf314b750336a1ec1096b18
+Subproject commit cf10169067a8cd40fb876789a62439cc03fd2e9b
diff --git a/src/java/org/apache/cassandra/schema/SchemaProvider.java 
b/src/java/org/apache/cassandra/schema/SchemaProvider.java
index 45b70ec2e7..a21e9adea3 100644
--- a/src/java/org/apache/cassandra/schema/SchemaProvider.java
+++ b/src/java/org/apache/cassandra/schema/SchemaProvider.java
@@ -141,6 +141,11 @@ public interface SchemaProvider
         return metadata == null ? null : metadata.partitioner;
     }
 
+    default IPartitioner getExistingTablePartitioner(TableId id) throws 
UnknownTableException
+    {
+        return getExistingTableMetadata(id).partitioner;
+    }
+
     @Nullable
     default TableMetadataRef getTableMetadataRef(TableId id)
     {
diff --git a/src/java/org/apache/cassandra/service/accord/AccordJournal.java 
b/src/java/org/apache/cassandra/service/accord/AccordJournal.java
index 0c31afbb4c..80cfdf31ea 100644
--- a/src/java/org/apache/cassandra/service/accord/AccordJournal.java
+++ b/src/java/org/apache/cassandra/service/accord/AccordJournal.java
@@ -40,6 +40,12 @@ import com.google.common.collect.ImmutableMap;
 import com.google.common.collect.ListMultimap;
 import com.google.common.collect.Multimap;
 import com.google.common.primitives.Ints;
+
+import accord.messages.ApplyThenWaitUntilApplied;
+import org.agrona.collections.Long2ObjectHashMap;
+import org.agrona.collections.LongArrayList;
+import org.agrona.collections.ObjectHashSet;
+import org.cliffc.high_scale_lib.NonBlockingHashMap;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
@@ -49,7 +55,6 @@ import accord.local.SerializerSupport;
 import accord.messages.AbstractEpochRequest;
 import accord.messages.Accept;
 import accord.messages.Apply;
-import accord.messages.ApplyThenWaitUntilApplied;
 import accord.messages.BeginRecovery;
 import accord.messages.Commit;
 import accord.messages.LocalRequest;
@@ -65,9 +70,6 @@ import accord.primitives.Timestamp;
 import accord.primitives.TxnId;
 import accord.utils.Invariants;
 import accord.utils.MapReduceConsume;
-import org.agrona.collections.Long2ObjectHashMap;
-import org.agrona.collections.LongArrayList;
-import org.agrona.collections.ObjectHashSet;
 import org.apache.cassandra.concurrent.Interruptible;
 import org.apache.cassandra.concurrent.ManyToOneConcurrentLinkedQueue;
 import org.apache.cassandra.concurrent.SequentialExecutorPlus;
@@ -104,7 +106,6 @@ import org.apache.cassandra.utils.ByteArrayUtil;
 import org.apache.cassandra.utils.ExecutorUtils;
 import org.apache.cassandra.utils.concurrent.Semaphore;
 import org.apache.cassandra.utils.vint.VIntCoding;
-import org.cliffc.high_scale_lib.NonBlockingHashMap;
 import org.jctools.queues.SpscLinkedQueue;
 
 import static accord.messages.MessageType.ACCEPT_INVALIDATE_REQ;
@@ -256,11 +257,11 @@ public class AccordJournal implements IJournal, 
Shutdownable
     /**
      * Accord protocol messages originating from local node, e.g. Propagate.
      */
-    public void appendLocalRequest(LocalRequest<?> request)
+    public <R> void appendLocalRequest(LocalRequest<R> request, BiConsumer<? 
super R, Throwable> callback)
     {
         Type type = Type.fromMessageType(request.type());
         Key key = new Key(type.txnId(request), type);
-        journal.asyncWrite(key, request, SENTINEL_HOSTS, null);
+        journal.asyncWrite(key, request, SENTINEL_HOSTS, callback);
     }
 
     @VisibleForTesting
@@ -313,7 +314,7 @@ public class AccordJournal implements IJournal, Shutdownable
             if (key.type.isRemoteRequest())
                 frameAggregator.onWrite(RemoteRequestContext.create(((Request) 
value).waitForEpoch(), (ResponseContext) writeContext, pointer));
             else if (key.type.isLocalRequest())
-                
frameAggregator.onWrite(LocalRequestContext.create((LocalRequest<?>) value, 
pointer));
+                
frameAggregator.onWrite(LocalRequestContext.create((LocalRequest<?>) value, 
(BiConsumer<?, Throwable>) writeContext, pointer));
             else
                 frameApplicator.onWrite(pointer, size, (FrameContext) 
writeContext);
         }
@@ -404,9 +405,9 @@ public class AccordJournal implements IJournal, Shutdownable
             this.callback = callback;
         }
 
-        static LocalRequestContext create(LocalRequest<?> request, 
RecordPointer pointer)
+        static LocalRequestContext create(LocalRequest<?> request, 
BiConsumer<?, Throwable> callback, RecordPointer pointer)
         {
-            return new LocalRequestContext(request.waitForEpoch(), 
request.callback(), pointer);
+            return new LocalRequestContext(request.waitForEpoch(), callback, 
pointer);
         }
     }
 
@@ -1182,13 +1183,14 @@ public class AccordJournal implements IJournal, 
Shutdownable
 
         private void applyRequest(RecordPointer pointer, RequestContext 
context, long preAcceptTimeout)
         {
-            Request request = (Request) cachedRecords.remove(pointer);
-            Type type = Type.fromMessageType(request.type());
+            Message message = (Message) cachedRecords.remove(pointer);
+            Type type = Type.fromMessageType(message.type());
             if (type == Type.PRE_ACCEPT || type == Type.BEGIN_RECOVER)
                 context.preAcceptTimeout(preAcceptTimeout);
 
             if (type.isRemoteRequest())
             {
+                Request request = (Request) message;
                 RemoteRequestContext ctx = (RemoteRequestContext) context;
                 Id from = endpointMapper.mappedId(ctx.from());
                 request.process(node, from, ctx);
@@ -1199,7 +1201,7 @@ public class AccordJournal implements IJournal, 
Shutdownable
                 LocalRequestContext ctx = (LocalRequestContext) context;
                 // TODO (expected): Make Propagate PreAccept receive 
preAcceptTimeout and timestamps
                 //noinspection unchecked,rawtypes
-                ((LocalRequest) request).process(node, ctx.callback);
+                ((LocalRequest) message).process(node, ctx.callback);
             }
         }
 
diff --git a/src/java/org/apache/cassandra/service/accord/AccordKeyspace.java 
b/src/java/org/apache/cassandra/service/accord/AccordKeyspace.java
index ab6701d6d6..2dd2b5d28d 100644
--- a/src/java/org/apache/cassandra/service/accord/AccordKeyspace.java
+++ b/src/java/org/apache/cassandra/service/accord/AccordKeyspace.java
@@ -1238,7 +1238,7 @@ public class AccordKeyspace
             WaitingOnProvider waitingOn = deserializeWaitingOn(txnId, row);
             MessageProvider messages = commandStore.makeMessageProvider(txnId);
 
-            return 
SerializerSupport.reconstruct(commandStore.unsafeRangesForEpoch(), attrs, 
status, executeAt, executeAtLeast, promised, accepted, waitingOn, messages);
+            return SerializerSupport.reconstruct(commandStore.agent(), 
commandStore.unsafeRangesForEpoch(), attrs, status, executeAt, executeAtLeast, 
promised, accepted, waitingOn, messages);
         }
         catch (Throwable t)
         {
diff --git 
a/src/java/org/apache/cassandra/service/accord/AccordSafeCommandStore.java 
b/src/java/org/apache/cassandra/service/accord/AccordSafeCommandStore.java
index 32a6973b07..ca0413b0d3 100644
--- a/src/java/org/apache/cassandra/service/accord/AccordSafeCommandStore.java
+++ b/src/java/org/apache/cassandra/service/accord/AccordSafeCommandStore.java
@@ -344,4 +344,10 @@ public class AccordSafeCommandStore extends 
AbstractSafeCommandStore<AccordSafeC
         if (commandsForRanges != null)
             commandsForRanges.postExecute();
     }
+
+    @Override
+    public String toString()
+    {
+        return "AccordSafeCommandStore(id=" + commandStore().id() + ")";
+    }
 }
diff --git 
a/src/java/org/apache/cassandra/service/accord/AccordSafeCommandsForRanges.java 
b/src/java/org/apache/cassandra/service/accord/AccordSafeCommandsForRanges.java
index 42fb0f6ef1..848df1d270 100644
--- 
a/src/java/org/apache/cassandra/service/accord/AccordSafeCommandsForRanges.java
+++ 
b/src/java/org/apache/cassandra/service/accord/AccordSafeCommandsForRanges.java
@@ -21,61 +21,25 @@ package org.apache.cassandra.service.accord;
 import java.util.NavigableMap;
 import java.util.Objects;
 
-import accord.primitives.Range;
 import accord.primitives.Ranges;
 import accord.primitives.TxnId;
 import accord.utils.async.AsyncChains;
 import accord.utils.async.AsyncResult;
 import org.apache.cassandra.utils.Pair;
 
-public class AccordSafeCommandsForRanges implements AccordSafeState<Range, 
CommandsForRanges>
+public class AccordSafeCommandsForRanges extends 
ImmutableAccordSafeState<Ranges, CommandsForRanges>
 {
     private final AsyncResult<Pair<CommandsForRangesLoader.Watcher, 
NavigableMap<TxnId, CommandsForRangesLoader.Summary>>> chain;
-    private final Ranges ranges;
-    private boolean invalidated;
-    private CommandsForRanges original, current;
 
     public AccordSafeCommandsForRanges(Ranges ranges, 
AsyncResult<Pair<CommandsForRangesLoader.Watcher, NavigableMap<TxnId, 
CommandsForRangesLoader.Summary>>> chain)
     {
-        this.ranges = ranges;
+        super(ranges);
         this.chain = chain;
     }
 
     public Ranges ranges()
     {
-        return ranges;
-    }
-
-    @Override
-    public CommandsForRanges current()
-    {
-        checkNotInvalidated();
-        return current;
-    }
-
-    @Override
-    public void invalidate()
-    {
-        invalidated = true;
-    }
-
-    @Override
-    public boolean invalidated()
-    {
-        return invalidated;
-    }
-
-    @Override
-    public void set(CommandsForRanges update)
-    {
-        throw new UnsupportedOperationException();
-    }
-
-    @Override
-    public CommandsForRanges original()
-    {
-        checkNotInvalidated();
-        return original;
+        return key();
     }
 
     @Override
@@ -85,17 +49,11 @@ public class AccordSafeCommandsForRanges implements 
AccordSafeState<Range, Comma
         Pair<CommandsForRangesLoader.Watcher, NavigableMap<TxnId, 
CommandsForRangesLoader.Summary>> pair = AsyncChains.getUnchecked(chain);
         pair.left.close();
         pair.left.get().entrySet().forEach(e -> pair.right.put(e.getKey(), 
e.getValue()));
-        current = original = new CommandsForRanges(ranges, pair.right);
-    }
-
-    @Override
-    public void postExecute()
-    {
-        checkNotInvalidated();
+        original = new CommandsForRanges(key, pair.right);
     }
 
     @Override
-    public AccordCachingState<Range, CommandsForRanges> global()
+    public AccordCachingState<Ranges, CommandsForRanges> global()
     {
         throw new UnsupportedOperationException();
     }
@@ -106,13 +64,13 @@ public class AccordSafeCommandsForRanges implements 
AccordSafeState<Range, Comma
         if (this == o) return true;
         if (o == null || getClass() != o.getClass()) return false;
         AccordSafeCommandsForRanges that = (AccordSafeCommandsForRanges) o;
-        return Objects.equals(original, that.original) && 
Objects.equals(current, that.current);
+        return Objects.equals(original, that.original);
     }
 
     @Override
     public int hashCode()
     {
-        return Objects.hash(original, current);
+        return Objects.hash(original);
     }
 
     @Override
@@ -122,7 +80,6 @@ public class AccordSafeCommandsForRanges implements 
AccordSafeState<Range, Comma
                "chain=" + chain +
                ", invalidated=" + invalidated +
                ", original=" + original +
-               ", current=" + current +
                '}';
     }
 }
diff --git a/src/java/org/apache/cassandra/service/accord/AccordService.java 
b/src/java/org/apache/cassandra/service/accord/AccordService.java
index 8e2778f394..8764addbf5 100644
--- a/src/java/org/apache/cassandra/service/accord/AccordService.java
+++ b/src/java/org/apache/cassandra/service/accord/AccordService.java
@@ -25,6 +25,7 @@ import java.util.concurrent.ExecutionException;
 import java.util.concurrent.TimeUnit;
 import java.util.concurrent.TimeoutException;
 import java.util.concurrent.atomic.AtomicReference;
+import java.util.function.BiConsumer;
 import java.util.function.BiFunction;
 import java.util.function.LongSupplier;
 import java.util.stream.Collectors;
@@ -562,11 +563,11 @@ public class AccordService implements IAccordService, 
Shutdownable
         }
     }
 
-    private void handleLocalRequest(LocalRequest<?> request, Node node)
+    private <R> void handleLocalRequest(LocalRequest<R> request, BiConsumer<? 
super R, Throwable> callback, Node node)
     {
         // currently, we only create LocalRequests that have side effects and 
need to be persisted
         Invariants.checkState(request.type().hasSideEffects());
-        journal.appendLocalRequest(request);
+        journal.appendLocalRequest(request, callback);
     }
 
     private static RequestTimeoutException newTimeout(TxnId txnId, Txn txn, 
ConsistencyLevel consistencyLevel)
diff --git 
a/src/java/org/apache/cassandra/service/accord/ImmutableAccordSafeState.java 
b/src/java/org/apache/cassandra/service/accord/ImmutableAccordSafeState.java
new file mode 100644
index 0000000000..850f6f7e8d
--- /dev/null
+++ b/src/java/org/apache/cassandra/service/accord/ImmutableAccordSafeState.java
@@ -0,0 +1,84 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.cassandra.service.accord;
+
+import javax.annotation.Nullable;
+
+public abstract class ImmutableAccordSafeState<K, V> implements 
AccordSafeState<K, V>
+{
+    protected final K key;
+    @Nullable
+    protected V original;
+    protected boolean invalidated;
+
+    protected ImmutableAccordSafeState(K key)
+    {
+        this.key = key;
+    }
+
+    @Override
+    public K key()
+    {
+        return key;
+    }
+
+    @Override
+    public V original()
+    {
+        checkNotInvalidated();
+        return original;
+    }
+
+    @Override
+    public V current()
+    {
+        checkNotInvalidated();
+        return original;
+    }
+
+    @Override
+    public void invalidate()
+    {
+        invalidated = true;
+    }
+
+    @Override
+    public boolean invalidated()
+    {
+        return invalidated;
+    }
+
+    @Override
+    public void set(V update)
+    {
+        throw new UnsupportedOperationException();
+    }
+
+    @Override
+    public void revert()
+    {
+        checkNotInvalidated();
+    }
+
+    @Override
+    public void postExecute()
+    {
+        checkNotInvalidated();
+    }
+}
diff --git a/src/java/org/apache/cassandra/service/accord/api/PartitionKey.java 
b/src/java/org/apache/cassandra/service/accord/api/PartitionKey.java
index a42fcf57bf..71feb7d88e 100644
--- a/src/java/org/apache/cassandra/service/accord/api/PartitionKey.java
+++ b/src/java/org/apache/cassandra/service/accord/api/PartitionKey.java
@@ -33,6 +33,7 @@ import org.apache.cassandra.db.marshal.ByteBufferAccessor;
 import org.apache.cassandra.db.marshal.ValueAccessor;
 import org.apache.cassandra.db.partitions.Partition;
 import org.apache.cassandra.db.partitions.PartitionUpdate;
+import org.apache.cassandra.dht.IPartitioner;
 import org.apache.cassandra.dht.Token;
 import org.apache.cassandra.io.IVersionedSerializer;
 import org.apache.cassandra.io.util.DataInputPlus;
@@ -147,8 +148,8 @@ public final class PartitionKey extends AccordRoutableKey 
implements Key
         public PartitionKey deserialize(DataInputPlus in, int version) throws 
IOException
         {
             TableId tableId = TableId.deserialize(in);
-            TableMetadata metadata = 
Schema.instance.getExistingTableMetadata(tableId);
-            DecoratedKey key = 
metadata.partitioner.decorateKey(ByteBufferUtil.readWithShortLength(in));
+            IPartitioner partitioner = 
Schema.instance.getExistingTablePartitioner(tableId);
+            DecoratedKey key = 
partitioner.decorateKey(ByteBufferUtil.readWithShortLength(in));
             return new PartitionKey(tableId, key);
         }
 
diff --git 
a/src/java/org/apache/cassandra/service/accord/async/AsyncLoader.java 
b/src/java/org/apache/cassandra/service/accord/async/AsyncLoader.java
index 9a26bd4be1..740cb3b5d1 100644
--- a/src/java/org/apache/cassandra/service/accord/async/AsyncLoader.java
+++ b/src/java/org/apache/cassandra/service/accord/async/AsyncLoader.java
@@ -22,7 +22,6 @@ import accord.local.CommandsForKey;
 import accord.local.KeyHistory;
 import accord.local.PreLoadContext;
 import accord.primitives.*;
-import accord.utils.Invariants;
 import accord.utils.async.AsyncChain;
 import accord.utils.async.AsyncChains;
 import accord.utils.async.AsyncResult;
@@ -196,7 +195,11 @@ public class AsyncLoader
 
     private AsyncChain<Set<? extends Key>> findOverlappingKeys(Ranges ranges)
     {
-        Invariants.checkArgument(!ranges.isEmpty());
+        if (ranges.isEmpty())
+        {
+            // During topology changes some shards may be included with empty 
ranges
+            return AsyncChains.success(Collections.emptySet());
+        }
 
         List<AsyncChain<Set<PartitionKey>>> chains = new 
ArrayList<>(ranges.size());
         for (Range range : ranges)
diff --git 
a/test/unit/org/apache/cassandra/service/accord/SimulatedAccordCommandStore.java
 
b/test/unit/org/apache/cassandra/service/accord/SimulatedAccordCommandStore.java
index a0bb647c41..f7206d8331 100644
--- 
a/test/unit/org/apache/cassandra/service/accord/SimulatedAccordCommandStore.java
+++ 
b/test/unit/org/apache/cassandra/service/accord/SimulatedAccordCommandStore.java
@@ -47,6 +47,7 @@ import accord.primitives.Keys;
 import accord.primitives.Ranges;
 import accord.primitives.Routable;
 import accord.primitives.RoutableKey;
+import accord.primitives.Seekables;
 import accord.primitives.Timestamp;
 import accord.primitives.Txn;
 import accord.primitives.TxnId;
@@ -88,6 +89,7 @@ public class SimulatedAccordCommandStore implements 
AutoCloseable
     public final AccordCommandStore store;
     public final Node.Id nodeId;
     public final Topology topology;
+    public final Topologies topologies;
     public final MockJournal journal;
     public final ScheduledExecutorPlus unorderedScheduled;
     public final List<String> evictions = new ArrayList<>();
@@ -189,6 +191,7 @@ public class SimulatedAccordCommandStore implements 
AutoCloseable
         });
 
         this.topology = 
AccordTopology.createAccordTopology(ClusterMetadata.current());
+        this.topologies = new 
Topologies.Single(SizeOfIntersectionSorter.SUPPLIER, topology);
         var rangesForEpoch = new 
CommandStores.RangesForEpoch(topology.epoch(), topology.ranges(), store);
         updateHolder.add(topology.epoch(), rangesForEpoch, topology.ranges());
         updateHolder.updateGlobal(topology.ranges());
@@ -209,6 +212,21 @@ public class SimulatedAccordCommandStore implements 
AutoCloseable
         return new TxnId(timeService.epoch(), timeService.now(), kind, domain, 
nodeId);
     }
 
+    public void maybeCacheEvict(Seekables<?, ?> keysOrRanges)
+    {
+        switch (keysOrRanges.domain())
+        {
+            case Key:
+                maybeCacheEvict((Keys) keysOrRanges, Ranges.EMPTY);
+                break;
+            case Range:
+                maybeCacheEvict(Keys.EMPTY, (Ranges) keysOrRanges);
+                break;
+            default:
+                throw new UnsupportedOperationException("Unknown domain: " + 
keysOrRanges.domain());
+        }
+    }
+
     public void maybeCacheEvict(Keys keys, Ranges ranges)
     {
         AccordStateCache cache = store.cache();
@@ -217,7 +235,7 @@ public class SimulatedAccordCommandStore implements 
AutoCloseable
             if (TxnId.class.equals(keyType))
             {
                 Command command = (Command) state.state().get();
-                if (command.known().definition.isKnown()
+                if (command != null && command.known().definition.isKnown()
                     && (command.partialTxn().keys().intersects(keys) || 
ranges.intersects(command.partialTxn().keys()))
                     && shouldEvict.getAsBoolean())
                     cache.maybeEvict(state);
@@ -322,7 +340,7 @@ public class SimulatedAccordCommandStore implements 
AutoCloseable
     public Pair<TxnId, AsyncResult<PreAccept.PreAcceptOk>> 
enqueuePreAccept(Txn txn, FullRoute<?> route)
     {
         TxnId txnId = nextTxnId(txn.kind(), txn.keys().domain());
-        PreAccept preAccept = new PreAccept(nodeId, new 
Topologies.Single(SizeOfIntersectionSorter.SUPPLIER, topology), txnId, txn, 
route);
+        PreAccept preAccept = new PreAccept(nodeId, topologies, txnId, txn, 
route);
         return Pair.create(txnId, processAsync(preAccept, safe -> {
             var reply = preAccept.apply(safe);
             Assertions.assertThat(reply.isOk()).isTrue();
@@ -334,7 +352,7 @@ public class SimulatedAccordCommandStore implements 
AutoCloseable
     {
         TxnId txnId = nextTxnId(txn.kind(), txn.keys().domain());
         Ballot ballot = Ballot.fromValues(timeService.epoch(), 
timeService.now(), nodeId);
-        BeginRecovery br = new BeginRecovery(nodeId, new 
Topologies.Single(SizeOfIntersectionSorter.SUPPLIER, topology), txnId, txn, 
route, ballot);
+        BeginRecovery br = new BeginRecovery(nodeId, topologies, txnId, txn, 
route, ballot);
 
         return Pair.create(txnId, processAsync(br, safe -> {
             var reply = br.apply(safe);
diff --git 
a/test/unit/org/apache/cassandra/service/accord/SimulatedAccordCommandStoreTestBase.java
 
b/test/unit/org/apache/cassandra/service/accord/SimulatedAccordCommandStoreTestBase.java
index 5aed34bc79..2c313566df 100644
--- 
a/test/unit/org/apache/cassandra/service/accord/SimulatedAccordCommandStoreTestBase.java
+++ 
b/test/unit/org/apache/cassandra/service/accord/SimulatedAccordCommandStoreTestBase.java
@@ -37,14 +37,19 @@ import accord.messages.BeginRecovery;
 import accord.messages.PreAccept;
 import accord.primitives.Ballot;
 import accord.primitives.Deps;
+import accord.primitives.FullRangeRoute;
 import accord.primitives.FullRoute;
 import accord.primitives.Keys;
 import accord.primitives.LatestDeps;
 import accord.primitives.Range;
 import accord.primitives.Ranges;
+import accord.primitives.Routable;
 import accord.primitives.Txn;
 import accord.primitives.TxnId;
 import accord.topology.Topologies;
+import accord.utils.Gen;
+import accord.utils.Gens;
+import accord.utils.Invariants;
 import accord.utils.async.AsyncChains;
 import accord.utils.async.AsyncResult;
 import org.apache.cassandra.ServerTestUtils;
@@ -57,11 +62,13 @@ import org.apache.cassandra.schema.Schema;
 import org.apache.cassandra.schema.TableId;
 import org.apache.cassandra.schema.TableMetadata;
 import org.apache.cassandra.service.accord.api.AccordRoutingKey;
+import org.apache.cassandra.service.accord.api.PartitionKey;
 import org.apache.cassandra.tcm.ClusterMetadata;
 import org.apache.cassandra.utils.Pair;
 import org.assertj.core.api.Assertions;
 
 import static org.apache.cassandra.schema.SchemaConstants.ACCORD_KEYSPACE_NAME;
+import static org.apache.cassandra.service.accord.AccordTestUtils.createTxn;
 
 public abstract class SimulatedAccordCommandStoreTestBase extends CQLTester
 {
@@ -78,6 +85,21 @@ public abstract class SimulatedAccordCommandStoreTestBase 
extends CQLTester
     protected enum DepsMessage
     {PreAccept, BeginRecovery, PreAcceptThenBeginRecovery}
 
+    protected static final Gen<Gen<Routable.Domain>> mixedDomainGen = 
Gens.enums().allMixedDistribution(Routable.Domain.class);
+    protected static final Gen<Gen.LongGen> mixedTokenGen = top -> {
+        switch (top.nextInt(0, 3))
+        {
+            case 0: // all
+                return rs -> rs.nextLong(Long.MIN_VALUE  + 1, Long.MAX_VALUE);
+            case 1: // small
+                return rs -> rs.nextLong(0, 100);
+            case 2: // medium
+                return rs -> rs.nextLong(0, Long.MAX_VALUE);
+            default:
+                throw new AssertionError();
+        }
+    };
+
     protected static TableMetadata intTbl, reverseTokenTbl;
     protected static Node.Id nodeId;
 
@@ -345,4 +367,53 @@ public abstract class SimulatedAccordCommandStoreTestBase 
extends CQLTester
                 
Assertions.assertThat(deps.keyDeps.txnIds(key)).describedAs("Txn %s for key 
%s", txnId, key).isEqualTo(keyConflicts.get(key));
         }
     }
+
+    protected static Gen<Pair<Txn, FullRoute<?>>> 
randomTxn(Gen<Routable.Domain> domainGen, Gen.LongGen tokenGen)
+    {
+        TableMetadata tbl = reverseTokenTbl;
+        Invariants.checkArgument(tbl.partitioner == 
Murmur3Partitioner.instance, "Only murmur partitioner is supported; given %s", 
tbl.partitioner.getClass());
+        Gen<PartitionKey> keyGen = rs -> new PartitionKey(tbl.id, 
tbl.partitioner.decorateKey(Murmur3Partitioner.LongToken.keyForToken(tokenGen.nextLong(rs))));
+        Gen<Range> rangeGen = rs -> {
+            long a = tokenGen.nextLong(rs);
+            long b = tokenGen.nextLong(rs);
+            while (a == b)
+                b = tokenGen.nextLong(rs);
+            if (a > b)
+            {
+                long tmp = a;
+                a = b;
+                b = tmp;
+            }
+            return tokenRange(tbl.id, a, b);
+        };
+        return rs -> {
+            Routable.Domain domain = domainGen.next(rs);
+            switch (domain)
+            {
+                case Key:
+                {
+                    Keys keys = 
Keys.of(Gens.lists(keyGen).unique().ofSizeBetween(1, 5).next(rs));
+                    List<String> inserts = new ArrayList<>(keys.size());
+                    List<Object> binds = new ArrayList<>(keys.size());
+                    for (int i = 0; i < keys.size(); i++)
+                    {
+                        inserts.add(String.format("INSERT INTO %s (pk) VALUES 
(?)", tbl));
+                        binds.add(((PartitionKey) 
keys.get(i)).partitionKey().getKey());
+                    }
+                    Txn txn = createTxn(wrapInTxn(inserts), binds);
+                    FullRoute<?> route = 
keys.toRoute(keys.get(0).toUnseekable());
+                    return Pair.create(txn, route);
+                }
+                case Range:
+                {
+                    Ranges ranges = Ranges.of(Gens.arrays(Range.class, 
rangeGen).unique().ofSizeBetween(1, 5).next(rs));
+                    Txn txn = createTxn(Txn.Kind.ExclusiveSyncPoint, ranges);
+                    FullRangeRoute route = ranges.toRoute(ranges.get(0).end());
+                    return Pair.create(txn, route);
+                }
+                default:
+                    throw new UnsupportedOperationException(domain.name());
+            }
+        };
+    }
 }
diff --git 
a/test/unit/org/apache/cassandra/service/accord/async/SimulatedAsyncOperationTest.java
 
b/test/unit/org/apache/cassandra/service/accord/async/SimulatedAsyncOperationTest.java
index 6e216ff56d..9ade467954 100644
--- 
a/test/unit/org/apache/cassandra/service/accord/async/SimulatedAsyncOperationTest.java
+++ 
b/test/unit/org/apache/cassandra/service/accord/async/SimulatedAsyncOperationTest.java
@@ -30,10 +30,14 @@ import accord.api.Key;
 import accord.impl.basic.SimulatedFault;
 import accord.local.PreLoadContext;
 import accord.local.SafeCommandStore;
+import accord.messages.PreAccept;
+import accord.primitives.FullRoute;
 import accord.primitives.Keys;
 import accord.primitives.Range;
 import accord.primitives.Ranges;
 import accord.primitives.Seekables;
+import accord.primitives.Txn;
+import accord.primitives.TxnId;
 import accord.utils.Gen;
 import accord.utils.Gens;
 import accord.utils.RandomSource;
@@ -48,6 +52,7 @@ import 
org.apache.cassandra.service.accord.SimulatedAccordCommandStoreTestBase;
 import org.apache.cassandra.service.accord.TokenRange;
 import org.apache.cassandra.service.accord.api.AccordRoutingKey.TokenKey;
 import org.apache.cassandra.service.accord.api.PartitionKey;
+import org.apache.cassandra.utils.Pair;
 import org.assertj.core.api.Assertions;
 
 import static accord.utils.Property.qt;
@@ -58,6 +63,7 @@ public class SimulatedAsyncOperationTest extends 
SimulatedAccordCommandStoreTest
     public void precondition()
     {
         
Assertions.assertThat(intTbl.partitioner).isEqualTo(Murmur3Partitioner.instance);
+        
Assertions.assertThat(reverseTokenTbl.partitioner).isEqualTo(Murmur3Partitioner.instance);
     }
 
     @Test
@@ -73,20 +79,22 @@ public class SimulatedAsyncOperationTest extends 
SimulatedAccordCommandStoreTest
         qt().withExamples(100).check(rs -> test(rs, 100, intTbl, actionGen));
     }
 
+    enum Operation { Task, PreAccept }
+
     private static void test(RandomSource rs, int numSamples, TableMetadata 
tbl, Gen<Action> actionGen) throws Exception
     {
         AccordKeyspace.unsafeClear();
+        Gen<Operation> operationGen = Gens.enums().all(Operation.class);
 
         int numKeys = rs.nextInt(20, 1000);
         long minToken = 0;
         long maxToken = numKeys;
 
         Gen<Key> keyGen = Gens.longs().between(minToken + 1, maxToken).map(t 
-> new PartitionKey(tbl.id, 
tbl.partitioner.decorateKey(LongToken.keyForToken(t))));
-
-
         Gen<Keys> keysGen = Gens.lists(keyGen).unique().ofSizeBetween(1, 
10).map(l -> Keys.of(l));
         Gen<Ranges> rangesGen = Gens.lists(rangeInsideRange(tbl.id, minToken, 
maxToken)).uniqueBestEffort().ofSizeBetween(1, 10).map(l -> 
Ranges.of(l.toArray(Range[]::new)));
         Gen<Seekables<?, ?>> seekablesGen = Gens.oneOf(keysGen, rangesGen);
+        Gen<Pair<Txn, FullRoute<?>>> txnGen = 
randomTxn(mixedDomainGen.next(rs), mixedTokenGen.next(rs));
 
         try (var instance = new SimulatedAccordCommandStore(rs))
         {
@@ -94,14 +102,46 @@ public class SimulatedAsyncOperationTest extends 
SimulatedAccordCommandStoreTest
             Counter counter = new Counter();
             for (int i = 0; i < numSamples; i++)
             {
-                PreLoadContext ctx = 
PreLoadContext.contextFor(seekablesGen.next(rs));
-                operation(instance, ctx, actionGen.next(rs), 
rs::nextBoolean).begin((ignore, failure) -> {
-                    counter.counter++;
-                    if (failure != null && !(failure instanceof 
SimulatedFault)) throw new AssertionError("Unexpected error", failure);
-                });
+                Operation op = operationGen.next(rs);
+                switch (op)
+                {
+                    case Task:
+                    {
+                        PreLoadContext ctx = 
PreLoadContext.contextFor(seekablesGen.next(rs));
+                        instance.maybeCacheEvict(ctx.keys());
+                        operation(instance, ctx, actionGen.next(rs), 
rs::nextBoolean).begin(counter);
+                    }
+                    break;
+                    case PreAccept:
+                    {
+                        Pair<Txn, FullRoute<?>> txnWithRoute = txnGen.next(rs);
+                        Txn txn = txnWithRoute.left;
+                        Action action = actionGen.next(rs);
+                        TxnId txnId = instance.nextTxnId(txn.kind(), 
txn.keys().domain());
+                        FullRoute<?> route = txnWithRoute.right;
+                        PreAccept preAccept = new PreAccept(nodeId, 
instance.topologies, txnId, txn, route) {
+                            @Override
+                            public PreAcceptReply apply(SafeCommandStore 
safeStore)
+                            {
+                                PreAcceptReply result = super.apply(safeStore);
+                                if (action == Action.FAILURE)
+                                    throw new SimulatedFault("PreAccept failed 
for keys " + keys());
+                                return result;
+                            }
+                        };
+                        instance.maybeCacheEvict(txn.keys());
+                        instance.processAsync(preAccept).begin(counter);
+                    }
+                    break;
+                    default:
+                        throw new UnsupportedOperationException(op.name());
+                }
             }
             instance.processAll();
             Assertions.assertThat(counter.counter).isEqualTo(numSamples);
+            instance.store.cache().stream().forEach(e -> {
+                Assertions.assertThat(e.referenceCount()).isEqualTo(0);
+            });
         }
     }
 
@@ -146,9 +186,17 @@ public class SimulatedAsyncOperationTest extends 
SimulatedAccordCommandStoreTest
         };
     }
 
-    private static class Counter
+    private static class Counter implements BiConsumer<Object, Throwable>
     {
         int counter = 0;
+
+        @Override
+        public void accept(Object o, Throwable failure)
+        {
+            counter++;
+            if (failure != null && !(failure instanceof SimulatedFault))
+                throw new AssertionError("Unexpected error", failure);
+        }
     }
 
     private static class SimulatedOperation extends AsyncOperation<Void>


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]


Reply via email to