This is an automated email from the ASF dual-hosted git repository.

adelapena pushed a commit to branch trunk
in repository https://gitbox.apache.org/repos/asf/cassandra.git


The following commit(s) were added to refs/heads/trunk by this push:
     new 4b7069c06c Add guardrail for column size
4b7069c06c is described below

commit 4b7069c06c7913e1f3f2d5622d8cbc04746b0bac
Author: Andrés de la Peña <[email protected]>
AuthorDate: Mon Jun 13 20:08:55 2022 +0100

    Add guardrail for column size
    
    patch by Andrés de la Peña; reviewed by Ekaterina Dimitrova and David 
Capwell for CASSANDRA-17151
---
 CHANGES.txt                                        |   1 +
 NEWS.txt                                           |   1 +
 conf/cassandra.yaml                                |  18 +
 src/java/org/apache/cassandra/config/Config.java   |   2 +
 .../apache/cassandra/config/GuardrailsOptions.java |  28 +
 .../org/apache/cassandra/cql3/QueryProcessor.java  |   2 +-
 .../apache/cassandra/cql3/UpdateParameters.java    |  15 +
 src/java/org/apache/cassandra/cql3/Validation.java |   2 +-
 .../apache/cassandra/db/guardrails/Guardrails.java |  43 +-
 .../cassandra/db/guardrails/GuardrailsConfig.java  |  12 +
 .../cassandra/db/guardrails/GuardrailsMBean.java   |  24 +
 .../guardrails/GuardrailColumnValueSizeTest.java   | 650 +++++++++++++++++++++
 12 files changed, 788 insertions(+), 10 deletions(-)

diff --git a/CHANGES.txt b/CHANGES.txt
index 7dce2b90bd..2c94811e55 100644
--- a/CHANGES.txt
+++ b/CHANGES.txt
@@ -1,4 +1,5 @@
 4.2
+ * Add guardrail for column size (CASSANDRA-17151)
  * When doing a host replacement, we need to check that the node is a live 
node before failing with "Cannot replace a live node..." (CASSANDRA-17805)
  * Add support to generate a One-Shot heap dump on unhandled exceptions 
(CASSANDRA-17795)
  * Rate-limit new client connection auth setup to avoid overwhelming bcrypt 
(CASSANDRA-17812)
diff --git a/NEWS.txt b/NEWS.txt
index 96ad4b9ac0..74142539f7 100644
--- a/NEWS.txt
+++ b/NEWS.txt
@@ -73,6 +73,7 @@ New features
       - Whether SimpleStrategy is allowed on keyspace creation or alteration
       - Maximum replication factor
       - Whether DROP KEYSPACE commands are allowed.
+      - Column value size
     - It is possible to list ephemeral snapshots by nodetool listsnaphots 
command when flag "-e" is specified.
 
 Upgrading
diff --git a/conf/cassandra.yaml b/conf/cassandra.yaml
index 21e3f78c10..55c8b5a756 100644
--- a/conf/cassandra.yaml
+++ b/conf/cassandra.yaml
@@ -1767,6 +1767,24 @@ drop_compact_storage_enabled: false
 # write_consistency_levels_warned: []
 # write_consistency_levels_disallowed: []
 #
+# Guardrail to warn or fail when writing column values larger than threshold.
+# This guardrail is only applied to the values of regular columns because both 
the serialized partition keys and the
+# values of the components of the clustering key already have a fixed, 
relatively small size limit of 65535 bytes, which
+# is probably smaller than the thresholds defined here.
+# Deleting individual elements of non-frozen sets and maps involves creating 
tombstones that contain the value of the
+# deleted element, regardless of whether the element existed or not. That 
tombstone value is also guarded by this
+# guardrail, to prevent the insertion of tombstones over the threshold. The 
downside is that enabling or lowering this
+# threshold can prevent users from deleting set/map elements that were written 
when the guardrail was disabled or had a
+# higher value. Deleting the entire column, row or partition is always allowed, 
since the tombstones created for those
+# operations don't contain the CQL column values.
+# This guardrail is different from max_value_size. max_value_size is checked 
when deserializing any value to detect
+# sstable corruption, whereas this guardrail is checked on the CQL layer at 
write time to reject regular user queries
+# inserting column values that are too large.
+# The two thresholds default to null to disable.
+# Min unit: B
+# column_value_size_warn_threshold:
+# column_value_size_fail_threshold:
+#
 # Guardrail to warn or fail when encountering larger size of collection data 
than threshold.
 # At query time this guardrail is applied only to the collection fragment that 
is being writen, even though in the case
 # of non-frozen collections there could be unaccounted parts of the collection 
on the sstables. This is done this way to
diff --git a/src/java/org/apache/cassandra/config/Config.java 
b/src/java/org/apache/cassandra/config/Config.java
index f14bb3544a..b5e76d3a01 100644
--- a/src/java/org/apache/cassandra/config/Config.java
+++ b/src/java/org/apache/cassandra/config/Config.java
@@ -847,6 +847,8 @@ public class Config
     public volatile boolean read_before_write_list_operations_enabled = true;
     public volatile boolean allow_filtering_enabled = true;
     public volatile boolean simplestrategy_enabled = true;
+    public volatile DataStorageSpec.LongBytesBound 
column_value_size_warn_threshold = null;
+    public volatile DataStorageSpec.LongBytesBound 
column_value_size_fail_threshold = null;
     public volatile DataStorageSpec.LongBytesBound 
collection_size_warn_threshold = null;
     public volatile DataStorageSpec.LongBytesBound 
collection_size_fail_threshold = null;
     public volatile int items_per_collection_warn_threshold = -1;
diff --git a/src/java/org/apache/cassandra/config/GuardrailsOptions.java 
b/src/java/org/apache/cassandra/config/GuardrailsOptions.java
index e84e0e2a9f..9c7bd55cfd 100644
--- a/src/java/org/apache/cassandra/config/GuardrailsOptions.java
+++ b/src/java/org/apache/cassandra/config/GuardrailsOptions.java
@@ -76,6 +76,7 @@ public class GuardrailsOptions implements GuardrailsConfig
         config.read_consistency_levels_disallowed = 
validateConsistencyLevels(config.read_consistency_levels_disallowed, 
"read_consistency_levels_disallowed");
         config.write_consistency_levels_warned = 
validateConsistencyLevels(config.write_consistency_levels_warned, 
"write_consistency_levels_warned");
         config.write_consistency_levels_disallowed = 
validateConsistencyLevels(config.write_consistency_levels_disallowed, 
"write_consistency_levels_disallowed");
+        validateSizeThreshold(config.column_value_size_warn_threshold, 
config.column_value_size_fail_threshold, false, "column_value_size");
         validateSizeThreshold(config.collection_size_warn_threshold, 
config.collection_size_fail_threshold, false, "collection_size");
         validateMaxIntThreshold(config.items_per_collection_warn_threshold, 
config.items_per_collection_fail_threshold, "items_per_collection");
         validateMaxIntThreshold(config.fields_per_udt_warn_threshold, 
config.fields_per_udt_fail_threshold, "fields_per_udt");
@@ -536,6 +537,33 @@ public class GuardrailsOptions implements GuardrailsConfig
                                   x -> 
config.write_consistency_levels_disallowed = x);
     }
 
