This is an automated email from the ASF dual-hosted git repository.

aweisberg pushed a commit to branch cep-15-accord
in repository https://gitbox.apache.org/repos/asf/cassandra.git


The following commit(s) were added to refs/heads/cep-15-accord by this push:
     new 62c88b6fab Non-serial single partition reads on Accord
62c88b6fab is described below

commit 62c88b6fabcaebd4d206e6cbb58df7fd16a60e4a
Author: Ariel Weisberg <[email protected]>
AuthorDate: Tue Sep 24 16:03:34 2024 -0400

    Non-serial single partition reads on Accord
    
    Patch by Ariel Weisberg; Reviewed by Benedict Elliott Smith for 
CASSANDRA-19951
---
 .../cassandra/cql3/statements/CQL3CasRequest.java  |  14 ++-
 .../org/apache/cassandra/service/CASRequest.java   |   3 +-
 .../org/apache/cassandra/service/StorageProxy.java | 127 +++++++++++++++++----
 .../service/accord/AccordConfigurationService.java |   1 -
 .../cassandra/service/accord/IAccordService.java   |  11 +-
 .../cassandra/service/accord/txn/TxnData.java      |  11 ++
 .../cassandra/service/accord/txn/TxnDataName.java  |   5 +
 .../cassandra/service/accord/txn/TxnRead.java      |  11 +-
 .../service/consensus/TransactionalMode.java       |  54 ++++++---
 .../ConsensusMigrationMutationHelper.java          |   3 +
 .../migration/ConsensusMigrationState.java         |  23 ++--
 .../migration/TransactionalMigrationFromMode.java  |   5 +
 .../cassandra/tcm/transformations/AlterSchema.java |  19 ++-
 .../distributed/test/ShortReadProtectionTest.java  |  49 ++++----
 .../distributed/test/accord/AccordCQLTestBase.java |   2 +-
 .../test/accord/AccordInteroperabilityTest.java    |  76 +++++++++++-
 .../config/DatabaseDescriptorRefTest.java          |   1 +
 17 files changed, 318 insertions(+), 97 deletions(-)

diff --git a/src/java/org/apache/cassandra/cql3/statements/CQL3CasRequest.java 
b/src/java/org/apache/cassandra/cql3/statements/CQL3CasRequest.java
index 0cc5e73569..03438d2da8 100644
--- a/src/java/org/apache/cassandra/cql3/statements/CQL3CasRequest.java
+++ b/src/java/org/apache/cassandra/cql3/statements/CQL3CasRequest.java
@@ -17,9 +17,6 @@
  */
 package org.apache.cassandra.cql3.statements;
 
-import org.apache.cassandra.db.marshal.TimeUUIDType;
-import org.apache.cassandra.index.IndexRegistry;
-import org.apache.cassandra.schema.TableMetadata;
 import java.util.ArrayList;
 import java.util.Collection;
 import java.util.Collections;
@@ -52,11 +49,15 @@ import 
org.apache.cassandra.db.filter.ClusteringIndexSliceFilter;
 import org.apache.cassandra.db.filter.ColumnFilter;
 import org.apache.cassandra.db.filter.DataLimits;
 import org.apache.cassandra.db.filter.RowFilter;
+import org.apache.cassandra.db.marshal.TimeUUIDType;
 import org.apache.cassandra.db.partitions.FilteredPartition;
 import org.apache.cassandra.db.partitions.Partition;
 import org.apache.cassandra.db.partitions.PartitionUpdate;
 import org.apache.cassandra.db.rows.Row;
 import org.apache.cassandra.exceptions.InvalidRequestException;
+import org.apache.cassandra.index.IndexRegistry;
+import org.apache.cassandra.schema.TableMetadata;
+import org.apache.cassandra.schema.TableParams;
 import org.apache.cassandra.service.CASRequest;
 import org.apache.cassandra.service.ClientState;
 import org.apache.cassandra.service.accord.txn.TxnCondition;
@@ -69,6 +70,7 @@ import org.apache.cassandra.service.accord.txn.TxnResult;
 import org.apache.cassandra.service.accord.txn.TxnUpdate;
 import org.apache.cassandra.service.accord.txn.TxnWrite;
 import org.apache.cassandra.service.paxos.Ballot;
+import org.apache.cassandra.tcm.ClusterMetadata;
 import org.apache.cassandra.utils.TimeUUID;
 
 import static com.google.common.base.Preconditions.checkState;
@@ -77,6 +79,7 @@ import static 
org.apache.cassandra.service.StorageProxy.ConsensusAttemptResult.R
 import static 
org.apache.cassandra.service.StorageProxy.ConsensusAttemptResult.casResult;
 import static 
org.apache.cassandra.service.accord.txn.TxnDataName.Kind.CAS_READ;
 import static 
org.apache.cassandra.service.accord.txn.TxnResult.Kind.retry_new_protocol;
+import static 
org.apache.cassandra.service.consensus.migration.ConsensusRequestRouter.getTableMetadata;
 
 
 /**
@@ -470,13 +473,14 @@ public class CQL3CasRequest implements CASRequest
     }
 
     @Override
-    public Txn toAccordTxn(ConsistencyLevel consistencyLevel, ConsistencyLevel 
commitConsistencyLevel, ClientState clientState, long nowInSecs)
+    public Txn toAccordTxn(ClusterMetadata cm, ConsistencyLevel 
consistencyLevel, ConsistencyLevel commitConsistencyLevel, ClientState 
clientState, long nowInSecs)
     {
         SinglePartitionReadCommand readCommand = readCommand(nowInSecs);
         Update update = createUpdate(clientState, commitConsistencyLevel);
         // If the write strategy is sending all writes through Accord there is 
no need to use the supplied consistency
         // level since Accord will manage reading safely
-        consistencyLevel = 
metadata.params.transactionalMode.readCLForStrategy(consistencyLevel);
+        TableParams tableParams = getTableMetadata(cm, metadata.id).params;
+        consistencyLevel = 
tableParams.transactionalMode.readCLForStrategy(tableParams.transactionalMigrationFrom,
 consistencyLevel, cm, metadata.id, readCommand.partitionKey().getToken());
         TxnRead read = TxnRead.createCasRead(readCommand, consistencyLevel);
         // In a CAS requesting only one key is supported and writes
         // can't be dependent on any data that is read (only conditions)
diff --git a/src/java/org/apache/cassandra/service/CASRequest.java 
b/src/java/org/apache/cassandra/service/CASRequest.java
index c50f078a5a..b4ddbc1760 100644
--- a/src/java/org/apache/cassandra/service/CASRequest.java
+++ b/src/java/org/apache/cassandra/service/CASRequest.java
@@ -25,6 +25,7 @@ import org.apache.cassandra.db.partitions.PartitionUpdate;
 import org.apache.cassandra.exceptions.InvalidRequestException;
 import org.apache.cassandra.service.accord.txn.TxnResult;
 import org.apache.cassandra.service.paxos.Ballot;
+import org.apache.cassandra.tcm.ClusterMetadata;
 
 import static org.apache.cassandra.service.StorageProxy.ConsensusAttemptResult;
 
@@ -50,7 +51,7 @@ public interface CASRequest
      */
     PartitionUpdate makeUpdates(FilteredPartition current, ClientState 
clientState, Ballot ballot) throws InvalidRequestException;
 
-    Txn toAccordTxn(ConsistencyLevel consistencyLevel, ConsistencyLevel 
commitConsistencyLevel, ClientState clientState, long nowInSecs);
+    Txn toAccordTxn(ClusterMetadata cm, ConsistencyLevel consistencyLevel, 
ConsistencyLevel commitConsistencyLevel, ClientState clientState, long 
nowInSecs);
 
     ConsensusAttemptResult toCasResult(TxnResult txnResult);
 }
diff --git a/src/java/org/apache/cassandra/service/StorageProxy.java 
b/src/java/org/apache/cassandra/service/StorageProxy.java
index 7274415c4b..4dac4a6d9e 100644
--- a/src/java/org/apache/cassandra/service/StorageProxy.java
+++ b/src/java/org/apache/cassandra/service/StorageProxy.java
@@ -37,6 +37,7 @@ import java.util.concurrent.TimeoutException;
 import java.util.concurrent.atomic.AtomicInteger;
 import java.util.concurrent.atomic.AtomicLong;
 import java.util.function.Function;
+import java.util.function.IntPredicate;
 import java.util.stream.Collectors;
 import javax.annotation.Nonnull;
 import javax.annotation.Nullable;
