This is an automated email from the ASF dual-hosted git repository.

adelapena pushed a commit to branch trunk
in repository https://gitbox.apache.org/repos/asf/cassandra.git


The following commit(s) were added to refs/heads/trunk by this push:
     new 95522f8  Add guardrail for list operations that require read before write
95522f8 is described below

commit 95522f85d5e14734f9af3096953974a4f48a884f
Author: Andrés de la Peña <[email protected]>
AuthorDate: Wed Dec 15 16:31:24 2021 +0000

    Add guardrail for list operations that require read before write
    
    patch by Andrés de la Peña; reviewed by Ekaterina Dimitrova for CASSANDRA-17154
---
 CHANGES.txt                                        |   1 +
 conf/cassandra.yaml                                |   3 +
 .../apache/cassandra/config/GuardrailsOptions.java |  15 ++
 src/java/org/apache/cassandra/cql3/Lists.java      |  11 ++
 .../apache/cassandra/cql3/UpdateParameters.java    |   4 +
 .../cassandra/cql3/statements/BatchStatement.java  |  12 +-
 .../cassandra/cql3/statements/CQL3CasRequest.java  |  13 +-
 .../cql3/statements/ModificationStatement.java     |  34 ++--
 .../apache/cassandra/db/guardrails/Guardrails.java |  18 +++
 .../cassandra/db/guardrails/GuardrailsConfig.java  |   7 +
 .../cassandra/db/guardrails/GuardrailsMBean.java   |  14 ++
 .../cassandra/io/sstable/CQLSSTableWriter.java     |   2 +
 .../org/apache/cassandra/service/CASRequest.java   |   2 +-
 .../org/apache/cassandra/service/StorageProxy.java |   2 +-
 .../test/microbench/BatchStatementBench.java       |   3 +-
 test/unit/org/apache/cassandra/cql3/ListsTest.java |  11 +-
 ...GuardrailReadBeforeWriteListOperationsTest.java | 177 +++++++++++++++++++++
 .../io/sstable/StressCQLSSTableWriter.java         |   1 +
 18 files changed, 303 insertions(+), 27 deletions(-)

diff --git a/CHANGES.txt b/CHANGES.txt
index 99c38b8..771302e 100644
--- a/CHANGES.txt
+++ b/CHANGES.txt
@@ -1,4 +1,5 @@
 4.1
+ * Add guardrail for list operations that require read before write (CASSANDRA-17154)
  * Migrate thresholds for number of keyspaces and tables to guardrails (CASSANDRA-17195)
  * Remove self-reference in SSTableTidier (CASSANDRA-17205)
  * Add guardrail for query page size (CASSANDRA-17189)
diff --git a/conf/cassandra.yaml b/conf/cassandra.yaml
index 541aa6e..b0ab525 100644
--- a/conf/cassandra.yaml
+++ b/conf/cassandra.yaml
@@ -1618,3 +1618,6 @@ enable_drop_compact_storage: false
 #     page_size:
 #       warn_threshold: -1
 #       abort_threshold: -1
+# Guardrail to allow/disallow list operations that require read before write, i.e. setting list element by index and
+# removing list elements by either index or value. Defaults to true.
+#     read_before_write_list_operations_enabled: true
diff --git a/src/java/org/apache/cassandra/config/GuardrailsOptions.java 
b/src/java/org/apache/cassandra/config/GuardrailsOptions.java
index e019b84..ff42f59 100644
--- a/src/java/org/apache/cassandra/config/GuardrailsOptions.java
+++ b/src/java/org/apache/cassandra/config/GuardrailsOptions.java
@@ -66,6 +66,7 @@ public class GuardrailsOptions implements GuardrailsConfig
     public final IntThreshold page_size = new IntThreshold();
 
     public volatile boolean user_timestamps_enabled = true;
+    public volatile boolean read_before_write_list_operations_enabled = true;
 
     public void validate()
     {
@@ -150,6 +151,20 @@ public class GuardrailsOptions implements GuardrailsConfig
                                   x -> user_timestamps_enabled = x);
     }
 
