This is an automated email from the ASF dual-hosted git repository.

maedhroz pushed a commit to branch trunk
in repository https://gitbox.apache.org/repos/asf/cassandra.git


The following commit(s) were added to refs/heads/trunk by this push:
     new 530bc10bd0 Remove ProtocolVersion entirely from the CollectionSerializer ecosystem
530bc10bd0 is described below

commit 530bc10bd0a053f5dcd8439fd3f5c72cd7952ea6
Author: Caleb Rackliffe <[email protected]>
AuthorDate: Wed Jan 11 13:04:06 2023 -0600

    Remove ProtocolVersion entirely from the CollectionSerializer ecosystem
    
    patch by Caleb Rackliffe; reviewed by David Capwell for CASSANDRA-18114
---
 CHANGES.txt                                        |  1 +
 src/java/org/apache/cassandra/cql3/CQL3Type.java   | 44 ++++++-------
 src/java/org/apache/cassandra/cql3/Constants.java  |  2 +-
 src/java/org/apache/cassandra/cql3/Lists.java      | 26 ++++----
 src/java/org/apache/cassandra/cql3/Maps.java       | 41 ++++++++----
 src/java/org/apache/cassandra/cql3/Sets.java       | 44 ++++++++-----
 src/java/org/apache/cassandra/cql3/Term.java       |  3 +-
 src/java/org/apache/cassandra/cql3/Terms.java      |  6 +-
 src/java/org/apache/cassandra/cql3/Tuples.java     | 32 +++++----
 .../apache/cassandra/cql3/UntypedResultSet.java    | 31 +++++----
 src/java/org/apache/cassandra/cql3/UserTypes.java  |  2 +-
 .../cassandra/cql3/functions/CollectionFcts.java   |  2 +-
 .../cassandra/cql3/functions/FunctionCall.java     |  8 +--
 .../cql3/restrictions/MultiColumnRestriction.java  | 28 +++++---
 .../cql3/restrictions/SingleColumnRestriction.java |  3 +-
 .../cassandra/cql3/selection/ColumnTimestamps.java |  2 +-
 .../cassandra/cql3/selection/ListSelector.java     |  2 +-
 .../cassandra/cql3/selection/MapSelector.java      |  2 +-
 .../apache/cassandra/cql3/selection/Selector.java  |  2 +-
 .../cassandra/cql3/selection/SetSelector.java      |  2 +-
 .../cassandra/db/marshal/CollectionType.java       | 48 +++++++-------
 .../org/apache/cassandra/db/marshal/ListType.java  | 10 +--
 .../org/apache/cassandra/db/marshal/MapType.java   | 73 +++++++++++----------
 .../org/apache/cassandra/db/marshal/SetType.java   | 10 +--
 .../org/apache/cassandra/db/marshal/TupleType.java |  6 +-
 .../serializers/AbstractMapSerializer.java         | 31 +++++----
 .../serializers/CollectionSerializer.java          | 76 +++++++---------------
 .../cassandra/serializers/ListSerializer.java      | 37 ++++++-----
 .../cassandra/serializers/MapSerializer.java       | 56 +++++++++-------
 .../cassandra/serializers/SetSerializer.java       | 40 +++++++-----
 .../apache/cassandra/cql3/CQL3TypeLiteralTest.java |  2 +-
 .../apache/cassandra/transport/SerDeserTest.java   | 70 ++++++++++++--------
 32 files changed, 394 insertions(+), 348 deletions(-)

diff --git a/CHANGES.txt b/CHANGES.txt
index b1bc6f0cc9..a3eb241153 100644
--- a/CHANGES.txt
+++ b/CHANGES.txt
@@ -1,4 +1,5 @@
 4.2
+ * Remove ProtocolVersion entirely from the CollectionSerializer ecosystem (CASSANDRA-18114)
  * Fix serialization error in new getsstables --show-levels option (CASSANDRA-18140)
  * Use checked casts when reading vints as ints (CASSANDRA-18099)
  * Add Mutation Serialization Caching (CASSANDRA-17998)
diff --git a/src/java/org/apache/cassandra/cql3/CQL3Type.java b/src/java/org/apache/cassandra/cql3/CQL3Type.java
index 1c20e6b0ff..6c9bd41839 100644
--- a/src/java/org/apache/cassandra/cql3/CQL3Type.java
+++ b/src/java/org/apache/cassandra/cql3/CQL3Type.java
@@ -24,13 +24,13 @@ import java.util.List;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
-import org.apache.cassandra.schema.Schema;
 import org.apache.cassandra.db.marshal.*;
 import org.apache.cassandra.db.marshal.CollectionType.Kind;
 import org.apache.cassandra.exceptions.InvalidRequestException;
 import org.apache.cassandra.exceptions.ConfigurationException;
 import org.apache.cassandra.exceptions.SyntaxException;
 import org.apache.cassandra.schema.KeyspaceMetadata;
+import org.apache.cassandra.schema.Schema;
 import org.apache.cassandra.schema.Types;
 import org.apache.cassandra.serializers.CollectionSerializer;
 import org.apache.cassandra.serializers.MarshalException;
@@ -54,7 +54,6 @@ public interface CQL3Type
     }
 
     public AbstractType<?> getType();
-    default public AbstractType<?> getUDFType() { return getType(); }
 
     /**
      * Generates CQL literal from a binary value of this type.
@@ -101,11 +100,6 @@ public interface CQL3Type
             return type;
         }
 
-        public AbstractType<?> getUDFType()
-        {
-            return this == TIMEUUID ? UUID.type : type;
-        }
-
         /**
          * Delegate to
          * {@link 
org.apache.cassandra.serializers.TypeSerializer#toCQLLiteral(ByteBuffer)}
@@ -176,9 +170,9 @@ public interface CQL3Type
 
     public static class Collection implements CQL3Type
     {
-        private final CollectionType type;
+        private final CollectionType<?> type;
 
-        public Collection(CollectionType type)
+        public Collection(CollectionType<?> type)
         {
             this.type = type;
         }
@@ -201,19 +195,19 @@ public interface CQL3Type
 
             StringBuilder target = new StringBuilder();
             buffer = buffer.duplicate();
-            int size = CollectionSerializer.readCollectionSize(buffer, 
ByteBufferAccessor.instance, version);
-            buffer.position(buffer.position() + 
CollectionSerializer.sizeOfCollectionSize(size, version));
+            int size = CollectionSerializer.readCollectionSize(buffer, 
ByteBufferAccessor.instance);
+            buffer.position(buffer.position() + 
CollectionSerializer.sizeOfCollectionSize());
 
             switch (type.kind)
             {
                 case LIST:
-                    CQL3Type elements = ((ListType) 
type).getElementsType().asCQL3Type();
+                    CQL3Type elements = ((ListType<?>) 
type).getElementsType().asCQL3Type();
                     target.append('[');
                     generateSetOrListCQLLiteral(buffer, version, target, size, 
elements);
                     target.append(']');
                     break;
                 case SET:
-                    elements = ((SetType) type).getElementsType().asCQL3Type();
+                    elements = ((SetType<?>) 
type).getElementsType().asCQL3Type();
                     target.append('{');
                     generateSetOrListCQLLiteral(buffer, version, target, size, 
elements);
                     target.append('}');
@@ -229,19 +223,19 @@ public interface CQL3Type
 
         private void generateMapCQLLiteral(ByteBuffer buffer, ProtocolVersion 
version, StringBuilder target, int size)
         {
-            CQL3Type keys = ((MapType) type).getKeysType().asCQL3Type();
-            CQL3Type values = ((MapType) type).getValuesType().asCQL3Type();
+            CQL3Type keys = ((MapType<?, ?>) type).getKeysType().asCQL3Type();
+            CQL3Type values = ((MapType<?, ?>) 
type).getValuesType().asCQL3Type();
             int offset = 0;
             for (int i = 0; i < size; i++)
             {
                 if (i > 0)
                     target.append(", ");
-                ByteBuffer element = CollectionSerializer.readValue(buffer, 
ByteBufferAccessor.instance, offset, version);
-                offset += CollectionSerializer.sizeOfValue(element, 
ByteBufferAccessor.instance, version);
+                ByteBuffer element = CollectionSerializer.readValue(buffer, 
ByteBufferAccessor.instance, offset);
+                offset += CollectionSerializer.sizeOfValue(element, 
ByteBufferAccessor.instance);
                 target.append(keys.toCQLLiteral(element, version));
                 target.append(": ");
-                element = CollectionSerializer.readValue(buffer, 
ByteBufferAccessor.instance, offset, version);
-                offset += CollectionSerializer.sizeOfValue(element, 
ByteBufferAccessor.instance, version);
+                element = CollectionSerializer.readValue(buffer, 
ByteBufferAccessor.instance, offset);
+                offset += CollectionSerializer.sizeOfValue(element, 
ByteBufferAccessor.instance);
                 target.append(values.toCQLLiteral(element, version));
             }
         }
@@ -253,8 +247,8 @@ public interface CQL3Type
             {
                 if (i > 0)
                     target.append(", ");
-                ByteBuffer element = CollectionSerializer.readValue(buffer, 
ByteBufferAccessor.instance, offset, version);
-                offset += CollectionSerializer.sizeOfValue(element, 
ByteBufferAccessor.instance, version);
+                ByteBuffer element = CollectionSerializer.readValue(buffer, 
ByteBufferAccessor.instance, offset);
+                offset += CollectionSerializer.sizeOfValue(element, 
ByteBufferAccessor.instance);
                 target.append(elements.toCQLLiteral(element, version));
             }
         }
@@ -283,16 +277,16 @@ public interface CQL3Type
             switch (type.kind)
             {
                 case LIST:
-                    AbstractType<?> listType = 
((ListType)type).getElementsType();
+                    AbstractType<?> listType = ((ListType<?>) 
type).getElementsType();
                     sb.append("list<").append(listType.asCQL3Type());
                     break;
                 case SET:
-                    AbstractType<?> setType = 
((SetType)type).getElementsType();
+                    AbstractType<?> setType = ((SetType<?>) 
type).getElementsType();
                     sb.append("set<").append(setType.asCQL3Type());
                     break;
                 case MAP:
-                    AbstractType<?> keysType = ((MapType)type).getKeysType();
-                    AbstractType<?> valuesType = 
((MapType)type).getValuesType();
+                    AbstractType<?> keysType = ((MapType<?, ?>) 
type).getKeysType();
+                    AbstractType<?> valuesType = ((MapType<?, ?>) 
type).getValuesType();
                     sb.append("map<").append(keysType.asCQL3Type()).append(", 
").append(valuesType.asCQL3Type());
                     break;
                 default:
diff --git a/src/java/org/apache/cassandra/cql3/Constants.java 
b/src/java/org/apache/cassandra/cql3/Constants.java
index 803f3985fa..f4418fdada 100644
--- a/src/java/org/apache/cassandra/cql3/Constants.java
+++ b/src/java/org/apache/cassandra/cql3/Constants.java
@@ -420,7 +420,7 @@ public abstract class Constants
             this.bytes = bytes;
         }
 
-        public ByteBuffer get(ProtocolVersion protocolVersion)
+        public ByteBuffer get(ProtocolVersion version)
         {
             return bytes;
         }
diff --git a/src/java/org/apache/cassandra/cql3/Lists.java 
b/src/java/org/apache/cassandra/cql3/Lists.java
index 7b6234ed27..605a8bee8e 100644
--- a/src/java/org/apache/cassandra/cql3/Lists.java
+++ b/src/java/org/apache/cassandra/cql3/Lists.java
@@ -17,10 +17,6 @@
  */
 package org.apache.cassandra.cql3;
 
-import static org.apache.cassandra.cql3.Constants.UNSET_VALUE;
-import static org.apache.cassandra.utils.Clock.Global.currentTimeMillis;
-import static 
org.apache.cassandra.utils.TimeUUID.Generator.atUnixMillisAsBytes;
-
 import java.nio.ByteBuffer;
 import java.util.ArrayList;
 import java.util.List;
@@ -48,6 +44,10 @@ import org.apache.cassandra.serializers.MarshalException;
 import org.apache.cassandra.transport.ProtocolVersion;
 import org.apache.cassandra.utils.ByteBufferUtil;
 
+import static org.apache.cassandra.cql3.Constants.UNSET_VALUE;
+import static org.apache.cassandra.utils.Clock.Global.currentTimeMillis;
+import static 
org.apache.cassandra.utils.TimeUUID.Generator.atUnixMillisAsBytes;
+
 /**
  * Static helper methods and classes for lists.
  */
