This is an automated email from the ASF dual-hosted git repository.

adelapena pushed a commit to branch trunk
in repository https://gitbox.apache.org/repos/asf/cassandra.git


The following commit(s) were added to refs/heads/trunk by this push:
     new 935bcf1  Add guardrails for collection items and size
935bcf1 is described below

commit 935bcf1e8732a4138c15205896945c2f02ddb844
Author: Andrés de la Peña <[email protected]>
AuthorDate: Tue Mar 15 13:31:41 2022 +0000

    Add guardrails for collection items and size
    
    patch by Andrés de la Peña; reviewed by Berenguer Blasi for CASSANDRA-17153
---
 CHANGES.txt                                        |   1 +
 conf/cassandra.yaml                                |  16 +
 src/java/org/apache/cassandra/config/Config.java   |   5 +
 .../apache/cassandra/config/GuardrailsOptions.java |  84 +++-
 src/java/org/apache/cassandra/cql3/Lists.java      |  35 +-
 src/java/org/apache/cassandra/cql3/Maps.java       |  35 +-
 src/java/org/apache/cassandra/cql3/Sets.java       |  34 +-
 .../apache/cassandra/cql3/UpdateParameters.java    |   7 +-
 src/java/org/apache/cassandra/db/ColumnIndex.java  |   7 +-
 .../cassandra/db/guardrails/DisableFlag.java       |   7 +-
 .../apache/cassandra/db/guardrails/Guardrail.java  |   5 +-
 .../apache/cassandra/db/guardrails/Guardrails.java |  72 +++-
 .../cassandra/db/guardrails/GuardrailsConfig.java  |  21 +
 .../cassandra/db/guardrails/GuardrailsMBean.java   |  34 +-
 .../apache/cassandra/db/guardrails/Threshold.java  |  35 +-
 .../org/apache/cassandra/db/guardrails/Values.java |   7 +-
 .../cassandra/io/sstable/format/SSTableWriter.java |  48 +++
 .../org/apache/cassandra/schema/TableMetadata.java |  61 +++
 .../GuardrailCollectionSizeOnSSTableWriteTest.java | 438 +++++++++++++++++++++
 ...rdrailItemsPerCollectionOnSSTableWriteTest.java | 340 ++++++++++++++++
 .../test/guardrails/GuardrailTester.java           | 154 ++++++++
 .../db/guardrails/GuardrailCollectionSizeTest.java | 248 ++++++++++++
 .../GuardrailItemsPerCollectionTest.java           | 272 +++++++++++++
 .../db/guardrails/GuardrailPageSizeTest.java       |   6 -
 .../cassandra/db/guardrails/GuardrailsTest.java    |  16 +-
 .../cassandra/db/guardrails/ThresholdTester.java   |  74 ++--
 .../apache/cassandra/schema/TableMetadataTest.java | 140 +++++++
 27 files changed, 2098 insertions(+), 104 deletions(-)

diff --git a/CHANGES.txt b/CHANGES.txt
index ea1826a..21c163e 100644
--- a/CHANGES.txt
+++ b/CHANGES.txt
@@ -1,4 +1,5 @@
 4.1
+ * Add guardrails for collection items and size (CASSANDRA-17153)
  * Improve guardrails messages (CASSANDRA-17430)
  * Remove all usages of junit.framework and ban them via Checkstyle 
(CASSANDRA-17316)
  * Add guardrails for read/write consistency levels (CASSANDRA-17188)
diff --git a/conf/cassandra.yaml b/conf/cassandra.yaml
index 8df5771..ce79767 100644
--- a/conf/cassandra.yaml
+++ b/conf/cassandra.yaml
@@ -1632,6 +1632,22 @@ drop_compact_storage_enabled: false
 # Guardrail to warn about or reject write consistency levels. By default, all 
consistency levels are allowed.
 # write_consistency_levels_warned: []
 # write_consistency_levels_disallowed: []
+# Guardrail to warn or fail when encountering larger size of collection data 
than threshold.
+# At query time this guardrail is applied only to the collection fragment that 
is being writen, even though in the case
+# of non-frozen collections there could be unaccounted parts of the collection 
on the sstables. This is done this way to
+# prevent read-before-write. The guardrail is also checked at sstable write 
time to detect large non-frozen collections,
+# although in that case exceeding the fail threshold will only log an error 
message, without interrupting the operation.
+# The two thresholds default to 0KiB to disable.
+# collection_size_warn_threshold: 0KiB
+# collection_size_fail_threshold: 0KiB
+# Guardrail to warn or fail when encountering more elements in collection than 
threshold.
+# At query time this guardrail is applied only to the collection fragment that 
is being writen, even though in the case
+# of non-frozen collections there could be unaccounted parts of the collection 
on the sstables. This is done this way to
+# prevent read-before-write. The guardrail is also checked at sstable write 
time to detect large non-frozen collections,
+# although in that case exceeding the fail threshold will only log an error 
message, without interrupting the operation.
+# The two thresholds default to -1 to disable.
+# items_per_collection_warn_threshold: -1
+# items_per_collection_fail_threshold: -1
 
 # Startup Checks are executed as part of Cassandra startup process, not all of 
them
 # are configurable (so you can disable them) but these which are enumerated 
bellow.
diff --git a/src/java/org/apache/cassandra/config/Config.java 
b/src/java/org/apache/cassandra/config/Config.java
index 671e49d..554b0bb 100644
--- a/src/java/org/apache/cassandra/config/Config.java
+++ b/src/java/org/apache/cassandra/config/Config.java
@@ -771,6 +771,7 @@ public class Config
     public volatile SubnetGroups internode_error_reporting_exclusions = new 
SubnetGroups();
 
     public static final int DISABLED_GUARDRAIL = -1;
+    public static final DataStorageSpec DISABLED_SIZE_GUARDRAIL = 
DataStorageSpec.inBytes(0);
     public volatile boolean guardrails_enabled = false;
     public volatile int keyspaces_warn_threshold = DISABLED_GUARDRAIL;
     public volatile int keyspaces_fail_threshold = DISABLED_GUARDRAIL;
@@ -797,6 +798,10 @@ public class Config
     public volatile Set<ConsistencyLevel> write_consistency_levels_disallowed 
= Collections.emptySet();
     public volatile boolean user_timestamps_enabled = true;
     public volatile boolean read_before_write_list_operations_enabled = true;
+    public volatile DataStorageSpec collection_size_warn_threshold = 
DISABLED_SIZE_GUARDRAIL;
+    public volatile DataStorageSpec collection_size_fail_threshold = 
DISABLED_SIZE_GUARDRAIL;
+    public volatile int items_per_collection_warn_threshold = 
DISABLED_GUARDRAIL;
+    public volatile int items_per_collection_fail_threshold = 
DISABLED_GUARDRAIL;
 
     public volatile DurationSpec streaming_state_expires = 
DurationSpec.inDays(3);
     public volatile DataStorageSpec streaming_state_size = 
DataStorageSpec.inMebibytes(40);
diff --git a/src/java/org/apache/cassandra/config/GuardrailsOptions.java 
b/src/java/org/apache/cassandra/config/GuardrailsOptions.java
index a8b7ada..692c255 100644
--- a/src/java/org/apache/cassandra/config/GuardrailsOptions.java
+++ b/src/java/org/apache/cassandra/config/GuardrailsOptions.java
@@ -55,7 +55,7 @@ import static java.util.stream.Collectors.toSet;
 public class GuardrailsOptions implements GuardrailsConfig
 {
     private static final Logger logger = 
LoggerFactory.getLogger(GuardrailsOptions.class);
-    
+
     private final Config config;
 
     public GuardrailsOptions(Config config)
@@ -77,6 +77,8 @@ public class GuardrailsOptions implements GuardrailsConfig
         config.read_consistency_levels_disallowed = 
validateConsistencyLevels(config.read_consistency_levels_disallowed, 
"read_consistency_levels_disallowed");
         config.write_consistency_levels_warned = 
validateConsistencyLevels(config.write_consistency_levels_warned, 
"write_consistency_levels_warned");
         config.write_consistency_levels_disallowed = 
validateConsistencyLevels(config.write_consistency_levels_disallowed, 
"write_consistency_levels_disallowed");
+        validateSizeThreshold(config.collection_size_warn_threshold, 
config.collection_size_fail_threshold, "collection_size");
+        validateIntThreshold(config.items_per_collection_warn_threshold, 
config.items_per_collection_fail_threshold, "items_per_collection");
     }
 
     @Override
@@ -423,6 +425,55 @@ public class GuardrailsOptions implements GuardrailsConfig
                                   x -> 
config.write_consistency_levels_disallowed = x);
     }
 
+    public DataStorageSpec getCollectionSizeWarnThreshold()
+    {
+        return config.collection_size_warn_threshold;
+    }
+
+    @Override
+    public DataStorageSpec getCollectionSizeFailThreshold()
+    {
+        return config.collection_size_fail_threshold;
+    }
+
+    public void setCollectionSizeThreshold(DataStorageSpec warn, 
DataStorageSpec fail)
+    {
+        validateSizeThreshold(warn, fail, "collection_size");
+        updatePropertyWithLogging("collection_size_warn_threshold",
+                                  warn,
+                                  () -> config.collection_size_warn_threshold,
+                                  x -> config.collection_size_warn_threshold = 
x);
+        updatePropertyWithLogging("collection_size_fail_threshold",
+                                  fail,
+                                  () -> config.collection_size_fail_threshold,
+                                  x -> config.collection_size_fail_threshold = 
x);
+    }
+
+    @Override
+    public int getItemsPerCollectionWarnThreshold()
+    {
+        return config.items_per_collection_warn_threshold;
+    }
+
+    @Override
+    public int getItemsPerCollectionFailThreshold()
+    {
+        return config.items_per_collection_fail_threshold;
+    }
+
+    public void setItemsPerCollectionThreshold(int warn, int fail)
+    {
+        validateIntThreshold(warn, fail, "items_per_collection");
+        updatePropertyWithLogging("items_per_collection_warn_threshold",
+                                  warn,
+                                  () -> 
config.items_per_collection_warn_threshold,
+                                  x -> 
config.items_per_collection_warn_threshold = x);
+        updatePropertyWithLogging("items_per_collection_fail_threshold",
+                                  fail,
+                                  () -> 
config.items_per_collection_fail_threshold,
+                                  x -> 
config.items_per_collection_fail_threshold = x);
+    }
+
     private static <T> void updatePropertyWithLogging(String propertyName, T 
newValue, Supplier<T> getter, Consumer<T> setter)
     {
         T oldValue = getter.get();
@@ -433,36 +484,31 @@ public class GuardrailsOptions implements GuardrailsConfig
         }
     }
 
-    private static void validatePositiveNumeric(long value, long maxValue, 
boolean allowZero, String name)
+    private static void validatePositiveNumeric(long value, long maxValue, 
String name)
     {
+        if (value == Config.DISABLED_GUARDRAIL)
+            return;
+
         if (value > maxValue)
             throw new IllegalArgumentException(format("Invalid value %d for 
%s: maximum allowed value is %d",
                                                       value, name, maxValue));
 
-        if (value == 0 && !allowZero)
+        if (value == 0)
             throw new IllegalArgumentException(format("Invalid value for %s: 0 
is not allowed; " +
                                                       "if attempting to 
disable use %d",
                                                       name, 
Config.DISABLED_GUARDRAIL));
 
         // We allow -1 as a general "disabling" flag. But reject anything 
lower to avoid mistakes.
-        if (value < Config.DISABLED_GUARDRAIL)
+        if (value <= 0)
             throw new IllegalArgumentException(format("Invalid value %d for 
%s: negative values are not allowed, " +
                                                       "outside of %d which 
disables the guardrail",
                                                       value, name, 
Config.DISABLED_GUARDRAIL));
     }
 
-    private static void validateStrictlyPositiveInteger(long value, String 
name)
-    {
-        // We use 'long' for generality, but most numeric guardrail cannot 
effectively be more than a 'int' for various
-        // internal reasons. Not that any should ever come close in practice 
...
-        // Also, in most cases, zero does not make sense (allowing 0 tables or 
columns is not exactly useful).
-        validatePositiveNumeric(value, Integer.MAX_VALUE, false, name);
-    }
-
     private static void validateIntThreshold(int warn, int fail, String name)
     {
-        validateStrictlyPositiveInteger(warn, name + "_warn_threshold");
-        validateStrictlyPositiveInteger(fail, name + "_fail_threshold");
+        validatePositiveNumeric(warn, Integer.MAX_VALUE, name + 
"_warn_threshold");
+        validatePositiveNumeric(fail, Integer.MAX_VALUE, name + 
"_fail_threshold");
         validateWarnLowerThanFail(warn, fail, name);
     }
 
@@ -476,6 +522,16 @@ public class GuardrailsOptions implements GuardrailsConfig
                                                       "than the fail threshold 
%d", warn, name, fail));
     }
 
+    private static void validateSizeThreshold(DataStorageSpec warn, 
DataStorageSpec fail, String name)
+    {
+        if (warn.equals(Config.DISABLED_SIZE_GUARDRAIL) || 
fail.equals(Config.DISABLED_SIZE_GUARDRAIL))
+            return;
+
+        if (fail.toBytes() < warn.toBytes())
+            throw new IllegalArgumentException(format("The warn threshold %s 
for %s_warn_threshold should be lower " +
+                                                      "than the fail threshold 
%s", warn, name, fail));
+    }
+
     private static Set<String> validateTableProperties(Set<String> properties, 
String name)
     {
         if (properties == null)
diff --git a/src/java/org/apache/cassandra/cql3/Lists.java 
b/src/java/org/apache/cassandra/cql3/Lists.java
index 08de949..dba9ba6 100644
--- a/src/java/org/apache/cassandra/cql3/Lists.java
+++ b/src/java/org/apache/cassandra/cql3/Lists.java
@@ -492,26 +492,43 @@ public abstract class Lists
 
         static void doAppend(Term.Terminal value, ColumnMetadata column, 
UpdateParameters params) throws InvalidRequestException
         {
-            if (column.type.isMultiCell())
+            if (value == null)
             {
+                // for frozen lists, we're overwriting the whole cell value
+                if (!column.type.isMultiCell())
+                    params.addTombstone(column);
+
                 // If we append null, do nothing. Note that for Setter, we've
                 // already removed the previous value so we're good here too
-                if (value == null)
+                return;
+            }
+
+            List<ByteBuffer> elements = ((Value) value).elements;
+
+            if (column.type.isMultiCell())
+            {
+                if (elements.size() == 0)
                     return;
 
-                for (ByteBuffer buffer : ((Value) value).elements)
+                // Guardrails about collection size are only checked for the 
added elements without considering
+                // already existent elements. This is done so to avoid 
read-before-write, having additional checks
+                // during SSTable write.
+                Guardrails.itemsPerCollection.guard(elements.size(), 
column.name.toString(), params.clientState);
+
+                int dataSize = 0;
+                for (ByteBuffer buffer : elements)
                 {
                     ByteBuffer uuid = 
ByteBuffer.wrap(params.nextTimeUUIDAsBytes());
-                    params.addCell(column, CellPath.create(uuid), buffer);
+                    Cell<?> cell = params.addCell(column, 
CellPath.create(uuid), buffer);
+                    dataSize += cell.dataSize();
                 }
+                Guardrails.collectionSize.guard(dataSize, 
column.name.toString(), params.clientState);
             }
             else
             {
-                // for frozen lists, we're overwriting the whole cell value
-                if (value == null)
-                    params.addTombstone(column);
-                else
-                    params.addCell(column, value.get(ProtocolVersion.CURRENT));
+                Guardrails.itemsPerCollection.guard(elements.size(), 
column.name.toString(), params.clientState);
+                Cell<?> cell = params.addCell(column, 
value.get(ProtocolVersion.CURRENT));
+                Guardrails.collectionSize.guard(cell.dataSize(), 
column.name.toString(), params.clientState);
             }
         }
     }
diff --git a/src/java/org/apache/cassandra/cql3/Maps.java 
b/src/java/org/apache/cassandra/cql3/Maps.java
index 6e7e07b..b5ee2ec 100644
--- a/src/java/org/apache/cassandra/cql3/Maps.java
+++ b/src/java/org/apache/cassandra/cql3/Maps.java
@@ -23,6 +23,7 @@ import java.nio.ByteBuffer;
 import java.util.*;
 import java.util.stream.Collectors;
 
+import org.apache.cassandra.db.guardrails.Guardrails;
 import org.apache.cassandra.schema.ColumnMetadata;
 import org.apache.cassandra.cql3.functions.Function;
 import org.apache.cassandra.db.DecoratedKey;
@@ -421,22 +422,40 @@ public abstract class Maps
 
         static void doPut(Term.Terminal value, ColumnMetadata column, 
UpdateParameters params) throws InvalidRequestException
         {
+            if (value == null)
+            {
+                // for frozen maps, we're overwriting the whole cell
+                if (!column.type.isMultiCell())
+                    params.addTombstone(column);
+
+                return;
+            }
+
+            Map<ByteBuffer, ByteBuffer> elements = ((Value) value).map;
+
             if (column.type.isMultiCell())
             {
-                if (value == null)
+                if (elements.size() == 0)
                     return;
 
-                Map<ByteBuffer, ByteBuffer> elements = ((Value) value).map;
+                // Guardrails about collection size are only checked for the 
added elements without considering
+                // already existent elements. This is done so to avoid 
read-before-write, having additional checks
+                // during SSTable write.
+                Guardrails.itemsPerCollection.guard(elements.size(), 
column.name.toString(), params.clientState);
+
+                int dataSize = 0;
                 for (Map.Entry<ByteBuffer, ByteBuffer> entry : 
elements.entrySet())
-                    params.addCell(column, CellPath.create(entry.getKey()), 
entry.getValue());
+                {
+                    Cell<?> cell = params.addCell(column, 
CellPath.create(entry.getKey()), entry.getValue());
+                    dataSize += cell.dataSize();
+                }
+                Guardrails.collectionSize.guard(dataSize, 
column.name.toString(), params.clientState);
             }
             else
             {
-                // for frozen maps, we're overwriting the whole cell
-                if (value == null)
-                    params.addTombstone(column);
-                else
-                    params.addCell(column, value.get(ProtocolVersion.CURRENT));
+                Guardrails.itemsPerCollection.guard(elements.size(), 
column.name.toString(), params.clientState);
+                Cell<?> cell = params.addCell(column, 
value.get(ProtocolVersion.CURRENT));
+                Guardrails.collectionSize.guard(cell.dataSize(), 
column.name.toString(), params.clientState);
             }
         }
     }
diff --git a/src/java/org/apache/cassandra/cql3/Sets.java 
b/src/java/org/apache/cassandra/cql3/Sets.java
index 6b53d55..146f7b7 100644
--- a/src/java/org/apache/cassandra/cql3/Sets.java
+++ b/src/java/org/apache/cassandra/cql3/Sets.java
@@ -24,6 +24,7 @@ import java.util.*;
 import java.util.stream.Collectors;
 import java.util.stream.StreamSupport;
 
+import org.apache.cassandra.db.guardrails.Guardrails;
 import org.apache.cassandra.schema.ColumnMetadata;
 import org.apache.cassandra.cql3.functions.Function;
 import org.apache.cassandra.db.DecoratedKey;
@@ -348,26 +349,43 @@ public abstract class Sets
 
         static void doAdd(Term.Terminal value, ColumnMetadata column, 
UpdateParameters params) throws InvalidRequestException
         {
+            if (value == null)
+            {
+                // for frozen sets, we're overwriting the whole cell
+                if (!column.type.isMultiCell())
+                    params.addTombstone(column);
+
+                return;
+            }
+
+            SortedSet<ByteBuffer> elements = ((Value) value).elements;
+
             if (column.type.isMultiCell())
             {
-                if (value == null)
+                if (elements.size() == 0)
                     return;
 
-                for (ByteBuffer bb : ((Value) value).elements)
+                // Guardrails about collection size are only checked for the 
added elements without considering
+                // already existent elements. This is done so to avoid 
read-before-write, having additional checks
+                // during SSTable write.
+                Guardrails.itemsPerCollection.guard(elements.size(), 
column.name.toString(), params.clientState);
+
+                int dataSize = 0;
+                for (ByteBuffer bb : elements)
                 {
                     if (bb == ByteBufferUtil.UNSET_BYTE_BUFFER)
                         continue;
 
-                    params.addCell(column, CellPath.create(bb), 
ByteBufferUtil.EMPTY_BYTE_BUFFER);
+                    Cell<?> cell = params.addCell(column, CellPath.create(bb), 
ByteBufferUtil.EMPTY_BYTE_BUFFER);
+                    dataSize += cell.dataSize();
                 }
+                Guardrails.collectionSize.guard(dataSize, 
column.name.toString(), params.clientState);
             }
             else
             {
-                // for frozen sets, we're overwriting the whole cell
-                if (value == null)
-                    params.addTombstone(column);
-                else
-                    params.addCell(column, value.get(ProtocolVersion.CURRENT));
+                Guardrails.itemsPerCollection.guard(elements.size(), 
column.name.toString(), params.clientState);
+                Cell<?> cell = params.addCell(column, 
value.get(ProtocolVersion.CURRENT));
+                Guardrails.collectionSize.guard(cell.dataSize(), 
column.name.toString(), params.clientState);
             }
         }
     }