@@ -49,7 +50,6 @@ import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
 import accord.primitives.Txn;
-import accord.utils.Invariants;
 import org.apache.cassandra.batchlog.Batch;
 import org.apache.cassandra.batchlog.BatchlogManager;
 import org.apache.cassandra.concurrent.DebuggableTask.RunnableDebuggableTask;
@@ -128,10 +128,12 @@ import org.apache.cassandra.schema.Schema;
 import org.apache.cassandra.schema.SchemaConstants;
 import org.apache.cassandra.schema.TableId;
 import org.apache.cassandra.schema.TableMetadata;
+import org.apache.cassandra.schema.TableParams;
 import org.apache.cassandra.service.accord.AccordService;
 import org.apache.cassandra.service.accord.IAccordService;
 import org.apache.cassandra.service.accord.IAccordService.AsyncTxnResult;
 import org.apache.cassandra.service.accord.txn.TxnData;
+import org.apache.cassandra.service.accord.txn.TxnDataName;
 import org.apache.cassandra.service.accord.txn.TxnQuery;
 import org.apache.cassandra.service.accord.txn.TxnRead;
 import org.apache.cassandra.service.accord.txn.TxnResult;
@@ -139,6 +141,7 @@ import 
org.apache.cassandra.service.consensus.TransactionalMode;
 import 
org.apache.cassandra.service.consensus.migration.ConsensusMigrationMutationHelper.SplitConsumer;
 import 
org.apache.cassandra.service.consensus.migration.ConsensusMigrationMutationHelper.SplitMutations;
 import org.apache.cassandra.service.consensus.migration.ConsensusRequestRouter;
+import 
org.apache.cassandra.service.consensus.migration.TransactionalMigrationFromMode;
 import org.apache.cassandra.service.paxos.Ballot;
 import org.apache.cassandra.service.paxos.Commit;
 import org.apache.cassandra.service.paxos.ContentionStrategy;
@@ -172,6 +175,7 @@ import 
org.apache.cassandra.utils.concurrent.UncheckedInterruptedException;
 import static accord.primitives.Txn.Kind.EphemeralRead;
 import static accord.primitives.Txn.Kind.Read;
 import static com.google.common.base.Preconditions.checkNotNull;
+import static com.google.common.base.Preconditions.checkState;
 import static com.google.common.collect.Iterables.concat;
 import static java.util.concurrent.TimeUnit.MILLISECONDS;
 import static java.util.concurrent.TimeUnit.NANOSECONDS;
@@ -200,6 +204,7 @@ import static 
org.apache.cassandra.service.accord.txn.TxnResult.Kind.retry_new_p
 import static 
org.apache.cassandra.service.consensus.migration.ConsensusMigrationMutationHelper.mutateWithAccordAsync;
 import static 
org.apache.cassandra.service.consensus.migration.ConsensusMigrationMutationHelper.splitMutationsIntoAccordAndNormal;
 import static 
org.apache.cassandra.service.consensus.migration.ConsensusRequestRouter.ConsensusRoutingDecision;
+import static 
org.apache.cassandra.service.consensus.migration.ConsensusRequestRouter.getTableMetadata;
 import static org.apache.cassandra.service.paxos.Ballot.Flag.GLOBAL;
 import static org.apache.cassandra.service.paxos.Ballot.Flag.LOCAL;
 import static 
org.apache.cassandra.service.paxos.BallotGenerator.Global.nextBallot;
@@ -345,7 +350,6 @@ public class StorageProxy implements StorageProxyMBean
                                   Dispatcher.RequestTime requestTime)
     throws UnavailableException, IsBootstrappingException, 