@@ -72,7 +72,7 @@ public abstract class Lists
 
     private static AbstractType<?> elementsType(AbstractType<?> type)
     {
-        return ((ListType) unwrap(type)).getElementsType();
+        return ((ListType<?>) unwrap(type)).getElementsType();
     }
 
     /**
@@ -117,7 +117,7 @@ public abstract class Lists
     public static <T> String listToString(Iterable<T> items, 
java.util.function.Function<T, String> mapper)
     {
         return StreamSupport.stream(items.spliterator(), false)
-                            .map(e -> mapper.apply(e))
+                            .map(mapper)
                             .collect(Collectors.joining(", ", "[", "]"));
     }
 
@@ -222,15 +222,15 @@ public abstract class Lists
             this.elements = elements;
         }
 
-        public static Value fromSerialized(ByteBuffer value, ListType type, 
ProtocolVersion version) throws InvalidRequestException
+        public static <T> Value fromSerialized(ByteBuffer value, ListType<T> 
type) throws InvalidRequestException
         {
             try
             {
                 // Collections have this small hack that validate cannot be 
called on a serialized object,
                 // but compose does the validation (so we're fine).
-                List<?> l = 
type.getSerializer().deserializeForNativeProtocol(value, 
ByteBufferAccessor.instance, version);
+                List<T> l = type.getSerializer().deserialize(value, 
ByteBufferAccessor.instance);
                 List<ByteBuffer> elements = new ArrayList<>(l.size());
-                for (Object element : l)
+                for (T element : l)
                     // elements can be null in lists that represent a set of 
IN values
                     elements.add(element == null ? null : 
type.getElementsType().decompose(element));
                 return new Value(elements);
@@ -241,12 +241,12 @@ public abstract class Lists
             }
         }
 
-        public ByteBuffer get(ProtocolVersion protocolVersion)
+        public ByteBuffer get(ProtocolVersion version)
         {
-            return CollectionSerializer.pack(elements, elements.size(), 
protocolVersion);
+            return CollectionSerializer.pack(elements, elements.size());
         }
 
-        public boolean equals(ListType lt, Value v)
+        public boolean equals(ListType<?> lt, Value v)
         {
             if (elements.size() != v.elements.size())
                 return false;
@@ -333,7 +333,7 @@ public abstract class Lists
                 return null;
             if (value == ByteBufferUtil.UNSET_BYTE_BUFFER)
                 return UNSET_VALUE;
-            return Value.fromSerialized(value, (ListType)receiver.type, 
options.getProtocolVersion());
+            return Value.fromSerialized(value, (ListType<?>) receiver.type);
         }
     }
 
diff --git a/src/java/org/apache/cassandra/cql3/Maps.java 
b/src/java/org/apache/cassandra/cql3/Maps.java
index 96db270c18..9040d1a8d5 100644
--- a/src/java/org/apache/cassandra/cql3/Maps.java
+++ b/src/java/org/apache/cassandra/cql3/Maps.java
@@ -17,25 +17,38 @@
  */
 package org.apache.cassandra.cql3;
 
-import static org.apache.cassandra.cql3.Constants.UNSET_VALUE;
-
 import java.nio.ByteBuffer;
-import java.util.*;
+import java.util.ArrayList;
+import java.util.Comparator;
+import java.util.HashMap;
+import java.util.Iterator;
+import java.util.List;
+import java.util.Map;
+import java.util.Objects;
+import java.util.Set;
+import java.util.SortedMap;
+import java.util.TreeMap;
 import java.util.stream.Collectors;
 
-import org.apache.cassandra.db.guardrails.Guardrails;
-import org.apache.cassandra.schema.ColumnMetadata;
 import org.apache.cassandra.cql3.functions.Function;
 import org.apache.cassandra.db.DecoratedKey;
-import org.apache.cassandra.db.rows.*;
-import org.apache.cassandra.db.marshal.*;
+import org.apache.cassandra.db.guardrails.Guardrails;
+import org.apache.cassandra.db.marshal.AbstractType;
+import org.apache.cassandra.db.marshal.ByteBufferAccessor;
+import org.apache.cassandra.db.marshal.MapType;
+import org.apache.cassandra.db.marshal.ReversedType;
+import org.apache.cassandra.db.rows.Cell;
+import org.apache.cassandra.db.rows.CellPath;
 import org.apache.cassandra.exceptions.InvalidRequestException;
+import org.apache.cassandra.schema.ColumnMetadata;
 import org.apache.cassandra.serializers.CollectionSerializer;
 import org.apache.cassandra.serializers.MarshalException;
 import org.apache.cassandra.transport.ProtocolVersion;
 import org.apache.cassandra.utils.ByteBufferUtil;
 import org.apache.cassandra.utils.Pair;
 
+import static org.apache.cassandra.cql3.Constants.UNSET_VALUE;
+
 /**
  * Static helper methods and classes for maps.
  */
@@ -245,16 +258,16 @@ public abstract class Maps
             this.map = map;
         }
 
-        public static Value fromSerialized(ByteBuffer value, MapType type, 
ProtocolVersion version) throws InvalidRequestException
+        public static <K, V> Value fromSerialized(ByteBuffer value, MapType<K, 
V> type) throws InvalidRequestException
         {
             try
             {
                 // Collections have this small hack that validate cannot be 
called on a serialized object,
                 // but compose does the validation (so we're fine).
-                Map<?, ?> m = 
type.getSerializer().deserializeForNativeProtocol(value, 
ByteBufferAccessor.instance, version);
+                Map<K, V> m = type.getSerializer().deserialize(value, 
ByteBufferAccessor.instance);
                 // We depend on Maps to be properly sorted by their keys, so 
use a sorted map implementation here.
                 SortedMap<ByteBuffer, ByteBuffer> map = new 
TreeMap<>(type.getKeysType());
-                for (Map.Entry<?, ?> entry : m.entrySet())
+                for (Map.Entry<K, V> entry : m.entrySet())
                     map.put(type.getKeysType().decompose(entry.getKey()), 
type.getValuesType().decompose(entry.getValue()));
                 return new Value(map);
             }
@@ -265,7 +278,7 @@ public abstract class Maps
         }
 
         @Override
-        public ByteBuffer get(ProtocolVersion protocolVersion)
+        public ByteBuffer get(ProtocolVersion version)
         {
             List<ByteBuffer> buffers = new ArrayList<>(2 * map.size());
             for (Map.Entry<ByteBuffer, ByteBuffer> entry : map.entrySet())
@@ -273,10 +286,10 @@ public abstract class Maps
                 buffers.add(entry.getKey());
                 buffers.add(entry.getValue());
             }
-            return CollectionSerializer.pack(buffers, map.size(), 
protocolVersion);
+            return CollectionSerializer.pack(buffers, map.size());
         }
 
-        public boolean equals(MapType mt, Value v)
+        public boolean equals(MapType<?, ?> mt, Value v)
         {
             if (map.size() != v.map.size())
                 return false;
@@ -364,7 +377,7 @@ public abstract class Maps
                 return null;
             if (value == ByteBufferUtil.UNSET_BYTE_BUFFER)
                 return UNSET_VALUE;
-            return Value.fromSerialized(value, (MapType)receiver.type, 
options.getProtocolVersion());
+            return Value.fromSerialized(value, (MapType<?, ?>) receiver.type);
         }
     }
 
diff --git a/src/java/org/apache/cassandra/cql3/Sets.java 
b/src/java/org/apache/cassandra/cql3/Sets.java
index ee27e2aa6f..00d6870a92 100644
--- a/src/java/org/apache/cassandra/cql3/Sets.java
+++ b/src/java/org/apache/cassandra/cql3/Sets.java
@@ -20,17 +20,31 @@ package org.apache.cassandra.cql3;
 import static org.apache.cassandra.cql3.Constants.UNSET_VALUE;
 
 import java.nio.ByteBuffer;
-import java.util.*;
+import java.util.Collections;
+import java.util.Comparator;
+import java.util.HashSet;
+import java.util.Iterator;
+import java.util.List;
+import java.util.Objects;
+import java.util.Optional;
+import java.util.Set;
+import java.util.SortedSet;
+import java.util.TreeSet;
 import java.util.stream.Collectors;
 import java.util.stream.StreamSupport;
 
-import org.apache.cassandra.db.guardrails.Guardrails;
-import org.apache.cassandra.schema.ColumnMetadata;
 import org.apache.cassandra.cql3.functions.Function;
 import org.apache.cassandra.db.DecoratedKey;
-import org.apache.cassandra.db.rows.*;
-import org.apache.cassandra.db.marshal.*;
+import org.apache.cassandra.db.guardrails.Guardrails;
+import org.apache.cassandra.db.marshal.AbstractType;
+import org.apache.cassandra.db.marshal.ByteBufferAccessor;
+import org.apache.cassandra.db.marshal.MapType;
+import org.apache.cassandra.db.marshal.ReversedType;
+import org.apache.cassandra.db.marshal.SetType;
+import org.apache.cassandra.db.rows.Cell;
+import org.apache.cassandra.db.rows.CellPath;
 import org.apache.cassandra.exceptions.InvalidRequestException;
+import org.apache.cassandra.schema.ColumnMetadata;
 import org.apache.cassandra.serializers.CollectionSerializer;
 import org.apache.cassandra.serializers.MarshalException;
 import org.apache.cassandra.transport.ProtocolVersion;
@@ -55,7 +69,7 @@ public abstract class Sets
 
     private static AbstractType<?> elementsType(AbstractType<?> type)
     {
-        return ((SetType) unwrap(type)).getElementsType();
+        return ((SetType<?>) unwrap(type)).getElementsType();
     }
 
     /**
@@ -106,7 +120,7 @@ public abstract class Sets
     public static <T> String setToString(Iterable<T> items, 
java.util.function.Function<T, String> mapper)
     {
         return StreamSupport.stream(items.spliterator(), false)
-                            .map(e -> mapper.apply(e))
+                            .map(mapper)
                             .collect(Collectors.joining(", ", "{", "}"));
     }
 
@@ -223,15 +237,15 @@ public abstract class Sets
             this.elements = elements;
         }
 
-        public static Value fromSerialized(ByteBuffer value, SetType type, 
ProtocolVersion version) throws InvalidRequestException
+        public static <T> Value fromSerialized(ByteBuffer value, SetType<T> 
type) throws InvalidRequestException
         {
             try
             {
                 // Collections have this small hack that validate cannot be 
called on a serialized object,
                 // but compose does the validation (so we're fine).
-                Set<?> s = 
type.getSerializer().deserializeForNativeProtocol(value, 
ByteBufferAccessor.instance, version);
+                Set<T> s = type.getSerializer().deserialize(value, 
ByteBufferAccessor.instance);
                 SortedSet<ByteBuffer> elements = new 
TreeSet<>(type.getElementsType());
-                for (Object element : s)
+                for (T element : s)
                     
elements.add(type.getElementsType().decomposeUntyped(element));
                 return new Value(elements);
             }
@@ -241,19 +255,19 @@ public abstract class Sets
             }
         }
 
-        public ByteBuffer get(ProtocolVersion protocolVersion)
+        public ByteBuffer get(ProtocolVersion version)
         {
-            return CollectionSerializer.pack(elements, elements.size(), 
protocolVersion);
+            return CollectionSerializer.pack(elements, elements.size());
         }
 
-        public boolean equals(SetType st, Value v)
+        public boolean equals(SetType<?> st, Value v)
         {
             if (elements.size() != v.elements.size())
                 return false;
 
             Iterator<ByteBuffer> thisIter = elements.iterator();
             Iterator<ByteBuffer> thatIter = v.elements.iterator();
-            AbstractType elementsType = st.getElementsType();
+            AbstractType<?> elementsType = st.getElementsType();
             while (thisIter.hasNext())
                 if (elementsType.compare(thisIter.next(), thatIter.next()) != 
0)
                     return false;
@@ -322,7 +336,7 @@ public abstract class Sets
                 return null;
             if (value == ByteBufferUtil.UNSET_BYTE_BUFFER)
                 return UNSET_VALUE;
-            return Value.fromSerialized(value, (SetType)receiver.type, 
options.getProtocolVersion());
+            return Value.fromSerialized(value, (SetType<?>) receiver.type);
         }
     }
 
diff --git a/src/java/org/apache/cassandra/cql3/Term.java 
b/src/java/org/apache/cassandra/cql3/Term.java
index aaea1132c2..c94b6141af 100644
--- a/src/java/org/apache/cassandra/cql3/Term.java
+++ b/src/java/org/apache/cassandra/cql3/Term.java
@@ -187,9 +187,8 @@ public interface Term
 
         /**
          * @return the serialized value of this terminal.
-         * @param protocolVersion
          */
-        public abstract ByteBuffer get(ProtocolVersion protocolVersion) throws 
InvalidRequestException;
+        public abstract ByteBuffer get(ProtocolVersion version) throws 
InvalidRequestException;
 
         public ByteBuffer bindAndGet(QueryOptions options) throws 
InvalidRequestException
         {
diff --git a/src/java/org/apache/cassandra/cql3/Terms.java 
b/src/java/org/apache/cassandra/cql3/Terms.java
index 33ce2e9616..31b3fd0bd4 100644
--- a/src/java/org/apache/cassandra/cql3/Terms.java
+++ b/src/java/org/apache/cassandra/cql3/Terms.java
@@ -145,11 +145,11 @@ public interface Terms
                     switch (((CollectionType<?>) type).kind)
                     {
                         case LIST:
-                            return e -> Lists.Value.fromSerialized(e, 
(ListType<?>) type, version);
+                            return e -> Lists.Value.fromSerialized(e, 
(ListType<?>) type);
                         case SET:
-                            return e -> Sets.Value.fromSerialized(e, 
(SetType<?>) type, version);
+                            return e -> Sets.Value.fromSerialized(e, 
(SetType<?>) type);
                         case MAP:
-                            return e -> Maps.Value.fromSerialized(e, 
(MapType<?, ?>) type, version);
+                            return e -> Maps.Value.fromSerialized(e, 
(MapType<?, ?>) type);
                     }
                     throw new AssertionError();
                 }
diff --git a/src/java/org/apache/cassandra/cql3/Tuples.java 
b/src/java/org/apache/cassandra/cql3/Tuples.java
index 34f630e48d..60f963ce4f 100644
--- a/src/java/org/apache/cassandra/cql3/Tuples.java
+++ b/src/java/org/apache/cassandra/cql3/Tuples.java
@@ -24,11 +24,12 @@ import java.util.List;
 import java.util.stream.Collectors;
 import java.util.stream.StreamSupport;
 
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
 import org.apache.cassandra.cql3.functions.Function;
-import org.apache.cassandra.db.marshal.*;
+import org.apache.cassandra.db.marshal.AbstractType;
+import org.apache.cassandra.db.marshal.ByteBufferAccessor;
+import org.apache.cassandra.db.marshal.ListType;
+import org.apache.cassandra.db.marshal.ReversedType;
+import org.apache.cassandra.db.marshal.TupleType;
 import org.apache.cassandra.exceptions.InvalidRequestException;
 import org.apache.cassandra.serializers.MarshalException;
 import org.apache.cassandra.transport.ProtocolVersion;
