This is an automated email from the ASF dual-hosted git repository.
adelapena pushed a commit to branch trunk
in repository https://gitbox.apache.org/repos/asf/cassandra.git
The following commit(s) were added to refs/heads/trunk by this push:
new 935bcf1 Add guardrails for collection items and size
935bcf1 is described below
commit 935bcf1e8732a4138c15205896945c2f02ddb844
Author: Andrés de la Peña <[email protected]>
AuthorDate: Tue Mar 15 13:31:41 2022 +0000
Add guardrails for collection items and size
patch by Andrés de la Peña; reviewed by Berenguer Blasi for CASSANDRA-17153
---
CHANGES.txt | 1 +
conf/cassandra.yaml | 16 +
src/java/org/apache/cassandra/config/Config.java | 5 +
.../apache/cassandra/config/GuardrailsOptions.java | 84 +++-
src/java/org/apache/cassandra/cql3/Lists.java | 35 +-
src/java/org/apache/cassandra/cql3/Maps.java | 35 +-
src/java/org/apache/cassandra/cql3/Sets.java | 34 +-
.../apache/cassandra/cql3/UpdateParameters.java | 7 +-
src/java/org/apache/cassandra/db/ColumnIndex.java | 7 +-
.../cassandra/db/guardrails/DisableFlag.java | 7 +-
.../apache/cassandra/db/guardrails/Guardrail.java | 5 +-
.../apache/cassandra/db/guardrails/Guardrails.java | 72 +++-
.../cassandra/db/guardrails/GuardrailsConfig.java | 21 +
.../cassandra/db/guardrails/GuardrailsMBean.java | 34 +-
.../apache/cassandra/db/guardrails/Threshold.java | 35 +-
.../org/apache/cassandra/db/guardrails/Values.java | 7 +-
.../cassandra/io/sstable/format/SSTableWriter.java | 48 +++
.../org/apache/cassandra/schema/TableMetadata.java | 61 +++
.../GuardrailCollectionSizeOnSSTableWriteTest.java | 438 +++++++++++++++++++++
...rdrailItemsPerCollectionOnSSTableWriteTest.java | 340 ++++++++++++++++
.../test/guardrails/GuardrailTester.java | 154 ++++++++
.../db/guardrails/GuardrailCollectionSizeTest.java | 248 ++++++++++++
.../GuardrailItemsPerCollectionTest.java | 272 +++++++++++++
.../db/guardrails/GuardrailPageSizeTest.java | 6 -
.../cassandra/db/guardrails/GuardrailsTest.java | 16 +-
.../cassandra/db/guardrails/ThresholdTester.java | 74 ++--
.../apache/cassandra/schema/TableMetadataTest.java | 140 +++++++
27 files changed, 2098 insertions(+), 104 deletions(-)
diff --git a/CHANGES.txt b/CHANGES.txt
index ea1826a..21c163e 100644
--- a/CHANGES.txt
+++ b/CHANGES.txt
@@ -1,4 +1,5 @@
4.1
+ * Add guardrails for collection items and size (CASSANDRA-17153)
* Improve guardrails messages (CASSANDRA-17430)
* Remove all usages of junit.framework and ban them via Checkstyle
(CASSANDRA-17316)
* Add guardrails for read/write consistency levels (CASSANDRA-17188)
diff --git a/conf/cassandra.yaml b/conf/cassandra.yaml
index 8df5771..ce79767 100644
--- a/conf/cassandra.yaml
+++ b/conf/cassandra.yaml
@@ -1632,6 +1632,22 @@ drop_compact_storage_enabled: false
# Guardrail to warn about or reject write consistency levels. By default, all
consistency levels are allowed.
# write_consistency_levels_warned: []
# write_consistency_levels_disallowed: []
+# Guardrail to warn or fail when encountering larger size of collection data
than threshold.
+# At query time this guardrail is applied only to the collection fragment that
is being writen, even though in the case
+# of non-frozen collections there could be unaccounted parts of the collection
on the sstables. This is done this way to
+# prevent read-before-write. The guardrail is also checked at sstable write
time to detect large non-frozen collections,
+# although in that case exceeding the fail threshold will only log an error
message, without interrupting the operation.
+# The two thresholds default to 0KiB to disable.
+# collection_size_warn_threshold: 0KiB
+# collection_size_fail_threshold: 0KiB
+# Guardrail to warn or fail when encountering more elements in collection than
threshold.
+# At query time this guardrail is applied only to the collection fragment that
is being writen, even though in the case
+# of non-frozen collections there could be unaccounted parts of the collection
on the sstables. This is done this way to
+# prevent read-before-write. The guardrail is also checked at sstable write
time to detect large non-frozen collections,
+# although in that case exceeding the fail threshold will only log an error
message, without interrupting the operation.
+# The two thresholds default to -1 to disable.
+# items_per_collection_warn_threshold: -1
+# items_per_collection_fail_threshold: -1
# Startup Checks are executed as part of Cassandra startup process, not all of
them
# are configurable (so you can disable them) but these which are enumerated
bellow.
diff --git a/src/java/org/apache/cassandra/config/Config.java
b/src/java/org/apache/cassandra/config/Config.java
index 671e49d..554b0bb 100644
--- a/src/java/org/apache/cassandra/config/Config.java
+++ b/src/java/org/apache/cassandra/config/Config.java
@@ -771,6 +771,7 @@ public class Config
public volatile SubnetGroups internode_error_reporting_exclusions = new
SubnetGroups();
public static final int DISABLED_GUARDRAIL = -1;
+ public static final DataStorageSpec DISABLED_SIZE_GUARDRAIL =
DataStorageSpec.inBytes(0);
public volatile boolean guardrails_enabled = false;
public volatile int keyspaces_warn_threshold = DISABLED_GUARDRAIL;
public volatile int keyspaces_fail_threshold = DISABLED_GUARDRAIL;
@@ -797,6 +798,10 @@ public class Config
public volatile Set<ConsistencyLevel> write_consistency_levels_disallowed
= Collections.emptySet();
public volatile boolean user_timestamps_enabled = true;
public volatile boolean read_before_write_list_operations_enabled = true;
+ public volatile DataStorageSpec collection_size_warn_threshold =
DISABLED_SIZE_GUARDRAIL;
+ public volatile DataStorageSpec collection_size_fail_threshold =
DISABLED_SIZE_GUARDRAIL;
+ public volatile int items_per_collection_warn_threshold =
DISABLED_GUARDRAIL;
+ public volatile int items_per_collection_fail_threshold =
DISABLED_GUARDRAIL;
public volatile DurationSpec streaming_state_expires =
DurationSpec.inDays(3);
public volatile DataStorageSpec streaming_state_size =
DataStorageSpec.inMebibytes(40);
diff --git a/src/java/org/apache/cassandra/config/GuardrailsOptions.java
b/src/java/org/apache/cassandra/config/GuardrailsOptions.java
index a8b7ada..692c255 100644
--- a/src/java/org/apache/cassandra/config/GuardrailsOptions.java
+++ b/src/java/org/apache/cassandra/config/GuardrailsOptions.java
@@ -55,7 +55,7 @@ import static java.util.stream.Collectors.toSet;
public class GuardrailsOptions implements GuardrailsConfig
{
private static final Logger logger =
LoggerFactory.getLogger(GuardrailsOptions.class);
-
+
private final Config config;
public GuardrailsOptions(Config config)
@@ -77,6 +77,8 @@ public class GuardrailsOptions implements GuardrailsConfig
config.read_consistency_levels_disallowed =
validateConsistencyLevels(config.read_consistency_levels_disallowed,
"read_consistency_levels_disallowed");
config.write_consistency_levels_warned =
validateConsistencyLevels(config.write_consistency_levels_warned,
"write_consistency_levels_warned");
config.write_consistency_levels_disallowed =
validateConsistencyLevels(config.write_consistency_levels_disallowed,
"write_consistency_levels_disallowed");
+ validateSizeThreshold(config.collection_size_warn_threshold,
config.collection_size_fail_threshold, "collection_size");
+ validateIntThreshold(config.items_per_collection_warn_threshold,
config.items_per_collection_fail_threshold, "items_per_collection");
}
@Override
@@ -423,6 +425,55 @@ public class GuardrailsOptions implements GuardrailsConfig
x ->
config.write_consistency_levels_disallowed = x);
}
+ public DataStorageSpec getCollectionSizeWarnThreshold()
+ {
+ return config.collection_size_warn_threshold;
+ }
+
+ @Override
+ public DataStorageSpec getCollectionSizeFailThreshold()
+ {
+ return config.collection_size_fail_threshold;
+ }
+
+ public void setCollectionSizeThreshold(DataStorageSpec warn,
DataStorageSpec fail)
+ {
+ validateSizeThreshold(warn, fail, "collection_size");
+ updatePropertyWithLogging("collection_size_warn_threshold",
+ warn,
+ () -> config.collection_size_warn_threshold,
+ x -> config.collection_size_warn_threshold =
x);
+ updatePropertyWithLogging("collection_size_fail_threshold",
+ fail,
+ () -> config.collection_size_fail_threshold,
+ x -> config.collection_size_fail_threshold =
x);
+ }
+
+ @Override
+ public int getItemsPerCollectionWarnThreshold()
+ {
+ return config.items_per_collection_warn_threshold;
+ }
+
+ @Override
+ public int getItemsPerCollectionFailThreshold()
+ {
+ return config.items_per_collection_fail_threshold;
+ }
+
+ public void setItemsPerCollectionThreshold(int warn, int fail)
+ {
+ validateIntThreshold(warn, fail, "items_per_collection");
+ updatePropertyWithLogging("items_per_collection_warn_threshold",
+ warn,
+ () ->
config.items_per_collection_warn_threshold,
+ x ->
config.items_per_collection_warn_threshold = x);
+ updatePropertyWithLogging("items_per_collection_fail_threshold",
+ fail,
+ () ->
config.items_per_collection_fail_threshold,
+ x ->
config.items_per_collection_fail_threshold = x);
+ }
+
private static <T> void updatePropertyWithLogging(String propertyName, T
newValue, Supplier<T> getter, Consumer<T> setter)
{
T oldValue = getter.get();
@@ -433,36 +484,31 @@ public class GuardrailsOptions implements GuardrailsConfig
}
}
- private static void validatePositiveNumeric(long value, long maxValue,
boolean allowZero, String name)
+ private static void validatePositiveNumeric(long value, long maxValue,
String name)
{
+ if (value == Config.DISABLED_GUARDRAIL)
+ return;
+
if (value > maxValue)
throw new IllegalArgumentException(format("Invalid value %d for
%s: maximum allowed value is %d",
value, name, maxValue));
- if (value == 0 && !allowZero)
+ if (value == 0)
throw new IllegalArgumentException(format("Invalid value for %s: 0
is not allowed; " +
"if attempting to
disable use %d",
name,
Config.DISABLED_GUARDRAIL));
// We allow -1 as a general "disabling" flag. But reject anything
lower to avoid mistakes.
- if (value < Config.DISABLED_GUARDRAIL)
+ if (value <= 0)
throw new IllegalArgumentException(format("Invalid value %d for
%s: negative values are not allowed, " +
"outside of %d which
disables the guardrail",
value, name,
Config.DISABLED_GUARDRAIL));
}
- private static void validateStrictlyPositiveInteger(long value, String
name)
- {
- // We use 'long' for generality, but most numeric guardrail cannot
effectively be more than a 'int' for various
- // internal reasons. Not that any should ever come close in practice
...
- // Also, in most cases, zero does not make sense (allowing 0 tables or
columns is not exactly useful).
- validatePositiveNumeric(value, Integer.MAX_VALUE, false, name);
- }
-
private static void validateIntThreshold(int warn, int fail, String name)
{
- validateStrictlyPositiveInteger(warn, name + "_warn_threshold");
- validateStrictlyPositiveInteger(fail, name + "_fail_threshold");
+ validatePositiveNumeric(warn, Integer.MAX_VALUE, name +
"_warn_threshold");
+ validatePositiveNumeric(fail, Integer.MAX_VALUE, name +
"_fail_threshold");
validateWarnLowerThanFail(warn, fail, name);
}
@@ -476,6 +522,16 @@ public class GuardrailsOptions implements GuardrailsConfig
"than the fail threshold
%d", warn, name, fail));
}
+ private static void validateSizeThreshold(DataStorageSpec warn,
DataStorageSpec fail, String name)
+ {
+ if (warn.equals(Config.DISABLED_SIZE_GUARDRAIL) ||
fail.equals(Config.DISABLED_SIZE_GUARDRAIL))
+ return;
+
+ if (fail.toBytes() < warn.toBytes())
+ throw new IllegalArgumentException(format("The warn threshold %s
for %s_warn_threshold should be lower " +
+ "than the fail threshold
%s", warn, name, fail));
+ }
+
private static Set<String> validateTableProperties(Set<String> properties,
String name)
{
if (properties == null)
diff --git a/src/java/org/apache/cassandra/cql3/Lists.java
b/src/java/org/apache/cassandra/cql3/Lists.java
index 08de949..dba9ba6 100644
--- a/src/java/org/apache/cassandra/cql3/Lists.java
+++ b/src/java/org/apache/cassandra/cql3/Lists.java
@@ -492,26 +492,43 @@ public abstract class Lists
static void doAppend(Term.Terminal value, ColumnMetadata column,
UpdateParameters params) throws InvalidRequestException
{
- if (column.type.isMultiCell())
+ if (value == null)
{
+ // for frozen lists, we're overwriting the whole cell value
+ if (!column.type.isMultiCell())
+ params.addTombstone(column);
+
// If we append null, do nothing. Note that for Setter, we've
// already removed the previous value so we're good here too
- if (value == null)
+ return;
+ }
+
+ List<ByteBuffer> elements = ((Value) value).elements;
+
+ if (column.type.isMultiCell())
+ {
+ if (elements.size() == 0)
return;
- for (ByteBuffer buffer : ((Value) value).elements)
+ // Guardrails about collection size are only checked for the
added elements without considering
+ // already existent elements. This is done so to avoid
read-before-write, having additional checks
+ // during SSTable write.
+ Guardrails.itemsPerCollection.guard(elements.size(),
column.name.toString(), params.clientState);
+
+ int dataSize = 0;
+ for (ByteBuffer buffer : elements)
{
ByteBuffer uuid =
ByteBuffer.wrap(params.nextTimeUUIDAsBytes());
- params.addCell(column, CellPath.create(uuid), buffer);
+ Cell<?> cell = params.addCell(column,
CellPath.create(uuid), buffer);
+ dataSize += cell.dataSize();
}
+ Guardrails.collectionSize.guard(dataSize,
column.name.toString(), params.clientState);
}
else
{
- // for frozen lists, we're overwriting the whole cell value
- if (value == null)
- params.addTombstone(column);
- else
- params.addCell(column, value.get(ProtocolVersion.CURRENT));
+ Guardrails.itemsPerCollection.guard(elements.size(),
column.name.toString(), params.clientState);
+ Cell<?> cell = params.addCell(column,
value.get(ProtocolVersion.CURRENT));
+ Guardrails.collectionSize.guard(cell.dataSize(),
column.name.toString(), params.clientState);
}
}
}
diff --git a/src/java/org/apache/cassandra/cql3/Maps.java
b/src/java/org/apache/cassandra/cql3/Maps.java
index 6e7e07b..b5ee2ec 100644
--- a/src/java/org/apache/cassandra/cql3/Maps.java
+++ b/src/java/org/apache/cassandra/cql3/Maps.java
@@ -23,6 +23,7 @@ import java.nio.ByteBuffer;
import java.util.*;
import java.util.stream.Collectors;
+import org.apache.cassandra.db.guardrails.Guardrails;
import org.apache.cassandra.schema.ColumnMetadata;
import org.apache.cassandra.cql3.functions.Function;
import org.apache.cassandra.db.DecoratedKey;
@@ -421,22 +422,40 @@ public abstract class Maps
static void doPut(Term.Terminal value, ColumnMetadata column,
UpdateParameters params) throws InvalidRequestException
{
+ if (value == null)
+ {
+ // for frozen maps, we're overwriting the whole cell
+ if (!column.type.isMultiCell())
+ params.addTombstone(column);
+
+ return;
+ }
+
+ Map<ByteBuffer, ByteBuffer> elements = ((Value) value).map;
+
if (column.type.isMultiCell())
{
- if (value == null)
+ if (elements.size() == 0)
return;
- Map<ByteBuffer, ByteBuffer> elements = ((Value) value).map;
+ // Guardrails about collection size are only checked for the
added elements without considering
+ // already existent elements. This is done so to avoid
read-before-write, having additional checks
+ // during SSTable write.
+ Guardrails.itemsPerCollection.guard(elements.size(),
column.name.toString(), params.clientState);
+
+ int dataSize = 0;
for (Map.Entry<ByteBuffer, ByteBuffer> entry :
elements.entrySet())
- params.addCell(column, CellPath.create(entry.getKey()),
entry.getValue());
+ {
+ Cell<?> cell = params.addCell(column,
CellPath.create(entry.getKey()), entry.getValue());
+ dataSize += cell.dataSize();
+ }
+ Guardrails.collectionSize.guard(dataSize,
column.name.toString(), params.clientState);
}
else
{
- // for frozen maps, we're overwriting the whole cell
- if (value == null)
- params.addTombstone(column);
- else
- params.addCell(column, value.get(ProtocolVersion.CURRENT));
+ Guardrails.itemsPerCollection.guard(elements.size(),
column.name.toString(), params.clientState);
+ Cell<?> cell = params.addCell(column,
value.get(ProtocolVersion.CURRENT));
+ Guardrails.collectionSize.guard(cell.dataSize(),
column.name.toString(), params.clientState);
}
}
}
diff --git a/src/java/org/apache/cassandra/cql3/Sets.java
b/src/java/org/apache/cassandra/cql3/Sets.java
index 6b53d55..146f7b7 100644
--- a/src/java/org/apache/cassandra/cql3/Sets.java
+++ b/src/java/org/apache/cassandra/cql3/Sets.java
@@ -24,6 +24,7 @@ import java.util.*;
import java.util.stream.Collectors;
import java.util.stream.StreamSupport;
+import org.apache.cassandra.db.guardrails.Guardrails;
import org.apache.cassandra.schema.ColumnMetadata;
import org.apache.cassandra.cql3.functions.Function;
import org.apache.cassandra.db.DecoratedKey;
@@ -348,26 +349,43 @@ public abstract class Sets
static void doAdd(Term.Terminal value, ColumnMetadata column,
UpdateParameters params) throws InvalidRequestException
{
+ if (value == null)
+ {
+ // for frozen sets, we're overwriting the whole cell
+ if (!column.type.isMultiCell())
+ params.addTombstone(column);
+
+ return;
+ }
+
+ SortedSet<ByteBuffer> elements = ((Value) value).elements;
+
if (column.type.isMultiCell())
{
- if (value == null)
+ if (elements.size() == 0)
return;
- for (ByteBuffer bb : ((Value) value).elements)
+ // Guardrails about collection size are only checked for the
added elements without considering
+ // already existent elements. This is done so to avoid
read-before-write, having additional checks
+ // during SSTable write.
+ Guardrails.itemsPerCollection.guard(elements.size(),
column.name.toString(), params.clientState);
+
+ int dataSize = 0;
+ for (ByteBuffer bb : elements)
{
if (bb == ByteBufferUtil.UNSET_BYTE_BUFFER)
continue;
- params.addCell(column, CellPath.create(bb),
ByteBufferUtil.EMPTY_BYTE_BUFFER);
+ Cell<?> cell = params.addCell(column, CellPath.create(bb),
ByteBufferUtil.EMPTY_BYTE_BUFFER);
+ dataSize += cell.dataSize();
}
+ Guardrails.collectionSize.guard(dataSize,
column.name.toString(), params.clientState);
}
else
{
- // for frozen sets, we're overwriting the whole cell
- if (value == null)
- params.addTombstone(column);
- else
- params.addCell(column, value.get(ProtocolVersion.CURRENT));
+ Guardrails.itemsPerCollection.guard(elements.size(),
column.name.toString(), params.clientState);
+ Cell<?> cell = params.addCell(column,
value.get(ProtocolVersion.CURRENT));
+ Guardrails.collectionSize.guard(cell.dataSize(),
column.name.toString(), params.clientState);
}
}
}
diff --git a/src/java/org/apache/cassandra/cql3/UpdateParameters.java
b/src/java/org/apache/cassandra/cql3/UpdateParameters.java
index 8f3eb37..2d59366 100644
--- a/src/java/org/apache/cassandra/cql3/UpdateParameters.java
+++ b/src/java/org/apache/cassandra/cql3/UpdateParameters.java
@@ -146,17 +146,18 @@ public class UpdateParameters
builder.addCell(BufferCell.tombstone(column, timestamp, nowInSec,
path));
}
- public void addCell(ColumnMetadata column, ByteBuffer value) throws
InvalidRequestException
+ public Cell<?> addCell(ColumnMetadata column, ByteBuffer value) throws
InvalidRequestException
{
- addCell(column, null, value);
+ return addCell(column, null, value);
}
- public void addCell(ColumnMetadata column, CellPath path, ByteBuffer
value) throws InvalidRequestException
+ public Cell<?> addCell(ColumnMetadata column, CellPath path, ByteBuffer
value) throws InvalidRequestException
{
Cell<?> cell = ttl == LivenessInfo.NO_TTL
? BufferCell.live(column, timestamp, value, path)
: BufferCell.expiring(column, timestamp, ttl, nowInSec,
value, path);
builder.addCell(cell);
+ return cell;
}
public void addCounter(ColumnMetadata column, long increment) throws
InvalidRequestException
diff --git a/src/java/org/apache/cassandra/db/ColumnIndex.java
b/src/java/org/apache/cassandra/db/ColumnIndex.java
index 15405bd..f7860df 100644
--- a/src/java/org/apache/cassandra/db/ColumnIndex.java
+++ b/src/java/org/apache/cassandra/db/ColumnIndex.java
@@ -29,6 +29,7 @@ import org.apache.cassandra.db.rows.*;
import org.apache.cassandra.io.ISerializer;
import org.apache.cassandra.io.sstable.IndexInfo;
import org.apache.cassandra.io.sstable.format.SSTableFlushObserver;
+import org.apache.cassandra.io.sstable.format.SSTableWriter;
import org.apache.cassandra.io.sstable.format.Version;
import org.apache.cassandra.io.util.DataOutputBuffer;
import org.apache.cassandra.io.util.SequentialWriter;
@@ -115,7 +116,11 @@ public class ColumnIndex
this.headerLength = writer.position() - initialPosition;
while (iterator.hasNext())
- add(iterator.next());
+ {
+ Unfiltered unfiltered = iterator.next();
+ SSTableWriter.guardCollectionSize(iterator.metadata(),
iterator.partitionKey(), unfiltered);
+ add(unfiltered);
+ }
finish();
}
diff --git a/src/java/org/apache/cassandra/db/guardrails/DisableFlag.java
b/src/java/org/apache/cassandra/db/guardrails/DisableFlag.java
index b6a8ed9..9ec1951 100644
--- a/src/java/org/apache/cassandra/db/guardrails/DisableFlag.java
+++ b/src/java/org/apache/cassandra/db/guardrails/DisableFlag.java
@@ -73,11 +73,14 @@ public class DisableFlag extends Guardrail
*
* @param what The feature that is guarded by this guardrail (for
reporting in error messages).
* @param state The client state, used to skip the check if the query is
internal or is done by a superuser.
- * A {@code null} value means that the check should be done
regardless of the query.
+ * A {@code null} value means that the check should be done
regardless of the query, although it won't
+ * throw any exception if the failure threshold is exceeded.
This is so because checks without an
+ * associated client come from asynchronous processes such as
compaction, and we don't want to
+ * interrupt such processes.
*/
public void ensureEnabled(String what, @Nullable ClientState state)
{
if (enabled(state) && disabled.test(state))
- fail(what + " is not allowed");
+ fail(what + " is not allowed", state);
}
}
diff --git a/src/java/org/apache/cassandra/db/guardrails/Guardrail.java
b/src/java/org/apache/cassandra/db/guardrails/Guardrail.java
index 33dd8af..334d956 100644
--- a/src/java/org/apache/cassandra/db/guardrails/Guardrail.java
+++ b/src/java/org/apache/cassandra/db/guardrails/Guardrail.java
@@ -79,7 +79,7 @@ public abstract class Guardrail
Tracing.trace(message);
}
- protected void fail(String message)
+ protected void fail(String message, @Nullable ClientState state)
{
message = decorateMessage(message);
@@ -90,7 +90,8 @@ public abstract class Guardrail
// Similarly, tracing will also ignore the message if we're not
running tracing on the current thread.
Tracing.trace(message);
- throw new GuardrailViolatedException(message);
+ if (state != null)
+ throw new GuardrailViolatedException(message);
}
@VisibleForTesting
diff --git a/src/java/org/apache/cassandra/db/guardrails/Guardrails.java
b/src/java/org/apache/cassandra/db/guardrails/Guardrails.java
index 5adab5a..99aae55 100644
--- a/src/java/org/apache/cassandra/db/guardrails/Guardrails.java
+++ b/src/java/org/apache/cassandra/db/guardrails/Guardrails.java
@@ -27,6 +27,7 @@ import com.google.common.annotations.VisibleForTesting;
import com.google.common.collect.ImmutableSet;
import org.apache.commons.lang3.StringUtils;
+import org.apache.cassandra.config.DataStorageSpec;
import org.apache.cassandra.config.DatabaseDescriptor;
import org.apache.cassandra.config.GuardrailsOptions;
import org.apache.cassandra.db.ConsistencyLevel;
@@ -178,6 +179,7 @@ public final class Guardrails implements GuardrailsMBean
: format("Aborting query because the cartesian
product of the IN restrictions on %s " +
"produces %d values, this exceeds fail
threshold of %s.",
what, value, threshold));
+
/**
* Guardrail on read consistency levels.
*/
@@ -198,6 +200,32 @@ public final class Guardrails implements GuardrailsMBean
state ->
CONFIG_PROVIDER.getOrCreate(state).getWriteConsistencyLevelsDisallowed(),
"write consistency levels");
+ /**
+ * Guardrail on the size of a collection.
+ */
+ public static final Threshold collectionSize =
+ new Threshold("collection_size",
+ state ->
CONFIG_PROVIDER.getOrCreate(state).getCollectionSizeWarnThreshold().toBytes(),
+ state ->
CONFIG_PROVIDER.getOrCreate(state).getCollectionSizeFailThreshold().toBytes(),
+ (isWarning, what, value, threshold) ->
+ isWarning ? format("Detected collection %s of size %s, this
exceeds the warning threshold of %s.",
+ what, value, threshold)
+ : format("Detected collection %s of size %s, this
exceeds the failure threshold of %s.",
+ what, value, threshold));
+
+ /**
+ * Guardrail on the number of items of a collection.
+ */
+ public static final Threshold itemsPerCollection =
+ new Threshold("items_per_collection",
+ state ->
CONFIG_PROVIDER.getOrCreate(state).getItemsPerCollectionWarnThreshold(),
+ state ->
CONFIG_PROVIDER.getOrCreate(state).getItemsPerCollectionFailThreshold(),
+ (isWarning, what, value, threshold) ->
+ isWarning ? format("Detected collection %s with %s items,
this exceeds the warning threshold of %s.",
+ what, value, threshold)
+ : format("Detected collection %s with %s items,
this exceeds the failure threshold of %s.",
+ what, value, threshold));
+
private Guardrails()
{
MBeanWrapper.instance.registerMBean(this, MBEAN_NAME);
@@ -446,21 +474,57 @@ public final class Guardrails implements GuardrailsMBean
}
@Override
+ public int getPartitionKeysInSelectWarnThreshold()
+ {
+ return DEFAULT_CONFIG.getPartitionKeysInSelectWarnThreshold();
+ }
+
+ @Override
+ public int getPartitionKeysInSelectFailThreshold()
+ {
+ return DEFAULT_CONFIG.getPartitionKeysInSelectFailThreshold();
+ }
+
+ @Override
public void setPartitionKeysInSelectThreshold(int warn, int fail)
{
DEFAULT_CONFIG.setPartitionKeysInSelectThreshold(warn, fail);
}
+ public long getCollectionSizeWarnThresholdInKiB()
+ {
+ return DEFAULT_CONFIG.getCollectionSizeWarnThreshold().toKibibytes();
+ }
+
@Override
- public int getPartitionKeysInSelectWarnThreshold()
+ public long getCollectionSizeFailThresholdInKiB()
{
- return DEFAULT_CONFIG.getPartitionKeysInSelectWarnThreshold();
+ return DEFAULT_CONFIG.getCollectionSizeFailThreshold().toKibibytes();
}
@Override
- public int getPartitionKeysInSelectFailThreshold()
+ public void setCollectionSizeThresholdInKiB(long warnInKiB, long failInKiB)
{
- return DEFAULT_CONFIG.getPartitionKeysInSelectFailThreshold();
+
DEFAULT_CONFIG.setCollectionSizeThreshold(DataStorageSpec.inKibibytes(warnInKiB),
+
DataStorageSpec.inKibibytes(failInKiB));
+ }
+
+ @Override
+ public int getItemsPerCollectionWarnThreshold()
+ {
+ return DEFAULT_CONFIG.getItemsPerCollectionWarnThreshold();
+ }
+
+ @Override
+ public int getItemsPerCollectionFailThreshold()
+ {
+ return DEFAULT_CONFIG.getItemsPerCollectionFailThreshold();
+ }
+
+ @Override
+ public void setItemsPerCollectionThreshold(int warn, int fail)
+ {
+ DEFAULT_CONFIG.setItemsPerCollectionThreshold(warn, fail);
}
@Override
diff --git a/src/java/org/apache/cassandra/db/guardrails/GuardrailsConfig.java
b/src/java/org/apache/cassandra/db/guardrails/GuardrailsConfig.java
index fc671e7..396d6cb 100644
--- a/src/java/org/apache/cassandra/db/guardrails/GuardrailsConfig.java
+++ b/src/java/org/apache/cassandra/db/guardrails/GuardrailsConfig.java
@@ -20,6 +20,7 @@ package org.apache.cassandra.db.guardrails;
import java.util.Set;
+import org.apache.cassandra.config.DataStorageSpec;
import org.apache.cassandra.db.ConsistencyLevel;
/**
@@ -183,4 +184,24 @@ public interface GuardrailsConfig
* @return The consistency levels that are disallowed when writing.
*/
Set<ConsistencyLevel> getWriteConsistencyLevelsDisallowed();
+
+ /*
+ * @return The threshold to warn when encountering a collection with
larger data size than threshold.
+ */
+ DataStorageSpec getCollectionSizeWarnThreshold();
+
+ /**
+ * @return The threshold to prevent collections with larger data size than
threshold.
+ */
+ DataStorageSpec getCollectionSizeFailThreshold();
+
+ /**
+ * @return The threshold to warn when encountering more elements in a
collection than threshold.
+ */
+ int getItemsPerCollectionWarnThreshold();
+
+ /**
+ * @return The threshold to prevent collections with more elements than
threshold.
+ */
+ int getItemsPerCollectionFailThreshold();
}
diff --git a/src/java/org/apache/cassandra/db/guardrails/GuardrailsMBean.java
b/src/java/org/apache/cassandra/db/guardrails/GuardrailsMBean.java
index 6b59b05..7a4eb60 100644
--- a/src/java/org/apache/cassandra/db/guardrails/GuardrailsMBean.java
+++ b/src/java/org/apache/cassandra/db/guardrails/GuardrailsMBean.java
@@ -259,7 +259,7 @@ public interface GuardrailsMBean
* @param warn The threshold to warn when the number of partition keys in
a select statement is greater than
* threshold -1 means disabled.
* @param fail The threshold to prevent when the number of partition keys
in a select statement is more than
- * threshold -1 means disabled.
+ * threshold -1 means disabled.
*/
void setPartitionKeysInSelectThreshold(int warn, int fail);
@@ -362,4 +362,36 @@ public interface GuardrailsMBean
* @param consistencyLevels Comma-separated list of consistency levels
that are not allowed when writing.
*/
void setWriteConsistencyLevelsDisallowedCSV(String consistencyLevels);
+
+ /**
+ * @return The threshold to warn when encountering larger size of
collection data than threshold, in KiB.
+ */
+ long getCollectionSizeWarnThresholdInKiB();
+
+ /**
+ * @return The threshold to prevent collections with larger data size than
threshold, in KiB.
+ */
+ long getCollectionSizeFailThresholdInKiB();
+
+ /**
+ * @param warnInKiB The threshold to warn when encountering larger size of
collection data than threshold, in KiB.
+ * @param failInKiB The threshold to prevent collections with larger data
size than threshold, in KiB.
+ */
+ void setCollectionSizeThresholdInKiB(long warnInKiB, long failInKiB);
+
+ /**
+ * @return The threshold to warn when encountering more elements in a
collection than threshold.
+ */
+ int getItemsPerCollectionWarnThreshold();
+
+ /**
+ * @return The threshold to prevent collections with more elements than
threshold.
+ */
+ int getItemsPerCollectionFailThreshold();
+
+ /**
+ * @param warn The threshold to warn when encountering more elements in a
collection than threshold.
+ * @param fail The threshold to prevent collectiosn with more elements
than threshold.
+ */
+ void setItemsPerCollectionThreshold(int warn, int fail);
}
diff --git a/src/java/org/apache/cassandra/db/guardrails/Threshold.java
b/src/java/org/apache/cassandra/db/guardrails/Threshold.java
index 85fddce..3e63cb3 100644
--- a/src/java/org/apache/cassandra/db/guardrails/Threshold.java
+++ b/src/java/org/apache/cassandra/db/guardrails/Threshold.java
@@ -67,13 +67,13 @@ public class Threshold extends Guardrail
private long failValue(ClientState state)
{
long failValue = failThreshold.applyAsLong(state);
- return failValue < 0 ? Long.MAX_VALUE : failValue;
+ return failValue <= 0 ? Long.MAX_VALUE : failValue;
}
private long warnValue(ClientState state)
{
long warnValue = warnThreshold.applyAsLong(state);
- return warnValue < 0 ? Long.MAX_VALUE : warnValue;
+ return warnValue <= 0 ? Long.MAX_VALUE : warnValue;
}
@Override
@@ -82,7 +82,25 @@ public class Threshold extends Guardrail
if (!super.enabled(state))
return false;
- return failThreshold.applyAsLong(state) >= 0 ||
warnThreshold.applyAsLong(state) >= 0;
+ return failThreshold.applyAsLong(state) > 0 ||
warnThreshold.applyAsLong(state) > 0;
+ }
+
+ /**
+ * Checks whether the provided value would trigger a warning or failure if
passed to {@link #guard}.
+ *
+ * <p>This method is optional (does not have to be called) but can be used
in the case where the "what"
+ * argument to {@link #guard} is expensive to build to save doing so in
the common case (of the guardrail
+ * not being triggered).
+ *
+ * @param value the value to test.
+ * @param state The client state, used to skip the check if the query is
internal or is done by a superuser.
+ * A {@code null} value means that the check should be done
regardless of the query.
+ * @return {@code true} if {@code value} is above the warning or failure
thresholds of this guardrail,
+ * {@code false otherwise}.
+ */
+ public boolean triggersOn(long value, @Nullable ClientState state)
+ {
+ return enabled(state) && (value > Math.min(failValue(state),
warnValue(state)));
}
/**
@@ -93,7 +111,10 @@ public class Threshold extends Guardrail
* guardrail is triggered. For instance, say the guardrail
guards the size of column values, then this
* argument must describe which column of which row is
triggering the guardrail for convenience.
* @param state The client state, used to skip the check if the query is
internal or is done by a superuser.
- * A {@code null} value means that the check should be done
regardless of the query.
+ * A {@code null} value means that the check should be done
regardless of the query, although it won't
+ * throw any exception if the failure threshold is exceeded.
This is so because checks without an
+ * associated client come from asynchronous processes such as
compaction, and we don't want to
+ * interrupt such processes.
*/
public void guard(long value, String what, @Nullable ClientState state)
{
@@ -103,7 +124,7 @@ public class Threshold extends Guardrail
long failValue = failValue(state);
if (value > failValue)
{
- triggerFail(value, failValue, what);
+ triggerFail(value, failValue, what, state);
return;
}
@@ -112,9 +133,9 @@ public class Threshold extends Guardrail
triggerWarn(value, warnValue, what);
}
- private void triggerFail(long value, long failValue, String what)
+ private void triggerFail(long value, long failValue, String what,
ClientState state)
{
- fail(errMsg(false, what, value, failValue));
+ fail(errMsg(false, what, value, failValue), state);
}
private void triggerWarn(long value, long warnValue, String what)
diff --git a/src/java/org/apache/cassandra/db/guardrails/Values.java
b/src/java/org/apache/cassandra/db/guardrails/Values.java
index edb5d2a..f46e3af 100644
--- a/src/java/org/apache/cassandra/db/guardrails/Values.java
+++ b/src/java/org/apache/cassandra/db/guardrails/Values.java
@@ -93,7 +93,10 @@ public class Values<T> extends Guardrail
* @param ignoreAction An action called on the subset of {@code values}
that should be ignored. This action
* should do whatever is necessary to make sure the
value is ignored.
* @param state The client state, used to skip the check if the
query is internal or is done by a superuser.
- * A {@code null} value means that the check should be
done regardless of the query.
+ * A {@code null} value means that the check should be
done regardless of the query, although it
+ * won't throw any exception if the failure threshold
is exceeded. This is so because checks
+ * without an associated client come from asynchronous
processes such as compaction, and we
+ * don't want to interrupt such processes.
*/
public void guard(Set<T> values, Consumer<T> ignoreAction, @Nullable
ClientState state)
{
@@ -104,7 +107,7 @@ public class Values<T> extends Guardrail
Set<T> toDisallow = Sets.intersection(values, disallowed);
if (!toDisallow.isEmpty())
fail(format("Provided values %s are not allowed for %s (disallowed
values are: %s)",
-
toDisallow.stream().sorted().collect(Collectors.toList()), what, disallowed));
+
toDisallow.stream().sorted().collect(Collectors.toList()), what, disallowed),
state);
Set<T> ignored = ignoredValues.apply(state);
Set<T> toIgnore = Sets.intersection(values, ignored);
diff --git a/src/java/org/apache/cassandra/io/sstable/format/SSTableWriter.java
b/src/java/org/apache/cassandra/io/sstable/format/SSTableWriter.java
index 1d9603e..0ff3e46 100644
--- a/src/java/org/apache/cassandra/io/sstable/format/SSTableWriter.java
+++ b/src/java/org/apache/cassandra/io/sstable/format/SSTableWriter.java
@@ -25,10 +25,16 @@ import com.google.common.collect.ImmutableList;
import com.google.common.collect.Sets;
import org.apache.cassandra.config.DatabaseDescriptor;
+import org.apache.cassandra.db.DecoratedKey;
+import org.apache.cassandra.db.DeletionPurger;
import org.apache.cassandra.db.RowIndexEntry;
import org.apache.cassandra.db.SerializationHeader;
import org.apache.cassandra.db.compaction.OperationType;
+import org.apache.cassandra.db.guardrails.Guardrails;
import org.apache.cassandra.db.lifecycle.LifecycleNewTracker;
+import org.apache.cassandra.db.rows.ComplexColumnData;
+import org.apache.cassandra.db.rows.Row;
+import org.apache.cassandra.db.rows.Unfiltered;
import org.apache.cassandra.db.rows.UnfilteredRowIterator;
import org.apache.cassandra.index.Index;
import org.apache.cassandra.io.FSWriteError;
@@ -40,10 +46,13 @@ import
org.apache.cassandra.io.sstable.metadata.MetadataComponent;
import org.apache.cassandra.io.sstable.metadata.MetadataType;
import org.apache.cassandra.io.sstable.metadata.StatsMetadata;
import org.apache.cassandra.io.util.FileUtils;
+import org.apache.cassandra.schema.ColumnMetadata;
import org.apache.cassandra.schema.Schema;
+import org.apache.cassandra.schema.SchemaConstants;
import org.apache.cassandra.schema.TableMetadata;
import org.apache.cassandra.schema.TableMetadataRef;
import org.apache.cassandra.utils.TimeUUID;
+import org.apache.cassandra.utils.FBUtilities;
import org.apache.cassandra.utils.concurrent.Transactional;
/**
@@ -386,4 +395,43 @@ public abstract class SSTableWriter extends SSTable
implements Transactional
Collection<SSTableFlushObserver>
observers,
LifecycleNewTracker
lifecycleNewTracker);
}
+
+ public static void guardCollectionSize(TableMetadata metadata,
DecoratedKey partitionKey, Unfiltered unfiltered)
+ {
+ if (!Guardrails.collectionSize.enabled(null) &&
!Guardrails.itemsPerCollection.enabled(null))
+ return;
+
+ if (!unfiltered.isRow() ||
SchemaConstants.isSystemKeyspace(metadata.keyspace))
+ return;
+
+ Row row = (Row) unfiltered;
+ for (ColumnMetadata column : row.columns())
+ {
+ if (!column.type.isCollection() || !column.type.isMultiCell())
+ continue;
+
+ ComplexColumnData cells = row.getComplexColumnData(column);
+ if (cells == null)
+ continue;
+
+ ComplexColumnData liveCells =
cells.purge(DeletionPurger.PURGE_ALL, FBUtilities.nowInSeconds());
+ if (liveCells == null)
+ continue;
+
+ int cellsSize = liveCells.dataSize();
+ int cellsCount = liveCells.cellsCount();
+
+ if (!Guardrails.collectionSize.triggersOn(cellsSize, null) &&
+ !Guardrails.itemsPerCollection.triggersOn(cellsCount, null))
+ continue;
+
+ String keyString =
metadata.primaryKeyAsCQLLiteral(partitionKey.getKey(), row.clustering());
+ String msg = String.format("%s in row %s in table %s",
+ column.name.toString(),
+ keyString,
+ metadata);
+ Guardrails.collectionSize.guard(cellsSize, msg, null);
+ Guardrails.itemsPerCollection.guard(cellsCount, msg, null);
+ }
+ }
}
diff --git a/src/java/org/apache/cassandra/schema/TableMetadata.java
b/src/java/org/apache/cassandra/schema/TableMetadata.java
index ef43c5d..5a226ec 100644
--- a/src/java/org/apache/cassandra/schema/TableMetadata.java
+++ b/src/java/org/apache/cassandra/schema/TableMetadata.java
@@ -40,6 +40,7 @@ import org.apache.cassandra.dht.IPartitioner;
import org.apache.cassandra.exceptions.ConfigurationException;
import org.apache.cassandra.exceptions.InvalidRequestException;
import org.apache.cassandra.service.reads.SpeculativeRetryPolicy;
+import org.apache.cassandra.transport.ProtocolVersion;
import org.apache.cassandra.utils.AbstractIterator;
import org.github.jamm.Unmetered;
@@ -1342,6 +1343,66 @@ public class TableMetadata implements SchemaElement
}
}
+ /**
+ * Returns a string representation of a partition in a CQL-friendly format.
+ *
+ * For non-composite types it returns the result of {@link
org.apache.cassandra.cql3.CQL3Type#toCQLLiteral}
+ * applied to the partition key.
+ * For composite types it applies {@link
org.apache.cassandra.cql3.CQL3Type#toCQLLiteral} to each subkey and
+ * combines the results into a tuple.
+ *
+ * @param partitionKey a partition key
+ * @return CQL-like string representation of a partition key
+ */
+ public String partitionKeyAsCQLLiteral(ByteBuffer partitionKey)
+ {
+ return primaryKeyAsCQLLiteral(partitionKey, Clustering.EMPTY);
+ }
+
+ /**
+ * Returns a string representation of a primary key in a CQL-friendly
format.
+ *
+ * @param partitionKey the partition key part of the primary key
+ * @param clustering the clustering key part of the primary key
+ * @return a CQL-like string representation of the specified primary key
+ */
+ public String primaryKeyAsCQLLiteral(ByteBuffer partitionKey,
Clustering<?> clustering)
+ {
+ int clusteringSize = clustering.size();
+
+ String[] literals;
+ int i = 0;
+
+ if (partitionKeyType instanceof CompositeType)
+ {
+ List<AbstractType<?>> components =
partitionKeyType.getComponents();
+ int size = components.size();
+ literals = new String[size + clusteringSize];
+ ByteBuffer[] values = ((CompositeType)
partitionKeyType).split(partitionKey);
+ for (i = 0; i < size; i++)
+ {
+ literals[i] = asCQLLiteral(components.get(i), values[i]);
+ }
+ }
+ else
+ {
+ literals = new String[1 + clusteringSize];
+ literals[i++] = asCQLLiteral(partitionKeyType, partitionKey);
+ }
+
+ for (int j = 0; j < clusteringSize; j++)
+ {
+ literals[i++] = asCQLLiteral(clusteringColumns().get(j).type,
clustering.bufferAt(j));
+ }
+
+ return i == 1 ? literals[0] : "(" + String.join(", ", literals) + ")";
+ }
+
+ private static String asCQLLiteral(AbstractType<?> type, ByteBuffer value)
+ {
+ return type.asCQL3Type().toCQLLiteral(value, ProtocolVersion.CURRENT);
+ }
+
public static class CompactTableMetadata extends TableMetadata
{
diff --git
a/test/distributed/org/apache/cassandra/distributed/test/guardrails/GuardrailCollectionSizeOnSSTableWriteTest.java
b/test/distributed/org/apache/cassandra/distributed/test/guardrails/GuardrailCollectionSizeOnSSTableWriteTest.java
new file mode 100644
index 0000000..e469bc3
--- /dev/null
+++
b/test/distributed/org/apache/cassandra/distributed/test/guardrails/GuardrailCollectionSizeOnSSTableWriteTest.java
@@ -0,0 +1,438 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.cassandra.distributed.test.guardrails;
+
+import java.io.IOException;
+
+import org.junit.AfterClass;
+import org.junit.BeforeClass;
+import org.junit.Test;
+
+import com.datastax.driver.core.Session;
+import com.datastax.driver.core.SimpleStatement;
+import org.apache.cassandra.db.guardrails.Guardrails;
+import org.apache.cassandra.distributed.Cluster;
+import org.apache.cassandra.distributed.api.Feature;
+
+import static java.nio.ByteBuffer.allocate;
+
+/**
+ * Tests the guardrail for the size of collections, {@link
Guardrails#collectionSize}.
+ * <p>
+ * This test only includes the activation of the guardrail during sstable
writes, all other cases are covered by
+ * {@link org.apache.cassandra.db.guardrails.GuardrailCollectionSizeTest}.
+ */
+public class GuardrailCollectionSizeOnSSTableWriteTest extends GuardrailTester
+{
+ private static final int NUM_NODES = 2;
+
+ private static final int WARN_THRESHOLD = 1024;
+ private static final int FAIL_THRESHOLD = WARN_THRESHOLD * 4;
+
+ private static Cluster cluster;
+ private static com.datastax.driver.core.Cluster driverCluster;
+ private static Session driverSession;
+
+ @BeforeClass
+ public static void setupCluster() throws IOException
+ {
+ cluster = init(Cluster.build(NUM_NODES)
+ .withConfig(c -> c.with(Feature.GOSSIP,
Feature.NATIVE_PROTOCOL)
+ .set("guardrails_enabled",
true)
+
.set("collection_size_warn_threshold", WARN_THRESHOLD + "B")
+
.set("collection_size_fail_threshold", FAIL_THRESHOLD + "B"))
+ .start());
+ cluster.disableAutoCompaction(KEYSPACE);
+ driverCluster =
com.datastax.driver.core.Cluster.builder().addContactPoint("127.0.0.1").build();
+ driverSession = driverCluster.connect();
+ }
+
+ @AfterClass
+ public static void teardownCluster()
+ {
+ if (driverSession != null)
+ driverSession.close();
+
+ if (driverCluster != null)
+ driverCluster.close();
+
+ if (cluster != null)
+ cluster.close();
+ }
+
+ @Override
+ protected Cluster getCluster()
+ {
+ return cluster;
+ }
+
+ @Test
+ public void testSetSize() throws Throwable
+ {
+ schemaChange("CREATE TABLE %s (k int PRIMARY KEY, v set<blob>)");
+
+ execute("INSERT INTO %s (k, v) VALUES (0, null)");
+ execute("INSERT INTO %s (k, v) VALUES (1, ?)", set());
+ execute("INSERT INTO %s (k, v) VALUES (2, ?)", set(allocate(1)));
+ execute("INSERT INTO %s (k, v) VALUES (3, ?)",
set(allocate(WARN_THRESHOLD / 2)));
+ assertNotWarnedOnFlush();
+
+ execute("INSERT INTO %s (k, v) VALUES (4, ?)",
set(allocate(WARN_THRESHOLD)));
+ assertWarnedOnFlush(warnMessage("4"));
+
+ execute("INSERT INTO %s (k, v) VALUES (5, ?)",
set(allocate(WARN_THRESHOLD / 4), allocate(WARN_THRESHOLD * 3 / 4)));
+ assertWarnedOnFlush(warnMessage("5"));
+
+ execute("INSERT INTO %s (k, v) VALUES (6, ?)",
set(allocate(FAIL_THRESHOLD)));
+ assertFailedOnFlush(failMessage("6"));
+
+ execute("INSERT INTO %s (k, v) VALUES (7, ?)",
set(allocate(FAIL_THRESHOLD / 4), allocate(FAIL_THRESHOLD * 3 / 4)));
+ assertFailedOnFlush(failMessage("7"));
+ }
+
+ @Test
+ public void testSetSizeFrozen()
+ {
+ schemaChange("CREATE TABLE %s (k int PRIMARY KEY, v
frozen<set<blob>>)");
+
+ execute("INSERT INTO %s (k, v) VALUES (0, null)");
+ execute("INSERT INTO %s (k, v) VALUES (1, ?)", set());
+ execute("INSERT INTO %s (k, v) VALUES (2, ?)", set(allocate(1)));
+ execute("INSERT INTO %s (k, v) VALUES (4, ?)",
set(allocate(WARN_THRESHOLD)));
+ execute("INSERT INTO %s (k, v) VALUES (5, ?)",
set(allocate(FAIL_THRESHOLD)));
+
+ // frozen collections size is not checked during sstable write
+ assertNotWarnedOnFlush();
+ }
+
+ @Test
+ public void testSetSizeWithUpdates()
+ {
+ schemaChange("CREATE TABLE %s (k int PRIMARY KEY, v set<blob>)");
+
+ execute("INSERT INTO %s (k, v) VALUES (0, ?)", set(allocate(1)));
+ execute("UPDATE %s SET v = v + ? WHERE k = 0", set(allocate(1)));
+ assertNotWarnedOnFlush();
+
+ execute("INSERT INTO %s (k, v) VALUES (1, ?)",
set(allocate(WARN_THRESHOLD / 4)));
+ execute("UPDATE %s SET v = v + ? WHERE k = 1",
set(allocate(WARN_THRESHOLD * 3 / 4)));
+ assertWarnedOnFlush(warnMessage("1"));
+
+ execute("INSERT INTO %s (k, v) VALUES (2, ?)",
set(allocate(FAIL_THRESHOLD / 4)));
+ execute("UPDATE %s SET v = v + ? WHERE k = 2",
set(allocate(FAIL_THRESHOLD * 3 / 4)));
+ assertFailedOnFlush(failMessage("2"));
+
+ execute("INSERT INTO %s (k, v) VALUES (4, ?)",
set(allocate(FAIL_THRESHOLD)));
+ execute("UPDATE %s SET v = v - ? WHERE k = 4",
set(allocate(FAIL_THRESHOLD)));
+ assertNotWarnedOnFlush();
+ }
+
+ @Test
+ public void testSetSizeAfterCompaction() throws Throwable
+ {
+ schemaChange("CREATE TABLE %s (k int PRIMARY KEY, v set<blob>)");
+
+ execute("INSERT INTO %s (k, v) VALUES (0, ?)", set(allocate(1)));
+ assertNotWarnedOnFlush();
+ execute("UPDATE %s SET v = v + ? WHERE k = 0", set(allocate(1)));
+ assertNotWarnedOnFlush();
+ assertNotWarnedOnCompact();
+
+ execute("INSERT INTO %s (k, v) VALUES (1, ?)",
set(allocate(WARN_THRESHOLD * 3 / 4)));
+ assertNotWarnedOnFlush();
+ execute("UPDATE %s SET v = v + ? WHERE k = 1",
set(allocate(WARN_THRESHOLD / 4)));
+ assertNotWarnedOnFlush();
+ assertWarnedOnCompact(warnMessage("1"));
+
+ execute("INSERT INTO %s (k, v) VALUES (2, ?)",
set(allocate(FAIL_THRESHOLD * 3 / 4)));
+ assertWarnedOnFlush(warnMessage("2"));
+ execute("UPDATE %s SET v = v + ? WHERE k = 2",
set(allocate(FAIL_THRESHOLD / 4)));
+ assertWarnedOnFlush(warnMessage("2"));
+ assertFailedOnCompact(failMessage("2"));
+
+ execute("INSERT INTO %s (k, v) VALUES (3, ?)",
set(allocate(FAIL_THRESHOLD)));
+ assertFailedOnFlush(failMessage("3"));
+ execute("UPDATE %s SET v = v - ? WHERE k = 3",
set(allocate(FAIL_THRESHOLD)));
+ assertNotWarnedOnFlush();
+ assertNotWarnedOnCompact();
+ }
+
+ @Test
+ public void testListSize() throws Throwable
+ {
+ schemaChange("CREATE TABLE %s (k int PRIMARY KEY, v list<blob>)");
+
+ execute("INSERT INTO %s (k, v) VALUES (0, null)");
+ execute("INSERT INTO %s (k, v) VALUES (1, ?)", list());
+ execute("INSERT INTO %s (k, v) VALUES (2, ?)", list(allocate(1)));
+ execute("INSERT INTO %s (k, v) VALUES (3, ?)",
list(allocate(WARN_THRESHOLD / 2)));
+ assertNotWarnedOnFlush();
+
+ execute("INSERT INTO %s (k, v) VALUES (4, ?)",
list(allocate(WARN_THRESHOLD)));
+ assertWarnedOnFlush(warnMessage("4"));
+
+ execute("INSERT INTO %s (k, v) VALUES (5, ?)",
list(allocate(WARN_THRESHOLD / 2), allocate(WARN_THRESHOLD / 2)));
+ assertWarnedOnFlush(warnMessage("5"));
+
+ execute("INSERT INTO %s (k, v) VALUES (6, ?)",
list(allocate(FAIL_THRESHOLD)));
+ assertFailedOnFlush(failMessage("6"));
+
+ execute("INSERT INTO %s (k, v) VALUES (7, ?)",
list(allocate(FAIL_THRESHOLD / 2), allocate(FAIL_THRESHOLD / 2)));
+ assertFailedOnFlush(failMessage("7"));
+ }
+
+ @Test
+ public void testListSizeFrozen()
+ {
+ schemaChange("CREATE TABLE %s (k int PRIMARY KEY, v
frozen<list<blob>>)");
+
+ execute("INSERT INTO %s (k, v) VALUES (0, null)");
+ execute("INSERT INTO %s (k, v) VALUES (1, ?)", list());
+ execute("INSERT INTO %s (k, v) VALUES (2, ?)", list(allocate(1)));
+ execute("INSERT INTO %s (k, v) VALUES (4, ?)",
list(allocate(WARN_THRESHOLD)));
+ execute("INSERT INTO %s (k, v) VALUES (5, ?)",
list(allocate(FAIL_THRESHOLD)));
+
+ // frozen collections size is not checked during sstable write
+ assertNotWarnedOnFlush();
+ }
+
+ @Test
+ public void testListSizeWithUpdates()
+ {
+ schemaChange("CREATE TABLE %s (k int PRIMARY KEY, v list<blob>)");
+
+ execute("INSERT INTO %s (k, v) VALUES (0, ?)", list(allocate(1)));
+ execute("UPDATE %s SET v = v + ? WHERE k = 0", list(allocate(1)));
+ assertNotWarnedOnFlush();
+
+ execute("INSERT INTO %s (k, v) VALUES (1, ?)",
list(allocate(WARN_THRESHOLD / 2)));
+ execute("UPDATE %s SET v = v + ? WHERE k = 1",
list(allocate(WARN_THRESHOLD / 2)));
+ assertWarnedOnFlush(warnMessage("1"));
+
+ execute("INSERT INTO %s (k, v) VALUES (2, ?)",
list(allocate(FAIL_THRESHOLD / 2)));
+ execute("UPDATE %s SET v = v + ? WHERE k = 2",
list(allocate(FAIL_THRESHOLD / 2)));
+ assertFailedOnFlush(failMessage("2"));
+
+ execute("INSERT INTO %s (k, v) VALUES (4, ?)",
list(allocate(FAIL_THRESHOLD)));
+ execute("UPDATE %s SET v = v - ? WHERE k = 4",
list(allocate(FAIL_THRESHOLD)));
+ assertNotWarnedOnFlush();
+ }
+
+ @Test
+ public void testListSizeAfterCompaction() throws Throwable
+ {
+ schemaChange("CREATE TABLE %s (k int PRIMARY KEY, v list<blob>)");
+
+ execute("INSERT INTO %s (k, v) VALUES (0, ?)", list(allocate(1)));
+ assertNotWarnedOnFlush();
+ execute("UPDATE %s SET v = v + ? WHERE k = 0", list(allocate(1)));
+ assertNotWarnedOnFlush();
+ assertNotWarnedOnCompact();
+
+ execute("INSERT INTO %s (k, v) VALUES (1, ?)",
list(allocate(WARN_THRESHOLD / 2)));
+ assertNotWarnedOnFlush();
+ execute("UPDATE %s SET v = v + ? WHERE k = 1",
list(allocate(WARN_THRESHOLD / 2)));
+ assertNotWarnedOnFlush();
+ assertWarnedOnCompact(warnMessage("1"));
+
+ execute("INSERT INTO %s (k, v) VALUES (2, ?)",
list(allocate(FAIL_THRESHOLD / 2)));
+ assertWarnedOnFlush(warnMessage("2"));
+ execute("UPDATE %s SET v = v + ? WHERE k = 2",
list(allocate(FAIL_THRESHOLD / 2)));
+ assertNotWarnedOnFlush();
+ assertFailedOnCompact(failMessage("2"));
+
+ execute("INSERT INTO %s (k, v) VALUES (3, ?)",
list(allocate(FAIL_THRESHOLD)));
+ assertFailedOnFlush(failMessage("3"));
+ execute("UPDATE %s SET v[0] = ? WHERE k = 3", allocate(1));
+ assertNotWarnedOnFlush();
+ assertNotWarnedOnCompact();
+ }
+
+ @Test
+ public void testMapSize() throws Throwable
+ {
+ schemaChange("CREATE TABLE %s (k int PRIMARY KEY, v map<blob, blob>)");
+
+ execute("INSERT INTO %s (k, v) VALUES (0, null)");
+ execute("INSERT INTO %s (k, v) VALUES (1, ?)", map());
+ execute("INSERT INTO %s (k, v) VALUES (2, ?)", map(allocate(1),
allocate(1)));
+ execute("INSERT INTO %s (k, v) VALUES (3, ?)", map(allocate(1),
allocate(WARN_THRESHOLD / 2)));
+ execute("INSERT INTO %s (k, v) VALUES (4, ?)",
map(allocate(WARN_THRESHOLD / 2), allocate(1)));
+ assertNotWarnedOnFlush();
+
+ execute("INSERT INTO %s (k, v) VALUES (5, ?)",
map(allocate(WARN_THRESHOLD), allocate(1)));
+ assertWarnedOnFlush(warnMessage("5"));
+
+ execute("INSERT INTO %s (k, v) VALUES (6, ?)", map(allocate(1),
allocate(WARN_THRESHOLD)));
+ assertWarnedOnFlush(warnMessage("6"));
+
+ execute("INSERT INTO %s (k, v) VALUES (7, ?)",
map(allocate(WARN_THRESHOLD), allocate(WARN_THRESHOLD)));
+ assertWarnedOnFlush(warnMessage("7"));
+
+ execute("INSERT INTO %s (k, v) VALUES (8, ?)",
map(allocate(FAIL_THRESHOLD), allocate(1)));
+ assertFailedOnFlush(failMessage("8"));
+
+ execute("INSERT INTO %s (k, v) VALUES (9, ?)", map(allocate(1),
allocate(FAIL_THRESHOLD)));
+ assertFailedOnFlush(failMessage("9"));
+
+ execute("INSERT INTO %s (k, v) VALUES (10, ?)",
map(allocate(FAIL_THRESHOLD), allocate(FAIL_THRESHOLD)));
+ assertFailedOnFlush(failMessage("10"));
+ }
+
+ @Test
+ public void testMapSizeFrozen()
+ {
+ schemaChange("CREATE TABLE %s (k int PRIMARY KEY, v frozen<map<blob,
blob>>)");
+
+ execute("INSERT INTO %s (k, v) VALUES (0, null)");
+ execute("INSERT INTO %s (k, v) VALUES (1, ?)", map());
+ execute("INSERT INTO %s (k, v) VALUES (2, ?)", map(allocate(1),
allocate(1)));
+ execute("INSERT INTO %s (k, v) VALUES (4, ?)", map(allocate(1),
allocate(WARN_THRESHOLD)));
+ execute("INSERT INTO %s (k, v) VALUES (5, ?)",
map(allocate(WARN_THRESHOLD), allocate(1)));
+ execute("INSERT INTO %s (k, v) VALUES (6, ?)",
map(allocate(WARN_THRESHOLD), allocate(WARN_THRESHOLD)));
+ execute("INSERT INTO %s (k, v) VALUES (7, ?)", map(allocate(1),
allocate(FAIL_THRESHOLD)));
+ execute("INSERT INTO %s (k, v) VALUES (8, ?)",
map(allocate(FAIL_THRESHOLD), allocate(1)));
+ execute("INSERT INTO %s (k, v) VALUES (9, ?)",
map(allocate(FAIL_THRESHOLD), allocate(FAIL_THRESHOLD)));
+
+ // frozen collections size is not checked during sstable write
+ assertNotWarnedOnFlush();
+ }
+
+ @Test
+ public void testMapSizeWithUpdates()
+ {
+ schemaChange("CREATE TABLE %s (k int PRIMARY KEY, v map<blob, blob>)");
+
+ execute("INSERT INTO %s (k, v) VALUES (0, ?)", map(allocate(1),
allocate(1)));
+ execute("UPDATE %s SET v = v + ? WHERE k = 0", map(allocate(1),
allocate(1)));
+ assertNotWarnedOnFlush();
+
+ execute("INSERT INTO %s (k, v) VALUES (1, ?)", map(allocate(1),
allocate(WARN_THRESHOLD / 2)));
+ execute("UPDATE %s SET v = v + ? WHERE k = 1", map(allocate(2),
allocate(WARN_THRESHOLD / 2)));
+ assertWarnedOnFlush(warnMessage("1"));
+
+ execute("INSERT INTO %s (k, v) VALUES (2, ?)",
map(allocate(WARN_THRESHOLD / 4), allocate(1)));
+ execute("UPDATE %s SET v = v + ? WHERE k = 2",
map(allocate(WARN_THRESHOLD * 3 / 4), allocate(1)));
+ assertWarnedOnFlush(warnMessage("2"));
+
+ execute("INSERT INTO %s (k, v) VALUES (3, ?)",
map(allocate(WARN_THRESHOLD / 4), allocate(WARN_THRESHOLD / 4)));
+ execute("UPDATE %s SET v = v + ? WHERE k = 3",
map(allocate(WARN_THRESHOLD / 4 + 1), allocate(WARN_THRESHOLD / 4)));
+ assertWarnedOnFlush(warnMessage("3"));
+
+ execute("INSERT INTO %s (k, v) VALUES (4, ?)", map(allocate(1),
allocate(FAIL_THRESHOLD / 2)));
+ execute("UPDATE %s SET v = v + ? WHERE k = 4", map(allocate(2),
allocate(FAIL_THRESHOLD / 2)));
+ assertFailedOnFlush(failMessage("4"));
+
+ execute("INSERT INTO %s (k, v) VALUES (5, ?)",
map(allocate(FAIL_THRESHOLD / 4), allocate(1)));
+ execute("UPDATE %s SET v = v + ? WHERE k = 5",
map(allocate(FAIL_THRESHOLD * 3 / 4), allocate(1)));
+ assertFailedOnFlush(failMessage("5"));
+
+ execute("INSERT INTO %s (k, v) VALUES (6, ?)",
map(allocate(FAIL_THRESHOLD / 4), allocate(FAIL_THRESHOLD / 4)));
+ execute("UPDATE %s SET v = v + ? WHERE k = 6",
map(allocate(FAIL_THRESHOLD / 4 + 1), allocate(FAIL_THRESHOLD / 4)));
+ assertFailedOnFlush(failMessage("6"));
+ }
+
+ @Test
+ public void testMapSizeAfterCompaction()
+ {
+ schemaChange("CREATE TABLE %s (k int PRIMARY KEY, v map<blob, blob>)");
+
+ execute("INSERT INTO %s (k, v) VALUES (0, ?)", map(allocate(1),
allocate(1)));
+ execute("UPDATE %s SET v = v + ? WHERE k = 0", map(allocate(1),
allocate(1)));
+ assertNotWarnedOnFlush();
+ assertNotWarnedOnCompact();
+
+ execute("INSERT INTO %s (k, v) VALUES (1, ?)", map(allocate(1),
allocate(WARN_THRESHOLD / 2)));
+ assertNotWarnedOnFlush();
+ execute("UPDATE %s SET v = v + ? WHERE k = 1", map(allocate(2),
allocate(WARN_THRESHOLD / 2)));
+ assertNotWarnedOnFlush();
+ assertWarnedOnCompact(warnMessage("1"));
+
+ execute("INSERT INTO %s (k, v) VALUES (2, ?)",
map(allocate(WARN_THRESHOLD / 4), allocate(1)));
+ assertNotWarnedOnFlush();
+ execute("UPDATE %s SET v = v + ? WHERE k = 2",
map(allocate(WARN_THRESHOLD * 3 / 4), allocate(1)));
+ assertNotWarnedOnFlush();
+ assertWarnedOnCompact(warnMessage("2"));
+
+ execute("INSERT INTO %s (k, v) VALUES (3, ?)",
map(allocate(WARN_THRESHOLD / 4), allocate(WARN_THRESHOLD / 4)));
+ assertNotWarnedOnFlush();
+ execute("UPDATE %s SET v = v + ? WHERE k = 3",
map(allocate(WARN_THRESHOLD / 4 + 1), allocate(WARN_THRESHOLD / 4)));
+ assertNotWarnedOnFlush();
+ assertWarnedOnCompact(warnMessage("3"));
+
+ execute("INSERT INTO %s (k, v) VALUES (4, ?)", map(allocate(1),
allocate(FAIL_THRESHOLD / 2)));
+ assertWarnedOnFlush(failMessage("4"));
+ execute("UPDATE %s SET v = v + ? WHERE k = 4", map(allocate(2),
allocate(FAIL_THRESHOLD / 2)));
+ assertWarnedOnFlush(warnMessage("4"));
+ assertFailedOnCompact(failMessage("4"));
+
+ execute("INSERT INTO %s (k, v) VALUES (5, ?)",
map(allocate(FAIL_THRESHOLD / 4), allocate(1)));
+ assertWarnedOnFlush(failMessage("5"));
+ execute("UPDATE %s SET v = v + ? WHERE k = 5",
map(allocate(FAIL_THRESHOLD * 3 / 4), allocate(1)));
+ assertWarnedOnFlush(warnMessage("5"));
+ assertFailedOnCompact(failMessage("5"));
+
+ execute("INSERT INTO %s (k, v) VALUES (6, ?)",
map(allocate(FAIL_THRESHOLD / 4), allocate(FAIL_THRESHOLD / 4)));
+ assertWarnedOnFlush(failMessage("6"));
+ execute("UPDATE %s SET v = v + ? WHERE k = 6",
map(allocate(FAIL_THRESHOLD / 4 + 1), allocate(FAIL_THRESHOLD / 4)));
+ assertWarnedOnFlush(warnMessage("6"));
+ assertFailedOnCompact(failMessage("6"));
+ }
+
+ @Test
+ public void testCompositePartitionKey()
+ {
+ schemaChange("CREATE TABLE %s (k1 int, k2 text, v set<blob>, PRIMARY
KEY((k1, k2)))");
+
+ execute("INSERT INTO %s (k1, k2, v) VALUES (0, 'a', ?)",
set(allocate(WARN_THRESHOLD)));
+ assertWarnedOnFlush(warnMessage("(0, 'a')"));
+
+ execute("INSERT INTO %s (k1, k2, v) VALUES (1, 'b', ?)",
set(allocate(FAIL_THRESHOLD)));
+ assertFailedOnFlush(failMessage("(1, 'b')"));
+ }
+
+ @Test
+ public void testCompositeClusteringKey()
+ {
+ schemaChange("CREATE TABLE %s (k int, c1 int, c2 text, v set<blob>,
PRIMARY KEY(k, c1, c2))");
+
+ execute("INSERT INTO %s (k, c1, c2, v) VALUES (1, 10, 'a', ?)",
set(allocate(WARN_THRESHOLD)));
+ assertWarnedOnFlush(warnMessage("(1, 10, 'a')"));
+
+ execute("INSERT INTO %s (k, c1, c2, v) VALUES (2, 20, 'b', ?)",
set(allocate(FAIL_THRESHOLD)));
+ assertFailedOnFlush(failMessage("(2, 20, 'b')"));
+ }
+
+ private void execute(String query, Object... args)
+ {
+ SimpleStatement stmt = new SimpleStatement(format(query), args);
+
stmt.setConsistencyLevel(com.datastax.driver.core.ConsistencyLevel.ALL);
+ driverSession.execute(stmt);
+ }
+
+ private String warnMessage(String key)
+ {
+ return String.format("Detected collection v in row %s in table %s of
size", key, qualifiedTableName);
+ }
+
+ private String failMessage(String key)
+ {
+ return String.format("Detected collection v in row %s in table %s of
size", key, qualifiedTableName);
+ }
+}
diff --git
a/test/distributed/org/apache/cassandra/distributed/test/guardrails/GuardrailItemsPerCollectionOnSSTableWriteTest.java
b/test/distributed/org/apache/cassandra/distributed/test/guardrails/GuardrailItemsPerCollectionOnSSTableWriteTest.java
new file mode 100644
index 0000000..742fa62
--- /dev/null
+++
b/test/distributed/org/apache/cassandra/distributed/test/guardrails/GuardrailItemsPerCollectionOnSSTableWriteTest.java
@@ -0,0 +1,340 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.cassandra.distributed.test.guardrails;
+
+import java.io.IOException;
+
+import org.junit.AfterClass;
+import org.junit.BeforeClass;
+import org.junit.Test;
+
+import org.apache.cassandra.db.guardrails.Guardrails;
+import org.apache.cassandra.distributed.Cluster;
+import org.apache.cassandra.distributed.api.ConsistencyLevel;
+import org.apache.cassandra.distributed.api.ICoordinator;
+
+/**
+ * Tests the guardrail for the number of items on a collection, {@link
Guardrails#itemsPerCollection}.
+ * <p>
+ * This test only includes the activation of the guardrail during sstable
writes, all other cases are covered by
+ * {@link org.apache.cassandra.db.guardrails.GuardrailItemsPerCollectionTest}.
+ */
+public class GuardrailItemsPerCollectionOnSSTableWriteTest extends
GuardrailTester
+{
+ private static final int NUM_NODES = 2;
+
+ private static final int WARN_THRESHOLD = 2;
+ private static final int FAIL_THRESHOLD = 4;
+
+ private static Cluster cluster;
+ private static ICoordinator coordinator;
+
+ @BeforeClass
+ public static void setupCluster() throws IOException
+ {
+ cluster = init(Cluster.build(NUM_NODES)
+ .withConfig(c -> c.set("guardrails_enabled",
true)
+
.set("items_per_collection_warn_threshold", WARN_THRESHOLD)
+
.set("items_per_collection_fail_threshold", FAIL_THRESHOLD))
+ .start());
+ cluster.disableAutoCompaction(KEYSPACE);
+ coordinator = cluster.coordinator(1);
+ }
+
+ @AfterClass
+ public static void teardownCluster()
+ {
+ if (cluster != null)
+ cluster.close();
+ }
+
+ @Override
+ protected Cluster getCluster()
+ {
+ return cluster;
+ }
+
+ @Test
+ public void testSetSize() throws Throwable
+ {
+ schemaChange("CREATE TABLE %s (k int PRIMARY KEY, v set<int>)");
+
+ execute("INSERT INTO %s (k, v) VALUES (0, null)");
+ execute("INSERT INTO %s (k, v) VALUES (1, {1})");
+ execute("INSERT INTO %s (k, v) VALUES (2, {1, 2})");
+ assertNotWarnedOnFlush();
+
+ execute("INSERT INTO %s (k, v) VALUES (3, {1, 2, 3})");
+ execute("INSERT INTO %s (k, v) VALUES (4, {1, 2, 3, 4})");
+ assertWarnedOnFlush(warnMessage("3", 3), warnMessage("4", 4));
+
+ execute("INSERT INTO %s (k, v) VALUES (5, {1, 2, 3, 4, 5})");
+ execute("INSERT INTO %s (k, v) VALUES (6, {1, 2, 3, 4, 5, 6})");
+ assertFailedOnFlush(failMessage("5", 5), failMessage("6", 6));
+ }
+
+ @Test
+ public void testSetSizeFrozen()
+ {
+ schemaChange("CREATE TABLE %s (k int PRIMARY KEY, v
frozen<set<int>>)");
+
+ execute("INSERT INTO %s (k, v) VALUES (3, {1, 2, 3})");
+ execute("INSERT INTO %s (k, v) VALUES (5, {1, 2, 3, 4, 5})");
+
+ // the size of frozen collections is not checked during sstable write
+ assertNotWarnedOnFlush();
+ }
+
+ @Test
+ public void testSetSizeWithUpdates()
+ {
+ schemaChange("CREATE TABLE %s (k int PRIMARY KEY, v set<int>)");
+
+ execute("UPDATE %s SET v = v + {1, 2} WHERE k = 1");
+ execute("UPDATE %s SET v = v - {1, 2} WHERE k = 2");
+ assertNotWarnedOnFlush();
+
+ execute("UPDATE %s SET v = v + {1, 2, 3} WHERE k = 3");
+ execute("UPDATE %s SET v = v - {1, 2, 3} WHERE k = 4");
+ assertWarnedOnFlush(warnMessage("3", 3));
+
+ execute("UPDATE %s SET v = v + {1, 2, 3, 4, 5} WHERE k = 5");
+ execute("UPDATE %s SET v = v - {1, 2, 3, 4, 5} WHERE k = 6");
+ assertFailedOnFlush(failMessage("5", 5));
+ }
+
+ @Test
+ public void testSetSizeAfterCompaction() throws Throwable
+ {
+ schemaChange("CREATE TABLE %s (k int PRIMARY KEY, v set<int>)");
+
+ execute("INSERT INTO %s (k, v) VALUES (0, {1})");
+ assertNotWarnedOnFlush();
+ execute("UPDATE %s SET v = v + {2} WHERE k = 0");
+ assertNotWarnedOnFlush();
+ assertNotWarnedOnCompact();
+
+ execute("INSERT INTO %s (k, v) VALUES (1, {1})");
+ assertNotWarnedOnFlush();
+ execute("UPDATE %s SET v = v + {2, 3} WHERE k = 1");
+ assertNotWarnedOnFlush();
+ assertWarnedOnCompact(warnMessage("1", 3));
+
+ execute("INSERT INTO %s (k, v) VALUES (2, {1, 2})");
+ assertNotWarnedOnFlush();
+ execute("UPDATE %s SET v = v + {3, 4, 5} WHERE k = 2");
+ assertWarnedOnFlush(warnMessage("2", 3));
+ assertFailedOnCompact(failMessage("2", 5));
+ }
+
+ @Test
+ public void testListSize() throws Throwable
+ {
+ schemaChange("CREATE TABLE %s (k int PRIMARY KEY, v list<int>)");
+
+ execute("INSERT INTO %s (k, v) VALUES (0, null)");
+ execute("INSERT INTO %s (k, v) VALUES (1, [1])");
+ execute("INSERT INTO %s (k, v) VALUES (2, [1, 2])");
+ assertNotWarnedOnFlush();
+
+ execute("INSERT INTO %s (k, v) VALUES (3, [1, 2, 3])");
+ execute("INSERT INTO %s (k, v) VALUES (4, [1, 2, 3, 4])");
+ assertWarnedOnFlush(warnMessage("3", 3), warnMessage("4", 4));
+
+ execute("INSERT INTO %s (k, v) VALUES (5, [1, 2, 3, 4, 5])");
+ execute("INSERT INTO %s (k, v) VALUES (6, [1, 2, 3, 4, 5, 6])");
+ assertFailedOnFlush(failMessage("5", 5), failMessage("6", 6));
+ }
+
+ @Test
+ public void testListSizeFrozen() throws Throwable
+ {
+ schemaChange("CREATE TABLE %s (k int PRIMARY KEY, v
frozen<list<int>>)");
+
+ execute("INSERT INTO %s (k, v) VALUES (3, [1, 2, 3])");
+ execute("INSERT INTO %s (k, v) VALUES (5, [1, 2, 3, 4, 5])");
+
+ // the size of frozen collections is not checked during sstable write
+ assertNotWarnedOnFlush();
+ }
+
+ @Test
+ public void testListSizeWithUpdates()
+ {
+ schemaChange("CREATE TABLE %s (k int PRIMARY KEY, v list<int>)");
+
+ execute("UPDATE %s SET v = v + [1, 2] WHERE k = 1");
+ execute("UPDATE %s SET v = v - [1, 2] WHERE k = 2");
+ assertNotWarnedOnFlush();
+
+ execute("UPDATE %s SET v = v + [1, 2, 3] WHERE k = 3");
+ execute("UPDATE %s SET v = v - [1, 2, 3] WHERE k = 4");
+ assertWarnedOnFlush(warnMessage("3", 3));
+
+ execute("UPDATE %s SET v = v + [1, 2, 3, 4, 5] WHERE k = 5");
+ execute("UPDATE %s SET v = v - [1, 2, 3, 4, 5] WHERE k = 6");
+ assertFailedOnFlush(failMessage("5", 5));
+ }
+
+ @Test
+ public void testListSizeAfterCompaction() throws Throwable
+ {
+ schemaChange("CREATE TABLE %s (k int PRIMARY KEY, v list<int>)");
+
+ execute("INSERT INTO %s (k, v) VALUES (0, [1])");
+ assertNotWarnedOnFlush();
+ execute("UPDATE %s SET v = v + [2] WHERE k = 0");
+ assertNotWarnedOnFlush();
+ assertNotWarnedOnCompact();
+
+ execute("INSERT INTO %s (k, v) VALUES (1, [1])");
+ assertNotWarnedOnFlush();
+ execute("UPDATE %s SET v = v + [2, 3] WHERE k = 1");
+ assertNotWarnedOnFlush();
+ assertWarnedOnCompact(warnMessage("1", 3));
+
+ execute("INSERT INTO %s (k, v) VALUES (2, [1, 2])");
+ assertNotWarnedOnFlush();
+ execute("UPDATE %s SET v = v + [3, 4, 5] WHERE k = 2");
+ assertWarnedOnFlush(warnMessage("2", 3));
+ assertFailedOnCompact(failMessage("2", 5));
+
+ execute("INSERT INTO %s (k, v) VALUES (3, [1, 2, 3])");
+ assertWarnedOnFlush(warnMessage("3", 3));
+ execute("UPDATE %s SET v[1] = null WHERE k = 3");
+ assertNotWarnedOnFlush();
+ assertNotWarnedOnCompact();
+ }
+
+ @Test
+ public void testMapSize() throws Throwable
+ {
+ schemaChange("CREATE TABLE %s (k int PRIMARY KEY, v map<int, int>)");
+
+ execute("INSERT INTO %s (k, v) VALUES (0, null)");
+ execute("INSERT INTO %s (k, v) VALUES (1, {1:10})");
+ execute("INSERT INTO %s (k, v) VALUES (2, {1:10, 2:20})");
+ assertNotWarnedOnFlush();
+
+ execute("INSERT INTO %s (k, v) VALUES (3, {1:10, 2:20, 3:30})");
+ execute("INSERT INTO %s (k, v) VALUES (4, {1:10, 2:20, 3:30, 4:40})");
+ assertWarnedOnFlush(warnMessage("3", 3), warnMessage("4", 4));
+
+ execute("INSERT INTO %s (k, v) VALUES (5, {1:10, 2:20, 3:30, 4:40,
5:50})");
+ execute("INSERT INTO %s (k, v) VALUES (6, {1:10, 2:20, 3:30, 4:40,
5:50, 6:60})");
+ assertFailedOnFlush(failMessage("5", 5), failMessage("6", 6));
+ }
+
+ @Test
+ public void testMapSizeFrozen()
+ {
+ schemaChange("CREATE TABLE %s (k int PRIMARY KEY, v frozen<map<int,
int>>)");
+
+ execute("INSERT INTO %s (k, v) VALUES (3, {1:10, 2:20, 3:30})");
+ execute("INSERT INTO %s (k, v) VALUES (4, {1:10, 2:20, 3:30, 4:40})");
+
+ // the size of frozen collections is not checked during sstable write
+ assertNotWarnedOnFlush();
+ }
+
+ @Test
+ public void testMapSizeWithUpdates()
+ {
+ schemaChange("CREATE TABLE %s (k int PRIMARY KEY, v map<int, int>)");
+
+ execute("UPDATE %s SET v = v + {1:10, 2:20} WHERE k = 1");
+ execute("UPDATE %s SET v = v - {1, 2} WHERE k = 2");
+ assertNotWarnedOnFlush();
+
+ execute("UPDATE %s SET v = v + {1:10, 2:20, 3:30} WHERE k = 3");
+ execute("UPDATE %s SET v = v - {1, 2, 3} WHERE k = 4");
+ assertWarnedOnFlush(warnMessage("3", 3));
+
+ execute("UPDATE %s SET v = v + {1:10, 2:20, 3:30, 4:40, 5:50} WHERE k
= 5");
+ execute("UPDATE %s SET v = v - {1, 2, 3, 4, 5} WHERE k = 6");
+ assertFailedOnFlush(failMessage("5", 5));
+ }
+
+ @Test
+ public void testMapSizeAfterCompaction()
+ {
+ schemaChange("CREATE TABLE %s (k int PRIMARY KEY, v map<int, int>)");
+
+ execute("INSERT INTO %s (k, v) VALUES (0, {1:10})");
+ assertNotWarnedOnFlush();
+ execute("UPDATE %s SET v = v + {2:20} WHERE k = 0");
+ assertNotWarnedOnFlush();
+ assertNotWarnedOnCompact();
+
+ execute("INSERT INTO %s (k, v) VALUES (1, {1:10})");
+ assertNotWarnedOnFlush();
+ execute("UPDATE %s SET v = v + {2:20, 3:30} WHERE k = 1");
+ assertNotWarnedOnFlush();
+ assertWarnedOnCompact(warnMessage("1", 3));
+
+ execute("INSERT INTO %s (k, v) VALUES (2, {1:10, 2:20})");
+ assertNotWarnedOnFlush();
+ execute("UPDATE %s SET v = v + {3:30, 4:40, 5:50} WHERE k = 2");
+ assertWarnedOnFlush(warnMessage("2", 3));
+ assertFailedOnCompact(failMessage("2", 5));
+ }
+
+ @Test
+ public void testCompositePartitionKey()
+ {
+ schemaChange("CREATE TABLE %s (k1 int, k2 text, v set<int>, PRIMARY
KEY((k1, k2)))");
+
+ execute("INSERT INTO %s (k1, k2, v) VALUES (0, 'a', {1, 2, 3})");
+ assertWarnedOnFlush(warnMessage("(0, 'a')", 3));
+
+ execute("INSERT INTO %s (k1, k2, v) VALUES (1, 'b', {1, 2, 3, 4, 5})");
+ assertFailedOnFlush(failMessage("(1, 'b')", 5));
+ }
+
+ @Test
+ public void testCompositeClusteringKey()
+ {
+ schemaChange("CREATE TABLE %s (k int, c1 int, c2 text, v set<int>,
PRIMARY KEY(k, c1, c2))");
+
+ execute("INSERT INTO %s (k, c1, c2, v) VALUES (1, 10, 'a', {1, 2,
3})");
+ assertWarnedOnFlush(warnMessage("(1, 10, 'a')", 3));
+
+ execute("INSERT INTO %s (k, c1, c2, v) VALUES (2, 20, 'b', {1, 2, 3,
4, 5})");
+ assertFailedOnFlush(failMessage("(2, 20, 'b')", 5));
+ }
+
+ private void execute(String query)
+ {
+ coordinator.execute(format(query), ConsistencyLevel.ALL);
+ }
+
+ private String warnMessage(String key, int numItems)
+ {
+ return String.format("Detected collection v in row %s in table %s with
%d items, " +
+ "this exceeds the warning threshold of %d.",
+ key, qualifiedTableName, numItems,
WARN_THRESHOLD);
+ }
+
+ private String failMessage(String key, int numItems)
+ {
+ return String.format("Detected collection v in row %s in table %s with
%d items, " +
+ "this exceeds the failure threshold of %d.",
+ key, qualifiedTableName, numItems,
FAIL_THRESHOLD);
+ }
+}
diff --git
a/test/distributed/org/apache/cassandra/distributed/test/guardrails/GuardrailTester.java
b/test/distributed/org/apache/cassandra/distributed/test/guardrails/GuardrailTester.java
new file mode 100644
index 0000000..b12d9ab
--- /dev/null
+++
b/test/distributed/org/apache/cassandra/distributed/test/guardrails/GuardrailTester.java
@@ -0,0 +1,154 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.cassandra.distributed.test.guardrails;
+
+import java.util.List;
+import java.util.concurrent.atomic.AtomicInteger;
+
+import org.junit.After;
+import org.junit.Before;
+
+import org.apache.cassandra.distributed.Cluster;
+import org.apache.cassandra.distributed.api.IInstance;
+import org.apache.cassandra.distributed.test.TestBaseImpl;
+import org.apache.cassandra.exceptions.InvalidRequestException;
+import org.assertj.core.api.Assertions;
+import org.assertj.core.api.ListAssert;
+
+import static org.junit.Assert.assertTrue;
+import static org.junit.Assert.fail;
+
+public abstract class GuardrailTester extends TestBaseImpl
+{
+ private static final AtomicInteger seqNumber = new AtomicInteger();
+ protected String tableName, qualifiedTableName;
+
+ protected abstract Cluster getCluster();
+
+ @Before
+ public void beforeTest()
+ {
+ tableName = "t_" + seqNumber.getAndIncrement();
+ qualifiedTableName = KEYSPACE + "." + tableName;
+ }
+
+ @After
+ public void afterTest()
+ {
+ schemaChange("DROP TABLE IF EXISTS %s");
+ }
+
+ protected String format(String query)
+ {
+ return String.format(query, qualifiedTableName);
+ }
+
+ protected void schemaChange(String query)
+ {
+ getCluster().schemaChange(format(query));
+ }
+
+ protected void assertNotWarnedOnFlush()
+ {
+ assertNotWarnsOnSSTableWrite(false);
+ }
+
+ protected void assertNotWarnedOnCompact()
+ {
+ assertNotWarnsOnSSTableWrite(true);
+ }
+
+ protected void assertNotWarnsOnSSTableWrite(boolean compact)
+ {
+ getCluster().stream().forEach(node ->
assertNotWarnsOnSSTableWrite(node, compact));
+ }
+
+ protected void assertNotWarnsOnSSTableWrite(IInstance node, boolean
compact)
+ {
+ long mark = node.logs().mark();
+ try
+ {
+ writeSSTables(node, compact);
+ assertTrue(node.logs().grep(mark, "^ERROR",
"^WARN").getResult().isEmpty());
+ }
+ catch (InvalidRequestException e)
+ {
+ fail("Expected not to fail, but Fails with error message: " +
e.getMessage());
+ }
+ }
+
+ protected void assertWarnedOnFlush(String... msgs)
+ {
+ assertWarnsOnSSTableWrite(false, msgs);
+ }
+
+ protected void assertWarnedOnCompact(String... msgs)
+ {
+ assertWarnsOnSSTableWrite(true, msgs);
+ }
+
+ protected void assertWarnsOnSSTableWrite(boolean compact, String... msgs)
+ {
+ getCluster().stream().forEach(node -> assertWarnsOnSSTableWrite(node,
compact, msgs));
+ }
+
+ protected void assertWarnsOnSSTableWrite(IInstance node, boolean compact,
String... msgs)
+ {
+ long mark = node.logs().mark();
+ writeSSTables(node, compact);
+ assertTrue(node.logs().grep(mark, "^ERROR").getResult().isEmpty());
+ List<String> warnings = node.logs().grep(mark, "^WARN").getResult();
+ ListAssert<String> assertion =
Assertions.assertThat(warnings).isNotEmpty().hasSize(msgs.length);
+ for (String msg : msgs)
+ assertion.anyMatch(m -> m.contains(msg));
+ }
+
+ protected void assertFailedOnFlush(String... msgs)
+ {
+ assertFailsOnSSTableWrite(false, msgs);
+ }
+
+ protected void assertFailedOnCompact(String... msgs)
+ {
+ assertFailsOnSSTableWrite(true, msgs);
+ }
+
+ private void assertFailsOnSSTableWrite(boolean compact, String... msgs)
+ {
+ getCluster().stream().forEach(node -> assertFailsOnSSTableWrite(node,
compact, msgs));
+ }
+
+ private void assertFailsOnSSTableWrite(IInstance node, boolean compact,
String... msgs)
+ {
+ long mark = node.logs().mark();
+ writeSSTables(node, compact);
+ assertTrue(node.logs().grep(mark, "^WARN").getResult().isEmpty());
+ List<String> warnings = node.logs().grep(mark, "^ERROR").getResult();
+ ListAssert<String> assertion =
Assertions.assertThat(warnings).isNotEmpty().hasSize(msgs.length);
+ for (String msg : msgs)
+ assertion.anyMatch(m -> m.contains(msg));
+ }
+
+ private void writeSSTables(IInstance node, boolean compact)
+ {
+ node.flush(KEYSPACE);
+ if (compact)
+ node.forceCompact(KEYSPACE, tableName);
+ }
+}
diff --git
a/test/unit/org/apache/cassandra/db/guardrails/GuardrailCollectionSizeTest.java
b/test/unit/org/apache/cassandra/db/guardrails/GuardrailCollectionSizeTest.java
new file mode 100644
index 0000000..f867783
--- /dev/null
+++
b/test/unit/org/apache/cassandra/db/guardrails/GuardrailCollectionSizeTest.java
@@ -0,0 +1,248 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.cassandra.db.guardrails;
+
+import java.nio.ByteBuffer;
+import java.util.Arrays;
+import java.util.Collections;
+
+import com.google.common.collect.ImmutableList;
+import com.google.common.collect.ImmutableMap;
+import com.google.common.collect.ImmutableSet;
+import org.junit.Test;
+
+import org.apache.cassandra.db.marshal.BytesType;
+import org.apache.cassandra.db.marshal.ListType;
+import org.apache.cassandra.db.marshal.MapType;
+import org.apache.cassandra.db.marshal.SetType;
+
+import static java.nio.ByteBuffer.allocate;
+
+/**
+ * Tests the guardrail for the size of collections, {@link
Guardrails#collectionSize}.
+ * <p>
+ * This test doesn't include the activation of the guardrail during sstable
writes, these cases are covered by the dtest
+ * {@link
org.apache.cassandra.distributed.test.guardrails.GuardrailCollectionSizeOnSSTableWriteTest}.
+ */
+public class GuardrailCollectionSizeTest extends ThresholdTester
+{
+ private static final int WARN_THRESHOLD = 1024; // bytes
+ private static final int FAIL_THRESHOLD = WARN_THRESHOLD * 4; // bytes
+
+ public GuardrailCollectionSizeTest()
+ {
+ super(WARN_THRESHOLD / 1024, // to KiB
+ FAIL_THRESHOLD / 1024, // to KiB
+ Guardrails.collectionSize,
+ Guardrails::setCollectionSizeThresholdInKiB,
+ Guardrails::getCollectionSizeWarnThresholdInKiB,
+ Guardrails::getCollectionSizeFailThresholdInKiB);
+ }
+
+ @Test
+ public void testSetSize() throws Throwable
+ {
+ createTable("CREATE TABLE %s (k int PRIMARY KEY, v set<blob>)");
+
+ assertValid("INSERT INTO %s (k, v) VALUES (0, null)");
+ assertValid("INSERT INTO %s (k, v) VALUES (1, ?)", set());
+ assertValid("INSERT INTO %s (k, v) VALUES (2, ?)", set(allocate(1)));
+ assertValid("INSERT INTO %s (k, v) VALUES (3, ?)",
set(allocate(WARN_THRESHOLD / 2)));
+
+ assertWarns("INSERT INTO %s (k, v) VALUES (4, ?)",
set(allocate(WARN_THRESHOLD)));
+ assertWarns("INSERT INTO %s (k, v) VALUES (5, ?)",
set(allocate(WARN_THRESHOLD / 4), allocate(WARN_THRESHOLD * 3 / 4)));
+
+ assertFails("INSERT INTO %s (k, v) VALUES (6, ?)",
set(allocate(FAIL_THRESHOLD)));
+ assertFails("INSERT INTO %s (k, v) VALUES (7, ?)",
set(allocate(FAIL_THRESHOLD / 4), allocate(FAIL_THRESHOLD * 3 / 4)));
+ }
+
+ @Test
+ public void testSetSizeFrozen() throws Throwable
+ {
+ createTable("CREATE TABLE %s (k int PRIMARY KEY, v
frozen<set<blob>>)");
+
+ assertValid("INSERT INTO %s (k, v) VALUES (0, null)");
+ assertValid("INSERT INTO %s (k, v) VALUES (1, ?)", set());
+ assertValid("INSERT INTO %s (k, v) VALUES (2, ?)", set(allocate(1)));
+ assertWarns("INSERT INTO %s (k, v) VALUES (4, ?)",
set(allocate(WARN_THRESHOLD)));
+ assertFails("INSERT INTO %s (k, v) VALUES (5, ?)",
set(allocate(FAIL_THRESHOLD)));
+ }
+
+ @Test
+ public void testSetSizeWithUpdates() throws Throwable
+ {
+ createTable("CREATE TABLE %s (k int PRIMARY KEY, v set<blob>)");
+
+ assertValid("INSERT INTO %s (k, v) VALUES (0, ?)", set(allocate(1)));
+ assertValid("UPDATE %s SET v = v + ? WHERE k = 0", set(allocate(1)));
+
+ assertValid("INSERT INTO %s (k, v) VALUES (1, ?)",
set(allocate(WARN_THRESHOLD / 4)));
+ assertValid("UPDATE %s SET v = v + ? WHERE k = 1",
set(allocate(WARN_THRESHOLD * 3 / 4)));
+
+ assertWarns("INSERT INTO %s (k, v) VALUES (2, ?)",
set(allocate(FAIL_THRESHOLD / 4)));
+ assertWarns("UPDATE %s SET v = v + ? WHERE k = 2",
set(allocate(FAIL_THRESHOLD * 3 / 4)));
+ }
+
+ @Test
+ public void testListSize() throws Throwable
+ {
+ createTable("CREATE TABLE %s (k int PRIMARY KEY, v list<blob>)");
+
+ assertValid("INSERT INTO %s (k, v) VALUES (0, null)");
+ assertValid("INSERT INTO %s (k, v) VALUES (1, ?)", list());
+ assertValid("INSERT INTO %s (k, v) VALUES (2, ?)", list(allocate(1)));
+ assertValid("INSERT INTO %s (k, v) VALUES (3, ?)",
list(allocate(WARN_THRESHOLD / 2)));
+
+ assertWarns("INSERT INTO %s (k, v) VALUES (4, ?)",
list(allocate(WARN_THRESHOLD)));
+ assertWarns("INSERT INTO %s (k, v) VALUES (5, ?)",
list(allocate(WARN_THRESHOLD / 2), allocate(WARN_THRESHOLD / 2)));
+
+ assertFails("INSERT INTO %s (k, v) VALUES (6, ?)",
list(allocate(FAIL_THRESHOLD)));
+ assertFails("INSERT INTO %s (k, v) VALUES (7, ?)",
list(allocate(FAIL_THRESHOLD / 2), allocate(FAIL_THRESHOLD / 2)));
+ }
+
+ @Test
+ public void testListSizeFrozen() throws Throwable
+ {
+ createTable("CREATE TABLE %s (k int PRIMARY KEY, v
frozen<list<blob>>)");
+
+ assertValid("INSERT INTO %s (k, v) VALUES (0, null)");
+ assertValid("INSERT INTO %s (k, v) VALUES (1, ?)", list());
+ assertValid("INSERT INTO %s (k, v) VALUES (2, ?)", list(allocate(1)));
+ assertWarns("INSERT INTO %s (k, v) VALUES (4, ?)",
list(allocate(WARN_THRESHOLD)));
+ assertFails("INSERT INTO %s (k, v) VALUES (5, ?)",
list(allocate(FAIL_THRESHOLD)));
+ }
+
+ @Test
+ public void testListSizeWithUpdates() throws Throwable
+ {
+ createTable("CREATE TABLE %s (k int PRIMARY KEY, v list<blob>)");
+
+ assertValid("INSERT INTO %s (k, v) VALUES (0, ?)", list(allocate(1)));
+ assertValid("UPDATE %s SET v = v + ? WHERE k = 0", list(allocate(1)));
+
+ assertValid("INSERT INTO %s (k, v) VALUES (1, ?)",
list(allocate(WARN_THRESHOLD / 2)));
+ assertValid("UPDATE %s SET v = v + ? WHERE k = 1",
list(allocate(WARN_THRESHOLD / 2)));
+
+ assertWarns("INSERT INTO %s (k, v) VALUES (2, ?)",
list(allocate(FAIL_THRESHOLD / 2)));
+ assertWarns("UPDATE %s SET v = v + ? WHERE k = 2",
list(allocate(FAIL_THRESHOLD / 2)));
+ }
+
+ @Test
+ public void testMapSize() throws Throwable
+ {
+ createTable("CREATE TABLE %s (k int PRIMARY KEY, v map<blob, blob>)");
+
+ assertValid("INSERT INTO %s (k, v) VALUES (0, null)");
+ assertValid("INSERT INTO %s (k, v) VALUES (1, ?)", map());
+ assertValid("INSERT INTO %s (k, v) VALUES (2, ?)", map(allocate(1),
allocate(1)));
+ assertValid("INSERT INTO %s (k, v) VALUES (3, ?)", map(allocate(1),
allocate(WARN_THRESHOLD / 2)));
+ assertValid("INSERT INTO %s (k, v) VALUES (4, ?)",
map(allocate(WARN_THRESHOLD / 2), allocate(1)));
+
+ assertWarns("INSERT INTO %s (k, v) VALUES (5, ?)",
map(allocate(WARN_THRESHOLD), allocate(1)));
+ assertWarns("INSERT INTO %s (k, v) VALUES (6, ?)", map(allocate(1),
allocate(WARN_THRESHOLD)));
+ assertWarns("INSERT INTO %s (k, v) VALUES (7, ?)",
map(allocate(WARN_THRESHOLD), allocate(WARN_THRESHOLD)));
+
+ assertFails("INSERT INTO %s (k, v) VALUES (8, ?)",
map(allocate(FAIL_THRESHOLD), allocate(1)));
+ assertFails("INSERT INTO %s (k, v) VALUES (9, ?)", map(allocate(1),
allocate(FAIL_THRESHOLD)));
+ assertFails("INSERT INTO %s (k, v) VALUES (10, ?)",
map(allocate(FAIL_THRESHOLD), allocate(FAIL_THRESHOLD)));
+ }
+
+ @Test
+ public void testMapSizeFrozen() throws Throwable
+ {
+ createTable("CREATE TABLE %s (k int PRIMARY KEY, v frozen<map<blob,
blob>>)");
+
+ assertValid("INSERT INTO %s (k, v) VALUES (0, null)");
+ assertValid("INSERT INTO %s (k, v) VALUES (1, ?)", map());
+ assertValid("INSERT INTO %s (k, v) VALUES (2, ?)", map(allocate(1),
allocate(1)));
+ assertWarns("INSERT INTO %s (k, v) VALUES (4, ?)", map(allocate(1),
allocate(WARN_THRESHOLD)));
+ assertWarns("INSERT INTO %s (k, v) VALUES (5, ?)",
map(allocate(WARN_THRESHOLD), allocate(1)));
+ assertWarns("INSERT INTO %s (k, v) VALUES (6, ?)",
map(allocate(WARN_THRESHOLD), allocate(WARN_THRESHOLD)));
+ assertFails("INSERT INTO %s (k, v) VALUES (7, ?)", map(allocate(1),
allocate(FAIL_THRESHOLD)));
+ assertFails("INSERT INTO %s (k, v) VALUES (8, ?)",
map(allocate(FAIL_THRESHOLD), allocate(1)));
+ assertFails("INSERT INTO %s (k, v) VALUES (9, ?)",
map(allocate(FAIL_THRESHOLD), allocate(FAIL_THRESHOLD)));
+ }
+
+ @Test
+ public void testMapSizeWithUpdates() throws Throwable
+ {
+ createTable("CREATE TABLE %s (k int PRIMARY KEY, v map<blob, blob>)");
+
+ assertValid("INSERT INTO %s (k, v) VALUES (0, ?)", map(allocate(1),
allocate(1)));
+ assertValid("UPDATE %s SET v = v + ? WHERE k = 0", map(allocate(1),
allocate(1)));
+
+ assertValid("INSERT INTO %s (k, v) VALUES (1, ?)", map(allocate(1),
allocate(WARN_THRESHOLD / 2)));
+ assertValid("UPDATE %s SET v = v + ? WHERE k = 1", map(allocate(2),
allocate(WARN_THRESHOLD / 2)));
+
+ assertValid("INSERT INTO %s (k, v) VALUES (2, ?)",
map(allocate(WARN_THRESHOLD / 4), allocate(1)));
+ assertValid("UPDATE %s SET v = v + ? WHERE k = 2",
map(allocate(WARN_THRESHOLD * 3 / 4), allocate(1)));
+
+ assertValid("INSERT INTO %s (k, v) VALUES (3, ?)",
map(allocate(WARN_THRESHOLD / 4), allocate(WARN_THRESHOLD / 4)));
+ assertValid("UPDATE %s SET v = v + ? WHERE k = 3",
map(allocate(WARN_THRESHOLD / 4 + 1), allocate(WARN_THRESHOLD / 4)));
+
+ assertWarns("INSERT INTO %s (k, v) VALUES (4, ?)", map(allocate(1),
allocate(FAIL_THRESHOLD / 2)));
+ assertWarns("UPDATE %s SET v = v + ? WHERE k = 4", map(allocate(2),
allocate(FAIL_THRESHOLD / 2)));
+
+ assertWarns("INSERT INTO %s (k, v) VALUES (5, ?)",
map(allocate(FAIL_THRESHOLD / 4), allocate(1)));
+ assertWarns("UPDATE %s SET v = v + ? WHERE k = 5",
map(allocate(FAIL_THRESHOLD * 3 / 4), allocate(1)));
+
+ assertWarns("INSERT INTO %s (k, v) VALUES (6, ?)",
map(allocate(FAIL_THRESHOLD / 4), allocate(FAIL_THRESHOLD / 4)));
+ assertWarns("UPDATE %s SET v = v + ? WHERE k = 6",
map(allocate(FAIL_THRESHOLD / 4 + 1), allocate(FAIL_THRESHOLD / 4)));
+ }
+
+ private void assertValid(String query, ByteBuffer... values) throws
Throwable
+ {
+ assertValid(execute(query, values));
+ }
+
+ private void assertWarns(String query, ByteBuffer... values) throws
Throwable
+ {
+ assertWarns(execute(query, values), "Detected collection v");
+ }
+
+ private void assertFails(String query, ByteBuffer... values) throws
Throwable
+ {
+ assertFails(execute(query, values), "Detected collection v");
+ }
+
+ private CheckedFunction execute(String query, ByteBuffer... values)
+ {
+ return () -> execute(userClientState, query, Arrays.asList(values));
+ }
+
+ private static ByteBuffer set(ByteBuffer... values)
+ {
+ return SetType.getInstance(BytesType.instance,
true).decompose(ImmutableSet.copyOf(values));
+ }
+
+ private static ByteBuffer list(ByteBuffer... values)
+ {
+ return ListType.getInstance(BytesType.instance,
true).decompose(ImmutableList.copyOf(values));
+ }
+
+ private ByteBuffer map()
+ {
+ return MapType.getInstance(BytesType.instance, BytesType.instance,
true).decompose(Collections.emptyMap());
+ }
+
+ private ByteBuffer map(ByteBuffer key, ByteBuffer value)
+ {
+ return MapType.getInstance(BytesType.instance, BytesType.instance,
true).decompose(ImmutableMap.of(key, value));
+ }
+}
diff --git
a/test/unit/org/apache/cassandra/db/guardrails/GuardrailItemsPerCollectionTest.java
b/test/unit/org/apache/cassandra/db/guardrails/GuardrailItemsPerCollectionTest.java
new file mode 100644
index 0000000..66424aa
--- /dev/null
+++
b/test/unit/org/apache/cassandra/db/guardrails/GuardrailItemsPerCollectionTest.java
@@ -0,0 +1,272 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.cassandra.db.guardrails;
+
+import java.nio.ByteBuffer;
+import java.util.Collections;
+import java.util.stream.Collector;
+import java.util.stream.Collectors;
+import java.util.stream.IntStream;
+
+import org.junit.Test;
+
+import org.apache.cassandra.db.marshal.Int32Type;
+import org.apache.cassandra.db.marshal.ListType;
+import org.apache.cassandra.db.marshal.MapType;
+import org.apache.cassandra.db.marshal.SetType;
+
+/**
+ * Tests the guardrail for the number of items on a collection, {@link
Guardrails#itemsPerCollection}.
+ * <p>
+ * This test doesn't include the activation of the guardrail during sstable
writes, these cases are covered by the dtest
+ * {@link
org.apache.cassandra.distributed.test.guardrails.GuardrailItemsPerCollectionOnSSTableWriteTest}.
+ */
+public class GuardrailItemsPerCollectionTest extends ThresholdTester
+{
+ private static final int WARN_THRESHOLD = 10;
+ private static final int FAIL_THRESHOLD = 20;
+
+ public GuardrailItemsPerCollectionTest()
+ {
+ super(WARN_THRESHOLD,
+ FAIL_THRESHOLD,
+ Guardrails.itemsPerCollection,
+ Guardrails::setItemsPerCollectionThreshold,
+ Guardrails::getItemsPerCollectionWarnThreshold,
+ Guardrails::getItemsPerCollectionFailThreshold);
+ }
+
+ @Test
+ public void testSetSize() throws Throwable
+ {
+ createTable("CREATE TABLE %s (k int PRIMARY KEY, v set<int>)");
+
+ assertValid("INSERT INTO %s (k, v) VALUES (0, null)");
+ assertValid("INSERT INTO %s (k, v) VALUES (1, ?)", set(0));
+ assertValid("INSERT INTO %s (k, v) VALUES (2, ?)", set(1));
+ assertValid("INSERT INTO %s (k, v) VALUES (4, ?)",
set(WARN_THRESHOLD));
+ assertWarns("INSERT INTO %s (k, v) VALUES (5, ?)", set(WARN_THRESHOLD
+ 1), WARN_THRESHOLD + 1);
+ assertWarns("INSERT INTO %s (k, v) VALUES (6, ?)",
set(FAIL_THRESHOLD), FAIL_THRESHOLD);
+ assertFails("INSERT INTO %s (k, v) VALUES (7, ?)", set(FAIL_THRESHOLD
+ 1), FAIL_THRESHOLD + 1);
+ assertFails("INSERT INTO %s (k, v) VALUES (8, ?)", set(FAIL_THRESHOLD
+ 10), FAIL_THRESHOLD + 10);
+ }
+
+ @Test
+ public void testSetSizeFrozen() throws Throwable
+ {
+ createTable("CREATE TABLE %s (k int PRIMARY KEY, v frozen<set<int>>)");
+
+ assertValid("INSERT INTO %s (k, v) VALUES (0, null)");
+ assertValid("INSERT INTO %s (k, v) VALUES (1, ?)", set(0));
+ assertValid("INSERT INTO %s (k, v) VALUES (2, ?)", set(1));
+ assertValid("INSERT INTO %s (k, v) VALUES (4, ?)",
set(WARN_THRESHOLD));
+ assertWarns("INSERT INTO %s (k, v) VALUES (5, ?)", set(WARN_THRESHOLD
+ 1), WARN_THRESHOLD + 1);
+ assertWarns("INSERT INTO %s (k, v) VALUES (6, ?)",
set(FAIL_THRESHOLD), FAIL_THRESHOLD);
+ assertFails("INSERT INTO %s (k, v) VALUES (7, ?)", set(FAIL_THRESHOLD
+ 1), FAIL_THRESHOLD + 1);
+ }
+
+ @Test
+ public void testSetSizeWithUpdates() throws Throwable
+ {
+ createTable("CREATE TABLE %s (k int PRIMARY KEY, v set<int>)");
+
+ assertValid("INSERT INTO %s (k, v) VALUES (0, ?)", set(1));
+ assertValid("UPDATE %s SET v = v + ? WHERE k = 0", set(1,
WARN_THRESHOLD));
+ assertWarns("UPDATE %s SET v = v + ? WHERE k = 0", set(1,
FAIL_THRESHOLD), FAIL_THRESHOLD - 1);
+
+ assertWarns("INSERT INTO %s (k, v) VALUES (1, ?)", set(WARN_THRESHOLD
+ 1), WARN_THRESHOLD + 1);
+ assertValid("UPDATE %s SET v = v - ? WHERE k = 1", set(1));
+
+ assertValid("INSERT INTO %s (k, v) VALUES (2, ?)", set(1));
+ assertValid("UPDATE %s SET v = v + ? WHERE k = 2", set(1,
WARN_THRESHOLD + 1));
+
+ assertFails("INSERT INTO %s (k, v) VALUES (3, ?)", set(FAIL_THRESHOLD
+ 1), FAIL_THRESHOLD + 1);
+ assertValid("UPDATE %s SET v = v - ? WHERE k = 3", set(1));
+
+ assertValid("INSERT INTO %s (k, v) VALUES (4, ?)", set(1));
+ assertWarns("UPDATE %s SET v = v + ? WHERE k = 4", set(1,
FAIL_THRESHOLD + 1), FAIL_THRESHOLD);
+ }
+
+ @Test
+ public void testListSize() throws Throwable
+ {
+ createTable("CREATE TABLE %s (k int PRIMARY KEY, v list<int>)");
+
+ assertValid("INSERT INTO %s (k, v) VALUES (0, null)");
+ assertValid("INSERT INTO %s (k, v) VALUES (1, ?)", list(0));
+ assertValid("INSERT INTO %s (k, v) VALUES (2, ?)", list(1));
+ assertValid("INSERT INTO %s (k, v) VALUES (4, ?)",
list(WARN_THRESHOLD));
+ assertWarns("INSERT INTO %s (k, v) VALUES (5, ?)", list(WARN_THRESHOLD
+ 1), WARN_THRESHOLD + 1);
+ assertWarns("INSERT INTO %s (k, v) VALUES (6, ?)",
list(FAIL_THRESHOLD), FAIL_THRESHOLD);
+ assertFails("INSERT INTO %s (k, v) VALUES (7, ?)", list(FAIL_THRESHOLD
+ 1), FAIL_THRESHOLD + 1);
+ assertFails("INSERT INTO %s (k, v) VALUES (7, ?)", list(FAIL_THRESHOLD
+ 10), FAIL_THRESHOLD + 10);
+ }
+
+ @Test
+ public void testListSizeFrozen() throws Throwable
+ {
+ createTable("CREATE TABLE %s (k int PRIMARY KEY, v
frozen<list<int>>)");
+
+ assertValid("INSERT INTO %s (k, v) VALUES (0, null)");
+ assertValid("INSERT INTO %s (k, v) VALUES (1, ?)", list(0));
+ assertValid("INSERT INTO %s (k, v) VALUES (2, ?)", list(1));
+ assertValid("INSERT INTO %s (k, v) VALUES (4, ?)",
list(WARN_THRESHOLD));
+ assertWarns("INSERT INTO %s (k, v) VALUES (5, ?)", list(WARN_THRESHOLD
+ 1), WARN_THRESHOLD + 1);
+ assertWarns("INSERT INTO %s (k, v) VALUES (6, ?)",
list(FAIL_THRESHOLD), FAIL_THRESHOLD);
+ assertFails("INSERT INTO %s (k, v) VALUES (7, ?)", list(FAIL_THRESHOLD
+ 1), FAIL_THRESHOLD + 1);
+ }
+
+ @Test
+ public void testListSizeWithUpdates() throws Throwable
+ {
+ createTable("CREATE TABLE %s (k int PRIMARY KEY, v list<int>)");
+
+ assertValid("INSERT INTO %s (k, v) VALUES (0, ?)", list(1));
+ assertValid("UPDATE %s SET v = v + ? WHERE k = 0", list(1,
WARN_THRESHOLD));
+ assertWarns("UPDATE %s SET v = v + ? WHERE k = 0", list(1,
FAIL_THRESHOLD), FAIL_THRESHOLD - 1);
+
+ assertWarns("INSERT INTO %s (k, v) VALUES (1, ?)", list(WARN_THRESHOLD
+ 1), WARN_THRESHOLD + 1);
+ assertValid("UPDATE %s SET v = v - ? WHERE k = 1", list(1));
+
+ assertValid("INSERT INTO %s (k, v) VALUES (2, ?)", list(1));
+ assertValid("UPDATE %s SET v = v + ? WHERE k = 2", list(1,
WARN_THRESHOLD + 1));
+
+ assertFails("INSERT INTO %s (k, v) VALUES (3, ?)", list(FAIL_THRESHOLD
+ 1), FAIL_THRESHOLD + 1);
+ assertValid("UPDATE %s SET v = v - ? WHERE k = 3", list(1));
+
+ assertValid("INSERT INTO %s (k, v) VALUES (4, ?)", list(1));
+ assertWarns("UPDATE %s SET v = v + ? WHERE k = 4", list(1,
FAIL_THRESHOLD + 1), FAIL_THRESHOLD);
+
+ assertWarns("INSERT INTO %s (k, v) VALUES (5, ?)", set(WARN_THRESHOLD
+ 1), WARN_THRESHOLD + 1);
+ assertValid("UPDATE %s SET v[0] = null WHERE k = 5");
+ }
+
+ @Test
+ public void testMapSize() throws Throwable
+ {
+ createTable("CREATE TABLE %s (k int PRIMARY KEY, v map<int, int>)");
+
+ assertValid("INSERT INTO %s (k, v) VALUES (0, null)");
+ assertValid("INSERT INTO %s (k, v) VALUES (1, ?)", map(0));
+ assertValid("INSERT INTO %s (k, v) VALUES (2, ?)", map(1));
+ assertValid("INSERT INTO %s (k, v) VALUES (4, ?)",
map(WARN_THRESHOLD));
+ assertWarns("INSERT INTO %s (k, v) VALUES (5, ?)", map(WARN_THRESHOLD
+ 1), WARN_THRESHOLD + 1);
+ assertWarns("INSERT INTO %s (k, v) VALUES (6, ?)",
map(FAIL_THRESHOLD), FAIL_THRESHOLD);
+ assertFails("INSERT INTO %s (k, v) VALUES (7, ?)", map(FAIL_THRESHOLD
+ 1), FAIL_THRESHOLD + 1);
+ assertFails("INSERT INTO %s (k, v) VALUES (7, ?)", map(FAIL_THRESHOLD
+ 10), FAIL_THRESHOLD + 10);
+ }
+
+ @Test
+ public void testMapSizeFrozen() throws Throwable
+ {
+ createTable("CREATE TABLE %s (k int PRIMARY KEY, v frozen<map<int,
int>>)");
+
+ assertValid("INSERT INTO %s (k, v) VALUES (0, null)");
+ assertValid("INSERT INTO %s (k, v) VALUES (1, ?)", map(0));
+ assertValid("INSERT INTO %s (k, v) VALUES (2, ?)", map(1));
+ assertValid("INSERT INTO %s (k, v) VALUES (4, ?)",
map(WARN_THRESHOLD));
+ assertWarns("INSERT INTO %s (k, v) VALUES (5, ?)", map(WARN_THRESHOLD
+ 1), WARN_THRESHOLD + 1);
+ assertWarns("INSERT INTO %s (k, v) VALUES (6, ?)",
map(FAIL_THRESHOLD), FAIL_THRESHOLD);
+ assertFails("INSERT INTO %s (k, v) VALUES (7, ?)", map(FAIL_THRESHOLD
+ 1), FAIL_THRESHOLD + 1);
+ }
+
+ @Test
+ public void testMapSizeWithUpdates() throws Throwable
+ {
+ createTable("CREATE TABLE %s (k int PRIMARY KEY, v map<int, int>)");
+
+ assertValid("INSERT INTO %s (k, v) VALUES (0, ?)", map(1));
+ assertValid("UPDATE %s SET v = v + ? WHERE k = 0", map(1,
WARN_THRESHOLD));
+ assertWarns("UPDATE %s SET v = v + ? WHERE k = 0", map(1,
FAIL_THRESHOLD), FAIL_THRESHOLD - 1);
+
+ assertWarns("INSERT INTO %s (k, v) VALUES (1, ?)", map(WARN_THRESHOLD
+ 1), WARN_THRESHOLD + 1);
+ assertValid("UPDATE %s SET v = v - ? WHERE k = 1", set(1));
+
+ assertValid("INSERT INTO %s (k, v) VALUES (2, ?)", map(1));
+ assertValid("UPDATE %s SET v = v + ? WHERE k = 2", map(1,
WARN_THRESHOLD + 1));
+
+ assertFails("INSERT INTO %s (k, v) VALUES (3, ?)", map(FAIL_THRESHOLD
+ 1), FAIL_THRESHOLD + 1);
+ assertValid("UPDATE %s SET v = v - ? WHERE k = 3", set(1));
+
+ assertValid("INSERT INTO %s (k, v) VALUES (4, ?)", map(1));
+ assertWarns("UPDATE %s SET v = v + ? WHERE k = 4", map(1,
FAIL_THRESHOLD + 1), FAIL_THRESHOLD);
+ }
+
+ private void assertValid(String query, ByteBuffer collection) throws
Throwable
+ {
+ assertValid(execute(query, collection));
+ }
+
+ private void assertWarns(String query, ByteBuffer collection, int
numItems) throws Throwable
+ {
+ assertWarns(execute(query, collection),
+ String.format("Detected collection v with %d items, this
exceeds the warning threshold of %d.",
+ numItems, WARN_THRESHOLD));
+ }
+
+ private void assertFails(String query, ByteBuffer collection, int
numItems) throws Throwable
+ {
+ assertFails(execute(query, collection),
+ String.format("Detected collection v with %d items, this
exceeds the failure threshold of %d.",
+ numItems, FAIL_THRESHOLD));
+ }
+
+ private CheckedFunction execute(String query, ByteBuffer collection)
+ {
+ return () -> execute(userClientState, query,
Collections.singletonList(collection));
+ }
+
+ private static ByteBuffer set(int numElements)
+ {
+ return set(0, numElements);
+ }
+
+ private static ByteBuffer set(int startInclusive, int endExclusive)
+ {
+ return SetType.getInstance(Int32Type.instance, true)
+ .decompose(collection(startInclusive, endExclusive,
Collectors.toSet()));
+ }
+
+ private static ByteBuffer list(int numElements)
+ {
+ return list(0, numElements);
+ }
+
+ private static ByteBuffer list(int startInclusive, int endExclusive)
+ {
+ return ListType.getInstance(Int32Type.instance, false)
+ .decompose(collection(startInclusive, endExclusive,
Collectors.toList()));
+ }
+
+ private static ByteBuffer map(int numElements)
+ {
+ return map(0, numElements);
+ }
+
+ private static ByteBuffer map(int startInclusive, int endExclusive)
+ {
+ return MapType.getInstance(Int32Type.instance, Int32Type.instance,
true)
+ .decompose(collection(startInclusive, endExclusive,
Collectors.toMap(x -> x, x -> x)));
+ }
+
+ private static <R, A> R collection(int startInclusive, int endExclusive,
Collector<Integer, A, R> collector)
+ {
+ return IntStream.range(startInclusive,
endExclusive).boxed().collect(collector);
+ }
+}
diff --git
a/test/unit/org/apache/cassandra/db/guardrails/GuardrailPageSizeTest.java
b/test/unit/org/apache/cassandra/db/guardrails/GuardrailPageSizeTest.java
index ba4c2d0..68122f2 100644
--- a/test/unit/org/apache/cassandra/db/guardrails/GuardrailPageSizeTest.java
+++ b/test/unit/org/apache/cassandra/db/guardrails/GuardrailPageSizeTest.java
@@ -144,10 +144,4 @@ public class GuardrailPageSizeTest extends ThresholdTester
statement.executeLocally(queryState, options);
}
-
- //not used by page-size guardrail tests.
- protected long currentValue()
- {
- throw new UnsupportedOperationException();
- }
}
diff --git a/test/unit/org/apache/cassandra/db/guardrails/GuardrailsTest.java
b/test/unit/org/apache/cassandra/db/guardrails/GuardrailsTest.java
index c108560..70c4cfc 100644
--- a/test/unit/org/apache/cassandra/db/guardrails/GuardrailsTest.java
+++ b/test/unit/org/apache/cassandra/db/guardrails/GuardrailsTest.java
@@ -113,7 +113,7 @@ public class GuardrailsTest extends GuardrailTester
state -> 10,
state -> 100,
(isWarn, what, v, t) -> format("%s:
for %s, %s > %s",
- isWarn
? "Warning" : "Aborting", what, v, t));
+ isWarn
? "Warning" : "Failure", what, v, t));
// value under both thresholds
assertValid(() -> guard.guard(5, "x", null));
@@ -127,9 +127,10 @@ public class GuardrailsTest extends GuardrailTester
assertValid(() -> guard.guard(100, "y", systemClientState));
assertValid(() -> guard.guard(100, "y", superClientState));
- // value over fail threshold
- assertFails(() -> guard.guard(101, "z", null), "Aborting: for z, 101 >
100");
- assertFails(() -> guard.guard(101, "z", userClientState), "Aborting:
for z, 101 > 100");
+ // value over fail threshold. An undefined user means that the check
comes from a background process,
+ // so we warn instead of failing to prevent interrupting that process.
+ assertWarns(() -> guard.guard(101, "z", null), "Failure: for z, 101 >
100");
+ assertFails(() -> guard.guard(101, "z", userClientState), "Failure:
for z, 101 > 100");
assertValid(() -> guard.guard(101, "z", systemClientState));
assertValid(() -> guard.guard(101, "z", superClientState));
}
@@ -154,7 +155,6 @@ public class GuardrailsTest extends GuardrailTester
assertValid(() -> enabled.ensureEnabled(superClientState));
DisableFlag disabled = new DisableFlag("x", state -> true, "X");
- assertFails(() -> disabled.ensureEnabled(null), "X is not allowed");
assertFails(() -> disabled.ensureEnabled(userClientState), "X is not
allowed");
assertValid(() -> disabled.ensureEnabled(systemClientState));
assertValid(() -> disabled.ensureEnabled(superClientState));
@@ -234,9 +234,9 @@ public class GuardrailsTest extends GuardrailTester
assertValid(() -> disallowed.guard(set(200), action, userClientState));
assertValid(() -> disallowed.guard(set(1, 2, 3), action,
userClientState));
- assertFails(() -> disallowed.guard(set(4, 6), action, null),
+ assertWarns(() -> disallowed.guard(set(4, 6), action, null),
"Provided values [4, 6] are not allowed for integer
(disallowed values are: [4, 6, 20])");
- assertFails(() -> disallowed.guard(set(4, 5, 6, 7), action, null),
+ assertWarns(() -> disallowed.guard(set(4, 5, 6, 7), action, null),
"Provided values [4, 6] are not allowed for integer
(disallowed values are: [4, 6, 20])");
}
@@ -271,7 +271,7 @@ public class GuardrailsTest extends GuardrailTester
Assert.assertEquals(list(3, 3), triggeredOn);
message = "Provided values [4] are not allowed for integer (disallowed
values are: [4])";
- assertFails(() -> disallowed.guard(set(4), action, null), message);
+ assertWarns(() -> disallowed.guard(set(4), action, null), message);
assertFails(() -> disallowed.guard(set(4), action, userClientState),
message);
assertValid(() -> disallowed.guard(set(4), action, systemClientState));
assertValid(() -> disallowed.guard(set(4), action, superClientState));
diff --git a/test/unit/org/apache/cassandra/db/guardrails/ThresholdTester.java
b/test/unit/org/apache/cassandra/db/guardrails/ThresholdTester.java
index a816ca9..a6ce3a1 100644
--- a/test/unit/org/apache/cassandra/db/guardrails/ThresholdTester.java
+++ b/test/unit/org/apache/cassandra/db/guardrails/ThresholdTester.java
@@ -26,6 +26,7 @@ import org.junit.Before;
import org.junit.Test;
import org.apache.cassandra.config.Config;
+import org.apache.cassandra.exceptions.ConfigurationException;
import org.assertj.core.api.Assertions;
import static java.lang.String.format;
@@ -43,7 +44,25 @@ public abstract class ThresholdTester extends GuardrailTester
private final TriConsumer<Guardrails, Long, Long> setter;
private final ToLongFunction<Guardrails> warnGetter;
private final ToLongFunction<Guardrails> failGetter;
- private final long maxValue = Integer.MAX_VALUE;
+ private final long maxValue;
+ private final long disabledValue;
+
+ protected ThresholdTester(int warnThreshold,
+ int failThreshold,
+ Threshold threshold,
+ TriConsumer<Guardrails, Integer, Integer> setter,
+ ToIntFunction<Guardrails> warnGetter,
+ ToIntFunction<Guardrails> failGetter)
+ {
+ super(threshold);
+ this.warnThreshold = warnThreshold;
+ this.failThreshold = failThreshold;
+ this.setter = (g, w, a) -> setter.accept(g, w.intValue(),
a.intValue());
+ this.warnGetter = g -> (long) warnGetter.applyAsInt(g);
+ this.failGetter = g -> (long) failGetter.applyAsInt(g);
+ maxValue = Integer.MAX_VALUE;
+ disabledValue = Config.DISABLED_GUARDRAIL;
+ }
protected ThresholdTester(long warnThreshold,
long failThreshold,
@@ -58,25 +77,15 @@ public abstract class ThresholdTester extends
GuardrailTester
this.setter = setter;
this.warnGetter = warnGetter;
this.failGetter = failGetter;
+ maxValue = Long.MAX_VALUE;
+ disabledValue = Config.DISABLED_SIZE_GUARDRAIL.toBytes();
}
- protected ThresholdTester(int warnThreshold,
- int failThreshold,
- Threshold threshold,
- TriConsumer<Guardrails, Integer, Integer> setter,
- ToIntFunction<Guardrails> warnGetter,
- ToIntFunction<Guardrails> failGetter)
+ protected long currentValue()
{
- super(threshold);
- this.warnThreshold = warnThreshold;
- this.failThreshold = failThreshold;
- this.setter = (g, w, a) -> setter.accept(g, w.intValue(),
a.intValue());
- this.warnGetter = g -> (long) warnGetter.applyAsInt(g);
- this.failGetter = g -> (long) failGetter.applyAsInt(g);
+ throw new UnsupportedOperationException();
}
- protected abstract long currentValue();
-
protected void assertCurrentValue(int count)
{
assertEquals(count, currentValue());
@@ -97,15 +106,14 @@ public abstract class ThresholdTester extends
GuardrailTester
protected void testValidationOfThresholdProperties(String warnName, String
failName)
{
- setter.accept(guardrails(), -1L, -1L);
+ setter.accept(guardrails(), disabledValue, disabledValue);
- testValidationOfStrictlyPositiveProperty((g, a) -> setter.accept(g,
-1L, a), failName);
- testValidationOfStrictlyPositiveProperty((g, w) -> setter.accept(g, w,
-1L), warnName);
+ testValidationOfStrictlyPositiveProperty((g, a) -> setter.accept(g,
disabledValue, a), failName);
+ testValidationOfStrictlyPositiveProperty((g, w) -> setter.accept(g, w,
disabledValue), warnName);
- setter.accept(guardrails(), -1L, -1L);
+ setter.accept(guardrails(), disabledValue, disabledValue);
Assertions.assertThatThrownBy(() -> setter.accept(guardrails(), 2L,
1L))
- .hasMessageContaining(format("The warn threshold 2 for %s
should be lower than the fail threshold 1",
- guardrail.name +
"_warn_threshold"));
+ .hasMessageContaining(guardrail.name + "_warn_threshold
should be lower than the fail threshold");
}
protected void assertThresholdValid(String query) throws Throwable
@@ -138,7 +146,6 @@ public abstract class ThresholdTester extends
GuardrailTester
private void assertInvalidPositiveProperty(BiConsumer<Guardrails, Long>
setter,
long value,
long maxValue,
- boolean allowZero,
String name)
{
try
@@ -146,6 +153,15 @@ public abstract class ThresholdTester extends
GuardrailTester
assertValidProperty(setter, value);
fail(format("Expected exception for guardrails.%s value: %d",
name, value));
}
+ catch (ConfigurationException e)
+ {
+ String expectedMessage = null;
+
+ if (value < 0)
+ expectedMessage = "Invalid data storage: value must be
positive";
+
+ Assertions.assertThat(e.getMessage()).contains(expectedMessage);
+ }
catch (IllegalArgumentException e)
{
String expectedMessage = null;
@@ -153,14 +169,14 @@ public abstract class ThresholdTester extends
GuardrailTester
if (value > maxValue)
expectedMessage = format("Invalid value %d for %s: maximum
allowed value is %d",
value, name, maxValue);
- if (value == 0 && !allowZero)
+ if (value == 0 && value != disabledValue)
expectedMessage = format("Invalid value for %s: 0 is not
allowed; if attempting to disable use %s",
- name, Config.DISABLED_GUARDRAIL);
+ name, disabledValue);
- if (value < Config.DISABLED_GUARDRAIL)
+ if (value < 0 && value != disabledValue)
expectedMessage = format("Invalid value %d for %s: negative
values are not "
+ "allowed, outside of %s which
disables the guardrail",
- value, name,
Config.DISABLED_GUARDRAIL);
+ value, name, disabledValue);
assertEquals(format("Exception message '%s' does not contain
'%s'", e.getMessage(), expectedMessage),
expectedMessage, e.getMessage());
@@ -169,15 +185,15 @@ public abstract class ThresholdTester extends
GuardrailTester
private void assertInvalidStrictlyPositiveProperty(BiConsumer<Guardrails,
Long> setter, long value, String name)
{
- assertInvalidPositiveProperty(setter, value, maxValue, false, name);
+ assertInvalidPositiveProperty(setter, value, maxValue, name);
}
protected void
testValidationOfStrictlyPositiveProperty(BiConsumer<Guardrails, Long> setter,
String name)
{
assertInvalidStrictlyPositiveProperty(setter, Integer.MIN_VALUE, name);
assertInvalidStrictlyPositiveProperty(setter, -2, name);
- assertValidProperty(setter, (long) Config.DISABLED_GUARDRAIL); //
disabled
- assertInvalidStrictlyPositiveProperty(setter, 0, name);
+ assertValidProperty(setter, disabledValue);
+ assertInvalidStrictlyPositiveProperty(setter, disabledValue == 0 ? -1
: 0, name);
assertValidProperty(setter, 1L);
assertValidProperty(setter, 2L);
assertValidProperty(setter, maxValue);
diff --git a/test/unit/org/apache/cassandra/schema/TableMetadataTest.java
b/test/unit/org/apache/cassandra/schema/TableMetadataTest.java
new file mode 100644
index 0000000..357ac01
--- /dev/null
+++ b/test/unit/org/apache/cassandra/schema/TableMetadataTest.java
@@ -0,0 +1,140 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.cassandra.schema;
+
+import java.math.BigInteger;
+import java.nio.ByteBuffer;
+import java.util.Arrays;
+
+import org.junit.Test;
+
+import org.apache.cassandra.db.Clustering;
+import org.apache.cassandra.db.marshal.BooleanType;
+import org.apache.cassandra.db.marshal.CompositeType;
+import org.apache.cassandra.db.marshal.FloatType;
+import org.apache.cassandra.db.marshal.Int32Type;
+import org.apache.cassandra.db.marshal.IntegerType;
+import org.apache.cassandra.db.marshal.TupleType;
+import org.apache.cassandra.db.marshal.UTF8Type;
+
+import static org.junit.Assert.assertEquals;
+
+public class TableMetadataTest
+{
+ @Test
+ public void testPartitionKeyAsCQLLiteral()
+ {
+ String keyspaceName = "keyspace";
+ String tableName = "table";
+
+ // composite type
+ CompositeType type1 = CompositeType.getInstance(UTF8Type.instance,
UTF8Type.instance, UTF8Type.instance);
+ TableMetadata metadata1 = TableMetadata.builder(keyspaceName,
tableName)
+ .addPartitionKeyColumn("key",
type1)
+ .build();
+ assertEquals("('test:', 'composite!', 'type)')",
+
metadata1.partitionKeyAsCQLLiteral(type1.decompose("test:", "composite!",
"type)")));
+
+ // composite type with tuple
+ CompositeType type2 = CompositeType.getInstance(new
TupleType(Arrays.asList(FloatType.instance, UTF8Type.instance)),
+ IntegerType.instance);
+ TableMetadata metadata2 = TableMetadata.builder(keyspaceName,
tableName)
+ .addPartitionKeyColumn("key",
type2)
+ .build();
+ ByteBuffer tupleValue = TupleType.buildValue(new ByteBuffer[]{
FloatType.instance.decompose(0.33f),
+
UTF8Type.instance.decompose("tuple test") });
+ assertEquals("((0.33, 'tuple test'), 10)",
+
metadata2.partitionKeyAsCQLLiteral(type2.decompose(tupleValue,
BigInteger.valueOf(10))));
+
+ // plain type
+ TableMetadata metadata3 = TableMetadata.builder(keyspaceName,
tableName)
+ .addPartitionKeyColumn("key",
UTF8Type.instance).build();
+ assertEquals("'non-composite test'",
+
metadata3.partitionKeyAsCQLLiteral(UTF8Type.instance.decompose("non-composite
test")));
+ }
+
+ @Test
+ public void testPrimaryKeyAsCQLLiteral()
+ {
+ String keyspaceName = "keyspace";
+ String tableName = "table";
+
+ TableMetadata metadata;
+
+ // one partition key column, no clustering key
+ metadata = TableMetadata.builder(keyspaceName, tableName)
+ .addPartitionKeyColumn("key",
UTF8Type.instance)
+ .build();
+ assertEquals("'Test'",
metadata.primaryKeyAsCQLLiteral(UTF8Type.instance.decompose("Test"),
Clustering.EMPTY));
+
+ // two partition key columns, no clustering key
+ metadata = TableMetadata.builder(keyspaceName, tableName)
+ .addPartitionKeyColumn("k1", UTF8Type.instance)
+ .addPartitionKeyColumn("k2",
Int32Type.instance)
+ .build();
+ assertEquals("('Test', -12)",
+
metadata.primaryKeyAsCQLLiteral(CompositeType.getInstance(UTF8Type.instance,
Int32Type.instance)
+
.decompose("Test", -12), Clustering.EMPTY));
+
+ // one partition key column, one clustering key column
+ metadata = TableMetadata.builder(keyspaceName, tableName)
+ .addPartitionKeyColumn("key",
UTF8Type.instance)
+ .addClusteringColumn("clustering",
UTF8Type.instance)
+ .build();
+ assertEquals("('k', 'Cluster')",
+
metadata.primaryKeyAsCQLLiteral(UTF8Type.instance.decompose("k"),
+
Clustering.make(UTF8Type.instance.decompose("Cluster"))));
+ assertEquals("'k'",
+
metadata.primaryKeyAsCQLLiteral(UTF8Type.instance.decompose("k"),
Clustering.EMPTY));
+ assertEquals("'k'",
+
metadata.primaryKeyAsCQLLiteral(UTF8Type.instance.decompose("k"),
Clustering.STATIC_CLUSTERING));
+
+ // one partition key column, two clustering key columns
+ metadata = TableMetadata.builder(keyspaceName, tableName)
+ .addPartitionKeyColumn("key",
UTF8Type.instance)
+ .addClusteringColumn("c1", UTF8Type.instance)
+ .addClusteringColumn("c2", UTF8Type.instance)
+ .build();
+ assertEquals("('k', 'c1', 'c2')",
+
metadata.primaryKeyAsCQLLiteral(UTF8Type.instance.decompose("k"),
+
Clustering.make(UTF8Type.instance.decompose("c1"),
+
UTF8Type.instance.decompose("c2"))));
+ assertEquals("'k'",
+
metadata.primaryKeyAsCQLLiteral(UTF8Type.instance.decompose("k"),
Clustering.EMPTY));
+ assertEquals("'k'",
+
metadata.primaryKeyAsCQLLiteral(UTF8Type.instance.decompose("k"),
Clustering.STATIC_CLUSTERING));
+
+ // two partition key columns, two clustering key columns
+ CompositeType composite =
CompositeType.getInstance(Int32Type.instance, BooleanType.instance);
+ metadata = TableMetadata.builder(keyspaceName, tableName)
+ .addPartitionKeyColumn("k1",
Int32Type.instance)
+ .addPartitionKeyColumn("k2",
BooleanType.instance)
+ .addClusteringColumn("c1", UTF8Type.instance)
+ .addClusteringColumn("c2", UTF8Type.instance)
+ .build();
+ assertEquals("(0, true, 'Cluster_1', 'Cluster_2')",
+ metadata.primaryKeyAsCQLLiteral(composite.decompose(0,
true),
+
Clustering.make(UTF8Type.instance.decompose("Cluster_1"),
+
UTF8Type.instance.decompose("Cluster_2"))));
+ assertEquals("(1, true)",
+ metadata.primaryKeyAsCQLLiteral(composite.decompose(1,
true), Clustering.EMPTY));
+ assertEquals("(2, true)",
+ metadata.primaryKeyAsCQLLiteral(composite.decompose(2,
true), Clustering.STATIC_CLUSTERING));
+ }
+}
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]