diff --git a/src/java/org/apache/cassandra/cql3/UpdateParameters.java 
b/src/java/org/apache/cassandra/cql3/UpdateParameters.java
index 8f3eb37..2d59366 100644
--- a/src/java/org/apache/cassandra/cql3/UpdateParameters.java
+++ b/src/java/org/apache/cassandra/cql3/UpdateParameters.java
@@ -146,17 +146,18 @@ public class UpdateParameters
         builder.addCell(BufferCell.tombstone(column, timestamp, nowInSec, 
path));
     }
 
-    public void addCell(ColumnMetadata column, ByteBuffer value) throws 
InvalidRequestException
+    public Cell<?> addCell(ColumnMetadata column, ByteBuffer value) throws 
InvalidRequestException
     {
-        addCell(column, null, value);
+        return addCell(column, null, value);
     }
 
-    public void addCell(ColumnMetadata column, CellPath path, ByteBuffer 
value) throws InvalidRequestException
+    public Cell<?> addCell(ColumnMetadata column, CellPath path, ByteBuffer 
value) throws InvalidRequestException
     {
         Cell<?> cell = ttl == LivenessInfo.NO_TTL
                        ? BufferCell.live(column, timestamp, value, path)
                        : BufferCell.expiring(column, timestamp, ttl, nowInSec, 
value, path);
         builder.addCell(cell);
+        return cell;
     }
 
     public void addCounter(ColumnMetadata column, long increment) throws 
InvalidRequestException
diff --git a/src/java/org/apache/cassandra/db/ColumnIndex.java 
b/src/java/org/apache/cassandra/db/ColumnIndex.java
index 15405bd..f7860df 100644
--- a/src/java/org/apache/cassandra/db/ColumnIndex.java
+++ b/src/java/org/apache/cassandra/db/ColumnIndex.java
@@ -29,6 +29,7 @@ import org.apache.cassandra.db.rows.*;
 import org.apache.cassandra.io.ISerializer;
 import org.apache.cassandra.io.sstable.IndexInfo;
 import org.apache.cassandra.io.sstable.format.SSTableFlushObserver;
+import org.apache.cassandra.io.sstable.format.SSTableWriter;
 import org.apache.cassandra.io.sstable.format.Version;
 import org.apache.cassandra.io.util.DataOutputBuffer;
 import org.apache.cassandra.io.util.SequentialWriter;
@@ -115,7 +116,11 @@ public class ColumnIndex
         this.headerLength = writer.position() - initialPosition;
 
         while (iterator.hasNext())
-            add(iterator.next());
+        {
+            Unfiltered unfiltered = iterator.next();
+            SSTableWriter.guardCollectionSize(iterator.metadata(), 
iterator.partitionKey(), unfiltered);
+            add(unfiltered);
+        }
 
         finish();
     }
diff --git a/src/java/org/apache/cassandra/db/guardrails/DisableFlag.java 
b/src/java/org/apache/cassandra/db/guardrails/DisableFlag.java
index b6a8ed9..9ec1951 100644
--- a/src/java/org/apache/cassandra/db/guardrails/DisableFlag.java
+++ b/src/java/org/apache/cassandra/db/guardrails/DisableFlag.java
@@ -73,11 +73,14 @@ public class DisableFlag extends Guardrail
      *
      * @param what  The feature that is guarded by this guardrail (for 
reporting in error messages).
      * @param state The client state, used to skip the check if the query is 
internal or is done by a superuser.
-     *              A {@code null} value means that the check should be done 
regardless of the query.
+     *              A {@code null} value means that the check should be done 
regardless of the query, although it won't
+     *              throw any exception if the failure threshold is exceeded. 
This is so because checks without an
+     *              associated client come from asynchronous processes such as 
compaction, and we don't want to
+     *              interrupt such processes.
      */
     public void ensureEnabled(String what, @Nullable ClientState state)
     {
         if (enabled(state) && disabled.test(state))
-            fail(what + " is not allowed");
+            fail(what + " is not allowed", state);
     }
 }
diff --git a/src/java/org/apache/cassandra/db/guardrails/Guardrail.java 
b/src/java/org/apache/cassandra/db/guardrails/Guardrail.java
index 33dd8af..334d956 100644
--- a/src/java/org/apache/cassandra/db/guardrails/Guardrail.java
+++ b/src/java/org/apache/cassandra/db/guardrails/Guardrail.java
@@ -79,7 +79,7 @@ public abstract class Guardrail
         Tracing.trace(message);
     }
 
-    protected void fail(String message)
+    protected void fail(String message, @Nullable ClientState state)
     {
         message = decorateMessage(message);
 
@@ -90,7 +90,8 @@ public abstract class Guardrail
         // Similarly, tracing will also ignore the message if we're not 
running tracing on the current thread.
         Tracing.trace(message);
 
-        throw new GuardrailViolatedException(message);
+        if (state != null)
+            throw new GuardrailViolatedException(message);
     }
 
     @VisibleForTesting
diff --git a/src/java/org/apache/cassandra/db/guardrails/Guardrails.java 
b/src/java/org/apache/cassandra/db/guardrails/Guardrails.java
index 5adab5a..99aae55 100644
--- a/src/java/org/apache/cassandra/db/guardrails/Guardrails.java
+++ b/src/java/org/apache/cassandra/db/guardrails/Guardrails.java
@@ -27,6 +27,7 @@ import com.google.common.annotations.VisibleForTesting;
 import com.google.common.collect.ImmutableSet;
 import org.apache.commons.lang3.StringUtils;
 
+import org.apache.cassandra.config.DataStorageSpec;
 import org.apache.cassandra.config.DatabaseDescriptor;
 import org.apache.cassandra.config.GuardrailsOptions;
 import org.apache.cassandra.db.ConsistencyLevel;
@@ -178,6 +179,7 @@ public final class Guardrails implements GuardrailsMBean
                             : format("Aborting query because the cartesian 
product of the IN restrictions on %s " +
                                      "produces %d values, this exceeds fail 
threshold of %s.",
                                      what, value, threshold));
+
     /**
      * Guardrail on read consistency levels.
      */
@@ -198,6 +200,32 @@ public final class Guardrails implements GuardrailsMBean
                  state -> 
CONFIG_PROVIDER.getOrCreate(state).getWriteConsistencyLevelsDisallowed(),
                  "write consistency levels");
 
+    /**
+     * Guardrail on the size of a collection.
+     */
+    public static final Threshold collectionSize =
+    new Threshold("collection_size",
+                  state -> 
CONFIG_PROVIDER.getOrCreate(state).getCollectionSizeWarnThreshold().toBytes(),
+                  state -> 
CONFIG_PROVIDER.getOrCreate(state).getCollectionSizeFailThreshold().toBytes(),
+                  (isWarning, what, value, threshold) ->
+                  isWarning ? format("Detected collection %s of size %s, this 
exceeds the warning threshold of %s.",
+                                     what, value, threshold)
+                            : format("Detected collection %s of size %s, this 
exceeds the failure threshold of %s.",
+                                     what, value, threshold));
+
+    /**
+     * Guardrail on the number of items of a collection.
+     */
+    public static final Threshold itemsPerCollection =
+    new Threshold("items_per_collection",
+                  state -> 
CONFIG_PROVIDER.getOrCreate(state).getItemsPerCollectionWarnThreshold(),
+                  state -> 
CONFIG_PROVIDER.getOrCreate(state).getItemsPerCollectionFailThreshold(),
+                  (isWarning, what, value, threshold) ->
+                  isWarning ? format("Detected collection %s with %s items, 
this exceeds the warning threshold of %s.",
+                                     what, value, threshold)
+                            : format("Detected collection %s with %s items, 
this exceeds the failure threshold of %s.",
+                                     what, value, threshold));
+
     private Guardrails()
     {
         MBeanWrapper.instance.registerMBean(this, MBEAN_NAME);
@@ -446,21 +474,57 @@ public final class Guardrails implements GuardrailsMBean
     }
 
     @Override
+    public int getPartitionKeysInSelectWarnThreshold()
+    {
+        return DEFAULT_CONFIG.getPartitionKeysInSelectWarnThreshold();
+    }
+
+    @Override
+    public int getPartitionKeysInSelectFailThreshold()
+    {
+        return DEFAULT_CONFIG.getPartitionKeysInSelectFailThreshold();
+    }
+
+    @Override
     public void setPartitionKeysInSelectThreshold(int warn, int fail)
     {
         DEFAULT_CONFIG.setPartitionKeysInSelectThreshold(warn, fail);
     }
 
+    public long getCollectionSizeWarnThresholdInKiB()
+    {
+        return DEFAULT_CONFIG.getCollectionSizeWarnThreshold().toKibibytes();
+    }
+
     @Override
-    public int getPartitionKeysInSelectWarnThreshold()
+    public long getCollectionSizeFailThresholdInKiB()
     {
-        return DEFAULT_CONFIG.getPartitionKeysInSelectWarnThreshold();
+        return DEFAULT_CONFIG.getCollectionSizeFailThreshold().toKibibytes();
     }
 
     @Override
-    public int getPartitionKeysInSelectFailThreshold()
+    public void setCollectionSizeThresholdInKiB(long warnInKiB, long failInKiB)
     {
-        return DEFAULT_CONFIG.getPartitionKeysInSelectFailThreshold();
+        
DEFAULT_CONFIG.setCollectionSizeThreshold(DataStorageSpec.inKibibytes(warnInKiB),
+                                                  
DataStorageSpec.inKibibytes(failInKiB));
+    }
+
+    @Override
+    public int getItemsPerCollectionWarnThreshold()
+    {
+        return DEFAULT_CONFIG.getItemsPerCollectionWarnThreshold();
+    }
+
+    @Override
+    public int getItemsPerCollectionFailThreshold()
+    {
+        return DEFAULT_CONFIG.getItemsPerCollectionFailThreshold();
+    }
+
+    @Override
+    public void setItemsPerCollectionThreshold(int warn, int fail)
+    {
+        DEFAULT_CONFIG.setItemsPerCollectionThreshold(warn, fail);
     }
 
     @Override
diff --git a/src/java/org/apache/cassandra/db/guardrails/GuardrailsConfig.java 
b/src/java/org/apache/cassandra/db/guardrails/GuardrailsConfig.java
index fc671e7..396d6cb 100644
--- a/src/java/org/apache/cassandra/db/guardrails/GuardrailsConfig.java
+++ b/src/java/org/apache/cassandra/db/guardrails/GuardrailsConfig.java
@@ -20,6 +20,7 @@ package org.apache.cassandra.db.guardrails;
 
 import java.util.Set;
 
+import org.apache.cassandra.config.DataStorageSpec;
 import org.apache.cassandra.db.ConsistencyLevel;
 
 /**
@@ -183,4 +184,24 @@ public interface GuardrailsConfig
      * @return The consistency levels that are disallowed when writing.
      */
     Set<ConsistencyLevel> getWriteConsistencyLevelsDisallowed();
+
+    /*
+     * @return The threshold to warn when encountering a collection with 
larger data size than threshold.
+     */
+    DataStorageSpec getCollectionSizeWarnThreshold();
+
+    /**
+     * @return The threshold to prevent collections with larger data size than 
threshold.
+     */
+    DataStorageSpec getCollectionSizeFailThreshold();
+
+    /**
+     * @return The threshold to warn when encountering more elements in a 
collection than threshold.
+     */
+    int getItemsPerCollectionWarnThreshold();
+
+    /**
+     * @return The threshold to prevent collections with more elements than 
threshold.
+     */
+    int getItemsPerCollectionFailThreshold();
 }
diff --git a/src/java/org/apache/cassandra/db/guardrails/GuardrailsMBean.java 
b/src/java/org/apache/cassandra/db/guardrails/GuardrailsMBean.java
index 6b59b05..7a4eb60 100644
--- a/src/java/org/apache/cassandra/db/guardrails/GuardrailsMBean.java
+++ b/src/java/org/apache/cassandra/db/guardrails/GuardrailsMBean.java
@@ -259,7 +259,7 @@ public interface GuardrailsMBean
      * @param warn The threshold to warn when the number of partition keys in 
a select statement is greater than
      *             threshold -1 means disabled.
      * @param fail The threshold to prevent when the number of partition keys 
in a select statement is more than
-     *              threshold -1 means disabled.
+     *             threshold -1 means disabled.
      */
     void setPartitionKeysInSelectThreshold(int warn, int fail);
 
@@ -362,4 +362,36 @@ public interface GuardrailsMBean
      * @param consistencyLevels Comma-separated list of consistency levels 
that are not allowed when writing.
      */
     void setWriteConsistencyLevelsDisallowedCSV(String consistencyLevels);
+
+    /**
+     * @return The threshold to warn when encountering larger size of 
collection data than threshold, in KiB.
+     */
+    long getCollectionSizeWarnThresholdInKiB();
+
+    /**
+     * @return The threshold to prevent collections with larger data size than 
threshold, in KiB.
+     */
+    long getCollectionSizeFailThresholdInKiB();
+
+    /**
+     * @param warnInKiB The threshold to warn when encountering larger size of 
collection data than threshold, in KiB.
+     * @param failInKiB The threshold to prevent collections with larger data 
size than threshold, in KiB.
+     */
+    void setCollectionSizeThresholdInKiB(long warnInKiB, long failInKiB);
+
+    /**
+     * @return The threshold to warn when encountering more elements in a 
collection than threshold.
+     */
+    int getItemsPerCollectionWarnThreshold();
+
+    /**
+     * @return The threshold to prevent collections with more elements than 
threshold.
+     */
+    int getItemsPerCollectionFailThreshold();
+
+    /**
+     * @param warn The threshold to warn when encountering more elements in a 
collection than threshold.
+     * @param fail The threshold to prevent collectiosn with more elements 
than threshold.
+     */
+    void setItemsPerCollectionThreshold(int warn, int fail);
 }
diff --git a/src/java/org/apache/cassandra/db/guardrails/Threshold.java 
b/src/java/org/apache/cassandra/db/guardrails/Threshold.java
index 85fddce..3e63cb3 100644
--- a/src/java/org/apache/cassandra/db/guardrails/Threshold.java
+++ b/src/java/org/apache/cassandra/db/guardrails/Threshold.java
@@ -67,13 +67,13 @@ public class Threshold extends Guardrail
     private long failValue(ClientState state)
     {
         long failValue = failThreshold.applyAsLong(state);
-        return failValue < 0 ? Long.MAX_VALUE : failValue;
+        return failValue <= 0 ? Long.MAX_VALUE : failValue;
     }
 
     private long warnValue(ClientState state)
     {
         long warnValue = warnThreshold.applyAsLong(state);
-        return warnValue < 0 ? Long.MAX_VALUE : warnValue;
+        return warnValue <= 0 ? Long.MAX_VALUE : warnValue;
     }
 
     @Override