@@ -41,8 +42,6 @@ import static 
org.apache.cassandra.cql3.statements.RequestValidations.invalidReq
  */
 public class Tuples
 {
-    private static final Logger logger = LoggerFactory.getLogger(Tuples.class);
-
     private Tuples() {}
 
     public static ColumnSpecification componentSpecOf(ColumnSpecification 
column, int component)
@@ -164,7 +163,7 @@ public class Tuples
             return new Value(type.split(ByteBufferAccessor.instance, bytes));
         }
 
-        public ByteBuffer get(ProtocolVersion protocolVersion)
+        public ByteBuffer get(ProtocolVersion version)
         {
             return TupleType.buildValue(elements);
         }
@@ -258,22 +257,21 @@ public class Tuples
             this.elements = items;
         }
 
-        public static InValue fromSerialized(ByteBuffer value, ListType type, 
QueryOptions options) throws InvalidRequestException
+        public static <T> InValue fromSerialized(ByteBuffer value, ListType<T> 
type) throws InvalidRequestException
         {
             try
             {
                 // Collections have this small hack that validate cannot be 
called on a serialized object,
                 // but the deserialization does the validation (so we're fine).
-                List<?> l = 
type.getSerializer().deserializeForNativeProtocol(value, 
ByteBufferAccessor.instance, options.getProtocolVersion());
+                List<T> l = type.getSerializer().deserialize(value, 
ByteBufferAccessor.instance);
 
                 assert type.getElementsType() instanceof TupleType;
                 TupleType tupleType = 
Tuples.getTupleType(type.getElementsType());
 
                 // type.split(bytes)
                 List<List<ByteBuffer>> elements = new ArrayList<>(l.size());
-                for (Object element : l)
-                    
elements.add(Arrays.asList(tupleType.split(ByteBufferAccessor.instance,
-                                                               
type.getElementsType().decompose(element))));
+                for (T element : l)
+                    
elements.add(Arrays.asList(tupleType.split(ByteBufferAccessor.instance, 
type.getElementsType().decompose(element))));
                 return new InValue(elements);
             }
             catch (MarshalException e)
@@ -282,7 +280,7 @@ public class Tuples
             }
         }
 
-        public ByteBuffer get(ProtocolVersion protocolVersion)
+        public ByteBuffer get(ProtocolVersion version)
         {
             throw new UnsupportedOperationException();
         }
@@ -417,7 +415,7 @@ public class Tuples
             ByteBuffer value = options.getValues().get(bindIndex);
             if (value == ByteBufferUtil.UNSET_BYTE_BUFFER)
                 throw new InvalidRequestException(String.format("Invalid unset 
value for %s", receiver.name));
-            return value == null ? null : InValue.fromSerialized(value, 
(ListType)receiver.type, options);
+            return value == null ? null : InValue.fromSerialized(value, 
(ListType<?>) receiver.type);
         }
     }
 
@@ -443,7 +441,7 @@ public class Tuples
     public static <T> String tupleToString(Iterable<T> items, 
java.util.function.Function<T, String> mapper)
     {
         return StreamSupport.stream(items.spliterator(), false)
-                            .map(e -> mapper.apply(e))
+                            .map(mapper)
                             .collect(Collectors.joining(", ", "(", ")"));
     }
 
@@ -521,12 +519,12 @@ public class Tuples
     public static boolean checkIfTupleType(AbstractType<?> tuple)
     {
         return (tuple instanceof TupleType) ||
-               (tuple instanceof ReversedType && ((ReversedType) 
tuple).baseType instanceof TupleType);
+               (tuple instanceof ReversedType && ((ReversedType<?>) 
tuple).baseType instanceof TupleType);
 
     }
 
     public static TupleType getTupleType(AbstractType<?> tuple)
     {
-        return (tuple instanceof ReversedType ? ((TupleType) ((ReversedType) 
tuple).baseType) : (TupleType)tuple);
+        return (tuple instanceof ReversedType ? ((TupleType) 
((ReversedType<?>) tuple).baseType) : (TupleType)tuple);
     }
 }
diff --git a/src/java/org/apache/cassandra/cql3/UntypedResultSet.java 
b/src/java/org/apache/cassandra/cql3/UntypedResultSet.java
index f00c137d8c..db22396c65 100644
--- a/src/java/org/apache/cassandra/cql3/UntypedResultSet.java
+++ b/src/java/org/apache/cassandra/cql3/UntypedResultSet.java
@@ -20,22 +20,32 @@ package org.apache.cassandra.cql3;
 
 import java.net.InetAddress;
 import java.nio.ByteBuffer;
-import java.util.*;
+import java.util.ArrayList;
+import java.util.Date;
+import java.util.HashMap;
+import java.util.Iterator;
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
+import java.util.UUID;
 
 import com.google.common.annotations.VisibleForTesting;
 
 import com.datastax.driver.core.CodecUtils;
 import org.apache.cassandra.cql3.functions.types.LocalDate;
-import org.apache.cassandra.schema.ColumnMetadata;
 import org.apache.cassandra.cql3.statements.SelectStatement;
-import org.apache.cassandra.db.*;
+import org.apache.cassandra.db.Clustering;
+import org.apache.cassandra.db.ConsistencyLevel;
+import org.apache.cassandra.db.DecoratedKey;
+import org.apache.cassandra.db.ReadExecutionController;
 import org.apache.cassandra.db.marshal.*;
 import org.apache.cassandra.db.partitions.PartitionIterator;
-import org.apache.cassandra.db.rows.*;
+import org.apache.cassandra.db.rows.Cell;
+import org.apache.cassandra.db.rows.ComplexColumnData;
+import org.apache.cassandra.schema.ColumnMetadata;
 import org.apache.cassandra.schema.TableMetadata;
 import org.apache.cassandra.service.ClientState;
 import org.apache.cassandra.service.pager.QueryPager;
-import org.apache.cassandra.transport.ProtocolVersion;
 import org.apache.cassandra.utils.AbstractIterator;
 import org.apache.cassandra.utils.FBUtilities;
 import org.apache.cassandra.utils.TimeUUID;
@@ -110,7 +120,7 @@ public abstract class UntypedResultSet implements 
Iterable<UntypedResultSet.Row>
         {
             return new AbstractIterator<Row>()
             {
-                Iterator<List<ByteBuffer>> iter = cqlRows.rows.iterator();
+                final Iterator<List<ByteBuffer>> iter = 
cqlRows.rows.iterator();
 
                 protected Row computeNext()
                 {
@@ -152,7 +162,7 @@ public abstract class UntypedResultSet implements 
Iterable<UntypedResultSet.Row>
         {
             return new AbstractIterator<Row>()
             {
-                Iterator<Map<String, ByteBuffer>> iter = cqlRows.iterator();
+                final Iterator<Map<String, ByteBuffer>> iter = 
cqlRows.iterator();
 
                 protected Row computeNext()
                 {
@@ -331,7 +341,7 @@ public abstract class UntypedResultSet implements 
Iterable<UntypedResultSet.Row>
                 {
                     ComplexColumnData complexData = 
row.getComplexColumnData(def);
                     if (complexData != null)
-                        data.put(def.name.toString(), 
((CollectionType)def.type).serializeForNativeProtocol(complexData.iterator(), 
ProtocolVersion.V3));
+                        data.put(def.name.toString(), ((CollectionType<?>) 
def.type).serializeForNativeProtocol(complexData.iterator()));
                 }
             }
 
@@ -451,11 +461,6 @@ public abstract class UntypedResultSet implements 
Iterable<UntypedResultSet.Row>
             return raw == null ? null : MapType.getInstance(keyType, 
valueType, true).compose(raw);
         }
 
-        public Map<String, String> getTextMap(String column)
-        {
-            return getMap(column, UTF8Type.instance, UTF8Type.instance);
-        }
-
         public <T> Set<T> getFrozenSet(String column, AbstractType<T> type)
         {
             ByteBuffer raw = data.get(column);
diff --git a/src/java/org/apache/cassandra/cql3/UserTypes.java 
b/src/java/org/apache/cassandra/cql3/UserTypes.java
index a63420fca3..76276a70de 100644
--- a/src/java/org/apache/cassandra/cql3/UserTypes.java
+++ b/src/java/org/apache/cassandra/cql3/UserTypes.java
@@ -220,7 +220,7 @@ public abstract class UserTypes
             return new Value(type, type.split(ByteBufferAccessor.instance, 
bytes));
         }
 
-        public ByteBuffer get(ProtocolVersion protocolVersion)
+        public ByteBuffer get(ProtocolVersion version)
         {
             return TupleType.buildValue(elements);
         }
diff --git a/src/java/org/apache/cassandra/cql3/functions/CollectionFcts.java 
b/src/java/org/apache/cassandra/cql3/functions/CollectionFcts.java
index 54a95647ce..ebc96b78d3 100644
--- a/src/java/org/apache/cassandra/cql3/functions/CollectionFcts.java
+++ b/src/java/org/apache/cassandra/cql3/functions/CollectionFcts.java
@@ -363,7 +363,7 @@ public class CollectionFcts
                 return null;
 
             AggregateFunction.Aggregate aggregate = 
aggregateFunction.newAggregate();
-            inputType.forEach(value, version, element -> 
aggregate.addInput(version, Collections.singletonList(element)));
+            inputType.forEach(value, element -> aggregate.addInput(version, 
Collections.singletonList(element)));
             return aggregate.compute(version);
         }
     }
diff --git a/src/java/org/apache/cassandra/cql3/functions/FunctionCall.java 
b/src/java/org/apache/cassandra/cql3/functions/FunctionCall.java
index 5dc5fc8c57..33696573b2 100644
--- a/src/java/org/apache/cassandra/cql3/functions/FunctionCall.java
+++ b/src/java/org/apache/cassandra/cql3/functions/FunctionCall.java
@@ -107,14 +107,14 @@ public class FunctionCall extends Term.NonTerminal
             return null;
         if (fun.returnType().isCollection())
         {
-            switch (((CollectionType) fun.returnType()).kind)
+            switch (((CollectionType<?>) fun.returnType()).kind)
             {
                 case LIST:
-                    return Lists.Value.fromSerialized(result, (ListType) 
fun.returnType(), version);
+                    return Lists.Value.fromSerialized(result, (ListType<?>) 
fun.returnType());
                 case SET:
-                    return Sets.Value.fromSerialized(result, (SetType) 
fun.returnType(), version);
+                    return Sets.Value.fromSerialized(result, (SetType<?>) 
fun.returnType());
                 case MAP:
-                    return Maps.Value.fromSerialized(result, (MapType) 
fun.returnType(), version);
+                    return Maps.Value.fromSerialized(result, (MapType<?, ?>) 
fun.returnType());
             }
         }
         else if (fun.returnType().isUDT())
diff --git 
a/src/java/org/apache/cassandra/cql3/restrictions/MultiColumnRestriction.java 
b/src/java/org/apache/cassandra/cql3/restrictions/MultiColumnRestriction.java
index acbb48e08b..22c000e3b5 100644
--- 
a/src/java/org/apache/cassandra/cql3/restrictions/MultiColumnRestriction.java
+++ 
b/src/java/org/apache/cassandra/cql3/restrictions/MultiColumnRestriction.java
@@ -18,12 +18,22 @@
 package org.apache.cassandra.cql3.restrictions;
 
 import java.nio.ByteBuffer;
-import java.util.*;
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.EnumMap;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Set;
 
-import org.apache.cassandra.schema.ColumnMetadata;
-import org.apache.cassandra.serializers.ListSerializer;
-import org.apache.cassandra.transport.ProtocolVersion;
-import org.apache.cassandra.cql3.*;
+import org.apache.commons.lang3.builder.ToStringBuilder;
+import org.apache.commons.lang3.builder.ToStringStyle;
+
+import org.apache.cassandra.cql3.AbstractMarker;
+import org.apache.cassandra.cql3.Operator;
+import org.apache.cassandra.cql3.QueryOptions;
+import org.apache.cassandra.cql3.Term;
+import org.apache.cassandra.cql3.Terms;
+import org.apache.cassandra.cql3.Tuples;
 import org.apache.cassandra.cql3.Term.Terminal;
 import org.apache.cassandra.cql3.functions.Function;
 import org.apache.cassandra.cql3.statements.Bound;
@@ -31,8 +41,8 @@ import org.apache.cassandra.db.MultiCBuilder;
 import org.apache.cassandra.db.filter.RowFilter;
 import org.apache.cassandra.index.Index;
 import org.apache.cassandra.index.IndexRegistry;
-import org.apache.commons.lang3.builder.ToStringBuilder;
-import org.apache.commons.lang3.builder.ToStringStyle;
+import org.apache.cassandra.schema.ColumnMetadata;
+import org.apache.cassandra.serializers.ListSerializer;
 
 import static 
org.apache.cassandra.cql3.statements.RequestValidations.checkFalse;
 import static 
org.apache.cassandra.cql3.statements.RequestValidations.checkNotNull;
@@ -258,7 +268,7 @@ public abstract class MultiColumnRestriction implements 
SingleRestriction
                 for (List<ByteBuffer> splitValue : splitValues)
                     values.add(splitValue.get(0));
 
-                ByteBuffer buffer = ListSerializer.pack(values, values.size(), 
ProtocolVersion.V3);
+                ByteBuffer buffer = ListSerializer.pack(values, values.size());
                 filter.add(getFirstColumn(), Operator.IN, buffer);
             }
             else
@@ -376,7 +386,7 @@ public abstract class MultiColumnRestriction implements 
SingleRestriction
         {
             boolean reversed = getFirstColumn().isReversedType();
 
-            EnumMap<Bound, List<ByteBuffer>> componentBounds = new 
EnumMap<Bound, List<ByteBuffer>>(Bound.class);
+            EnumMap<Bound, List<ByteBuffer>> componentBounds = new 
EnumMap<>(Bound.class);
             componentBounds.put(Bound.START, componentBounds(Bound.START, 
options));
             componentBounds.put(Bound.END, componentBounds(Bound.END, 
options));
 
diff --git 
a/src/java/org/apache/cassandra/cql3/restrictions/SingleColumnRestriction.java 
b/src/java/org/apache/cassandra/cql3/restrictions/SingleColumnRestriction.java
index e5b2465ace..5ce78ae577 100644
--- 
a/src/java/org/apache/cassandra/cql3/restrictions/SingleColumnRestriction.java
+++ 
b/src/java/org/apache/cassandra/cql3/restrictions/SingleColumnRestriction.java
@@ -24,7 +24,6 @@ import java.util.List;
 
 import org.apache.cassandra.schema.ColumnMetadata;
 import org.apache.cassandra.serializers.ListSerializer;
-import org.apache.cassandra.transport.ProtocolVersion;
 import org.apache.cassandra.cql3.*;
 import org.apache.cassandra.cql3.Term.Terminal;
 import org.apache.cassandra.cql3.functions.Function;
@@ -221,7 +220,7 @@ public abstract class SingleColumnRestriction implements 
SingleRestriction
                                    QueryOptions options)
         {
             List<ByteBuffer> values = getValues(options);
-            ByteBuffer buffer = ListSerializer.pack(values, values.size(), 
ProtocolVersion.V3);
+            ByteBuffer buffer = ListSerializer.pack(values, values.size());
             filter.add(columnDef, Operator.IN, buffer);
         }
 
diff --git a/src/java/org/apache/cassandra/cql3/selection/ColumnTimestamps.java 
b/src/java/org/apache/cassandra/cql3/selection/ColumnTimestamps.java
index 6a08076d5b..2cac9251b8 100644
--- a/src/java/org/apache/cassandra/cql3/selection/ColumnTimestamps.java
+++ b/src/java/org/apache/cassandra/cql3/selection/ColumnTimestamps.java
@@ -382,7 +382,7 @@ abstract class ColumnTimestamps
             List<ByteBuffer> buffers = new ArrayList<>(timestamps.size());
             timestamps.forEach(timestamp -> 
buffers.add(type.toByteBuffer(timestamp)));
 
-            return CollectionSerializer.pack(buffers, timestamps.size(), 
protocolVersion);
+            return CollectionSerializer.pack(buffers, timestamps.size());
         }
 
         @Override