+    @Override
+    public boolean getReadBeforeWriteListOperationsEnabled()
+    {
+        return read_before_write_list_operations_enabled;
+    }
+
+    public void setReadBeforeWriteListOperationsEnabled(boolean enabled)
+    {
+        updatePropertyWithLogging(NAME_PREFIX + 
"read_before_write_list_operations_enabled",
+                                  enabled,
+                                  () -> 
read_before_write_list_operations_enabled,
+                                  x -> 
read_before_write_list_operations_enabled = x);
+    }
+
     private static <T> void updatePropertyWithLogging(String propertyName, T 
newValue, Supplier<T> getter, Consumer<T> setter)
     {
         T oldValue = getter.get();
diff --git a/src/java/org/apache/cassandra/cql3/Lists.java 
b/src/java/org/apache/cassandra/cql3/Lists.java
index a2c2608..cc14e76 100644
--- a/src/java/org/apache/cassandra/cql3/Lists.java
+++ b/src/java/org/apache/cassandra/cql3/Lists.java
@@ -29,6 +29,7 @@ import java.util.concurrent.atomic.AtomicReference;
 import java.util.stream.Collectors;
 import java.util.stream.StreamSupport;
 
+import org.apache.cassandra.db.guardrails.Guardrails;
 import org.apache.cassandra.db.marshal.ByteBufferAccessor;
 import org.apache.cassandra.schema.ColumnMetadata;
 import com.google.common.annotations.VisibleForTesting;
@@ -448,6 +449,9 @@ public abstract class Lists
             // we should not get here for frozen lists
             assert column.type.isMultiCell() : "Attempted to set an individual 
element on a frozen list";
 
+            Guardrails.readBeforeWriteListOperationsEnabled
+            .ensureEnabled("Setting of list items by index requiring read 
before write", params.clientState);
+
             ByteBuffer index = idx.bindAndGet(params.options);
             ByteBuffer value = t.bindAndGet(params.options);
 
@@ -565,6 +569,9 @@ public abstract class Lists
         {
             assert column.type.isMultiCell() : "Attempted to delete from a 
frozen list";
 
+            Guardrails.readBeforeWriteListOperationsEnabled
+            .ensureEnabled("Removal of list items requiring read before 
write", params.clientState);
+
             // We want to call bind before possibly returning to reject 
queries where the value provided is not a list.
             Term.Terminal value = t.bind(params.options);
 
@@ -602,6 +609,10 @@ public abstract class Lists
         public void execute(DecoratedKey partitionKey, UpdateParameters 
params) throws InvalidRequestException
         {
             assert column.type.isMultiCell() : "Attempted to delete an item by 
index from a frozen list";
+
+            Guardrails.readBeforeWriteListOperationsEnabled
+            .ensureEnabled("Removal of list items by index requiring read 
before write", params.clientState);
+
             Term.Terminal index = t.bind(params.options);
             if (index == null)
                 throw new InvalidRequestException("Invalid null value for list 
index");
diff --git a/src/java/org/apache/cassandra/cql3/UpdateParameters.java 
b/src/java/org/apache/cassandra/cql3/UpdateParameters.java
index 4272307..cbf9d51 100644
--- a/src/java/org/apache/cassandra/cql3/UpdateParameters.java
+++ b/src/java/org/apache/cassandra/cql3/UpdateParameters.java
@@ -27,6 +27,7 @@ import org.apache.cassandra.db.context.CounterContext;
 import org.apache.cassandra.db.partitions.Partition;
 import org.apache.cassandra.db.rows.*;
 import org.apache.cassandra.exceptions.InvalidRequestException;
+import org.apache.cassandra.service.ClientState;
 
 /**
  * Groups the parameters of an update query, and make building updates easier.
@@ -35,6 +36,7 @@ public class UpdateParameters
 {
     public final TableMetadata metadata;
     public final RegularAndStaticColumns updatedColumns;
+    public final ClientState clientState;
     public final QueryOptions options;
 
     private final int nowInSec;
@@ -54,6 +56,7 @@ public class UpdateParameters
 
     public UpdateParameters(TableMetadata metadata,
                             RegularAndStaticColumns updatedColumns,
+                            ClientState clientState,
                             QueryOptions options,
                             long timestamp,
                             int nowInSec,
@@ -63,6 +66,7 @@ public class UpdateParameters
     {
         this.metadata = metadata;
         this.updatedColumns = updatedColumns;
+        this.clientState = clientState;
         this.options = options;
 
         this.nowInSec = nowInSec;
diff --git a/src/java/org/apache/cassandra/cql3/statements/BatchStatement.java 
b/src/java/org/apache/cassandra/cql3/statements/BatchStatement.java
index 24877ef..dabe05e 100644
--- a/src/java/org/apache/cassandra/cql3/statements/BatchStatement.java
+++ b/src/java/org/apache/cassandra/cql3/statements/BatchStatement.java
@@ -265,7 +265,8 @@ public class BatchStatement implements CQLStatement
     }
 
     @VisibleForTesting
-    public List<? extends IMutation> getMutations(BatchQueryOptions options,
+    public List<? extends IMutation> getMutations(ClientState state,
+                                                  BatchQueryOptions options,
                                                   boolean local,
                                                   long batchTimestamp,
                                                   int nowInSeconds,
@@ -306,7 +307,7 @@ public class BatchStatement implements CQLStatement
             }
             QueryOptions statementOptions = options.forStatement(i);
             long timestamp = attrs.getTimestamp(batchTimestamp, 
statementOptions);
-            statement.addUpdates(collector, partitionKeys.get(i), 
statementOptions, local, timestamp, nowInSeconds, queryStartNanoTime);
+            statement.addUpdates(collector, partitionKeys.get(i), state, 
statementOptions, local, timestamp, nowInSeconds, queryStartNanoTime);
         }
 
         if (tablesWithZeroGcGs != null)
@@ -415,7 +416,8 @@ public class BatchStatement implements CQLStatement
         if (updatesVirtualTables)
             executeInternalWithoutCondition(queryState, options, 
queryStartNanoTime);
         else    
-            executeWithoutConditions(getMutations(options, false, timestamp, 
nowInSeconds, queryStartNanoTime), options.getConsistency(), 
queryStartNanoTime);
+            executeWithoutConditions(getMutations(queryState.getClientState(), 
options, false, timestamp, nowInSeconds, queryStartNanoTime),
+                                     options.getConsistency(), 
queryStartNanoTime);
 
         return new ResultMessage.Void();
     }
@@ -560,7 +562,7 @@ public class BatchStatement implements CQLStatement
         long timestamp = batchOptions.getTimestamp(queryState);
         int nowInSeconds = batchOptions.getNowInSeconds(queryState);
 
-        for (IMutation mutation : getMutations(batchOptions, true, timestamp, 
nowInSeconds, queryStartNanoTime))
+        for (IMutation mutation : getMutations(queryState.getClientState(), 
batchOptions, true, timestamp, nowInSeconds, queryStartNanoTime))
             mutation.apply();
         return null;
     }
@@ -577,7 +579,7 @@ public class BatchStatement implements CQLStatement
         long timestamp = options.getTimestamp(state);
         int nowInSeconds = options.getNowInSeconds(state);
 
-        try (RowIterator result = ModificationStatement.casInternal(request, 
timestamp, nowInSeconds))
+        try (RowIterator result = 
ModificationStatement.casInternal(state.getClientState(), request, timestamp, 
nowInSeconds))
         {
             ResultSet resultSet =
                 ModificationStatement.buildCasResultSet(ksName,
diff --git a/src/java/org/apache/cassandra/cql3/statements/CQL3CasRequest.java 
b/src/java/org/apache/cassandra/cql3/statements/CQL3CasRequest.java
index 563a639..cc02650 100644
--- a/src/java/org/apache/cassandra/cql3/statements/CQL3CasRequest.java
+++ b/src/java/org/apache/cassandra/cql3/statements/CQL3CasRequest.java
@@ -34,6 +34,7 @@ import org.apache.cassandra.db.partitions.Partition;
 import org.apache.cassandra.db.partitions.PartitionUpdate;
 import org.apache.cassandra.exceptions.InvalidRequestException;
 import org.apache.cassandra.service.CASRequest;
+import org.apache.cassandra.service.ClientState;
 import org.apache.cassandra.utils.Pair;
 import org.apache.commons.lang3.builder.ToStringBuilder;
 import org.apache.commons.lang3.builder.ToStringStyle;
@@ -228,13 +229,13 @@ public class CQL3CasRequest implements CASRequest
         return builder.build();
     }
 
-    public PartitionUpdate makeUpdates(FilteredPartition current) throws 
InvalidRequestException
+    public PartitionUpdate makeUpdates(FilteredPartition current, ClientState 
state) throws InvalidRequestException
     {
         PartitionUpdate.Builder updateBuilder = new 
PartitionUpdate.Builder(metadata, key, updatedColumns(), conditions.size());
         for (RowUpdate upd : updates)
-            upd.applyUpdates(current, updateBuilder);
+            upd.applyUpdates(current, updateBuilder, state);
         for (RangeDeletion upd : rangeDeletions)
-            upd.applyUpdates(current, updateBuilder);
+            upd.applyUpdates(current, updateBuilder, state);
 
         PartitionUpdate partitionUpdate = updateBuilder.build();
         IndexRegistry.obtain(metadata).validate(partitionUpdate);
@@ -265,12 +266,13 @@ public class CQL3CasRequest implements CASRequest
             this.nowInSeconds = nowInSeconds;
         }
 
-        void applyUpdates(FilteredPartition current, PartitionUpdate.Builder 
updateBuilder)
+        void applyUpdates(FilteredPartition current, PartitionUpdate.Builder 
updateBuilder, ClientState state)
         {
             Map<DecoratedKey, Partition> map = stmt.requiresRead() ? 
Collections.singletonMap(key, current) : null;
             UpdateParameters params =
                 new UpdateParameters(metadata,
                                      updateBuilder.columns(),
+                                     state,
                                      options,
                                      timestamp,
                                      nowInSeconds,
@@ -297,13 +299,14 @@ public class CQL3CasRequest implements CASRequest
             this.nowInSeconds = nowInSeconds;
         }
 
-        void applyUpdates(FilteredPartition current, PartitionUpdate.Builder 
updateBuilder)
+        void applyUpdates(FilteredPartition current, PartitionUpdate.Builder 
updateBuilder, ClientState state)
         {
             // No slice statements currently require a read, but this 
maintains consistency with RowUpdate, and future proofs us
             Map<DecoratedKey, Partition> map = stmt.requiresRead() ? 
Collections.singletonMap(key, current) : null;
             UpdateParameters params =
                 new UpdateParameters(metadata,
                                      updateBuilder.columns(),
+                                     state,
                                      options,
                                      timestamp,
                                      nowInSeconds,
diff --git 
a/src/java/org/apache/cassandra/cql3/statements/ModificationStatement.java 
b/src/java/org/apache/cassandra/cql3/statements/ModificationStatement.java
index 0a35e1f..eed528f 100644
--- a/src/java/org/apache/cassandra/cql3/statements/ModificationStatement.java
+++ b/src/java/org/apache/cassandra/cql3/statements/ModificationStatement.java
@@ -476,7 +476,8 @@ public abstract class ModificationStatement implements 
CQLStatement.SingleKeyspa
             cl.validateForWrite();
 
         List<? extends IMutation> mutations =
-            getMutations(options,
+            getMutations(queryState.getClientState(),
+                         options,
                          false,
                          options.getTimestamp(queryState),
                          options.getNowInSeconds(queryState),
@@ -639,7 +640,7 @@ public abstract class ModificationStatement implements 
CQLStatement.SingleKeyspa
     {
         long timestamp = options.getTimestamp(queryState);
         int nowInSeconds = options.getNowInSeconds(queryState);
-        for (IMutation mutation : getMutations(options, true, timestamp, 
nowInSeconds, queryStartNanoTime))
+        for (IMutation mutation : getMutations(queryState.getClientState(), 
options, true, timestamp, nowInSeconds, queryStartNanoTime))
             mutation.apply();
         return null;
     }
@@ -648,13 +649,13 @@ public abstract class ModificationStatement implements 
CQLStatement.SingleKeyspa
     {
         CQL3CasRequest request = makeCasRequest(state, options);
 
-        try (RowIterator result = casInternal(request, 
options.getTimestamp(state), options.getNowInSeconds(state)))
+        try (RowIterator result = casInternal(state.getClientState(), request, 
options.getTimestamp(state), options.getNowInSeconds(state)))
         {
             return new ResultMessage.Rows(buildCasResultSet(result, state, 
options));
         }
     }
 
-    static RowIterator casInternal(CQL3CasRequest request, long timestamp, int 
nowInSeconds)
+    static RowIterator casInternal(ClientState state, CQL3CasRequest request, 
long timestamp, int nowInSeconds)
     {
         UUID ballot = UUIDGen.getTimeUUIDFromMicros(timestamp);
 
@@ -669,7 +670,7 @@ public abstract class ModificationStatement implements 
CQLStatement.SingleKeyspa
         if (!request.appliesTo(current))
             return current.rowIterator();
 
-        PartitionUpdate updates = request.makeUpdates(current);
+        PartitionUpdate updates = request.makeUpdates(current, state);
         updates = TriggerExecutor.instance.execute(updates);
 
         Commit proposal = Commit.newProposal(ballot, updates);
@@ -680,27 +681,30 @@ public abstract class ModificationStatement implements 
CQLStatement.SingleKeyspa
     /**
      * Convert statement into a list of mutations to apply on the server
      *
+     * @param state the client state
      * @param options value for prepared statement markers
      * @param local if true, any requests (for collections) performed by 
getMutation should be done locally only.
      * @param timestamp the current timestamp in microseconds to use if no 
timestamp is user provided.
      *
      * @return list of the mutations
      */
-    private List<? extends IMutation> getMutations(QueryOptions options,
-                                                         boolean local,
-                                                         long timestamp,
-                                                         int nowInSeconds,
-                                                         long 
queryStartNanoTime)
+    private List<? extends IMutation> getMutations(ClientState state,
+                                                   QueryOptions options,
+                                                   boolean local,
+                                                   long timestamp,
+                                                   int nowInSeconds,
+                                                   long queryStartNanoTime)
     {
         List<ByteBuffer> keys = buildPartitionKeyNames(options);
         HashMultiset<ByteBuffer> perPartitionKeyCounts = 
HashMultiset.create(keys);
         SingleTableUpdatesCollector collector = new 
SingleTableUpdatesCollector(metadata, updatedColumns, perPartitionKeyCounts);
-        addUpdates(collector, keys, options, local, timestamp, nowInSeconds, 
queryStartNanoTime);
+        addUpdates(collector, keys, state, options, local, timestamp, 
nowInSeconds, queryStartNanoTime);
         return collector.toMutations();
     }
 
     final void addUpdates(UpdatesCollector collector,
                           List<ByteBuffer> keys,
+                          ClientState state,
                           QueryOptions options,
                           boolean local,
                           long timestamp,
@@ -717,6 +721,7 @@ public abstract class ModificationStatement implements 
CQLStatement.SingleKeyspa
 
             UpdateParameters params = makeUpdateParameters(keys,
                                                            new 
ClusteringIndexSliceFilter(slices, false),
+                                                           state,
                                                            options,
                                                            DataLimits.NONE,
                                                            local,
@@ -742,7 +747,7 @@ public abstract class ModificationStatement implements 
CQLStatement.SingleKeyspa
             if (restrictions.hasClusteringColumnsRestrictions() && 
clusterings.isEmpty())
                 return;
 
-            UpdateParameters params = makeUpdateParameters(keys, clusterings, 
options, local, timestamp, nowInSeconds, queryStartNanoTime);
+            UpdateParameters params = makeUpdateParameters(keys, clusterings, 
state, options, local, timestamp, nowInSeconds, queryStartNanoTime);
 
             for (ByteBuffer key : keys)
             {
@@ -789,6 +794,7 @@ public abstract class ModificationStatement implements 
CQLStatement.SingleKeyspa
 
     private UpdateParameters makeUpdateParameters(Collection<ByteBuffer> keys,
                                                   NavigableSet<Clustering<?>> 
clusterings,
+                                                  ClientState state,
                                                   QueryOptions options,
                                                   boolean local,
                                                   long timestamp,
@@ -798,6 +804,7 @@ public abstract class ModificationStatement implements 
CQLStatement.SingleKeyspa
         if (clusterings.contains(Clustering.STATIC_CLUSTERING))
             return makeUpdateParameters(keys,
                                         new 
ClusteringIndexSliceFilter(Slices.ALL, false),
+                                        state,
                                         options,
                                         DataLimits.cqlLimits(1),
                                         local,
@@ -807,6 +814,7 @@ public abstract class ModificationStatement implements 
CQLStatement.SingleKeyspa
 
         return makeUpdateParameters(keys,
                                     new 
ClusteringIndexNamesFilter(clusterings, false),
+                                    state,
                                     options,
                                     DataLimits.NONE,
                                     local,
@@ -817,6 +825,7 @@ public abstract class ModificationStatement implements 
CQLStatement.SingleKeyspa
 
     private UpdateParameters makeUpdateParameters(Collection<ByteBuffer> keys,
                                                   ClusteringIndexFilter filter,
+                                                  ClientState state,
                                                   QueryOptions options,
                                                   DataLimits limits,
                                                   boolean local,
@@ -836,6 +845,7 @@ public abstract class ModificationStatement implements 
CQLStatement.SingleKeyspa
 
         return new UpdateParameters(metadata(),
                                     updatedColumns(),
+                                    state,
                                     options,
                                     getTimestamp(timestamp, options),
                                     nowInSeconds,
diff --git a/src/java/org/apache/cassandra/db/guardrails/Guardrails.java 
b/src/java/org/apache/cassandra/db/guardrails/Guardrails.java
index d29ab34..00f3492 100644
--- a/src/java/org/apache/cassandra/db/guardrails/Guardrails.java
+++ b/src/java/org/apache/cassandra/db/guardrails/Guardrails.java
@@ -127,6 +127,13 @@ public final class Guardrails implements GuardrailsMBean
                             : format("Aborting query for table %s, page size 
%s exceeds abort threshold of %s.",
                                      what, value, threshold));
 
+    /**
+     * Guardrail disabling operations on lists that require read before write.
+     */
+    public static final DisableFlag readBeforeWriteListOperationsEnabled =
+    new DisableFlag(state -> 
!CONFIG_PROVIDER.getOrCreate(state).getReadBeforeWriteListOperationsEnabled(),
+                    "List operation requiring read before write");
+
     private Guardrails()
     {
         MBeanWrapper.instance.registerMBean(this, MBEAN_NAME);
@@ -333,6 +340,17 @@ public final class Guardrails implements GuardrailsMBean
         DEFAULT_CONFIG.getPageSize().setThresholds(warn, abort);
     }
 
+    public boolean getReadBeforeWriteListOperationsEnabled()
+    {
+        return DEFAULT_CONFIG.getReadBeforeWriteListOperationsEnabled();
+    }
+
+    @Override
+    public void setReadBeforeWriteListOperationsEnabled(boolean enabled)
+    {
+        DEFAULT_CONFIG.setReadBeforeWriteListOperationsEnabled(enabled);
+    }
+
     private static String toCSV(Set<String> values)
     {
         return values == null ? "" : String.join(",", values);
diff --git a/src/java/org/apache/cassandra/db/guardrails/GuardrailsConfig.java 
b/src/java/org/apache/cassandra/db/guardrails/GuardrailsConfig.java
index c34f224..2abbdb5 100644
--- a/src/java/org/apache/cassandra/db/guardrails/GuardrailsConfig.java
+++ b/src/java/org/apache/cassandra/db/guardrails/GuardrailsConfig.java
@@ -93,6 +93,13 @@ public interface GuardrailsConfig
     IntThreshold getPageSize();
 
     /**
+     * Returns whether list operations that require read before write are 
allowed.
+     *
+     * @return {@code true} if list operations that require read before write 
are allowed, {@code false} otherwise.
+     */
+    boolean getReadBeforeWriteListOperationsEnabled();
+
+    /**
      * Configuration of {@code int}-based thresholds to check if the guarded 
value should trigger a warning or abort the
      * operation.
      */
diff --git a/src/java/org/apache/cassandra/db/guardrails/GuardrailsMBean.java 
b/src/java/org/apache/cassandra/db/guardrails/GuardrailsMBean.java
index c3509ee..197addd 100644
--- a/src/java/org/apache/cassandra/db/guardrails/GuardrailsMBean.java
+++ b/src/java/org/apache/cassandra/db/guardrails/GuardrailsMBean.java
@@ -206,4 +206,18 @@ public interface GuardrailsMBean
      * @param abort The threshold to prevent requesting pages with more 
elements than threshold. -1 means disabled.
      */
     void setPageSizeThreshold(int warn, int abort);
+
+    /**
+     * Returns whether list operations that require read before write are 
allowed.
+     *
+     * @return {@code true} if list operations that require read before write 
are allowed, {@code false} otherwise.
+     */
+    boolean getReadBeforeWriteListOperationsEnabled();
+
+    /**
+     * Sets whether list operations that require read before write are allowed.
+     *
+     * @param enabled {@code true} if list operations that require read before 
write are allowed, {@code false} otherwise.
+     */
+    void setReadBeforeWriteListOperationsEnabled(boolean enabled);
 }
diff --git a/src/java/org/apache/cassandra/io/sstable/CQLSSTableWriter.java 
b/src/java/org/apache/cassandra/io/sstable/CQLSSTableWriter.java
index 48a0e1f..919c199 100644
--- a/src/java/org/apache/cassandra/io/sstable/CQLSSTableWriter.java
+++ b/src/java/org/apache/cassandra/io/sstable/CQLSSTableWriter.java
@@ -52,6 +52,7 @@ import org.apache.cassandra.io.sstable.format.SSTableFormat;
 import org.apache.cassandra.io.util.File;
 import org.apache.cassandra.schema.*;
 import org.apache.cassandra.service.ClientState;
+import org.apache.cassandra.service.QueryState;
 import org.apache.cassandra.transport.ProtocolVersion;
 import org.apache.cassandra.utils.ByteBufferUtil;
 
@@ -247,6 +248,7 @@ public class CQLSSTableWriter implements Closeable
         // and that forces a lot of initialization that we don't want.
         UpdateParameters params = new UpdateParameters(insert.metadata,
                                                        insert.updatedColumns(),
+                                                       
ClientState.forInternalCalls(),
                                                        options,
                                                        
insert.getTimestamp(TimeUnit.MILLISECONDS.toMicros(now), options),
                                                        (int) 
TimeUnit.MILLISECONDS.toSeconds(now),
diff --git a/src/java/org/apache/cassandra/service/CASRequest.java 
b/src/java/org/apache/cassandra/service/CASRequest.java
index 88fb9bd..77d1f5d 100644
--- a/src/java/org/apache/cassandra/service/CASRequest.java
+++ b/src/java/org/apache/cassandra/service/CASRequest.java
@@ -42,5 +42,5 @@ public interface CASRequest
      * The updates to perform of a CAS success. The values fetched using the 
readFilter()
      * are passed as argument.
      */
-    public PartitionUpdate makeUpdates(FilteredPartition current) throws 
InvalidRequestException;
+    public PartitionUpdate makeUpdates(FilteredPartition current, ClientState 
state) throws InvalidRequestException;
 }
diff --git a/src/java/org/apache/cassandra/service/StorageProxy.java 
b/src/java/org/apache/cassandra/service/StorageProxy.java
index c3c0c88..4780e45 100644
--- a/src/java/org/apache/cassandra/service/StorageProxy.java
+++ b/src/java/org/apache/cassandra/service/StorageProxy.java
@@ -315,7 +315,7 @@ public class StorageProxy implements StorageProxyMBean
                 }
 
                 // Create the desired updates
-                PartitionUpdate updates = request.makeUpdates(current);
+                PartitionUpdate updates = request.makeUpdates(current, state);
 
                 long size = updates.dataSize();
                 casWriteMetrics.mutationSize.update(size);
diff --git 
a/test/microbench/org/apache/cassandra/test/microbench/BatchStatementBench.java 
b/test/microbench/org/apache/cassandra/test/microbench/BatchStatementBench.java
index d487e2d..4d10107 100644
--- 
a/test/microbench/org/apache/cassandra/test/microbench/BatchStatementBench.java
+++ 
b/test/microbench/org/apache/cassandra/test/microbench/BatchStatementBench.java
@@ -41,6 +41,7 @@ import org.apache.cassandra.schema.KeyspaceMetadata;
 import org.apache.cassandra.schema.KeyspaceParams;
 import org.apache.cassandra.schema.Schema;
 import org.apache.cassandra.schema.TableMetadata;
+import org.apache.cassandra.service.ClientState;
 import org.apache.cassandra.utils.FBUtilities;
 import org.openjdk.jmh.annotations.Benchmark;
 import org.openjdk.jmh.annotations.BenchmarkMode;
@@ -124,7 +125,7 @@ public class BatchStatementBench
     @Benchmark
     public void bench()
     {
-        bs.getMutations(bqo, false, nowInSec, nowInSec, queryStartTime);
+        bs.getMutations(ClientState.forInternalCalls(), bqo, false, nowInSec, 
nowInSec, queryStartTime);
     }
 
 
diff --git a/test/unit/org/apache/cassandra/cql3/ListsTest.java 
b/test/unit/org/apache/cassandra/cql3/ListsTest.java
index 92dcd96..27c36b8 100644
--- a/test/unit/org/apache/cassandra/cql3/ListsTest.java
+++ b/test/unit/org/apache/cassandra/cql3/ListsTest.java
@@ -36,6 +36,7 @@ import org.apache.cassandra.db.rows.Row;
 import org.apache.cassandra.dht.Murmur3Partitioner;
 import org.apache.cassandra.schema.ColumnMetadata;
 import org.apache.cassandra.schema.TableMetadata;
+import org.apache.cassandra.service.ClientState;
 import org.apache.cassandra.utils.ByteBufferUtil;
 import org.apache.cassandra.utils.FBUtilities;
 import org.apache.cassandra.utils.UUIDGen;
@@ -141,8 +142,14 @@ public class ListsTest extends CQLTester
 
         ByteBuffer keyBuf = ByteBufferUtil.bytes("key");
         DecoratedKey key = Murmur3Partitioner.instance.decorateKey(keyBuf);
-        UpdateParameters parameters =
-            new UpdateParameters(metaData, null, QueryOptions.DEFAULT, System.currentTimeMillis(), FBUtilities.nowInSeconds(), 1000, Collections.emptyMap());
+        UpdateParameters parameters = new UpdateParameters(metaData,
+                                                           null,
+                                                           ClientState.forInternalCalls(),
+                                                           QueryOptions.DEFAULT,
+                                                           System.currentTimeMillis(),
+                                                           FBUtilities.nowInSeconds(),
+                                                           1000,
+                                                           Collections.emptyMap());
         Clustering<?> clustering = Clustering.make(ByteBufferUtil.bytes(1));
         parameters.newRow(clustering);
         prepender.execute(key, parameters);
diff --git a/test/unit/org/apache/cassandra/db/guardrails/GuardrailReadBeforeWriteListOperationsTest.java b/test/unit/org/apache/cassandra/db/guardrails/GuardrailReadBeforeWriteListOperationsTest.java
new file mode 100644
index 0000000..a4049c7
--- /dev/null
+++ b/test/unit/org/apache/cassandra/db/guardrails/GuardrailReadBeforeWriteListOperationsTest.java
@@ -0,0 +1,177 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.cassandra.db.guardrails;
+
+import java.util.Arrays;
+import java.util.Collection;
+
+import org.junit.Assert;
+import org.junit.Before;
+import org.junit.Test;
+import org.junit.runner.RunWith;
+import org.junit.runners.Parameterized;
+
+/**
+ * Tests the guardrail for read-before-write list operations, {@link Guardrails#readBeforeWriteListOperationsEnabled}.
+ */
+@RunWith(Parameterized.class)
+public class GuardrailReadBeforeWriteListOperationsTest extends GuardrailTester
+{
+    @Parameterized.Parameter
+    public boolean enabled;
+
+    @Parameterized.Parameters(name = "read_before_write_list_operations_enabled={0}")
+    public static Collection<Object> data()
+    {
+        return Arrays.asList(false, true);
+    }
+
+    @Before
+    public void before()
+    {
+        guardrails().setReadBeforeWriteListOperationsEnabled(enabled);
+        Assert.assertEquals(enabled, guardrails().getReadBeforeWriteListOperationsEnabled());
+
+        createTable("CREATE TABLE %s (k int PRIMARY KEY, l list<int>)");
+    }
+
+    @Test
+    public void tesInsertFullValue() throws Throwable
+    {
+        // insert from scratch
+        assertValid("INSERT INTO %s (k, l) VALUES (0, [1, 2])");
+        assertRows(row(0, list(1, 2)));
+
+        // insert overriding previous value
+        assertValid("INSERT INTO %s (k, l) VALUES (0, [2, 3])");
+        assertRows(row(0, list(2, 3)));
+    }
+
+    @Test
+    public void testUpdateFullValue() throws Throwable
+    {
+        // update from scratch
+        assertValid("UPDATE %s SET l = [1, 2] WHERE k = 0");
+        assertRows(row(0, list(1, 2)));
+
+        // update overriding previous value
+        assertValid("UPDATE %s SET l = [2, 3] WHERE k = 0");
+        assertRows(row(0, list(2, 3)));
+    }
+
+    @Test
+    public void testDeleteFullValue() throws Throwable
+    {
+        assertValid("INSERT INTO %s (k, l) VALUES (0, [1, 2])");
+        assertValid("DELETE l FROM %s WHERE k = 0");
+        assertRows(row(0, null));
+    }
+
+    @Test
+    public void testAppend() throws Throwable
+    {
+        assertValid("INSERT INTO %s (k, l) VALUES (0, [1, 2])");
+        assertValid("UPDATE %s SET l = l + [3, 4] WHERE k = 0");
+        assertRows(row(0, list(1, 2, 3, 4)));
+    }
+
+    @Test
+    public void testPrepend() throws Throwable
+    {
+        assertValid("INSERT INTO %s (k, l) VALUES (0, [1, 2])");
+        assertValid("UPDATE %s SET l = [3, 4] + l WHERE k = 0");
+        assertRows(row(0, list(3, 4, 1, 2)));
+    }
+
+    @Test
+    public void testUpdateByIndex() throws Throwable
+    {
+        assertValid("INSERT INTO %s (k, l) VALUES (0, [1, 2, 3])");
+        testGuardrail("UPDATE %s SET l[1] = 4 WHERE k = 0",
+                      "Setting of list items by index requiring read before write is not allowed",
+                      row(0, list(1, 4, 3)));
+    }
+
+    @Test
+    public void testDeleteByIndex() throws Throwable
+    {
+        assertValid("INSERT INTO %s (k, l) VALUES (0, [1, 2, 3])");
+        testGuardrail("DELETE l[1] FROM %s WHERE k = 0",
+                      "Removal of list items by index requiring read before write is not allowed",
+                      row(0, list(1, 3)));
+    }
+
+    @Test
+    public void testDeleteByItem() throws Throwable
+    {
+        assertValid("INSERT INTO %s (k, l) VALUES (0, [1, 2, 3])");
+        testGuardrail("UPDATE %s SET l = l - [2] WHERE k = 0",
+                      "Removal of list items requiring read before write is not allowed",
+                      row(0, list(1, 3)));
+    }
+
+    @Test
+    public void testBatch() throws Throwable
+    {
+        assertValid("INSERT INTO %s (k, l) VALUES (0, [1, 2, 3])");
+
+        testGuardrail("BEGIN BATCH UPDATE %s SET l[1] = 0 WHERE k = 0; APPLY BATCH",
+                      "Setting of list items by index requiring read before write is not allowed",
+                      row(0, list(1, 0, 3)));
+
+        testGuardrail("BEGIN BATCH DELETE l[1] FROM %s WHERE k = 0; APPLY BATCH",
+                      "Removal of list items by index requiring read before write is not allowed",
+                      row(0, list(1, 3)));
+
+        testGuardrail("BEGIN BATCH UPDATE %s SET l = l - [3] WHERE k = 0; APPLY BATCH",
+                      "Removal of list items requiring read before write is not allowed",
+                      row(0, list(1)));
+    }
+
+    @Test
+    public void testExcludedUsers() throws Throwable
+    {
+        testExcludedUsers(() -> "INSERT INTO %s (k, l) VALUES (0, [1, 2, 3, 4, 5])",
+                          () -> "UPDATE %s SET l[1] = 4 WHERE k = 0",
+                          () -> "DELETE l[1] FROM %s WHERE k = 0",
+                          () -> "INSERT INTO %s (k, l) VALUES (0, [1, 2, 3])",
+                          () -> "UPDATE %s SET l = l - [2] WHERE k = 0",
+                          () -> "BEGIN BATCH UPDATE %s SET l[1] = 0 WHERE k = 0; APPLY BATCH",
+                          () -> "BEGIN BATCH DELETE l[1] FROM %s WHERE k = 0; APPLY BATCH",
+                          () -> "BEGIN BATCH UPDATE %s SET l = l - [3] WHERE k = 0; APPLY BATCH");
+    }
+
+    private void testGuardrail(String query, String expectedMessage, Object[]... rows) throws Throwable
+    {
+        if (enabled)
+        {
+            assertValid(query);
+            assertRows(rows);
+        }
+        else
+        {
+            assertAborts(expectedMessage, query);
+        }
+    }
+
+    private void assertRows(Object[]... rows) throws Throwable
+    {
+        assertRowsNet(executeNet("SELECT * FROM %s"), rows);
+    }
+}
diff --git a/tools/stress/src/org/apache/cassandra/io/sstable/StressCQLSSTableWriter.java b/tools/stress/src/org/apache/cassandra/io/sstable/StressCQLSSTableWriter.java
index 7028b84..84ac5f1 100644
--- a/tools/stress/src/org/apache/cassandra/io/sstable/StressCQLSSTableWriter.java
+++ b/tools/stress/src/org/apache/cassandra/io/sstable/StressCQLSSTableWriter.java
@@ -252,6 +252,7 @@ public class StressCQLSSTableWriter implements Closeable
         // and that forces a lot of initialization that we don't want.
         UpdateParameters params = new UpdateParameters(insert.metadata(),
                                                        insert.updatedColumns(),
+                                                       ClientState.forInternalCalls(),
                                                        options,
                                                        insert.getTimestamp(TimeUnit.MILLISECONDS.toMicros(now), options),
                                                        (int) TimeUnit.MILLISECONDS.toSeconds(now),

---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to