@@ -82,7 +82,25 @@ public class Threshold extends Guardrail
         if (!super.enabled(state))
             return false;
 
-        return failThreshold.applyAsLong(state) >= 0 || 
warnThreshold.applyAsLong(state) >= 0;
+        return failThreshold.applyAsLong(state) > 0 || 
warnThreshold.applyAsLong(state) > 0;
+    }
+
+    /**
+     * Checks whether the provided value would trigger a warning or failure if 
passed to {@link #guard}.
+     *
+     * <p>This method is optional (does not have to be called) but can be used 
in the case where the "what"
+     * argument to {@link #guard} is expensive to build to save doing so in 
the common case (of the guardrail
+     * not being triggered).
+     *
+     * @param value the value to test.
+     * @param state The client state, used to skip the check if the query is 
internal or is done by a superuser.
+     *              A {@code null} value means that the check should be done 
regardless of the query.
+     * @return {@code true} if {@code value} is above the warning or failure 
thresholds of this guardrail,
+     * {@code false otherwise}.
+     */
+    public boolean triggersOn(long value, @Nullable ClientState state)
+    {
+        return enabled(state) && (value > Math.min(failValue(state), 
warnValue(state)));
     }
 
     /**
@@ -93,7 +111,10 @@ public class Threshold extends Guardrail
      *              guardrail is triggered. For instance, say the guardrail 
guards the size of column values, then this
      *              argument must describe which column of which row is 
triggering the guardrail for convenience.
      * @param state The client state, used to skip the check if the query is 
internal or is done by a superuser.
-     *              A {@code null} value means that the check should be done 
regardless of the query.
+     *              A {@code null} value means that the check should be done 
regardless of the query, although it won't
+     *              throw any exception if the failure threshold is exceeded. 
This is so because checks without an
+     *              associated client come from asynchronous processes such as 
compaction, and we don't want to
+     *              interrupt such processes.
      */
     public void guard(long value, String what, @Nullable ClientState state)
     {
@@ -103,7 +124,7 @@ public class Threshold extends Guardrail
         long failValue = failValue(state);
         if (value > failValue)
         {
-            triggerFail(value, failValue, what);
+            triggerFail(value, failValue, what, state);
             return;
         }
 
@@ -112,9 +133,9 @@ public class Threshold extends Guardrail
             triggerWarn(value, warnValue, what);
     }
 
-    private void triggerFail(long value, long failValue, String what)
+    private void triggerFail(long value, long failValue, String what, 
ClientState state)
     {
-        fail(errMsg(false, what, value, failValue));
+        fail(errMsg(false, what, value, failValue), state);
     }
 
     private void triggerWarn(long value, long warnValue, String what)
diff --git a/src/java/org/apache/cassandra/db/guardrails/Values.java 
b/src/java/org/apache/cassandra/db/guardrails/Values.java
index edb5d2a..f46e3af 100644
--- a/src/java/org/apache/cassandra/db/guardrails/Values.java
+++ b/src/java/org/apache/cassandra/db/guardrails/Values.java
@@ -93,7 +93,10 @@ public class Values<T> extends Guardrail
      * @param ignoreAction An action called on the subset of {@code values} 
that should be ignored. This action
      *                     should do whatever is necessary to make sure the 
value is ignored.
      * @param state        The client state, used to skip the check if the 
query is internal or is done by a superuser.
-     *                     A {@code null} value means that the check should be 
done regardless of the query.
+     *                     A {@code null} value means that the check should be 
done regardless of the query, although it
+     *                     won't throw any exception if the failure threshold 
is exceeded. This is so because checks
+     *                     without an associated client come from asynchronous 
processes such as compaction, and we
+     *                     don't want to interrupt such processes.
      */
     public void guard(Set<T> values, Consumer<T> ignoreAction, @Nullable 
ClientState state)
     {
@@ -104,7 +107,7 @@ public class Values<T> extends Guardrail
         Set<T> toDisallow = Sets.intersection(values, disallowed);
         if (!toDisallow.isEmpty())
             fail(format("Provided values %s are not allowed for %s (disallowed 
values are: %s)",
-                        
toDisallow.stream().sorted().collect(Collectors.toList()), what, disallowed));
+                        
toDisallow.stream().sorted().collect(Collectors.toList()), what, disallowed), 
state);
 
         Set<T> ignored = ignoredValues.apply(state);
         Set<T> toIgnore = Sets.intersection(values, ignored);
diff --git a/src/java/org/apache/cassandra/io/sstable/format/SSTableWriter.java 
b/src/java/org/apache/cassandra/io/sstable/format/SSTableWriter.java
index 1d9603e..0ff3e46 100644
--- a/src/java/org/apache/cassandra/io/sstable/format/SSTableWriter.java
+++ b/src/java/org/apache/cassandra/io/sstable/format/SSTableWriter.java
@@ -25,10 +25,16 @@ import com.google.common.collect.ImmutableList;
 import com.google.common.collect.Sets;
 
 import org.apache.cassandra.config.DatabaseDescriptor;
+import org.apache.cassandra.db.DecoratedKey;
+import org.apache.cassandra.db.DeletionPurger;
 import org.apache.cassandra.db.RowIndexEntry;
 import org.apache.cassandra.db.SerializationHeader;
 import org.apache.cassandra.db.compaction.OperationType;
+import org.apache.cassandra.db.guardrails.Guardrails;
 import org.apache.cassandra.db.lifecycle.LifecycleNewTracker;
+import org.apache.cassandra.db.rows.ComplexColumnData;
+import org.apache.cassandra.db.rows.Row;
+import org.apache.cassandra.db.rows.Unfiltered;
 import org.apache.cassandra.db.rows.UnfilteredRowIterator;
 import org.apache.cassandra.index.Index;
 import org.apache.cassandra.io.FSWriteError;
@@ -40,10 +46,13 @@ import 
org.apache.cassandra.io.sstable.metadata.MetadataComponent;
 import org.apache.cassandra.io.sstable.metadata.MetadataType;
 import org.apache.cassandra.io.sstable.metadata.StatsMetadata;
 import org.apache.cassandra.io.util.FileUtils;
+import org.apache.cassandra.schema.ColumnMetadata;
 import org.apache.cassandra.schema.Schema;
+import org.apache.cassandra.schema.SchemaConstants;
 import org.apache.cassandra.schema.TableMetadata;
 import org.apache.cassandra.schema.TableMetadataRef;
 import org.apache.cassandra.utils.TimeUUID;