diff --git a/src/java/org/apache/cassandra/cql3/selection/ListSelector.java 
b/src/java/org/apache/cassandra/cql3/selection/ListSelector.java
index 4f586712de..5d095d51c5 100644
--- a/src/java/org/apache/cassandra/cql3/selection/ListSelector.java
+++ b/src/java/org/apache/cassandra/cql3/selection/ListSelector.java
@@ -102,7 +102,7 @@ final class ListSelector extends Selector
         {
             buffers.add(elements.get(i).getOutput(protocolVersion));
         }
-        return CollectionSerializer.pack(buffers, buffers.size(), 
protocolVersion);
+        return CollectionSerializer.pack(buffers, buffers.size());
     }
 
     public void reset()
diff --git a/src/java/org/apache/cassandra/cql3/selection/MapSelector.java 
b/src/java/org/apache/cassandra/cql3/selection/MapSelector.java
index 4f870a2887..26580e7263 100644
--- a/src/java/org/apache/cassandra/cql3/selection/MapSelector.java
+++ b/src/java/org/apache/cassandra/cql3/selection/MapSelector.java
@@ -218,7 +218,7 @@ final class MapSelector extends Selector
             buffers.add(entry.getKey());
             buffers.add(entry.getValue());
         }
-        return CollectionSerializer.pack(buffers, elements.size(), 
protocolVersion);
+        return CollectionSerializer.pack(buffers, elements.size());
     }
 
     public void reset()
diff --git a/src/java/org/apache/cassandra/cql3/selection/Selector.java 
b/src/java/org/apache/cassandra/cql3/selection/Selector.java
index 2d52e569cf..121ecad4be 100644
--- a/src/java/org/apache/cassandra/cql3/selection/Selector.java
+++ b/src/java/org/apache/cassandra/cql3/selection/Selector.java
@@ -392,7 +392,7 @@ public abstract class Selector
             AbstractType<?> type = columns.get(index).type;
             if (type.isCollection())
             {
-                values[index] = ((CollectionType<?>) 
type).serializeForNativeProtocol(ccd.iterator(), protocolVersion);
+                values[index] = ((CollectionType<?>) 
type).serializeForNativeProtocol(ccd.iterator());
 
                 for (Cell<?> cell : ccd)
                 {
diff --git a/src/java/org/apache/cassandra/cql3/selection/SetSelector.java 
b/src/java/org/apache/cassandra/cql3/selection/SetSelector.java
index 496b0264d3..c6aa225314 100644
--- a/src/java/org/apache/cassandra/cql3/selection/SetSelector.java
+++ b/src/java/org/apache/cassandra/cql3/selection/SetSelector.java
@@ -104,7 +104,7 @@ final class SetSelector extends Selector
         {
             buffers.add(elements.get(i).getOutput(protocolVersion));
         }
-        return CollectionSerializer.pack(buffers, buffers.size(), 
protocolVersion);
+        return CollectionSerializer.pack(buffers, buffers.size());
     }
 
     public void reset()
diff --git a/src/java/org/apache/cassandra/db/marshal/CollectionType.java 
b/src/java/org/apache/cassandra/db/marshal/CollectionType.java
index a8cc9d9231..e4346d5fab 100644
--- a/src/java/org/apache/cassandra/db/marshal/CollectionType.java
+++ b/src/java/org/apache/cassandra/db/marshal/CollectionType.java
@@ -152,12 +152,12 @@ public abstract class CollectionType<T> extends 
AbstractType<T>
         return values.size();
     }
 
-    public ByteBuffer serializeForNativeProtocol(Iterator<Cell<?>> cells, 
ProtocolVersion version)
+    public ByteBuffer serializeForNativeProtocol(Iterator<Cell<?>> cells)
     {
         assert isMultiCell();
         List<ByteBuffer> values = serializedValues(cells);
         int size = collectionSize(values);
-        return CollectionSerializer.pack(values, ByteBufferAccessor.instance, 
size, version);
+        return CollectionSerializer.pack(values, ByteBufferAccessor.instance, 
size);
     }
 
     @Override
@@ -169,7 +169,7 @@ public abstract class CollectionType<T> extends 
AbstractType<T>
         if (!getClass().equals(previous.getClass()))
             return false;
 
-        CollectionType tprev = (CollectionType) previous;
+        CollectionType<?> tprev = (CollectionType<?>) previous;
         if (this.isMultiCell() != tprev.isMultiCell())
             return false;
 
@@ -197,7 +197,7 @@ public abstract class CollectionType<T> extends 
AbstractType<T>
         if (!getClass().equals(previous.getClass()))
             return false;
 
-        CollectionType tprev = (CollectionType) previous;
+        CollectionType<?> tprev = (CollectionType<?>) previous;
         if (this.isMultiCell() != tprev.isMultiCell())
             return false;
 
@@ -234,7 +234,7 @@ public abstract class CollectionType<T> extends 
AbstractType<T>
         if (!(o instanceof CollectionType))
             return false;
 
-        CollectionType other = (CollectionType)o;
+        CollectionType<?> other = (CollectionType<?>) o;
 
         if (kind != other.kind)
             return false;
@@ -257,17 +257,17 @@ public abstract class CollectionType<T> extends 
AbstractType<T>
         if (accessorL.isEmpty(left) || accessorR.isEmpty(right))
             return Boolean.compare(accessorR.isEmpty(right), 
accessorL.isEmpty(left));
 
-        int sizeL = CollectionSerializer.readCollectionSize(left, accessorL, 
ProtocolVersion.V3);
-        int offsetL = CollectionSerializer.sizeOfCollectionSize(sizeL, 
ProtocolVersion.V3);
-        int sizeR = CollectionSerializer.readCollectionSize(right, accessorR, 
ProtocolVersion.V3);
+        int sizeL = CollectionSerializer.readCollectionSize(left, accessorL);
+        int offsetL = CollectionSerializer.sizeOfCollectionSize();
+        int sizeR = CollectionSerializer.readCollectionSize(right, accessorR);
         int offsetR = TypeSizes.INT_SIZE;
 
         for (int i = 0; i < Math.min(sizeL, sizeR); i++)
         {
-            VL v1 = CollectionSerializer.readValue(left, accessorL, offsetL, 
ProtocolVersion.V3);
-            offsetL += CollectionSerializer.sizeOfValue(v1, accessorL, 
ProtocolVersion.V3);
-            VR v2 = CollectionSerializer.readValue(right, accessorR, offsetR, 
ProtocolVersion.V3);
-            offsetR += CollectionSerializer.sizeOfValue(v2, accessorR, 
ProtocolVersion.V3);
+            VL v1 = CollectionSerializer.readValue(left, accessorL, offsetL);
+            offsetL += CollectionSerializer.sizeOfValue(v1, accessorL);
+            VR v2 = CollectionSerializer.readValue(right, accessorR, offsetR);
+            offsetR += CollectionSerializer.sizeOfValue(v2, accessorR);
             int cmp = elementsComparator.compare(v1, accessorL, v2, accessorR);
             if (cmp != 0)
                 return cmp;
@@ -285,13 +285,13 @@ public abstract class CollectionType<T> extends 
AbstractType<T>
             return null;
 
         int offset = 0;
-        int size = CollectionSerializer.readCollectionSize(data, accessor, 
ProtocolVersion.V3);
-        offset += CollectionSerializer.sizeOfCollectionSize(size, 
ProtocolVersion.V3);
+        int size = CollectionSerializer.readCollectionSize(data, accessor);
+        offset += CollectionSerializer.sizeOfCollectionSize();
         ByteSource[] srcs = new ByteSource[size];
         for (int i = 0; i < size; ++i)
         {
-            V v = CollectionSerializer.readValue(data, accessor, offset, 
ProtocolVersion.V3);
-            offset += CollectionSerializer.sizeOfValue(v, accessor, 
ProtocolVersion.V3);
+            V v = CollectionSerializer.readValue(data, accessor, offset);
+            offset += CollectionSerializer.sizeOfValue(v, accessor);
             srcs[i] = elementsComparator.asComparableBytes(accessor, v, 
version);
         }
         return ByteSource.withTerminatorMaybeLegacy(version, 0x00, srcs);
@@ -316,21 +316,21 @@ public abstract class CollectionType<T> extends 
AbstractType<T>
                 buffers.add(null);
             separator = comparableBytes.next();
         }
-        return CollectionSerializer.pack(buffers, accessor, buffers.size(), 
ProtocolVersion.V3);
+        return CollectionSerializer.pack(buffers, accessor, buffers.size());
     }
 
-    public static String setOrListToJsonString(ByteBuffer buffer, AbstractType 
elementsType, ProtocolVersion protocolVersion)
+    public static String setOrListToJsonString(ByteBuffer buffer, 
AbstractType<?> elementsType, ProtocolVersion protocolVersion)
     {
         ByteBuffer value = buffer.duplicate();
         StringBuilder sb = new StringBuilder("[");
-        int size = CollectionSerializer.readCollectionSize(value, 
ByteBufferAccessor.instance, protocolVersion);
-        int offset = CollectionSerializer.sizeOfCollectionSize(size, 
protocolVersion);
+        int size = CollectionSerializer.readCollectionSize(value, 
ByteBufferAccessor.instance);
+        int offset = CollectionSerializer.sizeOfCollectionSize();
         for (int i = 0; i < size; i++)
         {
             if (i > 0)
                 sb.append(", ");
-            ByteBuffer element = CollectionSerializer.readValue(value, 
ByteBufferAccessor.instance, offset, protocolVersion);
-            offset += CollectionSerializer.sizeOfValue(element, 
ByteBufferAccessor.instance, protocolVersion);
+            ByteBuffer element = CollectionSerializer.readValue(value, 
ByteBufferAccessor.instance, offset);
+            offset += CollectionSerializer.sizeOfValue(element, 
ByteBufferAccessor.instance);
             sb.append(elementsType.toJSONString(element, protocolVersion));
         }
         return sb.append("]").toString();
@@ -361,8 +361,8 @@ public abstract class CollectionType<T> extends 
AbstractType<T>
 
     public int size(ByteBuffer buffer)
     {
-        return CollectionSerializer.readCollectionSize(buffer.duplicate(), 
ByteBufferAccessor.instance, ProtocolVersion.V3);
+        return CollectionSerializer.readCollectionSize(buffer.duplicate(), 
ByteBufferAccessor.instance);
     }
 
-    public abstract void forEach(ByteBuffer input, ProtocolVersion version, 
Consumer<ByteBuffer> action);
+    public abstract void forEach(ByteBuffer input, Consumer<ByteBuffer> 
action);
 }
diff --git a/src/java/org/apache/cassandra/db/marshal/ListType.java 
b/src/java/org/apache/cassandra/db/marshal/ListType.java
index 73290d5b6c..56b3ca8454 100644
--- a/src/java/org/apache/cassandra/db/marshal/ListType.java
+++ b/src/java/org/apache/cassandra/db/marshal/ListType.java
@@ -160,14 +160,14 @@ public class ListType<T> extends CollectionType<List<T>>
     public boolean isCompatibleWithFrozen(CollectionType<?> previous)
     {
         assert !isMultiCell;
-        return this.elements.isCompatibleWith(((ListType) previous).elements);
+        return this.elements.isCompatibleWith(((ListType<?>) 
previous).elements);
     }
 
     @Override
     public boolean isValueCompatibleWithFrozen(CollectionType<?> previous)
     {
         assert !isMultiCell;
-        return this.elements.isValueCompatibleWithInternal(((ListType) 
previous).elements);
+        return this.elements.isValueCompatibleWithInternal(((ListType<?>) 
previous).elements);
     }
 
     public <VL, VR> int compareCustom(VL left, ValueAccessor<VL> accessorL, VR 
right, ValueAccessor<VR> accessorR)
@@ -221,7 +221,7 @@ public class ListType<T> extends CollectionType<List<T>>
             throw new MarshalException(String.format(
                     "Expected a list, but got a %s: %s", 
parsed.getClass().getSimpleName(), parsed));
 
