This is an automated email from the ASF dual-hosted git repository.

dcapwell pushed a commit to branch trunk
in repository https://gitbox.apache.org/repos/asf/cassandra.git


The following commit(s) were added to refs/heads/trunk by this push:
     new 9c8ab8809d Accord: write rejections would be returned to users as 
server errors rather than INVALID and TxnReferenceOperation didn't handle all 
collections prperly
9c8ab8809d is described below

commit 9c8ab8809d618f6d34316d131accef9ecd4b29a9
Author: David Capwell <[email protected]>
AuthorDate: Mon Dec 15 12:59:52 2025 -0800

    Accord: write rejections would be returned to users as server errors rather 
than INVALID and TxnReferenceOperation didn't handle all collections prperly
    
    patch by David Capwell; reviewed by Benedict Elliott Smith, Caleb 
Rackliffe, Jyothsna Konisa for CASSANDRA-21061
---
 CHANGES.txt                                        |   1 +
 modules/accord                                     |   2 +-
 .../cassandra/cql3/statements/CQL3CasRequest.java  |   2 +
 .../cql3/statements/TransactionStatement.java      |   5 +-
 .../org/apache/cassandra/service/StorageProxy.java |  10 +-
 .../cassandra/service/accord/txn/TxnDataValue.java |  10 +-
 .../cassandra/service/accord/txn/TxnQuery.java     |   6 +
 .../service/accord/txn/TxnReferenceOperation.java  | 108 +++++---
 .../cassandra/service/accord/txn/TxnResult.java    |   3 +-
 .../cassandra/service/accord/txn/TxnUpdate.java    |  38 ++-
 ...{TxnResult.java => TxnValidationRejection.java} |  39 ++-
 .../distributed/test/accord/AccordCQLTestBase.java |  16 ++
 .../cassandra/harry/model/BytesPartitionState.java |   2 +-
 .../cassandra/harry/model/PartitionState.java      |   6 +
 .../cassandra/cql3/ast/CollectionAccess.java       |  12 +
 .../accord/txn/TxnReferenceOperationTest.java      | 289 +++++++++++++++++++++
 16 files changed, 490 insertions(+), 59 deletions(-)

diff --git a/CHANGES.txt b/CHANGES.txt
index 9cac450533..ac47a545f3 100644
--- a/CHANGES.txt
+++ b/CHANGES.txt
@@ -1,4 +1,5 @@
 5.1
+ * Accord: write rejections would be returned to users as server errors rather 
than INVALID and TxnReferenceOperation didn't handle all collections prperly 
(CASSANDRA-21061)
  * Use byte[] directly in QueryOptions instead of ByteBuffer and convert them 
to ArrayCell instead of BufferCell to reduce allocations (CASSANDRA-20166)
  * Log queries scanning too many SSTables per read (CASSANDRA-21048)
  * Extend nodetool verify to (optionally) validate SAI files (CASSANDRA-20949)
diff --git a/modules/accord b/modules/accord
index 6bae51f6a4..f6b0a6998f 160000
--- a/modules/accord
+++ b/modules/accord
@@ -1 +1 @@
-Subproject commit 6bae51f6a4bd560a82840fa0809cd1d630696cc0
+Subproject commit f6b0a6998faca767e6951976097dec704c306b0e
diff --git a/src/java/org/apache/cassandra/cql3/statements/CQL3CasRequest.java 
b/src/java/org/apache/cassandra/cql3/statements/CQL3CasRequest.java
index 080a705ffa..9cc2b94c66 100644
--- a/src/java/org/apache/cassandra/cql3/statements/CQL3CasRequest.java
+++ b/src/java/org/apache/cassandra/cql3/statements/CQL3CasRequest.java
@@ -74,6 +74,7 @@ import org.apache.cassandra.service.accord.txn.TxnRead;
 import org.apache.cassandra.service.accord.txn.TxnReference;
 import org.apache.cassandra.service.accord.txn.TxnResult;
 import org.apache.cassandra.service.accord.txn.TxnUpdate;
+import org.apache.cassandra.service.accord.txn.TxnValidationRejection;
 import org.apache.cassandra.service.accord.txn.TxnWrite;
 import org.apache.cassandra.service.paxos.Ballot;
 import org.apache.cassandra.tcm.ClusterMetadata;