+import org.apache.cassandra.utils.FBUtilities;
 import org.apache.cassandra.utils.concurrent.Transactional;
 
 /**
@@ -386,4 +395,43 @@ public abstract class SSTableWriter extends SSTable 
implements Transactional
                                            Collection<SSTableFlushObserver> 
observers,
                                            LifecycleNewTracker 
lifecycleNewTracker);
     }
+
+    public static void guardCollectionSize(TableMetadata metadata, 
DecoratedKey partitionKey, Unfiltered unfiltered)
+    {
+        if (!Guardrails.collectionSize.enabled(null) && 
!Guardrails.itemsPerCollection.enabled(null))
+            return;
+
+        if (!unfiltered.isRow() || 
SchemaConstants.isSystemKeyspace(metadata.keyspace))
+            return;
+
+        Row row = (Row) unfiltered;
+        for (ColumnMetadata column : row.columns())
+        {
+            if (!column.type.isCollection() || !column.type.isMultiCell())
+                continue;
+
+            ComplexColumnData cells = row.getComplexColumnData(column);
+            if (cells == null)
+                continue;
+
+            ComplexColumnData liveCells = 
cells.purge(DeletionPurger.PURGE_ALL, FBUtilities.nowInSeconds());
+            if (liveCells == null)
+                continue;
+
+            int cellsSize = liveCells.dataSize();
+            int cellsCount = liveCells.cellsCount();
+
+            if (!Guardrails.collectionSize.triggersOn(cellsSize, null) &&
+                !Guardrails.itemsPerCollection.triggersOn(cellsCount, null))
+                continue;
+
+            String keyString = 
metadata.primaryKeyAsCQLLiteral(partitionKey.getKey(), row.clustering());
+            String msg = String.format("%s in row %s in table %s",
+                                       column.name.toString(),
+                                       keyString,
+                                       metadata);
+            Guardrails.collectionSize.guard(cellsSize, msg, null);
+            Guardrails.itemsPerCollection.guard(cellsCount, msg, null);
+        }
+    }
 }
diff --git a/src/java/org/apache/cassandra/schema/TableMetadata.java 
b/src/java/org/apache/cassandra/schema/TableMetadata.java
index ef43c5d..5a226ec 100644
--- a/src/java/org/apache/cassandra/schema/TableMetadata.java
+++ b/src/java/org/apache/cassandra/schema/TableMetadata.java
@@ -40,6 +40,7 @@ import org.apache.cassandra.dht.IPartitioner;
 import org.apache.cassandra.exceptions.ConfigurationException;
 import org.apache.cassandra.exceptions.InvalidRequestException;
 import org.apache.cassandra.service.reads.SpeculativeRetryPolicy;
+import org.apache.cassandra.transport.ProtocolVersion;
 import org.apache.cassandra.utils.AbstractIterator;
 import org.github.jamm.Unmetered;
 
@@ -1342,6 +1343,66 @@ public class TableMetadata implements SchemaElement
         }
     }
 
+    /**
+     * Returns a string representation of a partition in a CQL-friendly format.
+     *
+     * For non-composite types it returns the result of {@link 
org.apache.cassandra.cql3.CQL3Type#toCQLLiteral}
+     * applied to the partition key.
+     * For composite types it applies {@link 
org.apache.cassandra.cql3.CQL3Type#toCQLLiteral} to each subkey and
+     * combines the results into a tuple.
+     *
+     * @param partitionKey a partition key
+     * @return CQL-like string representation of a partition key
+     */
+    public String partitionKeyAsCQLLiteral(ByteBuffer partitionKey)
+    {
+        return primaryKeyAsCQLLiteral(partitionKey, Clustering.EMPTY);
+    }
+
+    /**
+     * Returns a string representation of a primary key in a CQL-friendly 
format.
+     *
+     * @param partitionKey the partition key part of the primary key
+     * @param clustering the clustering key part of the primary key
+     * @return a CQL-like string representation of the specified primary key
+     */
+    public String primaryKeyAsCQLLiteral(ByteBuffer partitionKey, 
Clustering<?> clustering)
+    {
+        int clusteringSize = clustering.size();
+
+        String[] literals;
+        int i = 0;
+
+        if (partitionKeyType instanceof CompositeType)
+        {
+            List<AbstractType<?>> components = 
partitionKeyType.getComponents();
+            int size = components.size();
+            literals = new String[size + clusteringSize];
+            ByteBuffer[] values = ((CompositeType) 
partitionKeyType).split(partitionKey);
+            for (i = 0; i < size; i++)
+            {
+                literals[i] = asCQLLiteral(components.get(i), values[i]);
+            }
+        }
+        else
+        {
+            literals = new String[1 + clusteringSize];
+            literals[i++] = asCQLLiteral(partitionKeyType, partitionKey);
+        }
+
+        for (int j = 0; j < clusteringSize; j++)
+        {
+            literals[i++] = asCQLLiteral(clusteringColumns().get(j).type, 
clustering.bufferAt(j));
+        }
+
+        return i == 1 ? literals[0] : "(" + String.join(", ", literals) + ")";
+    }
+
+    private static String asCQLLiteral(AbstractType<?> type, ByteBuffer value)
+    {
+        return type.asCQL3Type().toCQLLiteral(value, ProtocolVersion.CURRENT);
+    }
+
     public static class CompactTableMetadata extends TableMetadata
     {
 
diff --git 
a/test/distributed/org/apache/cassandra/distributed/test/guardrails/GuardrailCollectionSizeOnSSTableWriteTest.java
 
b/test/distributed/org/apache/cassandra/distributed/test/guardrails/GuardrailCollectionSizeOnSSTableWriteTest.java
new file mode 100644
index 0000000..e469bc3
--- /dev/null
+++ 
b/test/distributed/org/apache/cassandra/distributed/test/guardrails/GuardrailCollectionSizeOnSSTableWriteTest.java
@@ -0,0 +1,438 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.cassandra.distributed.test.guardrails;
+
+import java.io.IOException;
+
+import org.junit.AfterClass;
+import org.junit.BeforeClass;
+import org.junit.Test;
+
+import com.datastax.driver.core.Session;
+import com.datastax.driver.core.SimpleStatement;
+import org.apache.cassandra.db.guardrails.Guardrails;
+import org.apache.cassandra.distributed.Cluster;
+import org.apache.cassandra.distributed.api.Feature;
+
+import static java.nio.ByteBuffer.allocate;
+
+/**
+ * Tests the guardrail for the size of collections, {@link 
Guardrails#collectionSize}.
+ * <p>
+ * This test only includes the activation of the guardrail during sstable 
writes, all other cases are covered by
+ * {@link org.apache.cassandra.db.guardrails.GuardrailCollectionSizeTest}.
+ */
+public class GuardrailCollectionSizeOnSSTableWriteTest extends GuardrailTester
+{
+    private static final int NUM_NODES = 2;
+
+    private static final int WARN_THRESHOLD = 1024;
+    private static final int FAIL_THRESHOLD = WARN_THRESHOLD * 4;
+
+    private static Cluster cluster;
+    private static com.datastax.driver.core.Cluster driverCluster;
+    private static Session driverSession;
+
+    @BeforeClass
+    public static void setupCluster() throws IOException
+    {
+        cluster = init(Cluster.build(NUM_NODES)
+                              .withConfig(c -> c.with(Feature.GOSSIP, 
Feature.NATIVE_PROTOCOL)
+                                                .set("guardrails_enabled", 
true)
+                                                
.set("collection_size_warn_threshold", WARN_THRESHOLD + "B")
+                                                
.set("collection_size_fail_threshold", FAIL_THRESHOLD + "B"))
+                              .start());
+        cluster.disableAutoCompaction(KEYSPACE);
+        driverCluster = 
com.datastax.driver.core.Cluster.builder().addContactPoint("127.0.0.1").build();
+        driverSession = driverCluster.connect();
+    }
+
+    @AfterClass
+    public static void teardownCluster()
+    {
+        if (driverSession != null)
+            driverSession.close();
+
+        if (driverCluster != null)
+            driverCluster.close();
+
+        if (cluster != null)
+            cluster.close();
+    }
+
+    @Override
+    protected Cluster getCluster()
+    {
+        return cluster;
+    }
+
+    @Test
+    public void testSetSize() throws Throwable
+    {
+        schemaChange("CREATE TABLE %s (k int PRIMARY KEY, v set<blob>)");
+
+        execute("INSERT INTO %s (k, v) VALUES (0, null)");
+        execute("INSERT INTO %s (k, v) VALUES (1, ?)", set());
+        execute("INSERT INTO %s (k, v) VALUES (2, ?)", set(allocate(1)));
+        execute("INSERT INTO %s (k, v) VALUES (3, ?)", 
set(allocate(WARN_THRESHOLD / 2)));
+        assertNotWarnedOnFlush();
+
+        execute("INSERT INTO %s (k, v) VALUES (4, ?)", 
set(allocate(WARN_THRESHOLD)));
+        assertWarnedOnFlush(warnMessage("4"));
+
+        execute("INSERT INTO %s (k, v) VALUES (5, ?)", 
set(allocate(WARN_THRESHOLD / 4), allocate(WARN_THRESHOLD * 3 / 4)));
+        assertWarnedOnFlush(warnMessage("5"));
+
+        execute("INSERT INTO %s (k, v) VALUES (6, ?)", 
set(allocate(FAIL_THRESHOLD)));
+        assertFailedOnFlush(failMessage("6"));
+
+        execute("INSERT INTO %s (k, v) VALUES (7, ?)", 
set(allocate(FAIL_THRESHOLD / 4), allocate(FAIL_THRESHOLD * 3 / 4)));
+        assertFailedOnFlush(failMessage("7"));
+    }
+
+    @Test
+    public void testSetSizeFrozen()
+    {
+        schemaChange("CREATE TABLE %s (k int PRIMARY KEY, v 
frozen<set<blob>>)");
+
+        execute("INSERT INTO %s (k, v) VALUES (0, null)");
+        execute("INSERT INTO %s (k, v) VALUES (1, ?)", set());
+        execute("INSERT INTO %s (k, v) VALUES (2, ?)", set(allocate(1)));
+        execute("INSERT INTO %s (k, v) VALUES (4, ?)", 
set(allocate(WARN_THRESHOLD)));
+        execute("INSERT INTO %s (k, v) VALUES (5, ?)", 
set(allocate(FAIL_THRESHOLD)));
+
+        // frozen collections size is not checked during sstable write
+        assertNotWarnedOnFlush();
+    }
+
+    @Test
+    public void testSetSizeWithUpdates()
+    {
+        schemaChange("CREATE TABLE %s (k int PRIMARY KEY, v set<blob>)");
+
+        execute("INSERT INTO %s (k, v) VALUES (0, ?)", set(allocate(1)));
+        execute("UPDATE %s SET v = v + ? WHERE k = 0", set(allocate(1)));
+        assertNotWarnedOnFlush();
+
+        execute("INSERT INTO %s (k, v) VALUES (1, ?)", 
set(allocate(WARN_THRESHOLD / 4)));
+        execute("UPDATE %s SET v = v + ? WHERE k = 1", 
set(allocate(WARN_THRESHOLD * 3 / 4)));
+        assertWarnedOnFlush(warnMessage("1"));
+
+        execute("INSERT INTO %s (k, v) VALUES (2, ?)", 
set(allocate(FAIL_THRESHOLD / 4)));
+        execute("UPDATE %s SET v = v + ? WHERE k = 2", 
set(allocate(FAIL_THRESHOLD * 3 / 4)));
+        assertFailedOnFlush(failMessage("2"));
+
+        execute("INSERT INTO %s (k, v) VALUES (4, ?)", 
set(allocate(FAIL_THRESHOLD)));
+        execute("UPDATE %s SET v = v - ? WHERE k = 4", 
set(allocate(FAIL_THRESHOLD)));
+        assertNotWarnedOnFlush();
+    }
+
+    @Test
+    public void testSetSizeAfterCompaction() throws Throwable
+    {
+        schemaChange("CREATE TABLE %s (k int PRIMARY KEY, v set<blob>)");
+
+        execute("INSERT INTO %s (k, v) VALUES (0, ?)", set(allocate(1)));
+        assertNotWarnedOnFlush();
+        execute("UPDATE %s SET v = v + ? WHERE k = 0", set(allocate(1)));
+        assertNotWarnedOnFlush();
+        assertNotWarnedOnCompact();
+
+        execute("INSERT INTO %s (k, v) VALUES (1, ?)", 
set(allocate(WARN_THRESHOLD * 3 / 4)));
+        assertNotWarnedOnFlush();
+        execute("UPDATE %s SET v = v + ? WHERE k = 1", 
set(allocate(WARN_THRESHOLD / 4)));
+        assertNotWarnedOnFlush();
+        assertWarnedOnCompact(warnMessage("1"));
+
+        execute("INSERT INTO %s (k, v) VALUES (2, ?)", 
set(allocate(FAIL_THRESHOLD * 3 / 4)));
+        assertWarnedOnFlush(warnMessage("2"));
+        execute("UPDATE %s SET v = v + ? WHERE k = 2", 
set(allocate(FAIL_THRESHOLD / 4)));
+        assertWarnedOnFlush(warnMessage("2"));
+        assertFailedOnCompact(failMessage("2"));
+
+        execute("INSERT INTO %s (k, v) VALUES (3, ?)", 
set(allocate(FAIL_THRESHOLD)));
+        assertFailedOnFlush(failMessage("3"));
+        execute("UPDATE %s SET v = v - ? WHERE k = 3", 
set(allocate(FAIL_THRESHOLD)));
+        assertNotWarnedOnFlush();
+        assertNotWarnedOnCompact();
+    }
+
+    @Test
+    public void testListSize() throws Throwable
+    {
+        schemaChange("CREATE TABLE %s (k int PRIMARY KEY, v list<blob>)");
+
+        execute("INSERT INTO %s (k, v) VALUES (0, null)");
+        execute("INSERT INTO %s (k, v) VALUES (1, ?)", list());
+        execute("INSERT INTO %s (k, v) VALUES (2, ?)", list(allocate(1)));
+        execute("INSERT INTO %s (k, v) VALUES (3, ?)", 
list(allocate(WARN_THRESHOLD / 2)));
+        assertNotWarnedOnFlush();
+
+        execute("INSERT INTO %s (k, v) VALUES (4, ?)", 
list(allocate(WARN_THRESHOLD)));
+        assertWarnedOnFlush(warnMessage("4"));
+
+        execute("INSERT INTO %s (k, v) VALUES (5, ?)", 
list(allocate(WARN_THRESHOLD / 2), allocate(WARN_THRESHOLD / 2)));
+        assertWarnedOnFlush(warnMessage("5"));
+
+        execute("INSERT INTO %s (k, v) VALUES (6, ?)", 
list(allocate(FAIL_THRESHOLD)));
+        assertFailedOnFlush(failMessage("6"));
+
+        execute("INSERT INTO %s (k, v) VALUES (7, ?)", 
list(allocate(FAIL_THRESHOLD / 2), allocate(FAIL_THRESHOLD / 2)));
+        assertFailedOnFlush(failMessage("7"));
+    }
+
+    @Test
+    public void testListSizeFrozen()
+    {
+        schemaChange("CREATE TABLE %s (k int PRIMARY KEY, v 
frozen<list<blob>>)");
+
+        execute("INSERT INTO %s (k, v) VALUES (0, null)");
+        execute("INSERT INTO %s (k, v) VALUES (1, ?)", list());
+        execute("INSERT INTO %s (k, v) VALUES (2, ?)", list(allocate(1)));
+        execute("INSERT INTO %s (k, v) VALUES (4, ?)", 
list(allocate(WARN_THRESHOLD)));
+        execute("INSERT INTO %s (k, v) VALUES (5, ?)", 
list(allocate(FAIL_THRESHOLD)));
+
+        // frozen collections size is not checked during sstable write
+        assertNotWarnedOnFlush();
+    }
+
+    @Test
+    public void testListSizeWithUpdates()
+    {
+        schemaChange("CREATE TABLE %s (k int PRIMARY KEY, v list<blob>)");
+
+        execute("INSERT INTO %s (k, v) VALUES (0, ?)", list(allocate(1)));
+        execute("UPDATE %s SET v = v + ? WHERE k = 0", list(allocate(1)));
+        assertNotWarnedOnFlush();
+
+        execute("INSERT INTO %s (k, v) VALUES (1, ?)", 
list(allocate(WARN_THRESHOLD / 2)));
+        execute("UPDATE %s SET v = v + ? WHERE k = 1", 
list(allocate(WARN_THRESHOLD / 2)));
+        assertWarnedOnFlush(warnMessage("1"));
+
+        execute("INSERT INTO %s (k, v) VALUES (2, ?)", 
list(allocate(FAIL_THRESHOLD / 2)));
+        execute("UPDATE %s SET v = v + ? WHERE k = 2", 
list(allocate(FAIL_THRESHOLD / 2)));
+        assertFailedOnFlush(failMessage("2"));
+
+        execute("INSERT INTO %s (k, v) VALUES (4, ?)", 
list(allocate(FAIL_THRESHOLD)));
+        execute("UPDATE %s SET v = v - ? WHERE k = 4", 
list(allocate(FAIL_THRESHOLD)));
+        assertNotWarnedOnFlush();
+    }
+
+    @Test
+    public void testListSizeAfterCompaction() throws Throwable
+    {
+        schemaChange("CREATE TABLE %s (k int PRIMARY KEY, v list<blob>)");
+
+        execute("INSERT INTO %s (k, v) VALUES (0, ?)", list(allocate(1)));
+        assertNotWarnedOnFlush();
+        execute("UPDATE %s SET v = v + ? WHERE k = 0", list(allocate(1)));
+        assertNotWarnedOnFlush();
+        assertNotWarnedOnCompact();
+
+        execute("INSERT INTO %s (k, v) VALUES (1, ?)", 
list(allocate(WARN_THRESHOLD / 2)));
+        assertNotWarnedOnFlush();
+        execute("UPDATE %s SET v = v + ? WHERE k = 1", 
list(allocate(WARN_THRESHOLD / 2)));
+        assertNotWarnedOnFlush();
+        assertWarnedOnCompact(warnMessage("1"));
+
+        execute("INSERT INTO %s (k, v) VALUES (2, ?)", 
list(allocate(FAIL_THRESHOLD / 2)));
+        assertWarnedOnFlush(warnMessage("2"));
+        execute("UPDATE %s SET v = v + ? WHERE k = 2", 
list(allocate(FAIL_THRESHOLD / 2)));
+        assertNotWarnedOnFlush();
+        assertFailedOnCompact(failMessage("2"));
+
+        execute("INSERT INTO %s (k, v) VALUES (3, ?)", 
list(allocate(FAIL_THRESHOLD)));
+        assertFailedOnFlush(failMessage("3"));
+        execute("UPDATE %s SET v[0] = ? WHERE k = 3", allocate(1));
+        assertNotWarnedOnFlush();
+        assertNotWarnedOnCompact();
+    }
+
+    @Test
+    public void testMapSize() throws Throwable
+    {
+        schemaChange("CREATE TABLE %s (k int PRIMARY KEY, v map<blob, blob>)");
+
+        execute("INSERT INTO %s (k, v) VALUES (0, null)");
+        execute("INSERT INTO %s (k, v) VALUES (1, ?)", map());
+        execute("INSERT INTO %s (k, v) VALUES (2, ?)", map(allocate(1), 
allocate(1)));
+        execute("INSERT INTO %s (k, v) VALUES (3, ?)", map(allocate(1), 
allocate(WARN_THRESHOLD / 2)));
+        execute("INSERT INTO %s (k, v) VALUES (4, ?)", 
map(allocate(WARN_THRESHOLD / 2), allocate(1)));
+        assertNotWarnedOnFlush();
+
+        execute("INSERT INTO %s (k, v) VALUES (5, ?)", 
map(allocate(WARN_THRESHOLD), allocate(1)));
+        assertWarnedOnFlush(warnMessage("5"));
+
+        execute("INSERT INTO %s (k, v) VALUES (6, ?)", map(allocate(1), 
allocate(WARN_THRESHOLD)));
+        assertWarnedOnFlush(warnMessage("6"));
+
+        execute("INSERT INTO %s (k, v) VALUES (7, ?)", 
map(allocate(WARN_THRESHOLD), allocate(WARN_THRESHOLD)));
+        assertWarnedOnFlush(warnMessage("7"));
+
+        execute("INSERT INTO %s (k, v) VALUES (8, ?)", 
map(allocate(FAIL_THRESHOLD), allocate(1)));
+        assertFailedOnFlush(failMessage("8"));
+
+        execute("INSERT INTO %s (k, v) VALUES (9, ?)", map(allocate(1), 
allocate(FAIL_THRESHOLD)));
+        assertFailedOnFlush(failMessage("9"));
+
+        execute("INSERT INTO %s (k, v) VALUES (10, ?)", 
map(allocate(FAIL_THRESHOLD), allocate(FAIL_THRESHOLD)));
+        assertFailedOnFlush(failMessage("10"));
+    }
+
+    @Test
+    public void testMapSizeFrozen()
+    {
+        schemaChange("CREATE TABLE %s (k int PRIMARY KEY, v frozen<map<blob, 
blob>>)");
+
+        execute("INSERT INTO %s (k, v) VALUES (0, null)");
+        execute("INSERT INTO %s (k, v) VALUES (1, ?)", map());
+        execute("INSERT INTO %s (k, v) VALUES (2, ?)", map(allocate(1), 
allocate(1)));
+        execute("INSERT INTO %s (k, v) VALUES (4, ?)", map(allocate(1), 
allocate(WARN_THRESHOLD)));
+        execute("INSERT INTO %s (k, v) VALUES (5, ?)", 
map(allocate(WARN_THRESHOLD), allocate(1)));
+        execute("INSERT INTO %s (k, v) VALUES (6, ?)", 
map(allocate(WARN_THRESHOLD), allocate(WARN_THRESHOLD)));
+        execute("INSERT INTO %s (k, v) VALUES (7, ?)", map(allocate(1), 
allocate(FAIL_THRESHOLD)));
+        execute("INSERT INTO %s (k, v) VALUES (8, ?)", 
map(allocate(FAIL_THRESHOLD), allocate(1)));
+        execute("INSERT INTO %s (k, v) VALUES (9, ?)", 
map(allocate(FAIL_THRESHOLD), allocate(FAIL_THRESHOLD)));
+
+        // frozen collections size is not checked during sstable write
+        assertNotWarnedOnFlush();
+    }
+
+    @Test
+    public void testMapSizeWithUpdates()
+    {
+        schemaChange("CREATE TABLE %s (k int PRIMARY KEY, v map<blob, blob>)");
+
+        execute("INSERT INTO %s (k, v) VALUES (0, ?)", map(allocate(1), 
allocate(1)));
+        execute("UPDATE %s SET v = v + ? WHERE k = 0", map(allocate(1), 
allocate(1)));
+        assertNotWarnedOnFlush();
+
+        execute("INSERT INTO %s (k, v) VALUES (1, ?)", map(allocate(1), 
allocate(WARN_THRESHOLD / 2)));
+        execute("UPDATE %s SET v = v + ? WHERE k = 1", map(allocate(2), 
allocate(WARN_THRESHOLD / 2)));
+        assertWarnedOnFlush(warnMessage("1"));
+
+        execute("INSERT INTO %s (k, v) VALUES (2, ?)", 
map(allocate(WARN_THRESHOLD / 4), allocate(1)));
+        execute("UPDATE %s SET v = v + ? WHERE k = 2", 
map(allocate(WARN_THRESHOLD * 3 / 4), allocate(1)));
+        assertWarnedOnFlush(warnMessage("2"));
+
+        execute("INSERT INTO %s (k, v) VALUES (3, ?)", 
map(allocate(WARN_THRESHOLD / 4), allocate(WARN_THRESHOLD / 4)));
+        execute("UPDATE %s SET v = v + ? WHERE k = 3", 
map(allocate(WARN_THRESHOLD / 4 + 1), allocate(WARN_THRESHOLD / 4)));
+        assertWarnedOnFlush(warnMessage("3"));
+
+        execute("INSERT INTO %s (k, v) VALUES (4, ?)", map(allocate(1), 
allocate(FAIL_THRESHOLD / 2)));
+        execute("UPDATE %s SET v = v + ? WHERE k = 4", map(allocate(2), 
allocate(FAIL_THRESHOLD / 2)));
+        assertFailedOnFlush(failMessage("4"));
+
+        execute("INSERT INTO %s (k, v) VALUES (5, ?)", 
map(allocate(FAIL_THRESHOLD / 4), allocate(1)));
+        execute("UPDATE %s SET v = v + ? WHERE k = 5", 
map(allocate(FAIL_THRESHOLD * 3 / 4), allocate(1)));
+        assertFailedOnFlush(failMessage("5"));
+
+        execute("INSERT INTO %s (k, v) VALUES (6, ?)", 
map(allocate(FAIL_THRESHOLD / 4), allocate(FAIL_THRESHOLD / 4)));
+        execute("UPDATE %s SET v = v + ? WHERE k = 6", 
map(allocate(FAIL_THRESHOLD / 4 + 1), allocate(FAIL_THRESHOLD / 4)));
+        assertFailedOnFlush(failMessage("6"));
+    }
+
+    @Test
+    public void testMapSizeAfterCompaction()
+    {
+        schemaChange("CREATE TABLE %s (k int PRIMARY KEY, v map<blob, blob>)");
+
+        execute("INSERT INTO %s (k, v) VALUES (0, ?)", map(allocate(1), 
allocate(1)));
+        execute("UPDATE %s SET v = v + ? WHERE k = 0", map(allocate(1), 
allocate(1)));
+        assertNotWarnedOnFlush();
+        assertNotWarnedOnCompact();
+
+        execute("INSERT INTO %s (k, v) VALUES (1, ?)", map(allocate(1), 
allocate(WARN_THRESHOLD / 2)));
+        assertNotWarnedOnFlush();
+        execute("UPDATE %s SET v = v + ? WHERE k = 1", map(allocate(2), 
allocate(WARN_THRESHOLD / 2)));
+        assertNotWarnedOnFlush();
+        assertWarnedOnCompact(warnMessage("1"));
+
+        execute("INSERT INTO %s (k, v) VALUES (2, ?)", 
map(allocate(WARN_THRESHOLD / 4), allocate(1)));
+        assertNotWarnedOnFlush();
+        execute("UPDATE %s SET v = v + ? WHERE k = 2", 
map(allocate(WARN_THRESHOLD * 3 / 4), allocate(1)));
+        assertNotWarnedOnFlush();
+        assertWarnedOnCompact(warnMessage("2"));
+
+        execute("INSERT INTO %s (k, v) VALUES (3, ?)", 
map(allocate(WARN_THRESHOLD / 4), allocate(WARN_THRESHOLD / 4)));
+        assertNotWarnedOnFlush();
+        execute("UPDATE %s SET v = v + ? WHERE k = 3", 
map(allocate(WARN_THRESHOLD / 4 + 1), allocate(WARN_THRESHOLD / 4)));
+        assertNotWarnedOnFlush();
+        assertWarnedOnCompact(warnMessage("3"));
+
+        execute("INSERT INTO %s (k, v) VALUES (4, ?)", map(allocate(1), 
allocate(FAIL_THRESHOLD / 2)));
+        assertWarnedOnFlush(failMessage("4"));
+        execute("UPDATE %s SET v = v + ? WHERE k = 4", map(allocate(2), 
allocate(FAIL_THRESHOLD / 2)));
+        assertWarnedOnFlush(warnMessage("4"));
+        assertFailedOnCompact(failMessage("4"));
+
+        execute("INSERT INTO %s (k, v) VALUES (5, ?)", 
map(allocate(FAIL_THRESHOLD / 4), allocate(1)));
+        assertWarnedOnFlush(failMessage("5"));
+        execute("UPDATE %s SET v = v + ? WHERE k = 5", 
map(allocate(FAIL_THRESHOLD * 3 / 4), allocate(1)));
+        assertWarnedOnFlush(warnMessage("5"));
+        assertFailedOnCompact(failMessage("5"));
+
+        execute("INSERT INTO %s (k, v) VALUES (6, ?)", 
map(allocate(FAIL_THRESHOLD / 4), allocate(FAIL_THRESHOLD / 4)));
+        assertWarnedOnFlush(failMessage("6"));
+        execute("UPDATE %s SET v = v + ? WHERE k = 6", 
map(allocate(FAIL_THRESHOLD / 4 + 1), allocate(FAIL_THRESHOLD / 4)));
+        assertWarnedOnFlush(warnMessage("6"));
+        assertFailedOnCompact(failMessage("6"));
+    }
+
+    @Test
+    public void testCompositePartitionKey()
+    {
+        schemaChange("CREATE TABLE %s (k1 int, k2 text, v set<blob>, PRIMARY 
KEY((k1, k2)))");
+
+        execute("INSERT INTO %s (k1, k2, v) VALUES (0, 'a', ?)", 
set(allocate(WARN_THRESHOLD)));
+        assertWarnedOnFlush(warnMessage("(0, 'a')"));
+
+        execute("INSERT INTO %s (k1, k2, v) VALUES (1, 'b', ?)", 
set(allocate(FAIL_THRESHOLD)));
+        assertFailedOnFlush(failMessage("(1, 'b')"));
+    }
+
+    @Test
+    public void testCompositeClusteringKey()
+    {
+        schemaChange("CREATE TABLE %s (k int, c1 int, c2 text, v set<blob>, 
PRIMARY KEY(k, c1, c2))");
+
+        execute("INSERT INTO %s (k, c1, c2, v) VALUES (1, 10, 'a', ?)", 
set(allocate(WARN_THRESHOLD)));
+        assertWarnedOnFlush(warnMessage("(1, 10, 'a')"));
+
+        execute("INSERT INTO %s (k, c1, c2, v) VALUES (2, 20, 'b', ?)", 
set(allocate(FAIL_THRESHOLD)));
+        assertFailedOnFlush(failMessage("(2, 20, 'b')"));
+    }
+
+    private void execute(String query, Object... args)
+    {
+        SimpleStatement stmt = new SimpleStatement(format(query), args);
+        
stmt.setConsistencyLevel(com.datastax.driver.core.ConsistencyLevel.ALL);
+        driverSession.execute(stmt);
+    }
+
+    private String warnMessage(String key)
+    {
+        return String.format("Detected collection v in row %s in table %s of 
size", key, qualifiedTableName);
+    }
+
+    private String failMessage(String key)
+    {
+        return String.format("Detected collection v in row %s in table %s of 
size", key, qualifiedTableName);
+    }
+}
diff --git 
a/test/distributed/org/apache/cassandra/distributed/test/guardrails/GuardrailItemsPerCollectionOnSSTableWriteTest.java
 
b/test/distributed/org/apache/cassandra/distributed/test/guardrails/GuardrailItemsPerCollectionOnSSTableWriteTest.java
new file mode 100644
index 0000000..742fa62
--- /dev/null
+++ 
b/test/distributed/org/apache/cassandra/distributed/test/guardrails/GuardrailItemsPerCollectionOnSSTableWriteTest.java
@@ -0,0 +1,340 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.cassandra.distributed.test.guardrails;
+
+import java.io.IOException;
+
+import org.junit.AfterClass;
+import org.junit.BeforeClass;
+import org.junit.Test;
+
+import org.apache.cassandra.db.guardrails.Guardrails;
+import org.apache.cassandra.distributed.Cluster;
+import org.apache.cassandra.distributed.api.ConsistencyLevel;
+import org.apache.cassandra.distributed.api.ICoordinator;
+
+/**
+ * Tests the guardrail for the number of items on a collection, {@link 
Guardrails#itemsPerCollection}.
+ * <p>
+ * This test only includes the activation of the guardrail during sstable 
writes, all other cases are covered by
+ * {@link org.apache.cassandra.db.guardrails.GuardrailItemsPerCollectionTest}.
+ */
+public class GuardrailItemsPerCollectionOnSSTableWriteTest extends 
GuardrailTester
+{
+    private static final int NUM_NODES = 2;
+
+    private static final int WARN_THRESHOLD = 2;
+    private static final int FAIL_THRESHOLD = 4;
+
+    private static Cluster cluster;
+    private static ICoordinator coordinator;
+
+    @BeforeClass
+    public static void setupCluster() throws IOException
+    {
+        cluster = init(Cluster.build(NUM_NODES)
+                              .withConfig(c -> c.set("guardrails_enabled", 
true)
+                                                
.set("items_per_collection_warn_threshold", WARN_THRESHOLD)
+                                                
.set("items_per_collection_fail_threshold", FAIL_THRESHOLD))
+                              .start());
+        cluster.disableAutoCompaction(KEYSPACE);
+        coordinator = cluster.coordinator(1);
+    }
+
+    @AfterClass
+    public static void teardownCluster()
+    {
+        if (cluster != null)
+            cluster.close();
+    }
+
+    @Override
+    protected Cluster getCluster()
+    {
+        return cluster;
+    }
+
+    @Test
+    public void testSetSize() throws Throwable
+    {
+        schemaChange("CREATE TABLE %s (k int PRIMARY KEY, v set<int>)");
+
+        execute("INSERT INTO %s (k, v) VALUES (0, null)");
+        execute("INSERT INTO %s (k, v) VALUES (1, {1})");
+        execute("INSERT INTO %s (k, v) VALUES (2, {1, 2})");
+        assertNotWarnedOnFlush();
+
+        execute("INSERT INTO %s (k, v) VALUES (3, {1, 2, 3})");
+        execute("INSERT INTO %s (k, v) VALUES (4, {1, 2, 3, 4})");
+        assertWarnedOnFlush(warnMessage("3", 3), warnMessage("4", 4));
+
+        execute("INSERT INTO %s (k, v) VALUES (5, {1, 2, 3, 4, 5})");
+        execute("INSERT INTO %s (k, v) VALUES (6, {1, 2, 3, 4, 5, 6})");
+        assertFailedOnFlush(failMessage("5", 5), failMessage("6", 6));
+    }
+
+    @Test
+    public void testSetSizeFrozen()
+    {
+        schemaChange("CREATE TABLE %s (k int PRIMARY KEY, v 
frozen<set<int>>)");
+
+        execute("INSERT INTO %s (k, v) VALUES (3, {1, 2, 3})");
+        execute("INSERT INTO %s (k, v) VALUES (5, {1, 2, 3, 4, 5})");
+
+        // the size of frozen collections is not checked during sstable write
+        assertNotWarnedOnFlush();
+    }
+
+    @Test
+    public void testSetSizeWithUpdates()
+    {
+        schemaChange("CREATE TABLE %s (k int PRIMARY KEY, v set<int>)");
+
+        execute("UPDATE %s SET v = v + {1, 2} WHERE k = 1");
+        execute("UPDATE %s SET v = v - {1, 2} WHERE k = 2");
+        assertNotWarnedOnFlush();
+
+        execute("UPDATE %s SET v = v + {1, 2, 3} WHERE k = 3");
+        execute("UPDATE %s SET v = v - {1, 2, 3} WHERE k = 4");
+        assertWarnedOnFlush(warnMessage("3", 3));
+
+        execute("UPDATE %s SET v = v + {1, 2, 3, 4, 5} WHERE k = 5");
+        execute("UPDATE %s SET v = v - {1, 2, 3, 4, 5} WHERE k = 6");
+        assertFailedOnFlush(failMessage("5", 5));
+    }
+
+    @Test
+    public void testSetSizeAfterCompaction() throws Throwable
+    {
+        schemaChange("CREATE TABLE %s (k int PRIMARY KEY, v set<int>)");
+
+        execute("INSERT INTO %s (k, v) VALUES (0, {1})");
+        assertNotWarnedOnFlush();
+        execute("UPDATE %s SET v = v + {2} WHERE k = 0");
+        assertNotWarnedOnFlush();
+        assertNotWarnedOnCompact();
+
+        execute("INSERT INTO %s (k, v) VALUES (1, {1})");
+        assertNotWarnedOnFlush();
+        execute("UPDATE %s SET v = v + {2, 3} WHERE k = 1");
+        assertNotWarnedOnFlush();
+        assertWarnedOnCompact(warnMessage("1", 3));
+
+        execute("INSERT INTO %s (k, v) VALUES (2, {1, 2})");
+        assertNotWarnedOnFlush();
+        execute("UPDATE %s SET v = v + {3, 4, 5} WHERE k = 2");
+        assertWarnedOnFlush(warnMessage("2", 3));
+        assertFailedOnCompact(failMessage("2", 5));
+    }
+
+    @Test
+    public void testListSize() throws Throwable
+    {
+        schemaChange("CREATE TABLE %s (k int PRIMARY KEY, v list<int>)");
+
+        execute("INSERT INTO %s (k, v) VALUES (0, null)");
+        execute("INSERT INTO %s (k, v) VALUES (1, [1])");
+        execute("INSERT INTO %s (k, v) VALUES (2, [1, 2])");
+        assertNotWarnedOnFlush();
+
+        execute("INSERT INTO %s (k, v) VALUES (3, [1, 2, 3])");
+        execute("INSERT INTO %s (k, v) VALUES (4, [1, 2, 3, 4])");
+        assertWarnedOnFlush(warnMessage("3", 3), warnMessage("4", 4));
+
+        execute("INSERT INTO %s (k, v) VALUES (5, [1, 2, 3, 4, 5])");
+        execute("INSERT INTO %s (k, v) VALUES (6, [1, 2, 3, 4, 5, 6])");
+        assertFailedOnFlush(failMessage("5", 5), failMessage("6", 6));
+    }
+
+    @Test
+    public void testListSizeFrozen() throws Throwable
+    {
+        schemaChange("CREATE TABLE %s (k int PRIMARY KEY, v 
frozen<list<int>>)");
+
+        execute("INSERT INTO %s (k, v) VALUES (3, [1, 2, 3])");
+        execute("INSERT INTO %s (k, v) VALUES (5, [1, 2, 3, 4, 5])");
+
+        // the size of frozen collections is not checked during sstable write
+        assertNotWarnedOnFlush();
+    }
+
+    @Test
+    public void testListSizeWithUpdates()
+    {
+        schemaChange("CREATE TABLE %s (k int PRIMARY KEY, v list<int>)");
+
+        execute("UPDATE %s SET v = v + [1, 2] WHERE k = 1");
+        execute("UPDATE %s SET v = v - [1, 2] WHERE k = 2");
+        assertNotWarnedOnFlush();
+
+        execute("UPDATE %s SET v = v + [1, 2, 3] WHERE k = 3");
+        execute("UPDATE %s SET v = v - [1, 2, 3] WHERE k = 4");
+        assertWarnedOnFlush(warnMessage("3", 3));
+
+        execute("UPDATE %s SET v = v + [1, 2, 3, 4, 5] WHERE k = 5");
+        execute("UPDATE %s SET v = v - [1, 2, 3, 4, 5] WHERE k = 6");
+        assertFailedOnFlush(failMessage("5", 5));
+    }
+
+    @Test
+    public void testListSizeAfterCompaction() throws Throwable
+    {
+        schemaChange("CREATE TABLE %s (k int PRIMARY KEY, v list<int>)");
+
+        execute("INSERT INTO %s (k, v) VALUES (0, [1])");
+        assertNotWarnedOnFlush();
+        execute("UPDATE %s SET v = v + [2] WHERE k = 0");
+        assertNotWarnedOnFlush();
+        assertNotWarnedOnCompact();
+
+        execute("INSERT INTO %s (k, v) VALUES (1, [1])");
+        assertNotWarnedOnFlush();
+        execute("UPDATE %s SET v = v + [2, 3] WHERE k = 1");
+        assertNotWarnedOnFlush();
+        assertWarnedOnCompact(warnMessage("1", 3));
+
+        execute("INSERT INTO %s (k, v) VALUES (2, [1, 2])");
+        assertNotWarnedOnFlush();
+        execute("UPDATE %s SET v = v + [3, 4, 5] WHERE k = 2");
+        assertWarnedOnFlush(warnMessage("2", 3));
+        assertFailedOnCompact(failMessage("2", 5));
+
+        execute("INSERT INTO %s (k, v) VALUES (3, [1, 2, 3])");
+        assertWarnedOnFlush(warnMessage("3", 3));
+        execute("UPDATE %s SET v[1] = null WHERE k = 3");
+        assertNotWarnedOnFlush();
+        assertNotWarnedOnCompact();
+    }
+
+    @Test
+    public void testMapSize() throws Throwable
+    {
+        schemaChange("CREATE TABLE %s (k int PRIMARY KEY, v map<int, int>)");
+
+        execute("INSERT INTO %s (k, v) VALUES (0, null)");
+        execute("INSERT INTO %s (k, v) VALUES (1, {1:10})");
+        execute("INSERT INTO %s (k, v) VALUES (2, {1:10, 2:20})");
+        assertNotWarnedOnFlush();
+
+        execute("INSERT INTO %s (k, v) VALUES (3, {1:10, 2:20, 3:30})");
+        execute("INSERT INTO %s (k, v) VALUES (4, {1:10, 2:20, 3:30, 4:40})");
+        assertWarnedOnFlush(warnMessage("3", 3), warnMessage("4", 4));
+
+        execute("INSERT INTO %s (k, v) VALUES (5, {1:10, 2:20, 3:30, 4:40, 
5:50})");
+        execute("INSERT INTO %s (k, v) VALUES (6, {1:10, 2:20, 3:30, 4:40, 
5:50, 6:60})");
+        assertFailedOnFlush(failMessage("5", 5), failMessage("6", 6));
+    }
+
+    @Test
+    public void testMapSizeFrozen()
+    {
+        schemaChange("CREATE TABLE %s (k int PRIMARY KEY, v frozen<map<int, 
int>>)");
+
+        execute("INSERT INTO %s (k, v) VALUES (3, {1:10, 2:20, 3:30})");
+        execute("INSERT INTO %s (k, v) VALUES (4, {1:10, 2:20, 3:30, 4:40})");
+
+        // the size of frozen collections is not checked during sstable write
+        assertNotWarnedOnFlush();
+    }
+
+    @Test
+    public void testMapSizeWithUpdates()
+    {
+        schemaChange("CREATE TABLE %s (k int PRIMARY KEY, v map<int, int>)");
+
+        execute("UPDATE %s SET v = v + {1:10, 2:20} WHERE k = 1");
+        execute("UPDATE %s SET v = v - {1, 2} WHERE k = 2");
+        assertNotWarnedOnFlush();
+
+        execute("UPDATE %s SET v = v + {1:10, 2:20, 3:30} WHERE k = 3");
+        execute("UPDATE %s SET v = v - {1, 2, 3} WHERE k = 4");
+        assertWarnedOnFlush(warnMessage("3", 3));
+
+        execute("UPDATE %s SET v = v + {1:10, 2:20, 3:30, 4:40, 5:50} WHERE k 
= 5");
+        execute("UPDATE %s SET v = v - {1, 2, 3, 4, 5} WHERE k = 6");
+        assertFailedOnFlush(failMessage("5", 5));
+    }
+
+    @Test
+    public void testMapSizeAfterCompaction()
+    {
+        schemaChange("CREATE TABLE %s (k int PRIMARY KEY, v map<int, int>)");
+
+        execute("INSERT INTO %s (k, v) VALUES (0, {1:10})");
+        assertNotWarnedOnFlush();
+        execute("UPDATE %s SET v = v + {2:20} WHERE k = 0");
+        assertNotWarnedOnFlush();
+        assertNotWarnedOnCompact();
+
+        execute("INSERT INTO %s (k, v) VALUES (1, {1:10})");
+        assertNotWarnedOnFlush();
+        execute("UPDATE %s SET v = v + {2:20, 3:30} WHERE k = 1");
+        assertNotWarnedOnFlush();
+        assertWarnedOnCompact(warnMessage("1", 3));
+
+        execute("INSERT INTO %s (k, v) VALUES (2, {1:10, 2:20})");
+        assertNotWarnedOnFlush();
+        execute("UPDATE %s SET v = v + {3:30, 4:40, 5:50} WHERE k = 2");
+        assertWarnedOnFlush(warnMessage("2", 3));
+        assertFailedOnCompact(failMessage("2", 5));
+    }
+
+    @Test
+    public void testCompositePartitionKey()
+    {
+        schemaChange("CREATE TABLE %s (k1 int, k2 text, v set<int>, PRIMARY 
KEY((k1, k2)))");
+
+        execute("INSERT INTO %s (k1, k2, v) VALUES (0, 'a', {1, 2, 3})");
+        assertWarnedOnFlush(warnMessage("(0, 'a')", 3));
+
+        execute("INSERT INTO %s (k1, k2, v) VALUES (1, 'b', {1, 2, 3, 4, 5})");
+        assertFailedOnFlush(failMessage("(1, 'b')", 5));
+    }
+
+    @Test
+    public void testCompositeClusteringKey()
+    {
+        schemaChange("CREATE TABLE %s (k int, c1 int, c2 text, v set<int>, 
PRIMARY KEY(k, c1, c2))");
+
+        execute("INSERT INTO %s (k, c1, c2, v) VALUES (1, 10, 'a', {1, 2, 
3})");
+        assertWarnedOnFlush(warnMessage("(1, 10, 'a')", 3));
+
+        execute("INSERT INTO %s (k, c1, c2, v) VALUES (2, 20, 'b', {1, 2, 3, 
4, 5})");
+        assertFailedOnFlush(failMessage("(2, 20, 'b')", 5));
+    }
+
+    private void execute(String query)
+    {
+        coordinator.execute(format(query), ConsistencyLevel.ALL);
+    }
+
+    private String warnMessage(String key, int numItems)
+    {
+        return String.format("Detected collection v in row %s in table %s with 
%d items, " +
+                             "this exceeds the warning threshold of %d.",
+                             key, qualifiedTableName, numItems, 
WARN_THRESHOLD);
+    }
+
+    private String failMessage(String key, int numItems)
+    {
+        return String.format("Detected collection v in row %s in table %s with 
%d items, " +
+                             "this exceeds the failure threshold of %d.",
+                             key, qualifiedTableName, numItems, 
FAIL_THRESHOLD);
+    }
+}
diff --git 
a/test/distributed/org/apache/cassandra/distributed/test/guardrails/GuardrailTester.java
 