+    @Override
+    @Nullable
+    public DataStorageSpec.LongBytesBound getColumnValueSizeWarnThreshold()
+    {
+        return config.column_value_size_warn_threshold;
+    }
+
+    @Override
+    @Nullable
+    public DataStorageSpec.LongBytesBound getColumnValueSizeFailThreshold()
+    {
+        return config.column_value_size_fail_threshold;
+    }
+
+    public void setColumnValueSizeThreshold(@Nullable 
DataStorageSpec.LongBytesBound warn, @Nullable DataStorageSpec.LongBytesBound 
fail)
+    {
+        validateSizeThreshold(warn, fail, false, "column_value_size");
+        updatePropertyWithLogging("column_value_size_warn_threshold",
+                                  warn,
+                                  () -> 
config.column_value_size_warn_threshold,
+                                  x -> config.column_value_size_warn_threshold 
= x);
+        updatePropertyWithLogging("column_value_size_fail_threshold",
+                                  fail,
+                                  () -> 
config.column_value_size_fail_threshold,
+                                  x -> config.column_value_size_fail_threshold 
= x);
+    }
+
     @Override
     @Nullable
     public DataStorageSpec.LongBytesBound getCollectionSizeWarnThreshold()
diff --git a/src/java/org/apache/cassandra/cql3/QueryProcessor.java 
b/src/java/org/apache/cassandra/cql3/QueryProcessor.java
index ae01551b69..188cb8aa0c 100644
--- a/src/java/org/apache/cassandra/cql3/QueryProcessor.java
+++ b/src/java/org/apache/cassandra/cql3/QueryProcessor.java
@@ -237,7 +237,7 @@ public class QueryProcessor implements QueryHandler
         if (key == ByteBufferUtil.UNSET_BYTE_BUFFER)
             throw new InvalidRequestException("Key may not be unset");
 