RequestFailureException, RequestTimeoutException, InvalidRequestException, 
CasWriteUnknownResultException
     {
-        TableMetadata metadata = Schema.instance.validateTable(keyspaceName, 
cfName);
         if (DatabaseDescriptor.getPartitionDenylistEnabled() && 
DatabaseDescriptor.getDenylistWritesEnabled() && 
!partitionDenylist.isKeyPermitted(keyspaceName, cfName, key.getKey()))
         {
             denylistMetrics.incrementWritesRejected();
@@ -356,6 +360,8 @@ public class StorageProxy implements StorageProxyMBean
         ConsensusAttemptResult lastAttemptResult;
         do
         {
+            ClusterMetadata cm = ClusterMetadata.current();
+            TableMetadata metadata = 
Schema.instance.validateTable(keyspaceName, cfName);
             ConsensusRoutingDecision decision = consensusRouting(metadata, 
key, consistencyForPaxos, requestTime, true);
             switch (decision)
             {
@@ -378,10 +384,11 @@ public class StorageProxy implements StorageProxyMBean
                                                   requestTime);
                     break;
                 case accord:
-                    Txn txn = request.toAccordTxn(consistencyForPaxos,
-                                                        consistencyForCommit,
-                                                        clientState,
-                                                        nowInSeconds);
+                    Txn txn = request.toAccordTxn(cm,
+                                                  consistencyForPaxos,
+                                                  consistencyForCommit,
+                                                  clientState,
+                                                  nowInSeconds);
                     IAccordService accordService = AccordService.instance();
                     TxnResult txnResult = accordService.coordinate(txn,
                                                                    
consistencyForPaxos,
@@ -2121,6 +2128,7 @@ public class StorageProxy implements StorageProxyMBean
         ConsensusAttemptResult lastResult;
         do
         {
+            ClusterMetadata cm = ClusterMetadata.current();
             SinglePartitionReadCommand command = group.queries.get(0);
             ConsensusRoutingDecision decision = 
consensusRouting(group.metadata(), command.partitionKey(), consistencyLevel, 
requestTime, false);
             switch (decision)
@@ -2132,7 +2140,7 @@ public class StorageProxy implements StorageProxyMBean
                     lastResult = legacyReadWithPaxos(group, consistencyLevel, 
requestTime);
                     break;
                 case accord:
-                    lastResult = readWithAccord(group, consistencyLevel, 
requestTime);
+                    lastResult = readWithAccord(cm, group, consistencyLevel, 
requestTime);
                     break;
                 default:
                     throw new IllegalStateException("Unsupported consensus " + 
decision);
@@ -2141,28 +2149,79 @@ public class StorageProxy implements StorageProxyMBean
         return lastResult.serialReadResult;
     }
 
-    private static ConsensusAttemptResult 
readWithAccord(SinglePartitionReadCommand.Group group, ConsistencyLevel 
consistencyLevel, Dispatcher.RequestTime requestTime)
+    private static ConsistencyLevel 
consistencyLevelForAccordRead(ClusterMetadata cm, 
SinglePartitionReadCommand.Group group, @Nullable ConsistencyLevel 
consistencyLevel)
     {
-        if (group.queries.size() > 1)
-            throw new InvalidRequestException("SERIAL/LOCAL_SERIAL consistency 
may only be requested for one partition at a time");
-        SinglePartitionReadCommand readCommand = group.queries.get(0);
+        // Null means no specific consistency behavior is required from 
Accord, it's functionally similar to
+        // reading at ONE if you are reading data that wasn't written via 
Accord
+        if (consistencyLevel == null)
+            return null;
+
+        TableId tableId = group.queries.get(0).metadata().id;
+        TableParams tableParams = getTableMetadata(cm, tableId).params;
+        TransactionalMode mode = tableParams.transactionalMode;
+        TransactionalMigrationFromMode migrationFromMode = 
tableParams.transactionalMigrationFrom;
+        for (SinglePartitionReadCommand command : group.queries)
+        {
+            // readCLForStrategy should return either null or the supplied 
consistency level
+            // in which case we will read everything at that CL since Accord 
doesn't support per table
+            // read consistency
+            ConsistencyLevel commitCL = 
mode.readCLForStrategy(migrationFromMode, consistencyLevel, cm, tableId, 
command.partitionKey().getToken());
+            if (commitCL != null)
+                return commitCL;
+        }
+        return null;
+    }
+
+    private static ConsensusAttemptResult readWithAccord(ClusterMetadata cm, 
SinglePartitionReadCommand.Group group, ConsistencyLevel consistencyLevel, 
Dispatcher.RequestTime requestTime)
+    {
+        if (consistencyLevel != null && 
!IAccordService.SUPPORTED_READ_CONSISTENCY_LEVELS.contains(consistencyLevel))
+            throw new InvalidRequestException(consistencyLevel + " is not 
supported by Accord");
         // If the non-SERIAL write strategy is sending all writes through 
Accord there is no need to use the supplied consistency
         // level since Accord will manage reading safely
         TransactionalMode transactionalMode = 
group.metadata().params.transactionalMode;
-        consistencyLevel = 
transactionalMode.readCLForStrategy(consistencyLevel);
-        TxnRead read = TxnRead.createSerialRead(readCommand, consistencyLevel);
-        Invariants.checkState(read.keys().size() == 1, "Ephemeral reads are 
only strict-serializable for single partition reads");
-        Txn txn = new Txn.InMemory(transactionalMode == TransactionalMode.full 
&& DatabaseDescriptor.getAccordEphemeralReadEnabledEnabled() ? EphemeralRead : 
Read, read.keys(), read, TxnQuery.ALL, null);
-        IAccordService accordService = AccordService.instance();
-        TxnResult txnResult = accordService.coordinate(txn, consistencyLevel, 
requestTime);
+        consistencyLevel = consistencyLevelForAccordRead(cm, group, 
consistencyLevel);
+        TxnRead read = TxnRead.createSerialRead(group.queries, 
consistencyLevel);
+        Txn.Kind kind = Read;
+        if (transactionalMode == TransactionalMode.full && 
DatabaseDescriptor.getAccordEphemeralReadEnabledEnabled() && 
group.queries.size() == 1)
+            kind = EphemeralRead;
+        Txn txn = new Txn.InMemory(kind, read.keys(), read, TxnQuery.ALL, 
null);
+        AsyncTxnResult asyncTxnResult = 
AccordService.instance().coordinateAsync(txn, consistencyLevel, requestTime);
+        return getConsensusAttemptResultFromAsyncTxnResult(asyncTxnResult, 
group.queries.size(), index -> group.queries.get(index).isReversed(), 
consistencyLevel, requestTime);
+    }
+
+    /*
+     * Used for both the SERIAL and non-SERIAL read path into Accord
+     */
+    public static ConsensusAttemptResult 
getConsensusAttemptResultFromAsyncTxnResult(AsyncTxnResult asyncTxnResult, int 
numQueries, IntPredicate isQueryReversed, ConsistencyLevel cl, 
Dispatcher.RequestTime requestTime)
+    {
+        TxnResult txnResult = 
AccordService.instance().getTxnResult(asyncTxnResult, false, cl, requestTime);
+        // TODO (required): Converge on a single approach to 
RETRY_NEW_PROTOCOL, this works for now because reads don't support it anyways
         if (txnResult.kind() == retry_new_protocol)
             return RETRY_NEW_PROTOCOL;
-        TxnData data = (TxnData)txnResult;
-        FilteredPartition partition = data.get(TxnRead.SERIAL_READ);
-        if (partition != null)
-            return 
serialReadResult(PartitionIterators.singletonIterator(partition.rowIterator(readCommand.isReversed())));
-        else
+        TxnData data = (TxnData) txnResult;
+
+        if (data.isEmpty())
+        {
             return serialReadResult(EmptyIterators.partition());
+        }
+        else if (data.size() == 1)
+        {
+            FilteredPartition value = data.values().iterator().next();
+            return 
serialReadResult(PartitionIterators.singletonIterator(value.rowIterator(isQueryReversed.test(0))));
+        }
+        else
+        {
+            // TODO (review): 95% sure this isn't actually needed and the 
consumer is going consume these by DecoratedKey not iteration order, but the 
non-transactional path does preserve the order of the iterators
+            List<PartitionIterator> partitionIterators = new 
ArrayList<>(numQueries);
+            for (int i = 0; i < numQueries; i++)
+                partitionIterators.add(null);
+            for (Map.Entry<TxnDataName, FilteredPartition> e : data.entrySet())
+            {
+                int queryIndex = Integer.valueOf(e.getKey().part(0));
+                partitionIterators.set(queryIndex, 
PartitionIterators.singletonIterator(e.getValue().rowIterator(isQueryReversed.test(queryIndex))));
+            }
+            return serialReadResult(partitionIterators.size() == 1 ? 
partitionIterators.get(0) : PartitionIterators.concat(partitionIterators));
+        }
     }
 
     private static ConsensusAttemptResult 
legacyReadWithPaxos(SinglePartitionReadCommand.Group group, ConsistencyLevel 
consistencyLevel, Dispatcher.RequestTime requestTime)
@@ -2267,7 +2326,29 @@ public class StorageProxy implements StorageProxyMBean
         long start = nanoTime();
         try
         {
-            PartitionIterator result = fetchRows(group.queries, 
consistencyLevel, coordinator, requestTime);
+            ClusterMetadata cm = ClusterMetadata.current();
+            TableId tableId = group.queries.get(0).metadata().id;
+            // Returns null for local tables
+            TableMetadata tableMetadata = getTableMetadata(cm, tableId);
+            if (tableMetadata == null)
+                tableMetadata = 
Schema.instance.localKeyspaces().getTableOrViewNullable(tableId);
+            TableParams tableParams = tableMetadata.params;
+
+            TransactionalMode transactionalMode = 
tableParams.transactionalMode;
+//            TransactionalMigrationFromMode transactionalMigrationFromMode = 
tableParams.transactionalMigrationFrom;
+            // TODO (required): Tests would fail with this and we need to add 
live migration support anyways so for now allow it
+//            if (transactionalMigrationFromMode != 
TransactionalMigrationFromMode.none)
+//                throw new UnsupportedOperationException("Live migration is 
not supported, can't safely read when migrating from " + 
transactionalMigrationFromMode + " to " + transactionalMode);
+
+            PartitionIterator result;
+            if (transactionalMode.readsThroughAccord && 
coordinator.isEventuallyConsistent())
+            {
+                ConsensusAttemptResult consensusAttemptResult = 
readWithAccord(cm, group, consistencyLevel, requestTime);
+                
checkState(!consensusAttemptResult.shouldRetryOnNewConsensusProtocol, "Live 
migration is not supported with non-SERIAL reads yet");
+                result = consensusAttemptResult.serialReadResult;
+            }
+            else
+                result = fetchRows(group.queries, consistencyLevel, 
coordinator, requestTime);
             // Note that the only difference between the command in a group 
must be the partition key on which
             // they applied.
             boolean enforceStrictLiveness = 
group.queries.get(0).metadata().enforceStrictLiveness();
diff --git 
a/src/java/org/apache/cassandra/service/accord/AccordConfigurationService.java 
b/src/java/org/apache/cassandra/service/accord/AccordConfigurationService.java
index 4f6f3d1666..08461249c3 100644
--- 
a/src/java/org/apache/cassandra/service/accord/AccordConfigurationService.java
+++ 
b/src/java/org/apache/cassandra/service/accord/AccordConfigurationService.java
@@ -29,7 +29,6 @@ import javax.annotation.Nullable;
 
 import com.google.common.annotations.VisibleForTesting;
 import com.google.common.collect.Sets;
-
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
diff --git a/src/java/org/apache/cassandra/service/accord/IAccordService.java 
b/src/java/org/apache/cassandra/service/accord/IAccordService.java
index bba67a9160..97ebae6963 100644
--- a/src/java/org/apache/cassandra/service/accord/IAccordService.java
+++ b/src/java/org/apache/cassandra/service/accord/IAccordService.java
@@ -18,16 +18,15 @@
 
 package org.apache.cassandra.service.accord;
 
+import java.util.Collection;
+import java.util.EnumSet;
 import java.util.List;
-import java.util.Set;
 import java.util.concurrent.TimeUnit;
 import java.util.concurrent.TimeoutException;
 import java.util.function.Supplier;
 import javax.annotation.Nonnull;
 import javax.annotation.Nullable;
 
-import com.google.common.collect.ImmutableSet;
-
 import accord.api.BarrierType;
 import accord.local.CommandStores.RangesForEpoch;
 import accord.local.DurableBefore;
@@ -59,15 +58,13 @@ import org.apache.cassandra.transport.Dispatcher;
 import org.apache.cassandra.utils.concurrent.AsyncPromise;
 import org.apache.cassandra.utils.concurrent.Future;
 
-import java.util.Collection;
-
 import static com.google.common.base.Preconditions.checkNotNull;
 
 
 public interface IAccordService
 {
-    Set<ConsistencyLevel> SUPPORTED_COMMIT_CONSISTENCY_LEVELS = 
ImmutableSet.of(ConsistencyLevel.ANY, ConsistencyLevel.ONE, 
ConsistencyLevel.LOCAL_ONE, ConsistencyLevel.QUORUM, ConsistencyLevel.SERIAL, 
ConsistencyLevel.ALL);
-    Set<ConsistencyLevel> SUPPORTED_READ_CONSISTENCY_LEVELS = 
ImmutableSet.of(ConsistencyLevel.ONE, ConsistencyLevel.QUORUM, 
ConsistencyLevel.SERIAL);
+    EnumSet<ConsistencyLevel> SUPPORTED_COMMIT_CONSISTENCY_LEVELS = 
EnumSet.of(ConsistencyLevel.ANY, ConsistencyLevel.ONE, ConsistencyLevel.QUORUM, 
ConsistencyLevel.SERIAL, ConsistencyLevel.ALL);
+    EnumSet<ConsistencyLevel> SUPPORTED_READ_CONSISTENCY_LEVELS = 
EnumSet.of(ConsistencyLevel.ONE, ConsistencyLevel.QUORUM, 
ConsistencyLevel.SERIAL);
 
     IVerbHandler<? extends Request> requestHandler();
     IVerbHandler<? extends Reply> responseHandler();
diff --git a/src/java/org/apache/cassandra/service/accord/txn/TxnData.java 
b/src/java/org/apache/cassandra/service/accord/txn/TxnData.java
index 9c2ae88f83..6ca4fc5083 100644
--- a/src/java/org/apache/cassandra/service/accord/txn/TxnData.java
+++ b/src/java/org/apache/cassandra/service/accord/txn/TxnData.java
@@ -19,6 +19,7 @@
 package org.apache.cassandra.service.accord.txn;
 
 import java.io.IOException;
+import java.util.Collection;
 import java.util.HashMap;
 import java.util.Iterator;
 import java.util.Map;
@@ -80,6 +81,16 @@ public class TxnData extends TxnResult implements Data, 
Iterable<FilteredPartiti
         return data.entrySet();
     }
 
+    public Collection<FilteredPartition> values()
+    {
+        return data.values();
+    }
+
+    public int size()
+    {
+        return data.size();
+    }
+
     public boolean isEmpty()
     {
         return data.isEmpty();
diff --git a/src/java/org/apache/cassandra/service/accord/txn/TxnDataName.java 
b/src/java/org/apache/cassandra/service/accord/txn/TxnDataName.java
index 4f3edbff0b..5625473857 100644
--- a/src/java/org/apache/cassandra/service/accord/txn/TxnDataName.java
+++ b/src/java/org/apache/cassandra/service/accord/txn/TxnDataName.java
@@ -144,6 +144,11 @@ public class TxnDataName implements Comparable<TxnDataName>
         return Collections.unmodifiableList(Arrays.asList(parts));
     }
 
+    public String part(int index)
+    {
+        return parts[index];
+    }
+
     public DecoratedKey getDecoratedKey(TableMetadata metadata)
     {
         checkKind(Kind.AUTO_READ);
diff --git a/src/java/org/apache/cassandra/service/accord/txn/TxnRead.java 
b/src/java/org/apache/cassandra/service/accord/txn/TxnRead.java
index 694c3f225a..391da765bf 100644
--- a/src/java/org/apache/cassandra/service/accord/txn/TxnRead.java
+++ b/src/java/org/apache/cassandra/service/accord/txn/TxnRead.java
@@ -36,10 +36,10 @@ import accord.primitives.Participants;
 import accord.primitives.Ranges;
 import accord.primitives.Seekable;
 import accord.primitives.Timestamp;
-import org.apache.cassandra.db.SinglePartitionReadCommand;
 import accord.utils.async.AsyncChain;
 import accord.utils.async.AsyncChains;
 import org.apache.cassandra.db.ConsistencyLevel;
+import org.apache.cassandra.db.SinglePartitionReadCommand;
 import org.apache.cassandra.io.IVersionedSerializer;
 import org.apache.cassandra.io.util.DataInputPlus;
 import org.apache.cassandra.io.util.DataOutputPlus;
@@ -100,10 +100,13 @@ public class TxnRead extends 
AbstractKeySorted<TxnNamedRead> implements Read
         return new TxnRead(items, txnKeys, consistencyLevel);
     }
 
-    public static TxnRead createSerialRead(SinglePartitionReadCommand 
readCommand, ConsistencyLevel consistencyLevel)
+    public static TxnRead createSerialRead(List<SinglePartitionReadCommand> 
readCommands, ConsistencyLevel consistencyLevel)
     {
-        TxnNamedRead read = new TxnNamedRead(SERIAL_READ, readCommand);
-        return new TxnRead(ImmutableList.of(read), Keys.of(read.key()), 
consistencyLevel);
+        List<TxnNamedRead> reads = new ArrayList<>(readCommands.size());
+        for (int i = 0; i < readCommands.size(); i++)
+            reads.add(new TxnNamedRead(TxnDataName.user(String.valueOf(i)), 
readCommands.get(i)));
+        Keys keys = Keys.of(reads, TxnNamedRead::key);
+        return new TxnRead(reads, keys, consistencyLevel);
     }
 
     public static TxnRead createCasRead(SinglePartitionReadCommand 
readCommand, ConsistencyLevel consistencyLevel)
diff --git 
a/src/java/org/apache/cassandra/service/consensus/TransactionalMode.java 
b/src/java/org/apache/cassandra/service/consensus/TransactionalMode.java
index 24a5ba5821..67f9e30b91 100644
--- a/src/java/org/apache/cassandra/service/consensus/TransactionalMode.java
+++ b/src/java/org/apache/cassandra/service/consensus/TransactionalMode.java
@@ -19,7 +19,15 @@
 package org.apache.cassandra.service.consensus;
 
 import org.apache.cassandra.db.ConsistencyLevel;
+import org.apache.cassandra.dht.Range;
+import org.apache.cassandra.dht.Token;
+import org.apache.cassandra.schema.TableId;
 import org.apache.cassandra.service.accord.IAccordService;
+import org.apache.cassandra.service.consensus.migration.TableMigrationState;
+import 
org.apache.cassandra.service.consensus.migration.TransactionalMigrationFromMode;
+import org.apache.cassandra.tcm.ClusterMetadata;
+
+import static com.google.common.base.Preconditions.checkState;
 
 /*
  * Configure the transactional behavior of a table. Enables accord on a table 
and defines how it mixes with non-serial writes
@@ -57,53 +65,54 @@ import org.apache.cassandra.service.accord.IAccordService;
 public enum TransactionalMode
 {
     // Running on Paxos V1 or V2 with Accord disabled
-    off(false, false, false, false),
+    off(false, false, false, false, false),
 
     /*
      * Execute writes through Cassandra via StorageProxy's normal write path. 
This can lead Accord to compute
      * multiple outcomes for a transaction that depends on data written by 
non-SERIAL writes.
      */
-    unsafe(true, false, false, false),
+    unsafe(true, false, false, false, false),
 
     /*
      * Allow mixing of non-SERIAL writes and Accord, but still force BRR 
through Accord.
      * This mode makes it safe to perform non-SERIAL or SERIAL reads of Accord 
data, but unsafe
      * to write data that Accord may attempt to read.
      */
-    unsafe_writes(true, false, false, true),
+    unsafe_writes(true, false, false, false, true),
 
     /*
      * Execute writes through Accord skipping StorageProxy's normal write 
path, but commit
      * writes at the provided consistency level so they can be read via 
non-SERIAL consistency levels.
      * This mode makes it safe to read/write data that Accord will read/write.
      */
-    mixed_reads(true, false, true, true),
+    mixed_reads(true, false, true, false, true),
 
     /*
      * Execute writes through Accord skipping StorageProxy's normal write 
path. Ignores the provided consistency level
      * which makes Accord commit writes at ANY similar to Paxos with commit 
consistency level ANY.
      */
-    full(true, true, true, true);
+    full(true, true, true, true, true);
 
     public final boolean accordIsEnabled;
-    public final boolean ignoresSuppliedConsistencyLevel;
+    public final boolean ignoresSuppleidCommitCL;
     public final boolean writesThroughAccord;
-
+    public final boolean readsThroughAccord;
     public final boolean blockingReadRepairThroughAccord;
     private final String cqlParam;
 
-    TransactionalMode(boolean accordIsEnabled, boolean 
ignoresSuppliedConsistencyLevel, boolean writesThroughAccord, boolean 
blockingReadRepairThroughAccord)
+    TransactionalMode(boolean accordIsEnabled, boolean 
ignoresSuppleidCommitCL, boolean writesThroughAccord, boolean 
readsThroughAccord, boolean blockingReadRepairThroughAccord)
     {
         this.accordIsEnabled = accordIsEnabled;
-        this.ignoresSuppliedConsistencyLevel = ignoresSuppliedConsistencyLevel;
+        this.ignoresSuppleidCommitCL = ignoresSuppleidCommitCL;
         this.writesThroughAccord = writesThroughAccord;
+        this.readsThroughAccord = readsThroughAccord;
         this.blockingReadRepairThroughAccord = blockingReadRepairThroughAccord;
         this.cqlParam = String.format("transactional_mode = '%s'", 
this.name().toLowerCase());
     }
 
     public ConsistencyLevel commitCLForStrategy(ConsistencyLevel 
consistencyLevel)
     {
-        if (ignoresSuppliedConsistencyLevel)
+        if (ignoresSuppleidCommitCL)
             return null;
 
         if 
(!IAccordService.SUPPORTED_COMMIT_CONSISTENCY_LEVELS.contains(consistencyLevel))
@@ -112,14 +121,25 @@ public enum TransactionalMode
         return consistencyLevel;
     }
 
-    // TODO (required): This won't work for migration directly from none to 
full because there is no safe system to read from
-    // during the first phase (repair). Accord won't read correctly beacuse it 
won't honor the CL and miss non-transactional writes that haven't been repaired 
and non-transactional
-    // reads will miss all the writes being routed through Accord since they 
occur asynchronously. Something has to give here where either writes routed 
through are Accord are synchronous at CL
-    // or reads are routed through Accord and read at quorum as long as the 
range has not completed the first phase (repair).
-    public ConsistencyLevel readCLForStrategy(ConsistencyLevel 
consistencyLevel)
+    private boolean ignoresSuppliedReadCL()
     {
-        if (ignoresSuppliedConsistencyLevel)
-            return null;
+        return writesThroughAccord && blockingReadRepairThroughAccord;
+    }
+
+    public ConsistencyLevel readCLForStrategy(TransactionalMigrationFromMode 
fromMode, ConsistencyLevel consistencyLevel, ClusterMetadata cm, TableId 
tableId, Token token)
+    {
+        if (ignoresSuppliedReadCL())
+        {
+            TableMigrationState tms = 
cm.consensusMigrationState.tableStates.get(tableId);
+            checkState(tms != null || fromMode == 
TransactionalMigrationFromMode.none);
+
+            // Only ignore the supplied consistency level if the token is not 
migrating
+            // otherwise honor it because we might read through Accord for 
non-SERIAL reads before repair is run
+            // this is OK to do because BRR still works and Accord isn't 
computing a write so recovery
+            // determinism isn't an issue
+            if (tms == null || Range.isInNormalizedRanges(token, 
tms.migratedRanges))
+                return null;
+        }
 
         if 
(!IAccordService.SUPPORTED_READ_CONSISTENCY_LEVELS.contains(consistencyLevel))
             throw new UnsupportedOperationException("Consistency level " + 
consistencyLevel + " is unsupported with Accord for read, supported are ONE, 
QUORUM, and SERIAL");
diff --git 
a/src/java/org/apache/cassandra/service/consensus/migration/ConsensusMigrationMutationHelper.java
 
b/src/java/org/apache/cassandra/service/consensus/migration/ConsensusMigrationMutationHelper.java
index 32997bfb0c..b3d7422090 100644
--- 
a/src/java/org/apache/cassandra/service/consensus/migration/ConsensusMigrationMutationHelper.java
+++ 
b/src/java/org/apache/cassandra/service/consensus/migration/ConsensusMigrationMutationHelper.java
@@ -42,6 +42,7 @@ import org.apache.cassandra.db.IMutation;
 import org.apache.cassandra.db.Mutation;
 import org.apache.cassandra.db.partitions.PartitionUpdate;
 import org.apache.cassandra.dht.Token;
+import org.apache.cassandra.exceptions.InvalidRequestException;
 import org.apache.cassandra.exceptions.RetryOnDifferentSystemException;
 import org.apache.cassandra.schema.Schema;
 import org.apache.cassandra.schema.SchemaConstants;
@@ -235,6 +236,8 @@ public class ConsensusMigrationMutationHelper
 
     public static AsyncTxnResult mutateWithAccordAsync(ClusterMetadata cm, 
Collection<? extends IMutation> mutations, @Nullable ConsistencyLevel 
consistencyLevel, Dispatcher.RequestTime requestTime)
     {
+        if (consistencyLevel != null && 
!IAccordService.SUPPORTED_COMMIT_CONSISTENCY_LEVELS.contains(consistencyLevel))
+            throw new InvalidRequestException(consistencyLevel + " is not 
supported by Accord");
         int fragmentIndex = 0;
         List<TxnWrite.Fragment> fragments = new ArrayList<>(mutations.size());
         List<PartitionKey> partitionKeys = new ArrayList<>(mutations.size());
diff --git 
a/src/java/org/apache/cassandra/service/consensus/migration/ConsensusMigrationState.java
 
b/src/java/org/apache/cassandra/service/consensus/migration/ConsensusMigrationState.java
index 731aabb735..b2d1195b1a 100644
--- 
a/src/java/org/apache/cassandra/service/consensus/migration/ConsensusMigrationState.java
+++ 
b/src/java/org/apache/cassandra/service/consensus/migration/ConsensusMigrationState.java
@@ -50,6 +50,7 @@ import org.apache.cassandra.tcm.serialization.Version;
 import org.apache.cassandra.utils.PojoToString;
 
 import static com.google.common.base.Preconditions.checkNotNull;
+import static com.google.common.base.Preconditions.checkState;
 import static org.apache.cassandra.utils.CollectionSerializers.deserializeMap;
 import static org.apache.cassandra.utils.CollectionSerializers.newHashMap;
 import static org.apache.cassandra.utils.CollectionSerializers.serializeMap;
@@ -136,16 +137,20 @@ public class ConsensusMigrationState implements 
MetadataValue<ConsensusMigration
 
     private static void withRangesMigrating(Map<TableId, TableMigrationState> 
current, ImmutableMap.Builder<TableId, TableMigrationState> next, TableMetadata 
metadata, List<Range<Token>> ranges, boolean overwrite)
     {
-        TableMigrationState tableState;
+        TableMigrationState tableState = current.get(metadata.id);
+        checkState(tableState != null || overwrite, "Can't begin migrating a 
table without first altering the schema to set transactional mode");
+        TransactionalMigrationFromMode migrationFromMode = 
metadata.params.transactionalMigrationFrom;
         ConsensusMigrationTarget target = 
ConsensusMigrationTarget.fromTransactionalMode(metadata.params.transactionalMode);
-        if (!overwrite && current.containsKey(metadata.id))
-        {
-            tableState = current.get(metadata.id).withRangesMigrating(ranges, 
target);
-        }
+        checkState(migrationFromMode != null && migrationFromMode != 
TransactionalMigrationFromMode.none, "Table transactional migration from can't 
be null or none");
+
+        Map<Epoch, List<Range<Token>>> migratingRangesByEpoch = 
ImmutableMap.of();
+        if (!ranges.isEmpty())
+            migratingRangesByEpoch = ImmutableMap.of(Epoch.EMPTY, ranges);
+
+        if (overwrite)
+            tableState = new TableMigrationState(metadata.keyspace, 
metadata.name, metadata.id, target, ImmutableSet.of(), migratingRangesByEpoch);
         else
-        {
-            tableState = new TableMigrationState(metadata.keyspace, 
metadata.name, metadata.id, target, ImmutableSet.of(), 
ImmutableMap.of(Epoch.EMPTY, ranges));
-        }
+            tableState = tableState.withRangesMigrating(ranges, target);
         next.put(metadata.id, tableState);
     }
 
@@ -238,7 +243,7 @@ public class ConsensusMigrationState implements 
MetadataValue<ConsensusMigration
     {
         tableStates.forEach((id, migrationState) -> {
             TableMetadata metadata = schema.getTableMetadata(id);
-            
Preconditions.checkState(ConsensusMigrationTarget.fromTransactionalMode(metadata.params.transactionalMode).equals(migrationState.targetProtocol));
+            
checkState(ConsensusMigrationTarget.fromTransactionalMode(metadata.params.transactionalMode).equals(migrationState.targetProtocol));
         });
     }
 
diff --git 
a/src/java/org/apache/cassandra/service/consensus/migration/TransactionalMigrationFromMode.java
 
b/src/java/org/apache/cassandra/service/consensus/migration/TransactionalMigrationFromMode.java
index f916a96b8e..5f88a78761 100644
--- 
a/src/java/org/apache/cassandra/service/consensus/migration/TransactionalMigrationFromMode.java
+++ 
b/src/java/org/apache/cassandra/service/consensus/migration/TransactionalMigrationFromMode.java
@@ -76,6 +76,11 @@ public enum TransactionalMigrationFromMode
         return from != null && from.writesThroughAccord;
     }
 
+    public boolean readsThroughAccord()
+    {
+        return from != null && from.readsThroughAccord;
+    }
+
     public boolean isMigrating()
     {
         return this != none;
diff --git a/src/java/org/apache/cassandra/tcm/transformations/AlterSchema.java 
b/src/java/org/apache/cassandra/tcm/transformations/AlterSchema.java
index c97c6e3be2..8c13631122 100644
--- a/src/java/org/apache/cassandra/tcm/transformations/AlterSchema.java
+++ b/src/java/org/apache/cassandra/tcm/transformations/AlterSchema.java
@@ -19,17 +19,23 @@
 package org.apache.cassandra.tcm.transformations;
 
 import java.io.IOException;
-import java.util.*;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
 import java.util.function.Function;
 import java.util.stream.Collectors;
 import java.util.stream.Stream;
 
 import com.google.common.collect.ImmutableList;
+import com.google.common.collect.Sets;
 import com.google.common.collect.Streams;
-import org.apache.cassandra.config.AccordSpec;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
+import org.apache.cassandra.config.AccordSpec;
 import org.apache.cassandra.config.DatabaseDescriptor;
 import org.apache.cassandra.dht.Range;
 import org.apache.cassandra.dht.Token;
@@ -63,6 +69,7 @@ import 
org.apache.cassandra.tcm.serialization.AsymmetricMetadataSerializer;
 import org.apache.cassandra.tcm.serialization.Version;
 import org.apache.cassandra.utils.JVMStabilityInspector;
 
+import static com.google.common.base.Preconditions.checkState;
 import static com.google.common.collect.ImmutableSet.toImmutableSet;
 import static org.apache.cassandra.exceptions.ExceptionCode.ALREADY_EXISTS;
 import static org.apache.cassandra.exceptions.ExceptionCode.CONFIG_ERROR;
@@ -285,6 +292,9 @@ public class AlterSchema implements Transformation
                 .map(alt -> alt.after)
                 .collect(Collectors.toUnmodifiableSet());
 
+        Set<TableId> startedAndReversed = 
Sets.intersection(started.stream().map(TableMetadata::id).collect(Collectors.toSet()),
 reversals.keySet());
+        checkState(startedAndReversed.isEmpty(), "Set of tables starting 
migration and reversing migration should not intersect");
+
         if (!started.isEmpty())
         {
             List<Range<Token>> ranges;
@@ -301,8 +311,9 @@ public class AlterSchema implements Transformation
                     break;
             }
 
-            if (!ranges.isEmpty())
-                migrationState = migrationState.withRangesMigrating(started, 
ranges, true);
+            // Always create the migration state even if nothing is currently 
migrating, the empty state
+            // signals that a migration is in progress with no migrating 
ranges and corresponds to transactionalMigrationFrom != none
+            migrationState = migrationState.withRangesMigrating(started, 
ranges, true);
         }
 
         migrationState = migrationState.withReversedMigrations(reversals, 
next.epoch());
diff --git 
a/test/distributed/org/apache/cassandra/distributed/test/ShortReadProtectionTest.java
 
b/test/distributed/org/apache/cassandra/distributed/test/ShortReadProtectionTest.java
index 4d2ae2df7a..c9ab6b30cc 100644
--- 
a/test/distributed/org/apache/cassandra/distributed/test/ShortReadProtectionTest.java
+++ 
b/test/distributed/org/apache/cassandra/distributed/test/ShortReadProtectionTest.java
@@ -120,12 +120,7 @@ public class ShortReadProtectionTest extends TestBaseImpl
     @Before
     public void setupTester()
     {
-        tester = new Tester(readConsistencyLevel, flush, paging);
-    }
-
-    private String transactionalModeCQL()
-    {
-        return " WITH transactional_mode='" + transactionalMode + '\'';
+        tester = new Tester(readConsistencyLevel, flush, paging, 
transactionalMode);
     }
 
     @After
@@ -144,7 +139,7 @@ public class ShortReadProtectionTest extends TestBaseImpl
     @Test
     public void testSkinnyTableWithoutLiveRows()
     {
-        tester.createTable("CREATE TABLE %s (id int PRIMARY KEY)" + 
transactionalModeCQL())
+        tester.createTable("CREATE TABLE %s (id int PRIMARY KEY)")
               .allNodes("INSERT INTO %s (id) VALUES (0) USING TIMESTAMP 0")
               .toNode1("DELETE FROM %s WHERE id = 0")
               .assertRows("SELECT DISTINCT id FROM %s WHERE id = 0")
@@ -161,7 +156,7 @@ public class ShortReadProtectionTest extends TestBaseImpl
     @Test
     public void testSkinnyTableWithLiveRows()
     {
-        tester.createTable("CREATE TABLE %s (id int PRIMARY KEY)" + 
transactionalModeCQL())
+        tester.createTable("CREATE TABLE %s (id int PRIMARY KEY)")
               .allNodes(0, 10, i -> format("INSERT INTO %%s (id) VALUES (%d) 
USING TIMESTAMP 0", i)) // order is 5,1,8,0,2,4,7,6,9,3
               .toNode1("DELETE FROM %s WHERE id IN (1, 0, 4, 6, 3)") // delete 
every other row
               .assertRows("SELECT DISTINCT token(id), id FROM %s",
@@ -178,7 +173,7 @@ public class ShortReadProtectionTest extends TestBaseImpl
     @Test
     public void testSkinnyTableWithComplementaryDeletions()
     {
-        tester.createTable("CREATE TABLE %s (id int PRIMARY KEY)" + 
transactionalModeCQL())
+        tester.createTable("CREATE TABLE %s (id int PRIMARY KEY)")
               .allNodes(0, 10, i -> format("INSERT INTO %%s (id) VALUES (%d) 
USING TIMESTAMP 0", i)) // order is 5,1,8,0,2,4,7,6,9,3
               .toNode1("DELETE FROM %s WHERE id IN (5, 8, 2, 7, 9)") // delete 
every other row
               .toNode2("DELETE FROM %s WHERE id IN (1, 0, 4, 6)") // delete 
every other row but the last one
@@ -196,7 +191,7 @@ public class ShortReadProtectionTest extends TestBaseImpl
     @Test
     public void testMultipleMissedRows()
     {
-        tester.createTable("CREATE TABLE %s (pk int, ck int, PRIMARY KEY (pk, 
ck))" + transactionalModeCQL())
+        tester.createTable("CREATE TABLE %s (pk int, ck int, PRIMARY KEY (pk, 
ck))")
               .allNodes(0, 4, i -> format("INSERT INTO %%s (pk, ck) VALUES (0, 
%d) USING TIMESTAMP 0", i))
               .toNode1("DELETE FROM %s WHERE pk = 0 AND ck IN (1, 2, 3)",
                        "INSERT INTO %s (pk, ck) VALUES (0, 5)")
@@ -215,7 +210,7 @@ public class ShortReadProtectionTest extends TestBaseImpl
     @Test
     public void testAscendingOrder()
     {
-        tester.createTable("CREATE TABLE %s (k int, c int, v int, PRIMARY 
KEY(k, c))" + transactionalModeCQL())
+        tester.createTable("CREATE TABLE %s (k int, c int, v int, PRIMARY 
KEY(k, c))")
               .allNodes(1, 10, i -> format("INSERT INTO %%s (k, c, v) VALUES 
(0, %d, %d) USING TIMESTAMP 0", i, i * 10))
               .toNode1("DELETE FROM %s WHERE k=0 AND c=1")
               .toNode2("DELETE FROM %s WHERE k=0 AND c=2")
@@ -237,7 +232,7 @@ public class ShortReadProtectionTest extends TestBaseImpl
     @Test
     public void testDescendingOrder()
     {
-        tester.createTable("CREATE TABLE %s (k int, c int, v int, PRIMARY 
KEY(k, c))" + transactionalModeCQL())
+        tester.createTable("CREATE TABLE %s (k int, c int, v int, PRIMARY 
KEY(k, c))")
               .allNodes(1, 10, i -> format("INSERT INTO %%s (k, c, v) VALUES 
(0, %d, %d) USING TIMESTAMP 0", i, i * 10))
               .toNode1("DELETE FROM %s WHERE k=0 AND c=7")
               .toNode2("DELETE FROM %s WHERE k=0 AND c=8")
@@ -260,7 +255,7 @@ public class ShortReadProtectionTest extends TestBaseImpl
     @Test
     public void testDeletePartition()
     {
-        tester.createTable("CREATE TABLE %s (k int, c int, v int, PRIMARY 
KEY(k, c))" + transactionalModeCQL())
+        tester.createTable("CREATE TABLE %s (k int, c int, v int, PRIMARY 
KEY(k, c))")
               .allNodes("INSERT INTO %s (k, c, v) VALUES (0, 1, 10) USING 
TIMESTAMP 0",
                         "INSERT INTO %s (k, c, v) VALUES (0, 2, 20) USING 
TIMESTAMP 0")
               .toNode2("DELETE FROM %s WHERE k=0")
@@ -273,7 +268,7 @@ public class ShortReadProtectionTest extends TestBaseImpl
     @Test
     public void testDeletePartitionWithStatic()
     {
-        tester.createTable("CREATE TABLE %s (k int, c int, v int, s int 
STATIC, PRIMARY KEY(k, c))" + transactionalModeCQL())
+        tester.createTable("CREATE TABLE %s (k int, c int, v int, s int 
STATIC, PRIMARY KEY(k, c))")
               .allNodes("INSERT INTO %s (k, c, v, s) VALUES (0, 1, 10, 100) 
USING TIMESTAMP 0",
                         "INSERT INTO %s (k, c, v) VALUES (0, 2, 20) USING 
TIMESTAMP 0")
               .toNode2("DELETE FROM %s WHERE k=0")
@@ -286,7 +281,7 @@ public class ShortReadProtectionTest extends TestBaseImpl
     @Test
     public void testDeleteClustering()
     {
-        tester.createTable("CREATE TABLE %s (k int, c int, v int, PRIMARY 
KEY(k, c))" + transactionalModeCQL())
+        tester.createTable("CREATE TABLE %s (k int, c int, v int, PRIMARY 
KEY(k, c))")
               .allNodes("INSERT INTO %s (k, c, v) VALUES (0, 1, 10) USING 
TIMESTAMP 0",
                         "INSERT INTO %s (k, c, v) VALUES (0, 2, 20) USING 
TIMESTAMP 0")
               .toNode2("DELETE FROM %s WHERE k=0 AND c=1")
@@ -301,7 +296,7 @@ public class ShortReadProtectionTest extends TestBaseImpl
     @Test
     public void testDeleteClusteringWithStatic()
     {
-        tester.createTable("CREATE TABLE %s (k int, c int, v int, s int 
STATIC, PRIMARY KEY(k, c))" + transactionalModeCQL())
+        tester.createTable("CREATE TABLE %s (k int, c int, v int, s int 
STATIC, PRIMARY KEY(k, c))")
               .allNodes("INSERT INTO %s (k, c, v, s) VALUES (0, 1, 10, 100) 
USING TIMESTAMP 0",
                         "INSERT INTO %s (k, c, v) VALUES (0, 2, 20) USING 
TIMESTAMP 0")
               .toNode2("DELETE FROM %s WHERE k=0 AND c=1")
@@ -318,7 +313,7 @@ public class ShortReadProtectionTest extends TestBaseImpl
     @Test
     public void testGroupByRegularRow()
     {
-        tester.createTable("CREATE TABLE %s (pk int, ck int, PRIMARY KEY (pk, 
ck))" + transactionalModeCQL())
+        tester.createTable("CREATE TABLE %s (pk int, ck int, PRIMARY KEY (pk, 
ck))")
               .toNode1("INSERT INTO %s (pk, ck) VALUES (1, 1) USING TIMESTAMP 
0",
                        "DELETE FROM %s WHERE pk=0 AND ck=0",
                        "INSERT INTO %s (pk, ck) VALUES (2, 2) USING TIMESTAMP 
0")
@@ -341,7 +336,7 @@ public class ShortReadProtectionTest extends TestBaseImpl
     @Test
     public void testGroupByStaticRow()
     {
-        tester.createTable("CREATE TABLE %s (pk int, ck int, s int static, 
PRIMARY KEY (pk, ck))" + transactionalModeCQL())
+        tester.createTable("CREATE TABLE %s (pk int, ck int, s int static, 
PRIMARY KEY (pk, ck))")
               .toNode1("INSERT INTO %s (pk, s) VALUES (1, 1) USING TIMESTAMP 
0",
                        "INSERT INTO %s (pk, s) VALUES (0, null)",
                        "INSERT INTO %s (pk, s) VALUES (2, 2) USING TIMESTAMP 
0")
@@ -364,7 +359,7 @@ public class ShortReadProtectionTest extends TestBaseImpl
     @Test
     public void testSkipEarlyTermination()
     {
-        tester.createTable("CREATE TABLE %s (pk int, ck int, PRIMARY KEY (pk, 
ck))" + transactionalModeCQL())
+        tester.createTable("CREATE TABLE %s (pk int, ck int, PRIMARY KEY (pk, 
ck))")
               .toNode1("INSERT INTO %s (pk, ck) VALUES (0, 0)")
               .toNode2("DELETE FROM %s WHERE pk = 0 AND ck IN (1, 2)")
               .assertRows("SELECT DISTINCT pk FROM %s", row(0));
@@ -381,7 +376,7 @@ public class ShortReadProtectionTest extends TestBaseImpl
     @Test
     public void testSkipEarlyTerminationRows()
     {
-        tester.createTable("CREATE TABLE %s (pk int, ck int, PRIMARY KEY (pk, 
ck))" + transactionalModeCQL())
+        tester.createTable("CREATE TABLE %s (pk int, ck int, PRIMARY KEY (pk, 
ck))")
               .toNode1("INSERT INTO %s (pk, ck) VALUES (0, 0) USING TIMESTAMP 
0",
                        "INSERT INTO %s (pk, ck) VALUES (0, 1) USING TIMESTAMP 
0",
                        "INSERT INTO %s (pk, ck) VALUES (2, 0) USING TIMESTAMP 
0",
@@ -405,7 +400,7 @@ public class ShortReadProtectionTest extends TestBaseImpl
     @Test
     public void testSkipEarlyTerminationPartitions()
     {
-        tester.createTable("CREATE TABLE %s (pk int, ck int, PRIMARY KEY (pk, 
ck))" + transactionalModeCQL())
+        tester.createTable("CREATE TABLE %s (pk int, ck int, PRIMARY KEY (pk, 
ck))")
               .toNode1("INSERT INTO %s (pk, ck) VALUES (0, 0) USING TIMESTAMP 
0",
                        "INSERT INTO %s (pk, ck) VALUES (0, 1) USING TIMESTAMP 
0",
                        "DELETE FROM %s USING TIMESTAMP 42 WHERE pk = 2 AND ck 
IN  (0, 1)")
@@ -432,16 +427,18 @@ public class ShortReadProtectionTest extends TestBaseImpl
         private final boolean flush, paging;
         private final String table;
         private final String qualifiedTableName;
+        private final TransactionalMode transactionalMode;
 
         private boolean flushed = false;
 
-        private Tester(ConsistencyLevel readConsistencyLevel, boolean flush, 
boolean paging)
+        private Tester(ConsistencyLevel readConsistencyLevel, boolean flush, 
boolean paging, TransactionalMode transactionalMode)
         {
             this.readConsistencyLevel = readConsistencyLevel;
             this.flush = flush;
             this.paging = paging;
             this.table = "t_" + seqNumber.getAndIncrement();
             qualifiedTableName = KEYSPACE + '.' + table;
+            this.transactionalMode = transactionalMode;
 
             assert readConsistencyLevel == ALL || readConsistencyLevel == 
QUORUM || readConsistencyLevel == SERIAL
             : "Only ALL and QUORUM consistency levels are supported";
@@ -449,7 +446,13 @@ public class ShortReadProtectionTest extends TestBaseImpl
 
         private Tester createTable(String query)
         {
-            cluster.schemaChange(format(query) + " AND read_repair='NONE'");
+            cluster.schemaChange(format(query) + " WITH read_repair='NONE'");
+            if (transactionalMode != TransactionalMode.off)
+            {
+                // For test purposes we create the table and require migration 
otherwise Accord
+                // won't bother to do interop reads with short read protection
+                cluster.schemaChange(format("ALTER TABLE %s WITH 
transactional_mode='" + transactionalMode + "\' AND 
transactional_migration_from = \'off\'"));
+            }
             return this;
         }
 
diff --git 
a/test/distributed/org/apache/cassandra/distributed/test/accord/AccordCQLTestBase.java
 
b/test/distributed/org/apache/cassandra/distributed/test/accord/AccordCQLTestBase.java
index 824498b229..c58b5d62b8 100644
--- 
a/test/distributed/org/apache/cassandra/distributed/test/accord/AccordCQLTestBase.java
+++ 
b/test/distributed/org/apache/cassandra/distributed/test/accord/AccordCQLTestBase.java
@@ -391,7 +391,7 @@ public abstract class AccordCQLTestBase extends 
AccordTestBase
     {
         accordRead = wrapInTxn(accordRead);
         Object[][] simpleReadResult;
-        if (transactionalMode.ignoresSuppliedConsistencyLevel)
+        if (transactionalMode.ignoresSuppleidCommitCL)
             // With accord non-SERIAL write strategy the commit CL is 
effectively ANY so we need to read at SERIAL
             simpleReadResult = cluster.coordinator(1).execute(simpleRead, 
ConsistencyLevel.SERIAL, key);
         else
diff --git 
a/test/distributed/org/apache/cassandra/distributed/test/accord/AccordInteroperabilityTest.java
 
b/test/distributed/org/apache/cassandra/distributed/test/accord/AccordInteroperabilityTest.java
index 1b8bfe8985..fcd9b1dabe 100644
--- 
a/test/distributed/org/apache/cassandra/distributed/test/accord/AccordInteroperabilityTest.java
+++ 
b/test/distributed/org/apache/cassandra/distributed/test/accord/AccordInteroperabilityTest.java
@@ -19,15 +19,21 @@
 package org.apache.cassandra.distributed.test.accord;
 
 import java.io.IOException;
+import java.util.function.Function;
 
 import org.junit.BeforeClass;
 import org.junit.Test;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
-import org.apache.cassandra.distributed.api.ConsistencyLevel;
+import org.apache.cassandra.db.ConsistencyLevel;
 import org.apache.cassandra.distributed.api.ICoordinator;
 import org.apache.cassandra.distributed.shared.AssertUtils;
+import org.apache.cassandra.exceptions.InvalidRequestException;
+import org.apache.cassandra.service.accord.IAccordService;
+
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.fail;
 
 public class AccordInteroperabilityTest extends AccordTestBase
 {
@@ -52,7 +58,7 @@ public class AccordInteroperabilityTest extends AccordTestBase
              cluster -> {
                  ICoordinator coordinator = cluster.coordinator(1);
                  for (int i = 1; i <= 10; i++)
-                     coordinator.execute("INSERT INTO " + 
qualifiedAccordTableName + " (k, c, v) VALUES (0, ?, ?) USING TIMESTAMP 0;", 
ConsistencyLevel.ALL, i, i * 10);
+                     coordinator.execute("INSERT INTO " + 
qualifiedAccordTableName + " (k, c, v) VALUES (0, ?, ?) USING TIMESTAMP 0;", 
org.apache.cassandra.distributed.api.ConsistencyLevel.ALL, i, i * 10);
                  assertRowSerial(cluster, "SELECT c, v FROM " + 
qualifiedAccordTableName + " WHERE k=0 ORDER BY c DESC LIMIT 1", 
AssertUtils.row(10, 100));
                  assertRowSerial(cluster, "SELECT c, v FROM " + 
qualifiedAccordTableName + " WHERE k=0 ORDER BY c DESC LIMIT 2", 
AssertUtils.row(10, 100), AssertUtils.row(9, 90));
                  assertRowSerial(cluster, "SELECT c, v FROM " + 
qualifiedAccordTableName + " WHERE k=0 ORDER BY c DESC LIMIT 3", 
AssertUtils.row(10, 100), AssertUtils.row(9, 90), AssertUtils.row(8, 80));
@@ -60,4 +66,70 @@ public class AccordInteroperabilityTest extends 
AccordTestBase
              }
          );
     }
+
+    private static Object[][] assertTargetAccordRead(Function<Integer, 
Object[][]> query, int coordinatorIndex, int key, int expectedAccordReadCount)
+    {
+        int startingReadCount = getAccordReadCount(coordinatorIndex);
+        Object[][] result = query.apply(key);
+        assertEquals("Accord reads", expectedAccordReadCount, 
getAccordReadCount(coordinatorIndex) - startingReadCount);
+        return result;
+    }
+
+    private static Object[][] assertTargetAccordWrite(Function<Integer, 
Object[][]> query, int coordinatorIndex, int key, int expectedAccordWriteCount)
+    {
+        int startingWriteCount = getAccordWriteCount(coordinatorIndex);
+        Object[][] result = query.apply(key);
+        assertEquals("Accord writes", expectedAccordWriteCount, 
getAccordWriteCount(coordinatorIndex) - startingWriteCount);
+        return result;
+    }
+
+    @Test
+    public void testNonSerialReadIsThroughAccordFull() throws Throwable
+    {
+        test("CREATE TABLE " + qualifiedAccordTableName + " (k int, c int, v 
int, PRIMARY KEY(k, c)) WITH transactional_mode='full'",
+             cluster -> {
+                 for (ConsistencyLevel cl : ConsistencyLevel.values())
+                 {
+                     try
+                     {
+                         if (cl == ConsistencyLevel.ANY || cl == 
ConsistencyLevel.NODE_LOCAL)
+                             continue;
+                         assertTargetAccordRead(key -> 
cluster.coordinator(1).execute("SELECT * FROM " + qualifiedAccordTableName + " 
WHERE k = ?", 
org.apache.cassandra.distributed.api.ConsistencyLevel.valueOf(cl.name()), key), 
1, 1, 1);
+                         if 
(!IAccordService.SUPPORTED_READ_CONSISTENCY_LEVELS.contains(cl))
+                             fail("Unsupported consistency level succeeded");
+
+                     }
+                     catch (Throwable t)
+                     {
+                         assertEquals(InvalidRequestException.class.getName(), 
t.getClass().getName());
+                         assertEquals(cl + " is not supported by Accord", 
t.getMessage());
+                     }
+                 }
+             });
+    }
+
+    @Test
+    public void testNonSerialWriteIsThroughAccordFull() throws Throwable
+    {
+        test("CREATE TABLE " + qualifiedAccordTableName + " (k int, c int, v 
int, PRIMARY KEY(k, c)) WITH transactional_mode='full'",
+             cluster -> {
+                 for (ConsistencyLevel cl : ConsistencyLevel.values())
+                 {
+                     try
+                     {
+                         assertTargetAccordWrite(key -> 
cluster.coordinator(1).execute("INSERT INTO " + qualifiedAccordTableName + " 
(k, c, v) VALUES (?, 43, 44)", 
org.apache.cassandra.distributed.api.ConsistencyLevel.valueOf(cl.name()), key), 
1, 1, 1);
+                         if 
(!IAccordService.SUPPORTED_COMMIT_CONSISTENCY_LEVELS.contains(cl))
+                             fail("Unsupported consistency level succeeded");
+                     }
+                     catch (Throwable t)
+                     {
+                         assertEquals(InvalidRequestException.class.getName(), 
t.getClass().getName());
+                         if (cl == ConsistencyLevel.SERIAL || cl == 
ConsistencyLevel.LOCAL_SERIAL)
+                             assertEquals("You must use conditional updates 
for serializable writes", t.getMessage());
+                         else
+                            assertEquals(cl + " is not supported by Accord", 
t.getMessage());
+                     }
+                 }
+             });
+    }
 }
diff --git 
a/test/unit/org/apache/cassandra/config/DatabaseDescriptorRefTest.java 
b/test/unit/org/apache/cassandra/config/DatabaseDescriptorRefTest.java
index db7476e799..6f2c451df3 100644
--- a/test/unit/org/apache/cassandra/config/DatabaseDescriptorRefTest.java
+++ b/test/unit/org/apache/cassandra/config/DatabaseDescriptorRefTest.java
@@ -186,6 +186,7 @@ public class DatabaseDescriptorRefTest
     "org.apache.cassandra.db.guardrails.Values$Config",
     "org.apache.cassandra.db.rows.UnfilteredSource",
     "org.apache.cassandra.dht.IPartitioner",
+    "org.apache.cassandra.dht.RingPosition",
     "org.apache.cassandra.distributed.api.IInstance",
     "org.apache.cassandra.distributed.api.IInvokableInstance",
     "org.apache.cassandra.distributed.api.IIsolatedExecutor",


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to