b/test/distributed/org/apache/cassandra/distributed/test/guardrails/GuardrailTester.java
new file mode 100644
index 0000000..b12d9ab
--- /dev/null
+++ 
b/test/distributed/org/apache/cassandra/distributed/test/guardrails/GuardrailTester.java
@@ -0,0 +1,154 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.cassandra.distributed.test.guardrails;
+
+import java.util.List;
+import java.util.concurrent.atomic.AtomicInteger;
+
+import org.junit.After;
+import org.junit.Before;
+
+import org.apache.cassandra.distributed.Cluster;
+import org.apache.cassandra.distributed.api.IInstance;
+import org.apache.cassandra.distributed.test.TestBaseImpl;
+import org.apache.cassandra.exceptions.InvalidRequestException;
+import org.assertj.core.api.Assertions;
+import org.assertj.core.api.ListAssert;
+
+import static org.junit.Assert.assertTrue;
+import static org.junit.Assert.fail;
+
+public abstract class GuardrailTester extends TestBaseImpl
+{
+    private static final AtomicInteger seqNumber = new AtomicInteger();
+    protected String tableName, qualifiedTableName;
+
+    protected abstract Cluster getCluster();
+
+    @Before
+    public void beforeTest()
+    {
+        tableName = "t_" + seqNumber.getAndIncrement();
+        qualifiedTableName = KEYSPACE + "." + tableName;
+    }
+
+    @After
+    public void afterTest()
+    {
+        schemaChange("DROP TABLE IF EXISTS %s");
+    }
+
+    protected String format(String query)
+    {
+        return String.format(query, qualifiedTableName);
+    }
+
+    protected void schemaChange(String query)
+    {
+        getCluster().schemaChange(format(query));
+    }
+
+    protected void assertNotWarnedOnFlush()
+    {
+        assertNotWarnsOnSSTableWrite(false);
+    }
+
+    protected void assertNotWarnedOnCompact()
+    {
+        assertNotWarnsOnSSTableWrite(true);
+    }
+
+    protected void assertNotWarnsOnSSTableWrite(boolean compact)
+    {
+        getCluster().stream().forEach(node -> 
assertNotWarnsOnSSTableWrite(node, compact));
+    }
+
+    protected void assertNotWarnsOnSSTableWrite(IInstance node, boolean 
compact)
+    {
+        long mark = node.logs().mark();
+        try
+        {
+            writeSSTables(node, compact);
+            assertTrue(node.logs().grep(mark, "^ERROR", 
"^WARN").getResult().isEmpty());
+        }
+        catch (InvalidRequestException e)
+        {
+            fail("Expected not to fail, but Fails with error message: " + 
e.getMessage());
+        }
+    }
+
+    protected void assertWarnedOnFlush(String... msgs)
+    {
+        assertWarnsOnSSTableWrite(false, msgs);
+    }
+
+    protected void assertWarnedOnCompact(String... msgs)
+    {
+        assertWarnsOnSSTableWrite(true, msgs);
+    }
+
+    protected void assertWarnsOnSSTableWrite(boolean compact, String... msgs)
+    {
+        getCluster().stream().forEach(node -> assertWarnsOnSSTableWrite(node, 
compact, msgs));
+    }
+
+    protected void assertWarnsOnSSTableWrite(IInstance node, boolean compact, 
String... msgs)
+    {
+        long mark = node.logs().mark();
+        writeSSTables(node, compact);
+        assertTrue(node.logs().grep(mark, "^ERROR").getResult().isEmpty());
+        List<String> warnings = node.logs().grep(mark, "^WARN").getResult();
+        ListAssert<String> assertion = 
Assertions.assertThat(warnings).isNotEmpty().hasSize(msgs.length);
+        for (String msg : msgs)
+            assertion.anyMatch(m -> m.contains(msg));
+    }
+
+    protected void assertFailedOnFlush(String... msgs)
+    {
+        assertFailsOnSSTableWrite(false, msgs);
+    }
+
+    protected void assertFailedOnCompact(String... msgs)
+    {
+        assertFailsOnSSTableWrite(true, msgs);
+    }
+
+    private void assertFailsOnSSTableWrite(boolean compact, String... msgs)
+    {
+        getCluster().stream().forEach(node -> assertFailsOnSSTableWrite(node, 
compact, msgs));
+    }
+
+    private void assertFailsOnSSTableWrite(IInstance node, boolean compact, 
String... msgs)
+    {
+        long mark = node.logs().mark();
+        writeSSTables(node, compact);
+        assertTrue(node.logs().grep(mark, "^WARN").getResult().isEmpty());
+        List<String> warnings = node.logs().grep(mark, "^ERROR").getResult();
+        ListAssert<String> assertion = 
Assertions.assertThat(warnings).isNotEmpty().hasSize(msgs.length);
+        for (String msg : msgs)
+            assertion.anyMatch(m -> m.contains(msg));
+    }
+
+    private void writeSSTables(IInstance node, boolean compact)
+    {
+        node.flush(KEYSPACE);
+        if (compact)
+            node.forceCompact(KEYSPACE, tableName);
+    }
+}
diff --git 
a/test/unit/org/apache/cassandra/db/guardrails/GuardrailCollectionSizeTest.java 
b/test/unit/org/apache/cassandra/db/guardrails/GuardrailCollectionSizeTest.java
new file mode 100644
index 0000000..f867783
--- /dev/null
+++ 
b/test/unit/org/apache/cassandra/db/guardrails/GuardrailCollectionSizeTest.java
@@ -0,0 +1,248 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.cassandra.db.guardrails;
+
+import java.nio.ByteBuffer;
+import java.util.Arrays;
+import java.util.Collections;
+
+import com.google.common.collect.ImmutableList;
+import com.google.common.collect.ImmutableMap;
+import com.google.common.collect.ImmutableSet;
+import org.junit.Test;
+
+import org.apache.cassandra.db.marshal.BytesType;
+import org.apache.cassandra.db.marshal.ListType;
+import org.apache.cassandra.db.marshal.MapType;
+import org.apache.cassandra.db.marshal.SetType;
+
+import static java.nio.ByteBuffer.allocate;
+
+/**
+ * Tests the guardrail for the size of collections, {@link 
Guardrails#collectionSize}.
+ * <p>
+ * This test doesn't include the activation of the guardrail during sstable 
writes, these cases are covered by the dtest
+ * {@link 
org.apache.cassandra.distributed.test.guardrails.GuardrailCollectionSizeOnSSTableWriteTest}.
+ */
+public class GuardrailCollectionSizeTest extends ThresholdTester
+{
+    private static final int WARN_THRESHOLD = 1024; // bytes
+    private static final int FAIL_THRESHOLD = WARN_THRESHOLD * 4; // bytes
+
+    public GuardrailCollectionSizeTest()
+    {
+        super(WARN_THRESHOLD / 1024, // to KiB
+              FAIL_THRESHOLD / 1024, // to KiB
+              Guardrails.collectionSize,
+              Guardrails::setCollectionSizeThresholdInKiB,
+              Guardrails::getCollectionSizeWarnThresholdInKiB,
+              Guardrails::getCollectionSizeFailThresholdInKiB);
+    }
+
+    @Test
+    public void testSetSize() throws Throwable
+    {
+        createTable("CREATE TABLE %s (k int PRIMARY KEY, v set<blob>)");
+
+        assertValid("INSERT INTO %s (k, v) VALUES (0, null)");
+        assertValid("INSERT INTO %s (k, v) VALUES (1, ?)", set());
+        assertValid("INSERT INTO %s (k, v) VALUES (2, ?)", set(allocate(1)));
+        assertValid("INSERT INTO %s (k, v) VALUES (3, ?)", 
set(allocate(WARN_THRESHOLD / 2)));
+
+        assertWarns("INSERT INTO %s (k, v) VALUES (4, ?)", 
set(allocate(WARN_THRESHOLD)));
+        assertWarns("INSERT INTO %s (k, v) VALUES (5, ?)", 
set(allocate(WARN_THRESHOLD / 4), allocate(WARN_THRESHOLD * 3 / 4)));
+
+        assertFails("INSERT INTO %s (k, v) VALUES (6, ?)", 
set(allocate(FAIL_THRESHOLD)));
+        assertFails("INSERT INTO %s (k, v) VALUES (7, ?)", 
set(allocate(FAIL_THRESHOLD / 4), allocate(FAIL_THRESHOLD * 3 / 4)));
+    }
+
+    @Test
+    public void testSetSizeFrozen() throws Throwable
+    {
+        createTable("CREATE TABLE %s (k int PRIMARY KEY, v 
frozen<set<blob>>)");
+
+        assertValid("INSERT INTO %s (k, v) VALUES (0, null)");
+        assertValid("INSERT INTO %s (k, v) VALUES (1, ?)", set());
+        assertValid("INSERT INTO %s (k, v) VALUES (2, ?)", set(allocate(1)));
+        assertWarns("INSERT INTO %s (k, v) VALUES (4, ?)", 
set(allocate(WARN_THRESHOLD)));
+        assertFails("INSERT INTO %s (k, v) VALUES (5, ?)", 
set(allocate(FAIL_THRESHOLD)));
+    }
+
+    @Test
+    public void testSetSizeWithUpdates() throws Throwable
+    {
+        createTable("CREATE TABLE %s (k int PRIMARY KEY, v set<blob>)");
+
+        assertValid("INSERT INTO %s (k, v) VALUES (0, ?)", set(allocate(1)));
+        assertValid("UPDATE %s SET v = v + ? WHERE k = 0", set(allocate(1)));
+
+        assertValid("INSERT INTO %s (k, v) VALUES (1, ?)", 
set(allocate(WARN_THRESHOLD / 4)));
+        assertValid("UPDATE %s SET v = v + ? WHERE k = 1", 
set(allocate(WARN_THRESHOLD * 3 / 4)));
+
+        assertWarns("INSERT INTO %s (k, v) VALUES (2, ?)", 
set(allocate(FAIL_THRESHOLD / 4)));
+        assertWarns("UPDATE %s SET v = v + ? WHERE k = 2", 
set(allocate(FAIL_THRESHOLD * 3 / 4)));
+    }
+
+    @Test
+    public void testListSize() throws Throwable
+    {
+        createTable("CREATE TABLE %s (k int PRIMARY KEY, v list<blob>)");
+
+        assertValid("INSERT INTO %s (k, v) VALUES (0, null)");
+        assertValid("INSERT INTO %s (k, v) VALUES (1, ?)", list());
+        assertValid("INSERT INTO %s (k, v) VALUES (2, ?)", list(allocate(1)));
+        assertValid("INSERT INTO %s (k, v) VALUES (3, ?)", 
list(allocate(WARN_THRESHOLD / 2)));
+
+        assertWarns("INSERT INTO %s (k, v) VALUES (4, ?)", 
list(allocate(WARN_THRESHOLD)));
+        assertWarns("INSERT INTO %s (k, v) VALUES (5, ?)", 
list(allocate(WARN_THRESHOLD / 2), allocate(WARN_THRESHOLD / 2)));
+
+        assertFails("INSERT INTO %s (k, v) VALUES (6, ?)", 
list(allocate(FAIL_THRESHOLD)));
+        assertFails("INSERT INTO %s (k, v) VALUES (7, ?)", 
list(allocate(FAIL_THRESHOLD / 2), allocate(FAIL_THRESHOLD / 2)));
+    }
+
+    @Test
+    public void testListSizeFrozen() throws Throwable
+    {
+        createTable("CREATE TABLE %s (k int PRIMARY KEY, v 
frozen<list<blob>>)");
+
+        assertValid("INSERT INTO %s (k, v) VALUES (0, null)");
+        assertValid("INSERT INTO %s (k, v) VALUES (1, ?)", list());
+        assertValid("INSERT INTO %s (k, v) VALUES (2, ?)", list(allocate(1)));
+        assertWarns("INSERT INTO %s (k, v) VALUES (4, ?)", 
list(allocate(WARN_THRESHOLD)));
+        assertFails("INSERT INTO %s (k, v) VALUES (5, ?)", 
list(allocate(FAIL_THRESHOLD)));
+    }
+
+    @Test
+    public void testListSizeWithUpdates() throws Throwable
+    {
+        createTable("CREATE TABLE %s (k int PRIMARY KEY, v list<blob>)");
+
+        assertValid("INSERT INTO %s (k, v) VALUES (0, ?)", list(allocate(1)));
+        assertValid("UPDATE %s SET v = v + ? WHERE k = 0", list(allocate(1)));
+
+        assertValid("INSERT INTO %s (k, v) VALUES (1, ?)", 
list(allocate(WARN_THRESHOLD / 2)));
+        assertValid("UPDATE %s SET v = v + ? WHERE k = 1", 
list(allocate(WARN_THRESHOLD / 2)));
+
+        assertWarns("INSERT INTO %s (k, v) VALUES (2, ?)", 
list(allocate(FAIL_THRESHOLD / 2)));
+        assertWarns("UPDATE %s SET v = v + ? WHERE k = 2", 
list(allocate(FAIL_THRESHOLD / 2)));
+    }
+
+    @Test
+    public void testMapSize() throws Throwable
+    {
+        createTable("CREATE TABLE %s (k int PRIMARY KEY, v map<blob, blob>)");
+
+        assertValid("INSERT INTO %s (k, v) VALUES (0, null)");
+        assertValid("INSERT INTO %s (k, v) VALUES (1, ?)", map());
+        assertValid("INSERT INTO %s (k, v) VALUES (2, ?)", map(allocate(1), 
allocate(1)));
+        assertValid("INSERT INTO %s (k, v) VALUES (3, ?)", map(allocate(1), 
allocate(WARN_THRESHOLD / 2)));
+        assertValid("INSERT INTO %s (k, v) VALUES (4, ?)", 
map(allocate(WARN_THRESHOLD / 2), allocate(1)));
+
+        assertWarns("INSERT INTO %s (k, v) VALUES (5, ?)", 
map(allocate(WARN_THRESHOLD), allocate(1)));
+        assertWarns("INSERT INTO %s (k, v) VALUES (6, ?)", map(allocate(1), 
allocate(WARN_THRESHOLD)));
+        assertWarns("INSERT INTO %s (k, v) VALUES (7, ?)", 
map(allocate(WARN_THRESHOLD), allocate(WARN_THRESHOLD)));
+
+        assertFails("INSERT INTO %s (k, v) VALUES (8, ?)", 
map(allocate(FAIL_THRESHOLD), allocate(1)));
+        assertFails("INSERT INTO %s (k, v) VALUES (9, ?)", map(allocate(1), 
allocate(FAIL_THRESHOLD)));
+        assertFails("INSERT INTO %s (k, v) VALUES (10, ?)", 
map(allocate(FAIL_THRESHOLD), allocate(FAIL_THRESHOLD)));
+    }
+
+    @Test
+    public void testMapSizeFrozen() throws Throwable
+    {
+        createTable("CREATE TABLE %s (k int PRIMARY KEY, v frozen<map<blob, 
blob>>)");
+
+        assertValid("INSERT INTO %s (k, v) VALUES (0, null)");
+        assertValid("INSERT INTO %s (k, v) VALUES (1, ?)", map());
+        assertValid("INSERT INTO %s (k, v) VALUES (2, ?)", map(allocate(1), 
allocate(1)));
+        assertWarns("INSERT INTO %s (k, v) VALUES (4, ?)", map(allocate(1), 
allocate(WARN_THRESHOLD)));
+        assertWarns("INSERT INTO %s (k, v) VALUES (5, ?)", 
map(allocate(WARN_THRESHOLD), allocate(1)));
+        assertWarns("INSERT INTO %s (k, v) VALUES (6, ?)", 
map(allocate(WARN_THRESHOLD), allocate(WARN_THRESHOLD)));
+        assertFails("INSERT INTO %s (k, v) VALUES (7, ?)", map(allocate(1), 
allocate(FAIL_THRESHOLD)));
+        assertFails("INSERT INTO %s (k, v) VALUES (8, ?)", 
map(allocate(FAIL_THRESHOLD), allocate(1)));
+        assertFails("INSERT INTO %s (k, v) VALUES (9, ?)", 
map(allocate(FAIL_THRESHOLD), allocate(FAIL_THRESHOLD)));
+    }
+
+    @Test
+    public void testMapSizeWithUpdates() throws Throwable
+    {
+        createTable("CREATE TABLE %s (k int PRIMARY KEY, v map<blob, blob>)");
+
+        assertValid("INSERT INTO %s (k, v) VALUES (0, ?)", map(allocate(1), 
allocate(1)));
+        assertValid("UPDATE %s SET v = v + ? WHERE k = 0", map(allocate(1), 
allocate(1)));
+
+        assertValid("INSERT INTO %s (k, v) VALUES (1, ?)", map(allocate(1), 
allocate(WARN_THRESHOLD / 2)));
+        assertValid("UPDATE %s SET v = v + ? WHERE k = 1", map(allocate(2), 
allocate(WARN_THRESHOLD / 2)));
+
+        assertValid("INSERT INTO %s (k, v) VALUES (2, ?)", 
map(allocate(WARN_THRESHOLD / 4), allocate(1)));
+        assertValid("UPDATE %s SET v = v + ? WHERE k = 2", 
map(allocate(WARN_THRESHOLD * 3 / 4), allocate(1)));
+
+        assertValid("INSERT INTO %s (k, v) VALUES (3, ?)", 
map(allocate(WARN_THRESHOLD / 4), allocate(WARN_THRESHOLD / 4)));
+        assertValid("UPDATE %s SET v = v + ? WHERE k = 3", 
map(allocate(WARN_THRESHOLD / 4 + 1), allocate(WARN_THRESHOLD / 4)));
+
+        assertWarns("INSERT INTO %s (k, v) VALUES (4, ?)", map(allocate(1), 
allocate(FAIL_THRESHOLD / 2)));
+        assertWarns("UPDATE %s SET v = v + ? WHERE k = 4", map(allocate(2), 
allocate(FAIL_THRESHOLD / 2)));
+
+        assertWarns("INSERT INTO %s (k, v) VALUES (5, ?)", 
map(allocate(FAIL_THRESHOLD / 4), allocate(1)));
+        assertWarns("UPDATE %s SET v = v + ? WHERE k = 5", 
map(allocate(FAIL_THRESHOLD * 3 / 4), allocate(1)));
+
+        assertWarns("INSERT INTO %s (k, v) VALUES (6, ?)", 
map(allocate(FAIL_THRESHOLD / 4), allocate(FAIL_THRESHOLD / 4)));
+        assertWarns("UPDATE %s SET v = v + ? WHERE k = 6", 
map(allocate(FAIL_THRESHOLD / 4 + 1), allocate(FAIL_THRESHOLD / 4)));
+    }
+
+    private void assertValid(String query, ByteBuffer... values) throws 
Throwable
+    {
+        assertValid(execute(query, values));
+    }
+
+    private void assertWarns(String query, ByteBuffer... values) throws 
Throwable
+    {
+        assertWarns(execute(query, values), "Detected collection v");
+    }
+
+    private void assertFails(String query, ByteBuffer... values) throws 
Throwable
+    {
+        assertFails(execute(query, values), "Detected collection v");
+    }
+
+    private CheckedFunction execute(String query, ByteBuffer... values)
+    {
+        return () -> execute(userClientState, query, Arrays.asList(values));
+    }
+
+    private static ByteBuffer set(ByteBuffer... values)
+    {
+        return SetType.getInstance(BytesType.instance, 
true).decompose(ImmutableSet.copyOf(values));
+    }
+
+    private static ByteBuffer list(ByteBuffer... values)
+    {
+        return ListType.getInstance(BytesType.instance, 
true).decompose(ImmutableList.copyOf(values));
+    }
+
+    private ByteBuffer map()
+    {
+        return MapType.getInstance(BytesType.instance, BytesType.instance, 
true).decompose(Collections.emptyMap());
+    }
+
+    private ByteBuffer map(ByteBuffer key, ByteBuffer value)
+    {
+        return MapType.getInstance(BytesType.instance, BytesType.instance, 
true).decompose(ImmutableMap.of(key, value));
+    }
+}
diff --git 
a/test/unit/org/apache/cassandra/db/guardrails/GuardrailItemsPerCollectionTest.java
 
