This is an automated email from the ASF dual-hosted git repository.
adelapena pushed a commit to branch trunk
in repository https://gitbox.apache.org/repos/asf/cassandra.git
The following commit(s) were added to refs/heads/trunk by this push:
new 95522f8 Add guardrail for list operations that require read before
write
95522f8 is described below
commit 95522f85d5e14734f9af3096953974a4f48a884f
Author: Andrés de la Peña <[email protected]>
AuthorDate: Wed Dec 15 16:31:24 2021 +0000
Add guardrail for list operations that require read before write
patch by Andrés de la Peña; reviewed by Ekaterina Dimitrova for
CASSANDRA-17154
---
CHANGES.txt | 1 +
conf/cassandra.yaml | 3 +
.../apache/cassandra/config/GuardrailsOptions.java | 15 ++
src/java/org/apache/cassandra/cql3/Lists.java | 11 ++
.../apache/cassandra/cql3/UpdateParameters.java | 4 +
.../cassandra/cql3/statements/BatchStatement.java | 12 +-
.../cassandra/cql3/statements/CQL3CasRequest.java | 13 +-
.../cql3/statements/ModificationStatement.java | 34 ++--
.../apache/cassandra/db/guardrails/Guardrails.java | 18 +++
.../cassandra/db/guardrails/GuardrailsConfig.java | 7 +
.../cassandra/db/guardrails/GuardrailsMBean.java | 14 ++
.../cassandra/io/sstable/CQLSSTableWriter.java | 2 +
.../org/apache/cassandra/service/CASRequest.java | 2 +-
.../org/apache/cassandra/service/StorageProxy.java | 2 +-
.../test/microbench/BatchStatementBench.java | 3 +-
test/unit/org/apache/cassandra/cql3/ListsTest.java | 11 +-
...GuardrailReadBeforeWriteListOperationsTest.java | 177 +++++++++++++++++++++
.../io/sstable/StressCQLSSTableWriter.java | 1 +
18 files changed, 303 insertions(+), 27 deletions(-)
diff --git a/CHANGES.txt b/CHANGES.txt
index 99c38b8..771302e 100644
--- a/CHANGES.txt
+++ b/CHANGES.txt
@@ -1,4 +1,5 @@
4.1
+ * Add guardrail for list operations that require read before write
(CASSANDRA-17154)
* Migrate thresholds for number of keyspaces and tables to guardrails
(CASSANDRA-17195)
* Remove self-reference in SSTableTidier (CASSANDRA-17205)
* Add guardrail for query page size (CASSANDRA-17189)
diff --git a/conf/cassandra.yaml b/conf/cassandra.yaml
index 541aa6e..b0ab525 100644
--- a/conf/cassandra.yaml
+++ b/conf/cassandra.yaml
@@ -1618,3 +1618,6 @@ enable_drop_compact_storage: false
# page_size:
# warn_threshold: -1
# abort_threshold: -1
+# Guardrail to allow/disallow list operations that require read before write,
i.e. setting list element by index and
+# removing list elements by either index or value. Defaults to true.
+# read_before_write_list_operations_enabled: true
diff --git a/src/java/org/apache/cassandra/config/GuardrailsOptions.java
b/src/java/org/apache/cassandra/config/GuardrailsOptions.java
index e019b84..ff42f59 100644
--- a/src/java/org/apache/cassandra/config/GuardrailsOptions.java
+++ b/src/java/org/apache/cassandra/config/GuardrailsOptions.java
@@ -66,6 +66,7 @@ public class GuardrailsOptions implements GuardrailsConfig
public final IntThreshold page_size = new IntThreshold();
public volatile boolean user_timestamps_enabled = true;
+ public volatile boolean read_before_write_list_operations_enabled = true;
public void validate()
{
@@ -150,6 +151,20 @@ public class GuardrailsOptions implements GuardrailsConfig
x -> user_timestamps_enabled = x);
}
+ @Override
+ public boolean getReadBeforeWriteListOperationsEnabled()
+ {
+ return read_before_write_list_operations_enabled;
+ }
+
+ public void setReadBeforeWriteListOperationsEnabled(boolean enabled)
+ {
+ updatePropertyWithLogging(NAME_PREFIX +
"read_before_write_list_operations_enabled",
+ enabled,
+ () ->
read_before_write_list_operations_enabled,
+ x ->
read_before_write_list_operations_enabled = x);
+ }
+
private static <T> void updatePropertyWithLogging(String propertyName, T
newValue, Supplier<T> getter, Consumer<T> setter)
{
T oldValue = getter.get();
diff --git a/src/java/org/apache/cassandra/cql3/Lists.java
b/src/java/org/apache/cassandra/cql3/Lists.java
index a2c2608..cc14e76 100644
--- a/src/java/org/apache/cassandra/cql3/Lists.java
+++ b/src/java/org/apache/cassandra/cql3/Lists.java
@@ -29,6 +29,7 @@ import java.util.concurrent.atomic.AtomicReference;
import java.util.stream.Collectors;
import java.util.stream.StreamSupport;
+import org.apache.cassandra.db.guardrails.Guardrails;
import org.apache.cassandra.db.marshal.ByteBufferAccessor;
import org.apache.cassandra.schema.ColumnMetadata;
import com.google.common.annotations.VisibleForTesting;
@@ -448,6 +449,9 @@ public abstract class Lists
// we should not get here for frozen lists
assert column.type.isMultiCell() : "Attempted to set an individual
element on a frozen list";
+ Guardrails.readBeforeWriteListOperationsEnabled
+ .ensureEnabled("Setting of list items by index requiring read
before write", params.clientState);
+
ByteBuffer index = idx.bindAndGet(params.options);
ByteBuffer value = t.bindAndGet(params.options);
@@ -565,6 +569,9 @@ public abstract class Lists
{
assert column.type.isMultiCell() : "Attempted to delete from a
frozen list";
+ Guardrails.readBeforeWriteListOperationsEnabled
+ .ensureEnabled("Removal of list items requiring read before
write", params.clientState);
+
// We want to call bind before possibly returning to reject
queries where the value provided is not a list.
Term.Terminal value = t.bind(params.options);
@@ -602,6 +609,10 @@ public abstract class Lists
public void execute(DecoratedKey partitionKey, UpdateParameters
params) throws InvalidRequestException
{
assert column.type.isMultiCell() : "Attempted to delete an item by
index from a frozen list";
+
+ Guardrails.readBeforeWriteListOperationsEnabled
+ .ensureEnabled("Removal of list items by index requiring read
before write", params.clientState);
+
Term.Terminal index = t.bind(params.options);
if (index == null)
throw new InvalidRequestException("Invalid null value for list
index");
diff --git a/src/java/org/apache/cassandra/cql3/UpdateParameters.java
b/src/java/org/apache/cassandra/cql3/UpdateParameters.java
index 4272307..cbf9d51 100644
--- a/src/java/org/apache/cassandra/cql3/UpdateParameters.java
+++ b/src/java/org/apache/cassandra/cql3/UpdateParameters.java
@@ -27,6 +27,7 @@ import org.apache.cassandra.db.context.CounterContext;
import org.apache.cassandra.db.partitions.Partition;
import org.apache.cassandra.db.rows.*;
import org.apache.cassandra.exceptions.InvalidRequestException;
+import org.apache.cassandra.service.ClientState;
/**
* Groups the parameters of an update query, and make building updates easier.
@@ -35,6 +36,7 @@ public class UpdateParameters
{
public final TableMetadata metadata;
public final RegularAndStaticColumns updatedColumns;
+ public final ClientState clientState;
public final QueryOptions options;
private final int nowInSec;
@@ -54,6 +56,7 @@ public class UpdateParameters
public UpdateParameters(TableMetadata metadata,
RegularAndStaticColumns updatedColumns,
+ ClientState clientState,
QueryOptions options,
long timestamp,
int nowInSec,
@@ -63,6 +66,7 @@ public class UpdateParameters
{
this.metadata = metadata;
this.updatedColumns = updatedColumns;
+ this.clientState = clientState;
this.options = options;
this.nowInSec = nowInSec;
diff --git a/src/java/org/apache/cassandra/cql3/statements/BatchStatement.java
b/src/java/org/apache/cassandra/cql3/statements/BatchStatement.java
index 24877ef..dabe05e 100644
--- a/src/java/org/apache/cassandra/cql3/statements/BatchStatement.java
+++ b/src/java/org/apache/cassandra/cql3/statements/BatchStatement.java
@@ -265,7 +265,8 @@ public class BatchStatement implements CQLStatement
}
@VisibleForTesting
- public List<? extends IMutation> getMutations(BatchQueryOptions options,
+ public List<? extends IMutation> getMutations(ClientState state,
+ BatchQueryOptions options,
boolean local,
long batchTimestamp,
int nowInSeconds,
@@ -306,7 +307,7 @@ public class BatchStatement implements CQLStatement
}
QueryOptions statementOptions = options.forStatement(i);
long timestamp = attrs.getTimestamp(batchTimestamp,
statementOptions);
- statement.addUpdates(collector, partitionKeys.get(i),
statementOptions, local, timestamp, nowInSeconds, queryStartNanoTime);
+ statement.addUpdates(collector, partitionKeys.get(i), state,
statementOptions, local, timestamp, nowInSeconds, queryStartNanoTime);
}
if (tablesWithZeroGcGs != null)
@@ -415,7 +416,8 @@ public class BatchStatement implements CQLStatement
if (updatesVirtualTables)
executeInternalWithoutCondition(queryState, options,
queryStartNanoTime);
else
- executeWithoutConditions(getMutations(options, false, timestamp,
nowInSeconds, queryStartNanoTime), options.getConsistency(),
queryStartNanoTime);
+ executeWithoutConditions(getMutations(queryState.getClientState(),
options, false, timestamp, nowInSeconds, queryStartNanoTime),
+ options.getConsistency(),
queryStartNanoTime);
return new ResultMessage.Void();
}
@@ -560,7 +562,7 @@ public class BatchStatement implements CQLStatement
long timestamp = batchOptions.getTimestamp(queryState);
int nowInSeconds = batchOptions.getNowInSeconds(queryState);
- for (IMutation mutation : getMutations(batchOptions, true, timestamp,
nowInSeconds, queryStartNanoTime))
+ for (IMutation mutation : getMutations(queryState.getClientState(),
batchOptions, true, timestamp, nowInSeconds, queryStartNanoTime))
mutation.apply();
return null;
}
@@ -577,7 +579,7 @@ public class BatchStatement implements CQLStatement
long timestamp = options.getTimestamp(state);
int nowInSeconds = options.getNowInSeconds(state);
- try (RowIterator result = ModificationStatement.casInternal(request,
timestamp, nowInSeconds))
+ try (RowIterator result =
ModificationStatement.casInternal(state.getClientState(), request, timestamp,
nowInSeconds))
{
ResultSet resultSet =
ModificationStatement.buildCasResultSet(ksName,
diff --git a/src/java/org/apache/cassandra/cql3/statements/CQL3CasRequest.java
b/src/java/org/apache/cassandra/cql3/statements/CQL3CasRequest.java
index 563a639..cc02650 100644
--- a/src/java/org/apache/cassandra/cql3/statements/CQL3CasRequest.java
+++ b/src/java/org/apache/cassandra/cql3/statements/CQL3CasRequest.java
@@ -34,6 +34,7 @@ import org.apache.cassandra.db.partitions.Partition;
import org.apache.cassandra.db.partitions.PartitionUpdate;
import org.apache.cassandra.exceptions.InvalidRequestException;
import org.apache.cassandra.service.CASRequest;
+import org.apache.cassandra.service.ClientState;
import org.apache.cassandra.utils.Pair;
import org.apache.commons.lang3.builder.ToStringBuilder;
import org.apache.commons.lang3.builder.ToStringStyle;
@@ -228,13 +229,13 @@ public class CQL3CasRequest implements CASRequest
return builder.build();
}
- public PartitionUpdate makeUpdates(FilteredPartition current) throws
InvalidRequestException
+ public PartitionUpdate makeUpdates(FilteredPartition current, ClientState
state) throws InvalidRequestException
{
PartitionUpdate.Builder updateBuilder = new
PartitionUpdate.Builder(metadata, key, updatedColumns(), conditions.size());
for (RowUpdate upd : updates)
- upd.applyUpdates(current, updateBuilder);
+ upd.applyUpdates(current, updateBuilder, state);
for (RangeDeletion upd : rangeDeletions)
- upd.applyUpdates(current, updateBuilder);
+ upd.applyUpdates(current, updateBuilder, state);
PartitionUpdate partitionUpdate = updateBuilder.build();
IndexRegistry.obtain(metadata).validate(partitionUpdate);
@@ -265,12 +266,13 @@ public class CQL3CasRequest implements CASRequest
this.nowInSeconds = nowInSeconds;
}
- void applyUpdates(FilteredPartition current, PartitionUpdate.Builder
updateBuilder)
+ void applyUpdates(FilteredPartition current, PartitionUpdate.Builder
updateBuilder, ClientState state)
{
Map<DecoratedKey, Partition> map = stmt.requiresRead() ?
Collections.singletonMap(key, current) : null;
UpdateParameters params =
new UpdateParameters(metadata,
updateBuilder.columns(),
+ state,
options,
timestamp,
nowInSeconds,
@@ -297,13 +299,14 @@ public class CQL3CasRequest implements CASRequest
this.nowInSeconds = nowInSeconds;
}
- void applyUpdates(FilteredPartition current, PartitionUpdate.Builder
updateBuilder)
+ void applyUpdates(FilteredPartition current, PartitionUpdate.Builder
updateBuilder, ClientState state)
{
// No slice statements currently require a read, but this
maintains consistency with RowUpdate, and future proofs us
Map<DecoratedKey, Partition> map = stmt.requiresRead() ?
Collections.singletonMap(key, current) : null;
UpdateParameters params =
new UpdateParameters(metadata,
updateBuilder.columns(),
+ state,
options,
timestamp,
nowInSeconds,
diff --git
a/src/java/org/apache/cassandra/cql3/statements/ModificationStatement.java
b/src/java/org/apache/cassandra/cql3/statements/ModificationStatement.java
index 0a35e1f..eed528f 100644
--- a/src/java/org/apache/cassandra/cql3/statements/ModificationStatement.java
+++ b/src/java/org/apache/cassandra/cql3/statements/ModificationStatement.java
@@ -476,7 +476,8 @@ public abstract class ModificationStatement implements
CQLStatement.SingleKeyspa
cl.validateForWrite();
List<? extends IMutation> mutations =
- getMutations(options,
+ getMutations(queryState.getClientState(),
+ options,
false,
options.getTimestamp(queryState),
options.getNowInSeconds(queryState),
@@ -639,7 +640,7 @@ public abstract class ModificationStatement implements
CQLStatement.SingleKeyspa
{
long timestamp = options.getTimestamp(queryState);
int nowInSeconds = options.getNowInSeconds(queryState);
- for (IMutation mutation : getMutations(options, true, timestamp,
nowInSeconds, queryStartNanoTime))
+ for (IMutation mutation : getMutations(queryState.getClientState(),
options, true, timestamp, nowInSeconds, queryStartNanoTime))
mutation.apply();
return null;
}
@@ -648,13 +649,13 @@ public abstract class ModificationStatement implements
CQLStatement.SingleKeyspa
{
CQL3CasRequest request = makeCasRequest(state, options);
- try (RowIterator result = casInternal(request,
options.getTimestamp(state), options.getNowInSeconds(state)))
+ try (RowIterator result = casInternal(state.getClientState(), request,
options.getTimestamp(state), options.getNowInSeconds(state)))
{
return new ResultMessage.Rows(buildCasResultSet(result, state,
options));
}
}
- static RowIterator casInternal(CQL3CasRequest request, long timestamp, int
nowInSeconds)
+ static RowIterator casInternal(ClientState state, CQL3CasRequest request,
long timestamp, int nowInSeconds)
{
UUID ballot = UUIDGen.getTimeUUIDFromMicros(timestamp);
@@ -669,7 +670,7 @@ public abstract class ModificationStatement implements
CQLStatement.SingleKeyspa
if (!request.appliesTo(current))
return current.rowIterator();
- PartitionUpdate updates = request.makeUpdates(current);
+ PartitionUpdate updates = request.makeUpdates(current, state);
updates = TriggerExecutor.instance.execute(updates);
Commit proposal = Commit.newProposal(ballot, updates);
@@ -680,27 +681,30 @@ public abstract class ModificationStatement implements
CQLStatement.SingleKeyspa
/**
* Convert statement into a list of mutations to apply on the server
*
+ * @param state the client state
* @param options value for prepared statement markers
* @param local if true, any requests (for collections) performed by
getMutation should be done locally only.
* @param timestamp the current timestamp in microseconds to use if no
timestamp is user provided.
*
* @return list of the mutations
*/
- private List<? extends IMutation> getMutations(QueryOptions options,
- boolean local,
- long timestamp,
- int nowInSeconds,
- long
queryStartNanoTime)
+ private List<? extends IMutation> getMutations(ClientState state,
+ QueryOptions options,
+ boolean local,
+ long timestamp,
+ int nowInSeconds,
+ long queryStartNanoTime)
{
List<ByteBuffer> keys = buildPartitionKeyNames(options);
HashMultiset<ByteBuffer> perPartitionKeyCounts =
HashMultiset.create(keys);
SingleTableUpdatesCollector collector = new
SingleTableUpdatesCollector(metadata, updatedColumns, perPartitionKeyCounts);
- addUpdates(collector, keys, options, local, timestamp, nowInSeconds,
queryStartNanoTime);
+ addUpdates(collector, keys, state, options, local, timestamp,
nowInSeconds, queryStartNanoTime);
return collector.toMutations();
}
final void addUpdates(UpdatesCollector collector,
List<ByteBuffer> keys,
+ ClientState state,
QueryOptions options,
boolean local,
long timestamp,
@@ -717,6 +721,7 @@ public abstract class ModificationStatement implements
CQLStatement.SingleKeyspa
UpdateParameters params = makeUpdateParameters(keys,
new
ClusteringIndexSliceFilter(slices, false),
+ state,
options,
DataLimits.NONE,
local,
@@ -742,7 +747,7 @@ public abstract class ModificationStatement implements
CQLStatement.SingleKeyspa
if (restrictions.hasClusteringColumnsRestrictions() &&
clusterings.isEmpty())
return;
- UpdateParameters params = makeUpdateParameters(keys, clusterings,
options, local, timestamp, nowInSeconds, queryStartNanoTime);
+ UpdateParameters params = makeUpdateParameters(keys, clusterings,
state, options, local, timestamp, nowInSeconds, queryStartNanoTime);
for (ByteBuffer key : keys)
{
@@ -789,6 +794,7 @@ public abstract class ModificationStatement implements
CQLStatement.SingleKeyspa
private UpdateParameters makeUpdateParameters(Collection<ByteBuffer> keys,
NavigableSet<Clustering<?>>
clusterings,
+ ClientState state,
QueryOptions options,
boolean local,
long timestamp,
@@ -798,6 +804,7 @@ public abstract class ModificationStatement implements
CQLStatement.SingleKeyspa
if (clusterings.contains(Clustering.STATIC_CLUSTERING))
return makeUpdateParameters(keys,
new
ClusteringIndexSliceFilter(Slices.ALL, false),
+ state,
options,
DataLimits.cqlLimits(1),
local,
@@ -807,6 +814,7 @@ public abstract class ModificationStatement implements
CQLStatement.SingleKeyspa
return makeUpdateParameters(keys,
new
ClusteringIndexNamesFilter(clusterings, false),
+ state,
options,
DataLimits.NONE,
local,
@@ -817,6 +825,7 @@ public abstract class ModificationStatement implements
CQLStatement.SingleKeyspa
private UpdateParameters makeUpdateParameters(Collection<ByteBuffer> keys,
ClusteringIndexFilter filter,
+ ClientState state,
QueryOptions options,
DataLimits limits,
boolean local,
@@ -836,6 +845,7 @@ public abstract class ModificationStatement implements
CQLStatement.SingleKeyspa
return new UpdateParameters(metadata(),
updatedColumns(),
+ state,
options,
getTimestamp(timestamp, options),
nowInSeconds,
diff --git a/src/java/org/apache/cassandra/db/guardrails/Guardrails.java
b/src/java/org/apache/cassandra/db/guardrails/Guardrails.java
index d29ab34..00f3492 100644
--- a/src/java/org/apache/cassandra/db/guardrails/Guardrails.java
+++ b/src/java/org/apache/cassandra/db/guardrails/Guardrails.java
@@ -127,6 +127,13 @@ public final class Guardrails implements GuardrailsMBean
: format("Aborting query for table %s, page size
%s exceeds abort threshold of %s.",
what, value, threshold));
+ /**
+ * Guardrail disabling operations on lists that require read before write.
+ */
+ public static final DisableFlag readBeforeWriteListOperationsEnabled =
+ new DisableFlag(state ->
!CONFIG_PROVIDER.getOrCreate(state).getReadBeforeWriteListOperationsEnabled(),
+ "List operation requiring read before write");
+
private Guardrails()
{
MBeanWrapper.instance.registerMBean(this, MBEAN_NAME);
@@ -333,6 +340,17 @@ public final class Guardrails implements GuardrailsMBean
DEFAULT_CONFIG.getPageSize().setThresholds(warn, abort);
}
+ public boolean getReadBeforeWriteListOperationsEnabled()
+ {
+ return DEFAULT_CONFIG.getReadBeforeWriteListOperationsEnabled();
+ }
+
+ @Override
+ public void setReadBeforeWriteListOperationsEnabled(boolean enabled)
+ {
+ DEFAULT_CONFIG.setReadBeforeWriteListOperationsEnabled(enabled);
+ }
+
private static String toCSV(Set<String> values)
{
return values == null ? "" : String.join(",", values);
diff --git a/src/java/org/apache/cassandra/db/guardrails/GuardrailsConfig.java
b/src/java/org/apache/cassandra/db/guardrails/GuardrailsConfig.java
index c34f224..2abbdb5 100644
--- a/src/java/org/apache/cassandra/db/guardrails/GuardrailsConfig.java
+++ b/src/java/org/apache/cassandra/db/guardrails/GuardrailsConfig.java
@@ -93,6 +93,13 @@ public interface GuardrailsConfig
IntThreshold getPageSize();
/**
+ * Returns whether list operations that require read before write are
allowed.
+ *
+ * @return {@code true} if list operations that require read before write
are allowed, {@code false} otherwise.
+ */
+ boolean getReadBeforeWriteListOperationsEnabled();
+
+ /**
* Configuration of {@code int}-based thresholds to check if the guarded
value should trigger a warning or abort the
* operation.
*/
diff --git a/src/java/org/apache/cassandra/db/guardrails/GuardrailsMBean.java
b/src/java/org/apache/cassandra/db/guardrails/GuardrailsMBean.java
index c3509ee..197addd 100644
--- a/src/java/org/apache/cassandra/db/guardrails/GuardrailsMBean.java
+++ b/src/java/org/apache/cassandra/db/guardrails/GuardrailsMBean.java
@@ -206,4 +206,18 @@ public interface GuardrailsMBean
* @param abort The threshold to prevent requesting pages with more
elements than threshold. -1 means disabled.
*/
void setPageSizeThreshold(int warn, int abort);
+
+ /**
+ * Returns whether list operations that require read before write are
allowed.
+ *
+ * @return {@code true} if list operations that require read before write
are allowed, {@code false} otherwise.
+ */
+ boolean getReadBeforeWriteListOperationsEnabled();
+
+ /**
+ * Sets whether list operations that require read before write are allowed.
+ *
+ * @param enabled {@code true} if list operations that require read before
write are allowed, {@code false} otherwise.
+ */
+ void setReadBeforeWriteListOperationsEnabled(boolean enabled);
}
diff --git a/src/java/org/apache/cassandra/io/sstable/CQLSSTableWriter.java
b/src/java/org/apache/cassandra/io/sstable/CQLSSTableWriter.java
index 48a0e1f..919c199 100644
--- a/src/java/org/apache/cassandra/io/sstable/CQLSSTableWriter.java
+++ b/src/java/org/apache/cassandra/io/sstable/CQLSSTableWriter.java
@@ -52,6 +52,7 @@ import org.apache.cassandra.io.sstable.format.SSTableFormat;
import org.apache.cassandra.io.util.File;
import org.apache.cassandra.schema.*;
import org.apache.cassandra.service.ClientState;
+import org.apache.cassandra.service.QueryState;
import org.apache.cassandra.transport.ProtocolVersion;
import org.apache.cassandra.utils.ByteBufferUtil;
@@ -247,6 +248,7 @@ public class CQLSSTableWriter implements Closeable
// and that forces a lot of initialization that we don't want.
UpdateParameters params = new UpdateParameters(insert.metadata,
insert.updatedColumns(),
+
ClientState.forInternalCalls(),
options,
insert.getTimestamp(TimeUnit.MILLISECONDS.toMicros(now), options),
(int)
TimeUnit.MILLISECONDS.toSeconds(now),
diff --git a/src/java/org/apache/cassandra/service/CASRequest.java
b/src/java/org/apache/cassandra/service/CASRequest.java
index 88fb9bd..77d1f5d 100644
--- a/src/java/org/apache/cassandra/service/CASRequest.java
+++ b/src/java/org/apache/cassandra/service/CASRequest.java
@@ -42,5 +42,5 @@ public interface CASRequest
* The updates to perform of a CAS success. The values fetched using the
readFilter()
* are passed as argument.
*/
- public PartitionUpdate makeUpdates(FilteredPartition current) throws
InvalidRequestException;
+ public PartitionUpdate makeUpdates(FilteredPartition current, ClientState
state) throws InvalidRequestException;
}
diff --git a/src/java/org/apache/cassandra/service/StorageProxy.java
b/src/java/org/apache/cassandra/service/StorageProxy.java
index c3c0c88..4780e45 100644
--- a/src/java/org/apache/cassandra/service/StorageProxy.java
+++ b/src/java/org/apache/cassandra/service/StorageProxy.java
@@ -315,7 +315,7 @@ public class StorageProxy implements StorageProxyMBean
}
// Create the desired updates
- PartitionUpdate updates = request.makeUpdates(current);
+ PartitionUpdate updates = request.makeUpdates(current, state);
long size = updates.dataSize();
casWriteMetrics.mutationSize.update(size);
diff --git
a/test/microbench/org/apache/cassandra/test/microbench/BatchStatementBench.java
b/test/microbench/org/apache/cassandra/test/microbench/BatchStatementBench.java
index d487e2d..4d10107 100644
---
a/test/microbench/org/apache/cassandra/test/microbench/BatchStatementBench.java
+++
b/test/microbench/org/apache/cassandra/test/microbench/BatchStatementBench.java
@@ -41,6 +41,7 @@ import org.apache.cassandra.schema.KeyspaceMetadata;
import org.apache.cassandra.schema.KeyspaceParams;
import org.apache.cassandra.schema.Schema;
import org.apache.cassandra.schema.TableMetadata;
+import org.apache.cassandra.service.ClientState;
import org.apache.cassandra.utils.FBUtilities;
import org.openjdk.jmh.annotations.Benchmark;
import org.openjdk.jmh.annotations.BenchmarkMode;
@@ -124,7 +125,7 @@ public class BatchStatementBench
@Benchmark
public void bench()
{
- bs.getMutations(bqo, false, nowInSec, nowInSec, queryStartTime);
+ bs.getMutations(ClientState.forInternalCalls(), bqo, false, nowInSec,
nowInSec, queryStartTime);
}
diff --git a/test/unit/org/apache/cassandra/cql3/ListsTest.java
b/test/unit/org/apache/cassandra/cql3/ListsTest.java
index 92dcd96..27c36b8 100644
--- a/test/unit/org/apache/cassandra/cql3/ListsTest.java
+++ b/test/unit/org/apache/cassandra/cql3/ListsTest.java
@@ -36,6 +36,7 @@ import org.apache.cassandra.db.rows.Row;
import org.apache.cassandra.dht.Murmur3Partitioner;
import org.apache.cassandra.schema.ColumnMetadata;
import org.apache.cassandra.schema.TableMetadata;
+import org.apache.cassandra.service.ClientState;
import org.apache.cassandra.utils.ByteBufferUtil;
import org.apache.cassandra.utils.FBUtilities;
import org.apache.cassandra.utils.UUIDGen;
@@ -141,8 +142,14 @@ public class ListsTest extends CQLTester
ByteBuffer keyBuf = ByteBufferUtil.bytes("key");
DecoratedKey key = Murmur3Partitioner.instance.decorateKey(keyBuf);
- UpdateParameters parameters =
- new UpdateParameters(metaData, null, QueryOptions.DEFAULT,
System.currentTimeMillis(), FBUtilities.nowInSeconds(), 1000,
Collections.emptyMap());
+ UpdateParameters parameters = new UpdateParameters(metaData,
+ null,
+
ClientState.forInternalCalls(),
+
QueryOptions.DEFAULT,
+
System.currentTimeMillis(),
+
FBUtilities.nowInSeconds(),
+ 1000,
+
Collections.emptyMap());
Clustering<?> clustering = Clustering.make(ByteBufferUtil.bytes(1));
parameters.newRow(clustering);
prepender.execute(key, parameters);
diff --git
a/test/unit/org/apache/cassandra/db/guardrails/GuardrailReadBeforeWriteListOperationsTest.java
b/test/unit/org/apache/cassandra/db/guardrails/GuardrailReadBeforeWriteListOperationsTest.java
new file mode 100644
index 0000000..a4049c7
--- /dev/null
+++
b/test/unit/org/apache/cassandra/db/guardrails/GuardrailReadBeforeWriteListOperationsTest.java
@@ -0,0 +1,177 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.cassandra.db.guardrails;
+
+import java.util.Arrays;
+import java.util.Collection;
+
+import org.junit.Assert;
+import org.junit.Before;
+import org.junit.Test;
+import org.junit.runner.RunWith;
+import org.junit.runners.Parameterized;
+
+/**
+ * Tests the guardrail for read-before-write list operations, {@link
Guardrails#readBeforeWriteListOperationsEnabled}.
+ */
+@RunWith(Parameterized.class)
+public class GuardrailReadBeforeWriteListOperationsTest extends GuardrailTester
+{
+ @Parameterized.Parameter
+ public boolean enabled;
+
+ @Parameterized.Parameters(name =
"read_before_write_list_operations_enabled={0}")
+ public static Collection<Object> data()
+ {
+ return Arrays.asList(false, true);
+ }
+
+ @Before
+ public void before()
+ {
+ guardrails().setReadBeforeWriteListOperationsEnabled(enabled);
+ Assert.assertEquals(enabled,
guardrails().getReadBeforeWriteListOperationsEnabled());
+
+ createTable("CREATE TABLE %s (k int PRIMARY KEY, l list<int>)");
+ }
+
+ @Test
+ public void tesInsertFullValue() throws Throwable
+ {
+ // insert from scratch
+ assertValid("INSERT INTO %s (k, l) VALUES (0, [1, 2])");
+ assertRows(row(0, list(1, 2)));
+
+ // insert overriding previous value
+ assertValid("INSERT INTO %s (k, l) VALUES (0, [2, 3])");
+ assertRows(row(0, list(2, 3)));
+ }
+
+ @Test
+ public void testUpdateFullValue() throws Throwable
+ {
+ // update from scratch
+ assertValid("UPDATE %s SET l = [1, 2] WHERE k = 0");
+ assertRows(row(0, list(1, 2)));
+
+ // update overriding previous value
+ assertValid("UPDATE %s SET l = [2, 3] WHERE k = 0");
+ assertRows(row(0, list(2, 3)));
+ }
+
+ @Test
+ public void testDeleteFullValue() throws Throwable
+ {
+ assertValid("INSERT INTO %s (k, l) VALUES (0, [1, 2])");
+ assertValid("DELETE l FROM %s WHERE k = 0");
+ assertRows(row(0, null));
+ }
+
+ @Test
+ public void testAppend() throws Throwable
+ {
+ assertValid("INSERT INTO %s (k, l) VALUES (0, [1, 2])");
+ assertValid("UPDATE %s SET l = l + [3, 4] WHERE k = 0");
+ assertRows(row(0, list(1, 2, 3, 4)));
+ }
+
+ @Test
+ public void testPrepend() throws Throwable
+ {
+ assertValid("INSERT INTO %s (k, l) VALUES (0, [1, 2])");
+ assertValid("UPDATE %s SET l = [3, 4] + l WHERE k = 0");
+ assertRows(row(0, list(3, 4, 1, 2)));
+ }
+
+ @Test
+ public void testUpdateByIndex() throws Throwable
+ {
+ assertValid("INSERT INTO %s (k, l) VALUES (0, [1, 2, 3])");
+ testGuardrail("UPDATE %s SET l[1] = 4 WHERE k = 0",
+ "Setting of list items by index requiring read before
write is not allowed",
+ row(0, list(1, 4, 3)));
+ }
+
+ @Test
+ public void testDeleteByIndex() throws Throwable
+ {
+ assertValid("INSERT INTO %s (k, l) VALUES (0, [1, 2, 3])");
+ testGuardrail("DELETE l[1] FROM %s WHERE k = 0",
+ "Removal of list items by index requiring read before
write is not allowed",
+ row(0, list(1, 3)));
+ }
+
+ @Test
+ public void testDeleteByItem() throws Throwable
+ {
+ assertValid("INSERT INTO %s (k, l) VALUES (0, [1, 2, 3])");
+ testGuardrail("UPDATE %s SET l = l - [2] WHERE k = 0",
+ "Removal of list items requiring read before write is
not allowed",
+ row(0, list(1, 3)));
+ }
+
+ @Test
+ public void testBatch() throws Throwable
+ {
+ assertValid("INSERT INTO %s (k, l) VALUES (0, [1, 2, 3])");
+
+ testGuardrail("BEGIN BATCH UPDATE %s SET l[1] = 0 WHERE k = 0; APPLY
BATCH",
+ "Setting of list items by index requiring read before
write is not allowed",
+ row(0, list(1, 0, 3)));
+
+ testGuardrail("BEGIN BATCH DELETE l[1] FROM %s WHERE k = 0; APPLY
BATCH",
+ "Removal of list items by index requiring read before
write is not allowed",
+ row(0, list(1, 3)));
+
+ testGuardrail("BEGIN BATCH UPDATE %s SET l = l - [3] WHERE k = 0;
APPLY BATCH",
+ "Removal of list items requiring read before write is
not allowed",
+ row(0, list(1)));
+ }
+
+ @Test
+ public void testExcludedUsers() throws Throwable
+ {
+ testExcludedUsers(() -> "INSERT INTO %s (k, l) VALUES (0, [1, 2, 3, 4,
5])",
+ () -> "UPDATE %s SET l[1] = 4 WHERE k = 0",
+ () -> "DELETE l[1] FROM %s WHERE k = 0",
+ () -> "INSERT INTO %s (k, l) VALUES (0, [1, 2, 3])",
+ () -> "UPDATE %s SET l = l - [2] WHERE k = 0",
+ () -> "BEGIN BATCH UPDATE %s SET l[1] = 0 WHERE k =
0; APPLY BATCH",
+ () -> "BEGIN BATCH DELETE l[1] FROM %s WHERE k = 0;
APPLY BATCH",
+ () -> "BEGIN BATCH UPDATE %s SET l = l - [3] WHERE k
= 0; APPLY BATCH");
+ }
+
+ private void testGuardrail(String query, String expectedMessage,
Object[]... rows) throws Throwable
+ {
+ if (enabled)
+ {
+ assertValid(query);
+ assertRows(rows);
+ }
+ else
+ {
+ assertAborts(expectedMessage, query);
+ }
+ }
+
+ private void assertRows(Object[]... rows) throws Throwable
+ {
+ assertRowsNet(executeNet("SELECT * FROM %s"), rows);
+ }
+}
diff --git
a/tools/stress/src/org/apache/cassandra/io/sstable/StressCQLSSTableWriter.java
b/tools/stress/src/org/apache/cassandra/io/sstable/StressCQLSSTableWriter.java
index 7028b84..84ac5f1 100644
---
a/tools/stress/src/org/apache/cassandra/io/sstable/StressCQLSSTableWriter.java
+++
b/tools/stress/src/org/apache/cassandra/io/sstable/StressCQLSSTableWriter.java
@@ -252,6 +252,7 @@ public class StressCQLSSTableWriter implements Closeable
// and that forces a lot of initialization that we don't want.
UpdateParameters params = new UpdateParameters(insert.metadata(),
insert.updatedColumns(),
+
ClientState.forInternalCalls(),
options,
insert.getTimestamp(TimeUnit.MILLISECONDS.toMicros(now), options),
(int)
TimeUnit.MILLISECONDS.toSeconds(now),
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]