This is an automated email from the ASF dual-hosted git repository.
aweisberg pushed a commit to branch cep-15-accord
in repository https://gitbox.apache.org/repos/asf/cassandra.git
The following commit(s) were added to refs/heads/cep-15-accord by this push:
new 62c88b6fab Non-serial single partition reads on Accord
62c88b6fab is described below
commit 62c88b6fabcaebd4d206e6cbb58df7fd16a60e4a
Author: Ariel Weisberg <[email protected]>
AuthorDate: Tue Sep 24 16:03:34 2024 -0400
Non-serial single partition reads on Accord
Patch by Ariel Weisberg; Reviewed by Benedict Elliott Smith for
CASSANDRA-19951
---
.../cassandra/cql3/statements/CQL3CasRequest.java | 14 ++-
.../org/apache/cassandra/service/CASRequest.java | 3 +-
.../org/apache/cassandra/service/StorageProxy.java | 127 +++++++++++++++++----
.../service/accord/AccordConfigurationService.java | 1 -
.../cassandra/service/accord/IAccordService.java | 11 +-
.../cassandra/service/accord/txn/TxnData.java | 11 ++
.../cassandra/service/accord/txn/TxnDataName.java | 5 +
.../cassandra/service/accord/txn/TxnRead.java | 11 +-
.../service/consensus/TransactionalMode.java | 54 ++++++---
.../ConsensusMigrationMutationHelper.java | 3 +
.../migration/ConsensusMigrationState.java | 23 ++--
.../migration/TransactionalMigrationFromMode.java | 5 +
.../cassandra/tcm/transformations/AlterSchema.java | 19 ++-
.../distributed/test/ShortReadProtectionTest.java | 49 ++++----
.../distributed/test/accord/AccordCQLTestBase.java | 2 +-
.../test/accord/AccordInteroperabilityTest.java | 76 +++++++++++-
.../config/DatabaseDescriptorRefTest.java | 1 +
17 files changed, 318 insertions(+), 97 deletions(-)
diff --git a/src/java/org/apache/cassandra/cql3/statements/CQL3CasRequest.java
b/src/java/org/apache/cassandra/cql3/statements/CQL3CasRequest.java
index 0cc5e73569..03438d2da8 100644
--- a/src/java/org/apache/cassandra/cql3/statements/CQL3CasRequest.java
+++ b/src/java/org/apache/cassandra/cql3/statements/CQL3CasRequest.java
@@ -17,9 +17,6 @@
*/
package org.apache.cassandra.cql3.statements;
-import org.apache.cassandra.db.marshal.TimeUUIDType;
-import org.apache.cassandra.index.IndexRegistry;
-import org.apache.cassandra.schema.TableMetadata;
import java.util.ArrayList;
import java.util.Collection;
import java.util.Collections;
@@ -52,11 +49,15 @@ import
org.apache.cassandra.db.filter.ClusteringIndexSliceFilter;
import org.apache.cassandra.db.filter.ColumnFilter;
import org.apache.cassandra.db.filter.DataLimits;
import org.apache.cassandra.db.filter.RowFilter;
+import org.apache.cassandra.db.marshal.TimeUUIDType;
import org.apache.cassandra.db.partitions.FilteredPartition;
import org.apache.cassandra.db.partitions.Partition;
import org.apache.cassandra.db.partitions.PartitionUpdate;
import org.apache.cassandra.db.rows.Row;
import org.apache.cassandra.exceptions.InvalidRequestException;
+import org.apache.cassandra.index.IndexRegistry;
+import org.apache.cassandra.schema.TableMetadata;
+import org.apache.cassandra.schema.TableParams;
import org.apache.cassandra.service.CASRequest;
import org.apache.cassandra.service.ClientState;
import org.apache.cassandra.service.accord.txn.TxnCondition;
@@ -69,6 +70,7 @@ import org.apache.cassandra.service.accord.txn.TxnResult;
import org.apache.cassandra.service.accord.txn.TxnUpdate;
import org.apache.cassandra.service.accord.txn.TxnWrite;
import org.apache.cassandra.service.paxos.Ballot;
+import org.apache.cassandra.tcm.ClusterMetadata;
import org.apache.cassandra.utils.TimeUUID;
import static com.google.common.base.Preconditions.checkState;
@@ -77,6 +79,7 @@ import static
org.apache.cassandra.service.StorageProxy.ConsensusAttemptResult.R
import static
org.apache.cassandra.service.StorageProxy.ConsensusAttemptResult.casResult;
import static
org.apache.cassandra.service.accord.txn.TxnDataName.Kind.CAS_READ;
import static
org.apache.cassandra.service.accord.txn.TxnResult.Kind.retry_new_protocol;
+import static
org.apache.cassandra.service.consensus.migration.ConsensusRequestRouter.getTableMetadata;
/**
@@ -470,13 +473,14 @@ public class CQL3CasRequest implements CASRequest
}
@Override
- public Txn toAccordTxn(ConsistencyLevel consistencyLevel, ConsistencyLevel
commitConsistencyLevel, ClientState clientState, long nowInSecs)
+ public Txn toAccordTxn(ClusterMetadata cm, ConsistencyLevel
consistencyLevel, ConsistencyLevel commitConsistencyLevel, ClientState
clientState, long nowInSecs)
{
SinglePartitionReadCommand readCommand = readCommand(nowInSecs);
Update update = createUpdate(clientState, commitConsistencyLevel);
// If the write strategy is sending all writes through Accord there is
no need to use the supplied consistency
// level since Accord will manage reading safely
- consistencyLevel =
metadata.params.transactionalMode.readCLForStrategy(consistencyLevel);
+ TableParams tableParams = getTableMetadata(cm, metadata.id).params;
+ consistencyLevel =
tableParams.transactionalMode.readCLForStrategy(tableParams.transactionalMigrationFrom,
consistencyLevel, cm, metadata.id, readCommand.partitionKey().getToken());
TxnRead read = TxnRead.createCasRead(readCommand, consistencyLevel);
// In a CAS requesting only one key is supported and writes
// can't be dependent on any data that is read (only conditions)
diff --git a/src/java/org/apache/cassandra/service/CASRequest.java
b/src/java/org/apache/cassandra/service/CASRequest.java
index c50f078a5a..b4ddbc1760 100644
--- a/src/java/org/apache/cassandra/service/CASRequest.java
+++ b/src/java/org/apache/cassandra/service/CASRequest.java
@@ -25,6 +25,7 @@ import org.apache.cassandra.db.partitions.PartitionUpdate;
import org.apache.cassandra.exceptions.InvalidRequestException;
import org.apache.cassandra.service.accord.txn.TxnResult;
import org.apache.cassandra.service.paxos.Ballot;
+import org.apache.cassandra.tcm.ClusterMetadata;
import static org.apache.cassandra.service.StorageProxy.ConsensusAttemptResult;
@@ -50,7 +51,7 @@ public interface CASRequest
*/
PartitionUpdate makeUpdates(FilteredPartition current, ClientState
clientState, Ballot ballot) throws InvalidRequestException;
- Txn toAccordTxn(ConsistencyLevel consistencyLevel, ConsistencyLevel
commitConsistencyLevel, ClientState clientState, long nowInSecs);
+ Txn toAccordTxn(ClusterMetadata cm, ConsistencyLevel consistencyLevel,
ConsistencyLevel commitConsistencyLevel, ClientState clientState, long
nowInSecs);
ConsensusAttemptResult toCasResult(TxnResult txnResult);
}
diff --git a/src/java/org/apache/cassandra/service/StorageProxy.java
b/src/java/org/apache/cassandra/service/StorageProxy.java
index 7274415c4b..4dac4a6d9e 100644
--- a/src/java/org/apache/cassandra/service/StorageProxy.java
+++ b/src/java/org/apache/cassandra/service/StorageProxy.java
@@ -37,6 +37,7 @@ import java.util.concurrent.TimeoutException;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.atomic.AtomicLong;
import java.util.function.Function;
+import java.util.function.IntPredicate;
import java.util.stream.Collectors;
import javax.annotation.Nonnull;
import javax.annotation.Nullable;
@@ -49,7 +50,6 @@ import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import accord.primitives.Txn;
-import accord.utils.Invariants;
import org.apache.cassandra.batchlog.Batch;
import org.apache.cassandra.batchlog.BatchlogManager;
import org.apache.cassandra.concurrent.DebuggableTask.RunnableDebuggableTask;
@@ -128,10 +128,12 @@ import org.apache.cassandra.schema.Schema;
import org.apache.cassandra.schema.SchemaConstants;
import org.apache.cassandra.schema.TableId;
import org.apache.cassandra.schema.TableMetadata;
+import org.apache.cassandra.schema.TableParams;
import org.apache.cassandra.service.accord.AccordService;
import org.apache.cassandra.service.accord.IAccordService;
import org.apache.cassandra.service.accord.IAccordService.AsyncTxnResult;
import org.apache.cassandra.service.accord.txn.TxnData;
+import org.apache.cassandra.service.accord.txn.TxnDataName;
import org.apache.cassandra.service.accord.txn.TxnQuery;
import org.apache.cassandra.service.accord.txn.TxnRead;
import org.apache.cassandra.service.accord.txn.TxnResult;
@@ -139,6 +141,7 @@ import
org.apache.cassandra.service.consensus.TransactionalMode;
import
org.apache.cassandra.service.consensus.migration.ConsensusMigrationMutationHelper.SplitConsumer;
import
org.apache.cassandra.service.consensus.migration.ConsensusMigrationMutationHelper.SplitMutations;
import org.apache.cassandra.service.consensus.migration.ConsensusRequestRouter;
+import
org.apache.cassandra.service.consensus.migration.TransactionalMigrationFromMode;
import org.apache.cassandra.service.paxos.Ballot;
import org.apache.cassandra.service.paxos.Commit;
import org.apache.cassandra.service.paxos.ContentionStrategy;
@@ -172,6 +175,7 @@ import
org.apache.cassandra.utils.concurrent.UncheckedInterruptedException;
import static accord.primitives.Txn.Kind.EphemeralRead;
import static accord.primitives.Txn.Kind.Read;
import static com.google.common.base.Preconditions.checkNotNull;
+import static com.google.common.base.Preconditions.checkState;
import static com.google.common.collect.Iterables.concat;
import static java.util.concurrent.TimeUnit.MILLISECONDS;
import static java.util.concurrent.TimeUnit.NANOSECONDS;
@@ -200,6 +204,7 @@ import static
org.apache.cassandra.service.accord.txn.TxnResult.Kind.retry_new_p
import static
org.apache.cassandra.service.consensus.migration.ConsensusMigrationMutationHelper.mutateWithAccordAsync;
import static
org.apache.cassandra.service.consensus.migration.ConsensusMigrationMutationHelper.splitMutationsIntoAccordAndNormal;
import static
org.apache.cassandra.service.consensus.migration.ConsensusRequestRouter.ConsensusRoutingDecision;
+import static
org.apache.cassandra.service.consensus.migration.ConsensusRequestRouter.getTableMetadata;
import static org.apache.cassandra.service.paxos.Ballot.Flag.GLOBAL;
import static org.apache.cassandra.service.paxos.Ballot.Flag.LOCAL;
import static
org.apache.cassandra.service.paxos.BallotGenerator.Global.nextBallot;
@@ -345,7 +350,6 @@ public class StorageProxy implements StorageProxyMBean
Dispatcher.RequestTime requestTime)
throws UnavailableException, IsBootstrappingException,
RequestFailureException, RequestTimeoutException, InvalidRequestException,
CasWriteUnknownResultException
{
- TableMetadata metadata = Schema.instance.validateTable(keyspaceName,
cfName);
if (DatabaseDescriptor.getPartitionDenylistEnabled() &&
DatabaseDescriptor.getDenylistWritesEnabled() &&
!partitionDenylist.isKeyPermitted(keyspaceName, cfName, key.getKey()))
{
denylistMetrics.incrementWritesRejected();
@@ -356,6 +360,8 @@ public class StorageProxy implements StorageProxyMBean
ConsensusAttemptResult lastAttemptResult;
do
{
+ ClusterMetadata cm = ClusterMetadata.current();
+ TableMetadata metadata =
Schema.instance.validateTable(keyspaceName, cfName);
ConsensusRoutingDecision decision = consensusRouting(metadata,
key, consistencyForPaxos, requestTime, true);
switch (decision)
{
@@ -378,10 +384,11 @@ public class StorageProxy implements StorageProxyMBean
requestTime);
break;
case accord:
- Txn txn = request.toAccordTxn(consistencyForPaxos,
- consistencyForCommit,
- clientState,
- nowInSeconds);
+ Txn txn = request.toAccordTxn(cm,
+ consistencyForPaxos,
+ consistencyForCommit,
+ clientState,
+ nowInSeconds);
IAccordService accordService = AccordService.instance();
TxnResult txnResult = accordService.coordinate(txn,
consistencyForPaxos,
@@ -2121,6 +2128,7 @@ public class StorageProxy implements StorageProxyMBean
ConsensusAttemptResult lastResult;
do
{
+ ClusterMetadata cm = ClusterMetadata.current();
SinglePartitionReadCommand command = group.queries.get(0);
ConsensusRoutingDecision decision =
consensusRouting(group.metadata(), command.partitionKey(), consistencyLevel,
requestTime, false);
switch (decision)
@@ -2132,7 +2140,7 @@ public class StorageProxy implements StorageProxyMBean
lastResult = legacyReadWithPaxos(group, consistencyLevel,
requestTime);
break;
case accord:
- lastResult = readWithAccord(group, consistencyLevel,
requestTime);
+ lastResult = readWithAccord(cm, group, consistencyLevel,
requestTime);
break;
default:
throw new IllegalStateException("Unsupported consensus " +
decision);
@@ -2141,28 +2149,79 @@ public class StorageProxy implements StorageProxyMBean
return lastResult.serialReadResult;
}
- private static ConsensusAttemptResult
readWithAccord(SinglePartitionReadCommand.Group group, ConsistencyLevel
consistencyLevel, Dispatcher.RequestTime requestTime)
+ /**
+ * Picks the consistency level an Accord read of {@code group} should use.
+ *
+ * @param cm cluster metadata snapshot used to resolve the table's transactional mode and migration state
+ * @param group the single partition reads being executed; the commands in a group differ only by partition key
+ * @param consistencyLevel the requested consistency level, or null when no specific consistency behavior is required
+ * @return null if every queried partition can ignore the requested level, otherwise the requested level
+ */
+ @Nullable
+ private static ConsistencyLevel consistencyLevelForAccordRead(ClusterMetadata cm, SinglePartitionReadCommand.Group group, @Nullable ConsistencyLevel consistencyLevel)
{
- if (group.queries.size() > 1)
- throw new InvalidRequestException("SERIAL/LOCAL_SERIAL consistency
may only be requested for one partition at a time");
- SinglePartitionReadCommand readCommand = group.queries.get(0);
+ // Null means no specific consistency behavior is required from Accord; it's functionally similar to
+ // reading at ONE if you are reading data that wasn't written via Accord
+ if (consistencyLevel == null)
+ return null;
+
+ TableId tableId = group.queries.get(0).metadata().id;
+ TableParams tableParams = getTableMetadata(cm, tableId).params;
+ TransactionalMode mode = tableParams.transactionalMode;
+ TransactionalMigrationFromMode migrationFromMode = tableParams.transactionalMigrationFrom;
+ for (SinglePartitionReadCommand command : group.queries)
+ {
+ // readCLForStrategy returns either null or the supplied consistency level. The TxnRead carries a single
+ // consistency level for the whole transaction, so if any partition needs the supplied level, every
+ // partition is read at it
+ ConsistencyLevel readCL = mode.readCLForStrategy(migrationFromMode, consistencyLevel, cm, tableId, command.partitionKey().getToken());
+ if (readCL != null)
+ return readCL;
+ }
+ return null;
+ }
+
+ private static ConsensusAttemptResult readWithAccord(ClusterMetadata cm,
SinglePartitionReadCommand.Group group, ConsistencyLevel consistencyLevel,
Dispatcher.RequestTime requestTime)
+ {
+ if (consistencyLevel != null &&
!IAccordService.SUPPORTED_READ_CONSISTENCY_LEVELS.contains(consistencyLevel))
+ throw new InvalidRequestException(consistencyLevel + " is not
supported by Accord");
// If the non-SERIAL write strategy is sending all writes through
Accord there is no need to use the supplied consistency
// level since Accord will manage reading safely
TransactionalMode transactionalMode =
group.metadata().params.transactionalMode;
- consistencyLevel =
transactionalMode.readCLForStrategy(consistencyLevel);
- TxnRead read = TxnRead.createSerialRead(readCommand, consistencyLevel);
- Invariants.checkState(read.keys().size() == 1, "Ephemeral reads are
only strict-serializable for single partition reads");
- Txn txn = new Txn.InMemory(transactionalMode == TransactionalMode.full
&& DatabaseDescriptor.getAccordEphemeralReadEnabledEnabled() ? EphemeralRead :
Read, read.keys(), read, TxnQuery.ALL, null);
- IAccordService accordService = AccordService.instance();
- TxnResult txnResult = accordService.coordinate(txn, consistencyLevel,
requestTime);
+ consistencyLevel = consistencyLevelForAccordRead(cm, group,
consistencyLevel);
+ TxnRead read = TxnRead.createSerialRead(group.queries,
consistencyLevel);
+ Txn.Kind kind = Read;
+ if (transactionalMode == TransactionalMode.full &&
DatabaseDescriptor.getAccordEphemeralReadEnabledEnabled() &&
group.queries.size() == 1)
+ kind = EphemeralRead;
+ Txn txn = new Txn.InMemory(kind, read.keys(), read, TxnQuery.ALL,
null);
+ AsyncTxnResult asyncTxnResult =
AccordService.instance().coordinateAsync(txn, consistencyLevel, requestTime);
+ return getConsensusAttemptResultFromAsyncTxnResult(asyncTxnResult,
group.queries.size(), index -> group.queries.get(index).isReversed(),
consistencyLevel, requestTime);
+ }
+
+ /**
+ * Converts the outcome of an Accord read transaction into a {@link ConsensusAttemptResult}.
+ * Used for both the SERIAL and non-SERIAL read paths into Accord.
+ *
+ * When more than one query was issued, each returned partition is expected to be named with the index of the
+ * query that produced it, as {@code TxnRead.createSerialRead} does. A query whose partition is absent from the
+ * result contributes nothing to the returned iterator.
+ *
+ * @param asyncTxnResult result handle of the coordinated Accord transaction
+ * @param numQueries number of single partition queries in the read
+ * @param isQueryReversed whether the query at the given index reads in reverse clustering order
+ * @param cl consistency level passed through to {@code getTxnResult}
+ * @param requestTime request timing passed through to {@code getTxnResult}
+ */
+ public static ConsensusAttemptResult getConsensusAttemptResultFromAsyncTxnResult(AsyncTxnResult asyncTxnResult, int numQueries, IntPredicate isQueryReversed, ConsistencyLevel cl, Dispatcher.RequestTime requestTime)
+ {
+ TxnResult txnResult = AccordService.instance().getTxnResult(asyncTxnResult, false, cl, requestTime);
+ // TODO (required): Converge on a single approach to RETRY_NEW_PROTOCOL, this works for now because reads don't support it anyways
if (txnResult.kind() == retry_new_protocol)
return RETRY_NEW_PROTOCOL;
- TxnData data = (TxnData)txnResult;
- FilteredPartition partition = data.get(TxnRead.SERIAL_READ);
- if (partition != null)
- return
serialReadResult(PartitionIterators.singletonIterator(partition.rowIterator(readCommand.isReversed())));
- else
+ TxnData data = (TxnData) txnResult;
+
+ if (data.isEmpty())
+ {
return serialReadResult(EmptyIterators.partition());
+ }
+ else if (numQueries == 1 && data.size() == 1)
+ {
+ // Single query: its partition is the only value, whatever name it was read under
+ FilteredPartition value = data.values().iterator().next();
+ return serialReadResult(PartitionIterators.singletonIterator(value.rowIterator(isQueryReversed.test(0))));
+ }
+ else
+ {
+ // Recover each partition's query index from its name so the matching reversed flag is applied even when
+ // only some queries returned data, and emit partitions in query order like the non-transactional path.
+ // TODO (review): the consumer likely consumes these by DecoratedKey rather than iteration order, in which
+ // case preserving query order isn't actually needed
+ PartitionIterator[] byQuery = new PartitionIterator[numQueries];
+ for (Map.Entry<TxnDataName, FilteredPartition> e : data.entrySet())
+ {
+ int queryIndex = Integer.parseInt(e.getKey().part(0));
+ byQuery[queryIndex] = PartitionIterators.singletonIterator(e.getValue().rowIterator(isQueryReversed.test(queryIndex)));
+ }
+ // Queries whose partition is absent from the result have no entry; never hand null placeholders to concat
+ List<PartitionIterator> partitionIterators = new ArrayList<>(data.size());
+ for (PartitionIterator partitionIterator : byQuery)
+ {
+ if (partitionIterator != null)
+ partitionIterators.add(partitionIterator);
+ }
+ return serialReadResult(partitionIterators.size() == 1 ? partitionIterators.get(0) : PartitionIterators.concat(partitionIterators));
+ }
}
private static ConsensusAttemptResult
legacyReadWithPaxos(SinglePartitionReadCommand.Group group, ConsistencyLevel
consistencyLevel, Dispatcher.RequestTime requestTime)
@@ -2267,7 +2326,29 @@ public class StorageProxy implements StorageProxyMBean
long start = nanoTime();
try
{
- PartitionIterator result = fetchRows(group.queries,
consistencyLevel, coordinator, requestTime);
+ ClusterMetadata cm = ClusterMetadata.current();
+ TableId tableId = group.queries.get(0).metadata().id;
+ // Returns null for local tables
+ TableMetadata tableMetadata = getTableMetadata(cm, tableId);
+ if (tableMetadata == null)
+ tableMetadata =
Schema.instance.localKeyspaces().getTableOrViewNullable(tableId);
+ TableParams tableParams = tableMetadata.params;
+
+ TransactionalMode transactionalMode =
tableParams.transactionalMode;
+// TransactionalMigrationFromMode transactionalMigrationFromMode =
tableParams.transactionalMigrationFrom;
+ // TODO (required): Tests would fail with this and we need to add
live migration support anyways so for now allow it
+// if (transactionalMigrationFromMode !=
TransactionalMigrationFromMode.none)
+// throw new UnsupportedOperationException("Live migration is
not supported, can't safely read when migrating from " +
transactionalMigrationFromMode + " to " + transactionalMode);
+
+ PartitionIterator result;
+ if (transactionalMode.readsThroughAccord &&
coordinator.isEventuallyConsistent())
+ {
+ ConsensusAttemptResult consensusAttemptResult =
readWithAccord(cm, group, consistencyLevel, requestTime);
+
checkState(!consensusAttemptResult.shouldRetryOnNewConsensusProtocol, "Live
migration is not supported with non-SERIAL reads yet");
+ result = consensusAttemptResult.serialReadResult;
+ }
+ else
+ result = fetchRows(group.queries, consistencyLevel,
coordinator, requestTime);
// Note that the only difference between the command in a group
must be the partition key on which
// they applied.
boolean enforceStrictLiveness =
group.queries.get(0).metadata().enforceStrictLiveness();
diff --git
a/src/java/org/apache/cassandra/service/accord/AccordConfigurationService.java
b/src/java/org/apache/cassandra/service/accord/AccordConfigurationService.java
index 4f6f3d1666..08461249c3 100644
---
a/src/java/org/apache/cassandra/service/accord/AccordConfigurationService.java
+++
b/src/java/org/apache/cassandra/service/accord/AccordConfigurationService.java
@@ -29,7 +29,6 @@ import javax.annotation.Nullable;
import com.google.common.annotations.VisibleForTesting;
import com.google.common.collect.Sets;
-
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
diff --git a/src/java/org/apache/cassandra/service/accord/IAccordService.java
b/src/java/org/apache/cassandra/service/accord/IAccordService.java
index bba67a9160..97ebae6963 100644
--- a/src/java/org/apache/cassandra/service/accord/IAccordService.java
+++ b/src/java/org/apache/cassandra/service/accord/IAccordService.java
@@ -18,16 +18,15 @@
package org.apache.cassandra.service.accord;
+import java.util.Collection;
+import java.util.EnumSet;
import java.util.List;
-import java.util.Set;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;
import java.util.function.Supplier;
import javax.annotation.Nonnull;
import javax.annotation.Nullable;
-import com.google.common.collect.ImmutableSet;
-
import accord.api.BarrierType;
import accord.local.CommandStores.RangesForEpoch;
import accord.local.DurableBefore;
@@ -59,15 +58,13 @@ import org.apache.cassandra.transport.Dispatcher;
import org.apache.cassandra.utils.concurrent.AsyncPromise;
import org.apache.cassandra.utils.concurrent.Future;
-import java.util.Collection;
-
import static com.google.common.base.Preconditions.checkNotNull;
public interface IAccordService
{
- Set<ConsistencyLevel> SUPPORTED_COMMIT_CONSISTENCY_LEVELS =
ImmutableSet.of(ConsistencyLevel.ANY, ConsistencyLevel.ONE,
ConsistencyLevel.LOCAL_ONE, ConsistencyLevel.QUORUM, ConsistencyLevel.SERIAL,
ConsistencyLevel.ALL);
- Set<ConsistencyLevel> SUPPORTED_READ_CONSISTENCY_LEVELS =
ImmutableSet.of(ConsistencyLevel.ONE, ConsistencyLevel.QUORUM,
ConsistencyLevel.SERIAL);
+ EnumSet<ConsistencyLevel> SUPPORTED_COMMIT_CONSISTENCY_LEVELS =
EnumSet.of(ConsistencyLevel.ANY, ConsistencyLevel.ONE, ConsistencyLevel.QUORUM,
ConsistencyLevel.SERIAL, ConsistencyLevel.ALL);
+ EnumSet<ConsistencyLevel> SUPPORTED_READ_CONSISTENCY_LEVELS =
EnumSet.of(ConsistencyLevel.ONE, ConsistencyLevel.QUORUM,
ConsistencyLevel.SERIAL);
IVerbHandler<? extends Request> requestHandler();
IVerbHandler<? extends Reply> responseHandler();
diff --git a/src/java/org/apache/cassandra/service/accord/txn/TxnData.java
b/src/java/org/apache/cassandra/service/accord/txn/TxnData.java
index 9c2ae88f83..6ca4fc5083 100644
--- a/src/java/org/apache/cassandra/service/accord/txn/TxnData.java
+++ b/src/java/org/apache/cassandra/service/accord/txn/TxnData.java
@@ -19,6 +19,7 @@
package org.apache.cassandra.service.accord.txn;
import java.io.IOException;
+import java.util.Collection;
import java.util.HashMap;
import java.util.Iterator;
import java.util.Map;
@@ -80,6 +81,16 @@ public class TxnData extends TxnResult implements Data,
Iterable<FilteredPartiti
return data.entrySet();
}
+ public Collection<FilteredPartition> values()
+ {
+ return data.values();
+ }
+
+ public int size()
+ {
+ return data.size();
+ }
+
public boolean isEmpty()
{
return data.isEmpty();
diff --git a/src/java/org/apache/cassandra/service/accord/txn/TxnDataName.java
b/src/java/org/apache/cassandra/service/accord/txn/TxnDataName.java
index 4f3edbff0b..5625473857 100644
--- a/src/java/org/apache/cassandra/service/accord/txn/TxnDataName.java
+++ b/src/java/org/apache/cassandra/service/accord/txn/TxnDataName.java
@@ -144,6 +144,11 @@ public class TxnDataName implements Comparable<TxnDataName>
return Collections.unmodifiableList(Arrays.asList(parts));
}
+ public String part(int index)
+ {
+ return parts[index];
+ }
+
public DecoratedKey getDecoratedKey(TableMetadata metadata)
{
checkKind(Kind.AUTO_READ);
diff --git a/src/java/org/apache/cassandra/service/accord/txn/TxnRead.java
b/src/java/org/apache/cassandra/service/accord/txn/TxnRead.java
index 694c3f225a..391da765bf 100644
--- a/src/java/org/apache/cassandra/service/accord/txn/TxnRead.java
+++ b/src/java/org/apache/cassandra/service/accord/txn/TxnRead.java
@@ -36,10 +36,10 @@ import accord.primitives.Participants;
import accord.primitives.Ranges;
import accord.primitives.Seekable;
import accord.primitives.Timestamp;
-import org.apache.cassandra.db.SinglePartitionReadCommand;
import accord.utils.async.AsyncChain;
import accord.utils.async.AsyncChains;
import org.apache.cassandra.db.ConsistencyLevel;
+import org.apache.cassandra.db.SinglePartitionReadCommand;
import org.apache.cassandra.io.IVersionedSerializer;
import org.apache.cassandra.io.util.DataInputPlus;
import org.apache.cassandra.io.util.DataOutputPlus;
@@ -100,10 +100,13 @@ public class TxnRead extends
AbstractKeySorted<TxnNamedRead> implements Read
return new TxnRead(items, txnKeys, consistencyLevel);
}
- public static TxnRead createSerialRead(SinglePartitionReadCommand
readCommand, ConsistencyLevel consistencyLevel)
+ /**
+ * Creates an Accord read covering one or more single partition queries.
+ *
+ * The read for {@code readCommands.get(i)} is named {@code TxnDataName.user(String.valueOf(i))}. Consumers of the
+ * resulting {@link TxnData} can therefore map each returned partition back to the query that produced it by parsing
+ * the index from the name.
+ *
+ * @param readCommands the queries to read, in caller order
+ * @param consistencyLevel consistency level for every read in the transaction; may be null when no specific
+ *                         consistency behavior is required
+ */
+ public static TxnRead createSerialRead(List<SinglePartitionReadCommand>
readCommands, ConsistencyLevel consistencyLevel)
{
- TxnNamedRead read = new TxnNamedRead(SERIAL_READ, readCommand);
- return new TxnRead(ImmutableList.of(read), Keys.of(read.key()),
consistencyLevel);
+ List<TxnNamedRead> reads = new ArrayList<>(readCommands.size());
+ for (int i = 0; i < readCommands.size(); i++)
+ reads.add(new TxnNamedRead(TxnDataName.user(String.valueOf(i)),
readCommands.get(i)));
+ Keys keys = Keys.of(reads, TxnNamedRead::key);
+ return new TxnRead(reads, keys, consistencyLevel);
}
public static TxnRead createCasRead(SinglePartitionReadCommand
readCommand, ConsistencyLevel consistencyLevel)
diff --git
a/src/java/org/apache/cassandra/service/consensus/TransactionalMode.java
b/src/java/org/apache/cassandra/service/consensus/TransactionalMode.java
index 24a5ba5821..67f9e30b91 100644
--- a/src/java/org/apache/cassandra/service/consensus/TransactionalMode.java
+++ b/src/java/org/apache/cassandra/service/consensus/TransactionalMode.java
@@ -19,7 +19,15 @@
package org.apache.cassandra.service.consensus;
import org.apache.cassandra.db.ConsistencyLevel;
+import org.apache.cassandra.dht.Range;
+import org.apache.cassandra.dht.Token;
+import org.apache.cassandra.schema.TableId;
import org.apache.cassandra.service.accord.IAccordService;
+import org.apache.cassandra.service.consensus.migration.TableMigrationState;
+import
org.apache.cassandra.service.consensus.migration.TransactionalMigrationFromMode;
+import org.apache.cassandra.tcm.ClusterMetadata;
+
+import static com.google.common.base.Preconditions.checkState;
/*
* Configure the transactional behavior of a table. Enables accord on a table
and defines how it mixes with non-serial writes
@@ -57,53 +65,54 @@ import org.apache.cassandra.service.accord.IAccordService;
public enum TransactionalMode
{
// Running on Paxos V1 or V2 with Accord disabled
- off(false, false, false, false),
+ off(false, false, false, false, false),
/*
* Execute writes through Cassandra via StorageProxy's normal write path.
This can lead Accord to compute
* multiple outcomes for a transaction that depends on data written by
non-SERIAL writes.
*/
- unsafe(true, false, false, false),
+ unsafe(true, false, false, false, false),
/*
* Allow mixing of non-SERIAL writes and Accord, but still force BRR
through Accord.
* This mode makes it safe to perform non-SERIAL or SERIAL reads of Accord
data, but unsafe
* to write data that Accord may attempt to read.
*/
- unsafe_writes(true, false, false, true),
+ unsafe_writes(true, false, false, false, true),
/*
* Execute writes through Accord skipping StorageProxy's normal write
path, but commit
* writes at the provided consistency level so they can be read via
non-SERIAL consistency levels.
* This mode makes it safe to read/write data that Accord will read/write.
*/
- mixed_reads(true, false, true, true),
+ mixed_reads(true, false, true, false, true),
/*
* Execute writes through Accord skipping StorageProxy's normal write
path. Ignores the provided consistency level
* which makes Accord commit writes at ANY similar to Paxos with commit
consistency level ANY.
*/
- full(true, true, true, true);
+ full(true, true, true, true, true);
public final boolean accordIsEnabled;
- public final boolean ignoresSuppliedConsistencyLevel;
+ public final boolean ignoresSuppleidCommitCL;
public final boolean writesThroughAccord;
-
+ public final boolean readsThroughAccord;
public final boolean blockingReadRepairThroughAccord;
private final String cqlParam;
- TransactionalMode(boolean accordIsEnabled, boolean
ignoresSuppliedConsistencyLevel, boolean writesThroughAccord, boolean
blockingReadRepairThroughAccord)
+ TransactionalMode(boolean accordIsEnabled, boolean
ignoresSuppleidCommitCL, boolean writesThroughAccord, boolean
readsThroughAccord, boolean blockingReadRepairThroughAccord)
{
this.accordIsEnabled = accordIsEnabled;
- this.ignoresSuppliedConsistencyLevel = ignoresSuppliedConsistencyLevel;
+ this.ignoresSuppleidCommitCL = ignoresSuppleidCommitCL;
this.writesThroughAccord = writesThroughAccord;
+ this.readsThroughAccord = readsThroughAccord;
this.blockingReadRepairThroughAccord = blockingReadRepairThroughAccord;
this.cqlParam = String.format("transactional_mode = '%s'",
this.name().toLowerCase());
}
public ConsistencyLevel commitCLForStrategy(ConsistencyLevel
consistencyLevel)
{
- if (ignoresSuppliedConsistencyLevel)
+ if (ignoresSuppleidCommitCL)
return null;
if
(!IAccordService.SUPPORTED_COMMIT_CONSISTENCY_LEVELS.contains(consistencyLevel))
@@ -112,14 +121,25 @@ public enum TransactionalMode
return consistencyLevel;
}
- // TODO (required): This won't work for migration directly from none to
full because there is no safe system to read from
- // during the first phase (repair). Accord won't read correctly beacuse it
won't honor the CL and miss non-transactional writes that haven't been repaired
and non-transactional
- // reads will miss all the writes being routed through Accord since they
occur asynchronously. Something has to give here where either writes routed
through are Accord are synchronous at CL
- // or reads are routed through Accord and read at quorum as long as the
range has not completed the first phase (repair).
- public ConsistencyLevel readCLForStrategy(ConsistencyLevel
consistencyLevel)
+ private boolean ignoresSuppliedReadCL()
{
- if (ignoresSuppliedConsistencyLevel)
- return null;
+ return writesThroughAccord && blockingReadRepairThroughAccord;
+ }
+
+ public ConsistencyLevel readCLForStrategy(TransactionalMigrationFromMode
fromMode, ConsistencyLevel consistencyLevel, ClusterMetadata cm, TableId
tableId, Token token)
+ {
+ if (ignoresSuppliedReadCL())
+ {
+ TableMigrationState tms =
cm.consensusMigrationState.tableStates.get(tableId);
+ checkState(tms != null || fromMode ==
TransactionalMigrationFromMode.none);
+
+ // Only ignore the supplied consistency level if the token is not
migrating
+ // otherwise honor it because we might read through Accord for
non-SERIAL reads before repair is run
+ // this is OK to do because BRR still works and Accord isn't
computing a write so recovery
+ // determinism isn't an issue
+ if (tms == null || Range.isInNormalizedRanges(token,
tms.migratedRanges))
+ return null;
+ }
if
(!IAccordService.SUPPORTED_READ_CONSISTENCY_LEVELS.contains(consistencyLevel))
throw new UnsupportedOperationException("Consistency level " +
consistencyLevel + " is unsupported with Accord for read, supported are ONE,
QUORUM, and SERIAL");
diff --git
a/src/java/org/apache/cassandra/service/consensus/migration/ConsensusMigrationMutationHelper.java
b/src/java/org/apache/cassandra/service/consensus/migration/ConsensusMigrationMutationHelper.java
index 32997bfb0c..b3d7422090 100644
---
a/src/java/org/apache/cassandra/service/consensus/migration/ConsensusMigrationMutationHelper.java
+++
b/src/java/org/apache/cassandra/service/consensus/migration/ConsensusMigrationMutationHelper.java
@@ -42,6 +42,7 @@ import org.apache.cassandra.db.IMutation;
import org.apache.cassandra.db.Mutation;
import org.apache.cassandra.db.partitions.PartitionUpdate;
import org.apache.cassandra.dht.Token;
+import org.apache.cassandra.exceptions.InvalidRequestException;
import org.apache.cassandra.exceptions.RetryOnDifferentSystemException;
import org.apache.cassandra.schema.Schema;
import org.apache.cassandra.schema.SchemaConstants;
@@ -235,6 +236,8 @@ public class ConsensusMigrationMutationHelper
public static AsyncTxnResult mutateWithAccordAsync(ClusterMetadata cm,
Collection<? extends IMutation> mutations, @Nullable ConsistencyLevel
consistencyLevel, Dispatcher.RequestTime requestTime)
{
+ if (consistencyLevel != null &&
!IAccordService.SUPPORTED_COMMIT_CONSISTENCY_LEVELS.contains(consistencyLevel))
+ throw new InvalidRequestException(consistencyLevel + " is not
supported by Accord");
int fragmentIndex = 0;
List<TxnWrite.Fragment> fragments = new ArrayList<>(mutations.size());
List<PartitionKey> partitionKeys = new ArrayList<>(mutations.size());
diff --git
a/src/java/org/apache/cassandra/service/consensus/migration/ConsensusMigrationState.java
b/src/java/org/apache/cassandra/service/consensus/migration/ConsensusMigrationState.java
index 731aabb735..b2d1195b1a 100644
---
a/src/java/org/apache/cassandra/service/consensus/migration/ConsensusMigrationState.java
+++
b/src/java/org/apache/cassandra/service/consensus/migration/ConsensusMigrationState.java
@@ -50,6 +50,7 @@ import org.apache.cassandra.tcm.serialization.Version;
import org.apache.cassandra.utils.PojoToString;
import static com.google.common.base.Preconditions.checkNotNull;
+import static com.google.common.base.Preconditions.checkState;
import static org.apache.cassandra.utils.CollectionSerializers.deserializeMap;
import static org.apache.cassandra.utils.CollectionSerializers.newHashMap;
import static org.apache.cassandra.utils.CollectionSerializers.serializeMap;
@@ -136,16 +137,20 @@ public class ConsensusMigrationState implements
MetadataValue<ConsensusMigration
private static void withRangesMigrating(Map<TableId, TableMigrationState>
current, ImmutableMap.Builder<TableId, TableMigrationState> next, TableMetadata
metadata, List<Range<Token>> ranges, boolean overwrite)
{
- TableMigrationState tableState;
+ TableMigrationState tableState = current.get(metadata.id);
+ checkState(tableState != null || overwrite, "Can't begin migrating a
table without first altering the schema to set transactional mode");
+ TransactionalMigrationFromMode migrationFromMode =
metadata.params.transactionalMigrationFrom;
ConsensusMigrationTarget target =
ConsensusMigrationTarget.fromTransactionalMode(metadata.params.transactionalMode);
- if (!overwrite && current.containsKey(metadata.id))
- {
- tableState = current.get(metadata.id).withRangesMigrating(ranges,
target);
- }
+ checkState(migrationFromMode != null && migrationFromMode !=
TransactionalMigrationFromMode.none, "Table transactional migration from can't
be null or none");
+
+ Map<Epoch, List<Range<Token>>> migratingRangesByEpoch =
ImmutableMap.of();
+ if (!ranges.isEmpty())
+ migratingRangesByEpoch = ImmutableMap.of(Epoch.EMPTY, ranges); // must be assigned; an empty map means "migrating, but no ranges yet"
+
+ if (overwrite)
+ tableState = new TableMigrationState(metadata.keyspace,
metadata.name, metadata.id, target, ImmutableSet.of(), migratingRangesByEpoch);
else
- {
- tableState = new TableMigrationState(metadata.keyspace,
metadata.name, metadata.id, target, ImmutableSet.of(),
ImmutableMap.of(Epoch.EMPTY, ranges));
- }
+ tableState = tableState.withRangesMigrating(ranges, target);
next.put(metadata.id, tableState);
}
@@ -238,7 +243,7 @@ public class ConsensusMigrationState implements
MetadataValue<ConsensusMigration
{
tableStates.forEach((id, migrationState) -> {
TableMetadata metadata = schema.getTableMetadata(id);
-
Preconditions.checkState(ConsensusMigrationTarget.fromTransactionalMode(metadata.params.transactionalMode).equals(migrationState.targetProtocol));
+
checkState(ConsensusMigrationTarget.fromTransactionalMode(metadata.params.transactionalMode).equals(migrationState.targetProtocol));
});
}
diff --git
a/src/java/org/apache/cassandra/service/consensus/migration/TransactionalMigrationFromMode.java
b/src/java/org/apache/cassandra/service/consensus/migration/TransactionalMigrationFromMode.java
index f916a96b8e..5f88a78761 100644
---
a/src/java/org/apache/cassandra/service/consensus/migration/TransactionalMigrationFromMode.java
+++
b/src/java/org/apache/cassandra/service/consensus/migration/TransactionalMigrationFromMode.java
@@ -76,6 +76,11 @@ public enum TransactionalMigrationFromMode
return from != null && from.writesThroughAccord;
}
+ public boolean readsThroughAccord() // true when the mode being migrated from has writesThroughAccord set
+ {
+ return from != null && from.writesThroughAccord; // NOTE(review): keyed off writesThroughAccord exactly like the method above; confirm this is intended rather than a reads-specific flag
+ }
+
public boolean isMigrating()
{
return this != none;
diff --git a/src/java/org/apache/cassandra/tcm/transformations/AlterSchema.java
b/src/java/org/apache/cassandra/tcm/transformations/AlterSchema.java
index c97c6e3be2..8c13631122 100644
--- a/src/java/org/apache/cassandra/tcm/transformations/AlterSchema.java
+++ b/src/java/org/apache/cassandra/tcm/transformations/AlterSchema.java
@@ -19,17 +19,23 @@
package org.apache.cassandra.tcm.transformations;
import java.io.IOException;
-import java.util.*;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
import java.util.function.Function;
import java.util.stream.Collectors;
import java.util.stream.Stream;
import com.google.common.collect.ImmutableList;
+import com.google.common.collect.Sets;
import com.google.common.collect.Streams;
-import org.apache.cassandra.config.AccordSpec;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
+import org.apache.cassandra.config.AccordSpec;
import org.apache.cassandra.config.DatabaseDescriptor;
import org.apache.cassandra.dht.Range;
import org.apache.cassandra.dht.Token;
@@ -63,6 +69,7 @@ import
org.apache.cassandra.tcm.serialization.AsymmetricMetadataSerializer;
import org.apache.cassandra.tcm.serialization.Version;
import org.apache.cassandra.utils.JVMStabilityInspector;
+import static com.google.common.base.Preconditions.checkState;
import static com.google.common.collect.ImmutableSet.toImmutableSet;
import static org.apache.cassandra.exceptions.ExceptionCode.ALREADY_EXISTS;
import static org.apache.cassandra.exceptions.ExceptionCode.CONFIG_ERROR;
@@ -285,6 +292,9 @@ public class AlterSchema implements Transformation
.map(alt -> alt.after)
.collect(Collectors.toUnmodifiableSet());
+ Set<TableId> startedAndReversed =
Sets.intersection(started.stream().map(TableMetadata::id).collect(Collectors.toSet()),
reversals.keySet());
+ checkState(startedAndReversed.isEmpty(), "Set of tables starting
migration and reversing migration should not intersect");
+
if (!started.isEmpty())
{
List<Range<Token>> ranges;
@@ -301,8 +311,9 @@ public class AlterSchema implements Transformation
break;
}
- if (!ranges.isEmpty())
- migrationState = migrationState.withRangesMigrating(started,
ranges, true);
+ // Always create the migration state even if nothing is currently
migrating, the empty state
+ // signals that a migration is in progress with no migrating
ranges and corresponds to transactionalMigrationFrom != none
+ migrationState = migrationState.withRangesMigrating(started,
ranges, true);
}
migrationState = migrationState.withReversedMigrations(reversals,
next.epoch());
diff --git
a/test/distributed/org/apache/cassandra/distributed/test/ShortReadProtectionTest.java
b/test/distributed/org/apache/cassandra/distributed/test/ShortReadProtectionTest.java
index 4d2ae2df7a..c9ab6b30cc 100644
---
a/test/distributed/org/apache/cassandra/distributed/test/ShortReadProtectionTest.java
+++
b/test/distributed/org/apache/cassandra/distributed/test/ShortReadProtectionTest.java
@@ -120,12 +120,7 @@ public class ShortReadProtectionTest extends TestBaseImpl
@Before
public void setupTester()
{
- tester = new Tester(readConsistencyLevel, flush, paging);
- }
-
- private String transactionalModeCQL()
- {
- return " WITH transactional_mode='" + transactionalMode + '\'';
+ tester = new Tester(readConsistencyLevel, flush, paging,
transactionalMode);
}
@After
@@ -144,7 +139,7 @@ public class ShortReadProtectionTest extends TestBaseImpl
@Test
public void testSkinnyTableWithoutLiveRows()
{
- tester.createTable("CREATE TABLE %s (id int PRIMARY KEY)" +
transactionalModeCQL())
+ tester.createTable("CREATE TABLE %s (id int PRIMARY KEY)")
.allNodes("INSERT INTO %s (id) VALUES (0) USING TIMESTAMP 0")
.toNode1("DELETE FROM %s WHERE id = 0")
.assertRows("SELECT DISTINCT id FROM %s WHERE id = 0")
@@ -161,7 +156,7 @@ public class ShortReadProtectionTest extends TestBaseImpl
@Test
public void testSkinnyTableWithLiveRows()
{
- tester.createTable("CREATE TABLE %s (id int PRIMARY KEY)" +
transactionalModeCQL())
+ tester.createTable("CREATE TABLE %s (id int PRIMARY KEY)")
.allNodes(0, 10, i -> format("INSERT INTO %%s (id) VALUES (%d)
USING TIMESTAMP 0", i)) // order is 5,1,8,0,2,4,7,6,9,3
.toNode1("DELETE FROM %s WHERE id IN (1, 0, 4, 6, 3)") // delete
every other row
.assertRows("SELECT DISTINCT token(id), id FROM %s",
@@ -178,7 +173,7 @@ public class ShortReadProtectionTest extends TestBaseImpl
@Test
public void testSkinnyTableWithComplementaryDeletions()
{
- tester.createTable("CREATE TABLE %s (id int PRIMARY KEY)" +
transactionalModeCQL())
+ tester.createTable("CREATE TABLE %s (id int PRIMARY KEY)")
.allNodes(0, 10, i -> format("INSERT INTO %%s (id) VALUES (%d)
USING TIMESTAMP 0", i)) // order is 5,1,8,0,2,4,7,6,9,3
.toNode1("DELETE FROM %s WHERE id IN (5, 8, 2, 7, 9)") // delete
every other row
.toNode2("DELETE FROM %s WHERE id IN (1, 0, 4, 6)") // delete
every other row but the last one
@@ -196,7 +191,7 @@ public class ShortReadProtectionTest extends TestBaseImpl
@Test
public void testMultipleMissedRows()
{
- tester.createTable("CREATE TABLE %s (pk int, ck int, PRIMARY KEY (pk,
ck))" + transactionalModeCQL())
+ tester.createTable("CREATE TABLE %s (pk int, ck int, PRIMARY KEY (pk,
ck))")
.allNodes(0, 4, i -> format("INSERT INTO %%s (pk, ck) VALUES (0,
%d) USING TIMESTAMP 0", i))
.toNode1("DELETE FROM %s WHERE pk = 0 AND ck IN (1, 2, 3)",
"INSERT INTO %s (pk, ck) VALUES (0, 5)")
@@ -215,7 +210,7 @@ public class ShortReadProtectionTest extends TestBaseImpl
@Test
public void testAscendingOrder()
{
- tester.createTable("CREATE TABLE %s (k int, c int, v int, PRIMARY
KEY(k, c))" + transactionalModeCQL())
+ tester.createTable("CREATE TABLE %s (k int, c int, v int, PRIMARY
KEY(k, c))")
.allNodes(1, 10, i -> format("INSERT INTO %%s (k, c, v) VALUES
(0, %d, %d) USING TIMESTAMP 0", i, i * 10))
.toNode1("DELETE FROM %s WHERE k=0 AND c=1")
.toNode2("DELETE FROM %s WHERE k=0 AND c=2")
@@ -237,7 +232,7 @@ public class ShortReadProtectionTest extends TestBaseImpl
@Test
public void testDescendingOrder()
{
- tester.createTable("CREATE TABLE %s (k int, c int, v int, PRIMARY
KEY(k, c))" + transactionalModeCQL())
+ tester.createTable("CREATE TABLE %s (k int, c int, v int, PRIMARY
KEY(k, c))")
.allNodes(1, 10, i -> format("INSERT INTO %%s (k, c, v) VALUES
(0, %d, %d) USING TIMESTAMP 0", i, i * 10))
.toNode1("DELETE FROM %s WHERE k=0 AND c=7")
.toNode2("DELETE FROM %s WHERE k=0 AND c=8")
@@ -260,7 +255,7 @@ public class ShortReadProtectionTest extends TestBaseImpl
@Test
public void testDeletePartition()
{
- tester.createTable("CREATE TABLE %s (k int, c int, v int, PRIMARY
KEY(k, c))" + transactionalModeCQL())
+ tester.createTable("CREATE TABLE %s (k int, c int, v int, PRIMARY
KEY(k, c))")
.allNodes("INSERT INTO %s (k, c, v) VALUES (0, 1, 10) USING
TIMESTAMP 0",
"INSERT INTO %s (k, c, v) VALUES (0, 2, 20) USING
TIMESTAMP 0")
.toNode2("DELETE FROM %s WHERE k=0")
@@ -273,7 +268,7 @@ public class ShortReadProtectionTest extends TestBaseImpl
@Test
public void testDeletePartitionWithStatic()
{
- tester.createTable("CREATE TABLE %s (k int, c int, v int, s int
STATIC, PRIMARY KEY(k, c))" + transactionalModeCQL())
+ tester.createTable("CREATE TABLE %s (k int, c int, v int, s int
STATIC, PRIMARY KEY(k, c))")
.allNodes("INSERT INTO %s (k, c, v, s) VALUES (0, 1, 10, 100)
USING TIMESTAMP 0",
"INSERT INTO %s (k, c, v) VALUES (0, 2, 20) USING
TIMESTAMP 0")
.toNode2("DELETE FROM %s WHERE k=0")
@@ -286,7 +281,7 @@ public class ShortReadProtectionTest extends TestBaseImpl
@Test
public void testDeleteClustering()
{
- tester.createTable("CREATE TABLE %s (k int, c int, v int, PRIMARY
KEY(k, c))" + transactionalModeCQL())
+ tester.createTable("CREATE TABLE %s (k int, c int, v int, PRIMARY
KEY(k, c))")
.allNodes("INSERT INTO %s (k, c, v) VALUES (0, 1, 10) USING
TIMESTAMP 0",
"INSERT INTO %s (k, c, v) VALUES (0, 2, 20) USING
TIMESTAMP 0")
.toNode2("DELETE FROM %s WHERE k=0 AND c=1")
@@ -301,7 +296,7 @@ public class ShortReadProtectionTest extends TestBaseImpl
@Test
public void testDeleteClusteringWithStatic()
{
- tester.createTable("CREATE TABLE %s (k int, c int, v int, s int
STATIC, PRIMARY KEY(k, c))" + transactionalModeCQL())
+ tester.createTable("CREATE TABLE %s (k int, c int, v int, s int
STATIC, PRIMARY KEY(k, c))")
.allNodes("INSERT INTO %s (k, c, v, s) VALUES (0, 1, 10, 100)
USING TIMESTAMP 0",
"INSERT INTO %s (k, c, v) VALUES (0, 2, 20) USING
TIMESTAMP 0")
.toNode2("DELETE FROM %s WHERE k=0 AND c=1")
@@ -318,7 +313,7 @@ public class ShortReadProtectionTest extends TestBaseImpl
@Test
public void testGroupByRegularRow()
{
- tester.createTable("CREATE TABLE %s (pk int, ck int, PRIMARY KEY (pk,
ck))" + transactionalModeCQL())
+ tester.createTable("CREATE TABLE %s (pk int, ck int, PRIMARY KEY (pk,
ck))")
.toNode1("INSERT INTO %s (pk, ck) VALUES (1, 1) USING TIMESTAMP
0",
"DELETE FROM %s WHERE pk=0 AND ck=0",
"INSERT INTO %s (pk, ck) VALUES (2, 2) USING TIMESTAMP
0")
@@ -341,7 +336,7 @@ public class ShortReadProtectionTest extends TestBaseImpl
@Test
public void testGroupByStaticRow()
{
- tester.createTable("CREATE TABLE %s (pk int, ck int, s int static,
PRIMARY KEY (pk, ck))" + transactionalModeCQL())
+ tester.createTable("CREATE TABLE %s (pk int, ck int, s int static,
PRIMARY KEY (pk, ck))")
.toNode1("INSERT INTO %s (pk, s) VALUES (1, 1) USING TIMESTAMP
0",
"INSERT INTO %s (pk, s) VALUES (0, null)",
"INSERT INTO %s (pk, s) VALUES (2, 2) USING TIMESTAMP
0")
@@ -364,7 +359,7 @@ public class ShortReadProtectionTest extends TestBaseImpl
@Test
public void testSkipEarlyTermination()
{
- tester.createTable("CREATE TABLE %s (pk int, ck int, PRIMARY KEY (pk,
ck))" + transactionalModeCQL())
+ tester.createTable("CREATE TABLE %s (pk int, ck int, PRIMARY KEY (pk,
ck))")
.toNode1("INSERT INTO %s (pk, ck) VALUES (0, 0)")
.toNode2("DELETE FROM %s WHERE pk = 0 AND ck IN (1, 2)")
.assertRows("SELECT DISTINCT pk FROM %s", row(0));
@@ -381,7 +376,7 @@ public class ShortReadProtectionTest extends TestBaseImpl
@Test
public void testSkipEarlyTerminationRows()
{
- tester.createTable("CREATE TABLE %s (pk int, ck int, PRIMARY KEY (pk,
ck))" + transactionalModeCQL())
+ tester.createTable("CREATE TABLE %s (pk int, ck int, PRIMARY KEY (pk,
ck))")
.toNode1("INSERT INTO %s (pk, ck) VALUES (0, 0) USING TIMESTAMP
0",
"INSERT INTO %s (pk, ck) VALUES (0, 1) USING TIMESTAMP
0",
"INSERT INTO %s (pk, ck) VALUES (2, 0) USING TIMESTAMP
0",
@@ -405,7 +400,7 @@ public class ShortReadProtectionTest extends TestBaseImpl
@Test
public void testSkipEarlyTerminationPartitions()
{
- tester.createTable("CREATE TABLE %s (pk int, ck int, PRIMARY KEY (pk,
ck))" + transactionalModeCQL())
+ tester.createTable("CREATE TABLE %s (pk int, ck int, PRIMARY KEY (pk,
ck))")
.toNode1("INSERT INTO %s (pk, ck) VALUES (0, 0) USING TIMESTAMP
0",
"INSERT INTO %s (pk, ck) VALUES (0, 1) USING TIMESTAMP
0",
"DELETE FROM %s USING TIMESTAMP 42 WHERE pk = 2 AND ck
IN (0, 1)")
@@ -432,16 +427,18 @@ public class ShortReadProtectionTest extends TestBaseImpl
private final boolean flush, paging;
private final String table;
private final String qualifiedTableName;
+ private final TransactionalMode transactionalMode;
private boolean flushed = false;
- private Tester(ConsistencyLevel readConsistencyLevel, boolean flush,
boolean paging)
+ private Tester(ConsistencyLevel readConsistencyLevel, boolean flush,
boolean paging, TransactionalMode transactionalMode)
{
this.readConsistencyLevel = readConsistencyLevel;
this.flush = flush;
this.paging = paging;
this.table = "t_" + seqNumber.getAndIncrement();
qualifiedTableName = KEYSPACE + '.' + table;
+ this.transactionalMode = transactionalMode;
assert readConsistencyLevel == ALL || readConsistencyLevel ==
QUORUM || readConsistencyLevel == SERIAL
: "Only ALL and QUORUM consistency levels are supported";
@@ -449,7 +446,13 @@ public class ShortReadProtectionTest extends TestBaseImpl
private Tester createTable(String query)
{
- cluster.schemaChange(format(query) + " AND read_repair='NONE'");
+ cluster.schemaChange(format(query) + " WITH read_repair='NONE'");
+ if (transactionalMode != TransactionalMode.off)
+ {
+ // For test purposes we create the table and require migration
otherwise Accord
+ // won't bother to do interop reads with short read protection
+ cluster.schemaChange(format("ALTER TABLE %s WITH
transactional_mode='" + transactionalMode + "\' AND
transactional_migration_from = \'off\'"));
+ }
return this;
}
diff --git
a/test/distributed/org/apache/cassandra/distributed/test/accord/AccordCQLTestBase.java
b/test/distributed/org/apache/cassandra/distributed/test/accord/AccordCQLTestBase.java
index 824498b229..c58b5d62b8 100644
---
a/test/distributed/org/apache/cassandra/distributed/test/accord/AccordCQLTestBase.java
+++
b/test/distributed/org/apache/cassandra/distributed/test/accord/AccordCQLTestBase.java
@@ -391,7 +391,7 @@ public abstract class AccordCQLTestBase extends
AccordTestBase
{
accordRead = wrapInTxn(accordRead);
Object[][] simpleReadResult;
- if (transactionalMode.ignoresSuppliedConsistencyLevel)
+ if (transactionalMode.ignoresSuppleidCommitCL)
// With accord non-SERIAL write strategy the commit CL is
effectively ANY so we need to read at SERIAL
simpleReadResult = cluster.coordinator(1).execute(simpleRead,
ConsistencyLevel.SERIAL, key);
else
diff --git
a/test/distributed/org/apache/cassandra/distributed/test/accord/AccordInteroperabilityTest.java
b/test/distributed/org/apache/cassandra/distributed/test/accord/AccordInteroperabilityTest.java
index 1b8bfe8985..fcd9b1dabe 100644
---
a/test/distributed/org/apache/cassandra/distributed/test/accord/AccordInteroperabilityTest.java
+++
b/test/distributed/org/apache/cassandra/distributed/test/accord/AccordInteroperabilityTest.java
@@ -19,15 +19,21 @@
package org.apache.cassandra.distributed.test.accord;
import java.io.IOException;
+import java.util.function.Function;
import org.junit.BeforeClass;
import org.junit.Test;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
-import org.apache.cassandra.distributed.api.ConsistencyLevel;
+import org.apache.cassandra.db.ConsistencyLevel;
import org.apache.cassandra.distributed.api.ICoordinator;
import org.apache.cassandra.distributed.shared.AssertUtils;
+import org.apache.cassandra.exceptions.InvalidRequestException;
+import org.apache.cassandra.service.accord.IAccordService;
+
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.fail;
public class AccordInteroperabilityTest extends AccordTestBase
{
@@ -52,7 +58,7 @@ public class AccordInteroperabilityTest extends AccordTestBase
cluster -> {
ICoordinator coordinator = cluster.coordinator(1);
for (int i = 1; i <= 10; i++)
- coordinator.execute("INSERT INTO " +
qualifiedAccordTableName + " (k, c, v) VALUES (0, ?, ?) USING TIMESTAMP 0;",
ConsistencyLevel.ALL, i, i * 10);
+ coordinator.execute("INSERT INTO " +
qualifiedAccordTableName + " (k, c, v) VALUES (0, ?, ?) USING TIMESTAMP 0;",
org.apache.cassandra.distributed.api.ConsistencyLevel.ALL, i, i * 10);
assertRowSerial(cluster, "SELECT c, v FROM " +
qualifiedAccordTableName + " WHERE k=0 ORDER BY c DESC LIMIT 1",
AssertUtils.row(10, 100));
assertRowSerial(cluster, "SELECT c, v FROM " +
qualifiedAccordTableName + " WHERE k=0 ORDER BY c DESC LIMIT 2",
AssertUtils.row(10, 100), AssertUtils.row(9, 90));
assertRowSerial(cluster, "SELECT c, v FROM " +
qualifiedAccordTableName + " WHERE k=0 ORDER BY c DESC LIMIT 3",
AssertUtils.row(10, 100), AssertUtils.row(9, 90), AssertUtils.row(8, 80));
@@ -60,4 +66,70 @@ public class AccordInteroperabilityTest extends
AccordTestBase
}
);
}
+
+ private static Object[][] assertTargetAccordRead(Function<Integer,
Object[][]> query, int coordinatorIndex, int key, int expectedAccordReadCount)
+ {
+ int startingReadCount = getAccordReadCount(coordinatorIndex);
+ Object[][] result = query.apply(key);
+ assertEquals("Accord reads", expectedAccordReadCount,
getAccordReadCount(coordinatorIndex) - startingReadCount);
+ return result;
+ }
+
+ private static Object[][] assertTargetAccordWrite(Function<Integer,
Object[][]> query, int coordinatorIndex, int key, int expectedAccordWriteCount)
+ {
+ int startingWriteCount = getAccordWriteCount(coordinatorIndex);
+ Object[][] result = query.apply(key);
+ assertEquals("Accord writes", expectedAccordWriteCount,
getAccordWriteCount(coordinatorIndex) - startingWriteCount);
+ return result;
+ }
+
+ @Test // for every CL except ANY/NODE_LOCAL: expect exactly one Accord read on coordinator 1, or an InvalidRequestException for CLs outside SUPPORTED_READ_CONSISTENCY_LEVELS
+ public void testNonSerialReadIsThroughAccordFull() throws Throwable
+ {
+ test("CREATE TABLE " + qualifiedAccordTableName + " (k int, c int, v
int, PRIMARY KEY(k, c)) WITH transactional_mode='full'",
+ cluster -> {
+ for (ConsistencyLevel cl : ConsistencyLevel.values())
+ {
+ try
+ {
+ if (cl == ConsistencyLevel.ANY || cl ==
ConsistencyLevel.NODE_LOCAL)
+ continue;
+ assertTargetAccordRead(key ->
cluster.coordinator(1).execute("SELECT * FROM " + qualifiedAccordTableName + "
WHERE k = ?",
org.apache.cassandra.distributed.api.ConsistencyLevel.valueOf(cl.name()), key),
1, 1, 1);
+ if
(!IAccordService.SUPPORTED_READ_CONSISTENCY_LEVELS.contains(cl)) // reaching here with an unsupported CL means the read was wrongly accepted
+ fail("Unsupported consistency level succeeded");
+
+ }
+ catch (Throwable t) // NOTE(review): also catches the AssertionError from fail()/assertEquals above, so such failures surface as a class-name mismatch instead of their own message
+ {
+ assertEquals(InvalidRequestException.class.getName(),
t.getClass().getName());
+ assertEquals(cl + " is not supported by Accord",
t.getMessage());
+ }
+ }
+ });
+ }
+
+ @Test // for every CL: expect exactly one Accord write on coordinator 1, or an InvalidRequestException for CLs outside SUPPORTED_COMMIT_CONSISTENCY_LEVELS
+ public void testNonSerialWriteIsThroughAccordFull() throws Throwable
+ {
+ test("CREATE TABLE " + qualifiedAccordTableName + " (k int, c int, v
int, PRIMARY KEY(k, c)) WITH transactional_mode='full'",
+ cluster -> {
+ for (ConsistencyLevel cl : ConsistencyLevel.values())
+ {
+ try
+ {
+ assertTargetAccordWrite(key ->
cluster.coordinator(1).execute("INSERT INTO " + qualifiedAccordTableName + "
(k, c, v) VALUES (?, 43, 44)",
org.apache.cassandra.distributed.api.ConsistencyLevel.valueOf(cl.name()), key),
1, 1, 1);
+ if
(!IAccordService.SUPPORTED_COMMIT_CONSISTENCY_LEVELS.contains(cl)) // reaching here with an unsupported CL means the write was wrongly accepted
+ fail("Unsupported consistency level succeeded");
+ }
+ catch (Throwable t) // NOTE(review): also catches the AssertionError from fail()/assertEquals above, so such failures surface as a class-name mismatch instead of their own message
+ {
+ assertEquals(InvalidRequestException.class.getName(),
t.getClass().getName());
+ if (cl == ConsistencyLevel.SERIAL || cl ==
ConsistencyLevel.LOCAL_SERIAL) // serial CLs are rejected with a different message than other unsupported CLs
+ assertEquals("You must use conditional updates
for serializable writes", t.getMessage());
+ else
+ assertEquals(cl + " is not supported by Accord",
t.getMessage());
+ }
+ }
+ });
+ }
}
diff --git
a/test/unit/org/apache/cassandra/config/DatabaseDescriptorRefTest.java
b/test/unit/org/apache/cassandra/config/DatabaseDescriptorRefTest.java
index db7476e799..6f2c451df3 100644
--- a/test/unit/org/apache/cassandra/config/DatabaseDescriptorRefTest.java
+++ b/test/unit/org/apache/cassandra/config/DatabaseDescriptorRefTest.java
@@ -186,6 +186,7 @@ public class DatabaseDescriptorRefTest
"org.apache.cassandra.db.guardrails.Values$Config",
"org.apache.cassandra.db.rows.UnfilteredSource",
"org.apache.cassandra.dht.IPartitioner",
+ "org.apache.cassandra.dht.RingPosition",
"org.apache.cassandra.distributed.api.IInstance",
"org.apache.cassandra.distributed.api.IInvokableInstance",
"org.apache.cassandra.distributed.api.IIsolatedExecutor",
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]