-        List list = (List) parsed;
+        List<?> list = (List<?>) parsed;
         List<Term> terms = new ArrayList<>(list.size());
         for (Object element : list)
         {
@@ -246,8 +246,8 @@ public class ListType<T> extends CollectionType<List<T>>
     }
 
     @Override
-    public void forEach(ByteBuffer input, ProtocolVersion version, 
Consumer<ByteBuffer> action)
+    public void forEach(ByteBuffer input, Consumer<ByteBuffer> action)
     {
-        serializer.forEach(input, version, action);
+        serializer.forEach(input, action);
     }
 }
diff --git a/src/java/org/apache/cassandra/db/marshal/MapType.java 
b/src/java/org/apache/cassandra/db/marshal/MapType.java
index d9bfc8040c..835e9597cd 100644
--- a/src/java/org/apache/cassandra/db/marshal/MapType.java
+++ b/src/java/org/apache/cassandra/db/marshal/MapType.java
@@ -18,7 +18,12 @@
 package org.apache.cassandra.db.marshal;
 
 import java.nio.ByteBuffer;
-import java.util.*;
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.HashMap;
+import java.util.Iterator;
+import java.util.List;
+import java.util.Map;
 import java.util.concurrent.ConcurrentHashMap;
 import java.util.function.Consumer;
 
@@ -171,7 +176,7 @@ public class MapType<K, V> extends CollectionType<Map<K, V>>
     public boolean isCompatibleWithFrozen(CollectionType<?> previous)
     {
         assert !isMultiCell;
-        MapType tprev = (MapType) previous;
+        MapType<?, ?> tprev = (MapType<?, ?>) previous;
         return keys.isCompatibleWith(tprev.keys) && 
values.isCompatibleWith(tprev.values);
     }
 
@@ -179,7 +184,7 @@ public class MapType<K, V> extends CollectionType<Map<K, V>>
     public boolean isValueCompatibleWithFrozen(CollectionType<?> previous)
     {
         assert !isMultiCell;
-        MapType tprev = (MapType) previous;
+        MapType<?, ?> tprev = (MapType<?, ?>) previous;
         return keys.isCompatibleWith(tprev.keys) && 
values.isValueCompatibleWith(tprev.values);
     }
 
@@ -194,27 +199,26 @@ public class MapType<K, V> extends CollectionType<Map<K, 
V>>
             return Boolean.compare(accessorR.isEmpty(right), 
accessorL.isEmpty(left));
 
 
-        ProtocolVersion protocolVersion = ProtocolVersion.V3;
-        int sizeL = CollectionSerializer.readCollectionSize(left, accessorL, 
protocolVersion);
-        int sizeR = CollectionSerializer.readCollectionSize(right, accessorR, 
protocolVersion);
+        int sizeL = CollectionSerializer.readCollectionSize(left, accessorL);
+        int sizeR = CollectionSerializer.readCollectionSize(right, accessorR);
 
-        int offsetL = CollectionSerializer.sizeOfCollectionSize(sizeL, 
protocolVersion);
-        int offsetR = CollectionSerializer.sizeOfCollectionSize(sizeR, 
protocolVersion);
+        int offsetL = CollectionSerializer.sizeOfCollectionSize();
+        int offsetR = CollectionSerializer.sizeOfCollectionSize();
 
         for (int i = 0; i < Math.min(sizeL, sizeR); i++)
         {
-            TL k1 = CollectionSerializer.readValue(left, accessorL, offsetL, 
protocolVersion);
-            offsetL += CollectionSerializer.sizeOfValue(k1, accessorL, 
protocolVersion);
-            TR k2 = CollectionSerializer.readValue(right, accessorR, offsetR, 
protocolVersion);
-            offsetR += CollectionSerializer.sizeOfValue(k2, accessorR, 
protocolVersion);
+            TL k1 = CollectionSerializer.readValue(left, accessorL, offsetL);
+            offsetL += CollectionSerializer.sizeOfValue(k1, accessorL);
+            TR k2 = CollectionSerializer.readValue(right, accessorR, offsetR);
+            offsetR += CollectionSerializer.sizeOfValue(k2, accessorR);
             int cmp = keysComparator.compare(k1, accessorL, k2, accessorR);
             if (cmp != 0)
                 return cmp;
 
-            TL v1 = CollectionSerializer.readValue(left, accessorL, offsetL, 
protocolVersion);
-            offsetL += CollectionSerializer.sizeOfValue(v1, accessorL, 
protocolVersion);
-            TR v2 = CollectionSerializer.readValue(right, accessorR, offsetR, 
protocolVersion);
-            offsetR += CollectionSerializer.sizeOfValue(v2, accessorR, 
protocolVersion);
+            TL v1 = CollectionSerializer.readValue(left, accessorL, offsetL);
+            offsetL += CollectionSerializer.sizeOfValue(v1, accessorL);
+            TR v2 = CollectionSerializer.readValue(right, accessorR, offsetR);
+            offsetR += CollectionSerializer.sizeOfValue(v2, accessorR);
             cmp = valuesComparator.compare(v1, accessorL, v2, accessorR);
             if (cmp != 0)
                 return cmp;
@@ -224,13 +228,13 @@ public class MapType<K, V> extends CollectionType<Map<K, 
V>>
     }
 
     @Override
-    public <V> ByteSource asComparableBytes(ValueAccessor<V> accessor, V data, 
Version version)
+    public <T> ByteSource asComparableBytes(ValueAccessor<T> accessor, T data, 
Version version)
     {
         return asComparableBytesMap(getKeysType(), getValuesType(), accessor, 
data, version);
     }
 
     @Override
-    public <V> V fromComparableBytes(ValueAccessor<V> accessor, 
ByteSource.Peekable comparableBytes, Version version)
+    public <T> T fromComparableBytes(ValueAccessor<T> accessor, 
ByteSource.Peekable comparableBytes, Version version)
     {
         return fromComparableBytesMap(accessor, comparableBytes, version, 
getKeysType(), getValuesType());
     }
@@ -244,18 +248,17 @@ public class MapType<K, V> extends CollectionType<Map<K, 
V>>
         if (accessor.isEmpty(data))
             return null;
 
-        ProtocolVersion protocolVersion = ProtocolVersion.V3;
         int offset = 0;
-        int size = CollectionSerializer.readCollectionSize(data, accessor, 
protocolVersion);
-        offset += CollectionSerializer.sizeOfCollectionSize(size, 
protocolVersion);
+        int size = CollectionSerializer.readCollectionSize(data, accessor);
+        offset += CollectionSerializer.sizeOfCollectionSize();
         ByteSource[] srcs = new ByteSource[size * 2];
         for (int i = 0; i < size; ++i)
         {
-            V k = CollectionSerializer.readValue(data, accessor, offset, 
protocolVersion);
-            offset += CollectionSerializer.sizeOfValue(k, accessor, 
protocolVersion);
+            V k = CollectionSerializer.readValue(data, accessor, offset);
+            offset += CollectionSerializer.sizeOfValue(k, accessor);
             srcs[i * 2 + 0] = keysComparator.asComparableBytes(accessor, k, 
version);
-            V v = CollectionSerializer.readValue(data, accessor, offset, 
protocolVersion);
-            offset += CollectionSerializer.sizeOfValue(v, accessor, 
protocolVersion);
+            V v = CollectionSerializer.readValue(data, accessor, offset);
+            offset += CollectionSerializer.sizeOfValue(v, accessor);
             srcs[i * 2 + 1] = valuesComparator.asComparableBytes(accessor, v, 
version);
         }
         return ByteSource.withTerminatorMaybeLegacy(version, 0x00, srcs);
@@ -284,7 +287,7 @@ public class MapType<K, V> extends CollectionType<Map<K, V>>
                         : valuesComparator.fromComparableBytes(accessor, 
comparableBytes, version));
             separator = comparableBytes.next();
         }
-        return CollectionSerializer.pack(buffers, accessor,buffers.size() / 2, 
ProtocolVersion.V3);
+        return CollectionSerializer.pack(buffers, accessor,buffers.size() / 2);
     }
 
     @Override
@@ -335,9 +338,9 @@ public class MapType<K, V> extends CollectionType<Map<K, V>>
             throw new MarshalException(String.format(
                     "Expected a map, but got a %s: %s", 
parsed.getClass().getSimpleName(), parsed));
 
-        Map<Object, Object> map = (Map<Object, Object>) parsed;
+        Map<?, ?> map = (Map<?, ?>) parsed;
         Map<Term, Term> terms = new HashMap<>(map.size());
