This is an automated email from the ASF dual-hosted git repository.
dcapwell pushed a commit to branch trunk
in repository https://gitbox.apache.org/repos/asf/cassandra.git
The following commit(s) were added to refs/heads/trunk by this push:
new 9c8ab8809d Accord: write rejections would be returned to users as
server errors rather than INVALID and TxnReferenceOperation didn't handle all
collections prperly
9c8ab8809d is described below
commit 9c8ab8809d618f6d34316d131accef9ecd4b29a9
Author: David Capwell <[email protected]>
AuthorDate: Mon Dec 15 12:59:52 2025 -0800
Accord: write rejections would be returned to users as server errors rather
than INVALID and TxnReferenceOperation didn't handle all collections prperly
patch by David Capwell; reviewed by Benedict Elliott Smith, Caleb
Rackliffe, Jyothsna Konisa for CASSANDRA-21061
---
CHANGES.txt | 1 +
modules/accord | 2 +-
.../cassandra/cql3/statements/CQL3CasRequest.java | 2 +
.../cql3/statements/TransactionStatement.java | 5 +-
.../org/apache/cassandra/service/StorageProxy.java | 10 +-
.../cassandra/service/accord/txn/TxnDataValue.java | 10 +-
.../cassandra/service/accord/txn/TxnQuery.java | 6 +
.../service/accord/txn/TxnReferenceOperation.java | 108 +++++---
.../cassandra/service/accord/txn/TxnResult.java | 3 +-
.../cassandra/service/accord/txn/TxnUpdate.java | 38 ++-
...{TxnResult.java => TxnValidationRejection.java} | 39 ++-
.../distributed/test/accord/AccordCQLTestBase.java | 16 ++
.../cassandra/harry/model/BytesPartitionState.java | 2 +-
.../cassandra/harry/model/PartitionState.java | 6 +
.../cassandra/cql3/ast/CollectionAccess.java | 12 +
.../accord/txn/TxnReferenceOperationTest.java | 289 +++++++++++++++++++++
16 files changed, 490 insertions(+), 59 deletions(-)
diff --git a/CHANGES.txt b/CHANGES.txt
index 9cac450533..ac47a545f3 100644
--- a/CHANGES.txt
+++ b/CHANGES.txt
@@ -1,4 +1,5 @@
5.1
+ * Accord: write rejections would be returned to users as server errors rather
than INVALID and TxnReferenceOperation didn't handle all collections prperly
(CASSANDRA-21061)
* Use byte[] directly in QueryOptions instead of ByteBuffer and convert them
to ArrayCell instead of BufferCell to reduce allocations (CASSANDRA-20166)
* Log queries scanning too many SSTables per read (CASSANDRA-21048)
* Extend nodetool verify to (optionally) validate SAI files (CASSANDRA-20949)
diff --git a/modules/accord b/modules/accord
index 6bae51f6a4..f6b0a6998f 160000
--- a/modules/accord
+++ b/modules/accord
@@ -1 +1 @@
-Subproject commit 6bae51f6a4bd560a82840fa0809cd1d630696cc0
+Subproject commit f6b0a6998faca767e6951976097dec704c306b0e
diff --git a/src/java/org/apache/cassandra/cql3/statements/CQL3CasRequest.java
b/src/java/org/apache/cassandra/cql3/statements/CQL3CasRequest.java
index 080a705ffa..9cc2b94c66 100644
--- a/src/java/org/apache/cassandra/cql3/statements/CQL3CasRequest.java
+++ b/src/java/org/apache/cassandra/cql3/statements/CQL3CasRequest.java
@@ -74,6 +74,7 @@ import org.apache.cassandra.service.accord.txn.TxnRead;
import org.apache.cassandra.service.accord.txn.TxnReference;
import org.apache.cassandra.service.accord.txn.TxnResult;
import org.apache.cassandra.service.accord.txn.TxnUpdate;
+import org.apache.cassandra.service.accord.txn.TxnValidationRejection;
import org.apache.cassandra.service.accord.txn.TxnWrite;
import org.apache.cassandra.service.paxos.Ballot;
import org.apache.cassandra.tcm.ClusterMetadata;
@@ -574,6 +575,7 @@ public class CQL3CasRequest implements CASRequest
{
if (txnResult.kind() == retry_new_protocol)
return RETRY_NEW_PROTOCOL;
+ TxnValidationRejection.maybeThrow(txnResult);
TxnData txnData = (TxnData)txnResult;
TxnDataKeyValue partition =
(TxnDataKeyValue)txnData.get(txnDataName(CAS_READ));
return casResult(partition != null ? partition.rowIterator(false) :
null);
diff --git
a/src/java/org/apache/cassandra/cql3/statements/TransactionStatement.java
b/src/java/org/apache/cassandra/cql3/statements/TransactionStatement.java
index caff467cd7..97482d994d 100644
--- a/src/java/org/apache/cassandra/cql3/statements/TransactionStatement.java
+++ b/src/java/org/apache/cassandra/cql3/statements/TransactionStatement.java
@@ -85,6 +85,7 @@ import org.apache.cassandra.service.accord.txn.TxnRead;
import org.apache.cassandra.service.accord.txn.TxnReference;
import org.apache.cassandra.service.accord.txn.TxnResult;
import org.apache.cassandra.service.accord.txn.TxnUpdate;
+import org.apache.cassandra.service.accord.txn.TxnValidationRejection;
import org.apache.cassandra.service.accord.txn.TxnWrite;
import org.apache.cassandra.service.consensus.TransactionalMode;
import
org.apache.cassandra.service.consensus.migration.TransactionalMigrationFromMode;
@@ -250,7 +251,7 @@ public class TransactionStatement implements
CQLStatement.CompositeCQLStatement,
SinglePartitionReadQuery.Group<SinglePartitionReadCommand> selectQuery
= (SinglePartitionReadQuery.Group<SinglePartitionReadCommand>)
select.getQuery(options, 0);
if (selectQuery.queries.size() != 1)
- throw new IllegalArgumentException("Within a transaction, SELECT
statements must select a single partition; found " + selectQuery.queries.size()
+ " partitions");
+ throw invalidRequest("Within a transaction, SELECT statements must
select a single partition; found " + selectQuery.queries.size() + "
partitions");
SinglePartitionReadCommand command =
Iterables.getOnlyElement(selectQuery.queries);
return new TxnNamedRead(namedSelect.name,
keyCollector.collect(command.metadata(), command.partitionKey()), command,
keyCollector.tables);
@@ -560,6 +561,8 @@ public class TransactionStatement implements
CQLStatement.CompositeCQLStatement,
TxnResult txnResult = AccordService.instance().coordinate(minEpoch,
txn, options.getConsistency(), requestTime);
if (txnResult.kind() == retry_new_protocol)
throw new InvalidRequestException(UNSUPPORTED_MIGRATION);
+ TxnValidationRejection.maybeThrow(txnResult);
+
TxnData data = (TxnData)txnResult;
if (returningSelect != null)
diff --git a/src/java/org/apache/cassandra/service/StorageProxy.java
b/src/java/org/apache/cassandra/service/StorageProxy.java
index 40b3d1fe82..b1ac0bfffa 100644
--- a/src/java/org/apache/cassandra/service/StorageProxy.java
+++ b/src/java/org/apache/cassandra/service/StorageProxy.java
@@ -148,6 +148,7 @@ import org.apache.cassandra.service.accord.txn.TxnQuery;
import org.apache.cassandra.service.accord.txn.TxnRangeReadResult;
import org.apache.cassandra.service.accord.txn.TxnRead;
import org.apache.cassandra.service.accord.txn.TxnResult;
+import org.apache.cassandra.service.accord.txn.TxnValidationRejection;
import org.apache.cassandra.service.consensus.TransactionalMode;
import
org.apache.cassandra.service.consensus.UnsupportedTransactionConsistencyLevel;
import
org.apache.cassandra.service.consensus.migration.ConsensusMigrationMutationHelper.SplitConsumer;
@@ -1316,13 +1317,16 @@ public class StorageProxy implements StorageProxyMBean
{
if (accordResult != null)
{
- TxnResult.Kind kind =
accordResult.awaitAndGet().kind();
+ TxnResult result = accordResult.awaitAndGet();
+ TxnResult.Kind kind = result.kind();
if (kind == retry_new_protocol && failure == null)
{
Tracing.trace("Accord returned retry new
protocol");
logger.debug("Retrying mutations on different
system because some mutations were misrouted according to Accord");
continue;
}
+ TxnValidationRejection.maybeThrow(result);
+
Tracing.trace("Successfully wrote Accord mutations");
}
}
@@ -1549,9 +1553,11 @@ public class StorageProxy implements StorageProxyMBean
// the batch log.
if (accordResult != null)
{
- TxnResult.Kind kind =
accordResult.awaitAndGet().kind();
+ TxnResult result = accordResult.awaitAndGet();
+ TxnResult.Kind kind = result.kind();
if (kind == retry_new_protocol && failure == null)
continue;
+ TxnValidationRejection.maybeThrow(result);
Tracing.trace("Successfully wrote Accord mutations");
cleanup.ackMutation();
}
diff --git a/src/java/org/apache/cassandra/service/accord/txn/TxnDataValue.java
b/src/java/org/apache/cassandra/service/accord/txn/TxnDataValue.java
index 3fc9c08198..cacafcd616 100644
--- a/src/java/org/apache/cassandra/service/accord/txn/TxnDataValue.java
+++ b/src/java/org/apache/cassandra/service/accord/txn/TxnDataValue.java
@@ -42,6 +42,7 @@ public interface TxnDataValue
{
key(0),
range(1);
+
int id;
Kind(int id)
@@ -49,14 +50,15 @@ public interface TxnDataValue
this.id = id;
}
- public TxnDataValueSerializer serializer()
+ @SuppressWarnings("unchecked")
+ public <T extends TxnDataValue> IVersionedSerializer<T> serializer()
{
switch (this)
{
case key:
- return TxnDataKeyValue.serializer;
+ return (IVersionedSerializer<T>)
TxnDataKeyValue.serializer;
case range:
- return TxnDataRangeValue.serializer;
+ return (IVersionedSerializer<T>)
TxnDataRangeValue.serializer;
default:
throw new IllegalStateException("Unrecognized kind " +
this);
}
@@ -71,7 +73,7 @@ public interface TxnDataValue
long estimatedSizeOnHeap();
- IVersionedSerializer<TxnDataValue> serializer = new
IVersionedSerializer<TxnDataValue>()
+ IVersionedSerializer<TxnDataValue> serializer = new
IVersionedSerializer<>()
{
@SuppressWarnings("unchecked")
@Override
diff --git a/src/java/org/apache/cassandra/service/accord/txn/TxnQuery.java
b/src/java/org/apache/cassandra/service/accord/txn/TxnQuery.java
index e380b87fe6..191518e34a 100644
--- a/src/java/org/apache/cassandra/service/accord/txn/TxnQuery.java
+++ b/src/java/org/apache/cassandra/service/accord/txn/TxnQuery.java
@@ -154,6 +154,9 @@ public abstract class TxnQuery implements Query
@Override
public Result compute(TxnId txnId, Timestamp executeAt, Seekables<?,
?> keys, @Nullable Data data, @Nullable Read read, @Nullable Update update)
{
+ TxnValidationRejection rejection =
TxnUpdate.validationRejection(update);
+ if (rejection != null) return rejection;
+
// Skip the migration checks in the base class for empty
transactions, we don't
// want/need the RetryWithNewProtocolResult
return new TxnData();
@@ -207,6 +210,9 @@ public abstract class TxnQuery implements Query
@Override
public Result compute(TxnId txnId, Timestamp executeAt, Seekables<?, ?>
keys, @Nullable Data data, @Nullable Read read, @Nullable Update update)
{
+ TxnValidationRejection rejection =
TxnUpdate.validationRejection(update);
+ if (rejection != null) return rejection;
+
// TODO (required): This is not the cluster metadata of the current
transaction
ClusterMetadata clusterMetadata = ClusterMetadata.current();
checkState(clusterMetadata.epoch.getEpoch() >= executeAt.epoch(), "TCM
epoch %d is < executeAt epoch %d", clusterMetadata.epoch.getEpoch(),
executeAt.epoch());
diff --git
a/src/java/org/apache/cassandra/service/accord/txn/TxnReferenceOperation.java
b/src/java/org/apache/cassandra/service/accord/txn/TxnReferenceOperation.java
index c453f612b2..f997008210 100644
---
a/src/java/org/apache/cassandra/service/accord/txn/TxnReferenceOperation.java
+++
b/src/java/org/apache/cassandra/service/accord/txn/TxnReferenceOperation.java
@@ -24,6 +24,10 @@ import java.util.HashMap;
import java.util.Map;
import java.util.Objects;
+import javax.annotation.Nullable;
+
+import com.google.common.annotations.VisibleForTesting;
+
import org.apache.cassandra.cql3.FieldIdentifier;
import org.apache.cassandra.cql3.Operation;
import org.apache.cassandra.cql3.UpdateParameters;
@@ -35,8 +39,10 @@ import org.apache.cassandra.cql3.terms.Sets;
import org.apache.cassandra.cql3.terms.Term;
import org.apache.cassandra.cql3.terms.UserTypes;
import org.apache.cassandra.db.DecoratedKey;
+import org.apache.cassandra.db.TypeSizes;
import org.apache.cassandra.db.marshal.AbstractType;
import org.apache.cassandra.db.marshal.CollectionType;
+import org.apache.cassandra.db.marshal.Int32Type;
import org.apache.cassandra.db.marshal.ListType;
import org.apache.cassandra.db.marshal.MapType;
import org.apache.cassandra.db.marshal.SetType;
@@ -58,25 +64,29 @@ import static
org.apache.cassandra.service.accord.AccordSerializers.columnMetada
public class TxnReferenceOperation
{
private static final Map<Class<? extends Operation>, Kind>
operationKindMap = initOperationKindMap();
-
- private static Map<Class<? extends Operation>, Kind> initOperationKindMap()
+
+ @VisibleForTesting
+ static Map<Class<? extends Operation>, Kind> initOperationKindMap()
{
Map<Class<? extends Operation>, Kind> temp = new HashMap<>();
- temp.put(Sets.Adder.class, Kind.SetAdder);
temp.put(Constants.Adder.class, Kind.ConstantAdder);
+ temp.put(Constants.Setter.class, Kind.ConstantSetter);
+ temp.put(Constants.Substracter.class, Kind.ConstantSubtracter);
temp.put(Lists.Appender.class, Kind.ListAppender);
- temp.put(Sets.Discarder.class, Kind.SetDiscarder);
temp.put(Lists.Discarder.class, Kind.ListDiscarder);
+ temp.put(Lists.DiscarderByIndex.class, Kind.ListDiscarderByIndex);
temp.put(Lists.Prepender.class, Kind.ListPrepender);
- temp.put(Maps.Putter.class, Kind.MapPutter);
temp.put(Lists.Setter.class, Kind.ListSetter);
- temp.put(Sets.Setter.class, Kind.SetSetter);
+ temp.put(Lists.SetterByIndex.class, Kind.ListSetterByIndex);
+ temp.put(Maps.DiscarderByKey.class, Kind.MapDiscarderByKey);
+ temp.put(Maps.Putter.class, Kind.MapPutter);
temp.put(Maps.Setter.class, Kind.MapSetter);
- temp.put(UserTypes.Setter.class, Kind.UserTypeSetter);
- temp.put(Constants.Setter.class, Kind.ConstantSetter);
- temp.put(Constants.Substracter.class, Kind.ConstantSubtracter);
temp.put(Maps.SetterByKey.class, Kind.MapSetterByKey);
- temp.put(Lists.SetterByIndex.class, Kind.ListSetterByIndex);
+ temp.put(Sets.Adder.class, Kind.SetAdder);
+ temp.put(Sets.Discarder.class, Kind.SetDiscarder);
+ temp.put(Sets.ElementDiscarder.class, Kind.SetElementDiscarder);
+ temp.put(Sets.Setter.class, Kind.SetSetter);
+ temp.put(UserTypes.Setter.class, Kind.UserTypeSetter);
temp.put(UserTypes.SetterByField.class, Kind.UserTypeSetterByField);
return temp;
}
@@ -103,7 +113,10 @@ public class TxnReferenceOperation
ConstantSubtracter((byte) 13, (column, keyOrIndex, field, value) ->
new Constants.Substracter(column, value)),
MapSetterByKey((byte) 14, (column, keyOrIndex, field, value) -> new
Maps.SetterByKey(column, keyOrIndex, value)),
ListSetterByIndex((byte) 15, (column, keyOrIndex, field, value) -> new
Lists.SetterByIndex(column, keyOrIndex, value)),
- UserTypeSetterByField((byte) 16, (column, keyOrIndex, field, value) ->
new UserTypes.SetterByField(column, field, value));
+ UserTypeSetterByField((byte) 16, (column, keyOrIndex, field, value) ->
new UserTypes.SetterByField(column, field, value)),
+ ListDiscarderByIndex((byte) 17, (column, keyOrIndex, field, value) ->
new Lists.DiscarderByIndex(column, value)),
+ MapDiscarderByKey((byte) 18, (column, keyOrIndex, field, value) -> new
Maps.DiscarderByKey(column, value)),
+ SetElementDiscarder((byte) 19, (column, keyOrIndex, field, value) ->
new Sets.ElementDiscarder(column, value));
private final byte id;
private final ToOperation toOperation;
@@ -154,18 +167,20 @@ public class TxnReferenceOperation
private final Kind kind;
private final ColumnMetadata receiver;
- private final TableMetadata table;
- private final ByteBuffer key;
- private final ByteBuffer field;
+ public final TableMetadata table;
+ private final @Nullable ByteBuffer keyOrIndex;
+ private final @Nullable ByteBuffer field;
private final TxnReferenceValue value;
+ private final @Nullable AbstractType<?> keyOrIndexType;
private final AbstractType<?> valueType;
- public TxnReferenceOperation(Kind kind, ColumnMetadata receiver,
TableMetadata table, ByteBuffer key, ByteBuffer field, TxnReferenceValue value)
+ public TxnReferenceOperation(Kind kind, ColumnMetadata receiver,
TableMetadata table,
+ @Nullable ByteBuffer keyOrIndex, @Nullable
ByteBuffer field, TxnReferenceValue value)
{
this.kind = kind;
this.receiver = receiver;
this.table = table;
- this.key = key;
+ this.keyOrIndex = keyOrIndex;
this.field = field;
// We don't expect operators on clustering keys, but unwrap just in
case.
@@ -175,20 +190,36 @@ public class TxnReferenceOperation
{
// The value for a map subtraction is actually a set (see
Operation.Substraction)
this.valueType = SetType.getInstance(((MapType<?, ?>)
receiverType).getKeysType(), true);
+ this.keyOrIndexType = null;
+ }
+ else if (kind == Kind.MapDiscarderByKey || kind ==
Kind.SetElementDiscarder)
+ {
+ CollectionType<?> ct = (CollectionType<?>) receiverType;
+ this.keyOrIndexType = null;
+ this.valueType = ct.nameComparator();
}
else if (kind == Kind.MapSetterByKey || kind == Kind.ListSetterByIndex)
{
- this.valueType = ((CollectionType<?>)
receiverType).valueComparator();
+ CollectionType<?> ct = (CollectionType<?>) receiverType;
+ this.keyOrIndexType = ct.nameComparator();
+ this.valueType = ct.valueComparator();
+ }
+ else if (kind == Kind.ListDiscarderByIndex)
+ {
+ this.valueType = Int32Type.instance;
+ this.keyOrIndexType = null;
}
else if (kind == Kind.UserTypeSetterByField)
{
UserType userType = (UserType) receiverType;
CellPath fieldPath = userType.cellPathForField(new
FieldIdentifier(field));
this.valueType = userType.fieldType(fieldPath);
+ this.keyOrIndexType = null;
}
else
{
this.valueType = receiverType;
+ this.keyOrIndexType = null;
}
this.value = value;
@@ -202,27 +233,35 @@ public class TxnReferenceOperation
TxnReferenceOperation that = (TxnReferenceOperation) o;
return Objects.equals(receiver, that.receiver)
&& kind == that.kind
- && Objects.equals(key, that.key)
+ && Objects.equals(keyOrIndex, that.keyOrIndex)
&& Objects.equals(field, that.field)
&& Objects.equals(value, that.value);
}
- public void collect(TableMetadatas.Collector collector)
- {
- collector.add(table);
- value.collect(collector);
- }
-
@Override
public int hashCode()
{
- return Objects.hash(receiver, kind, key, field, value);
+ return Objects.hash(receiver, kind, keyOrIndex, field, value);
}
+
@Override
public String toString()
{
- return receiver + " = " + value;
+ return "TxnReferenceOperation{" +
+ "kind=" + kind +
+ ", receiver=" + receiver +
+ ", table=" + table +
+ ", key=" + keyOrIndex +
+ ", field=" + field +
+ ", value=" + value +
+ '}';
+ }
+
+ public void collect(TableMetadatas.Collector collector)
+ {
+ collector.add(table);
+ value.collect(collector);
}
public ColumnMetadata receiver()
@@ -236,11 +275,12 @@ public class TxnReferenceOperation
operation.execute(key, up);
}
- private Operation toOperation(TxnData data)
+ @VisibleForTesting
+ Operation toOperation(TxnData data)
{
FieldIdentifier fieldIdentifier = field == null ? null : new
FieldIdentifier(field);
Term valueTerm = toTerm(data, valueType);
- Term keyorIndexTerm = key == null ? null : toTerm(key, valueType);
+ Term keyorIndexTerm = keyOrIndex == null ? null : toTerm(keyOrIndex,
keyOrIndexType);
return kind.toOperation(receiver, keyorIndexTerm, fieldIdentifier,
valueTerm);
}
@@ -274,9 +314,9 @@ public class TxnReferenceOperation
columnMetadataSerializer.serialize(operation.receiver,
operation.table, out);
TxnReferenceValue.serializer.serialize(operation.value, tables,
out);
- out.writeBoolean(operation.key != null);
- if (operation.key != null)
- ByteBufferUtil.writeWithVIntLength(operation.key, out);
+ out.writeBoolean(operation.keyOrIndex != null);
+ if (operation.keyOrIndex != null)
+ ByteBufferUtil.writeWithVIntLength(operation.keyOrIndex, out);
out.writeBoolean(operation.field != null);
if (operation.field != null)
@@ -303,9 +343,11 @@ public class TxnReferenceOperation
size +=
columnMetadataSerializer.serializedSize(operation.receiver, operation.table);
size +=
TxnReferenceValue.serializer.serializedSize(operation.value, tables);
- if (operation.key != null)
- size +=
ByteBufferUtil.serializedSizeWithVIntLength(operation.key);
+ size += TypeSizes.sizeof(operation.keyOrIndex != null);
+ if (operation.keyOrIndex != null)
+ size +=
ByteBufferUtil.serializedSizeWithVIntLength(operation.keyOrIndex);
+ size += TypeSizes.sizeof(operation.field != null);
if (operation.field != null)
size +=
ByteBufferUtil.serializedSizeWithVIntLength(operation.field);
diff --git a/src/java/org/apache/cassandra/service/accord/txn/TxnResult.java
b/src/java/org/apache/cassandra/service/accord/txn/TxnResult.java
index 2b5af7c08b..b3bf2b3fc8 100644
--- a/src/java/org/apache/cassandra/service/accord/txn/TxnResult.java
+++ b/src/java/org/apache/cassandra/service/accord/txn/TxnResult.java
@@ -26,7 +26,8 @@ public interface TxnResult extends Result
{
txn_data(0),
retry_new_protocol(1),
- range_read(2);
+ range_read(2),
+ validation_rejection(3);
int id;
diff --git a/src/java/org/apache/cassandra/service/accord/txn/TxnUpdate.java
b/src/java/org/apache/cassandra/service/accord/txn/TxnUpdate.java
index d4b549a61d..0d3def4fd0 100644
--- a/src/java/org/apache/cassandra/service/accord/txn/TxnUpdate.java
+++ b/src/java/org/apache/cassandra/service/accord/txn/TxnUpdate.java
@@ -45,6 +45,7 @@ import accord.utils.SortedArrays;
import org.apache.cassandra.cql3.QueryOptions;
import org.apache.cassandra.db.ConsistencyLevel;
import org.apache.cassandra.db.TypeSizes;
+import org.apache.cassandra.exceptions.RequestValidationException;
import org.apache.cassandra.io.ParameterisedUnversionedSerializer;
import org.apache.cassandra.io.UnversionedSerializer;
import org.apache.cassandra.io.util.DataInputBuffer;
@@ -85,7 +86,7 @@ import static
org.apache.cassandra.utils.NullableSerializer.deserializeNullable;
import static org.apache.cassandra.utils.NullableSerializer.serializeNullable;
import static
org.apache.cassandra.utils.NullableSerializer.serializedNullableSize;
-public class TxnUpdate extends AccordUpdate
+public final class TxnUpdate extends AccordUpdate
{
static class ConditionalBlock
{
@@ -547,6 +548,9 @@ public class TxnUpdate extends AccordUpdate
// Memoize computation of condition
private Boolean anyConditionResult;
+ @Nullable
+ private transient volatile Object validationException = this;
+
public TxnUpdate(TableMetadatas tables, List<Fragment> fragments,
TxnCondition condition, @Nullable ConsistencyLevel cassandraCommitCL,
PreserveTimestamp preserveTimestamps)
{
requireArgument(cassandraCommitCL == null ||
IAccordService.SUPPORTED_COMMIT_CONSISTENCY_LEVELS.contains(cassandraCommitCL));
@@ -584,6 +588,25 @@ public class TxnUpdate extends AccordUpdate
return new TxnUpdate(TableMetadatas.none(), Keys.EMPTY,
Collections.emptyList(), null, PreserveTimestamp.no);
}
+ public static TxnValidationRejection validationRejection(@Nullable Update
update)
+ {
+ if (update != null && update.getClass() == TxnUpdate.class)
+ {
+ RequestValidationException e = ((TxnUpdate)
update).validationRejection();
+ if (e != null) return new TxnValidationRejection(e);
+ }
+ return null;
+ }
+
+ @Nullable
+ private RequestValidationException validationRejection()
+ {
+ Object snapshot = validationException;
+ if (snapshot == this)
+ throw Invariants.illegalState("Attempted to check for validation
exception before .apply was called");
+ return snapshot == null ? null : (RequestValidationException) snapshot;
+ }
+
@Override
public long estimatedSizeOnHeap()
{
@@ -677,7 +700,18 @@ public class TxnUpdate extends AccordUpdate
ClusterMetadata cm = ClusterMetadata.current();
checkState(cm.epoch.getEpoch() >= executeAt.epoch(), "TCM epoch %d is
< executeAt epoch %d", cm.epoch.getEpoch(), executeAt.epoch());
- Pair<List<TxnWrite.Update>, SimpleBitSet> pair =
processCondition(executeAt, data);
+ Pair<List<TxnWrite.Update>, SimpleBitSet> pair = null;
+ try
+ {
+ validationException = null;
+ pair = processCondition(executeAt, data);
+ }
+ catch (RequestValidationException e)
+ {
+ // the update isn't allowed
+ validationException = e;
+ }
+
if (pair == null)
return new TxnWrite(TableMetadatas.none(),
Collections.emptyList(), SimpleBitSets.allUnset(numConditionalBlocks()));
diff --git a/src/java/org/apache/cassandra/service/accord/txn/TxnResult.java
b/src/java/org/apache/cassandra/service/accord/txn/TxnValidationRejection.java
similarity index 51%
copy from src/java/org/apache/cassandra/service/accord/txn/TxnResult.java
copy to
src/java/org/apache/cassandra/service/accord/txn/TxnValidationRejection.java
index 2b5af7c08b..aca6fbfda3 100644
--- a/src/java/org/apache/cassandra/service/accord/txn/TxnResult.java
+++
b/src/java/org/apache/cassandra/service/accord/txn/TxnValidationRejection.java
@@ -18,25 +18,36 @@
package org.apache.cassandra.service.accord.txn;
-import accord.api.Result;
+import java.util.Objects;
-public interface TxnResult extends Result
+import org.apache.cassandra.exceptions.RequestValidationException;
+
+import static
org.apache.cassandra.service.accord.txn.TxnResult.Kind.validation_rejection;
+
+public class TxnValidationRejection implements TxnResult
{
- enum Kind
- {
- txn_data(0),
- retry_new_protocol(1),
- range_read(2);
+ private final RequestValidationException validationException;
- int id;
+ public TxnValidationRejection(RequestValidationException
validationException)
+ {
+ this.validationException = Objects.requireNonNull(validationException);
+ }
- Kind(int id)
- {
- this.id = id;
- }
+ public static void maybeThrow(TxnResult txnResult)
+ {
+ if (txnResult.kind() == validation_rejection)
+ throw ((TxnValidationRejection) txnResult).validationException;
}
- Kind kind();
+ @Override
+ public Kind kind()
+ {
+ return Kind.validation_rejection;
+ }
- long estimatedSizeOnHeap();
+ @Override
+ public long estimatedSizeOnHeap()
+ {
+ return 0;
+ }
}
diff --git
a/test/distributed/org/apache/cassandra/distributed/test/accord/AccordCQLTestBase.java
b/test/distributed/org/apache/cassandra/distributed/test/accord/AccordCQLTestBase.java
index bdee5765ac..f3c7fceede 100644
---
a/test/distributed/org/apache/cassandra/distributed/test/accord/AccordCQLTestBase.java
+++
b/test/distributed/org/apache/cassandra/distributed/test/accord/AccordCQLTestBase.java
@@ -3416,4 +3416,20 @@ public abstract class AccordCQLTestBase extends
AccordTestBase
}
});
}
+
+ @Test
+ public void userSeesInvalidRejection() throws Exception
+ {
+ var expectedType =
AssertionUtils.isInstanceof(InvalidRequestException.class);
+ test("CREATE TABLE " + qualifiedAccordTableName + "(k int PRIMARY KEY,
l list<int>) WITH " + transactionalMode.asCqlParam(), cluster -> {
+ String cql = "UPDATE " + qualifiedAccordTableName + " SET l[0] =
42 WHERE k=42";
+ Assertions.assertThatThrownBy(() ->
cluster.coordinator(1).execute(cql, QUORUM))
+ .is(expectedType)
+ .hasMessage("Attempted to set an element on a list which
is null");
+
+ Assertions.assertThatThrownBy(() ->
cluster.coordinator(1).execute(wrapInTxn(cql), QUORUM))
+ .is(expectedType)
+ .hasMessage("Attempted to set an element on a list which
is null");
+ });
+ }
}
diff --git
a/test/harry/main/org/apache/cassandra/harry/model/BytesPartitionState.java
b/test/harry/main/org/apache/cassandra/harry/model/BytesPartitionState.java
index 383109573c..1c9c361c4b 100644
--- a/test/harry/main/org/apache/cassandra/harry/model/BytesPartitionState.java
+++ b/test/harry/main/org/apache/cassandra/harry/model/BytesPartitionState.java
@@ -348,7 +348,7 @@ public class BytesPartitionState
sb.append(partition);
sb.append(", clustering=");
if (clustering == null) sb.append("null");
- else appendValues(sb,
factory.clusteringColumns, clustering);
+ else appendValues(sb, clustering ==
Clustering.STATIC_CLUSTERING ? List.of() : factory.clusteringColumns,
clustering);
sb.append(')');
return sb.toString();
}
diff --git
a/test/harry/main/org/apache/cassandra/harry/model/PartitionState.java
b/test/harry/main/org/apache/cassandra/harry/model/PartitionState.java
index 57ec2ad4af..6c426e8fcc 100644
--- a/test/harry/main/org/apache/cassandra/harry/model/PartitionState.java
+++ b/test/harry/main/org/apache/cassandra/harry/model/PartitionState.java
@@ -410,6 +410,12 @@ public class PartitionState implements
Iterable<PartitionState.RowState>
", lts(" + StringUtils.toString(lts) + ")";
}
}
+
+ @Override
+ public String toString()
+ {
+ return toString(partitionState.valueGenerators);
+ }
}
public static long[] arr(int length, long fill)
diff --git a/test/unit/org/apache/cassandra/cql3/ast/CollectionAccess.java
b/test/unit/org/apache/cassandra/cql3/ast/CollectionAccess.java
index d43d7bf34f..fca80da516 100644
--- a/test/unit/org/apache/cassandra/cql3/ast/CollectionAccess.java
+++ b/test/unit/org/apache/cassandra/cql3/ast/CollectionAccess.java
@@ -35,6 +35,18 @@ public class CollectionAccess implements ReferenceExpression
this.type = type;
}
+ @Override
+ public ReferenceExpression visit(Visitor v)
+ {
+ var u = v.visit(this);
+ if (u != this) return u;
+ var col = column.visit(v);
+ var val = element.visit(v);
+ if (col != column || val != element)
+ return new CollectionAccess(col, val, type);
+ return this;
+ }
+
@Override
public void toCQL(StringBuilder sb, CQLFormatter formatter)
{
diff --git
a/test/unit/org/apache/cassandra/service/accord/txn/TxnReferenceOperationTest.java
b/test/unit/org/apache/cassandra/service/accord/txn/TxnReferenceOperationTest.java
new file mode 100644
index 0000000000..5515221815
--- /dev/null
+++
b/test/unit/org/apache/cassandra/service/accord/txn/TxnReferenceOperationTest.java
@@ -0,0 +1,289 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.cassandra.service.accord.txn;
+
+import java.nio.ByteBuffer;
+import java.util.List;
+import java.util.Set;
+
+import javax.annotation.Nullable;
+
+import com.google.common.collect.ImmutableSet;
+import com.google.common.collect.Sets;
+import org.junit.Test;
+
+import accord.utils.Gen;
+import org.apache.cassandra.cql3.ColumnIdentifier;
+import org.apache.cassandra.cql3.FieldIdentifier;
+import org.apache.cassandra.cql3.Operation;
+import org.apache.cassandra.cql3.terms.Constants;
+import org.apache.cassandra.cql3.terms.UserTypes;
+import org.apache.cassandra.db.marshal.AbstractType;
+import org.apache.cassandra.db.marshal.CollectionType;
+import org.apache.cassandra.db.marshal.Int32Type;
+import org.apache.cassandra.db.marshal.ListType;
+import org.apache.cassandra.db.marshal.MapType;
+import org.apache.cassandra.db.marshal.SetType;
+import org.apache.cassandra.db.marshal.UTF8Type;
+import org.apache.cassandra.db.marshal.UserType;
+import org.apache.cassandra.dht.Murmur3Partitioner;
+import org.apache.cassandra.io.Serializers;
+import org.apache.cassandra.io.util.DataOutputBuffer;
+import org.apache.cassandra.schema.ColumnMetadata;
+import org.apache.cassandra.schema.TableMetadata;
+import org.apache.cassandra.service.accord.serializers.TableMetadatas;
+import org.apache.cassandra.utils.AbstractTypeGenerators;
+import org.apache.cassandra.utils.ByteBufferUtil;
+import org.apache.cassandra.utils.Generators;
+import org.reflections.Reflections;
+import org.reflections.scanners.Scanners;
+import org.reflections.util.ConfigurationBuilder;
+
+import static accord.utils.Property.qt;
+
+public class TxnReferenceOperationTest
+{
+ private static final String KS = "ks";
+ private static final TxnData EMPTY = new TxnData();
+
+ @Test
+ public void coverage()
+ {
+ Reflections reflections = new Reflections(new ConfigurationBuilder()
+
.forPackage("org.apache.cassandra")
+
.setScanners(Scanners.SubTypes)
+ .setExpandSuperTypes(true)
+ .setParallel(true));
+ var subTypes = reflections.getSubTypesOf(Operation.class);
+ var knownTypes = TxnReferenceOperation.initOperationKindMap().keySet();
+ // these types do not have a way to define a reference, as they are
column level:
+ Set<Class<? extends Operation>> safeToExclude =
ImmutableSet.of(Constants.Deleter.class, // DELETE foo
+
UserTypes.DeleterByField.class // DELETE foo.bar
+ );
+
+ StringBuilder sb = null;
+ for (var klass : Sets.difference(subTypes, Sets.union(knownTypes,
safeToExclude)))
+ {
+ if (sb == null)
+ sb = new StringBuilder();
+ sb.append(klass.getCanonicalName()).append('\n');
+ }
+ if (sb != null)
+ throw new AssertionError(sb.toString());
+ }
+
+ @Test
+ public void serde()
+ {
+ @SuppressWarnings({ "resource", "IOResourceOpenedButNotSafelyClosed"
}) DataOutputBuffer output = new DataOutputBuffer();
+ qt().withExamples(10_000).forAll(gen()).check(txnOp -> {
+ Serializers.testSerde(output, TxnReferenceOperation.serializer,
txnOp, TableMetadatas.of(txnOp.table));
+
+ txnOp.toOperation(EMPTY);
+ });
+ }
+
+ private enum Group
+ {
+ Setter, SetterByIndex, SetterByKey, SetterByField,
+ Adder, Subtracter,
+ Appender, Putter, Discarder, Prepender,
+ }
+
+ private static Gen<TxnReferenceValue> valueGen(AbstractType<?> type)
+ {
+ return Generators.toGen(AbstractTypeGenerators.getTypeSupport(type)
+ .bytesGen())
+ .map(TxnReferenceValue.Constant::new);
+ }
+
+ private static Gen<TxnReferenceOperation> gen()
+ {
+ return rs -> {
+ TxnReferenceOperation.Kind kind;
+ ColumnMetadata receiver;
+ TableMetadata table;
+ @Nullable ByteBuffer keyOrIndex = null;
+ @Nullable ByteBuffer field = null;
+ TxnReferenceValue value;
+ Group group = rs.pick(Group.values());
+ switch (group)
+ {
+ case Prepender:
+ {
+ kind = TxnReferenceOperation.Kind.ListPrepender;
+ ListType<?> type =
ListType.getInstance(Int32Type.instance, true);
+ value = valueGen(type).next(rs);
+
+ table = table(type);
+ receiver =
table.getColumn(ColumnIdentifier.getInterned("col", true));
+ }
+ break;
+ case Discarder:
+ {
+ CollectionType<?> type = (CollectionType<?>)
Generators.toGen(AbstractTypeGenerators.builder()
+
.withMaxDepth(1)
+
.withTypeKinds(AbstractTypeGenerators.TypeKind.LIST,
+
AbstractTypeGenerators.TypeKind.SET,
+
AbstractTypeGenerators.TypeKind.MAP)
+
.build())
+
.next(rs);
+ kind = type instanceof ListType
+ ? TxnReferenceOperation.Kind.ListDiscarder
+ : TxnReferenceOperation.Kind.SetDiscarder;
+ table = table(type);
+ receiver =
table.getColumn(ColumnIdentifier.getInterned("col", true));
+ if (kind == TxnReferenceOperation.Kind.ListDiscarder &&
rs.nextBoolean())
+ {
+ kind = TxnReferenceOperation.Kind.ListDiscarderByIndex;
+ value = new
TxnReferenceValue.Constant(Int32Type.instance.decompose(42));
+ }
+ else if (type instanceof MapType && rs.nextBoolean())
+ {
+ kind = TxnReferenceOperation.Kind.MapDiscarderByKey;
+ var keyType = ((MapType<?, ?>) type).getKeysType();
+ value = valueGen(keyType).next(rs);
+ }
+ else if (type instanceof SetType && rs.nextBoolean())
+ {
+ kind = TxnReferenceOperation.Kind.SetElementDiscarder;
+ var elementType = ((SetType<?>)
type).getElementsType();
+ value = valueGen(elementType).next(rs);
+ }
+ else
+ {
+ CollectionType<?> discardType = type instanceof MapType
+ ?
SetType.getInstance(((MapType<?, ?>) type).getKeysType(), true)
+ : type;
+ value = valueGen(discardType).next(rs);
+ }
+ }
+ break;
+ case Adder:
+ case Subtracter:
+ {
+ if (group == Group.Adder && rs.nextBoolean())
+ {
+ // this is similar to ListAppend
+ var type = SetType.getInstance(Int32Type.instance,
true);
+ table = table(type);
+ receiver =
table.getColumn(ColumnIdentifier.getInterned("col", true));
+ value = valueGen(type).next(rs);
+ kind = TxnReferenceOperation.Kind.SetAdder;
+ }
+ else
+ {
+ var type = Int32Type.instance;
+ table = table(type);
+ receiver =
table.getColumn(ColumnIdentifier.getInterned("col", true));
+ value = valueGen(type).next(rs);
+ kind = group == Group.Adder ?
TxnReferenceOperation.Kind.ConstantAdder :
TxnReferenceOperation.Kind.ConstantSubtracter;
+ }
+ }
+ break;
+ case Setter:
+ {
+ var type =
Generators.toGen(AbstractTypeGenerators.builder()
+
.withoutUnsafeEquality()
+
.withMaxDepth(1)
+ .build())
+ .next(rs);
+ table = table(type);
+ receiver =
table.getColumn(ColumnIdentifier.getInterned("col", true));
+ value = valueGen(type).next(rs);
+ if (type instanceof ListType)
+ kind = TxnReferenceOperation.Kind.ListSetter;
+ else if (type instanceof SetType)
+ kind = TxnReferenceOperation.Kind.SetSetter;
+ else if (type instanceof MapType)
+ kind = TxnReferenceOperation.Kind.MapSetter;
+ else if (type instanceof UserType)
+ kind = TxnReferenceOperation.Kind.UserTypeSetter;
+ else
+ kind = TxnReferenceOperation.Kind.ConstantSetter;
+ }
+ break;
+ case SetterByIndex:
+ {
+ ListType<String> type =
ListType.getInstance(UTF8Type.instance, true);
+ table = table(type);
+ receiver =
table.getColumn(ColumnIdentifier.getInterned("col", true));
+ value = valueGen(type.getElementsType()).next(rs);
+ kind = TxnReferenceOperation.Kind.ListSetterByIndex;//
x[?] = ?
+ keyOrIndex = Int32Type.instance.decompose(42);
+ }
+ break;
+ case Appender:
+ {
+ ListType<String> type =
ListType.getInstance(UTF8Type.instance, true);
+ table = table(type);
+ receiver =
table.getColumn(ColumnIdentifier.getInterned("col", true));
+ value = valueGen(type).next(rs);
+ kind = TxnReferenceOperation.Kind.ListAppender;
+ }
+ break;
+ case SetterByKey:
+ {
+ MapType<Integer, String> type =
MapType.getInstance(Int32Type.instance, UTF8Type.instance, true);
+ table = table(type);
+ receiver =
table.getColumn(ColumnIdentifier.getInterned("col", true));
+ value = valueGen(type.getValuesType()).next(rs);
+ kind = TxnReferenceOperation.Kind.MapSetterByKey;
+ keyOrIndex = Int32Type.instance.decompose(42);
+ }
+ break;
+ case Putter: // x += {foo: bar, baz: biz} -- basically
Appender but for Map!
+ {
+ MapType<Integer, String> type =
MapType.getInstance(Int32Type.instance, UTF8Type.instance, true);
+ table = table(type);
+ receiver =
table.getColumn(ColumnIdentifier.getInterned("col", true));
+ value = valueGen(type).next(rs);
+ kind = TxnReferenceOperation.Kind.MapPutter;
+ }
+ break;
+ case SetterByField:
+ {
+ UserType type = new UserType(KS,
ByteBufferUtil.bytes("udt"),
+
List.of(FieldIdentifier.forUnquoted("f1")),
+ List.of(UTF8Type.instance),
+ true);
+ kind = TxnReferenceOperation.Kind.UserTypeSetterByField;
+ table = table(type);
+ receiver =
table.getColumn(ColumnIdentifier.getInterned("col", true));
+ value = valueGen(UTF8Type.instance).next(rs);
+ field = FieldIdentifier.forUnquoted("f1").bytes;
+ }
+ break;
+
+ default:
+ throw new UnsupportedOperationException();
+ }
+ return new TxnReferenceOperation(kind, receiver, table,
keyOrIndex, field, value);
+ };
+ }
+
+ private static TableMetadata table(AbstractType<?> type)
+ {
+ return TableMetadata.builder(KS, "tbl")
+ .partitioner(Murmur3Partitioner.instance)
+ .addPartitionKeyColumn("pk", Int32Type.instance)
+ .addRegularColumn("col", type)
+ .build();
+ }
+}
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]