This is an automated email from the ASF dual-hosted git repository.

frankgh pushed a commit to branch trunk
in repository https://gitbox.apache.org/repos/asf/cassandra-analytics.git


The following commit(s) were added to refs/heads/trunk by this push:
     new bac0879  CASSANDRA-19793 Split out CassandraTypes into separate module 
(#68)
bac0879 is described below

commit bac08796181979afef4cc518789a380edef500f0
Author: jberragan <[email protected]>
AuthorDate: Fri Jul 26 10:25:13 2024 -0700

    CASSANDRA-19793 Split out CassandraTypes into separate module (#68)
    
    
    Patch by James Berragan; Reviewed by Yifan Cai, Francisco Guerrero for 
CASSANDRA-19793
---
 CHANGES.txt                                        |   1 +
 cassandra-analytics-core/build.gradle              |   6 +
 .../cassandra/bridge/CassandraBridgeFactory.java   |  20 +-
 .../cassandra/spark/KryoSerializationTests.java    |   2 +-
 .../java/org/apache/cassandra/spark/TestUtils.java |   5 +-
 .../apache/cassandra/spark/data/CqlFieldTests.java |   4 +-
 .../apache/cassandra/spark/reader/SchemaTests.java |   5 +-
 .../apache/cassandra/bridge/CassandraBridge.java   | 256 +++++++++----------
 .../cassandra/spark/data/CassandraTypes.java       | 227 +++++++++++++++++
 .../org/apache/cassandra/spark/data/CqlField.java  |  32 ++-
 .../org/apache/cassandra/spark/data/CqlTable.java  |  13 +-
 cassandra-four-zero-bridge/build.gradle            |   4 +
 .../bridge/CassandraBridgeImplementation.java      | 258 +------------------
 .../cassandra/spark/reader/SchemaBuilder.java      |   5 +-
 .../build.gradle                                   |  18 +-
 .../bridge/CassandraTypesImplementation.java       | 277 +++++++++++++++++++++
 .../cql3/functions/types/TupleHelper.java          |   0
 .../cql3/functions/types/UserTypeHelper.java       |   0
 .../org/apache/cassandra/spark/data/CqlType.java   |   0
 .../apache/cassandra/spark/data/NativeType.java    |   0
 .../spark/data/complex/CqlCollection.java          |   6 +-
 .../cassandra/spark/data/complex/CqlFrozen.java    |   0
 .../cassandra/spark/data/complex/CqlList.java      |   0
 .../cassandra/spark/data/complex/CqlMap.java       |   0
 .../cassandra/spark/data/complex/CqlSet.java       |   0
 .../cassandra/spark/data/complex/CqlTuple.java     |   0
 .../cassandra/spark/data/complex/CqlUdt.java       |  30 +--
 .../apache/cassandra/spark/data/types/Ascii.java   |   0
 .../apache/cassandra/spark/data/types/BigInt.java  |   0
 .../cassandra/spark/data/types/BinaryBased.java    |   8 +-
 .../apache/cassandra/spark/data/types/Blob.java    |   0
 .../apache/cassandra/spark/data/types/Boolean.java |   6 +-
 .../apache/cassandra/spark/data/types/Counter.java |   0
 .../apache/cassandra/spark/data/types/Date.java    |   3 +-
 .../apache/cassandra/spark/data/types/Decimal.java |   0
 .../apache/cassandra/spark/data/types/Double.java  |   6 +-
 .../cassandra/spark/data/types/Duration.java       |   0
 .../apache/cassandra/spark/data/types/Empty.java   |   6 +-
 .../apache/cassandra/spark/data/types/Float.java   |   6 +-
 .../apache/cassandra/spark/data/types/Inet.java    |   0
 .../org/apache/cassandra/spark/data/types/Int.java |   6 +-
 .../cassandra/spark/data/types/LongBased.java      |   6 +-
 .../cassandra/spark/data/types/SmallInt.java       |   6 +-
 .../cassandra/spark/data/types/StringBased.java    |   0
 .../apache/cassandra/spark/data/types/Text.java    |   0
 .../apache/cassandra/spark/data/types/Time.java    |   0
 .../cassandra/spark/data/types/TimeUUID.java       |   0
 .../cassandra/spark/data/types/Timestamp.java      |   3 +-
 .../apache/cassandra/spark/data/types/TinyInt.java |  11 +-
 .../apache/cassandra/spark/data/types/UUID.java    |   6 +-
 .../apache/cassandra/spark/data/types/VarChar.java |   0
 .../apache/cassandra/spark/data/types/VarInt.java  |   0
 .../cassandra/spark/reader/ComplexTypeBuffer.java  |   0
 .../apache/cassandra/spark/reader/ListBuffer.java  |   0
 .../apache/cassandra/spark/reader/MapBuffer.java   |   0
 .../apache/cassandra/spark/reader/SetBuffer.java   |   0
 .../apache/cassandra/spark/reader/UdtBuffer.java   |   0
 settings.gradle                                    |   1 +
 58 files changed, 762 insertions(+), 481 deletions(-)

diff --git a/CHANGES.txt b/CHANGES.txt
index 391e5fb..a30d3f4 100644
--- a/CHANGES.txt
+++ b/CHANGES.txt
@@ -1,4 +1,5 @@
 1.0.0
+ * Split the Cassandra type logic out from CassandraBridge into a separate 
module (CASSANDRA-19793)
  * Remove other uses of Apache Commons lang for hashcode, equality and random 
string generation (CASSANDRA-19791)
  * Split out BufferingInputStream stats into separate interface 
(CASSANDRA-19778)
  * Bump Sidecar version to 55a9efee (CASSANDRA-19774)
diff --git a/cassandra-analytics-core/build.gradle 
b/cassandra-analytics-core/build.gradle
index 8e72d4d..145c326 100644
--- a/cassandra-analytics-core/build.gradle
+++ b/cassandra-analytics-core/build.gradle
@@ -53,6 +53,9 @@ configurations {
     fourzerobridge {
         description = 'Bridge to Cassandra 4.0'
     }
+    fourzerotypes {
+        description = 'Cassandra 4.0 Data Types'
+    }
 }
 
 dependencies {
@@ -61,6 +64,7 @@ dependencies {
     implementation project(":cassandra-analytics-common")
     fourzero(project(path: ':cassandra-four-zero', configuration: 'shadow'))
     fourzerobridge(project(path: ':cassandra-four-zero-bridge'))
+    fourzerotypes(project(path: ':cassandra-four-zero-types'))
 
     // Including newer JNA which works with "Modern" Linux/GLIBC.
     // End users can exclude this and include their own if necessary.
@@ -108,6 +112,7 @@ dependencies {
 jar {
     dependsOn(configurations.fourzero)
     dependsOn(configurations.fourzerobridge)
+    dependsOn(configurations.fourzerotypes)
 
     writeBuildVersion(version, project.projectDir)
 
@@ -115,6 +120,7 @@ jar {
     into('bridges') {
         from(configurations.fourzero.singleFile)
         from(configurations.fourzerobridge.singleFile)
+        from(configurations.fourzerotypes.singleFile)
     }
 }
 
diff --git 
a/cassandra-analytics-core/src/main/java/org/apache/cassandra/bridge/CassandraBridgeFactory.java
 
b/cassandra-analytics-core/src/main/java/org/apache/cassandra/bridge/CassandraBridgeFactory.java
index 7e4d19e..56b8c3f 100644
--- 
a/cassandra-analytics-core/src/main/java/org/apache/cassandra/bridge/CassandraBridgeFactory.java
+++ 
b/cassandra-analytics-core/src/main/java/org/apache/cassandra/bridge/CassandraBridgeFactory.java
@@ -121,7 +121,18 @@ public final class CassandraBridgeFactory
     @NotNull
     private static String bridgeResourceName(@NotNull String label)
     {
-        return "/bridges/" + label + "-bridge.jar";
+        return jarResourceName(label, "bridge");
+    }
+
+    @NotNull
+    private static String typesResourceName(@NotNull String label)
+    {
+        return jarResourceName(label, "types");
+    }
+
+    private static String jarResourceName(String... parts)
+    {
+        return "/bridges/" + String.join("-", parts) + ".jar";
     }
 
     @NotNull
@@ -140,7 +151,12 @@ public final class CassandraBridgeFactory
             File bridgeJar = Files.createTempFile(null, ".jar").toFile();
             FileUtils.copyInputStreamToFile(contents, bridgeJar);
 
-            URL[] urls = {casandraJar.toURI().toURL(), 
bridgeJar.toURI().toURL()};
+            name = typesResourceName(label);
+            contents = CassandraBridgeFactory.class.getResourceAsStream(name);
+            File typesJar = Files.createTempFile(null, ".jar").toFile();
+            FileUtils.copyInputStreamToFile(contents, typesJar);
+
+            URL[] urls = {casandraJar.toURI().toURL(), 
bridgeJar.toURI().toURL(), typesJar.toURI().toURL()};
             ClassLoader loader = new PostDelegationClassLoader(urls, 
Thread.currentThread().getContextClassLoader());
             Class<CassandraBridge> bridge = (Class<CassandraBridge>) 
loader.loadClass("org.apache.cassandra.bridge.CassandraBridgeImplementation");
             Constructor<CassandraBridge> constructor = bridge.getConstructor();
diff --git 
a/cassandra-analytics-core/src/test/java/org/apache/cassandra/spark/KryoSerializationTests.java
 
b/cassandra-analytics-core/src/test/java/org/apache/cassandra/spark/KryoSerializationTests.java
index b0ea97b..1f2c936 100644
--- 
a/cassandra-analytics-core/src/test/java/org/apache/cassandra/spark/KryoSerializationTests.java
+++ 
b/cassandra-analytics-core/src/test/java/org/apache/cassandra/spark/KryoSerializationTests.java
@@ -334,7 +334,7 @@ public class KryoSerializationTests
         udt.write(out);
         out.close();
         Input in = new Input(out.getBuffer(), 0, out.position());
-        CqlField.CqlUdt deserialized = (CqlField.CqlUdt) 
CqlField.CqlType.read(in, bridge);
+        CqlField.CqlUdt deserialized = (CqlField.CqlUdt) 
CqlField.CqlType.read(in, bridge.cassandraTypes());
         assertEquals(udt, deserialized);
         for (int index = 0; index < deserialized.fields().size(); index++)
         {
diff --git 
a/cassandra-analytics-core/src/test/java/org/apache/cassandra/spark/TestUtils.java
 
b/cassandra-analytics-core/src/test/java/org/apache/cassandra/spark/TestUtils.java
index b574384..60266f0 100644
--- 
a/cassandra-analytics-core/src/test/java/org/apache/cassandra/spark/TestUtils.java
+++ 
b/cassandra-analytics-core/src/test/java/org/apache/cassandra/spark/TestUtils.java
@@ -169,7 +169,8 @@ public final class TestUtils
                                            .option("version", 
version.toString())
                                            .option("useBufferingInputStream", 
true)  // Use in the test system to test the BufferingInputStream
                                            .option("partitioner", 
partitioner.name())
-                                           .option("udts", udts.stream().map(f 
-> f.createStatement(bridge, keyspace)).collect(Collectors.joining("\n")));
+                                           .option("udts", udts.stream().map(f 
-> f.createStatement(bridge.cassandraTypes(), keyspace))
+                                                               
.collect(Collectors.joining("\n")));
         if (statsClass != null)
         {
             frameReader = frameReader.option("statsClass", statsClass);
@@ -209,7 +210,7 @@ public final class TestUtils
                                            .option("partitioner", 
partitioner.name())
                                            
.option(SchemaFeatureSet.LAST_MODIFIED_TIMESTAMP.optionName(), 
addLastModifiedTimestampColumn)
                                            .option("udts", udts.stream()
-                                                               .map(udt -> 
udt.createStatement(bridge, keyspace))
+                                                               .map(udt -> 
udt.createStatement(bridge.cassandraTypes(), keyspace))
                                                                
.collect(Collectors.joining("\n")));
         if (statsClass != null)
         {
diff --git 
a/cassandra-analytics-core/src/test/java/org/apache/cassandra/spark/data/CqlFieldTests.java
 
b/cassandra-analytics-core/src/test/java/org/apache/cassandra/spark/data/CqlFieldTests.java
index af55773..af14c80 100644
--- 
a/cassandra-analytics-core/src/test/java/org/apache/cassandra/spark/data/CqlFieldTests.java
+++ 
b/cassandra-analytics-core/src/test/java/org/apache/cassandra/spark/data/CqlFieldTests.java
@@ -153,7 +153,7 @@ public class CqlFieldTests extends VersionRunner
     @MethodSource("org.apache.cassandra.spark.data.VersionRunner#bridges")
     public void testTuple(CassandraBridge bridge)
     {
-        String[] result = CassandraBridge.splitInnerTypes("a, b, c, d,e, f, 
g");
+        String[] result = CassandraTypes.splitInnerTypes("a, b, c, d,e, f, g");
         assertEquals("a", result[0]);
         assertEquals("b", result[1]);
         assertEquals("c", result[2]);
@@ -165,7 +165,7 @@ public class CqlFieldTests extends VersionRunner
 
     private static void splitMap(String str, String left, String right)
     {
-        String[] result = CassandraBridge.splitInnerTypes(str);
+        String[] result = CassandraTypes.splitInnerTypes(str);
         if (left != null)
         {
             assertEquals(left, result[0]);
diff --git 
a/cassandra-analytics-core/src/test/java/org/apache/cassandra/spark/reader/SchemaTests.java
 
b/cassandra-analytics-core/src/test/java/org/apache/cassandra/spark/reader/SchemaTests.java
index 17ab97f..e4156e1 100644
--- 
a/cassandra-analytics-core/src/test/java/org/apache/cassandra/spark/reader/SchemaTests.java
+++ 
b/cassandra-analytics-core/src/test/java/org/apache/cassandra/spark/reader/SchemaTests.java
@@ -32,6 +32,7 @@ import org.junit.jupiter.params.provider.MethodSource;
 
 import org.apache.cassandra.bridge.CassandraBridge;
 import org.apache.cassandra.spark.TestUtils;
+import org.apache.cassandra.spark.data.CassandraTypes;
 import org.apache.cassandra.spark.data.CqlField;
 import org.apache.cassandra.spark.data.CqlTable;
 import org.apache.cassandra.spark.data.ReplicationFactor;
@@ -262,14 +263,14 @@ public class SchemaTests extends VersionRunner
             string = String.format(pattern, first, second);
         }
 
-        Matcher matcher = CassandraBridge.COLLECTION_PATTERN.matcher(string);
+        Matcher matcher = CassandraTypes.COLLECTION_PATTERN.matcher(string);
         assertEquals(collection != null && first != null, matcher.matches());
         if (matcher.matches())
         {
             assertNotNull(collection);
             assertNotNull(first);
             assertEquals(collection, matcher.group(1));
-            String[] types = CassandraBridge.splitInnerTypes(matcher.group(2));
+            String[] types = CassandraTypes.splitInnerTypes(matcher.group(2));
             assertEquals(first, bridge.nativeType(types[0].toUpperCase()));
             if (isMap)
             {
diff --git 
a/cassandra-bridge/src/main/java/org/apache/cassandra/bridge/CassandraBridge.java
 
b/cassandra-bridge/src/main/java/org/apache/cassandra/bridge/CassandraBridge.java
index 7558538..0c3b7d3 100644
--- 
a/cassandra-bridge/src/main/java/org/apache/cassandra/bridge/CassandraBridge.java
+++ 
b/cassandra-bridge/src/main/java/org/apache/cassandra/bridge/CassandraBridge.java
@@ -31,8 +31,6 @@ import java.math.BigInteger;
 import java.nio.ByteBuffer;
 import java.nio.file.Path;
 import java.util.AbstractMap;
-import java.util.ArrayList;
-import java.util.Arrays;
 import java.util.Collection;
 import java.util.Collections;
 import java.util.List;
@@ -41,10 +39,7 @@ import java.util.Set;
 import java.util.UUID;
 import java.util.concurrent.ExecutorService;
 import java.util.function.Consumer;
-import java.util.regex.Matcher;
-import java.util.regex.Pattern;
 import java.util.stream.Collectors;
-import java.util.stream.Stream;
 import javax.annotation.Nullable;
 import javax.validation.constraints.NotNull;
 
@@ -53,6 +48,7 @@ import com.google.common.collect.ImmutableMap;
 
 import com.esotericsoftware.kryo.Kryo;
 import com.esotericsoftware.kryo.io.Input;
+import org.apache.cassandra.spark.data.CassandraTypes;
 import org.apache.cassandra.spark.data.CqlField;
 import org.apache.cassandra.spark.data.CqlTable;
 import org.apache.cassandra.spark.data.ReplicationFactor;
@@ -78,8 +74,7 @@ public abstract class CassandraBridge
     @VisibleForTesting
     public static final Object UNSET_MARKER = new Object();
 
-    public static final Pattern COLLECTION_PATTERN = 
Pattern.compile("^(set|list|map|tuple)<(.+)>$", Pattern.CASE_INSENSITIVE);
-    public static final Pattern FROZEN_PATTERN = 
Pattern.compile("^frozen<(.*)>$", Pattern.CASE_INSENSITIVE);
+    public abstract CassandraTypes cassandraTypes();
 
     public abstract AbstractMap.SimpleEntry<ByteBuffer, BigInteger> 
getPartitionKey(@NotNull CqlTable table,
                                                                                
     @NotNull Partitioner partitioner,
@@ -163,30 +158,27 @@ public abstract class CassandraBridge
      * @param identifier the identifier
      * @return the quoted identifier when the input is mixed case or a 
reserved word, the original input otherwise
      */
-    public abstract String maybeQuoteIdentifier(String identifier);
-
-    protected boolean isAlreadyQuoted(String identifier)
+    public String maybeQuoteIdentifier(String identifier)
     {
-        if (identifier != null && identifier.length() > 1)
-        {
-            return identifier.charAt(0) == '"' &&
-                   identifier.charAt(identifier.length() - 1) == '"';
-        }
-        return false;
+        return cassandraTypes().maybeQuoteIdentifier(identifier);
     }
 
     // CQL Type Parsing
 
-    public abstract CqlField.CqlType readType(CqlField.CqlType.InternalType 
type, Input input);
+    public CqlField.CqlType readType(CqlField.CqlType.InternalType type, Input 
input)
+    {
+        return cassandraTypes().readType(type, input);
+    }
 
     public List<CqlField.NativeType> allTypes()
     {
-        return Arrays.asList(ascii(), bigint(), blob(), bool(), counter(), 
date(), decimal(), aDouble(),
-                             duration(), empty(), aFloat(), inet(), aInt(), 
smallint(), text(), time(),
-                             timestamp(), timeuuid(), tinyint(), uuid(), 
varchar(), varint());
+        return cassandraTypes().allTypes();
     }
 
-    public abstract Map<String, ? extends CqlField.NativeType> 
nativeTypeNames();
+    public Map<String, ? extends CqlField.NativeType> nativeTypeNames()
+    {
+        return cassandraTypes().nativeTypeNames();
+    }
 
     public CqlField.NativeType nativeType(String name)
     {
@@ -200,65 +192,152 @@ public abstract class CassandraBridge
 
     // Native
 
-    public abstract CqlField.NativeType ascii();
+    public CqlField.NativeType ascii()
+    {
+        return cassandraTypes().ascii();
+    }
 
-    public abstract CqlField.NativeType blob();
+    public CqlField.NativeType blob()
+    {
+        return cassandraTypes().blob();
+    }
 
-    public abstract CqlField.NativeType bool();
+    public CqlField.NativeType bool()
+    {
+        return cassandraTypes().bool();
+    }
 
-    public abstract CqlField.NativeType counter();
+    public CqlField.NativeType counter()
+    {
+        return cassandraTypes().counter();
+    }
 
-    public abstract CqlField.NativeType bigint();
+    public CqlField.NativeType bigint()
+    {
+        return cassandraTypes().bigint();
+    }
 
-    public abstract CqlField.NativeType date();
+    public CqlField.NativeType date()
+    {
+        return cassandraTypes().date();
+    }
 
-    public abstract CqlField.NativeType decimal();
+    public CqlField.NativeType decimal()
+    {
+        return cassandraTypes().decimal();
+    }
 
-    public abstract CqlField.NativeType aDouble();
+    public CqlField.NativeType aDouble()
+    {
+        return cassandraTypes().aDouble();
+    }
 
-    public abstract CqlField.NativeType duration();
+    public CqlField.NativeType duration()
+    {
+        return cassandraTypes().duration();
+    }
 
-    public abstract CqlField.NativeType empty();
+    public CqlField.NativeType empty()
+    {
+        return cassandraTypes().empty();
+    }
 
-    public abstract CqlField.NativeType aFloat();
+    public CqlField.NativeType aFloat()
+    {
+        return cassandraTypes().aFloat();
+    }
 
-    public abstract CqlField.NativeType inet();
+    public CqlField.NativeType inet()
+    {
+        return cassandraTypes().inet();
+    }
 
-    public abstract CqlField.NativeType aInt();
+    public CqlField.NativeType aInt()
+    {
+        return cassandraTypes().aInt();
+    }
 
-    public abstract CqlField.NativeType smallint();
+    public CqlField.NativeType smallint()
+    {
+        return cassandraTypes().smallint();
+    }
 
-    public abstract CqlField.NativeType text();
+    public CqlField.NativeType text()
+    {
+        return cassandraTypes().text();
+    }
 
-    public abstract CqlField.NativeType time();
+    public CqlField.NativeType time()
+    {
+        return cassandraTypes().time();
+    }
 
-    public abstract CqlField.NativeType timestamp();
+    public CqlField.NativeType timestamp()
+    {
+        return cassandraTypes().timestamp();
+    }
 
-    public abstract CqlField.NativeType timeuuid();
+    public CqlField.NativeType timeuuid()
+    {
+        return cassandraTypes().timeuuid();
+    }
 
-    public abstract CqlField.NativeType tinyint();
+    public CqlField.NativeType tinyint()
+    {
+        return cassandraTypes().tinyint();
+    }
 
-    public abstract CqlField.NativeType uuid();
+    public CqlField.NativeType uuid()
+    {
+        return cassandraTypes().uuid();
+    }
 
-    public abstract CqlField.NativeType varchar();
+    public CqlField.NativeType varchar()
+    {
+        return cassandraTypes().varchar();
+    }
 
-    public abstract CqlField.NativeType varint();
+    public CqlField.NativeType varint()
+    {
+        return cassandraTypes().varint();
+    }
 
     // Complex
 
-    public abstract CqlField.CqlType collection(String name, 
CqlField.CqlType... types);
+    public CqlField.CqlType collection(String name, CqlField.CqlType... types)
+    {
+        return cassandraTypes().collection(name, types);
+    }
 
-    public abstract CqlField.CqlList list(CqlField.CqlType type);
+    public CqlField.CqlList list(CqlField.CqlType type)
+    {
+        return cassandraTypes().list(type);
+    }
 
-    public abstract CqlField.CqlSet set(CqlField.CqlType type);
+    public CqlField.CqlSet set(CqlField.CqlType type)
+    {
+        return cassandraTypes().set(type);
+    }
 
-    public abstract CqlField.CqlMap map(CqlField.CqlType keyType, 
CqlField.CqlType valueType);
+    public CqlField.CqlMap map(CqlField.CqlType keyType, CqlField.CqlType 
valueType)
+    {
+        return cassandraTypes().map(keyType, valueType);
+    }
 
-    public abstract CqlField.CqlTuple tuple(CqlField.CqlType... types);
+    public CqlField.CqlTuple tuple(CqlField.CqlType... types)
+    {
+        return cassandraTypes().tuple(types);
+    }
 
-    public abstract CqlField.CqlType frozen(CqlField.CqlType type);
+    public CqlField.CqlType frozen(CqlField.CqlType type)
+    {
+        return cassandraTypes().frozen(type);
+    }
 
-    public abstract CqlField.CqlUdtBuilder udt(String keyspace, String name);
+    public CqlField.CqlUdtBuilder udt(String keyspace, String name)
+    {
+        return cassandraTypes().udt(keyspace, name);
+    }
 
     public CqlField.CqlType parseType(String type)
     {
@@ -267,82 +346,7 @@ public abstract class CassandraBridge
 
     public CqlField.CqlType parseType(String type, Map<String, 
CqlField.CqlUdt> udts)
     {
-        if (type == null || type.length() == 0)
-        {
-            return null;
-        }
-        Matcher collectionMatcher = COLLECTION_PATTERN.matcher(type);
-        if (collectionMatcher.find())
-        {
-            // CQL collection
-            String[] types = splitInnerTypes(collectionMatcher.group(2));
-            return collection(collectionMatcher.group(1), Stream.of(types)
-                                                                
.map(collectionType -> parseType(collectionType, udts))
-                                                                
.toArray(CqlField.CqlType[]::new));
-        }
-        Matcher frozenMatcher = FROZEN_PATTERN.matcher(type);
-        if (frozenMatcher.find())
-        {
-            // Frozen collections
-            return frozen(parseType(frozenMatcher.group(1), udts));
-        }
-
-        if (udts.containsKey(type))
-        {
-            // User-defined type
-            return udts.get(type);
-        }
-
-        // Native CQL 3 type
-        return nativeType(type);
-    }
-
-    @VisibleForTesting
-    public static String[] splitInnerTypes(String str)
-    {
-        List<String> result = new ArrayList<>();
-        StringBuilder current = new StringBuilder();
-        int parentheses = 0;
-        for (int index = 0; index < str.length(); index++)
-        {
-            char character = str.charAt(index);
-            switch (character)
-            {
-                case ' ':
-                    if (parentheses == 0)
-                    {
-                        continue;
-                    }
-                    break;
-                case ',':
-                    if (parentheses == 0)
-                    {
-                        if (current.length() > 0)
-                        {
-                            result.add(current.toString());
-                            current = new StringBuilder();
-                        }
-                        continue;
-                    }
-                    break;
-                case '<':
-                    parentheses++;
-                    break;
-                case '>':
-                    parentheses--;
-                    break;
-                default:
-                    // Do nothing
-            }
-            current.append(character);
-        }
-
-        if (current.length() > 0 || result.isEmpty())
-        {
-            result.add(current.toString());
-        }
-
-        return result.toArray(new String[0]);
+        return cassandraTypes().parseType(type, udts);
     }
 
     // SSTable Writer
diff --git 
a/cassandra-bridge/src/main/java/org/apache/cassandra/spark/data/CassandraTypes.java
 
b/cassandra-bridge/src/main/java/org/apache/cassandra/spark/data/CassandraTypes.java
new file mode 100644
index 0000000..cd776f9
--- /dev/null
+++ 
b/cassandra-bridge/src/main/java/org/apache/cassandra/spark/data/CassandraTypes.java
@@ -0,0 +1,227 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.cassandra.spark.data;
+
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.Collections;
+import java.util.List;
+import java.util.Map;
+import java.util.regex.Matcher;
+import java.util.regex.Pattern;
+import java.util.stream.Collectors;
+import java.util.stream.Stream;
+
+import com.google.common.annotations.VisibleForTesting;
+
+import com.esotericsoftware.kryo.io.Input;
+
+public abstract class CassandraTypes
+{
+    public static final Pattern COLLECTION_PATTERN = 
Pattern.compile("^(set|list|map|tuple)<(.+)>$", Pattern.CASE_INSENSITIVE);
+    public static final Pattern FROZEN_PATTERN = 
Pattern.compile("^frozen<(.*)>$", Pattern.CASE_INSENSITIVE);
+
+    /**
+     * Returns the quoted identifier, if the {@code identifier} has mixed case 
or if the {@code identifier}
+     * is a reserved word.
+     *
+     * @param identifier the identifier
+     * @return the quoted identifier when the input is mixed case or a 
reserved word, the original input otherwise
+     */
+    public abstract String maybeQuoteIdentifier(String identifier);
+
+    public static boolean isAlreadyQuoted(String identifier)
+    {
+        if (identifier != null && identifier.length() > 1)
+        {
+            return identifier.charAt(0) == '"' &&
+                   identifier.charAt(identifier.length() - 1) == '"';
+        }
+        return false;
+    }
+
+    public abstract CqlField.CqlType readType(CqlField.CqlType.InternalType 
type, Input input);
+
+    public List<CqlField.NativeType> allTypes()
+    {
+        return Arrays.asList(ascii(), bigint(), blob(), bool(), counter(), 
date(), decimal(), aDouble(),
+                             duration(), empty(), aFloat(), inet(), aInt(), 
smallint(), text(), time(),
+                             timestamp(), timeuuid(), tinyint(), uuid(), 
varchar(), varint());
+    }
+
+    public abstract Map<String, ? extends CqlField.NativeType> 
nativeTypeNames();
+
+    public CqlField.NativeType nativeType(String name)
+    {
+        return nativeTypeNames().get(name.toLowerCase());
+    }
+
+    public List<CqlField.NativeType> supportedTypes()
+    {
+        return 
allTypes().stream().filter(CqlField.NativeType::isSupported).collect(Collectors.toList());
+    }
+
+    // Native
+
+    public abstract CqlField.NativeType ascii();
+
+    public abstract CqlField.NativeType blob();
+
+    public abstract CqlField.NativeType bool();
+
+    public abstract CqlField.NativeType counter();
+
+    public abstract CqlField.NativeType bigint();
+
+    public abstract CqlField.NativeType date();
+
+    public abstract CqlField.NativeType decimal();
+
+    public abstract CqlField.NativeType aDouble();
+
+    public abstract CqlField.NativeType duration();
+
+    public abstract CqlField.NativeType empty();
+
+    public abstract CqlField.NativeType aFloat();
+
+    public abstract CqlField.NativeType inet();
+
+    public abstract CqlField.NativeType aInt();
+
+    public abstract CqlField.NativeType smallint();
+
+    public abstract CqlField.NativeType text();
+
+    public abstract CqlField.NativeType time();
+
+    public abstract CqlField.NativeType timestamp();
+
+    public abstract CqlField.NativeType timeuuid();
+
+    public abstract CqlField.NativeType tinyint();
+
+    public abstract CqlField.NativeType uuid();
+
+    public abstract CqlField.NativeType varchar();
+
+    public abstract CqlField.NativeType varint();
+
+    // Complex
+
+    public abstract CqlField.CqlType collection(String name, 
CqlField.CqlType... types);
+
+    public abstract CqlField.CqlList list(CqlField.CqlType type);
+
+    public abstract CqlField.CqlSet set(CqlField.CqlType type);
+
+    public abstract CqlField.CqlMap map(CqlField.CqlType keyType, 
CqlField.CqlType valueType);
+
+    public abstract CqlField.CqlTuple tuple(CqlField.CqlType... types);
+
+    public abstract CqlField.CqlType frozen(CqlField.CqlType type);
+
+    public abstract CqlField.CqlUdtBuilder udt(String keyspace, String name);
+
+    public CqlField.CqlType parseType(String type)
+    {
+        return parseType(type, Collections.emptyMap());
+    }
+
+    public CqlField.CqlType parseType(String type, Map<String, 
CqlField.CqlUdt> udts)
+    {
+        if (type == null || type.length() == 0)
+        {
+            return null;
+        }
+        Matcher collectionMatcher = COLLECTION_PATTERN.matcher(type);
+        if (collectionMatcher.find())
+        {
+            // CQL collection
+            String[] types = splitInnerTypes(collectionMatcher.group(2));
+            return collection(collectionMatcher.group(1), Stream.of(types)
+                                                                
.map(collectionType -> parseType(collectionType, udts))
+                                                                
.toArray(CqlField.CqlType[]::new));
+        }
+        Matcher frozenMatcher = FROZEN_PATTERN.matcher(type);
+        if (frozenMatcher.find())
+        {
+            // Frozen collections
+            return frozen(parseType(frozenMatcher.group(1), udts));
+        }
+
+        if (udts.containsKey(type))
+        {
+            // User-defined type
+            return udts.get(type);
+        }
+
+        // Native CQL 3 type
+        return nativeType(type);
+    }
+
+    @VisibleForTesting
+    public static String[] splitInnerTypes(String str)
+    {
+        List<String> result = new ArrayList<>();
+        StringBuilder current = new StringBuilder();
+        int parentheses = 0;
+        for (int index = 0; index < str.length(); index++)
+        {
+            char character = str.charAt(index);
+            switch (character)
+            {
+                case ' ':
+                    if (parentheses == 0)
+                    {
+                        continue;
+                    }
+                    break;
+                case ',':
+                    if (parentheses == 0)
+                    {
+                        if (current.length() > 0)
+                        {
+                            result.add(current.toString());
+                            current = new StringBuilder();
+                        }
+                        continue;
+                    }
+                    break;
+                case '<':
+                    parentheses++;
+                    break;
+                case '>':
+                    parentheses--;
+                    break;
+                default:
+                    // Do nothing
+            }
+            current.append(character);
+        }
+
+        if (current.length() > 0 || result.isEmpty())
+        {
+            result.add(current.toString());
+        }
+
+        return result.toArray(new String[0]);
+    }
+}
diff --git 
a/cassandra-bridge/src/main/java/org/apache/cassandra/spark/data/CqlField.java 
b/cassandra-bridge/src/main/java/org/apache/cassandra/spark/data/CqlField.java
index 1488fa4..8af6169 100644
--- 
a/cassandra-bridge/src/main/java/org/apache/cassandra/spark/data/CqlField.java
+++ 
b/cassandra-bridge/src/main/java/org/apache/cassandra/spark/data/CqlField.java
@@ -30,12 +30,12 @@ import java.util.function.Function;
 
 import com.google.common.annotations.VisibleForTesting;
 import com.google.common.base.Preconditions;
+import com.google.common.primitives.UnsignedBytes;
 
 import com.esotericsoftware.kryo.Kryo;
 import com.esotericsoftware.kryo.io.Input;
 import com.esotericsoftware.kryo.io.Output;
 import org.apache.cassandra.bridge.BigNumberConfig;
-import org.apache.cassandra.bridge.CassandraBridge;
 import org.apache.cassandra.bridge.CassandraVersion;
 import org.apache.cassandra.spark.utils.RandomUtils;
 import org.apache.spark.sql.Row;
@@ -48,6 +48,22 @@ public class CqlField implements Serializable, 
Comparable<CqlField>
 {
     private static final long serialVersionUID = 42L;
 
+    public static final Comparator<Byte> BYTE_COMPARATOR = 
CqlField::compareBytes;
+    public static final Comparator<Long> LONG_COMPARATOR = Long::compareTo;
+    public static final Comparator<Integer> INTEGER_COMPARATOR = 
Integer::compareTo;
+    public static final Comparator<byte[]> BYTE_ARRAY_COMPARATOR = 
UnsignedBytes.lexicographicalComparator();
+    public static final Comparator<Boolean> BOOLEAN_COMPARATOR = 
Boolean::compareTo;
+    public static final Comparator<Double> DOUBLE_COMPARATOR = 
Double::compareTo;
+    public static final Comparator<Void> VOID_COMPARATOR_COMPARATOR = (first, 
second) -> 0;
+    public static final Comparator<Float> FLOAT_COMPARATOR = Float::compareTo;
+    public static final Comparator<Short> SHORT_COMPARATOR = Short::compare;
+    public static final Comparator<String> UUID_COMPARATOR = 
Comparator.comparing(java.util.UUID::fromString);
+
+    private static int compareBytes(byte first, byte second)
+    {
+        return first - second;  // Safe because of the range being restricted
+    }
+
     public interface CqlType extends Serializable, Comparator<Object>
     {
         enum InternalType
@@ -156,10 +172,10 @@ public class CqlField implements Serializable, 
Comparable<CqlField>
             out.writeInt(type.internalType().ordinal());
         }
 
-        static CqlType read(Input input, CassandraBridge bridge)
+        static CqlType read(Input input, CassandraTypes cassandraTypes)
         {
             InternalType internalType = InternalType.values()[input.readInt()];
-            return bridge.readType(internalType, input);
+            return cassandraTypes.readType(internalType, input);
         }
     }
 
@@ -218,7 +234,7 @@ public class CqlField implements Serializable, 
Comparable<CqlField>
     {
         CqlFrozen frozen();
 
-        String createStatement(CassandraBridge bridge, String keyspace);
+        String createStatement(CassandraTypes cassandraTypes, String keyspace);
 
         String keyspace();
 
@@ -425,11 +441,11 @@ public class CqlField implements Serializable, 
Comparable<CqlField>
 
     public static class Serializer extends 
com.esotericsoftware.kryo.Serializer<CqlField>
     {
-        private final CassandraBridge bridge;
+        private final CassandraTypes cassandraTypes;
 
-        public Serializer(CassandraBridge bridge)
+        public Serializer(CassandraTypes cassandraTypes)
         {
-            this.bridge = bridge;
+            this.cassandraTypes = cassandraTypes;
         }
 
         @Override
@@ -439,7 +455,7 @@ public class CqlField implements Serializable, 
Comparable<CqlField>
                                 input.readBoolean(),
                                 input.readBoolean(),
                                 input.readString(),
-                                CqlType.read(input, bridge),
+                                CqlType.read(input, cassandraTypes),
                                 input.readInt());
         }
 
diff --git 
a/cassandra-bridge/src/main/java/org/apache/cassandra/spark/data/CqlTable.java 
b/cassandra-bridge/src/main/java/org/apache/cassandra/spark/data/CqlTable.java
index 406c417..64f3944 100644
--- 
a/cassandra-bridge/src/main/java/org/apache/cassandra/spark/data/CqlTable.java
+++ 
b/cassandra-bridge/src/main/java/org/apache/cassandra/spark/data/CqlTable.java
@@ -34,7 +34,6 @@ import java.util.stream.Collectors;
 import com.esotericsoftware.kryo.Kryo;
 import com.esotericsoftware.kryo.io.Input;
 import com.esotericsoftware.kryo.io.Output;
-import org.apache.cassandra.bridge.CassandraBridge;
 import org.jetbrains.annotations.NotNull;
 
 @SuppressWarnings({ "WeakerAccess", "unused" })
@@ -200,10 +199,10 @@ public class CqlTable implements Serializable
         return udts;
     }
 
-    public Set<String> udtCreateStmts(CassandraBridge bridge)
+    public Set<String> udtCreateStmts(CassandraTypes cassandraTypes)
     {
         return udts.stream()
-                   .map(udt -> udt.createStatement(bridge, keyspace))
+                   .map(udt -> udt.createStatement(cassandraTypes, keyspace))
                    .collect(Collectors.toSet());
     }
 
@@ -264,11 +263,11 @@ public class CqlTable implements Serializable
 
     public static class Serializer extends 
com.esotericsoftware.kryo.Serializer<CqlTable>
     {
-        private final CassandraBridge bridge;
+        private final CassandraTypes cassandraTypes;
 
-        public Serializer(CassandraBridge bridge)
+        public Serializer(CassandraTypes cassandraTypes)
         {
-            this.bridge = bridge;
+            this.cassandraTypes = cassandraTypes;
         }
 
         @Override
@@ -288,7 +287,7 @@ public class CqlTable implements Serializable
             Set<CqlField.CqlUdt> udts = new LinkedHashSet<>(numUdts);
             for (int udt = 0; udt < numUdts; udt++)
             {
-                udts.add((CqlField.CqlUdt) CqlField.CqlType.read(input, 
bridge));
+                udts.add((CqlField.CqlUdt) CqlField.CqlType.read(input, 
cassandraTypes));
             }
             int indexCount = input.readInt();
             return new CqlTable(keyspace, table, createStatement, 
replicationFactor, fields, udts, indexCount);
diff --git a/cassandra-four-zero-bridge/build.gradle 
b/cassandra-four-zero-bridge/build.gradle
index fc8ddb4..b303159 100644
--- a/cassandra-four-zero-bridge/build.gradle
+++ b/cassandra-four-zero-bridge/build.gradle
@@ -29,6 +29,8 @@ configurations {
 }
 
 dependencies {
+    compileOnly project(":cassandra-analytics-common")
+    compileOnly project(":cassandra-four-zero-types")
     compileOnly(project(':cassandra-bridge'))
 
     compileOnly(group: "${sparkGroupId}", name: 
"spark-core_${scalaMajorVersion}", version: 
"${project.rootProject.sparkVersion}")
@@ -36,6 +38,8 @@ dependencies {
 
     compileOnly(project(path: ':cassandra-four-zero', configuration: 'shadow'))
 
+    testImplementation project(":cassandra-analytics-common")
+    testImplementation project(":cassandra-four-zero-types")
     testImplementation(project(':cassandra-bridge'))
     testImplementation(project(path: ':cassandra-four-zero', configuration: 
'shadow'))
     
testImplementation("org.junit.jupiter:junit-jupiter-api:${project.junitVersion}")
diff --git 
a/cassandra-four-zero-bridge/src/main/java/org/apache/cassandra/bridge/CassandraBridgeImplementation.java
 
b/cassandra-four-zero-bridge/src/main/java/org/apache/cassandra/bridge/CassandraBridgeImplementation.java
index 76b9319..6f0d6c2 100644
--- 
a/cassandra-four-zero-bridge/src/main/java/org/apache/cassandra/bridge/CassandraBridgeImplementation.java
+++ 
b/cassandra-four-zero-bridge/src/main/java/org/apache/cassandra/bridge/CassandraBridgeImplementation.java
@@ -48,11 +48,9 @@ import com.google.common.base.Preconditions;
 
 import com.esotericsoftware.kryo.Kryo;
 import com.esotericsoftware.kryo.Serializer;
-import com.esotericsoftware.kryo.io.Input;
 import org.apache.cassandra.config.Config;
 import org.apache.cassandra.config.DatabaseDescriptor;
 import org.apache.cassandra.config.ParameterizedClass;
-import org.apache.cassandra.cql3.ColumnIdentifier;
 import org.apache.cassandra.db.DecoratedKey;
 import org.apache.cassandra.db.Keyspace;
 import org.apache.cassandra.db.commitlog.CommitLogSegmentManagerStandard;
@@ -74,41 +72,16 @@ import org.apache.cassandra.locator.SimpleSnitch;
 import org.apache.cassandra.schema.Schema;
 import org.apache.cassandra.schema.TableMetadata;
 import org.apache.cassandra.schema.TableMetadataRef;
+import org.apache.cassandra.spark.data.CassandraTypes;
 import org.apache.cassandra.spark.data.CqlField;
 import org.apache.cassandra.spark.data.CqlTable;
 import org.apache.cassandra.spark.data.CqlType;
 import org.apache.cassandra.spark.data.ReplicationFactor;
 import org.apache.cassandra.spark.data.SSTable;
 import org.apache.cassandra.spark.data.SSTablesSupplier;
-import org.apache.cassandra.spark.data.complex.CqlCollection;
-import org.apache.cassandra.spark.data.complex.CqlFrozen;
-import org.apache.cassandra.spark.data.complex.CqlList;
-import org.apache.cassandra.spark.data.complex.CqlMap;
-import org.apache.cassandra.spark.data.complex.CqlSet;
 import org.apache.cassandra.spark.data.complex.CqlTuple;
 import org.apache.cassandra.spark.data.complex.CqlUdt;
 import org.apache.cassandra.spark.data.partitioner.Partitioner;
-import org.apache.cassandra.spark.data.types.Ascii;
-import org.apache.cassandra.spark.data.types.BigInt;
-import org.apache.cassandra.spark.data.types.Blob;
-import org.apache.cassandra.spark.data.types.Boolean;
-import org.apache.cassandra.spark.data.types.Counter;
-import org.apache.cassandra.spark.data.types.Date;
-import org.apache.cassandra.spark.data.types.Decimal;
-import org.apache.cassandra.spark.data.types.Double;
-import org.apache.cassandra.spark.data.types.Duration;
-import org.apache.cassandra.spark.data.types.Empty;
-import org.apache.cassandra.spark.data.types.Float;
-import org.apache.cassandra.spark.data.types.Inet;
-import org.apache.cassandra.spark.data.types.Int;
-import org.apache.cassandra.spark.data.types.SmallInt;
-import org.apache.cassandra.spark.data.types.Text;
-import org.apache.cassandra.spark.data.types.Time;
-import org.apache.cassandra.spark.data.types.TimeUUID;
-import org.apache.cassandra.spark.data.types.Timestamp;
-import org.apache.cassandra.spark.data.types.TinyInt;
-import org.apache.cassandra.spark.data.types.VarChar;
-import org.apache.cassandra.spark.data.types.VarInt;
 import org.apache.cassandra.spark.reader.CompactionStreamScanner;
 import org.apache.cassandra.spark.reader.IndexEntry;
 import org.apache.cassandra.spark.reader.IndexReader;
@@ -200,13 +173,19 @@ public class CassandraBridgeImplementation extends 
CassandraBridge
     {
         // Cassandra-version-specific Kryo serializers
         kryoSerializers = new LinkedHashMap<>();
-        kryoSerializers.put(CqlField.class, new CqlField.Serializer(this));
-        kryoSerializers.put(CqlTable.class, new CqlTable.Serializer(this));
-        kryoSerializers.put(CqlUdt.class, new CqlUdt.Serializer(this));
+        kryoSerializers.put(CqlField.class, new 
CqlField.Serializer(cassandraTypes()));
+        kryoSerializers.put(CqlTable.class, new 
CqlTable.Serializer(cassandraTypes()));
+        kryoSerializers.put(CqlUdt.class, new 
CqlUdt.Serializer(cassandraTypes()));
 
         nativeTypes = 
allTypes().stream().collect(Collectors.toMap(CqlField.CqlType::name, 
Function.identity()));
     }
 
+    @Override
+    public CassandraTypes cassandraTypes()
+    {
+        return CassandraTypesImplementation.INSTANCE;
+    }
+
     @Override
     public AbstractMap.SimpleEntry<ByteBuffer, BigInteger> 
getPartitionKey(@NotNull CqlTable table,
                                                                            
@NotNull Partitioner partitioner,
@@ -327,220 +306,7 @@ public class CassandraBridgeImplementation extends 
CassandraBridge
                                 @Nullable UUID tableId,
                                 int indexCount)
     {
-        return new SchemaBuilder(createStatement, keyspace, replicationFactor, 
partitioner, bridge -> udts, tableId, indexCount).build();
-    }
-
-    @Override
-    public String maybeQuoteIdentifier(String identifier)
-    {
-        if (isAlreadyQuoted(identifier))
-        {
-            return identifier;
-        }
-        return ColumnIdentifier.maybeQuote(identifier);
-    }
-
-    // CQL Type Parser
-
-    @Override
-    public Map<String, ? extends CqlField.NativeType> nativeTypeNames()
-    {
-        return nativeTypes;
-    }
-
-    @Override
-    public CqlField.CqlType readType(CqlField.CqlType.InternalType type, Input 
input)
-    {
-        switch (type)
-        {
-            case NativeCql:
-                return nativeType(input.readString());
-            case Set:
-            case List:
-            case Map:
-            case Tuple:
-                return CqlCollection.read(type, input, this);
-            case Frozen:
-                return CqlFrozen.build(CqlField.CqlType.read(input, this));
-            case Udt:
-                return CqlUdt.read(input, this);
-            default:
-                throw new IllegalStateException("Unknown CQL type, cannot 
deserialize");
-        }
-    }
-
-    @Override
-    public Ascii ascii()
-    {
-        return Ascii.INSTANCE;
-    }
-
-    @Override
-    public Blob blob()
-    {
-        return Blob.INSTANCE;
-    }
-
-    @Override
-    public Boolean bool()
-    {
-        return Boolean.INSTANCE;
-    }
-
-    @Override
-    public Counter counter()
-    {
-        return Counter.INSTANCE;
-    }
-
-    @Override
-    public BigInt bigint()
-    {
-        return BigInt.INSTANCE;
-    }
-
-    @Override
-    public Date date()
-    {
-        return Date.INSTANCE;
-    }
-
-    @Override
-    public Decimal decimal()
-    {
-        return Decimal.INSTANCE;
-    }
-
-    @Override
-    public Double aDouble()
-    {
-        return Double.INSTANCE;
-    }
-
-    @Override
-    public Duration duration()
-    {
-        return Duration.INSTANCE;
-    }
-
-    @Override
-    public Empty empty()
-    {
-        return Empty.INSTANCE;
-    }
-
-    @Override
-    public Float aFloat()
-    {
-        return Float.INSTANCE;
-    }
-
-    @Override
-    public Inet inet()
-    {
-        return Inet.INSTANCE;
-    }
-
-    @Override
-    public Int aInt()
-    {
-        return Int.INSTANCE;
-    }
-
-    @Override
-    public SmallInt smallint()
-    {
-        return SmallInt.INSTANCE;
-    }
-
-    @Override
-    public Text text()
-    {
-        return Text.INSTANCE;
-    }
-
-    @Override
-    public Time time()
-    {
-        return Time.INSTANCE;
-    }
-
-    @Override
-    public Timestamp timestamp()
-    {
-        return Timestamp.INSTANCE;
-    }
-
-    @Override
-    public TimeUUID timeuuid()
-    {
-        return TimeUUID.INSTANCE;
-    }
-
-    @Override
-    public TinyInt tinyint()
-    {
-        return TinyInt.INSTANCE;
-    }
-
-    @Override
-    public org.apache.cassandra.spark.data.types.UUID uuid()
-    {
-        return org.apache.cassandra.spark.data.types.UUID.INSTANCE;
-    }
-
-    @Override
-    public VarChar varchar()
-    {
-        return VarChar.INSTANCE;
-    }
-
-    @Override
-    public VarInt varint()
-    {
-        return VarInt.INSTANCE;
-    }
-
-    @Override
-    public CqlField.CqlType collection(String name, CqlField.CqlType... types)
-    {
-        return CqlCollection.build(name, types);
-    }
-
-    @Override
-    public CqlList list(CqlField.CqlType type)
-    {
-        return CqlCollection.list(type);
-    }
-
-    @Override
-    public CqlSet set(CqlField.CqlType type)
-    {
-        return CqlCollection.set(type);
-    }
-
-    @Override
-    public CqlMap map(CqlField.CqlType keyType, CqlField.CqlType valueType)
-    {
-        return CqlCollection.map(keyType, valueType);
-    }
-
-    @Override
-    public CqlTuple tuple(CqlField.CqlType... types)
-    {
-        return CqlCollection.tuple(types);
-    }
-
-    @Override
-    public CqlField.CqlType frozen(CqlField.CqlType type)
-    {
-        return CqlFrozen.build(type);
-    }
-
-    @Override
-    public CqlField.CqlUdtBuilder udt(String keyspace, String name)
-    {
-        return CqlUdt.builder(keyspace, name);
+        return new SchemaBuilder(createStatement, keyspace, replicationFactor, 
partitioner, cassandraTypes -> udts, tableId, indexCount).build();
     }
 
     @Override
@@ -565,7 +331,7 @@ public class CassandraBridgeImplementation extends 
CassandraBridge
         for (CqlField.CqlUdt udt : udts)
         {
             // Add user-defined types to CQL writer
-            String statement = udt.createStatement(this, keyspace);
+            String statement = udt.createStatement(cassandraTypes(), keyspace);
             builder.withType(statement);
         }
 
diff --git 
a/cassandra-four-zero-bridge/src/main/java/org/apache/cassandra/spark/reader/SchemaBuilder.java
 
b/cassandra-four-zero-bridge/src/main/java/org/apache/cassandra/spark/reader/SchemaBuilder.java
index 7535030..ffd6a9b 100644
--- 
a/cassandra-four-zero-bridge/src/main/java/org/apache/cassandra/spark/reader/SchemaBuilder.java
+++ 
b/cassandra-four-zero-bridge/src/main/java/org/apache/cassandra/spark/reader/SchemaBuilder.java
@@ -62,6 +62,7 @@ import org.apache.cassandra.schema.TableId;
 import org.apache.cassandra.schema.TableMetadata;
 import org.apache.cassandra.schema.TableMetadataRef;
 import org.apache.cassandra.schema.Types;
+import org.apache.cassandra.spark.data.CassandraTypes;
 import org.apache.cassandra.spark.data.CqlField;
 import org.apache.cassandra.spark.data.CqlTable;
 import org.apache.cassandra.spark.data.ReplicationFactor;
@@ -119,7 +120,7 @@ public class SchemaBuilder
                          String keyspace,
                          ReplicationFactor replicationFactor,
                          Partitioner partitioner,
-                         Function<CassandraBridge, Set<String>> 
udtStatementsProvider,
+                         Function<CassandraTypes, Set<String>> 
udtStatementsProvider,
                          @Nullable UUID tableId,
                          int indexCount)
     {
@@ -132,7 +133,7 @@ public class SchemaBuilder
         Pair<KeyspaceMetadata, TableMetadata> updated = 
CassandraSchema.apply(schema ->
                 updateSchema(schema,
                              this.keyspace,
-                             udtStatementsProvider.apply(bridge),
+                             
udtStatementsProvider.apply(bridge.cassandraTypes()),
                              this.createStmt,
                              partitioner,
                              this.replicationFactor,
diff --git a/cassandra-four-zero-bridge/build.gradle 
b/cassandra-four-zero-types/build.gradle
similarity index 57%
copy from cassandra-four-zero-bridge/build.gradle
copy to cassandra-four-zero-types/build.gradle
index fc8ddb4..2f4276f 100644
--- a/cassandra-four-zero-bridge/build.gradle
+++ b/cassandra-four-zero-types/build.gradle
@@ -35,30 +35,16 @@ dependencies {
     compileOnly(group: "${sparkGroupId}", name: 
"spark-sql_${scalaMajorVersion}", version: 
"${project.rootProject.sparkVersion}")
 
     compileOnly(project(path: ':cassandra-four-zero', configuration: 'shadow'))
-
-    testImplementation(project(':cassandra-bridge'))
-    testImplementation(project(path: ':cassandra-four-zero', configuration: 
'shadow'))
-    
testImplementation("org.junit.jupiter:junit-jupiter-api:${project.junitVersion}")
-    
testImplementation("org.junit.jupiter:junit-jupiter-params:${project.junitVersion}")
-    
testImplementation("org.junit.jupiter:junit-jupiter-engine:${project.junitVersion}")
-    testImplementation(group: 'org.quicktheories', name: 'quicktheories', 
version: "${project.rootProject.quickTheoriesVersion}")
-    
testImplementation("org.mockito:mockito-core:${project.rootProject.mockitoVersion}")
-    testImplementation(group: "${sparkGroupId}", name: 
"spark-core_${scalaMajorVersion}", version: 
"${project.rootProject.sparkVersion}")
-    testImplementation(group: "${sparkGroupId}", name: 
"spark-sql_${scalaMajorVersion}", version: 
"${project.rootProject.sparkVersion}")
-    testImplementation(group: 'com.github.luben', name: 'zstd-jni', version: 
'1.5.0-4')
-
-    testRuntimeOnly(group: 'net.java.dev.jna', name: 'jna', version: 
"${jnaVersion}")
-    testRuntimeOnly(group: 'net.java.dev.jna', name: 'jna-platform', version: 
"${jnaVersion}")
 }
 
 jar {
-    archiveFileName = "four-zero-bridge.jar"
+    archiveFileName = "four-zero-types.jar"
 }
 
 test {
     useJUnitPlatform()
     reports {
-        def destDir = Paths.get(rootProject.rootDir.absolutePath, "build", 
"test-reports", "four-zero-bridge").toFile()
+        def destDir = Paths.get(rootProject.rootDir.absolutePath, "build", 
"test-reports", "four-zero-types").toFile()
         junitXml {
             enabled true
             destination = destDir
diff --git 
a/cassandra-four-zero-types/src/main/java/org/apache/cassandra/bridge/CassandraTypesImplementation.java
 
b/cassandra-four-zero-types/src/main/java/org/apache/cassandra/bridge/CassandraTypesImplementation.java
new file mode 100644
index 0000000..9bf5d32
--- /dev/null
+++ 
b/cassandra-four-zero-types/src/main/java/org/apache/cassandra/bridge/CassandraTypesImplementation.java
@@ -0,0 +1,277 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.cassandra.bridge;
+
+import java.util.Map;
+import java.util.function.Function;
+import java.util.stream.Collectors;
+
+import com.esotericsoftware.kryo.io.Input;
+import org.apache.cassandra.cql3.ColumnIdentifier;
+import org.apache.cassandra.spark.data.CassandraTypes;
+import org.apache.cassandra.spark.data.CqlField;
+import org.apache.cassandra.spark.data.complex.CqlCollection;
+import org.apache.cassandra.spark.data.complex.CqlFrozen;
+import org.apache.cassandra.spark.data.complex.CqlUdt;
+import org.apache.cassandra.spark.data.types.Ascii;
+import org.apache.cassandra.spark.data.types.BigInt;
+import org.apache.cassandra.spark.data.types.Blob;
+import org.apache.cassandra.spark.data.types.Boolean;
+import org.apache.cassandra.spark.data.types.Counter;
+import org.apache.cassandra.spark.data.types.Date;
+import org.apache.cassandra.spark.data.types.Decimal;
+import org.apache.cassandra.spark.data.types.Double;
+import org.apache.cassandra.spark.data.types.Duration;
+import org.apache.cassandra.spark.data.types.Empty;
+import org.apache.cassandra.spark.data.types.Float;
+import org.apache.cassandra.spark.data.types.Inet;
+import org.apache.cassandra.spark.data.types.Int;
+import org.apache.cassandra.spark.data.types.SmallInt;
+import org.apache.cassandra.spark.data.types.Text;
+import org.apache.cassandra.spark.data.types.Time;
+import org.apache.cassandra.spark.data.types.TimeUUID;
+import org.apache.cassandra.spark.data.types.Timestamp;
+import org.apache.cassandra.spark.data.types.TinyInt;
+import org.apache.cassandra.spark.data.types.VarChar;
+import org.apache.cassandra.spark.data.types.VarInt;
+
+public class CassandraTypesImplementation extends CassandraTypes
+{
+
+    public static final CassandraTypesImplementation INSTANCE = new 
CassandraTypesImplementation();
+
+    private final Map<String, CqlField.NativeType> nativeTypes;
+
+    public CassandraTypesImplementation()
+    {
+        nativeTypes = 
allTypes().stream().collect(Collectors.toMap(CqlField.CqlType::name, 
Function.identity()));
+    }
+
+    @Override
+    public String maybeQuoteIdentifier(String identifier)
+    {
+        if (isAlreadyQuoted(identifier))
+        {
+            return identifier;
+        }
+        return ColumnIdentifier.maybeQuote(identifier);
+    }
+
+    @Override
+    public Map<String, ? extends CqlField.NativeType> nativeTypeNames()
+    {
+        return nativeTypes;
+    }
+
+    @Override
+    public CqlField.CqlType readType(CqlField.CqlType.InternalType type, Input 
input)
+    {
+        switch (type)
+        {
+            case NativeCql:
+                return nativeType(input.readString());
+            case Set:
+            case List:
+            case Map:
+            case Tuple:
+                return CqlCollection.read(type, input, this);
+            case Frozen:
+                return CqlFrozen.build(CqlField.CqlType.read(input, this));
+            case Udt:
+                return CqlUdt.read(input, this);
+            default:
+                throw new IllegalStateException("Unknown CQL type, cannot 
deserialize");
+        }
+    }
+
+    @Override
+    public Ascii ascii()
+    {
+        return Ascii.INSTANCE;
+    }
+
+    @Override
+    public Blob blob()
+    {
+        return Blob.INSTANCE;
+    }
+
+    @Override
+    public Boolean bool()
+    {
+        return Boolean.INSTANCE;
+    }
+
+    @Override
+    public Counter counter()
+    {
+        return Counter.INSTANCE;
+    }
+
+    @Override
+    public BigInt bigint()
+    {
+        return BigInt.INSTANCE;
+    }
+
+    @Override
+    public Date date()
+    {
+        return Date.INSTANCE;
+    }
+
+    @Override
+    public Decimal decimal()
+    {
+        return Decimal.INSTANCE;
+    }
+
+    @Override
+    public Double aDouble()
+    {
+        return Double.INSTANCE;
+    }
+
+    @Override
+    public Duration duration()
+    {
+        return Duration.INSTANCE;
+    }
+
+    @Override
+    public Empty empty()
+    {
+        return Empty.INSTANCE;
+    }
+
+    @Override
+    public Float aFloat()
+    {
+        return Float.INSTANCE;
+    }
+
+    @Override
+    public Inet inet()
+    {
+        return Inet.INSTANCE;
+    }
+
+    @Override
+    public Int aInt()
+    {
+        return Int.INSTANCE;
+    }
+
+    @Override
+    public SmallInt smallint()
+    {
+        return SmallInt.INSTANCE;
+    }
+
+    @Override
+    public Text text()
+    {
+        return Text.INSTANCE;
+    }
+
+    @Override
+    public Time time()
+    {
+        return Time.INSTANCE;
+    }
+
+    @Override
+    public Timestamp timestamp()
+    {
+        return Timestamp.INSTANCE;
+    }
+
+    @Override
+    public TimeUUID timeuuid()
+    {
+        return TimeUUID.INSTANCE;
+    }
+
+    @Override
+    public TinyInt tinyint()
+    {
+        return TinyInt.INSTANCE;
+    }
+
+    @Override
+    public org.apache.cassandra.spark.data.types.UUID uuid()
+    {
+        return org.apache.cassandra.spark.data.types.UUID.INSTANCE;
+    }
+
+    @Override
+    public VarChar varchar()
+    {
+        return VarChar.INSTANCE;
+    }
+
+    @Override
+    public VarInt varint()
+    {
+        return VarInt.INSTANCE;
+    }
+
+    @Override
+    public CqlField.CqlType collection(String name, CqlField.CqlType... types)
+    {
+        return CqlCollection.build(name, types);
+    }
+
+    @Override
+    public CqlField.CqlList list(CqlField.CqlType type)
+    {
+        return CqlCollection.list(type);
+    }
+
+    @Override
+    public CqlField.CqlSet set(CqlField.CqlType type)
+    {
+        return CqlCollection.set(type);
+    }
+
+    @Override
+    public CqlField.CqlMap map(CqlField.CqlType keyType, CqlField.CqlType 
valueType)
+    {
+        return CqlCollection.map(keyType, valueType);
+    }
+
+    @Override
+    public CqlField.CqlTuple tuple(CqlField.CqlType... types)
+    {
+        return CqlCollection.tuple(types);
+    }
+
+    @Override
+    public CqlField.CqlType frozen(CqlField.CqlType type)
+    {
+        return CqlFrozen.build(type);
+    }
+
+    @Override
+    public CqlField.CqlUdtBuilder udt(String keyspace, String name)
+    {
+        return CqlUdt.builder(keyspace, name);
+    }
+}
diff --git 
a/cassandra-four-zero-bridge/src/main/java/org/apache/cassandra/cql3/functions/types/TupleHelper.java
 
b/cassandra-four-zero-types/src/main/java/org/apache/cassandra/cql3/functions/types/TupleHelper.java
similarity index 100%
rename from 
cassandra-four-zero-bridge/src/main/java/org/apache/cassandra/cql3/functions/types/TupleHelper.java
rename to 
cassandra-four-zero-types/src/main/java/org/apache/cassandra/cql3/functions/types/TupleHelper.java
diff --git 
a/cassandra-four-zero-bridge/src/main/java/org/apache/cassandra/cql3/functions/types/UserTypeHelper.java
 
b/cassandra-four-zero-types/src/main/java/org/apache/cassandra/cql3/functions/types/UserTypeHelper.java
similarity index 100%
rename from 
cassandra-four-zero-bridge/src/main/java/org/apache/cassandra/cql3/functions/types/UserTypeHelper.java
rename to 
cassandra-four-zero-types/src/main/java/org/apache/cassandra/cql3/functions/types/UserTypeHelper.java
diff --git 
a/cassandra-four-zero-bridge/src/main/java/org/apache/cassandra/spark/data/CqlType.java
 
b/cassandra-four-zero-types/src/main/java/org/apache/cassandra/spark/data/CqlType.java
similarity index 100%
rename from 
cassandra-four-zero-bridge/src/main/java/org/apache/cassandra/spark/data/CqlType.java
rename to 
cassandra-four-zero-types/src/main/java/org/apache/cassandra/spark/data/CqlType.java
diff --git 
a/cassandra-four-zero-bridge/src/main/java/org/apache/cassandra/spark/data/NativeType.java
 
b/cassandra-four-zero-types/src/main/java/org/apache/cassandra/spark/data/NativeType.java
similarity index 100%
rename from 
cassandra-four-zero-bridge/src/main/java/org/apache/cassandra/spark/data/NativeType.java
rename to 
cassandra-four-zero-types/src/main/java/org/apache/cassandra/spark/data/NativeType.java
diff --git 
a/cassandra-four-zero-bridge/src/main/java/org/apache/cassandra/spark/data/complex/CqlCollection.java
 
b/cassandra-four-zero-types/src/main/java/org/apache/cassandra/spark/data/complex/CqlCollection.java
similarity index 97%
rename from 
cassandra-four-zero-bridge/src/main/java/org/apache/cassandra/spark/data/complex/CqlCollection.java
rename to 
cassandra-four-zero-types/src/main/java/org/apache/cassandra/spark/data/complex/CqlCollection.java
index 99d8e43..abb2141 100644
--- 
a/cassandra-four-zero-bridge/src/main/java/org/apache/cassandra/spark/data/complex/CqlCollection.java
+++ 
b/cassandra-four-zero-types/src/main/java/org/apache/cassandra/spark/data/complex/CqlCollection.java
@@ -31,8 +31,8 @@ import java.util.stream.Collectors;
 
 import com.esotericsoftware.kryo.io.Input;
 import com.esotericsoftware.kryo.io.Output;
-import org.apache.cassandra.bridge.CassandraBridge;
 import org.apache.cassandra.db.marshal.AbstractType;
+import org.apache.cassandra.spark.data.CassandraTypes;
 import org.apache.cassandra.spark.data.CqlField;
 import org.apache.cassandra.spark.data.CqlType;
 
@@ -193,13 +193,13 @@ public abstract class CqlCollection extends CqlType 
implements CqlField.CqlColle
         return cqlName();
     }
 
-    public static CqlCollection read(CqlField.CqlType.InternalType 
internalType, Input input, CassandraBridge bridge)
+    public static CqlCollection read(CqlField.CqlType.InternalType 
internalType, Input input, CassandraTypes cassandraTypes)
     {
         int numTypes = input.readInt();
         CqlField.CqlType[] types = new CqlField.CqlType[numTypes];
         for (int type = 0; type < numTypes; type++)
         {
-            types[type] = CqlField.CqlType.read(input, bridge);
+            types[type] = CqlField.CqlType.read(input, cassandraTypes);
         }
         return CqlCollection.build(internalType, types);
     }
diff --git 
a/cassandra-four-zero-bridge/src/main/java/org/apache/cassandra/spark/data/complex/CqlFrozen.java
 
b/cassandra-four-zero-types/src/main/java/org/apache/cassandra/spark/data/complex/CqlFrozen.java
similarity index 100%
rename from 
cassandra-four-zero-bridge/src/main/java/org/apache/cassandra/spark/data/complex/CqlFrozen.java
rename to 
cassandra-four-zero-types/src/main/java/org/apache/cassandra/spark/data/complex/CqlFrozen.java
diff --git 
a/cassandra-four-zero-bridge/src/main/java/org/apache/cassandra/spark/data/complex/CqlList.java
 
b/cassandra-four-zero-types/src/main/java/org/apache/cassandra/spark/data/complex/CqlList.java
similarity index 100%
rename from 
cassandra-four-zero-bridge/src/main/java/org/apache/cassandra/spark/data/complex/CqlList.java
rename to 
cassandra-four-zero-types/src/main/java/org/apache/cassandra/spark/data/complex/CqlList.java
diff --git 
a/cassandra-four-zero-bridge/src/main/java/org/apache/cassandra/spark/data/complex/CqlMap.java
 
b/cassandra-four-zero-types/src/main/java/org/apache/cassandra/spark/data/complex/CqlMap.java
similarity index 100%
rename from 
cassandra-four-zero-bridge/src/main/java/org/apache/cassandra/spark/data/complex/CqlMap.java
rename to 
cassandra-four-zero-types/src/main/java/org/apache/cassandra/spark/data/complex/CqlMap.java
diff --git 
a/cassandra-four-zero-bridge/src/main/java/org/apache/cassandra/spark/data/complex/CqlSet.java
 
b/cassandra-four-zero-types/src/main/java/org/apache/cassandra/spark/data/complex/CqlSet.java
similarity index 100%
rename from 
cassandra-four-zero-bridge/src/main/java/org/apache/cassandra/spark/data/complex/CqlSet.java
rename to 
cassandra-four-zero-types/src/main/java/org/apache/cassandra/spark/data/complex/CqlSet.java
diff --git 
a/cassandra-four-zero-bridge/src/main/java/org/apache/cassandra/spark/data/complex/CqlTuple.java
 
b/cassandra-four-zero-types/src/main/java/org/apache/cassandra/spark/data/complex/CqlTuple.java
similarity index 100%
rename from 
cassandra-four-zero-bridge/src/main/java/org/apache/cassandra/spark/data/complex/CqlTuple.java
rename to 
cassandra-four-zero-types/src/main/java/org/apache/cassandra/spark/data/complex/CqlTuple.java
diff --git 
a/cassandra-four-zero-bridge/src/main/java/org/apache/cassandra/spark/data/complex/CqlUdt.java
 
b/cassandra-four-zero-types/src/main/java/org/apache/cassandra/spark/data/complex/CqlUdt.java
similarity index 94%
rename from 
cassandra-four-zero-bridge/src/main/java/org/apache/cassandra/spark/data/complex/CqlUdt.java
rename to 
cassandra-four-zero-types/src/main/java/org/apache/cassandra/spark/data/complex/CqlUdt.java
index 27b1b4b..5538ca6 100644
--- 
a/cassandra-four-zero-bridge/src/main/java/org/apache/cassandra/spark/data/complex/CqlUdt.java
+++ 
b/cassandra-four-zero-types/src/main/java/org/apache/cassandra/spark/data/complex/CqlUdt.java
@@ -39,7 +39,6 @@ import com.esotericsoftware.kryo.Kryo;
 import com.esotericsoftware.kryo.io.Input;
 import com.esotericsoftware.kryo.io.Output;
 import org.apache.cassandra.bridge.BigNumberConfig;
-import org.apache.cassandra.bridge.CassandraBridge;
 import org.apache.cassandra.bridge.CassandraVersion;
 import org.apache.cassandra.cql3.functions.types.SettableByIndexData;
 import org.apache.cassandra.cql3.functions.types.UDTValue;
@@ -49,6 +48,7 @@ import org.apache.cassandra.db.marshal.AbstractType;
 import org.apache.cassandra.schema.Schema;
 import org.apache.cassandra.serializers.TypeSerializer;
 import org.apache.cassandra.serializers.UTF8Serializer;
+import org.apache.cassandra.spark.data.CassandraTypes;
 import org.apache.cassandra.spark.data.CqlField;
 import org.apache.cassandra.spark.data.CqlType;
 import org.apache.cassandra.spark.utils.ByteBufferUtils;
@@ -350,24 +350,24 @@ public class CqlUdt extends CqlType implements 
CqlField.CqlUdt
     }
 
     @Override
-    public String createStatement(CassandraBridge bridge, String keyspace)
+    public String createStatement(CassandraTypes cassandraTypes, String 
keyspace)
     {
         return String.format("CREATE TYPE %s.%s (%s);",
-                             bridge.maybeQuoteIdentifier(keyspace),
-                             bridge.maybeQuoteIdentifier(name),
-                             fieldsString(bridge));
+                             cassandraTypes.maybeQuoteIdentifier(keyspace),
+                             cassandraTypes.maybeQuoteIdentifier(name),
+                             fieldsString(cassandraTypes));
     }
 
-    private String fieldsString(CassandraBridge bridge)
+    private String fieldsString(CassandraTypes cassandraTypes)
     {
         return fields.stream()
-                     .map(field -> fieldString(bridge, field))
+                     .map(field -> fieldString(cassandraTypes, field))
                      .collect(Collectors.joining(", "));
     }
 
-    private static String fieldString(CassandraBridge bridge, CqlField field)
+    private static String fieldString(CassandraTypes cassandraTypes, CqlField 
field)
     {
-        return String.format("%s %s", 
bridge.maybeQuoteIdentifier(field.name()), field.type().cqlName());
+        return String.format("%s %s", 
cassandraTypes.maybeQuoteIdentifier(field.name()), field.type().cqlName());
     }
 
     public String keyspace()
@@ -420,13 +420,13 @@ public class CqlUdt extends CqlType implements 
CqlField.CqlUdt
                 .toArray(StructField[]::new));
     }
 
-    public static CqlUdt read(Input input, CassandraBridge bridge)
+    public static CqlUdt read(Input input, CassandraTypes cassandraTypes)
     {
         Builder builder = CqlUdt.builder(input.readString(), 
input.readString());
         int numFields = input.readInt();
         for (int field = 0; field < numFields; field++)
         {
-            builder.withField(input.readString(), CqlField.CqlType.read(input, 
bridge));
+            builder.withField(input.readString(), CqlField.CqlType.read(input, 
cassandraTypes));
         }
         return builder.build();
     }
@@ -482,17 +482,17 @@ public class CqlUdt extends CqlType implements 
CqlField.CqlUdt
 
     public static class Serializer extends 
com.esotericsoftware.kryo.Serializer<CqlUdt>
     {
-        private final CassandraBridge bridge;
+        private final CassandraTypes cassandraTypes;
 
-        public Serializer(CassandraBridge bridge)
+        public Serializer(CassandraTypes cassandraTypes)
         {
-            this.bridge = bridge;
+            this.cassandraTypes = cassandraTypes;
         }
 
         @Override
         public CqlUdt read(Kryo kryo, Input input, Class type)
         {
-            return CqlUdt.read(input, bridge);
+            return CqlUdt.read(input, cassandraTypes);
         }
 
         @Override
diff --git 
a/cassandra-four-zero-bridge/src/main/java/org/apache/cassandra/spark/data/types/Ascii.java
 
b/cassandra-four-zero-types/src/main/java/org/apache/cassandra/spark/data/types/Ascii.java
similarity index 100%
rename from 
cassandra-four-zero-bridge/src/main/java/org/apache/cassandra/spark/data/types/Ascii.java
rename to 
cassandra-four-zero-types/src/main/java/org/apache/cassandra/spark/data/types/Ascii.java
diff --git 
a/cassandra-four-zero-bridge/src/main/java/org/apache/cassandra/spark/data/types/BigInt.java
 
b/cassandra-four-zero-types/src/main/java/org/apache/cassandra/spark/data/types/BigInt.java
similarity index 100%
rename from 
cassandra-four-zero-bridge/src/main/java/org/apache/cassandra/spark/data/types/BigInt.java
rename to 
cassandra-four-zero-types/src/main/java/org/apache/cassandra/spark/data/types/BigInt.java
diff --git 
a/cassandra-four-zero-bridge/src/main/java/org/apache/cassandra/spark/data/types/BinaryBased.java
 
b/cassandra-four-zero-types/src/main/java/org/apache/cassandra/spark/data/types/BinaryBased.java
similarity index 86%
rename from 
cassandra-four-zero-bridge/src/main/java/org/apache/cassandra/spark/data/types/BinaryBased.java
rename to 
cassandra-four-zero-types/src/main/java/org/apache/cassandra/spark/data/types/BinaryBased.java
index 4e1a35f..865eeeb 100644
--- 
a/cassandra-four-zero-bridge/src/main/java/org/apache/cassandra/spark/data/types/BinaryBased.java
+++ 
b/cassandra-four-zero-types/src/main/java/org/apache/cassandra/spark/data/types/BinaryBased.java
@@ -19,11 +19,8 @@
 
 package org.apache.cassandra.spark.data.types;
 
-import java.util.Comparator;
-
-import com.google.common.primitives.UnsignedBytes;
-
 import org.apache.cassandra.bridge.BigNumberConfig;
+import org.apache.cassandra.spark.data.CqlField;
 import org.apache.cassandra.spark.data.NativeType;
 import org.apache.spark.sql.Row;
 import org.apache.spark.sql.catalyst.expressions.GenericInternalRow;
@@ -32,12 +29,11 @@ import org.apache.spark.sql.types.DataTypes;
 
 public abstract class BinaryBased extends NativeType
 {
-    public static final Comparator<byte[]> BYTE_ARRAY_COMPARATOR = 
UnsignedBytes.lexicographicalComparator();
 
     @Override
     protected int compareTo(Object first, Object second)
     {
-        return BYTE_ARRAY_COMPARATOR.compare((byte[]) first, (byte[]) second);
+        return CqlField.BYTE_ARRAY_COMPARATOR.compare((byte[]) first, (byte[]) 
second);
     }
 
     @Override
diff --git 
a/cassandra-four-zero-bridge/src/main/java/org/apache/cassandra/spark/data/types/Blob.java
 
b/cassandra-four-zero-types/src/main/java/org/apache/cassandra/spark/data/types/Blob.java
similarity index 100%
rename from 
cassandra-four-zero-bridge/src/main/java/org/apache/cassandra/spark/data/types/Blob.java
rename to 
cassandra-four-zero-types/src/main/java/org/apache/cassandra/spark/data/types/Blob.java
diff --git 
a/cassandra-four-zero-bridge/src/main/java/org/apache/cassandra/spark/data/types/Boolean.java
 
b/cassandra-four-zero-types/src/main/java/org/apache/cassandra/spark/data/types/Boolean.java
similarity index 92%
rename from 
cassandra-four-zero-bridge/src/main/java/org/apache/cassandra/spark/data/types/Boolean.java
rename to 
cassandra-four-zero-types/src/main/java/org/apache/cassandra/spark/data/types/Boolean.java
index 0bc5e92..062b3e6 100644
--- 
a/cassandra-four-zero-bridge/src/main/java/org/apache/cassandra/spark/data/types/Boolean.java
+++ 
b/cassandra-four-zero-types/src/main/java/org/apache/cassandra/spark/data/types/Boolean.java
@@ -19,12 +19,11 @@
 
 package org.apache.cassandra.spark.data.types;
 
-import java.util.Comparator;
-
 import org.apache.cassandra.bridge.BigNumberConfig;
 import org.apache.cassandra.cql3.functions.types.SettableByIndexData;
 import org.apache.cassandra.db.marshal.AbstractType;
 import org.apache.cassandra.db.marshal.BooleanType;
+import org.apache.cassandra.spark.data.CqlField;
 import org.apache.cassandra.spark.data.NativeType;
 import org.apache.cassandra.spark.utils.RandomUtils;
 import org.apache.spark.sql.Row;
@@ -35,7 +34,6 @@ import org.apache.spark.sql.types.DataTypes;
 public class Boolean extends NativeType
 {
     public static final Boolean INSTANCE = new Boolean();
-    private static final Comparator<java.lang.Boolean> BOOLEAN_COMPARATOR = 
java.lang.Boolean::compareTo;
 
     @Override
     public String name()
@@ -58,7 +56,7 @@ public class Boolean extends NativeType
     @Override
     protected int compareTo(Object first, Object second)
     {
-        return BOOLEAN_COMPARATOR.compare((java.lang.Boolean) first, 
(java.lang.Boolean) second);
+        return CqlField.BOOLEAN_COMPARATOR.compare((java.lang.Boolean) first, 
(java.lang.Boolean) second);
     }
 
     @Override
diff --git 
a/cassandra-four-zero-bridge/src/main/java/org/apache/cassandra/spark/data/types/Counter.java
 
b/cassandra-four-zero-types/src/main/java/org/apache/cassandra/spark/data/types/Counter.java
similarity index 100%
rename from 
cassandra-four-zero-bridge/src/main/java/org/apache/cassandra/spark/data/types/Counter.java
rename to 
cassandra-four-zero-types/src/main/java/org/apache/cassandra/spark/data/types/Counter.java
diff --git 
a/cassandra-four-zero-bridge/src/main/java/org/apache/cassandra/spark/data/types/Date.java
 
b/cassandra-four-zero-types/src/main/java/org/apache/cassandra/spark/data/types/Date.java
similarity index 96%
rename from 
cassandra-four-zero-bridge/src/main/java/org/apache/cassandra/spark/data/types/Date.java
rename to 
cassandra-four-zero-types/src/main/java/org/apache/cassandra/spark/data/types/Date.java
index f6a3c19..ccc6a9f 100644
--- 
a/cassandra-four-zero-bridge/src/main/java/org/apache/cassandra/spark/data/types/Date.java
+++ 
b/cassandra-four-zero-types/src/main/java/org/apache/cassandra/spark/data/types/Date.java
@@ -25,6 +25,7 @@ import org.apache.cassandra.cql3.functions.types.LocalDate;
 import org.apache.cassandra.cql3.functions.types.SettableByIndexData;
 import org.apache.cassandra.db.marshal.AbstractType;
 import org.apache.cassandra.db.marshal.SimpleDateType;
+import org.apache.cassandra.spark.data.CqlField;
 import org.apache.cassandra.spark.data.NativeType;
 import org.apache.cassandra.spark.utils.RandomUtils;
 import org.apache.spark.sql.Row;
@@ -67,7 +68,7 @@ public class Date extends NativeType
     @Override
     protected int compareTo(Object first, Object second)
     {
-        return Int.INTEGER_COMPARATOR.compare((Integer) first, (Integer) 
second);
+        return CqlField.INTEGER_COMPARATOR.compare((Integer) first, (Integer) 
second);
     }
 
     @Override
diff --git 
a/cassandra-four-zero-bridge/src/main/java/org/apache/cassandra/spark/data/types/Decimal.java
 
b/cassandra-four-zero-types/src/main/java/org/apache/cassandra/spark/data/types/Decimal.java
similarity index 100%
rename from 
cassandra-four-zero-bridge/src/main/java/org/apache/cassandra/spark/data/types/Decimal.java
rename to 
cassandra-four-zero-types/src/main/java/org/apache/cassandra/spark/data/types/Decimal.java
diff --git 
a/cassandra-four-zero-bridge/src/main/java/org/apache/cassandra/spark/data/types/Double.java
 
b/cassandra-four-zero-types/src/main/java/org/apache/cassandra/spark/data/types/Double.java
similarity index 92%
rename from 
cassandra-four-zero-bridge/src/main/java/org/apache/cassandra/spark/data/types/Double.java
rename to 
cassandra-four-zero-types/src/main/java/org/apache/cassandra/spark/data/types/Double.java
index 7c522fd..8f4650b 100644
--- 
a/cassandra-four-zero-bridge/src/main/java/org/apache/cassandra/spark/data/types/Double.java
+++ 
b/cassandra-four-zero-types/src/main/java/org/apache/cassandra/spark/data/types/Double.java
@@ -19,12 +19,11 @@
 
 package org.apache.cassandra.spark.data.types;
 
-import java.util.Comparator;
-
 import org.apache.cassandra.bridge.BigNumberConfig;
 import org.apache.cassandra.cql3.functions.types.SettableByIndexData;
 import org.apache.cassandra.db.marshal.AbstractType;
 import org.apache.cassandra.db.marshal.DoubleType;
+import org.apache.cassandra.spark.data.CqlField;
 import org.apache.cassandra.spark.data.NativeType;
 import org.apache.cassandra.spark.utils.RandomUtils;
 import org.apache.spark.sql.Row;
@@ -35,7 +34,6 @@ import org.apache.spark.sql.types.DataTypes;
 public class Double extends NativeType
 {
     public static final Double INSTANCE = new Double();
-    private static final Comparator<java.lang.Double> DOUBLE_COMPARATOR = 
java.lang.Double::compareTo;
 
     @Override
     public String name()
@@ -58,7 +56,7 @@ public class Double extends NativeType
     @Override
     protected int compareTo(Object first, Object second)
     {
-        return DOUBLE_COMPARATOR.compare((java.lang.Double) first, 
(java.lang.Double) second);
+        return CqlField.DOUBLE_COMPARATOR.compare((java.lang.Double) first, 
(java.lang.Double) second);
     }
 
     @Override
diff --git 
a/cassandra-four-zero-bridge/src/main/java/org/apache/cassandra/spark/data/types/Duration.java
 
b/cassandra-four-zero-types/src/main/java/org/apache/cassandra/spark/data/types/Duration.java
similarity index 100%
rename from 
cassandra-four-zero-bridge/src/main/java/org/apache/cassandra/spark/data/types/Duration.java
rename to 
cassandra-four-zero-types/src/main/java/org/apache/cassandra/spark/data/types/Duration.java
diff --git 
a/cassandra-four-zero-bridge/src/main/java/org/apache/cassandra/spark/data/types/Empty.java
 
b/cassandra-four-zero-types/src/main/java/org/apache/cassandra/spark/data/types/Empty.java
similarity index 92%
rename from 
cassandra-four-zero-bridge/src/main/java/org/apache/cassandra/spark/data/types/Empty.java
rename to 
cassandra-four-zero-types/src/main/java/org/apache/cassandra/spark/data/types/Empty.java
index 3b34780..cae82ce 100644
--- 
a/cassandra-four-zero-bridge/src/main/java/org/apache/cassandra/spark/data/types/Empty.java
+++ 
b/cassandra-four-zero-types/src/main/java/org/apache/cassandra/spark/data/types/Empty.java
@@ -19,12 +19,11 @@
 
 package org.apache.cassandra.spark.data.types;
 
-import java.util.Comparator;
-
 import org.apache.cassandra.bridge.BigNumberConfig;
 import org.apache.cassandra.cql3.functions.types.SettableByIndexData;
 import org.apache.cassandra.db.marshal.AbstractType;
 import org.apache.cassandra.db.marshal.EmptyType;
+import org.apache.cassandra.spark.data.CqlField;
 import org.apache.cassandra.spark.data.NativeType;
 import org.apache.spark.sql.Row;
 import org.apache.spark.sql.catalyst.expressions.GenericInternalRow;
@@ -34,7 +33,6 @@ import org.apache.spark.sql.types.DataTypes;
 public class Empty extends NativeType
 {
     public static final Empty INSTANCE = new Empty();
-    private static final Comparator<Void> VOID_COMPARATOR_COMPARATOR = (first, 
second) -> 0;
 
     @Override
     public boolean isSupported()
@@ -63,7 +61,7 @@ public class Empty extends NativeType
     @Override
     protected int compareTo(Object first, Object second)
     {
-        return VOID_COMPARATOR_COMPARATOR.compare((Void) first, (Void) second);
+        return CqlField.VOID_COMPARATOR_COMPARATOR.compare((Void) first, 
(Void) second);
     }
 
     @Override
diff --git 
a/cassandra-four-zero-bridge/src/main/java/org/apache/cassandra/spark/data/types/Float.java
 
b/cassandra-four-zero-types/src/main/java/org/apache/cassandra/spark/data/types/Float.java
similarity index 92%
rename from 
cassandra-four-zero-bridge/src/main/java/org/apache/cassandra/spark/data/types/Float.java
rename to 
cassandra-four-zero-types/src/main/java/org/apache/cassandra/spark/data/types/Float.java
index 99b4a4a..046fb74 100644
--- 
a/cassandra-four-zero-bridge/src/main/java/org/apache/cassandra/spark/data/types/Float.java
+++ 
b/cassandra-four-zero-types/src/main/java/org/apache/cassandra/spark/data/types/Float.java
@@ -19,12 +19,11 @@
 
 package org.apache.cassandra.spark.data.types;
 
-import java.util.Comparator;
-
 import org.apache.cassandra.bridge.BigNumberConfig;
 import org.apache.cassandra.cql3.functions.types.SettableByIndexData;
 import org.apache.cassandra.db.marshal.AbstractType;
 import org.apache.cassandra.db.marshal.FloatType;
+import org.apache.cassandra.spark.data.CqlField;
 import org.apache.cassandra.spark.data.NativeType;
 import org.apache.cassandra.spark.utils.RandomUtils;
 import org.apache.spark.sql.Row;
@@ -35,7 +34,6 @@ import org.apache.spark.sql.types.DataTypes;
 public class Float extends NativeType
 {
     public static final Float INSTANCE = new Float();
-    private static final Comparator<java.lang.Float> FLOAT_COMPARATOR = 
java.lang.Float::compareTo;
 
     @Override
     public String name()
@@ -58,7 +56,7 @@ public class Float extends NativeType
     @Override
     protected int compareTo(Object first, Object second)
     {
-        return FLOAT_COMPARATOR.compare((java.lang.Float) first, 
(java.lang.Float) second);
+        return CqlField.FLOAT_COMPARATOR.compare((java.lang.Float) first, 
(java.lang.Float) second);
     }
 
     @Override
diff --git 
a/cassandra-four-zero-bridge/src/main/java/org/apache/cassandra/spark/data/types/Inet.java
 
b/cassandra-four-zero-types/src/main/java/org/apache/cassandra/spark/data/types/Inet.java
similarity index 100%
rename from 
cassandra-four-zero-bridge/src/main/java/org/apache/cassandra/spark/data/types/Inet.java
rename to 
cassandra-four-zero-types/src/main/java/org/apache/cassandra/spark/data/types/Inet.java
diff --git 
a/cassandra-four-zero-bridge/src/main/java/org/apache/cassandra/spark/data/types/Int.java
 
b/cassandra-four-zero-types/src/main/java/org/apache/cassandra/spark/data/types/Int.java
similarity index 93%
rename from 
cassandra-four-zero-bridge/src/main/java/org/apache/cassandra/spark/data/types/Int.java
rename to 
cassandra-four-zero-types/src/main/java/org/apache/cassandra/spark/data/types/Int.java
index bd6103e..3239e77 100644
--- 
a/cassandra-four-zero-bridge/src/main/java/org/apache/cassandra/spark/data/types/Int.java
+++ 
b/cassandra-four-zero-types/src/main/java/org/apache/cassandra/spark/data/types/Int.java
@@ -19,12 +19,11 @@
 
 package org.apache.cassandra.spark.data.types;
 
-import java.util.Comparator;
-
 import org.apache.cassandra.bridge.BigNumberConfig;
 import org.apache.cassandra.cql3.functions.types.SettableByIndexData;
 import org.apache.cassandra.db.marshal.AbstractType;
 import org.apache.cassandra.db.marshal.Int32Type;
+import org.apache.cassandra.spark.data.CqlField;
 import org.apache.cassandra.spark.data.NativeType;
 import org.apache.cassandra.spark.utils.RandomUtils;
 import org.apache.spark.sql.Row;
@@ -35,7 +34,6 @@ import org.apache.spark.sql.types.DataTypes;
 public class Int extends NativeType
 {
     public static final Int INSTANCE = new Int();
-    static final Comparator<Integer> INTEGER_COMPARATOR = Integer::compareTo;
 
     @Override
     public String name()
@@ -58,7 +56,7 @@ public class Int extends NativeType
     @Override
     protected int compareTo(Object first, Object second)
     {
-        return INTEGER_COMPARATOR.compare((Integer) first, (Integer) second);
+        return CqlField.INTEGER_COMPARATOR.compare((Integer) first, (Integer) 
second);
     }
 
     @Override
diff --git 
a/cassandra-four-zero-bridge/src/main/java/org/apache/cassandra/spark/data/types/LongBased.java
 
b/cassandra-four-zero-types/src/main/java/org/apache/cassandra/spark/data/types/LongBased.java
similarity index 92%
rename from 
cassandra-four-zero-bridge/src/main/java/org/apache/cassandra/spark/data/types/LongBased.java
rename to 
cassandra-four-zero-types/src/main/java/org/apache/cassandra/spark/data/types/LongBased.java
index cf62403..42e295c 100644
--- 
a/cassandra-four-zero-bridge/src/main/java/org/apache/cassandra/spark/data/types/LongBased.java
+++ 
b/cassandra-four-zero-types/src/main/java/org/apache/cassandra/spark/data/types/LongBased.java
@@ -19,9 +19,8 @@
 
 package org.apache.cassandra.spark.data.types;
 
-import java.util.Comparator;
-
 import org.apache.cassandra.bridge.BigNumberConfig;
+import org.apache.cassandra.spark.data.CqlField;
 import org.apache.cassandra.spark.data.NativeType;
 import org.apache.cassandra.spark.utils.RandomUtils;
 import org.apache.spark.sql.Row;
@@ -31,7 +30,6 @@ import org.apache.spark.sql.types.DataTypes;
 
 public abstract class LongBased extends NativeType
 {
-    static final Comparator<Long> LONG_COMPARATOR = Long::compareTo;
 
     @Override
     public DataType sparkSqlType(BigNumberConfig bigNumberConfig)
@@ -42,7 +40,7 @@ public abstract class LongBased extends NativeType
     @Override
     protected int compareTo(Object first, Object second)
     {
-        return LONG_COMPARATOR.compare((Long) first, (Long) second);
+        return CqlField.LONG_COMPARATOR.compare((Long) first, (Long) second);
     }
 
     @Override
diff --git 
a/cassandra-four-zero-bridge/src/main/java/org/apache/cassandra/spark/data/types/SmallInt.java
 
b/cassandra-four-zero-types/src/main/java/org/apache/cassandra/spark/data/types/SmallInt.java
similarity index 93%
rename from 
cassandra-four-zero-bridge/src/main/java/org/apache/cassandra/spark/data/types/SmallInt.java
rename to 
cassandra-four-zero-types/src/main/java/org/apache/cassandra/spark/data/types/SmallInt.java
index be89d85..9101b5a 100644
--- 
a/cassandra-four-zero-bridge/src/main/java/org/apache/cassandra/spark/data/types/SmallInt.java
+++ 
b/cassandra-four-zero-types/src/main/java/org/apache/cassandra/spark/data/types/SmallInt.java
@@ -19,12 +19,11 @@
 
 package org.apache.cassandra.spark.data.types;
 
-import java.util.Comparator;
-
 import org.apache.cassandra.bridge.BigNumberConfig;
 import org.apache.cassandra.cql3.functions.types.SettableByIndexData;
 import org.apache.cassandra.db.marshal.AbstractType;
 import org.apache.cassandra.db.marshal.ShortType;
+import org.apache.cassandra.spark.data.CqlField;
 import org.apache.cassandra.spark.data.NativeType;
 import org.apache.cassandra.spark.utils.RandomUtils;
 import org.apache.spark.sql.Row;
@@ -35,7 +34,6 @@ import org.apache.spark.sql.types.DataTypes;
 public class SmallInt extends NativeType
 {
     public static final SmallInt INSTANCE = new SmallInt();
-    private static final Comparator<Short> SHORT_COMPARATOR = Short::compare;
 
     @Override
     public String name()
@@ -58,7 +56,7 @@ public class SmallInt extends NativeType
     @Override
     protected int compareTo(Object first, Object second)
     {
-        return SHORT_COMPARATOR.compare((Short) first, (Short) second);
+        return CqlField.SHORT_COMPARATOR.compare((Short) first, (Short) 
second);
     }
 
     @Override
diff --git 
a/cassandra-four-zero-bridge/src/main/java/org/apache/cassandra/spark/data/types/StringBased.java
 
b/cassandra-four-zero-types/src/main/java/org/apache/cassandra/spark/data/types/StringBased.java
similarity index 100%
rename from 
cassandra-four-zero-bridge/src/main/java/org/apache/cassandra/spark/data/types/StringBased.java
rename to 
cassandra-four-zero-types/src/main/java/org/apache/cassandra/spark/data/types/StringBased.java
diff --git 
a/cassandra-four-zero-bridge/src/main/java/org/apache/cassandra/spark/data/types/Text.java
 
b/cassandra-four-zero-types/src/main/java/org/apache/cassandra/spark/data/types/Text.java
similarity index 100%
rename from 
cassandra-four-zero-bridge/src/main/java/org/apache/cassandra/spark/data/types/Text.java
rename to 
cassandra-four-zero-types/src/main/java/org/apache/cassandra/spark/data/types/Text.java
diff --git 
a/cassandra-four-zero-bridge/src/main/java/org/apache/cassandra/spark/data/types/Time.java
 
b/cassandra-four-zero-types/src/main/java/org/apache/cassandra/spark/data/types/Time.java
similarity index 100%
rename from 
cassandra-four-zero-bridge/src/main/java/org/apache/cassandra/spark/data/types/Time.java
rename to 
cassandra-four-zero-types/src/main/java/org/apache/cassandra/spark/data/types/Time.java
diff --git 
a/cassandra-four-zero-bridge/src/main/java/org/apache/cassandra/spark/data/types/TimeUUID.java
 
b/cassandra-four-zero-types/src/main/java/org/apache/cassandra/spark/data/types/TimeUUID.java
similarity index 100%
rename from 
cassandra-four-zero-bridge/src/main/java/org/apache/cassandra/spark/data/types/TimeUUID.java
rename to 
cassandra-four-zero-types/src/main/java/org/apache/cassandra/spark/data/types/TimeUUID.java
diff --git 
a/cassandra-four-zero-bridge/src/main/java/org/apache/cassandra/spark/data/types/Timestamp.java
 
b/cassandra-four-zero-types/src/main/java/org/apache/cassandra/spark/data/types/Timestamp.java
similarity index 95%
rename from 
cassandra-four-zero-bridge/src/main/java/org/apache/cassandra/spark/data/types/Timestamp.java
rename to 
cassandra-four-zero-types/src/main/java/org/apache/cassandra/spark/data/types/Timestamp.java
index 51dcd6a..c232400 100644
--- 
a/cassandra-four-zero-bridge/src/main/java/org/apache/cassandra/spark/data/types/Timestamp.java
+++ 
b/cassandra-four-zero-types/src/main/java/org/apache/cassandra/spark/data/types/Timestamp.java
@@ -23,6 +23,7 @@ import org.apache.cassandra.bridge.BigNumberConfig;
 import org.apache.cassandra.cql3.functions.types.SettableByIndexData;
 import org.apache.cassandra.db.marshal.AbstractType;
 import org.apache.cassandra.db.marshal.TimestampType;
+import org.apache.cassandra.spark.data.CqlField;
 import org.apache.cassandra.spark.data.NativeType;
 import org.apache.spark.sql.Row;
 import org.apache.spark.sql.catalyst.expressions.GenericInternalRow;
@@ -60,7 +61,7 @@ public class Timestamp extends NativeType
     @Override
     protected int compareTo(Object first, Object second)
     {
-        return BigInt.LONG_COMPARATOR.compare((Long) first, (Long) second);
+        return CqlField.LONG_COMPARATOR.compare((Long) first, (Long) second);
     }
 
     @Override
diff --git 
a/cassandra-four-zero-bridge/src/main/java/org/apache/cassandra/spark/data/types/TinyInt.java
 
b/cassandra-four-zero-types/src/main/java/org/apache/cassandra/spark/data/types/TinyInt.java
similarity index 88%
rename from 
cassandra-four-zero-bridge/src/main/java/org/apache/cassandra/spark/data/types/TinyInt.java
rename to 
cassandra-four-zero-types/src/main/java/org/apache/cassandra/spark/data/types/TinyInt.java
index b00cf72..fe00e54 100644
--- 
a/cassandra-four-zero-bridge/src/main/java/org/apache/cassandra/spark/data/types/TinyInt.java
+++ 
b/cassandra-four-zero-types/src/main/java/org/apache/cassandra/spark/data/types/TinyInt.java
@@ -19,12 +19,11 @@
 
 package org.apache.cassandra.spark.data.types;
 
-import java.util.Comparator;
-
 import org.apache.cassandra.bridge.BigNumberConfig;
 import org.apache.cassandra.cql3.functions.types.SettableByIndexData;
 import org.apache.cassandra.db.marshal.AbstractType;
 import org.apache.cassandra.db.marshal.ByteType;
+import org.apache.cassandra.spark.data.CqlField;
 import org.apache.cassandra.spark.data.NativeType;
 import org.apache.cassandra.spark.utils.RandomUtils;
 import org.apache.spark.sql.Row;
@@ -35,12 +34,6 @@ import org.apache.spark.sql.types.DataTypes;
 public class TinyInt extends NativeType
 {
     public static final TinyInt INSTANCE = new TinyInt();
-    private static final Comparator<Byte> BYTE_COMPARATOR = 
TinyInt::compareBytes;
-
-    private static int compareBytes(byte first, byte second)
-    {
-        return first - second;  // Safe because of the range being restricted
-    }
 
     @Override
     public String name()
@@ -63,7 +56,7 @@ public class TinyInt extends NativeType
     @Override
     protected int compareTo(Object first, Object second)
     {
-        return BYTE_COMPARATOR.compare((Byte) first, (Byte) second);
+        return CqlField.BYTE_COMPARATOR.compare((Byte) first, (Byte) second);
     }
 
     @Override
diff --git 
a/cassandra-four-zero-bridge/src/main/java/org/apache/cassandra/spark/data/types/UUID.java
 
b/cassandra-four-zero-types/src/main/java/org/apache/cassandra/spark/data/types/UUID.java
similarity index 90%
rename from 
cassandra-four-zero-bridge/src/main/java/org/apache/cassandra/spark/data/types/UUID.java
rename to 
cassandra-four-zero-types/src/main/java/org/apache/cassandra/spark/data/types/UUID.java
index 54f7161..967db99 100644
--- 
a/cassandra-four-zero-bridge/src/main/java/org/apache/cassandra/spark/data/types/UUID.java
+++ 
b/cassandra-four-zero-types/src/main/java/org/apache/cassandra/spark/data/types/UUID.java
@@ -19,17 +19,15 @@
 
 package org.apache.cassandra.spark.data.types;
 
-import java.util.Comparator;
-
 import org.apache.cassandra.cql3.functions.types.DataType;
 import org.apache.cassandra.cql3.functions.types.SettableByIndexData;
 import org.apache.cassandra.db.marshal.AbstractType;
 import org.apache.cassandra.db.marshal.UUIDType;
+import org.apache.cassandra.spark.data.CqlField;
 
 public class UUID extends StringBased
 {
     public static final UUID INSTANCE = new UUID();
-    private static final Comparator<String> UUID_COMPARATOR = 
Comparator.comparing(java.util.UUID::fromString);
 
     @Override
     public String name()
@@ -46,7 +44,7 @@ public class UUID extends StringBased
     @Override
     protected int compareTo(Object first, Object second)
     {
-        return UUID_COMPARATOR.compare(first.toString(), second.toString());
+        return CqlField.UUID_COMPARATOR.compare(first.toString(), 
second.toString());
     }
 
     @Override
diff --git 
a/cassandra-four-zero-bridge/src/main/java/org/apache/cassandra/spark/data/types/VarChar.java
 
b/cassandra-four-zero-types/src/main/java/org/apache/cassandra/spark/data/types/VarChar.java
similarity index 100%
rename from 
cassandra-four-zero-bridge/src/main/java/org/apache/cassandra/spark/data/types/VarChar.java
rename to 
cassandra-four-zero-types/src/main/java/org/apache/cassandra/spark/data/types/VarChar.java
diff --git 
a/cassandra-four-zero-bridge/src/main/java/org/apache/cassandra/spark/data/types/VarInt.java
 
b/cassandra-four-zero-types/src/main/java/org/apache/cassandra/spark/data/types/VarInt.java
similarity index 100%
rename from 
cassandra-four-zero-bridge/src/main/java/org/apache/cassandra/spark/data/types/VarInt.java
rename to 
cassandra-four-zero-types/src/main/java/org/apache/cassandra/spark/data/types/VarInt.java
diff --git 
a/cassandra-four-zero-bridge/src/main/java/org/apache/cassandra/spark/reader/ComplexTypeBuffer.java
 
b/cassandra-four-zero-types/src/main/java/org/apache/cassandra/spark/reader/ComplexTypeBuffer.java
similarity index 100%
rename from 
cassandra-four-zero-bridge/src/main/java/org/apache/cassandra/spark/reader/ComplexTypeBuffer.java
rename to 
cassandra-four-zero-types/src/main/java/org/apache/cassandra/spark/reader/ComplexTypeBuffer.java
diff --git 
a/cassandra-four-zero-bridge/src/main/java/org/apache/cassandra/spark/reader/ListBuffer.java
 
b/cassandra-four-zero-types/src/main/java/org/apache/cassandra/spark/reader/ListBuffer.java
similarity index 100%
rename from 
cassandra-four-zero-bridge/src/main/java/org/apache/cassandra/spark/reader/ListBuffer.java
rename to 
cassandra-four-zero-types/src/main/java/org/apache/cassandra/spark/reader/ListBuffer.java
diff --git 
a/cassandra-four-zero-bridge/src/main/java/org/apache/cassandra/spark/reader/MapBuffer.java
 
b/cassandra-four-zero-types/src/main/java/org/apache/cassandra/spark/reader/MapBuffer.java
similarity index 100%
rename from 
cassandra-four-zero-bridge/src/main/java/org/apache/cassandra/spark/reader/MapBuffer.java
rename to 
cassandra-four-zero-types/src/main/java/org/apache/cassandra/spark/reader/MapBuffer.java
diff --git 
a/cassandra-four-zero-bridge/src/main/java/org/apache/cassandra/spark/reader/SetBuffer.java
 
b/cassandra-four-zero-types/src/main/java/org/apache/cassandra/spark/reader/SetBuffer.java
similarity index 100%
rename from 
cassandra-four-zero-bridge/src/main/java/org/apache/cassandra/spark/reader/SetBuffer.java
rename to 
cassandra-four-zero-types/src/main/java/org/apache/cassandra/spark/reader/SetBuffer.java
diff --git 
a/cassandra-four-zero-bridge/src/main/java/org/apache/cassandra/spark/reader/UdtBuffer.java
 
b/cassandra-four-zero-types/src/main/java/org/apache/cassandra/spark/reader/UdtBuffer.java
similarity index 100%
rename from 
cassandra-four-zero-bridge/src/main/java/org/apache/cassandra/spark/reader/UdtBuffer.java
rename to 
cassandra-four-zero-types/src/main/java/org/apache/cassandra/spark/reader/UdtBuffer.java
diff --git a/settings.gradle b/settings.gradle
index 1651405..4ba2635 100644
--- a/settings.gradle
+++ b/settings.gradle
@@ -23,6 +23,7 @@ include 'cassandra-analytics-common'
 include 'cassandra-bridge'
 include 'cassandra-four-zero'
 include 'cassandra-four-zero-bridge'
+include 'cassandra-four-zero-types'
 include 'cassandra-analytics-core'
 include 'cassandra-analytics-core-example'
 include 'cassandra-analytics-integration-framework'


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to