-        for (Map.Entry<Object, Object> entry : map.entrySet())
+        for (Map.Entry<?, ?> entry : map.entrySet())
         {
             if (entry.getKey() == null)
                 throw new MarshalException("Invalid null key in map");
@@ -355,16 +358,16 @@ public class MapType<K, V> extends CollectionType<Map<K, 
V>>
     {
         ByteBuffer value = buffer.duplicate();
         StringBuilder sb = new StringBuilder("{");
-        int size = CollectionSerializer.readCollectionSize(value, 
ByteBufferAccessor.instance, protocolVersion);
-        int offset = CollectionSerializer.sizeOfCollectionSize(size, 
protocolVersion);
+        int size = CollectionSerializer.readCollectionSize(value, 
ByteBufferAccessor.instance);
+        int offset = CollectionSerializer.sizeOfCollectionSize();
         for (int i = 0; i < size; i++)
         {
             if (i > 0)
                 sb.append(", ");
 
             // map keys must be JSON strings, so convert non-string keys to 
strings
-            ByteBuffer kv = CollectionSerializer.readValue(value, 
ByteBufferAccessor.instance, offset, protocolVersion);
-            offset += CollectionSerializer.sizeOfValue(kv, 
ByteBufferAccessor.instance, protocolVersion);
+            ByteBuffer kv = CollectionSerializer.readValue(value, 
ByteBufferAccessor.instance, offset);
+            offset += CollectionSerializer.sizeOfValue(kv, 
ByteBufferAccessor.instance);
             String key = keys.toJSONString(kv, protocolVersion);
             if (key.startsWith("\""))
                 sb.append(key);
@@ -372,15 +375,15 @@ public class MapType<K, V> extends CollectionType<Map<K, 
V>>
                 sb.append('"').append(Json.quoteAsJsonString(key)).append('"');
 
             sb.append(": ");
-            ByteBuffer vv = CollectionSerializer.readValue(value, 
ByteBufferAccessor.instance, offset, protocolVersion);
-            offset += CollectionSerializer.sizeOfValue(vv, 
ByteBufferAccessor.instance, protocolVersion);
+            ByteBuffer vv = CollectionSerializer.readValue(value, 
ByteBufferAccessor.instance, offset);
+            offset += CollectionSerializer.sizeOfValue(vv, 
ByteBufferAccessor.instance);
             sb.append(values.toJSONString(vv, protocolVersion));
         }
         return sb.append("}").toString();
     }
 
     @Override
-    public void forEach(ByteBuffer input, ProtocolVersion version, 
Consumer<ByteBuffer> action)
+    public void forEach(ByteBuffer input, Consumer<ByteBuffer> action)
     {
         throw new UnsupportedOperationException();
     }
diff --git a/src/java/org/apache/cassandra/db/marshal/SetType.java 
b/src/java/org/apache/cassandra/db/marshal/SetType.java
index 8e33ca6698..e8d6413c7b 100644
--- a/src/java/org/apache/cassandra/db/marshal/SetType.java
+++ b/src/java/org/apache/cassandra/db/marshal/SetType.java
@@ -145,7 +145,7 @@ public class SetType<T> extends CollectionType<Set<T>>
     public boolean isCompatibleWithFrozen(CollectionType<?> previous)
     {
         assert !isMultiCell;
-        return this.elements.isCompatibleWith(((SetType) previous).elements);
+        return this.elements.isCompatibleWith(((SetType<?>) 
previous).elements);
     }
 
     @Override
@@ -194,7 +194,7 @@ public class SetType<T> extends CollectionType<Set<T>>
 
     public List<ByteBuffer> serializedValues(Iterator<Cell<?>> cells)
     {
-        List<ByteBuffer> bbs = new ArrayList<ByteBuffer>();
+        List<ByteBuffer> bbs = new ArrayList<>();
         while (cells.hasNext())
             bbs.add(cells.next().path().get(0));
         return bbs;
@@ -210,7 +210,7 @@ public class SetType<T> extends CollectionType<Set<T>>
             throw new MarshalException(String.format(
                     "Expected a list (representing a set), but got a %s: %s", parsed.getClass().getSimpleName(), parsed));
 
-        List list = (List) parsed;
+        List<?> list = (List<?>) parsed;
         Set<Term> terms = new HashSet<>(list.size());
         for (Object element : list)
         {
@@ -229,8 +229,8 @@ public class SetType<T> extends CollectionType<Set<T>>
     }
 
     @Override
-    public void forEach(ByteBuffer input, ProtocolVersion version, 
Consumer<ByteBuffer> action)
+    public void forEach(ByteBuffer input, Consumer<ByteBuffer> action)
     {
-        serializer.forEach(input, version, action);
+        serializer.forEach(input, action);
     }
 }
diff --git a/src/java/org/apache/cassandra/db/marshal/TupleType.java b/src/java/org/apache/cassandra/db/marshal/TupleType.java
index c203770bc7..f16fcdecab 100644
--- a/src/java/org/apache/cassandra/db/marshal/TupleType.java
+++ b/src/java/org/apache/cassandra/db/marshal/TupleType.java
@@ -429,7 +429,7 @@ public class TupleType extends AbstractType<ByteBuffer>
             throw new MarshalException(String.format(
                     "Expected a list representation of a tuple, but got a %s: %s", parsed.getClass().getSimpleName(), parsed));
 
-        List list = (List) parsed;
+        List<?> list = (List<?>) parsed;
 
         if (list.size() > types.size())
             throw new MarshalException(String.format("Tuple contains extra items (expected %s): %s", types.size(), parsed));
@@ -465,8 +465,8 @@ public class TupleType extends AbstractType<ByteBuffer>
             if (i > 0)
                 sb.append(", ");
 
-            ByteBuffer value = CollectionSerializer.readValue(duplicated, 
ByteBufferAccessor.instance, offset, protocolVersion);
-            offset += CollectionSerializer.sizeOfValue(value, 
ByteBufferAccessor.instance, protocolVersion);
+            ByteBuffer value = CollectionSerializer.readValue(duplicated, 
ByteBufferAccessor.instance, offset);
+            offset += CollectionSerializer.sizeOfValue(value, 
ByteBufferAccessor.instance);
             if (value == null)
                 sb.append("null");
             else
diff --git a/src/java/org/apache/cassandra/serializers/AbstractMapSerializer.java b/src/java/org/apache/cassandra/serializers/AbstractMapSerializer.java
index 52255a1fde..27bae6e845 100644
--- a/src/java/org/apache/cassandra/serializers/AbstractMapSerializer.java
+++ b/src/java/org/apache/cassandra/serializers/AbstractMapSerializer.java
@@ -25,7 +25,6 @@ import com.google.common.collect.Range;
 
 import org.apache.cassandra.db.marshal.AbstractType;
 import org.apache.cassandra.db.marshal.ByteBufferAccessor;
-import org.apache.cassandra.transport.ProtocolVersion;
 import org.apache.cassandra.utils.ByteBufferUtil;
 
 /**
@@ -55,8 +54,8 @@ abstract class AbstractMapSerializer<T> extends 
CollectionSerializer<T>
         try
         {
             ByteBuffer input = collection.duplicate();
-            int n = readCollectionSize(input, ByteBufferAccessor.instance, 
ProtocolVersion.V3);
-            input.position(input.position() + sizeOfCollectionSize(n, 
ProtocolVersion.V3));
+            int n = readCollectionSize(input, ByteBufferAccessor.instance);
+            input.position(input.position() + sizeOfCollectionSize());
             int startPos = input.position();
             int count = 0;
             boolean inSlice = from == ByteBufferUtil.UNSET_BYTE_BUFFER;
@@ -64,8 +63,8 @@ abstract class AbstractMapSerializer<T> extends 
CollectionSerializer<T>
             for (int i = 0; i < n; i++)
             {
                 int pos = input.position();
-                ByteBuffer key = readValue(input, ByteBufferAccessor.instance, 
0, ProtocolVersion.V3); // key
-                input.position(input.position() + sizeOfValue(key, 
ByteBufferAccessor.instance, ProtocolVersion.V3));
+                ByteBuffer key = readValue(input, ByteBufferAccessor.instance, 
0);
+                input.position(input.position() + sizeOfValue(key, 
ByteBufferAccessor.instance));
 
                 // If we haven't passed the start already, check if we have now
                 if (!inSlice)
@@ -106,7 +105,7 @@ abstract class AbstractMapSerializer<T> extends 
CollectionSerializer<T>
             if (count == 0 && !frozen)
                 return null;
 
-            return copyAsNewCollection(collection, count, startPos, 
input.position(), ProtocolVersion.V3);
+            return copyAsNewCollection(collection, count, startPos, 
input.position());
         }
         catch (BufferUnderflowException | IndexOutOfBoundsException e)
         {
@@ -120,12 +119,12 @@ abstract class AbstractMapSerializer<T> extends 
CollectionSerializer<T>
         try
         {
             ByteBuffer input = collection.duplicate();
-            int n = readCollectionSize(input, ByteBufferAccessor.instance, 
ProtocolVersion.V3);
-            int offset = sizeOfCollectionSize(n, ProtocolVersion.V3);
+            int n = readCollectionSize(input, ByteBufferAccessor.instance);
+            int offset = sizeOfCollectionSize();
             for (int i = 0; i < n; i++)
             {
-                ByteBuffer kbb = readValue(input, ByteBufferAccessor.instance, 
offset, ProtocolVersion.V3);
-                offset += sizeOfValue(kbb, ByteBufferAccessor.instance, 
ProtocolVersion.V3);
+                ByteBuffer kbb = readValue(input, ByteBufferAccessor.instance, 
offset);
+                offset += sizeOfValue(kbb, ByteBufferAccessor.instance);
                 int comparison = comparator.compareForCQL(kbb, key);
 
                 if (comparison == 0)
@@ -137,7 +136,7 @@ abstract class AbstractMapSerializer<T> extends 
CollectionSerializer<T>
 
                 // comparison < 0
                 if (hasValues)
-                    offset += skipValue(input, ByteBufferAccessor.instance, 
offset, ProtocolVersion.V3);
+                    offset += skipValue(input, ByteBufferAccessor.instance, 
offset);
             }
             return -1;
         }
@@ -159,8 +158,8 @@ abstract class AbstractMapSerializer<T> extends 
CollectionSerializer<T>
         try
         {
             ByteBuffer input = collection.duplicate();
-            int n = readCollectionSize(input, ByteBufferAccessor.instance, 
ProtocolVersion.V3);
-            input.position(input.position() + sizeOfCollectionSize(n, 
ProtocolVersion.V3));
+            int n = readCollectionSize(input, ByteBufferAccessor.instance);
+            input.position(input.position() + sizeOfCollectionSize());
             int start = from == ByteBufferUtil.UNSET_BYTE_BUFFER ? 0 : -1;
             int end = to == ByteBufferUtil.UNSET_BYTE_BUFFER ? n : -1;
 
@@ -171,8 +170,8 @@ abstract class AbstractMapSerializer<T> extends 
CollectionSerializer<T>
                 else if (i > 0)
                     skipMapValue(input);
 
-                ByteBuffer key = readValue(input, ByteBufferAccessor.instance, 
0, ProtocolVersion.V3);
-                input.position(input.position() + sizeOfValue(key, 
ByteBufferAccessor.instance, ProtocolVersion.V3));
+                ByteBuffer key = readValue(input, ByteBufferAccessor.instance, 
0);
+                input.position(input.position() + sizeOfValue(key, 
ByteBufferAccessor.instance));
 
                 if (start < 0)
                 {
@@ -208,6 +207,6 @@ abstract class AbstractMapSerializer<T> extends 
CollectionSerializer<T>
     private void skipMapValue(ByteBuffer input)
     {
         if (hasValues)
-            skipValue(input, ProtocolVersion.V3);
+            skipValue(input);
     }
 }
diff --git a/src/java/org/apache/cassandra/serializers/CollectionSerializer.java b/src/java/org/apache/cassandra/serializers/CollectionSerializer.java
index 841a1b0aac..f77fe1bebc 100644
--- a/src/java/org/apache/cassandra/serializers/CollectionSerializer.java
+++ b/src/java/org/apache/cassandra/serializers/CollectionSerializer.java
@@ -29,7 +29,6 @@ import com.google.common.collect.Range;
 import org.apache.cassandra.db.TypeSizes;
 import org.apache.cassandra.db.marshal.ByteBufferAccessor;
 import org.apache.cassandra.db.marshal.ValueAccessor;
-import org.apache.cassandra.transport.ProtocolVersion;
 import org.apache.cassandra.db.marshal.AbstractType;
 import org.apache.cassandra.utils.ByteBufferUtil;
 
@@ -38,74 +37,49 @@ public abstract class CollectionSerializer<T> extends 
TypeSerializer<T>
     protected abstract List<ByteBuffer> serializeValues(T value);
     protected abstract int getElementCount(T value);
 
-    public abstract <V> T deserializeForNativeProtocol(V value, 
ValueAccessor<V> accessor, ProtocolVersion version);
-
-    public T deserializeForNativeProtocol(ByteBuffer value, ProtocolVersion 
version)
-    {
-        return deserializeForNativeProtocol(value, 
ByteBufferAccessor.instance, version);
-    }
-
-    public abstract <V> void validateForNativeProtocol(V value, 
ValueAccessor<V> accessor, ProtocolVersion version);
-
+    @Override
     public ByteBuffer serialize(T input)
     {
         List<ByteBuffer> values = serializeValues(input);
-        // See deserialize() for why using the protocol v3 variant is the 
right thing to do.
-        return pack(values, ByteBufferAccessor.instance, 
getElementCount(input), ProtocolVersion.V3);
-    }
-
-    public <V> T deserialize(V value, ValueAccessor<V> accessor)
-    {
-        // The only cases we serialize/deserialize collections internally 
(i.e. not for the protocol sake),
-        // is:
-        //  1) when collections are frozen
-        //  2) for internal calls.
-        // In both case, using the protocol 3 version variant is the right 
thing to do.
-        return deserializeForNativeProtocol(value, accessor, 
ProtocolVersion.V3);
-    }
-
-    public <T1> void validate(T1 value, ValueAccessor<T1> accessor) throws 
MarshalException
-    {
-        // Same thing as above
-        validateForNativeProtocol(value, accessor, ProtocolVersion.V3);
+        return pack(values, ByteBufferAccessor.instance, 
getElementCount(input));
     }
 
-    public static ByteBuffer pack(Collection<ByteBuffer> values, int elements, 
ProtocolVersion version)
+    public static ByteBuffer pack(Collection<ByteBuffer> values, int elements)
     {
-        return pack(values, ByteBufferAccessor.instance, elements, version);
+        return pack(values, ByteBufferAccessor.instance, elements);
     }
 
-    public static <V> V pack(Collection<V> values, ValueAccessor<V> accessor, 
int elements, ProtocolVersion version)
+    public static <V> V pack(Collection<V> values, ValueAccessor<V> accessor, 
int elements)
     {
         int size = 0;
         for (V value : values)
-            size += sizeOfValue(value, accessor, version);
+            size += sizeOfValue(value, accessor);
 
-        ByteBuffer result = ByteBuffer.allocate(sizeOfCollectionSize(elements, 
version) + size);
-        writeCollectionSize(result, elements, version);
+        ByteBuffer result = ByteBuffer.allocate(sizeOfCollectionSize() + size);
+        writeCollectionSize(result, elements);
         for (V value : values)
         {
-            writeValue(result, value, accessor, version);
+            writeValue(result, value, accessor);
         }
         return accessor.valueOf((ByteBuffer) result.flip());
     }
 
-    protected static void writeCollectionSize(ByteBuffer output, int elements, 
ProtocolVersion version)
+    protected static void writeCollectionSize(ByteBuffer output, int elements)
     {
         output.putInt(elements);
     }
 
-    public static <V> int readCollectionSize(V value, ValueAccessor<V> 
accessor, ProtocolVersion version)
+    public static <V> int readCollectionSize(V value, ValueAccessor<V> 
accessor)
     {
         return accessor.toInt(value);
     }
 
-    public static int sizeOfCollectionSize(int elements, ProtocolVersion 
version)
+    public static int sizeOfCollectionSize()
     {
         return TypeSizes.INT_SIZE;
     }
 
-    public static <V> void writeValue(ByteBuffer output, V value, 
ValueAccessor<V> accessor, ProtocolVersion version)
+    public static <V> void writeValue(ByteBuffer output, V value, 
ValueAccessor<V> accessor)
     {
         if (value == null)
         {
@@ -117,7 +91,7 @@ public abstract class CollectionSerializer<T> extends 
TypeSerializer<T>
         accessor.write(value, output);
     }
 
-    public static <V> V readValue(V input, ValueAccessor<V> accessor, int 
offset, ProtocolVersion version)
+    public static <V> V readValue(V input, ValueAccessor<V> accessor, int 
offset)
     {
         int size = accessor.getInt(input, offset);
         if (size < 0)
@@ -126,19 +100,19 @@ public abstract class CollectionSerializer<T> extends 
TypeSerializer<T>
         return accessor.slice(input, offset + TypeSizes.INT_SIZE, size);
     }
 
-    protected static void skipValue(ByteBuffer input, ProtocolVersion version)
+    protected static void skipValue(ByteBuffer input)
     {
         int size = input.getInt();
         input.position(input.position() + size);
     }
 
-    public static <V> int skipValue(V input, ValueAccessor<V> accessor, int 
offset, ProtocolVersion version)
+    public static <V> int skipValue(V input, ValueAccessor<V> accessor, int 
offset)
     {
         int size = accessor.getInt(input, offset);
         return TypeSizes.sizeof(size) + size;
     }
 
-    public static <V> int sizeOfValue(V value, ValueAccessor<V> accessor, 
ProtocolVersion version)
+    public static <V> int sizeOfValue(V value, ValueAccessor<V> accessor)
     {
         return value == null ? 4 : 4 + accessor.size(value);
     }
@@ -213,31 +187,31 @@ public abstract class CollectionSerializer<T> extends 
TypeSerializer<T>
      * Creates a new serialized map composed from the data from {@code input} between {@code startPos}
      * (inclusive) and {@code endPos} (exclusive), assuming that data holds {@code count} elements.
      */
-    protected ByteBuffer copyAsNewCollection(ByteBuffer input, int count, int 
startPos, int endPos, ProtocolVersion version)
+    protected ByteBuffer copyAsNewCollection(ByteBuffer input, int count, int 
startPos, int endPos)
     {
-        int sizeLen = sizeOfCollectionSize(count, version);
+        int sizeLen = sizeOfCollectionSize();
         if (count == 0)
             return ByteBuffer.allocate(sizeLen);
 
         int bodyLen = endPos - startPos;
         ByteBuffer output = ByteBuffer.allocate(sizeLen + bodyLen);
-        writeCollectionSize(output, count, version);
+        writeCollectionSize(output, count);
         output.position(0);
         ByteBufferUtil.copyBytes(input, startPos, output, sizeLen, bodyLen);
         return output;
     }
 
-    public void forEach(ByteBuffer input, ProtocolVersion version, 
Consumer<ByteBuffer> action)
+    public void forEach(ByteBuffer input, Consumer<ByteBuffer> action)
     {
         try
         {
-            int collectionSize = readCollectionSize(input, 
ByteBufferAccessor.instance, version);
-            int offset = sizeOfCollectionSize(collectionSize, version);
+            int collectionSize = readCollectionSize(input, 
ByteBufferAccessor.instance);
+            int offset = sizeOfCollectionSize();
 
             for (int i = 0; i < collectionSize; i++)
             {
-                ByteBuffer value = readValue(input, 
ByteBufferAccessor.instance, offset, version);
-                offset += sizeOfValue(value, ByteBufferAccessor.instance, 
version);
+                ByteBuffer value = readValue(input, 
ByteBufferAccessor.instance, offset);
+                offset += sizeOfValue(value, ByteBufferAccessor.instance);
 
                 action.accept(value);
             }
diff --git a/src/java/org/apache/cassandra/serializers/ListSerializer.java b/src/java/org/apache/cassandra/serializers/ListSerializer.java
index ef5a037abc..dc65ba5e96 100644
--- a/src/java/org/apache/cassandra/serializers/ListSerializer.java
+++ b/src/java/org/apache/cassandra/serializers/ListSerializer.java
@@ -31,7 +31,6 @@ import org.apache.cassandra.db.TypeSizes;
 import org.apache.cassandra.db.marshal.AbstractType;
 import org.apache.cassandra.db.marshal.ByteBufferAccessor;
 import org.apache.cassandra.db.marshal.ValueAccessor;
-import org.apache.cassandra.transport.ProtocolVersion;
 
 public class ListSerializer<T> extends CollectionSerializer<List<T>>
 {
@@ -55,6 +54,7 @@ public class ListSerializer<T> extends 
CollectionSerializer<List<T>>
         this.elements = elements;
     }
 
+    @Override
     protected List<ByteBuffer> serializeValues(List<T> values)
     {
         List<ByteBuffer> output = new ArrayList<>(values.size());
@@ -63,21 +63,23 @@ public class ListSerializer<T> extends 
CollectionSerializer<List<T>>
         return output;
     }
 
+    @Override
     public int getElementCount(List<T> value)
     {
         return value.size();
     }
 
-    public <V> void validateForNativeProtocol(V input, ValueAccessor<V> 
accessor, ProtocolVersion version)
+    @Override
+    public <V> void validate(V input, ValueAccessor<V> accessor)
     {
         try
         {
-            int n = readCollectionSize(input, accessor, version);
-            int offset = sizeOfCollectionSize(n, version);
+            int n = readCollectionSize(input, accessor);
+            int offset = sizeOfCollectionSize();
             for (int i = 0; i < n; i++)
             {
-                V value = readValue(input, accessor, offset, version);
-                offset += sizeOfValue(value, accessor, version);
+                V value = readValue(input, accessor, offset);
+                offset += sizeOfValue(value, accessor);
                 elements.validate(value, accessor);
             }
 
@@ -90,12 +92,13 @@ public class ListSerializer<T> extends 
CollectionSerializer<List<T>>
         }
     }
 
-    public <V> List<T> deserializeForNativeProtocol(V input, ValueAccessor<V> 
accessor, ProtocolVersion version)
+    @Override
+    public <V> List<T> deserialize(V input, ValueAccessor<V> accessor)
     {
         try
         {
-            int n = readCollectionSize(input, accessor, version);
-            int offset = sizeOfCollectionSize(n, version);
+            int n = readCollectionSize(input, accessor);
+            int offset = sizeOfCollectionSize();
 
             if (n < 0)
                 throw new MarshalException("The data cannot be deserialized as a list");
@@ -108,8 +111,8 @@ public class ListSerializer<T> extends 
CollectionSerializer<List<T>>
             for (int i = 0; i < n; i++)
             {
                 // We can have nulls in lists that are used for IN values
-                V databb = readValue(input, accessor, offset, version);
-                offset += sizeOfValue(databb, accessor, version);
+                V databb = readValue(input, accessor, offset);
+                offset += sizeOfValue(databb, accessor);
                 if (databb != null)
                 {
                     elements.validate(databb, accessor);
@@ -141,8 +144,8 @@ public class ListSerializer<T> extends 
CollectionSerializer<List<T>>
     {
         try
         {
-            int s = readCollectionSize(input, accessor, ProtocolVersion.V3);
-            int offset = sizeOfCollectionSize(s, ProtocolVersion.V3);
+            int s = readCollectionSize(input, accessor);
+            int offset = sizeOfCollectionSize();
 
             for (int i = 0; i < s; i++)
             {
@@ -177,8 +180,8 @@ public class ListSerializer<T> extends 
CollectionSerializer<List<T>>
     {
         try
         {
-            int n = readCollectionSize(input, accessor, ProtocolVersion.V3);
-            int offset = sizeOfCollectionSize(n, ProtocolVersion.V3);
+            int n = readCollectionSize(input, accessor);
+            int offset = sizeOfCollectionSize();
             if (n <= index)
                 return null;
 
@@ -187,7 +190,7 @@ public class ListSerializer<T> extends 
CollectionSerializer<List<T>>
                 int length = accessor.getInt(input, offset);
                 offset += TypeSizes.INT_SIZE + length;
             }
-            return readValue(input, accessor, offset, ProtocolVersion.V3);
+            return readValue(input, accessor, offset);
         }
         catch (BufferUnderflowException | IndexOutOfBoundsException e)
         {
@@ -200,6 +203,7 @@ public class ListSerializer<T> extends 
CollectionSerializer<List<T>>
         return getElement(input, ByteBufferAccessor.instance, index);
     }
 
+    @Override
     public String toString(List<T> value)
     {
         StringBuilder sb = new StringBuilder();
@@ -217,6 +221,7 @@ public class ListSerializer<T> extends 
CollectionSerializer<List<T>>
         return sb.toString();
     }
 
+    @Override
     @SuppressWarnings({ "rawtypes", "unchecked" })
     public Class<List<T>> getType()
     {
diff --git a/src/java/org/apache/cassandra/serializers/MapSerializer.java b/src/java/org/apache/cassandra/serializers/MapSerializer.java
index 0417d403e6..413679e528 100644
--- a/src/java/org/apache/cassandra/serializers/MapSerializer.java
+++ b/src/java/org/apache/cassandra/serializers/MapSerializer.java
@@ -20,7 +20,10 @@ package org.apache.cassandra.serializers;
 
 import java.nio.BufferUnderflowException;
 import java.nio.ByteBuffer;
-import java.util.*;
+import java.util.ArrayList;
+import java.util.LinkedHashMap;
+import java.util.List;
+import java.util.Map;
 import java.util.concurrent.ConcurrentHashMap;
 import java.util.concurrent.ConcurrentMap;
 import java.util.function.Consumer;
@@ -29,7 +32,6 @@ import org.apache.cassandra.db.marshal.AbstractType;
 import org.apache.cassandra.db.marshal.ByteBufferAccessor;
 import org.apache.cassandra.db.marshal.ValueAccessor;
 import org.apache.cassandra.db.marshal.ValueComparators;
-import org.apache.cassandra.transport.ProtocolVersion;
 import org.apache.cassandra.utils.Pair;
 
 public class MapSerializer<K, V> extends AbstractMapSerializer<Map<K, V>>
@@ -48,7 +50,7 @@ public class MapSerializer<K, V> extends 
AbstractMapSerializer<Map<K, V>>
         Pair<TypeSerializer<?>, TypeSerializer<?>> p = Pair.create(keys, 
values);
         MapSerializer<K, V> t = instances.get(p);
         if (t == null)
-            t = instances.computeIfAbsent(p, k -> new MapSerializer<>(k.left, 
k.right, comparators) );
+            t = instances.computeIfAbsent(p, k -> new MapSerializer<>(k.left, 
k.right, comparators));
         return t;
     }
 
@@ -60,6 +62,7 @@ public class MapSerializer<K, V> extends 
AbstractMapSerializer<Map<K, V>>
         this.comparators = comparators;
     }
 
+    @Override
     public List<ByteBuffer> serializeValues(Map<K, V> map)
     {
         List<Pair<ByteBuffer, ByteBuffer>> pairs = new ArrayList<>(map.size());
@@ -75,28 +78,30 @@ public class MapSerializer<K, V> extends 
AbstractMapSerializer<Map<K, V>>
         return buffers;
     }
 
+    @Override
     public int getElementCount(Map<K, V> value)
     {
         return value.size();
     }
 
-    public <T> void validateForNativeProtocol(T input, ValueAccessor<T> 
accessor, ProtocolVersion version)
+    @Override
+    public <T> void validate(T input, ValueAccessor<T> accessor)
     {
         try
         {
             // Empty values are still valid.
             if (accessor.isEmpty(input)) return;
 
-            int n = readCollectionSize(input, accessor, version);
-            int offset = sizeOfCollectionSize(n, version);
+            int n = readCollectionSize(input, accessor);
+            int offset = sizeOfCollectionSize();
             for (int i = 0; i < n; i++)
             {
-                T key = readValue(input, accessor, offset, version);
-                offset += sizeOfValue(key, accessor, version);
+                T key = readValue(input, accessor, offset);
+                offset += sizeOfValue(key, accessor);
                 keys.validate(key, accessor);
 
-                T value = readValue(input, accessor, offset, version);
-                offset += sizeOfValue(value, accessor, version);
+                T value = readValue(input, accessor, offset);
+                offset += sizeOfValue(value, accessor);
                 values.validate(value, accessor);
             }
             if (!accessor.isEmptyFromOffset(input, offset))
@@ -108,12 +113,13 @@ public class MapSerializer<K, V> extends 
AbstractMapSerializer<Map<K, V>>
         }
     }
 
-    public <I> Map<K, V> deserializeForNativeProtocol(I input, 
ValueAccessor<I> accessor, ProtocolVersion version)
+    @Override
+    public <I> Map<K, V> deserialize(I input, ValueAccessor<I> accessor)
     {
         try
         {
-            int n = readCollectionSize(input, accessor, version);
-            int offset = sizeOfCollectionSize(n, version);
+            int n = readCollectionSize(input, accessor);
+            int offset = sizeOfCollectionSize();
 
             if (n < 0)
                 throw new MarshalException("The data cannot be deserialized as a map");
@@ -125,12 +131,12 @@ public class MapSerializer<K, V> extends 
AbstractMapSerializer<Map<K, V>>
             Map<K, V> m = new LinkedHashMap<>(Math.min(n, 256));
             for (int i = 0; i < n; i++)
             {
-                I key = readValue(input, accessor, offset, version);
-                offset += sizeOfValue(key, accessor, version);
+                I key = readValue(input, accessor, offset);
+                offset += sizeOfValue(key, accessor);
                 keys.validate(key, accessor);
 
-                I value = readValue(input, accessor, offset, version);
-                offset += sizeOfValue(value, accessor, version);
+                I value = readValue(input, accessor, offset);
+                offset += sizeOfValue(value, accessor);
                 values.validate(value, accessor);
 
                 m.put(keys.deserialize(key, accessor), 
values.deserialize(value, accessor));
@@ -151,20 +157,20 @@ public class MapSerializer<K, V> extends 
AbstractMapSerializer<Map<K, V>>
         try
         {
             ByteBuffer input = collection.duplicate();
-            int n = readCollectionSize(input, ByteBufferAccessor.instance, 
ProtocolVersion.V3);
-            int offset = sizeOfCollectionSize(n, ProtocolVersion.V3);
+            int n = readCollectionSize(input, ByteBufferAccessor.instance);
+            int offset = sizeOfCollectionSize();
             for (int i = 0; i < n; i++)
             {
-                ByteBuffer kbb = readValue(input, ByteBufferAccessor.instance, 
offset, ProtocolVersion.V3);
-                offset += sizeOfValue(kbb, ByteBufferAccessor.instance, 
ProtocolVersion.V3);
+                ByteBuffer kbb = readValue(input, ByteBufferAccessor.instance, 
offset);
+                offset += sizeOfValue(kbb, ByteBufferAccessor.instance);
                 int comparison = comparator.compareForCQL(kbb, key);
                 if (comparison == 0)
-                    return readValue(input, ByteBufferAccessor.instance, 
offset, ProtocolVersion.V3);
+                    return readValue(input, ByteBufferAccessor.instance, 
offset);
                 else if (comparison > 0)
                     // since the map is in sorted order, we know we've gone too far and the element doesn't exist
                     return null;
                 else // comparison < 0
-                    offset += skipValue(input, ByteBufferAccessor.instance, 
offset, ProtocolVersion.V3);
+                    offset += skipValue(input, ByteBufferAccessor.instance, 
offset);
             }
             return null;
         }
@@ -174,6 +180,7 @@ public class MapSerializer<K, V> extends 
AbstractMapSerializer<Map<K, V>>
         }
     }
 
+    @Override
     public String toString(Map<K, V> value)
     {
         StringBuilder sb = new StringBuilder();
@@ -193,6 +200,7 @@ public class MapSerializer<K, V> extends 
AbstractMapSerializer<Map<K, V>>
         return sb.toString();
     }
 
+    @Override
     @SuppressWarnings({ "rawtypes", "unchecked" })
     public Class<Map<K, V>> getType()
     {
@@ -200,7 +208,7 @@ public class MapSerializer<K, V> extends 
AbstractMapSerializer<Map<K, V>>
     }
 
     @Override
-    public void forEach(ByteBuffer input, ProtocolVersion version, 
Consumer<ByteBuffer> action)
+    public void forEach(ByteBuffer input, Consumer<ByteBuffer> action)
     {
         throw new UnsupportedOperationException();
     }
diff --git a/src/java/org/apache/cassandra/serializers/SetSerializer.java b/src/java/org/apache/cassandra/serializers/SetSerializer.java
index bbf7911aaf..c5ba0de5cc 100644
--- a/src/java/org/apache/cassandra/serializers/SetSerializer.java
+++ b/src/java/org/apache/cassandra/serializers/SetSerializer.java
@@ -20,7 +20,10 @@ package org.apache.cassandra.serializers;
 
 import java.nio.BufferUnderflowException;
 import java.nio.ByteBuffer;
-import java.util.*;
+import java.util.ArrayList;
+import java.util.LinkedHashSet;
+import java.util.List;
+import java.util.Set;
 import java.util.concurrent.ConcurrentHashMap;
 import java.util.concurrent.ConcurrentMap;
 
@@ -28,7 +31,6 @@ import org.apache.cassandra.db.marshal.AbstractType;
 import org.apache.cassandra.db.marshal.ByteBufferAccessor;
 import org.apache.cassandra.db.marshal.ValueAccessor;
 import org.apache.cassandra.db.marshal.ValueComparators;
-import org.apache.cassandra.transport.ProtocolVersion;
 
 public class SetSerializer<T> extends AbstractMapSerializer<Set<T>>
 {
@@ -55,6 +57,7 @@ public class SetSerializer<T> extends 
AbstractMapSerializer<Set<T>>
         this.comparators = comparators;
     }
 
+    @Override
     public List<ByteBuffer> serializeValues(Set<T> values)
     {
         List<ByteBuffer> buffers = new ArrayList<>(values.size());
@@ -64,24 +67,26 @@ public class SetSerializer<T> extends 
AbstractMapSerializer<Set<T>>
         return buffers;
     }
 
+    @Override
     public int getElementCount(Set<T> value)
     {
         return value.size();
     }
 
-    public <V> void validateForNativeProtocol(V input, ValueAccessor<V> 
accessor, ProtocolVersion version)
+    @Override
+    public <V> void validate(V input, ValueAccessor<V> accessor)
     {
         try
         {
             // Empty values are still valid.
             if (accessor.isEmpty(input)) return;
             
-            int n = readCollectionSize(input, accessor, version);
-            int offset = sizeOfCollectionSize(n, version);
+            int n = readCollectionSize(input, accessor);
+            int offset = sizeOfCollectionSize();
             for (int i = 0; i < n; i++)
             {
-                V value = readValue(input, accessor, offset, version);
-                offset += sizeOfValue(value, accessor, version);
+                V value = readValue(input, accessor, offset);
+                offset += sizeOfValue(value, accessor);
                 elements.validate(value, accessor);
             }
             if (!accessor.isEmptyFromOffset(input, offset))
@@ -93,12 +98,13 @@ public class SetSerializer<T> extends 
AbstractMapSerializer<Set<T>>
         }
     }
 
-    public <V> Set<T> deserializeForNativeProtocol(V input, ValueAccessor<V> 
accessor, ProtocolVersion version)
+    @Override
+    public <V> Set<T> deserialize(V input, ValueAccessor<V> accessor)
     {
         try
         {
-            int n = readCollectionSize(input, accessor, version);
-            int offset = sizeOfCollectionSize(n, version);
+            int n = readCollectionSize(input, accessor);
+            int offset = sizeOfCollectionSize();
 
             if (n < 0)
                 throw new MarshalException("The data cannot be deserialized as a set");
@@ -111,8 +117,8 @@ public class SetSerializer<T> extends AbstractMapSerializer<Set<T>>
 
             for (int i = 0; i < n; i++)
             {
-                V value = readValue(input, accessor, offset, version);
-                offset += sizeOfValue(value, accessor, version);
+                V value = readValue(input, accessor, offset);
+                offset += sizeOfValue(value, accessor);
                 elements.validate(value, accessor);
                 l.add(elements.deserialize(value, accessor));
             }
@@ -126,6 +132,7 @@ public class SetSerializer<T> extends AbstractMapSerializer<Set<T>>
         }
     }
 
+    @Override
     public String toString(Set<T> value)
     {
         StringBuilder sb = new StringBuilder();
@@ -147,6 +154,7 @@ public class SetSerializer<T> extends AbstractMapSerializer<Set<T>>
         return sb.toString();
     }
 
+    @Override
     @SuppressWarnings({ "rawtypes", "unchecked" })
     public Class<Set<T>> getType()
     {
@@ -158,13 +166,13 @@ public class SetSerializer<T> extends AbstractMapSerializer<Set<T>>
     {
         try
         {
-            int n = readCollectionSize(input, ByteBufferAccessor.instance, ProtocolVersion.V3);
-            int offset = sizeOfCollectionSize(n, ProtocolVersion.V3);
+            int n = readCollectionSize(input, ByteBufferAccessor.instance);
+            int offset = sizeOfCollectionSize();
 
             for (int i = 0; i < n; i++)
             {
-                ByteBuffer value = readValue(input, ByteBufferAccessor.instance, offset, ProtocolVersion.V3);
-                offset += sizeOfValue(value, ByteBufferAccessor.instance, ProtocolVersion.V3);
+                ByteBuffer value = readValue(input, ByteBufferAccessor.instance, offset);
+                offset += sizeOfValue(value, ByteBufferAccessor.instance);
                 int comparison = comparator.compareForCQL(value, key);
                 if (comparison == 0)
                     return value;
diff --git a/test/unit/org/apache/cassandra/cql3/CQL3TypeLiteralTest.java b/test/unit/org/apache/cassandra/cql3/CQL3TypeLiteralTest.java
index f0d2c199d8..0d73731d60 100644
--- a/test/unit/org/apache/cassandra/cql3/CQL3TypeLiteralTest.java
+++ b/test/unit/org/apache/cassandra/cql3/CQL3TypeLiteralTest.java
@@ -484,7 +484,7 @@ public class CQL3TypeLiteralTest
                 }
             }
             expected.append(bracketClose);
-            buffer = CollectionSerializer.pack(buffers, added.size(), version);
+            buffer = CollectionSerializer.pack(buffers, added.size());
         }
 
         return new Value(expected.toString(), collectionType.asCQL3Type(), buffer);
diff --git a/test/unit/org/apache/cassandra/transport/SerDeserTest.java b/test/unit/org/apache/cassandra/transport/SerDeserTest.java
index 75523e1587..6d7c3e8111 100644
--- a/test/unit/org/apache/cassandra/transport/SerDeserTest.java
+++ b/test/unit/org/apache/cassandra/transport/SerDeserTest.java
@@ -18,21 +18,46 @@
 package org.apache.cassandra.transport;
 
 import java.nio.ByteBuffer;
-import java.util.*;
-
-import org.apache.commons.lang3.RandomStringUtils;
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.LinkedHashMap;
+import java.util.LinkedHashSet;
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
 
 import io.netty.buffer.Unpooled;
 import io.netty.buffer.ByteBuf;
 
+import org.apache.commons.lang3.RandomStringUtils;
 import org.junit.BeforeClass;
 import org.junit.Test;
 
 import org.apache.cassandra.Util;
 import org.apache.cassandra.config.DatabaseDescriptor;
-import org.apache.cassandra.cql3.*;
+import org.apache.cassandra.cql3.ColumnIdentifier;
+import org.apache.cassandra.cql3.ColumnSpecification;
+import org.apache.cassandra.cql3.Constants;
+import org.apache.cassandra.cql3.FieldIdentifier;
+import org.apache.cassandra.cql3.Lists;
+import org.apache.cassandra.cql3.Maps;
+import org.apache.cassandra.cql3.QueryOptions;
+import org.apache.cassandra.cql3.ResultSet;
+import org.apache.cassandra.cql3.Sets;
+import org.apache.cassandra.cql3.Term;
+import org.apache.cassandra.cql3.UserTypes;
 import org.apache.cassandra.db.ConsistencyLevel;
-import org.apache.cassandra.db.marshal.*;
+import org.apache.cassandra.db.marshal.AbstractType;
+import org.apache.cassandra.db.marshal.ByteBufferAccessor;
+import org.apache.cassandra.db.marshal.Int32Type;
+import org.apache.cassandra.db.marshal.ListType;
+import org.apache.cassandra.db.marshal.LongType;
+import org.apache.cassandra.db.marshal.MapType;
+import org.apache.cassandra.db.marshal.SetType;
+import org.apache.cassandra.db.marshal.UTF8Type;
+import org.apache.cassandra.db.marshal.UserType;
 import org.apache.cassandra.serializers.CollectionSerializer;
 import org.apache.cassandra.service.ClientState;
 import org.apache.cassandra.service.QueryState;
@@ -43,16 +68,15 @@ import org.apache.cassandra.utils.FBUtilities;
 import org.apache.cassandra.utils.Pair;
 
 import static org.junit.Assert.assertEquals;
-import static org.apache.cassandra.utils.ByteBufferUtil.bytes;
 import static org.junit.Assert.assertNotNull;
 import static org.junit.Assert.assertNotSame;
+import static org.apache.cassandra.utils.ByteBufferUtil.bytes;
 
 /**
  * Serialization/deserialization tests for protocol objects and messages.
  */
 public class SerDeserTest
 {
-
     @BeforeClass
     public static void setupDD()
     {
@@ -62,12 +86,6 @@ public class SerDeserTest
 
     @Test
     public void collectionSerDeserTest() throws Exception
-    {
-        for (ProtocolVersion version : ProtocolVersion.SUPPORTED)
-            collectionSerDeserTest(version);
-    }
-
-    public void collectionSerDeserTest(ProtocolVersion version) throws Exception
     {
         // Lists
         ListType<?> lt = ListType.getInstance(Int32Type.instance, true);
@@ -77,18 +95,17 @@ public class SerDeserTest
         for (Integer i : l)
             lb.add(Int32Type.instance.decompose(i));
 
-        assertEquals(l, lt.getSerializer().deserializeForNativeProtocol(CollectionSerializer.pack(lb, lb.size(), version), version));
+        assertEquals(l, lt.getSerializer().deserialize(CollectionSerializer.pack(lb, lb.size())));
 
         // Sets
         SetType<?> st = SetType.getInstance(UTF8Type.instance, true);
-        Set<String> s = new LinkedHashSet<>();
-        s.addAll(Arrays.asList("bar", "foo", "zee"));
+        Set<String> s = new LinkedHashSet<>(Arrays.asList("bar", "foo", "zee"));
 
         List<ByteBuffer> sb = new ArrayList<>(s.size());
         for (String t : s)
             sb.add(UTF8Type.instance.decompose(t));
 
-        assertEquals(s, st.getSerializer().deserializeForNativeProtocol(CollectionSerializer.pack(sb, sb.size(), version), version));
+        assertEquals(s, st.getSerializer().deserialize(CollectionSerializer.pack(sb, sb.size())));
 
         // Maps
         MapType<?, ?> mt = MapType.getInstance(UTF8Type.instance, LongType.instance, true);
@@ -104,17 +121,17 @@ public class SerDeserTest
             mb.add(LongType.instance.decompose(entry.getValue()));
         }
 
-        assertEquals(m, mt.getSerializer().deserializeForNativeProtocol(CollectionSerializer.pack(mb, m.size(), version), version));
+        assertEquals(m, mt.getSerializer().deserialize(CollectionSerializer.pack(mb, m.size())));
     }
 
     @Test
-    public void eventSerDeserTest() throws Exception
+    public void eventSerDeserTest()
     {
         for (ProtocolVersion version : ProtocolVersion.SUPPORTED)
             eventSerDeserTest(version);
     }
 
-    public void eventSerDeserTest(ProtocolVersion version) throws Exception
+    public void eventSerDeserTest(ProtocolVersion version)
     {
         List<Event> events = new ArrayList<>();
 
@@ -192,14 +209,14 @@ public class SerDeserTest
     }
 
     @Test
-    public void udtSerDeserTest() throws Exception
+    public void udtSerDeserTest()
     {
         for (ProtocolVersion version : ProtocolVersion.SUPPORTED)
             udtSerDeserTest(version);
     }
 
 
-    public void udtSerDeserTest(ProtocolVersion version) throws Exception
+    public void udtSerDeserTest(ProtocolVersion version)
     {
         ListType<?> lt = ListType.getInstance(Int32Type.instance, true);
         SetType<?> st = SetType.getInstance(UTF8Type.instance, true);
@@ -248,16 +265,15 @@ public class SerDeserTest
         // a UDT should alway be serialized with version 3 of the protocol. Which is why we don't use 'version'
         // on purpose below.
 
-        assertEquals(Arrays.asList(3, 1), lt.getSerializer().deserializeForNativeProtocol(fields[1], ProtocolVersion.V3));
+        assertEquals(Arrays.asList(3, 1), lt.getSerializer().deserialize(fields[1]));
 
-        LinkedHashSet<String> s = new LinkedHashSet<>();
-        s.addAll(Arrays.asList("bar", "foo"));
-        assertEquals(s, st.getSerializer().deserializeForNativeProtocol(fields[2], ProtocolVersion.V3));
+        LinkedHashSet<String> s = new LinkedHashSet<>(Arrays.asList("bar", "foo"));
+        assertEquals(s, st.getSerializer().deserialize(fields[2]));
 
         LinkedHashMap<String, Long> m = new LinkedHashMap<>();
         m.put("bar", 12L);
         m.put("foo", 24L);
-        assertEquals(m, mt.getSerializer().deserializeForNativeProtocol(fields[3], ProtocolVersion.V3));
+        assertEquals(m, mt.getSerializer().deserialize(fields[3]));
     }
 
     @Test


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to