b/test/unit/org/apache/cassandra/db/guardrails/GuardrailItemsPerCollectionTest.java
new file mode 100644
index 0000000..66424aa
--- /dev/null
+++ 
b/test/unit/org/apache/cassandra/db/guardrails/GuardrailItemsPerCollectionTest.java
@@ -0,0 +1,272 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.cassandra.db.guardrails;
+
+import java.nio.ByteBuffer;
+import java.util.Collections;
+import java.util.stream.Collector;
+import java.util.stream.Collectors;
+import java.util.stream.IntStream;
+
+import org.junit.Test;
+
+import org.apache.cassandra.db.marshal.Int32Type;
+import org.apache.cassandra.db.marshal.ListType;
+import org.apache.cassandra.db.marshal.MapType;
+import org.apache.cassandra.db.marshal.SetType;
+
+/**
+ * Tests the guardrail for the number of items on a collection, {@link 
Guardrails#itemsPerCollection}.
+ * <p>
+ * This test doesn't include the activation of the guardrail during sstable 
writes, these cases are covered by the dtest
+ * {@link 
org.apache.cassandra.distributed.test.guardrails.GuardrailItemsPerCollectionOnSSTableWriteTest}.
+ */
+public class GuardrailItemsPerCollectionTest extends ThresholdTester
+{
+    private static final int WARN_THRESHOLD = 10;
+    private static final int FAIL_THRESHOLD = 20;
+
+    public GuardrailItemsPerCollectionTest()
+    {
+        super(WARN_THRESHOLD,
+              FAIL_THRESHOLD,
+              Guardrails.itemsPerCollection,
+              Guardrails::setItemsPerCollectionThreshold,
+              Guardrails::getItemsPerCollectionWarnThreshold,
+              Guardrails::getItemsPerCollectionFailThreshold);
+    }
+
+    @Test
+    public void testSetSize() throws Throwable
+    {
+        createTable("CREATE TABLE %s (k int PRIMARY KEY, v set<int>)");
+
+        assertValid("INSERT INTO %s (k, v) VALUES (0, null)");
+        assertValid("INSERT INTO %s (k, v) VALUES (1, ?)", set(0));
+        assertValid("INSERT INTO %s (k, v) VALUES (2, ?)", set(1));
+        assertValid("INSERT INTO %s (k, v) VALUES (4, ?)", 
set(WARN_THRESHOLD));
+        assertWarns("INSERT INTO %s (k, v) VALUES (5, ?)", set(WARN_THRESHOLD 
+ 1), WARN_THRESHOLD + 1);
+        assertWarns("INSERT INTO %s (k, v) VALUES (6, ?)", 
set(FAIL_THRESHOLD), FAIL_THRESHOLD);
+        assertFails("INSERT INTO %s (k, v) VALUES (7, ?)", set(FAIL_THRESHOLD 
+ 1), FAIL_THRESHOLD + 1);
+        assertFails("INSERT INTO %s (k, v) VALUES (8, ?)", set(FAIL_THRESHOLD 
+ 10), FAIL_THRESHOLD + 10);
+    }
+
+    @Test
+    public void testSetSizeFrozen() throws Throwable
+    {
+        createTable("CREATE TABLE %s (k int PRIMARY KEY, v frozen<set<int>>)");
+
+        assertValid("INSERT INTO %s (k, v) VALUES (0, null)");
+        assertValid("INSERT INTO %s (k, v) VALUES (1, ?)", set(0));
+        assertValid("INSERT INTO %s (k, v) VALUES (2, ?)", set(1));
+        assertValid("INSERT INTO %s (k, v) VALUES (4, ?)", 
set(WARN_THRESHOLD));
+        assertWarns("INSERT INTO %s (k, v) VALUES (5, ?)", set(WARN_THRESHOLD 
+ 1), WARN_THRESHOLD + 1);
+        assertWarns("INSERT INTO %s (k, v) VALUES (6, ?)", 
set(FAIL_THRESHOLD), FAIL_THRESHOLD);
+        assertFails("INSERT INTO %s (k, v) VALUES (7, ?)", set(FAIL_THRESHOLD 
+ 1), FAIL_THRESHOLD + 1);
+    }
+
+    @Test
+    public void testSetSizeWithUpdates() throws Throwable
+    {
+        createTable("CREATE TABLE %s (k int PRIMARY KEY, v set<int>)");
+
+        assertValid("INSERT INTO %s (k, v) VALUES (0, ?)", set(1));
+        assertValid("UPDATE %s SET v = v + ? WHERE k = 0", set(1, 
WARN_THRESHOLD));
+        assertWarns("UPDATE %s SET v = v + ? WHERE k = 0", set(1, 
FAIL_THRESHOLD), FAIL_THRESHOLD - 1);
+
+        assertWarns("INSERT INTO %s (k, v) VALUES (1, ?)", set(WARN_THRESHOLD 
+ 1), WARN_THRESHOLD + 1);
+        assertValid("UPDATE %s SET v = v - ? WHERE k = 1", set(1));
+
+        assertValid("INSERT INTO %s (k, v) VALUES (2, ?)", set(1));
+        assertValid("UPDATE %s SET v = v + ? WHERE k = 2", set(1, 
WARN_THRESHOLD + 1));
+
+        assertFails("INSERT INTO %s (k, v) VALUES (3, ?)", set(FAIL_THRESHOLD 
+ 1), FAIL_THRESHOLD + 1);
+        assertValid("UPDATE %s SET v = v - ? WHERE k = 3", set(1));
+
+        assertValid("INSERT INTO %s (k, v) VALUES (4, ?)", set(1));
+        assertWarns("UPDATE %s SET v = v + ? WHERE k = 4", set(1, 
FAIL_THRESHOLD + 1), FAIL_THRESHOLD);
+    }
+
+    @Test
+    public void testListSize() throws Throwable
+    {
+        createTable("CREATE TABLE %s (k int PRIMARY KEY, v list<int>)");
+
+        assertValid("INSERT INTO %s (k, v) VALUES (0, null)");
+        assertValid("INSERT INTO %s (k, v) VALUES (1, ?)", list(0));
+        assertValid("INSERT INTO %s (k, v) VALUES (2, ?)", list(1));
+        assertValid("INSERT INTO %s (k, v) VALUES (4, ?)", 
list(WARN_THRESHOLD));
+        assertWarns("INSERT INTO %s (k, v) VALUES (5, ?)", list(WARN_THRESHOLD 
+ 1), WARN_THRESHOLD + 1);
+        assertWarns("INSERT INTO %s (k, v) VALUES (6, ?)", 
list(FAIL_THRESHOLD), FAIL_THRESHOLD);
+        assertFails("INSERT INTO %s (k, v) VALUES (7, ?)", list(FAIL_THRESHOLD 
+ 1), FAIL_THRESHOLD + 1);
+        assertFails("INSERT INTO %s (k, v) VALUES (7, ?)", list(FAIL_THRESHOLD 
+ 10), FAIL_THRESHOLD + 10);
+    }
+
+    @Test
+    public void testListSizeFrozen() throws Throwable
+    {
+        createTable("CREATE TABLE %s (k int PRIMARY KEY, v 
frozen<list<int>>)");
+
+        assertValid("INSERT INTO %s (k, v) VALUES (0, null)");
+        assertValid("INSERT INTO %s (k, v) VALUES (1, ?)", list(0));
+        assertValid("INSERT INTO %s (k, v) VALUES (2, ?)", list(1));
+        assertValid("INSERT INTO %s (k, v) VALUES (4, ?)", 
list(WARN_THRESHOLD));
+        assertWarns("INSERT INTO %s (k, v) VALUES (5, ?)", list(WARN_THRESHOLD 
+ 1), WARN_THRESHOLD + 1);
+        assertWarns("INSERT INTO %s (k, v) VALUES (6, ?)", 
list(FAIL_THRESHOLD), FAIL_THRESHOLD);
+        assertFails("INSERT INTO %s (k, v) VALUES (7, ?)", list(FAIL_THRESHOLD 
+ 1), FAIL_THRESHOLD + 1);
+    }
+
+    @Test
+    public void testListSizeWithUpdates() throws Throwable
+    {
+        createTable("CREATE TABLE %s (k int PRIMARY KEY, v list<int>)");
+
+        assertValid("INSERT INTO %s (k, v) VALUES (0, ?)", list(1));
+        assertValid("UPDATE %s SET v = v + ? WHERE k = 0", list(1, 
WARN_THRESHOLD));
+        assertWarns("UPDATE %s SET v = v + ? WHERE k = 0", list(1, 
FAIL_THRESHOLD), FAIL_THRESHOLD - 1);
+
+        assertWarns("INSERT INTO %s (k, v) VALUES (1, ?)", list(WARN_THRESHOLD 
+ 1), WARN_THRESHOLD + 1);
+        assertValid("UPDATE %s SET v = v - ? WHERE k = 1", list(1));
+
+        assertValid("INSERT INTO %s (k, v) VALUES (2, ?)", list(1));
+        assertValid("UPDATE %s SET v = v + ? WHERE k = 2", list(1, 
WARN_THRESHOLD + 1));
+
+        assertFails("INSERT INTO %s (k, v) VALUES (3, ?)", list(FAIL_THRESHOLD 
+ 1), FAIL_THRESHOLD + 1);
+        assertValid("UPDATE %s SET v = v - ? WHERE k = 3", list(1));
+
+        assertValid("INSERT INTO %s (k, v) VALUES (4, ?)", list(1));
+        assertWarns("UPDATE %s SET v = v + ? WHERE k = 4", list(1, 
FAIL_THRESHOLD + 1), FAIL_THRESHOLD);
+
+        assertWarns("INSERT INTO %s (k, v) VALUES (5, ?)", set(WARN_THRESHOLD 
+ 1), WARN_THRESHOLD + 1);
+        assertValid("UPDATE %s SET v[0] = null WHERE k = 5");
+    }
+
+    @Test
+    public void testMapSize() throws Throwable
+    {
+        createTable("CREATE TABLE %s (k int PRIMARY KEY, v map<int, int>)");
+
+        assertValid("INSERT INTO %s (k, v) VALUES (0, null)");
+        assertValid("INSERT INTO %s (k, v) VALUES (1, ?)", map(0));
+        assertValid("INSERT INTO %s (k, v) VALUES (2, ?)", map(1));
+        assertValid("INSERT INTO %s (k, v) VALUES (4, ?)", 
map(WARN_THRESHOLD));
+        assertWarns("INSERT INTO %s (k, v) VALUES (5, ?)", map(WARN_THRESHOLD 
+ 1), WARN_THRESHOLD + 1);
+        assertWarns("INSERT INTO %s (k, v) VALUES (6, ?)", 
map(FAIL_THRESHOLD), FAIL_THRESHOLD);
+        assertFails("INSERT INTO %s (k, v) VALUES (7, ?)", map(FAIL_THRESHOLD 
+ 1), FAIL_THRESHOLD + 1);
+        assertFails("INSERT INTO %s (k, v) VALUES (7, ?)", map(FAIL_THRESHOLD 
+ 10), FAIL_THRESHOLD + 10);
+    }
+
+    @Test
+    public void testMapSizeFrozen() throws Throwable
+    {
+        createTable("CREATE TABLE %s (k int PRIMARY KEY, v frozen<map<int, 
int>>)");
+
+        assertValid("INSERT INTO %s (k, v) VALUES (0, null)");
+        assertValid("INSERT INTO %s (k, v) VALUES (1, ?)", map(0));
+        assertValid("INSERT INTO %s (k, v) VALUES (2, ?)", map(1));
+        assertValid("INSERT INTO %s (k, v) VALUES (4, ?)", 
map(WARN_THRESHOLD));
+        assertWarns("INSERT INTO %s (k, v) VALUES (5, ?)", map(WARN_THRESHOLD 
+ 1), WARN_THRESHOLD + 1);
+        assertWarns("INSERT INTO %s (k, v) VALUES (6, ?)", 
map(FAIL_THRESHOLD), FAIL_THRESHOLD);
+        assertFails("INSERT INTO %s (k, v) VALUES (7, ?)", map(FAIL_THRESHOLD 
+ 1), FAIL_THRESHOLD + 1);
+    }
+
+    @Test
+    public void testMapSizeWithUpdates() throws Throwable
+    {
+        createTable("CREATE TABLE %s (k int PRIMARY KEY, v map<int, int>)");
+
+        assertValid("INSERT INTO %s (k, v) VALUES (0, ?)", map(1));
+        assertValid("UPDATE %s SET v = v + ? WHERE k = 0", map(1, 
WARN_THRESHOLD));
+        assertWarns("UPDATE %s SET v = v + ? WHERE k = 0", map(1, 
FAIL_THRESHOLD), FAIL_THRESHOLD - 1);
+
+        assertWarns("INSERT INTO %s (k, v) VALUES (1, ?)", map(WARN_THRESHOLD 
+ 1), WARN_THRESHOLD + 1);
+        assertValid("UPDATE %s SET v = v - ? WHERE k = 1", set(1));
+
+        assertValid("INSERT INTO %s (k, v) VALUES (2, ?)", map(1));
+        assertValid("UPDATE %s SET v = v + ? WHERE k = 2", map(1, 
WARN_THRESHOLD + 1));
+
+        assertFails("INSERT INTO %s (k, v) VALUES (3, ?)", map(FAIL_THRESHOLD 
+ 1), FAIL_THRESHOLD + 1);
+        assertValid("UPDATE %s SET v = v - ? WHERE k = 3", set(1));
+
+        assertValid("INSERT INTO %s (k, v) VALUES (4, ?)", map(1));
+        assertWarns("UPDATE %s SET v = v + ? WHERE k = 4", map(1, 
FAIL_THRESHOLD + 1), FAIL_THRESHOLD);
+    }
+
+    private void assertValid(String query, ByteBuffer collection) throws 
Throwable
+    {
+        assertValid(execute(query, collection));
+    }
+
+    private void assertWarns(String query, ByteBuffer collection, int 
numItems) throws Throwable
+    {
+        assertWarns(execute(query, collection),
+                    String.format("Detected collection v with %d items, this 
exceeds the warning threshold of %d.",
+                                  numItems, WARN_THRESHOLD));
+    }
+
+    private void assertFails(String query, ByteBuffer collection, int 
numItems) throws Throwable
+    {
+        assertFails(execute(query, collection),
+                    String.format("Detected collection v with %d items, this 
exceeds the failure threshold of %d.",
+                                  numItems, FAIL_THRESHOLD));
+    }
+
+    private CheckedFunction execute(String query, ByteBuffer collection)
+    {
+        return () -> execute(userClientState, query, 
Collections.singletonList(collection));
+    }
+
+    private static ByteBuffer set(int numElements)
+    {
+        return set(0, numElements);
+    }
+
+    private static ByteBuffer set(int startInclusive, int endExclusive)
+    {
+        return SetType.getInstance(Int32Type.instance, true)
+                      .decompose(collection(startInclusive, endExclusive, 
Collectors.toSet()));
+    }
+
+    private static ByteBuffer list(int numElements)
+    {
+        return list(0, numElements);
+    }
+
+    private static ByteBuffer list(int startInclusive, int endExclusive)
+    {
+        return ListType.getInstance(Int32Type.instance, false)
+                       .decompose(collection(startInclusive, endExclusive, 
Collectors.toList()));
+    }
+
+    private static ByteBuffer map(int numElements)
+    {
+        return map(0, numElements);
+    }
+
+    private static ByteBuffer map(int startInclusive, int endExclusive)
+    {
+        return MapType.getInstance(Int32Type.instance, Int32Type.instance, 
true)
+                      .decompose(collection(startInclusive, endExclusive, 
Collectors.toMap(x -> x, x -> x)));
+    }
+
+    private static <R, A> R collection(int startInclusive, int endExclusive, 
Collector<Integer, A, R> collector)
+    {
+        return IntStream.range(startInclusive, 
endExclusive).boxed().collect(collector);
+    }
+}
diff --git 
a/test/unit/org/apache/cassandra/db/guardrails/GuardrailPageSizeTest.java 
b/test/unit/org/apache/cassandra/db/guardrails/GuardrailPageSizeTest.java
index ba4c2d0..68122f2 100644
--- a/test/unit/org/apache/cassandra/db/guardrails/GuardrailPageSizeTest.java
+++ b/test/unit/org/apache/cassandra/db/guardrails/GuardrailPageSizeTest.java
@@ -144,10 +144,4 @@ public class GuardrailPageSizeTest extends ThresholdTester
 
         statement.executeLocally(queryState, options);
     }