@@ -574,6 +575,7 @@ public class CQL3CasRequest implements CASRequest
     {
         if (txnResult.kind() == retry_new_protocol)
             return RETRY_NEW_PROTOCOL;
+        TxnValidationRejection.maybeThrow(txnResult);
         TxnData txnData = (TxnData)txnResult;
         TxnDataKeyValue partition = 
(TxnDataKeyValue)txnData.get(txnDataName(CAS_READ));
         return casResult(partition != null ? partition.rowIterator(false) : 
null);
diff --git 
a/src/java/org/apache/cassandra/cql3/statements/TransactionStatement.java 
b/src/java/org/apache/cassandra/cql3/statements/TransactionStatement.java
index caff467cd7..97482d994d 100644
--- a/src/java/org/apache/cassandra/cql3/statements/TransactionStatement.java
+++ b/src/java/org/apache/cassandra/cql3/statements/TransactionStatement.java
@@ -85,6 +85,7 @@ import org.apache.cassandra.service.accord.txn.TxnRead;
 import org.apache.cassandra.service.accord.txn.TxnReference;
 import org.apache.cassandra.service.accord.txn.TxnResult;
 import org.apache.cassandra.service.accord.txn.TxnUpdate;
+import org.apache.cassandra.service.accord.txn.TxnValidationRejection;
 import org.apache.cassandra.service.accord.txn.TxnWrite;
 import org.apache.cassandra.service.consensus.TransactionalMode;
 import 
org.apache.cassandra.service.consensus.migration.TransactionalMigrationFromMode;
@@ -250,7 +251,7 @@ public class TransactionStatement implements 
CQLStatement.CompositeCQLStatement,
         SinglePartitionReadQuery.Group<SinglePartitionReadCommand> selectQuery 
= (SinglePartitionReadQuery.Group<SinglePartitionReadCommand>) 
select.getQuery(options, 0);
 
         if (selectQuery.queries.size() != 1)
-            throw new IllegalArgumentException("Within a transaction, SELECT 
statements must select a single partition; found " + selectQuery.queries.size() 
+ " partitions");
+            throw invalidRequest("Within a transaction, SELECT statements must 
select a single partition; found " + selectQuery.queries.size() + " 
partitions");
 
         SinglePartitionReadCommand command = 
Iterables.getOnlyElement(selectQuery.queries);
         return new TxnNamedRead(namedSelect.name, 
keyCollector.collect(command.metadata(), command.partitionKey()), command, 
keyCollector.tables);
@@ -560,6 +561,8 @@ public class TransactionStatement implements 
CQLStatement.CompositeCQLStatement,
         TxnResult txnResult = AccordService.instance().coordinate(minEpoch, 
txn, options.getConsistency(), requestTime);
         if (txnResult.kind() == retry_new_protocol)
             throw new InvalidRequestException(UNSUPPORTED_MIGRATION);
+        TxnValidationRejection.maybeThrow(txnResult);
+
         TxnData data = (TxnData)txnResult;
 
         if (returningSelect != null)
diff --git a/src/java/org/apache/cassandra/service/StorageProxy.java 
b/src/java/org/apache/cassandra/service/StorageProxy.java
index 40b3d1fe82..b1ac0bfffa 100644
--- a/src/java/org/apache/cassandra/service/StorageProxy.java
+++ b/src/java/org/apache/cassandra/service/StorageProxy.java
@@ -148,6 +148,7 @@ import org.apache.cassandra.service.accord.txn.TxnQuery;
 import org.apache.cassandra.service.accord.txn.TxnRangeReadResult;
 import org.apache.cassandra.service.accord.txn.TxnRead;
 import org.apache.cassandra.service.accord.txn.TxnResult;
+import org.apache.cassandra.service.accord.txn.TxnValidationRejection;
 import org.apache.cassandra.service.consensus.TransactionalMode;
 import 
org.apache.cassandra.service.consensus.UnsupportedTransactionConsistencyLevel;
 import 
org.apache.cassandra.service.consensus.migration.ConsensusMigrationMutationHelper.SplitConsumer;
@@ -1316,13 +1317,16 @@ public class StorageProxy implements StorageProxyMBean
                 {
                     if (accordResult != null)
                     {
-                        TxnResult.Kind kind = 
accordResult.awaitAndGet().kind();
+                        TxnResult result = accordResult.awaitAndGet();
+                        TxnResult.Kind kind = result.kind();
                         if (kind == retry_new_protocol && failure == null)
                         {
                             Tracing.trace("Accord returned retry new 
protocol");
                             logger.debug("Retrying mutations on different 
system because some mutations were misrouted according to Accord");
                             continue;
                         }
+                        TxnValidationRejection.maybeThrow(result);
+
                         Tracing.trace("Successfully wrote Accord mutations");
                     }
                 }
@@ -1549,9 +1553,11 @@ public class StorageProxy implements StorageProxyMBean
                     // the batch log.
                     if (accordResult != null)
                     {
-                        TxnResult.Kind kind = 
accordResult.awaitAndGet().kind();
+                        TxnResult result = accordResult.awaitAndGet();
+                        TxnResult.Kind kind = result.kind();
                         if (kind == retry_new_protocol && failure == null)
                             continue;
+                        TxnValidationRejection.maybeThrow(result);
                         Tracing.trace("Successfully wrote Accord mutations");
                         cleanup.ackMutation();
                     }
diff --git a/src/java/org/apache/cassandra/service/accord/txn/TxnDataValue.java 
b/src/java/org/apache/cassandra/service/accord/txn/TxnDataValue.java
index 3fc9c08198..cacafcd616 100644
--- a/src/java/org/apache/cassandra/service/accord/txn/TxnDataValue.java
+++ b/src/java/org/apache/cassandra/service/accord/txn/TxnDataValue.java
@@ -42,6 +42,7 @@ public interface TxnDataValue
     {
         key(0),
         range(1);
+
         int id;
 
         Kind(int id)
@@ -49,14 +50,15 @@ public interface TxnDataValue
             this.id = id;
         }
 
-        public TxnDataValueSerializer serializer()
+        @SuppressWarnings("unchecked")
+        public <T extends TxnDataValue> IVersionedSerializer<T> serializer()
         {
             switch (this)
             {
                 case key:
-                    return TxnDataKeyValue.serializer;
+                    return (IVersionedSerializer<T>) 
TxnDataKeyValue.serializer;
                 case range:
-                    return TxnDataRangeValue.serializer;
+                    return (IVersionedSerializer<T>) 
TxnDataRangeValue.serializer;
                 default:
                     throw new IllegalStateException("Unrecognized kind " + 
this);
             }
@@ -71,7 +73,7 @@ public interface TxnDataValue
 
     long estimatedSizeOnHeap();
 
-    IVersionedSerializer<TxnDataValue> serializer = new 
IVersionedSerializer<TxnDataValue>()
+    IVersionedSerializer<TxnDataValue> serializer = new 
IVersionedSerializer<>()
     {
         @SuppressWarnings("unchecked")
         @Override
diff --git a/src/java/org/apache/cassandra/service/accord/txn/TxnQuery.java 
b/src/java/org/apache/cassandra/service/accord/txn/TxnQuery.java
index e380b87fe6..191518e34a 100644
--- a/src/java/org/apache/cassandra/service/accord/txn/TxnQuery.java
+++ b/src/java/org/apache/cassandra/service/accord/txn/TxnQuery.java
@@ -154,6 +154,9 @@ public abstract class TxnQuery implements Query
         @Override
         public Result compute(TxnId txnId, Timestamp executeAt, Seekables<?, 
?> keys, @Nullable Data data, @Nullable Read read, @Nullable Update update)
         {
+            TxnValidationRejection rejection = 
TxnUpdate.validationRejection(update);
+            if (rejection != null) return rejection;
+
             // Skip the migration checks in the base class for empty 
transactions, we don't
             // want/need the RetryWithNewProtocolResult
             return new TxnData();
@@ -207,6 +210,9 @@ public abstract class TxnQuery implements Query
     @Override
     public Result compute(TxnId txnId, Timestamp executeAt, Seekables<?, ?> 
keys, @Nullable Data data, @Nullable Read read, @Nullable Update update)
     {
+        TxnValidationRejection rejection = 
TxnUpdate.validationRejection(update);
+        if (rejection != null) return rejection;
+
         // TODO (required): This is not the cluster metadata of the current 
transaction
         ClusterMetadata clusterMetadata = ClusterMetadata.current();
         checkState(clusterMetadata.epoch.getEpoch() >= executeAt.epoch(), "TCM 
epoch %d is < executeAt epoch %d", clusterMetadata.epoch.getEpoch(), 
executeAt.epoch());
diff --git 
a/src/java/org/apache/cassandra/service/accord/txn/TxnReferenceOperation.java 
b/src/java/org/apache/cassandra/service/accord/txn/TxnReferenceOperation.java
index c453f612b2..f997008210 100644
--- 
a/src/java/org/apache/cassandra/service/accord/txn/TxnReferenceOperation.java
+++ 
b/src/java/org/apache/cassandra/service/accord/txn/TxnReferenceOperation.java
@@ -24,6 +24,10 @@ import java.util.HashMap;
 import java.util.Map;
 import java.util.Objects;
 
+import javax.annotation.Nullable;
+
+import com.google.common.annotations.VisibleForTesting;
+
 import org.apache.cassandra.cql3.FieldIdentifier;
 import org.apache.cassandra.cql3.Operation;
 import org.apache.cassandra.cql3.UpdateParameters;
@@ -35,8 +39,10 @@ import org.apache.cassandra.cql3.terms.Sets;
 import org.apache.cassandra.cql3.terms.Term;
 import org.apache.cassandra.cql3.terms.UserTypes;
 import org.apache.cassandra.db.DecoratedKey;
+import org.apache.cassandra.db.TypeSizes;
 import org.apache.cassandra.db.marshal.AbstractType;
 import org.apache.cassandra.db.marshal.CollectionType;
+import org.apache.cassandra.db.marshal.Int32Type;
 import org.apache.cassandra.db.marshal.ListType;
 import org.apache.cassandra.db.marshal.MapType;
 import org.apache.cassandra.db.marshal.SetType;
@@ -58,25 +64,29 @@ import static 
org.apache.cassandra.service.accord.AccordSerializers.columnMetada
 public class TxnReferenceOperation
 {
     private static final Map<Class<? extends Operation>, Kind> 
operationKindMap = initOperationKindMap();
-    
-    private static Map<Class<? extends Operation>, Kind> initOperationKindMap()
+
+    @VisibleForTesting
+    static Map<Class<? extends Operation>, Kind> initOperationKindMap()
     {
         Map<Class<? extends Operation>, Kind> temp = new HashMap<>();
-        temp.put(Sets.Adder.class, Kind.SetAdder);
         temp.put(Constants.Adder.class, Kind.ConstantAdder);
+        temp.put(Constants.Setter.class, Kind.ConstantSetter);
+        temp.put(Constants.Substracter.class, Kind.ConstantSubtracter);
         temp.put(Lists.Appender.class, Kind.ListAppender);
-        temp.put(Sets.Discarder.class, Kind.SetDiscarder);
         temp.put(Lists.Discarder.class, Kind.ListDiscarder);
+        temp.put(Lists.DiscarderByIndex.class, Kind.ListDiscarderByIndex);
         temp.put(Lists.Prepender.class, Kind.ListPrepender);
-        temp.put(Maps.Putter.class, Kind.MapPutter);
         temp.put(Lists.Setter.class, Kind.ListSetter);
-        temp.put(Sets.Setter.class, Kind.SetSetter);
+        temp.put(Lists.SetterByIndex.class, Kind.ListSetterByIndex);
+        temp.put(Maps.DiscarderByKey.class, Kind.MapDiscarderByKey);
+        temp.put(Maps.Putter.class, Kind.MapPutter);
         temp.put(Maps.Setter.class, Kind.MapSetter);
-        temp.put(UserTypes.Setter.class, Kind.UserTypeSetter);
-        temp.put(Constants.Setter.class, Kind.ConstantSetter);
-        temp.put(Constants.Substracter.class, Kind.ConstantSubtracter);
         temp.put(Maps.SetterByKey.class, Kind.MapSetterByKey);
-        temp.put(Lists.SetterByIndex.class, Kind.ListSetterByIndex);
+        temp.put(Sets.Adder.class, Kind.SetAdder);
+        temp.put(Sets.Discarder.class, Kind.SetDiscarder);
+        temp.put(Sets.ElementDiscarder.class, Kind.SetElementDiscarder);
+        temp.put(Sets.Setter.class, Kind.SetSetter);
+        temp.put(UserTypes.Setter.class, Kind.UserTypeSetter);
         temp.put(UserTypes.SetterByField.class, Kind.UserTypeSetterByField);
         return temp;
     }
@@ -103,7 +113,10 @@ public class TxnReferenceOperation
         ConstantSubtracter((byte) 13, (column, keyOrIndex, field, value) -> 
new Constants.Substracter(column, value)),
         MapSetterByKey((byte) 14, (column, keyOrIndex, field, value) -> new 
Maps.SetterByKey(column, keyOrIndex, value)),
         ListSetterByIndex((byte) 15, (column, keyOrIndex, field, value) -> new 
Lists.SetterByIndex(column, keyOrIndex, value)),
-        UserTypeSetterByField((byte) 16, (column, keyOrIndex, field, value) -> 
new UserTypes.SetterByField(column, field, value));
+        UserTypeSetterByField((byte) 16, (column, keyOrIndex, field, value) -> 
new UserTypes.SetterByField(column, field, value)),
+        ListDiscarderByIndex((byte) 17, (column, keyOrIndex, field, value) -> 
new Lists.DiscarderByIndex(column, value)),
+        MapDiscarderByKey((byte) 18, (column, keyOrIndex, field, value) -> new 
Maps.DiscarderByKey(column, value)),
+        SetElementDiscarder((byte) 19, (column, keyOrIndex, field, value) -> 
new Sets.ElementDiscarder(column, value));
 
         private final byte id;
         private final ToOperation toOperation;
@@ -154,18 +167,20 @@ public class TxnReferenceOperation
 
     private final Kind kind;
     private final ColumnMetadata receiver;
-    private final TableMetadata table;
-    private final ByteBuffer key;
-    private final ByteBuffer field;
+    public final TableMetadata table;
+    private final @Nullable ByteBuffer keyOrIndex;
+    private final @Nullable ByteBuffer field;
     private final TxnReferenceValue value;
+    private final @Nullable AbstractType<?> keyOrIndexType;
     private final AbstractType<?> valueType;
 
-    public TxnReferenceOperation(Kind kind, ColumnMetadata receiver, 
TableMetadata table, ByteBuffer key, ByteBuffer field, TxnReferenceValue value)
+    public TxnReferenceOperation(Kind kind, ColumnMetadata receiver, 
TableMetadata table,
+                                 @Nullable ByteBuffer keyOrIndex, @Nullable 
ByteBuffer field, TxnReferenceValue value)
     {
         this.kind = kind;
         this.receiver = receiver;
         this.table = table;
-        this.key = key;
+        this.keyOrIndex = keyOrIndex;
         this.field = field;
 
         // We don't expect operators on clustering keys, but unwrap just in 
case.
@@ -175,20 +190,36 @@ public class TxnReferenceOperation
         {
             // The value for a map subtraction is actually a set (see 
Operation.Substraction)
             this.valueType = SetType.getInstance(((MapType<?, ?>) 
receiverType).getKeysType(), true);
+            this.keyOrIndexType = null;
+        }
+        else if (kind == Kind.MapDiscarderByKey || kind == 
Kind.SetElementDiscarder)
+        {
+            CollectionType<?> ct = (CollectionType<?>) receiverType;
+            this.keyOrIndexType = null;
+            this.valueType = ct.nameComparator();
         }
         else if (kind == Kind.MapSetterByKey || kind == Kind.ListSetterByIndex)
         {
-            this.valueType = ((CollectionType<?>) 
receiverType).valueComparator();
+            CollectionType<?> ct = (CollectionType<?>) receiverType;
+            this.keyOrIndexType = ct.nameComparator();
+            this.valueType = ct.valueComparator();
+        }
+        else if (kind == Kind.ListDiscarderByIndex)
+        {
+            this.valueType = Int32Type.instance;
+            this.keyOrIndexType = null;
         }
         else if (kind == Kind.UserTypeSetterByField)
         {
             UserType userType = (UserType) receiverType;
             CellPath fieldPath = userType.cellPathForField(new 
FieldIdentifier(field));
             this.valueType = userType.fieldType(fieldPath);
+            this.keyOrIndexType = null;
         }
         else
         {
             this.valueType = receiverType;
+            this.keyOrIndexType = null;
         }
 
         this.value = value;
@@ -202,27 +233,35 @@ public class TxnReferenceOperation
         TxnReferenceOperation that = (TxnReferenceOperation) o;
         return Objects.equals(receiver, that.receiver) 
                && kind == that.kind
-               && Objects.equals(key, that.key)
+               && Objects.equals(keyOrIndex, that.keyOrIndex)
                && Objects.equals(field, that.field)
                && Objects.equals(value, that.value);
     }
 
-    public void collect(TableMetadatas.Collector collector)
-    {
-        collector.add(table);
-        value.collect(collector);
-    }
-
     @Override
     public int hashCode()
     {
-        return Objects.hash(receiver, kind, key, field, value);
+        return Objects.hash(receiver, kind, keyOrIndex, field, value);
     }
 
+
     @Override
     public String toString()
     {
-        return receiver + " = " + value;
+        return "TxnReferenceOperation{" +
+               "kind=" + kind +
+               ", receiver=" + receiver +
+               ", table=" + table +
+               ", key=" + keyOrIndex +
+               ", field=" + field +
+               ", value=" + value +
+               '}';
+    }
+
+    public void collect(TableMetadatas.Collector collector)
+    {
+        collector.add(table);
+        value.collect(collector);
     }
 
     public ColumnMetadata receiver()
@@ -236,11 +275,12 @@ public class TxnReferenceOperation
         operation.execute(key, up);
     }
 
-    private Operation toOperation(TxnData data)
+    @VisibleForTesting
+    Operation toOperation(TxnData data)
     {
         FieldIdentifier fieldIdentifier = field == null ? null : new 
FieldIdentifier(field);
         Term valueTerm = toTerm(data, valueType);
-        Term keyorIndexTerm = key == null ? null : toTerm(key, valueType);
+        Term keyorIndexTerm = keyOrIndex == null ? null : toTerm(keyOrIndex, 
keyOrIndexType);
         return kind.toOperation(receiver, keyorIndexTerm, fieldIdentifier, 
valueTerm);
     }
 
@@ -274,9 +314,9 @@ public class TxnReferenceOperation
             columnMetadataSerializer.serialize(operation.receiver, 
operation.table, out);
             TxnReferenceValue.serializer.serialize(operation.value, tables, 
out);
 
-            out.writeBoolean(operation.key != null);
-            if (operation.key != null)
-                ByteBufferUtil.writeWithVIntLength(operation.key, out);
+            out.writeBoolean(operation.keyOrIndex != null);
+            if (operation.keyOrIndex != null)
+                ByteBufferUtil.writeWithVIntLength(operation.keyOrIndex, out);
 
             out.writeBoolean(operation.field != null);
             if (operation.field != null)
@@ -303,9 +343,11 @@ public class TxnReferenceOperation
             size += 
columnMetadataSerializer.serializedSize(operation.receiver, operation.table);
             size += 
TxnReferenceValue.serializer.serializedSize(operation.value, tables);
 
-            if (operation.key != null)
-                size += 
ByteBufferUtil.serializedSizeWithVIntLength(operation.key);
+            size += TypeSizes.sizeof(operation.keyOrIndex != null);
+            if (operation.keyOrIndex != null)
+                size += 
ByteBufferUtil.serializedSizeWithVIntLength(operation.keyOrIndex);
 
+            size += TypeSizes.sizeof(operation.field != null);
             if (operation.field != null)
                 size += 
ByteBufferUtil.serializedSizeWithVIntLength(operation.field);
 
diff --git a/src/java/org/apache/cassandra/service/accord/txn/TxnResult.java 
b/src/java/org/apache/cassandra/service/accord/txn/TxnResult.java
index 2b5af7c08b..b3bf2b3fc8 100644
--- a/src/java/org/apache/cassandra/service/accord/txn/TxnResult.java
+++ b/src/java/org/apache/cassandra/service/accord/txn/TxnResult.java
@@ -26,7 +26,8 @@ public interface TxnResult extends Result
     {
         txn_data(0),
         retry_new_protocol(1),
-        range_read(2);
+        range_read(2),
+        validation_rejection(3);
 
         int id;
 
diff --git a/src/java/org/apache/cassandra/service/accord/txn/TxnUpdate.java 
b/src/java/org/apache/cassandra/service/accord/txn/TxnUpdate.java
index d4b549a61d..0d3def4fd0 100644
--- a/src/java/org/apache/cassandra/service/accord/txn/TxnUpdate.java
+++ b/src/java/org/apache/cassandra/service/accord/txn/TxnUpdate.java
@@ -45,6 +45,7 @@ import accord.utils.SortedArrays;
 import org.apache.cassandra.cql3.QueryOptions;
 import org.apache.cassandra.db.ConsistencyLevel;
 import org.apache.cassandra.db.TypeSizes;
+import org.apache.cassandra.exceptions.RequestValidationException;
 import org.apache.cassandra.io.ParameterisedUnversionedSerializer;
 import org.apache.cassandra.io.UnversionedSerializer;
 import org.apache.cassandra.io.util.DataInputBuffer;
@@ -85,7 +86,7 @@ import static 
org.apache.cassandra.utils.NullableSerializer.deserializeNullable;
 import static org.apache.cassandra.utils.NullableSerializer.serializeNullable;
 import static 
org.apache.cassandra.utils.NullableSerializer.serializedNullableSize;
 
-public class TxnUpdate extends AccordUpdate
+public final class TxnUpdate extends AccordUpdate
 {
     static class ConditionalBlock
     {
@@ -547,6 +548,9 @@ public class TxnUpdate extends AccordUpdate
     // Memoize computation of condition
     private Boolean anyConditionResult;
 
+    @Nullable
+    private transient volatile Object validationException = this;
+
     public TxnUpdate(TableMetadatas tables, List<Fragment> fragments, 
TxnCondition condition, @Nullable ConsistencyLevel cassandraCommitCL, 
PreserveTimestamp preserveTimestamps)
     {
         requireArgument(cassandraCommitCL == null || 
IAccordService.SUPPORTED_COMMIT_CONSISTENCY_LEVELS.contains(cassandraCommitCL));
@@ -584,6 +588,25 @@ public class TxnUpdate extends AccordUpdate
         return new TxnUpdate(TableMetadatas.none(), Keys.EMPTY, 
Collections.emptyList(), null, PreserveTimestamp.no);
     }
 
+    public static TxnValidationRejection validationRejection(@Nullable Update 
update)
+    {
+        if (update != null && update.getClass() == TxnUpdate.class)
+        {
+            RequestValidationException e = ((TxnUpdate) 
update).validationRejection();
+            if (e != null) return new TxnValidationRejection(e);
+        }
+        return null;
+    }
+
+    @Nullable
+    private RequestValidationException validationRejection()
+    {
+        Object snapshot = validationException;
+        if (snapshot == this)
+            throw Invariants.illegalState("Attempted to check for validation 
exception before .apply was called");
+        return snapshot == null ? null : (RequestValidationException) snapshot;
+    }
+
     @Override
     public long estimatedSizeOnHeap()
     {
@@ -677,7 +700,18 @@ public class TxnUpdate extends AccordUpdate
         ClusterMetadata cm = ClusterMetadata.current();
         checkState(cm.epoch.getEpoch() >= executeAt.epoch(), "TCM epoch %d is 
< executeAt epoch %d", cm.epoch.getEpoch(), executeAt.epoch());
 
-        Pair<List<TxnWrite.Update>, SimpleBitSet> pair = 
processCondition(executeAt, data);
+        Pair<List<TxnWrite.Update>, SimpleBitSet> pair = null;
+        try
+        {
+            validationException = null;
+            pair = processCondition(executeAt, data);
+        }
+        catch (RequestValidationException e)
+        {
+            // the update isn't allowed
+            validationException = e;
+        }
+
         if (pair == null)
             return new TxnWrite(TableMetadatas.none(), 
Collections.emptyList(), SimpleBitSets.allUnset(numConditionalBlocks()));
 
diff --git a/src/java/org/apache/cassandra/service/accord/txn/TxnResult.java 
b/src/java/org/apache/cassandra/service/accord/txn/TxnValidationRejection.java
similarity index 51%
copy from src/java/org/apache/cassandra/service/accord/txn/TxnResult.java
copy to 
src/java/org/apache/cassandra/service/accord/txn/TxnValidationRejection.java
index 2b5af7c08b..aca6fbfda3 100644
--- a/src/java/org/apache/cassandra/service/accord/txn/TxnResult.java
+++ 
b/src/java/org/apache/cassandra/service/accord/txn/TxnValidationRejection.java
@@ -18,25 +18,36 @@
 
 package org.apache.cassandra.service.accord.txn;
 
-import accord.api.Result;
+import java.util.Objects;
 
-public interface TxnResult extends Result
+import org.apache.cassandra.exceptions.RequestValidationException;
+
+import static 
org.apache.cassandra.service.accord.txn.TxnResult.Kind.validation_rejection;
+
+public class TxnValidationRejection implements TxnResult
 {
-    enum Kind
-    {
-        txn_data(0),
-        retry_new_protocol(1),
-        range_read(2);
+    private final RequestValidationException validationException;
 
-        int id;
+    public TxnValidationRejection(RequestValidationException 
validationException)
+    {
+        this.validationException = Objects.requireNonNull(validationException);
+    }
 
-        Kind(int id)
-        {
-            this.id = id;
-        }
+    public static void maybeThrow(TxnResult txnResult)
+    {
+        if (txnResult.kind() == validation_rejection)
+            throw ((TxnValidationRejection) txnResult).validationException;
     }
 
-    Kind kind();
+    @Override
+    public Kind kind()
+    {
+        return Kind.validation_rejection;
+    }
 
-    long estimatedSizeOnHeap();
+    @Override
+    public long estimatedSizeOnHeap()
+    {
+        return 0;
+    }
 }
diff --git 
a/test/distributed/org/apache/cassandra/distributed/test/accord/AccordCQLTestBase.java
 
b/test/distributed/org/apache/cassandra/distributed/test/accord/AccordCQLTestBase.java
index bdee5765ac..f3c7fceede 100644
--- 
a/test/distributed/org/apache/cassandra/distributed/test/accord/AccordCQLTestBase.java
+++ 
b/test/distributed/org/apache/cassandra/distributed/test/accord/AccordCQLTestBase.java
@@ -3416,4 +3416,20 @@ public abstract class AccordCQLTestBase extends 
AccordTestBase
             }
         });
     }
+
+    @Test
+    public void userSeesInvalidRejection() throws Exception
+    {
+        var expectedType = 
AssertionUtils.isInstanceof(InvalidRequestException.class);
+        test("CREATE TABLE " + qualifiedAccordTableName + "(k int PRIMARY KEY, 
l list<int>) WITH " + transactionalMode.asCqlParam(), cluster -> {
+            String cql = "UPDATE " + qualifiedAccordTableName + " SET l[0] = 
42 WHERE k=42";
+            Assertions.assertThatThrownBy(() -> 
cluster.coordinator(1).execute(cql, QUORUM))
+                      .is(expectedType)
+                      .hasMessage("Attempted to set an element on a list which 
is null");
+
+            Assertions.assertThatThrownBy(() -> 
cluster.coordinator(1).execute(wrapInTxn(cql), QUORUM))
+                      .is(expectedType)
+                      .hasMessage("Attempted to set an element on a list which 
is null");
+        });
+    }
 }
diff --git 
a/test/harry/main/org/apache/cassandra/harry/model/BytesPartitionState.java 
b/test/harry/main/org/apache/cassandra/harry/model/BytesPartitionState.java
index 383109573c..1c9c361c4b 100644
--- a/test/harry/main/org/apache/cassandra/harry/model/BytesPartitionState.java
+++ b/test/harry/main/org/apache/cassandra/harry/model/BytesPartitionState.java
@@ -348,7 +348,7 @@ public class BytesPartitionState
             sb.append(partition);
             sb.append(", clustering=");
             if (clustering == null) sb.append("null");
-            else                    appendValues(sb, 
factory.clusteringColumns, clustering);
+            else                    appendValues(sb, clustering == 
Clustering.STATIC_CLUSTERING ? List.of() : factory.clusteringColumns, 
clustering);
             sb.append(')');
             return sb.toString();
         }
diff --git 
a/test/harry/main/org/apache/cassandra/harry/model/PartitionState.java 
b/test/harry/main/org/apache/cassandra/harry/model/PartitionState.java
index 57ec2ad4af..6c426e8fcc 100644
--- a/test/harry/main/org/apache/cassandra/harry/model/PartitionState.java
+++ b/test/harry/main/org/apache/cassandra/harry/model/PartitionState.java
@@ -410,6 +410,12 @@ public class PartitionState implements 
Iterable<PartitionState.RowState>
                        ", lts(" + StringUtils.toString(lts) + ")";
             }
         }
+
+        @Override
+        public String toString()
+        {
+            return toString(partitionState.valueGenerators);
+        }
     }
 
     public static long[] arr(int length, long fill)
diff --git a/test/unit/org/apache/cassandra/cql3/ast/CollectionAccess.java 
b/test/unit/org/apache/cassandra/cql3/ast/CollectionAccess.java
index d43d7bf34f..fca80da516 100644
--- a/test/unit/org/apache/cassandra/cql3/ast/CollectionAccess.java
+++ b/test/unit/org/apache/cassandra/cql3/ast/CollectionAccess.java
@@ -35,6 +35,18 @@ public class CollectionAccess implements ReferenceExpression
         this.type = type;
     }
 
+    @Override
+    public ReferenceExpression visit(Visitor v)
+    {
+        var u = v.visit(this);
+        if (u != this) return u;
+        var col = column.visit(v);
+        var val = element.visit(v);
+        if (col != column || val != element)
+            return new CollectionAccess(col, val, type);
+        return this;
+    }
+
     @Override
     public void toCQL(StringBuilder sb, CQLFormatter formatter)
     {
diff --git 
a/test/unit/org/apache/cassandra/service/accord/txn/TxnReferenceOperationTest.java
 
b/test/unit/org/apache/cassandra/service/accord/txn/TxnReferenceOperationTest.java
new file mode 100644
index 0000000000..5515221815
--- /dev/null
+++ 
b/test/unit/org/apache/cassandra/service/accord/txn/TxnReferenceOperationTest.java
@@ -0,0 +1,289 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.cassandra.service.accord.txn;
+
+import java.nio.ByteBuffer;
+import java.util.List;
+import java.util.Set;
+
+import javax.annotation.Nullable;
+
+import com.google.common.collect.ImmutableSet;
+import com.google.common.collect.Sets;
+import org.junit.Test;
+
+import accord.utils.Gen;
+import org.apache.cassandra.cql3.ColumnIdentifier;
+import org.apache.cassandra.cql3.FieldIdentifier;
+import org.apache.cassandra.cql3.Operation;
+import org.apache.cassandra.cql3.terms.Constants;
+import org.apache.cassandra.cql3.terms.UserTypes;
+import org.apache.cassandra.db.marshal.AbstractType;
+import org.apache.cassandra.db.marshal.CollectionType;
+import org.apache.cassandra.db.marshal.Int32Type;
+import org.apache.cassandra.db.marshal.ListType;
+import org.apache.cassandra.db.marshal.MapType;
+import org.apache.cassandra.db.marshal.SetType;
+import org.apache.cassandra.db.marshal.UTF8Type;
+import org.apache.cassandra.db.marshal.UserType;
+import org.apache.cassandra.dht.Murmur3Partitioner;
+import org.apache.cassandra.io.Serializers;
+import org.apache.cassandra.io.util.DataOutputBuffer;
+import org.apache.cassandra.schema.ColumnMetadata;
+import org.apache.cassandra.schema.TableMetadata;
+import org.apache.cassandra.service.accord.serializers.TableMetadatas;
+import org.apache.cassandra.utils.AbstractTypeGenerators;
+import org.apache.cassandra.utils.ByteBufferUtil;
+import org.apache.cassandra.utils.Generators;
+import org.reflections.Reflections;
+import org.reflections.scanners.Scanners;
+import org.reflections.util.ConfigurationBuilder;
+
+import static accord.utils.Property.qt;
+
+public class TxnReferenceOperationTest
+{
+    private static final String KS = "ks";
+    private static final TxnData EMPTY = new TxnData();
+
+    @Test
+    public void coverage()
+    {
+        Reflections reflections = new Reflections(new ConfigurationBuilder()
+                                                  
.forPackage("org.apache.cassandra")
+                                                  
.setScanners(Scanners.SubTypes)
+                                                  .setExpandSuperTypes(true)
+                                                  .setParallel(true));
+        var subTypes = reflections.getSubTypesOf(Operation.class);
+        var knownTypes = TxnReferenceOperation.initOperationKindMap().keySet();
+        // these types do not have a way to define a reference, as they are 
column level:
+        Set<Class<? extends Operation>> safeToExclude = 
ImmutableSet.of(Constants.Deleter.class,        // DELETE foo
+                                                                        
UserTypes.DeleterByField.class // DELETE foo.bar
+        );
+
+        StringBuilder sb = null;
+        for (var klass : Sets.difference(subTypes, Sets.union(knownTypes, 
safeToExclude)))
+        {
+            if (sb == null)
+                sb = new StringBuilder();
+            sb.append(klass.getCanonicalName()).append('\n');
+        }
+        if (sb != null)
+            throw new AssertionError(sb.toString());
+    }
+
+    @Test
+    public void serde()
+    {
+        @SuppressWarnings({ "resource", "IOResourceOpenedButNotSafelyClosed" 
}) DataOutputBuffer output = new DataOutputBuffer();
+        qt().withExamples(10_000).forAll(gen()).check(txnOp -> {
+            Serializers.testSerde(output, TxnReferenceOperation.serializer, 
txnOp, TableMetadatas.of(txnOp.table));
+
+            txnOp.toOperation(EMPTY);
+        });
+    }
+
+    private enum Group
+    {
+        Setter, SetterByIndex, SetterByKey, SetterByField,
+        Adder, Subtracter,
+        Appender, Putter, Discarder, Prepender,
+    }
+
+    private static Gen<TxnReferenceValue> valueGen(AbstractType<?> type)
+    {
+        return Generators.toGen(AbstractTypeGenerators.getTypeSupport(type)
+                                                      .bytesGen())
+                         .map(TxnReferenceValue.Constant::new);
+    }
+
+    private static Gen<TxnReferenceOperation> gen()
+    {
+        return rs -> {
+            TxnReferenceOperation.Kind kind;
+            ColumnMetadata receiver;
+            TableMetadata table;
+            @Nullable ByteBuffer keyOrIndex = null;
+            @Nullable ByteBuffer field = null;
+            TxnReferenceValue value;
+            Group group = rs.pick(Group.values());
+            switch (group)
+            {
+                case Prepender:
+                {
+                    kind = TxnReferenceOperation.Kind.ListPrepender;
+                    ListType<?> type = 
ListType.getInstance(Int32Type.instance, true);
+                    value = valueGen(type).next(rs);
+
+                    table = table(type);
+                    receiver = 
table.getColumn(ColumnIdentifier.getInterned("col", true));
+                }
+                break;
+                case Discarder:
+                {
+                    CollectionType<?> type = (CollectionType<?>) 
Generators.toGen(AbstractTypeGenerators.builder()
+                                                                               
                         .withMaxDepth(1)
+                                                                               
                         .withTypeKinds(AbstractTypeGenerators.TypeKind.LIST,
+                                                                               
                                        AbstractTypeGenerators.TypeKind.SET,
+                                                                               
                                        AbstractTypeGenerators.TypeKind.MAP)
+                                                                               
                         .build())
+                                                                           
.next(rs);
+                    kind = type instanceof ListType
+                           ? TxnReferenceOperation.Kind.ListDiscarder
+                           : TxnReferenceOperation.Kind.SetDiscarder;
+                    table = table(type);
+                    receiver = 
table.getColumn(ColumnIdentifier.getInterned("col", true));
+                    if (kind == TxnReferenceOperation.Kind.ListDiscarder && 
rs.nextBoolean())
+                    {
+                        kind = TxnReferenceOperation.Kind.ListDiscarderByIndex;
+                        value = new 
TxnReferenceValue.Constant(Int32Type.instance.decompose(42));
+                    }
+                    else if (type instanceof MapType && rs.nextBoolean())
+                    {
+                        kind = TxnReferenceOperation.Kind.MapDiscarderByKey;
+                        var keyType = ((MapType<?, ?>) type).getKeysType();
+                        value = valueGen(keyType).next(rs);
+                    }
+                    else if (type instanceof SetType && rs.nextBoolean())
+                    {
+                        kind = TxnReferenceOperation.Kind.SetElementDiscarder;
+                        var elementType = ((SetType<?>) 
type).getElementsType();
+                        value = valueGen(elementType).next(rs);
+                    }
+                    else
+                    {
+                        CollectionType<?> discardType = type instanceof MapType
+                                                        ? 
SetType.getInstance(((MapType<?, ?>) type).getKeysType(), true)
+                                                        : type;
+                        value = valueGen(discardType).next(rs);
+                    }
+                }
+                break;
+                case Adder:
+                case Subtracter:
+                {
+                    if (group == Group.Adder && rs.nextBoolean())
+                    {
+                        // this is similar to ListAppend
+                        var type = SetType.getInstance(Int32Type.instance, 
true);
+                        table = table(type);
+                        receiver = 
table.getColumn(ColumnIdentifier.getInterned("col", true));
+                        value = valueGen(type).next(rs);
+                        kind = TxnReferenceOperation.Kind.SetAdder;
+                    }
+                    else
+                    {
+                        var type = Int32Type.instance;
+                        table = table(type);
+                        receiver = 
table.getColumn(ColumnIdentifier.getInterned("col", true));
+                        value = valueGen(type).next(rs);
+                        kind = group == Group.Adder ? 
TxnReferenceOperation.Kind.ConstantAdder : 
TxnReferenceOperation.Kind.ConstantSubtracter;
+                    }
+                }
+                break;
+                case Setter:
+                {
+                    var type = 
Generators.toGen(AbstractTypeGenerators.builder()
+                                                                      
.withoutUnsafeEquality()
+                                                                      
.withMaxDepth(1)
+                                                                      .build())
+                               .next(rs);
+                    table = table(type);
+                    receiver = 
table.getColumn(ColumnIdentifier.getInterned("col", true));
+                    value = valueGen(type).next(rs);
+                    if (type instanceof ListType)
+                        kind = TxnReferenceOperation.Kind.ListSetter;
+                    else if (type instanceof SetType)
+                        kind = TxnReferenceOperation.Kind.SetSetter;
+                    else if (type instanceof MapType)
+                        kind = TxnReferenceOperation.Kind.MapSetter;
+                    else if (type instanceof UserType)
+                        kind = TxnReferenceOperation.Kind.UserTypeSetter;
+                    else
+                        kind = TxnReferenceOperation.Kind.ConstantSetter;
+                }
+                break;
+                case SetterByIndex:
+                {
+                    ListType<String> type = 
ListType.getInstance(UTF8Type.instance, true);
+                    table = table(type);
+                    receiver = 
table.getColumn(ColumnIdentifier.getInterned("col", true));
+                    value = valueGen(type.getElementsType()).next(rs);
+                    kind = TxnReferenceOperation.Kind.ListSetterByIndex;// 
x[?] = ?
+                    keyOrIndex = Int32Type.instance.decompose(42);
+                }
+                break;
+                case Appender:
+                {
+                    ListType<String> type = 
ListType.getInstance(UTF8Type.instance, true);
+                    table = table(type);
+                    receiver = 
table.getColumn(ColumnIdentifier.getInterned("col", true));
+                    value = valueGen(type).next(rs);
+                    kind = TxnReferenceOperation.Kind.ListAppender;
+                }
+                break;
+                case SetterByKey:
+                {
+                    MapType<Integer, String> type = 
MapType.getInstance(Int32Type.instance, UTF8Type.instance, true);
+                    table = table(type);
+                    receiver = 
table.getColumn(ColumnIdentifier.getInterned("col", true));
+                    value = valueGen(type.getValuesType()).next(rs);
+                    kind = TxnReferenceOperation.Kind.MapSetterByKey;
+                    keyOrIndex = Int32Type.instance.decompose(42);
+                }
+                break;
+                case Putter: // x += {foo: bar, baz: biz} -- basically 
Appender but for Map!
+                {
+                    MapType<Integer, String> type = 
MapType.getInstance(Int32Type.instance, UTF8Type.instance, true);
+                    table = table(type);
+                    receiver = 
table.getColumn(ColumnIdentifier.getInterned("col", true));
+                    value = valueGen(type).next(rs);
+                    kind = TxnReferenceOperation.Kind.MapPutter;
+                }
+                break;
+                case SetterByField:
+                {
+                    UserType type = new UserType(KS, 
ByteBufferUtil.bytes("udt"),
+                                                 
List.of(FieldIdentifier.forUnquoted("f1")),
+                                                 List.of(UTF8Type.instance),
+                                                 true);
+                    kind = TxnReferenceOperation.Kind.UserTypeSetterByField;
+                    table = table(type);
+                    receiver = 
table.getColumn(ColumnIdentifier.getInterned("col", true));
+                    value = valueGen(UTF8Type.instance).next(rs);
+                    field = FieldIdentifier.forUnquoted("f1").bytes;
+                }
+                break;
+
+                default:
+                    throw new UnsupportedOperationException();
+            }
+            return new TxnReferenceOperation(kind, receiver, table, 
keyOrIndex, field, value);
+        };
+    }
+
+    private static TableMetadata table(AbstractType<?> type)
+    {
+        return TableMetadata.builder(KS, "tbl")
+               .partitioner(Murmur3Partitioner.instance)
+               .addPartitionKeyColumn("pk", Int32Type.instance)
+               .addRegularColumn("col", type)
+               .build();
+    }
+}


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]


Reply via email to