-        // check that key can be handled by FBUtilities.writeShortByteArray
+        // check that key can be handled by ByteArrayUtil.writeWithShortLength 
and ByteBufferUtil.writeWithShortLength
         if (key.remaining() > FBUtilities.MAX_UNSIGNED_SHORT)
         {
             throw new InvalidRequestException("Key length of " + 
key.remaining() +
diff --git a/src/java/org/apache/cassandra/cql3/UpdateParameters.java 
b/src/java/org/apache/cassandra/cql3/UpdateParameters.java
index 2d59366a09..b505480d98 100644
--- a/src/java/org/apache/cassandra/cql3/UpdateParameters.java
+++ b/src/java/org/apache/cassandra/cql3/UpdateParameters.java
@@ -20,6 +20,7 @@ package org.apache.cassandra.cql3;
 import java.nio.ByteBuffer;
 import java.util.Map;
 
+import org.apache.cassandra.db.guardrails.Guardrails;
 import org.apache.cassandra.schema.ColumnMetadata;
 import org.apache.cassandra.schema.TableMetadata;
 import org.apache.cassandra.db.*;
@@ -143,6 +144,15 @@ public class UpdateParameters
 
     public void addTombstone(ColumnMetadata column, CellPath path) throws 
InvalidRequestException
     {
+        // Deleting individual elements of non-frozen sets and maps involves 
creating tombstones that contain the value
+        // of the deleted element, regardless of whether the element 
existed or not. That tombstone value is guarded
+        // by the columnValueSize guardrail, to prevent the insertion of 
tombstones over the threshold. The downside is
+        // that enabling or lowering this threshold can prevent users from 
deleting set/map elements that were written
+        // when the guardrail was disabled or had a higher value. Deleting the 
entire column, row or partition is always
+        // allowed, since the tombstones created for those operations don't 
contain the CQL column values.
+        if (path != null && column.type.isMultiCell())
+            Guardrails.columnValueSize.guard(path.dataSize(), 
column.name.toString(), false, clientState);
+
         builder.addCell(BufferCell.tombstone(column, timestamp, nowInSec, 
path));
     }
 
@@ -153,6 +163,11 @@ public class UpdateParameters
 
     public Cell<?> addCell(ColumnMetadata column, CellPath path, ByteBuffer 
value) throws InvalidRequestException
     {
+        Guardrails.columnValueSize.guard(value.remaining(), 
column.name.toString(), false, clientState);
+
+        if (path != null && column.type.isMultiCell())
+            Guardrails.columnValueSize.guard(path.dataSize(), 
column.name.toString(), false, clientState);
+
         Cell<?> cell = ttl == LivenessInfo.NO_TTL
                        ? BufferCell.live(column, timestamp, value, path)
                        : BufferCell.expiring(column, timestamp, ttl, nowInSec, 
value, path);
diff --git a/src/java/org/apache/cassandra/cql3/Validation.java 
b/src/java/org/apache/cassandra/cql3/Validation.java
index 34a4027342..27a1b4e667 100644
--- a/src/java/org/apache/cassandra/cql3/Validation.java
+++ b/src/java/org/apache/cassandra/cql3/Validation.java
@@ -47,7 +47,7 @@ public abstract class Validation
         if (key == null || key.remaining() == 0)
             throw new InvalidRequestException("Key may not be empty");
 
-        // check that key can be handled by FBUtilities.writeShortByteArray
+        // check that key can be handled by ByteArrayUtil.writeWithShortLength 
and ByteBufferUtil.writeWithShortLength
         if (key.remaining() > FBUtilities.MAX_UNSIGNED_SHORT)
         {
             throw new InvalidRequestException("Key length of " + 
key.remaining() +
diff --git a/src/java/org/apache/cassandra/db/guardrails/Guardrails.java 
b/src/java/org/apache/cassandra/db/guardrails/Guardrails.java
index 1381655955..633f396ce5 100644
--- a/src/java/org/apache/cassandra/db/guardrails/Guardrails.java
+++ b/src/java/org/apache/cassandra/db/guardrails/Guardrails.java
@@ -272,6 +272,17 @@ public final class Guardrails implements GuardrailsMBean
                  state -> 
CONFIG_PROVIDER.getOrCreate(state).getWriteConsistencyLevelsDisallowed(),
                  "write consistency levels");
 
+    /**
+     * Guardrail on the size of a column value.
+     */
+    public static final MaxThreshold columnValueSize =
+    new MaxThreshold("column_value_size",
+                     state -> 
sizeToBytes(CONFIG_PROVIDER.getOrCreate(state).getColumnValueSizeWarnThreshold()),
+                     state -> 
sizeToBytes(CONFIG_PROVIDER.getOrCreate(state).getColumnValueSizeFailThreshold()),
+                     (isWarning, what, value, threshold) ->
+                     format("Value of column %s has size %s, this exceeds the 
%s threshold of %s.",
+                            what, value, isWarning ? "warning" : "failure", 
threshold));
+
     /**
      * Guardrail on the size of a collection.
      */
@@ -280,10 +291,8 @@ public final class Guardrails implements GuardrailsMBean
                      state -> 
sizeToBytes(CONFIG_PROVIDER.getOrCreate(state).getCollectionSizeWarnThreshold()),
                      state -> 
sizeToBytes(CONFIG_PROVIDER.getOrCreate(state).getCollectionSizeFailThreshold()),
                      (isWarning, what, value, threshold) ->
-                     isWarning ? format("Detected collection %s of size %s, 
this exceeds the warning threshold of %s.",
-                                        what, value, threshold)
-                               : format("Detected collection %s of size %s, 
this exceeds the failure threshold of %s.",
-                                        what, value, threshold));
+                     format("Detected collection %s of size %s, this exceeds 
the %s threshold of %s.",
+                            what, value, isWarning ? "warning" : "failure", 
threshold));
 
     /**
      * Guardrail on the number of items of a collection.
@@ -293,10 +302,8 @@ public final class Guardrails implements GuardrailsMBean
                      state -> 
CONFIG_PROVIDER.getOrCreate(state).getItemsPerCollectionWarnThreshold(),
                      state -> 
CONFIG_PROVIDER.getOrCreate(state).getItemsPerCollectionFailThreshold(),
                      (isWarning, what, value, threshold) ->
-                     isWarning ? format("Detected collection %s with %s items, 
this exceeds the warning threshold of %s.",
-                                        what, value, threshold)
-                               : format("Detected collection %s with %s items, 
this exceeds the failure threshold of %s.",
-                                        what, value, threshold));
+                     format("Detected collection %s with %s items, this 
exceeds the %s threshold of %s.",
+                            what, value, isWarning ? "warning" : "failure", 
threshold));
 
     /**
      * Guardrail on the number of fields on each UDT.
@@ -719,6 +726,26 @@ public final class Guardrails implements GuardrailsMBean
         DEFAULT_CONFIG.setPartitionKeysInSelectThreshold(warn, fail);
     }
 
+    @Override
+    @Nullable
+    public String getColumnValueSizeWarnThreshold()
+    {
+        return sizeToString(DEFAULT_CONFIG.getColumnValueSizeWarnThreshold());
+    }
+
+    @Override
+    @Nullable
+    public String getColumnValueSizeFailThreshold()
+    {
+        return sizeToString(DEFAULT_CONFIG.getColumnValueSizeFailThreshold());
+    }
+
+    @Override
+    public void setColumnValueSizeThreshold(@Nullable String warnSize, 
@Nullable String failSize)
+    {
+        DEFAULT_CONFIG.setColumnValueSizeThreshold(sizeFromString(warnSize), 
sizeFromString(failSize));
+    }
+
     @Override
     @Nullable
     public String getCollectionSizeWarnThreshold()
diff --git a/src/java/org/apache/cassandra/db/guardrails/GuardrailsConfig.java 
b/src/java/org/apache/cassandra/db/guardrails/GuardrailsConfig.java
index d21b899241..9cf481e27b 100644
--- a/src/java/org/apache/cassandra/db/guardrails/GuardrailsConfig.java
+++ b/src/java/org/apache/cassandra/db/guardrails/GuardrailsConfig.java
@@ -237,6 +237,18 @@ public interface GuardrailsConfig
      */
     Set<ConsistencyLevel> getWriteConsistencyLevelsDisallowed();
 
+    /**
+     * @return The threshold to warn when writing column values larger than 
threshold.
+     */
+    @Nullable
+    DataStorageSpec.LongBytesBound getColumnValueSizeWarnThreshold();
+
+    /**
+     * @return The threshold to prevent writing column values larger than 
threshold.
+     */
+    @Nullable
+    DataStorageSpec.LongBytesBound getColumnValueSizeFailThreshold();
+
     /**
      * @return The threshold to warn when encountering a collection with 
larger data size than threshold.
      */
diff --git a/src/java/org/apache/cassandra/db/guardrails/GuardrailsMBean.java 
b/src/java/org/apache/cassandra/db/guardrails/GuardrailsMBean.java
index e410d5c16f..48a01dad6f 100644
--- a/src/java/org/apache/cassandra/db/guardrails/GuardrailsMBean.java
+++ b/src/java/org/apache/cassandra/db/guardrails/GuardrailsMBean.java
@@ -468,6 +468,30 @@ public interface GuardrailsMBean
      */
     void setWriteConsistencyLevelsDisallowedCSV(String consistencyLevels);
 
+    /**
+     * @return The threshold to warn when encountering column values larger 
than threshold, as a string formatted as
+     * in, for example, {@code 10GiB}, {@code 20MiB}, {@code 30KiB} or {@code 
40B}. A {@code null} value means disabled.
+     */
+    @Nullable
+    String getColumnValueSizeWarnThreshold();
+
+    /**
+     * @return The threshold to prevent column values larger than threshold, 
as a string formatted as in, for example,
+     * {@code 10GiB}, {@code 20MiB}, {@code 30KiB} or {@code 40B}. A {@code 
null} value means disabled.
+     */
+    @Nullable
+    String getColumnValueSizeFailThreshold();
+
+    /**
+     * @param warnSize The threshold to warn when encountering column values 
larger than threshold, as a string
+     *                 formatted as in, for example, {@code 10GiB}, {@code 
20MiB}, {@code 30KiB} or {@code 40B}.
+     *                 A {@code null} value means disabled.
+     * @param failSize The threshold to prevent column values larger than 
threshold, as a string formatted as in, for
+     *                 example, {@code 10GiB}, {@code 20MiB}, {@code 30KiB} or 
{@code 40B}.
+     *                 A {@code null} value means disabled.
+     */
+    void setColumnValueSizeThreshold(@Nullable String warnSize, @Nullable 
String failSize);
+
     /**
      * @return The threshold to warn when encountering larger size of 
collection data than threshold, as a string
      * formatted as in, for example, {@code 10GiB}, {@code 20MiB}, {@code 
30KiB} or {@code 40B}.  A {@code null} value
diff --git 
a/test/unit/org/apache/cassandra/db/guardrails/GuardrailColumnValueSizeTest.java
 
b/test/unit/org/apache/cassandra/db/guardrails/GuardrailColumnValueSizeTest.java
new file mode 100644
index 0000000000..eab1cf749c
--- /dev/null
+++ 
b/test/unit/org/apache/cassandra/db/guardrails/GuardrailColumnValueSizeTest.java
@@ -0,0 +1,650 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.cassandra.db.guardrails;
+
+import java.nio.ByteBuffer;
+import java.util.Arrays;
+import java.util.LinkedHashMap;
+import java.util.Map;
+import java.util.function.Function;
+
+import com.google.common.collect.ImmutableList;
+import com.google.common.collect.ImmutableSet;
+import org.junit.Test;
+
+import org.apache.cassandra.db.marshal.BytesType;
+import org.apache.cassandra.db.marshal.ListType;
+import org.apache.cassandra.db.marshal.MapType;
+import org.apache.cassandra.db.marshal.SetType;
+
+import static java.lang.String.format;
+import static java.nio.ByteBuffer.allocate;
+
+/**
+ * Tests the guardrail for the size of column values, {@link 
Guardrails#columnValueSize}.
+ */
+public class GuardrailColumnValueSizeTest extends ThresholdTester
+{
+    private static final int WARN_THRESHOLD = 1024; // bytes
+    private static final int FAIL_THRESHOLD = WARN_THRESHOLD * 4; // bytes
+
+    public GuardrailColumnValueSizeTest()
+    {
+        super(WARN_THRESHOLD + "B",
+              FAIL_THRESHOLD + "B",
+              Guardrails.columnValueSize,
+              Guardrails::setColumnValueSizeThreshold,
+              Guardrails::getColumnValueSizeWarnThreshold,
+              Guardrails::getColumnValueSizeFailThreshold);
+    }
+
+    @Test
+    public void testSimplePartitionKey() throws Throwable
+    {
+        createTable("CREATE TABLE %s (k text PRIMARY KEY, v int)");
+
+        // the size of primary key columns is not guarded because they already 
have a fixed limit of 65535B
+
+        testNoThreshold("INSERT INTO %s (k, v) VALUES (?, 0)");
+        testNoThreshold("UPDATE %s SET v = 1 WHERE k = ?");
+        testNoThreshold("DELETE v FROM %s WHERE k = ?");
+        testNoThreshold("DELETE FROM %s WHERE k = ?");
+    }
+
+    @Test
+    public void testCompositePartitionKey() throws Throwable
+    {
+        createTable("CREATE TABLE %s (k1 text, k2 text, v int, PRIMARY 
KEY((k1, k2)))");
+
+        // the size of primary key columns is not guarded because they already 
have a fixed limit of 65535B
+
+        testNoThreshold2("INSERT INTO %s (k1, k2, v) VALUES (?, ?, 0)");
+        testNoThreshold2("UPDATE %s SET v = 1 WHERE k1 = ? AND k2 = ?");
+        testNoThreshold2("DELETE v FROM %s WHERE k1 = ? AND k2 = ?");
+        testNoThreshold2("DELETE FROM %s WHERE k1 = ? AND k2 = ?");
+    }
+
+    @Test
+    public void testSimpleClustering() throws Throwable
+    {
+        createTable("CREATE TABLE %s (k int, c text, v int, PRIMARY KEY(k, 
c))");
+
+        // the size of primary key columns is not guarded because they already 
have a fixed limit of 65535B
+
+        testNoThreshold("INSERT INTO %s (k, c, v) VALUES (0, ?, 0)");
+        testNoThreshold("UPDATE %s SET v = 1 WHERE k = 0 AND c = ?");
+        testNoThreshold("DELETE v FROM %s WHERE k = 0 AND c = ?");
+        testNoThreshold("DELETE FROM %s WHERE k = 0 AND c = ?");
+    }
+
+    @Test
+    public void testCompositeClustering() throws Throwable
+    {
+        createTable("CREATE TABLE %s (k int, c1 text, c2 text, v int, PRIMARY 
KEY(k, c1, c2))");
+
+        // the size of primary key columns is not guarded because they already 
have a fixed limit of 65535B
+
+        testNoThreshold("DELETE FROM %s WHERE k = 0 AND c1 = ?");
+        testNoThreshold("DELETE FROM %s WHERE k = 0 AND c1 > ?");
+        testNoThreshold("DELETE FROM %s WHERE k = 0 AND c1 < ?");
+        testNoThreshold("DELETE FROM %s WHERE k = 0 AND c1 >= ?");
+        testNoThreshold("DELETE FROM %s WHERE k = 0 AND c1 <= ?");
+
+        testNoThreshold2("INSERT INTO %s (k, c1, c2, v) VALUES (0, ?, ?, 0)");
+        testNoThreshold2("UPDATE %s SET v = 1 WHERE k = 0 AND c1 = ? AND c2 = 
?");
+        testNoThreshold2("DELETE v FROM %s WHERE k = 0 AND c1 = ? AND c2 = ?");
+        testNoThreshold2("DELETE FROM %s WHERE k = 0 AND c1 = ? AND c2 = ?");
+        testNoThreshold2("DELETE FROM %s WHERE k = 0 AND c1 = ? AND c2 > ?");
+        testNoThreshold2("DELETE FROM %s WHERE k = 0 AND c1 = ? AND c2 < ?");
+        testNoThreshold2("DELETE FROM %s WHERE k = 0 AND c1 = ? AND c2 >= ?");
+        testNoThreshold2("DELETE FROM %s WHERE k = 0 AND c1 = ? AND c2 <= ?");
+    }
+
+    @Test
+    public void testRegularColumn() throws Throwable
+    {
+        createTable("CREATE TABLE %s (k int PRIMARY KEY, v text)");
+
+        testThreshold("v", "INSERT INTO %s (k, v) VALUES (0, ?)");
+        testThreshold("v", "UPDATE %s SET v = ? WHERE k = 0");
+    }
+
+    @Test
+    public void testStaticColumn() throws Throwable
+    {
+        createTable("CREATE TABLE %s (k int, c int, s text STATIC, r int, 
PRIMARY KEY(k, c))");
+
+        testThreshold("s", "INSERT INTO %s (k, s) VALUES (0, ?)");
+        testThreshold("s", "INSERT INTO %s (k, c, s, r) VALUES (0, 0, ?, 0)");
+        testThreshold("s", "UPDATE %s SET s = ? WHERE k = 0");
+        testThreshold("s", "UPDATE %s SET s = ?, r = 0 WHERE k = 0 AND c = 0");
+    }
+
+    @Test
+    public void testTuple() throws Throwable
+    {
+        createTable("CREATE TABLE %s (k int PRIMARY KEY, t tuple<text, 
text>)");
+
+        testThreshold2("t", "INSERT INTO %s (k, t) VALUES (0, (?, ?))", 8);
+        testThreshold2("t", "UPDATE %s SET t = (?, ?) WHERE k = 0", 8);
+    }
+
+    @Test
+    public void testUDT() throws Throwable
+    {
+        String udt = createType("CREATE TYPE %s (a text, b text)");
+        createTable(format("CREATE TABLE %%s (k int PRIMARY KEY, u %s)", udt));
+
+        testThreshold("u", "INSERT INTO %s (k, u) VALUES (0, {a: ?})");
+        testThreshold("u", "INSERT INTO %s (k, u) VALUES (0, {b: ?})");
+        testThreshold("u", "UPDATE %s SET u = {a: ?} WHERE k = 0");
+        testThreshold("u", "UPDATE %s SET u = {b: ?} WHERE k = 0");
+        testThreshold("u", "UPDATE %s SET u.a = ? WHERE k = 0");
+        testThreshold("u", "UPDATE %s SET u.b = ? WHERE k = 0");
+        testThreshold2("u", "INSERT INTO %s (k, u) VALUES (0, {a: ?, b: ?})");
+        testThreshold2("u", "UPDATE %s SET u.a = ?, u.b = ? WHERE k = 0");
+        testThreshold2("u", "UPDATE %s SET u = {a: ?, b: ?} WHERE k = 0");
+    }
+
+    @Test
+    public void testFrozenUDT() throws Throwable
+    {
+        String udt = createType("CREATE TYPE %s (a text, b text)");
+        createTable(format("CREATE TABLE %%s (k int PRIMARY KEY, v 
frozen<%s>)", udt));
+
+        testThreshold("v", "INSERT INTO %s (k, v) VALUES (0, {a: ?})", 8);
+        testThreshold("v", "INSERT INTO %s (k, v) VALUES (0, {b: ?})", 8);
+        testThreshold("v", "UPDATE %s SET v = {a: ?} WHERE k = 0", 8);
+        testThreshold("v", "UPDATE %s SET v = {b: ?} WHERE k = 0", 8);
+        testThreshold2("v", "INSERT INTO %s (k, v) VALUES (0, {a: ?, b: ?})", 
8);
+        testThreshold2("v", "UPDATE %s SET v = {a: ?, b: ?} WHERE k = 0", 8);
+    }
+
+    @Test
+    public void testNestedUDT() throws Throwable
+    {
+        String inner = createType("CREATE TYPE %s (c text, d text)");
+        String outer = createType(format("CREATE TYPE %%s (a text, b 
frozen<%s>)", inner));
+        createTable(format("CREATE TABLE %%s (k int PRIMARY KEY, v %s)", 
outer));
+
+        for (String query : Arrays.asList("INSERT INTO %s (k, v) VALUES (0, 
{a: ?, b: {c: ?, d: ?}})",
+                                          "UPDATE %s SET v = {a: ?, b: {c: ?, 
d: ?}} WHERE k = 0"))
+        {
+            assertValid(query, allocate(0), allocate(0), allocate(0));
+            assertValid(query, allocate(WARN_THRESHOLD - 8), allocate(0), 
allocate(0));
+            assertValid(query, allocate(0), allocate(WARN_THRESHOLD - 8), 
allocate(0));
+            assertValid(query, allocate(0), allocate(0), 
allocate(WARN_THRESHOLD - 8));
+
+            assertWarns("v", query, allocate(WARN_THRESHOLD + 1), allocate(0), 
allocate(0));
+            assertWarns("v", query, allocate(0), allocate(WARN_THRESHOLD - 7), 
allocate(0));
+            assertWarns("v", query, allocate(0), allocate(0), 
allocate(WARN_THRESHOLD - 7));
+
+            assertFails("v", query, allocate(FAIL_THRESHOLD + 1), allocate(0), 
allocate(0));
+            assertFails("v", query, allocate(0), allocate(FAIL_THRESHOLD - 7), 
allocate(0));
+            assertFails("v", query, allocate(0), allocate(0), 
allocate(FAIL_THRESHOLD - 7));
+        }
+    }
+
+    @Test
+    public void testList() throws Throwable
+    {
+        createTable("CREATE TABLE %s (k int PRIMARY KEY, l list<text>)");
+
+        for (String query : Arrays.asList("INSERT INTO %s (k, l) VALUES (0, 
?)",
+                                          "UPDATE %s SET l = ? WHERE k = 0",
+                                          "UPDATE %s SET l = l + ? WHERE k = 
0"))
+        {
+            testCollection("l", query, this::list);
+        }
+
+        testThreshold("l", "UPDATE %s SET l[0] = ? WHERE k = 0");
+
+        String query = "UPDATE %s SET l = l - ? WHERE k = 0";
+        assertValid(query, this::list, allocate(1));
+        assertValid(query, this::list, allocate(FAIL_THRESHOLD));
+        assertValid(query, this::list, allocate(FAIL_THRESHOLD + 1)); // 
Doesn't write anything because we couldn't write
+    }
+
+    @Test
+    public void testFrozenList() throws Throwable
+    {
+        createTable("CREATE TABLE %s (k int PRIMARY KEY, fl 
frozen<list<text>>)");
+
+        // the serialized size of a frozen list is the size of its serialized 
elements, plus a 32-bit integer prefix for
+        // the number of elements, and another 32-bit integer for the size of 
each element
+
+        for (String query : Arrays.asList("INSERT INTO %s (k, fl) VALUES (0, 
?)",
+                                          "UPDATE %s SET fl = ? WHERE k = 0"))
+        {
+            testFrozenCollection("fl", query, this::list);
+        }
+    }
+
+    @Test
+    public void testSet() throws Throwable
+    {
+        createTable("CREATE TABLE %s (k int PRIMARY KEY, s set<text>)");
+
+        for (String query : Arrays.asList("INSERT INTO %s (k, s) VALUES (0, 
?)",
+                                          "UPDATE %s SET s = ? WHERE k = 0",
+                                          "UPDATE %s SET s = s + ? WHERE k = 
0",
+                                          "UPDATE %s SET s = s - ? WHERE k = 
0"))
+        {
+            testCollection("s", query, this::set);
+        }
+    }
+
+    @Test
+    public void testSetWithClustering() throws Throwable
+    {
+        createTable("CREATE TABLE %s (k int, c1 int, c2 int, s set<text>, 
PRIMARY KEY(k, c1, c2))");
+
+        for (String query : Arrays.asList("INSERT INTO %s (k, c1, c2, s) 
VALUES (0, 0, 0, ?)",
+                                          "UPDATE %s SET s = ? WHERE k = 0 AND 
c1 = 0 AND c2 = 0",
+                                          "UPDATE %s SET s = s + ? WHERE k = 0 
AND c1 = 0 AND c2 = 0",
+                                          "UPDATE %s SET s = s - ? WHERE k = 0 
AND c1 = 0 AND c2 = 0"))
+        {
+            testCollection("s", query, this::set);
+        }
+    }
+
+    @Test
+    public void testFrozenSet() throws Throwable
+    {
+        createTable("CREATE TABLE %s (k int PRIMARY KEY, fs 
frozen<set<text>>)");
+
+        // the serialized size of a frozen set is the size of its serialized 
elements, plus a 32-bit integer prefix for
+        // the number of elements, and another 32-bit integer for the size of 
each element
+
+        for (String query : Arrays.asList("INSERT INTO %s (k, fs) VALUES (0, 
?)",
+                                          "UPDATE %s SET fs = ? WHERE k = 0"))
+        {
+            testFrozenCollection("fs", query, this::set);
+        }
+    }
+
+    @Test
+    public void testMap() throws Throwable
+    {
+        createTable("CREATE TABLE %s (k int PRIMARY KEY, m map<text, text>)");
+
+        for (String query : Arrays.asList("INSERT INTO %s (k, m) VALUES (0, 
?)",
+                                          "UPDATE %s SET m = ? WHERE k = 0",
+                                          "UPDATE %s SET m = m + ? WHERE k = 
0"))
+        {
+            testMap("m", query);
+        }
+
+        testThreshold2("m", "UPDATE %s SET m[?] = ? WHERE k = 0");
+        testCollection("m", "UPDATE %s SET m = m - ? WHERE k = 0", this::set);
+    }
+
+    /**
+     * Checks the column value size guardrail for an insertion into a non-frozen map column of a table with two
+     * clustering columns.
+     */
+    @Test
+    public void testMapWithClustering() throws Throwable
+    {
+        createTable("CREATE TABLE %s (k int, c1 int, c2 int, mc map<text, text>, PRIMARY KEY(k, c1, c2))");
+
+        String insert = "INSERT INTO %s (k, c1, c2, mc) VALUES (0, 0, 0, ?)";
+        testMap("mc", insert);
+    }
+
+    @Test
+    public void testFrozenMap() throws Throwable
+    {
+        createTable("CREATE TABLE %s (k int PRIMARY KEY,fm frozen<map<text, 
text>>)");
+
+        // the serialized size of a frozen map is the size of the serialized 
values plus a 32-bit integer prefix for the
+        // number of key-value pairs, and another 32-bit integer for the size 
of each value
+
+        for (String query : Arrays.asList("INSERT INTO %s (k, fm) VALUES (0, 
?)",
+                                          "UPDATE %s SET fm = ? WHERE k = 0"))
+        {
+            assertValid(query, this::map, allocate(1), allocate(1));
+            assertValid(query, this::map, allocate(WARN_THRESHOLD - 13), 
allocate(1));
+            assertValid(query, this::map, allocate(1), allocate(WARN_THRESHOLD 
- 13));
+
+            assertWarns("fm", query, this::map, allocate(WARN_THRESHOLD - 12), 
allocate(1));
+            assertWarns("fm", query, this::map, allocate(1), 
allocate(WARN_THRESHOLD - 12));
+
+            assertFails("fm", query, this::map, allocate(FAIL_THRESHOLD - 12), 
allocate(1));
+            assertFails("fm", query, this::map, allocate(1), 
allocate(FAIL_THRESHOLD - 12));
+        }
+    }
+
+    @Test
+    public void testBatch() throws Throwable
+    {
+        createTable("CREATE TABLE %s (k text, c text, r text, s text STATIC, 
PRIMARY KEY(k, c))");
+
+        // partition key
+        testNoThreshold("BEGIN BATCH INSERT INTO %s (k, c, r) VALUES (?, '0', 
'0'); APPLY BATCH;");
+        testNoThreshold("BEGIN BATCH UPDATE %s SET r = '0' WHERE k = ? AND c = 
'0'; APPLY BATCH;");
+        testNoThreshold("BEGIN BATCH DELETE r FROM %s WHERE k = ? AND c = '0'; 
APPLY BATCH;");
+        testNoThreshold("BEGIN BATCH DELETE FROM %s WHERE k = ?; APPLY 
BATCH;");
+
+        // static column
+        testThreshold("s", "BEGIN BATCH INSERT INTO %s (k, s) VALUES ('0', ?); 
APPLY BATCH;");
+        testThreshold("s", "BEGIN BATCH INSERT INTO %s (k, s, c, r) VALUES 
('0', ?, '0', '0'); APPLY BATCH;");
+        testThreshold("s", "BEGIN BATCH UPDATE %s SET s = ? WHERE k = '0'; 
APPLY BATCH;");
+        testThreshold("s", "BEGIN BATCH UPDATE %s SET s = ?, r = '0' WHERE k = 
'0' AND c = '0'; APPLY BATCH;");
+
+        // clustering key
+        testNoThreshold("BEGIN BATCH INSERT INTO %s (k, c, r) VALUES ('0', ?, 
'0'); APPLY BATCH;");
+        testNoThreshold("BEGIN BATCH UPDATE %s SET r = '0' WHERE k = '0' AND c 
= ?; APPLY BATCH;");
+        testNoThreshold("BEGIN BATCH DELETE r FROM %s WHERE k = '0' AND c = ?; 
APPLY BATCH;");
+        testNoThreshold("BEGIN BATCH DELETE FROM %s WHERE k = '0' AND c = ?; 
APPLY BATCH;");
+
+        // regular column
+        testThreshold("r", "BEGIN BATCH INSERT INTO %s (k, c, r) VALUES ('0', 
'0', ?); APPLY BATCH;");
+        testThreshold("r", "BEGIN BATCH UPDATE %s SET r = ? WHERE k = '0' AND 
c = '0'; APPLY BATCH;");
+    }
+
+    @Test
+    public void testCASWithIfNotExistsCondition() throws Throwable
+    {
+        createTable("CREATE TABLE %s (k text, c text, v text, s text STATIC, 
PRIMARY KEY(k, c))");
+
+        // partition key
+        testNoThreshold("INSERT INTO %s (k, c, v) VALUES (?, '0', '0') IF NOT 
EXISTS");
+
+        // clustering key
+        testNoThreshold("INSERT INTO %s (k, c, v) VALUES ('0', ?, '0') IF NOT 
EXISTS");
+
+        // static column
+        assertValid("INSERT INTO %s (k, s) VALUES ('1', ?) IF NOT EXISTS", 
allocate(1));
+        assertValid("INSERT INTO %s (k, s) VALUES ('2', ?) IF NOT EXISTS", 
allocate(WARN_THRESHOLD));
+        assertValid("INSERT INTO %s (k, s) VALUES ('2', ?) IF NOT EXISTS", 
allocate(WARN_THRESHOLD + 1)); // not applied
+        assertWarns("s", "INSERT INTO %s (k, s) VALUES ('3', ?) IF NOT 
EXISTS", allocate(WARN_THRESHOLD + 1));
+
+        // regular column
+        assertValid("INSERT INTO %s (k, c, v) VALUES ('4', '0', ?) IF NOT 
EXISTS", allocate(1));
+        assertValid("INSERT INTO %s (k, c, v) VALUES ('5', '0', ?) IF NOT 
EXISTS", allocate(WARN_THRESHOLD));
+        assertValid("INSERT INTO %s (k, c, v) VALUES ('5', '0', ?) IF NOT 
EXISTS", allocate(WARN_THRESHOLD + 1)); // not applied
+        assertWarns("v", "INSERT INTO %s (k, c, v) VALUES ('6', '0', ?) IF NOT 
EXISTS", allocate(WARN_THRESHOLD + 1));
+    }
+
+    @Test
+    public void testCASWithIfExistsCondition() throws Throwable
+    {
+        createTable("CREATE TABLE %s (k text, c text, v text, s text STATIC, 
PRIMARY KEY(k, c))");
+
+        // partition key, the CAS updates with values beyond the threshold are 
not applied so they don't come to fail
+        testNoThreshold("UPDATE %s SET v = '0' WHERE k = ? AND c = '0' IF 
EXISTS");
+
+        // clustering key, the CAS updates with values beyond the threshold 
are not applied so they don't come to fail
+        testNoThreshold("UPDATE %s SET v = '0' WHERE k = '0' AND c = ? IF 
EXISTS");
+
+        // static column, only the applied CAS updates can fire the guardrail
+        assertValid("INSERT INTO %s (k, s) VALUES ('0', '0')");
+        testThreshold("s", "UPDATE %s SET s = ? WHERE k = '0' IF EXISTS");
+        assertValid("DELETE FROM %s WHERE k = '0'");
+        testNoThreshold("UPDATE %s SET s = ? WHERE k = '0' IF EXISTS");
+
+        // regular column, only the applied CAS updates can fire the guardrail
+        assertValid("INSERT INTO %s (k, c) VALUES ('0', '0')");
+        testThreshold("v", "UPDATE %s SET v = ? WHERE k = '0' AND c = '0' IF 
EXISTS");
+        assertValid("DELETE FROM %s WHERE k = '0' AND c = '0'");
+        testNoThreshold("UPDATE %s SET v = ? WHERE k = '0' AND c = '0' IF 
EXISTS");
+    }
+
+    /**
+     * Checks the column value size guardrail for updates conditioned on a column value ({@code IF v = '0'}).
+     * Each size is tried twice: once after deleting the row and once after setting {@code v} to {@code '0'}.
+     */
+    @Test
+    public void testCASWithColumnsCondition() throws Throwable
+    {
+        createTable("CREATE TABLE %s (k int PRIMARY KEY, v text)");
+
+        String deleteRow = "DELETE FROM %s WHERE k = 0";
+        String satisfyCondition = "UPDATE %s SET v = '0' WHERE k = 0";
+        String conditionalUpdate = "UPDATE %s SET v = ? WHERE k = 0 IF v = '0'";
+
+        // values smaller than or equal to the warning threshold are always accepted, independently of whether the
+        // conditional update is applied
+        for (int size : new int[]{ 1, WARN_THRESHOLD })
+        {
+            assertValid(deleteRow);
+            assertValid(conditionalUpdate, allocate(size));
+            assertValid(satisfyCondition);
+            assertValid(conditionalUpdate, allocate(size));
+        }
+
+        // values beyond the warning threshold raise a warning only if the conditional update is applied
+        assertValid(deleteRow);
+        assertValid(conditionalUpdate, allocate(WARN_THRESHOLD + 1));
+        assertValid(satisfyCondition);
+        assertWarns("v", conditionalUpdate, allocate(WARN_THRESHOLD + 1));
+    }
+
+    @Test
+    public void testSelect() throws Throwable
+    {
+        createTable("CREATE TABLE %s (k text, c text, r text, s text STATIC, 
PRIMARY KEY(k, c))");
+
+        // the guardail is only checked for writes; reads are excluded
+
+        testNoThreshold("SELECT * FROM %s WHERE k = ?");
+        testNoThreshold("SELECT * FROM %s WHERE k = '0' AND c = ?");
+        testNoThreshold("SELECT * FROM %s WHERE c = ? ALLOW FILTERING");
+        testNoThreshold("SELECT * FROM %s WHERE s = ? ALLOW FILTERING");
+        testNoThreshold("SELECT * FROM %s WHERE r = ? ALLOW FILTERING");
+    }
+
+    /**
+     * Verifies that the column value size guardrail is not applied to the specified 1-placeholder CQL query, by
+     * binding values below, at, and above both the warning and the failure thresholds and expecting every
+     * execution to succeed without warnings.
+     *
+     * @param query a CQL statement with exactly one placeholder
+     */
+    private void testNoThreshold(String query) throws Throwable
+    {
+        int[] sizes = { 1, WARN_THRESHOLD, WARN_THRESHOLD + 1, FAIL_THRESHOLD, FAIL_THRESHOLD + 1 };
+
+        for (int size : sizes)
+        {
+            assertValid(query, allocate(size));
+        }
+    }
+
+    /**
+     * Verifies that the column value size guardrail is not applied to the specified 2-placeholder CQL query, by
+     * binding combinations of values at and above both thresholds and expecting every execution to succeed
+     * without warnings.
+     *
+     * @param query a CQL statement with exactly two placeholders
+     */
+    private void testNoThreshold2(String query) throws Throwable
+    {
+        assertValid(query, allocate(1), allocate(1));
+
+        // the same five combinations are tried around the warning threshold first, and then around the failure one
+        for (int threshold : new int[]{ WARN_THRESHOLD, FAIL_THRESHOLD })
+        {
+            assertValid(query, allocate(threshold), allocate(1));
+            assertValid(query, allocate(1), allocate(threshold));
+            assertValid(query, allocate(threshold), allocate(threshold));
+            assertValid(query, allocate(threshold + 1), allocate(1));
+            assertValid(query, allocate(1), allocate(threshold + 1));
+        }
+    }
+
+    /**
+     * Verifies that the column value size guardrail is applied to the specified 1-placeholder CQL query, assuming
+     * that the bound value is measured as-is, without any extra serialization bytes.
+     *
+     * @param column the name of the column referenced by the query placeholder
+     * @param query  a CQL query with exactly one placeholder
+     */
+    private void testThreshold(String column, String query) throws Throwable
+    {
+        int noSerializationBytes = 0;
+        testThreshold(column, query, noSerializationBytes);
+    }
+
+    /**
+     * Verifies that the column value size guardrail is applied to the specified 1-placeholder CQL query: values up
+     * to the warning threshold are accepted, one extra byte produces a warning, and one byte over the failure
+     * threshold produces a failure.
+     *
+     * @param column             the name of the column referenced by the query placeholder
+     * @param query              a CQL query with exactly one placeholder
+     * @param serializationBytes the extra bytes added to the placeholder value by its wrapping column type
+     *                           serializer; they are subtracted from both thresholds to size the bound values
+     */
+    private void testThreshold(String column, String query, int serializationBytes) throws Throwable
+    {
+        // largest raw value sizes that stay within each threshold once the serialization bytes are added
+        int maxWithoutWarning = WARN_THRESHOLD - serializationBytes;
+        int maxWithoutFailure = FAIL_THRESHOLD - serializationBytes;
+
+        assertValid(query, allocate(0));
+        assertValid(query, allocate(maxWithoutWarning));
+        assertWarns(column, query, allocate(maxWithoutWarning + 1));
+        assertFails(column, query, allocate(maxWithoutFailure + 1));
+    }
+
+    /**
+     * Verifies that the column value size guardrail is applied to the specified 2-placeholder CQL query, assuming
+     * that the bound values are measured as-is, without any extra serialization bytes.
+     *
+     * @param column the name of the column referenced by the placeholders
+     * @param query  a CQL query with exactly two placeholders
+     */
+    private void testThreshold2(String column, String query) throws Throwable
+    {
+        int noSerializationBytes = 0;
+        testThreshold2(column, query, noSerializationBytes);
+    }
+
+    /**
+     * Verifies that the column value size guardrail is applied to the specified 2-placeholder query. Each
+     * placeholder is pushed over the warning and the failure thresholds in turn, while the other one is bound to
+     * an empty value.
+     *
+     * @param column             the name of the column referenced by the placeholders
+     * @param query              a CQL query with exactly two placeholders
+     * @param serializationBytes the extra bytes added to the size of the placeholder values by their wrapping
+     *                           serializer; they are subtracted from both thresholds to size the bound values
+     */
+    private void testThreshold2(String column, String query, int serializationBytes) throws Throwable
+    {
+        // largest raw value sizes that stay within each threshold once the serialization bytes are added
+        int maxWithoutWarning = WARN_THRESHOLD - serializationBytes;
+        int maxWithoutFailure = FAIL_THRESHOLD - serializationBytes;
+        int half = maxWithoutWarning / 2;
+
+        assertValid(query, allocate(0), allocate(0));
+        assertValid(query, allocate(maxWithoutWarning), allocate(0));
+        assertValid(query, allocate(0), allocate(maxWithoutWarning));
+        assertValid(query, allocate(half), allocate(half));
+
+        assertWarns(column, query, allocate(maxWithoutWarning + 1), allocate(0));
+        assertWarns(column, query, allocate(0), allocate(maxWithoutWarning + 1));
+
+        assertFails(column, query, allocate(maxWithoutFailure + 1), allocate(0));
+        assertFails(column, query, allocate(0), allocate(maxWithoutFailure + 1));
+    }
+
+    /**
+     * Verifies the guardrail for a 1-placeholder query bound to a collection built from one or two elements.
+     * Collections whose elements are all at most {@code WARN_THRESHOLD} bytes long are expected to be accepted,
+     * even with two elements of that size, whereas a single element one byte over a threshold is expected to
+     * trigger it.
+     *
+     * @param column            the name of the collection column referenced by the placeholder
+     * @param query             a CQL query with exactly one placeholder
+     * @param collectionBuilder serializes the test elements into the value bound to the placeholder
+     */
+    private void testCollection(String column,
+                                String query,
+                                Function<ByteBuffer[], ByteBuffer> collectionBuilder) throws Throwable
+    {
+        int overWarn = WARN_THRESHOLD + 1;
+        int overFail = FAIL_THRESHOLD + 1;
+
+        // no element exceeds the warning threshold
+        assertValid(query, collectionBuilder, allocate(1));
+        assertValid(query, collectionBuilder, allocate(1), allocate(1));
+        assertValid(query, collectionBuilder, allocate(WARN_THRESHOLD));
+        assertValid(query, collectionBuilder, allocate(WARN_THRESHOLD), allocate(1));
+        assertValid(query, collectionBuilder, allocate(1), allocate(WARN_THRESHOLD));
+        assertValid(query, collectionBuilder, allocate(WARN_THRESHOLD), allocate(WARN_THRESHOLD));
+
+        // one element exceeds the warning threshold by one byte
+        assertWarns(column, query, collectionBuilder, allocate(overWarn));
+        assertWarns(column, query, collectionBuilder, allocate(overWarn), allocate(1));
+        assertWarns(column, query, collectionBuilder, allocate(1), allocate(overWarn));
+
+        // one element exceeds the failure threshold by one byte
+        assertFails(column, query, collectionBuilder, allocate(overFail));
+        assertFails(column, query, collectionBuilder, allocate(overFail), allocate(1));
+        assertFails(column, query, collectionBuilder, allocate(1), allocate(overFail));
+    }
+
+    /**
+     * Verifies the guardrail for a 1-placeholder query bound to a frozen list or set built from one or two
+     * elements. The element sizes are offset by 8 bytes for single-element collections and by 12 bytes for
+     * two-element ones, so that the collections land exactly on the threshold or one byte over it.
+     *
+     * @param column            the name of the frozen collection column referenced by the placeholder
+     * @param query             a CQL query with exactly one placeholder
+     * @param collectionBuilder serializes the test elements into the value bound to the placeholder
+     */
+    private void testFrozenCollection(String column,
+                                      String query,
+                                      Function<ByteBuffer[], ByteBuffer> collectionBuilder) throws Throwable
+    {
+        int halfOfWarn = (WARN_THRESHOLD - 12) / 2;
+
+        // within the warning threshold
+        assertValid(query, collectionBuilder, allocate(1));
+        assertValid(query, collectionBuilder, allocate(WARN_THRESHOLD - 8));
+        assertValid(query, collectionBuilder, allocate(halfOfWarn), allocate(halfOfWarn));
+
+        // one byte over the warning threshold, with one and with two elements
+        assertWarns(column, query, collectionBuilder, allocate(WARN_THRESHOLD - 7));
+        assertWarns(column, query, collectionBuilder, allocate(WARN_THRESHOLD - 12), allocate(1));
+
+        // one byte over the failure threshold, with one and with two elements
+        assertFails(column, query, collectionBuilder, allocate(FAIL_THRESHOLD - 7));
+        assertFails(column, query, collectionBuilder, allocate(FAIL_THRESHOLD - 12), allocate(1));
+    }
+
+    /**
+     * Verifies the guardrail for a 1-placeholder query bound to a single-entry map. Keys and values of up to
+     * {@code WARN_THRESHOLD} bytes are expected to be accepted, even when both have that size, whereas a key or
+     * a value one byte over a threshold is expected to trigger it.
+     *
+     * @param column the name of the map column referenced by the placeholder
+     * @param query  a CQL query with exactly one placeholder
+     */
+    private void testMap(String column, String query) throws Throwable
+    {
+        int overWarn = WARN_THRESHOLD + 1;
+        int overFail = FAIL_THRESHOLD + 1;
+
+        // neither the key nor the value exceeds the warning threshold
+        assertValid(query, this::map, allocate(1), allocate(1));
+        assertValid(query, this::map, allocate(WARN_THRESHOLD), allocate(1));
+        assertValid(query, this::map, allocate(1), allocate(WARN_THRESHOLD));
+        assertValid(query, this::map, allocate(WARN_THRESHOLD), allocate(WARN_THRESHOLD));
+
+        // either the value or the key exceeds the warning threshold by one byte
+        assertWarns(column, query, this::map, allocate(1), allocate(overWarn));
+        assertWarns(column, query, this::map, allocate(overWarn), allocate(1));
+
+        // the key, the value, or both exceed the failure threshold by one byte
+        assertFails(column, query, this::map, allocate(overFail), allocate(1));
+        assertFails(column, query, this::map, allocate(1), allocate(overFail));
+        assertFails(column, query, this::map, allocate(overFail), allocate(overFail));
+    }
+
+    /**
+     * Asserts that executing the query with the specified bound values is valid.
+     *
+     * @param query  the CQL query to execute
+     * @param values the values bound to the query placeholders, in order
+     */
+    private void assertValid(String query, ByteBuffer... values) throws Throwable
+    {
+        assertValid(() -> {
+            execute(query, values);
+        });
+    }
+
+    /**
+     * Asserts that executing the 1-placeholder query is valid when its placeholder is bound to the collection
+     * that the builder produces from the specified values.
+     *
+     * @param query             the CQL query to execute
+     * @param collectionBuilder serializes {@code values} into the single bound value
+     * @param values            the elements passed to the collection builder
+     */
+    private void assertValid(String query,
+                             Function<ByteBuffer[], ByteBuffer> collectionBuilder,
+                             ByteBuffer... values) throws Throwable
+    {
+        // the collection is built inside the lambda, that is, when the query is executed
+        assertValid(() -> {
+            execute(query, collectionBuilder.apply(values));
+        });
+    }
+
+    /**
+     * Asserts that the 1-placeholder query raises the warning for the specified column when its placeholder is
+     * bound to the collection that the builder produces from the specified values.
+     *
+     * @param column            the name of the column expected in the warning message
+     * @param query             the CQL query to execute
+     * @param collectionBuilder serializes {@code values} into the single bound value
+     * @param values            the elements passed to the collection builder
+     */
+    private void assertWarns(String column,
+                             String query,
+                             Function<ByteBuffer[], ByteBuffer> collectionBuilder,
+                             ByteBuffer... values) throws Throwable
+    {
+        ByteBuffer collection = collectionBuilder.apply(values);
+        assertWarns(column, query, collection);
+    }
+
+    /**
+     * Asserts that executing the query with the specified bound values raises the warning for the specified
+     * column. The expected message always reports a size of {@code WARN_THRESHOLD + 1}, so callers have to bind
+     * values that exceed the warning threshold by exactly one byte.
+     *
+     * @param column the name of the column expected in the warning message
+     * @param query  the CQL query to execute
+     * @param values the values bound to the query placeholders, in order
+     */
+    private void assertWarns(String column, String query, ByteBuffer... values) throws Throwable
+    {
+        int reportedSize = WARN_THRESHOLD + 1;
+        String expectedWarning = format("Value of column %s has size %s, this exceeds the warning threshold of %s.",
+                                        column, reportedSize, WARN_THRESHOLD);
+
+        assertWarns(() -> {
+            execute(query, values);
+        }, expectedWarning);
+    }
+
+    /**
+     * Asserts that the 1-placeholder query fails for the specified column when its placeholder is bound to the
+     * collection that the builder produces from the specified values.
+     *
+     * @param column            the name of the column expected in the failure message
+     * @param query             the CQL query to execute
+     * @param collectionBuilder serializes {@code values} into the single bound value
+     * @param values            the elements passed to the collection builder
+     */
+    private void assertFails(String column,
+                             String query,
+                             Function<ByteBuffer[], ByteBuffer> collectionBuilder,
+                             ByteBuffer... values) throws Throwable
+    {
+        ByteBuffer collection = collectionBuilder.apply(values);
+        assertFails(column, query, collection);
+    }
+
+    /**
+     * Asserts that executing the query with the specified bound values fails because of the specified column.
+     * The expected message always reports a size of {@code FAIL_THRESHOLD + 1}, so callers have to bind values
+     * that exceed the failure threshold by exactly one byte.
+     *
+     * @param column the name of the column expected in the failure message
+     * @param query  the CQL query to execute
+     * @param values the values bound to the query placeholders, in order
+     */
+    private void assertFails(String column, String query, ByteBuffer... values) throws Throwable
+    {
+        int reportedSize = FAIL_THRESHOLD + 1;
+        String expectedFailure = format("Value of column %s has size %s, this exceeds the failure threshold of %s.",
+                                        column, reportedSize, FAIL_THRESHOLD);
+
+        assertFails(() -> {
+            execute(query, values);
+        }, expectedFailure);
+    }
+
+    /**
+     * Executes the query with {@code userClientState}, binding the specified values to its placeholders.
+     *
+     * @param query  the CQL query to execute
+     * @param values the values bound to the query placeholders, in order
+     */
+    private void execute(String query, ByteBuffer... values)
+    {
+        // the three-argument overload takes the bound values as a list
+        execute(userClientState, query, Arrays.asList(values));
+    }
+
+    /**
+     * Serializes the specified values as a set of blobs.
+     *
+     * @param values the elements of the set; duplicates are collapsed by {@code ImmutableSet.copyOf}
+     * @return the set serialized by the {@code SetType} built over {@code BytesType}
+     */
+    private ByteBuffer set(ByteBuffer... values)
+    {
+        SetType<ByteBuffer> type = SetType.getInstance(BytesType.instance, true);
+        ImmutableSet<ByteBuffer> elements = ImmutableSet.copyOf(values);
+        return type.decompose(elements);
+    }
+
+    /**
+     * Serializes the specified values as a list of blobs.
+     *
+     * @param values the elements of the list, in order
+     * @return the list serialized by the {@code ListType} built over {@code BytesType}
+     */
+    private ByteBuffer list(ByteBuffer... values)
+    {
+        ListType<ByteBuffer> type = ListType.getInstance(BytesType.instance, true);
+        ImmutableList<ByteBuffer> elements = ImmutableList.copyOf(values);
+        return type.decompose(elements);
+    }
+
+    /**
+     * Serializes the specified values as a map of blobs to blobs.
+     *
+     * @param values the map entries flattened as {@code key1, value1, key2, value2...}; an even number of
+     *               values is asserted
+     * @return the map serialized by the {@code MapType} built over {@code BytesType} keys and values
+     */
+    private ByteBuffer map(ByteBuffer... values)
+    {
+        assert values.length % 2 == 0;
+
+        // insertion order is kept, so entries are handed to the serializer in the order they were passed
+        Map<ByteBuffer, ByteBuffer> entries = new LinkedHashMap<>(values.length / 2);
+        for (int i = 0; i + 1 < values.length; i += 2)
+        {
+            entries.put(values[i], values[i + 1]);
+        }
+
+        MapType<ByteBuffer, ByteBuffer> type = MapType.getInstance(BytesType.instance, BytesType.instance, true);
+        return type.decompose(entries);
+    }
+}


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to