-
-    //not used by page-size guardrail tests.
-    protected long currentValue()
-    {
-        throw new UnsupportedOperationException();
-    }
 }
diff --git a/test/unit/org/apache/cassandra/db/guardrails/GuardrailsTest.java 
b/test/unit/org/apache/cassandra/db/guardrails/GuardrailsTest.java
index c108560..70c4cfc 100644
--- a/test/unit/org/apache/cassandra/db/guardrails/GuardrailsTest.java
+++ b/test/unit/org/apache/cassandra/db/guardrails/GuardrailsTest.java
@@ -113,7 +113,7 @@ public class GuardrailsTest extends GuardrailTester
                                         state -> 10,
                                         state -> 100,
                                         (isWarn, what, v, t) -> format("%s: 
for %s, %s > %s",
-                                                                       isWarn 
? "Warning" : "Aborting", what, v, t));
+                                                                       isWarn 
? "Warning" : "Failure", what, v, t));
 
         // value under both thresholds
         assertValid(() -> guard.guard(5, "x", null));
@@ -127,9 +127,10 @@ public class GuardrailsTest extends GuardrailTester
         assertValid(() -> guard.guard(100, "y", systemClientState));
         assertValid(() -> guard.guard(100, "y", superClientState));
 
-        // value over fail threshold
-        assertFails(() -> guard.guard(101, "z", null), "Aborting: for z, 101 > 
100");
-        assertFails(() -> guard.guard(101, "z", userClientState), "Aborting: 
for z, 101 > 100");
+        // value over fail threshold. An undefined user means that the check 
comes from a background process,
+        // so we warn instead of failing to prevent interrupting that process.
+        assertWarns(() -> guard.guard(101, "z", null), "Failure: for z, 101 > 
100");
+        assertFails(() -> guard.guard(101, "z", userClientState), "Failure: 
for z, 101 > 100");
         assertValid(() -> guard.guard(101, "z", systemClientState));
         assertValid(() -> guard.guard(101, "z", superClientState));
     }
@@ -154,7 +155,6 @@ public class GuardrailsTest extends GuardrailTester
         assertValid(() -> enabled.ensureEnabled(superClientState));
 
         DisableFlag disabled = new DisableFlag("x", state -> true, "X");
-        assertFails(() -> disabled.ensureEnabled(null), "X is not allowed");
         assertFails(() -> disabled.ensureEnabled(userClientState), "X is not 
allowed");
         assertValid(() -> disabled.ensureEnabled(systemClientState));
         assertValid(() -> disabled.ensureEnabled(superClientState));
@@ -234,9 +234,9 @@ public class GuardrailsTest extends GuardrailTester
         assertValid(() -> disallowed.guard(set(200), action, userClientState));
         assertValid(() -> disallowed.guard(set(1, 2, 3), action, 
userClientState));
 
-        assertFails(() -> disallowed.guard(set(4, 6), action, null),
+        assertWarns(() -> disallowed.guard(set(4, 6), action, null),
                     "Provided values [4, 6] are not allowed for integer 
(disallowed values are: [4, 6, 20])");
-        assertFails(() -> disallowed.guard(set(4, 5, 6, 7), action, null),
+        assertWarns(() -> disallowed.guard(set(4, 5, 6, 7), action, null),
                     "Provided values [4, 6] are not allowed for integer 
(disallowed values are: [4, 6, 20])");
     }
 
@@ -271,7 +271,7 @@ public class GuardrailsTest extends GuardrailTester
         Assert.assertEquals(list(3, 3), triggeredOn);
 
         message = "Provided values [4] are not allowed for integer (disallowed 
values are: [4])";
-        assertFails(() -> disallowed.guard(set(4), action, null), message);
+        assertWarns(() -> disallowed.guard(set(4), action, null), message);
         assertFails(() -> disallowed.guard(set(4), action, userClientState), 
message);
         assertValid(() -> disallowed.guard(set(4), action, systemClientState));
         assertValid(() -> disallowed.guard(set(4), action, superClientState));
diff --git a/test/unit/org/apache/cassandra/db/guardrails/ThresholdTester.java 
b/test/unit/org/apache/cassandra/db/guardrails/ThresholdTester.java
index a816ca9..a6ce3a1 100644
--- a/test/unit/org/apache/cassandra/db/guardrails/ThresholdTester.java
+++ b/test/unit/org/apache/cassandra/db/guardrails/ThresholdTester.java
@@ -26,6 +26,7 @@ import org.junit.Before;
 import org.junit.Test;
 
 import org.apache.cassandra.config.Config;
+import org.apache.cassandra.exceptions.ConfigurationException;
 import org.assertj.core.api.Assertions;
 
 import static java.lang.String.format;
@@ -43,7 +44,25 @@ public abstract class ThresholdTester extends GuardrailTester
     private final TriConsumer<Guardrails, Long, Long> setter;
     private final ToLongFunction<Guardrails> warnGetter;
     private final ToLongFunction<Guardrails> failGetter;
-    private final long maxValue = Integer.MAX_VALUE;
+    private final long maxValue;
+    private final long disabledValue;
+
+    protected ThresholdTester(int warnThreshold,
+                              int failThreshold,
+                              Threshold threshold,
+                              TriConsumer<Guardrails, Integer, Integer> setter,
+                              ToIntFunction<Guardrails> warnGetter,
+                              ToIntFunction<Guardrails> failGetter)
+    {
+        super(threshold);
+        this.warnThreshold = warnThreshold;
+        this.failThreshold = failThreshold;
+        this.setter = (g, w, a) -> setter.accept(g, w.intValue(), 
a.intValue());
+        this.warnGetter = g -> (long) warnGetter.applyAsInt(g);
+        this.failGetter = g -> (long) failGetter.applyAsInt(g);
+        maxValue = Integer.MAX_VALUE;
+        disabledValue = Config.DISABLED_GUARDRAIL;
+    }
 
     protected ThresholdTester(long warnThreshold,
                               long failThreshold,
@@ -58,25 +77,15 @@ public abstract class ThresholdTester extends 
GuardrailTester
         this.setter = setter;
         this.warnGetter = warnGetter;
         this.failGetter = failGetter;
+        maxValue = Long.MAX_VALUE;
+        disabledValue = Config.DISABLED_SIZE_GUARDRAIL.toBytes();
     }
 
-    protected ThresholdTester(int warnThreshold,
-                              int failThreshold,
-                              Threshold threshold,
-                              TriConsumer<Guardrails, Integer, Integer> setter,
-                              ToIntFunction<Guardrails> warnGetter,
-                              ToIntFunction<Guardrails> failGetter)
+    protected long currentValue()
     {
-        super(threshold);
-        this.warnThreshold = warnThreshold;
-        this.failThreshold = failThreshold;
-        this.setter = (g, w, a) -> setter.accept(g, w.intValue(), 
a.intValue());
-        this.warnGetter = g -> (long) warnGetter.applyAsInt(g);
-        this.failGetter = g -> (long) failGetter.applyAsInt(g);
+        throw new UnsupportedOperationException();
     }
 
-    protected abstract long currentValue();
-
     protected void assertCurrentValue(int count)
     {
         assertEquals(count, currentValue());
@@ -97,15 +106,14 @@ public abstract class ThresholdTester extends 
GuardrailTester
 
     protected void testValidationOfThresholdProperties(String warnName, String 
failName)
     {
-        setter.accept(guardrails(), -1L, -1L);
+        setter.accept(guardrails(), disabledValue, disabledValue);
 
-        testValidationOfStrictlyPositiveProperty((g, a) -> setter.accept(g, 
-1L, a), failName);
-        testValidationOfStrictlyPositiveProperty((g, w) -> setter.accept(g, w, 
-1L), warnName);
+        testValidationOfStrictlyPositiveProperty((g, a) -> setter.accept(g, 
disabledValue, a), failName);
+        testValidationOfStrictlyPositiveProperty((g, w) -> setter.accept(g, w, 
disabledValue), warnName);
 
-        setter.accept(guardrails(), -1L, -1L);
+        setter.accept(guardrails(), disabledValue, disabledValue);
         Assertions.assertThatThrownBy(() -> setter.accept(guardrails(), 2L, 
1L))
-                  .hasMessageContaining(format("The warn threshold 2 for %s 
should be lower than the fail threshold 1",
-                                               guardrail.name + 
"_warn_threshold"));
+                  .hasMessageContaining(guardrail.name + "_warn_threshold 
should be lower than the fail threshold");
     }
 
     protected void assertThresholdValid(String query) throws Throwable
@@ -138,7 +146,6 @@ public abstract class ThresholdTester extends 
GuardrailTester
     private void assertInvalidPositiveProperty(BiConsumer<Guardrails, Long> 
setter,
                                                long value,
                                                long maxValue,
-                                               boolean allowZero,
                                                String name)
     {
         try
@@ -146,6 +153,15 @@ public abstract class ThresholdTester extends 
GuardrailTester
             assertValidProperty(setter, value);
             fail(format("Expected exception for guardrails.%s value: %d", 
name, value));
         }
+        catch (ConfigurationException e)
+        {
+            String expectedMessage = null;
+
+            if (value < 0)
+                expectedMessage = "Invalid data storage: value must be 
positive";
+
+            Assertions.assertThat(e.getMessage()).contains(expectedMessage);
+        }
         catch (IllegalArgumentException e)
         {
             String expectedMessage = null;
@@ -153,14 +169,14 @@ public abstract class ThresholdTester extends 
GuardrailTester
             if (value > maxValue)
                 expectedMessage = format("Invalid value %d for %s: maximum 
allowed value is %d",
                                          value, name, maxValue);
-            if (value == 0 && !allowZero)
+            if (value == 0 && value != disabledValue)
                 expectedMessage = format("Invalid value for %s: 0 is not 
allowed; if attempting to disable use %s",
-                                         name, Config.DISABLED_GUARDRAIL);
+                                         name, disabledValue);
 
-            if (value < Config.DISABLED_GUARDRAIL)
+            if (value < 0 && value != disabledValue)
                 expectedMessage = format("Invalid value %d for %s: negative 
values are not "
                                          + "allowed, outside of %s which 
disables the guardrail",
-                                         value, name, 
Config.DISABLED_GUARDRAIL);
+                                         value, name, disabledValue);
 
             assertEquals(format("Exception message '%s' does not contain 
'%s'", e.getMessage(), expectedMessage),
                          expectedMessage, e.getMessage());
@@ -169,15 +185,15 @@ public abstract class ThresholdTester extends 
GuardrailTester
 
     private void assertInvalidStrictlyPositiveProperty(BiConsumer<Guardrails, 
Long> setter, long value, String name)
     {
-        assertInvalidPositiveProperty(setter, value, maxValue, false, name);
+        assertInvalidPositiveProperty(setter, value, maxValue, name);
     }
 
     protected void 
testValidationOfStrictlyPositiveProperty(BiConsumer<Guardrails, Long> setter, 
String name)
     {
         assertInvalidStrictlyPositiveProperty(setter, Integer.MIN_VALUE, name);
         assertInvalidStrictlyPositiveProperty(setter, -2, name);
-        assertValidProperty(setter, (long) Config.DISABLED_GUARDRAIL); // 
disabled
-        assertInvalidStrictlyPositiveProperty(setter, 0, name);
+        assertValidProperty(setter, disabledValue);
+        assertInvalidStrictlyPositiveProperty(setter, disabledValue == 0 ? -1 
: 0, name);
         assertValidProperty(setter, 1L);
         assertValidProperty(setter, 2L);
         assertValidProperty(setter, maxValue);
diff --git a/test/unit/org/apache/cassandra/schema/TableMetadataTest.java 
b/test/unit/org/apache/cassandra/schema/TableMetadataTest.java
new file mode 100644
index 0000000..357ac01
--- /dev/null
+++ b/test/unit/org/apache/cassandra/schema/TableMetadataTest.java
@@ -0,0 +1,140 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.cassandra.schema;
+
+import java.math.BigInteger;
+import java.nio.ByteBuffer;
+import java.util.Arrays;
+
+import org.junit.Test;
+
+import org.apache.cassandra.db.Clustering;
+import org.apache.cassandra.db.marshal.BooleanType;
+import org.apache.cassandra.db.marshal.CompositeType;
+import org.apache.cassandra.db.marshal.FloatType;
+import org.apache.cassandra.db.marshal.Int32Type;
+import org.apache.cassandra.db.marshal.IntegerType;
+import org.apache.cassandra.db.marshal.TupleType;
+import org.apache.cassandra.db.marshal.UTF8Type;
+
+import static org.junit.Assert.assertEquals;
+
+public class TableMetadataTest
+{
+    @Test
+    public void testPartitionKeyAsCQLLiteral()
+    {
+        String keyspaceName = "keyspace";
+        String tableName = "table";
+
+        // composite type
+        CompositeType type1 = CompositeType.getInstance(UTF8Type.instance, 
UTF8Type.instance, UTF8Type.instance);
+        TableMetadata metadata1 = TableMetadata.builder(keyspaceName, 
tableName)
+                                               .addPartitionKeyColumn("key", 
type1)
+                                               .build();
+        assertEquals("('test:', 'composite!', 'type)')",
+                     
metadata1.partitionKeyAsCQLLiteral(type1.decompose("test:", "composite!", 
"type)")));
+
+        // composite type with tuple
+        CompositeType type2 = CompositeType.getInstance(new 
TupleType(Arrays.asList(FloatType.instance, UTF8Type.instance)),
+                                                        IntegerType.instance);
+        TableMetadata metadata2 = TableMetadata.builder(keyspaceName, 
tableName)
+                                               .addPartitionKeyColumn("key", 
type2)
+                                               .build();
+        ByteBuffer tupleValue = TupleType.buildValue(new ByteBuffer[]{ 
FloatType.instance.decompose(0.33f),
+                                                                       
UTF8Type.instance.decompose("tuple test") });
+        assertEquals("((0.33, 'tuple test'), 10)",
+                     
metadata2.partitionKeyAsCQLLiteral(type2.decompose(tupleValue, 
BigInteger.valueOf(10))));
+
+        // plain type
+        TableMetadata metadata3 = TableMetadata.builder(keyspaceName, 
tableName)
+                                               .addPartitionKeyColumn("key", 
UTF8Type.instance).build();
+        assertEquals("'non-composite test'",
+                     
metadata3.partitionKeyAsCQLLiteral(UTF8Type.instance.decompose("non-composite 
test")));
+    }
+
+    @Test
+    public void testPrimaryKeyAsCQLLiteral()
+    {
+        String keyspaceName = "keyspace";
+        String tableName = "table";
+
+        TableMetadata metadata;
+
+        // one partition key column, no clustering key
+        metadata = TableMetadata.builder(keyspaceName, tableName)
+                                .addPartitionKeyColumn("key", 
UTF8Type.instance)
+                                .build();
+        assertEquals("'Test'", 
metadata.primaryKeyAsCQLLiteral(UTF8Type.instance.decompose("Test"), 
Clustering.EMPTY));
+
+        // two partition key columns, no clustering key
+        metadata = TableMetadata.builder(keyspaceName, tableName)
+                                .addPartitionKeyColumn("k1", UTF8Type.instance)
+                                .addPartitionKeyColumn("k2", 
Int32Type.instance)
+                                .build();
+        assertEquals("('Test', -12)",
+                     
metadata.primaryKeyAsCQLLiteral(CompositeType.getInstance(UTF8Type.instance, 
Int32Type.instance)
+                                                                  
.decompose("Test", -12), Clustering.EMPTY));
+
+        // one partition key column, one clustering key column
+        metadata = TableMetadata.builder(keyspaceName, tableName)
+                                .addPartitionKeyColumn("key", 
UTF8Type.instance)
+                                .addClusteringColumn("clustering", 
UTF8Type.instance)
+                                .build();
+        assertEquals("('k', 'Cluster')",
+                     
metadata.primaryKeyAsCQLLiteral(UTF8Type.instance.decompose("k"),
+                                                     
Clustering.make(UTF8Type.instance.decompose("Cluster"))));
+        assertEquals("'k'",
+                     
metadata.primaryKeyAsCQLLiteral(UTF8Type.instance.decompose("k"), 
Clustering.EMPTY));
+        assertEquals("'k'",
+                     
metadata.primaryKeyAsCQLLiteral(UTF8Type.instance.decompose("k"), 
Clustering.STATIC_CLUSTERING));
+
+        // one partition key column, two clustering key columns
+        metadata = TableMetadata.builder(keyspaceName, tableName)
+                                .addPartitionKeyColumn("key", 
UTF8Type.instance)
+                                .addClusteringColumn("c1", UTF8Type.instance)
+                                .addClusteringColumn("c2", UTF8Type.instance)
+                                .build();
+        assertEquals("('k', 'c1', 'c2')",
+                     
metadata.primaryKeyAsCQLLiteral(UTF8Type.instance.decompose("k"),
+                                                     
Clustering.make(UTF8Type.instance.decompose("c1"),
+                                                                     
UTF8Type.instance.decompose("c2"))));
+        assertEquals("'k'",
+                     
metadata.primaryKeyAsCQLLiteral(UTF8Type.instance.decompose("k"), 
Clustering.EMPTY));
+        assertEquals("'k'",
+                     
metadata.primaryKeyAsCQLLiteral(UTF8Type.instance.decompose("k"), 
Clustering.STATIC_CLUSTERING));
+
+        // two partition key columns, two clustering key columns
+        CompositeType composite = 
CompositeType.getInstance(Int32Type.instance, BooleanType.instance);
+        metadata = TableMetadata.builder(keyspaceName, tableName)
+                                .addPartitionKeyColumn("k1", 
Int32Type.instance)
+                                .addPartitionKeyColumn("k2", 
BooleanType.instance)
+                                .addClusteringColumn("c1", UTF8Type.instance)
+                                .addClusteringColumn("c2", UTF8Type.instance)
+                                .build();
+        assertEquals("(0, true, 'Cluster_1', 'Cluster_2')",
+                     metadata.primaryKeyAsCQLLiteral(composite.decompose(0, 
true),
+                                                     
Clustering.make(UTF8Type.instance.decompose("Cluster_1"),
+                                                                     
UTF8Type.instance.decompose("Cluster_2"))));
+        assertEquals("(1, true)",
+                     metadata.primaryKeyAsCQLLiteral(composite.decompose(1, 
true), Clustering.EMPTY));
+        assertEquals("(2, true)",
+                     metadata.primaryKeyAsCQLLiteral(composite.decompose(2, 
true), Clustering.STATIC_CLUSTERING));
+    }
+}

---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to