This is an automated email from the ASF dual-hosted git repository.

frankgh pushed a commit to branch trunk
in repository https://gitbox.apache.org/repos/asf/cassandra-analytics.git


The following commit(s) were added to refs/heads/trunk by this push:
     new dc0e79b  CASSANDRA-19369 Use XXHash32 for digest calculation of SSTables
dc0e79b is described below

commit dc0e79b9c483562ec0920d69e886715eb329c426
Author: Francisco Guerrero <fran...@apache.org>
AuthorDate: Wed Jan 31 13:44:23 2024 -0800

    CASSANDRA-19369 Use XXHash32 for digest calculation of SSTables
    
    This commit adds the ability to use the XXHash32 digest algorithm newly supported in Cassandra Sidecar.
    The commit retains backwards compatibility with MD5 checksumming, but it now defaults to XXHash32.
    
    A new Writer option is added:
    
    ```
    .option(WriterOptions.DIGEST.name(), "XXHASH32") // or
    .option(WriterOptions.DIGEST.name(), "MD5")
    ```
    
    This option defaults to XXHash32 when not provided, but it can be configured to use the legacy MD5 algorithm.
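    
    For illustration only (not part of this patch): a minimal Java sketch of how the configured option value resolves to a DigestAlgorithm and how a digest is computed for a component file. The file path and class name below are hypothetical.
    
    ```
    import java.io.IOException;
    import java.nio.file.Path;
    import java.nio.file.Paths;
    
    import org.apache.cassandra.spark.bulkwriter.DigestAlgorithms;
    import org.apache.cassandra.spark.common.Digest;
    import org.apache.cassandra.spark.utils.DigestAlgorithm;
    
    public class DigestExample
    {
        public static void main(String[] args) throws IOException
        {
            // Resolve the configured digest type; XXHASH32 is the default, MD5 is the legacy option
            DigestAlgorithm algorithm = DigestAlgorithms.valueOf("XXHASH32").get();
            // Hypothetical SSTable component path, used purely for illustration
            Path component = Paths.get("/tmp/na-1-big-Data.db");
            Digest digest = algorithm.calculateFileDigest(component);
            System.out.println(digest.value());
        }
    }
    ```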
    
    Patch by Francisco Guerrero; Reviewed by Yifan Cai for CASSANDRA-19369
---
 CHANGES.txt                                        |   1 +
 cassandra-analytics-core/build.gradle              |   1 +
 .../cassandra/spark/bulkwriter/BulkSparkConf.java  |  16 ++-
 .../spark/bulkwriter/CassandraJobInfo.java         |   7 ++
 .../spark/bulkwriter/DataTransferApi.java          |   4 +-
 .../spark/bulkwriter/DigestAlgorithmSupplier.java} |  19 +---
 .../spark/bulkwriter/DigestAlgorithms.java}        |  39 +++++--
 .../apache/cassandra/spark/bulkwriter/JobInfo.java |   6 +
 .../cassandra/spark/bulkwriter/RecordWriter.java   |  36 ++++--
 .../cassandra/spark/bulkwriter/SSTableWriter.java  |  42 ++++---
 .../spark/bulkwriter/SidecarDataTransferApi.java   |   7 +-
 .../cassandra/spark/bulkwriter/StreamSession.java  |  19 ++--
 .../cassandra/spark/bulkwriter/WriterOptions.java  |   5 +
 .../spark/common/{MD5Hash.java => Digest.java}     |  31 ++----
 .../spark/common/{MD5Hash.java => MD5Digest.java}  |  44 ++++++--
 .../cassandra/spark/common/XXHash32Digest.java     |  79 +++++++++++++
 .../cassandra/spark/utils/DigestAlgorithm.java}    |  28 ++---
 .../cassandra/spark/utils/MD5DigestAlgorithm.java  |  55 ++++++++++
 .../spark/utils/XXHash32DigestAlgorithm.java       |  67 +++++++++++
 .../spark/bulkwriter/MockBulkWriterContext.java    |  14 ++-
 .../bulkwriter/NonValidatingTestSSTableWriter.java |   5 +-
 .../spark/bulkwriter/RecordWriterTest.java         |  24 ++--
 .../spark/bulkwriter/SSTableWriterTest.java        |   8 +-
 .../bulkwriter/StreamSessionConsistencyTest.java   |  10 +-
 .../spark/bulkwriter/StreamSessionTest.java        |  18 +--
 .../cassandra/spark/bulkwriter/UploadRequest.java  |  14 +--
 .../spark/utils/MD5DigestAlgorithmTest.java        |  68 ++++++++++++
 .../cassandra/spark/utils/ResourceUtils.java       |  79 +++++++++++++
 ...pTests.java => SSTableInputStreamHttpTest.java} |  10 +-
 .../spark/utils/XXHash32DigestAlgorithmTest.java   |  71 ++++++++++++
 .../src/test/resources/digest/file1.txt            |   1 +
 .../src/test/resources/digest/file2.txt            |   1 +
 .../src/test/resources/digest/file3.txt            |   1 +
 .../distributed/impl/AbstractClusterUtils.java     |   2 +-
 .../testing/SharedClusterIntegrationTestBase.java  |  24 ++++
 .../analytics/QuoteIdentifiersWriteTest.java       |  91 ++-------------
 .../apache/cassandra/analytics/SparkTestUtils.java |  34 ++++++
 .../analytics/WriterDigestIntegrationTest.java     | 122 +++++++++++++++++++++
 .../org/apache/cassandra/spark/utils/MapUtils.java |   2 +-
 scripts/build-sidecar.sh                           |   2 +-
 40 files changed, 872 insertions(+), 235 deletions(-)

diff --git a/CHANGES.txt b/CHANGES.txt
index e0ff584..9daa5f0 100644
--- a/CHANGES.txt
+++ b/CHANGES.txt
@@ -1,4 +1,5 @@
 1.0.0
+ * Use XXHash32 for digest calculation of SSTables (CASSANDRA-19369)
  * Startup Validation Failures when Checking Sidecar Connectivity 
(CASSANDRA-19377)
  * No longer need to synchronize on Schema.instance after Cassandra 4.0.12 
(CASSANDRA-19351)
  * Upgrade to Cassandra 4.0.12 and remove RowBufferMode and BatchSize options 
(CASSANDRA-19334)
diff --git a/cassandra-analytics-core/build.gradle 
b/cassandra-analytics-core/build.gradle
index 9228741..e015a27 100644
--- a/cassandra-analytics-core/build.gradle
+++ b/cassandra-analytics-core/build.gradle
@@ -58,6 +58,7 @@ project(':cassandra-analytics-core') {
 
         // This dependency must be built by running 
`scripts/build-dependencies.sh`
         implementation(group: "${sidecarClientGroup}", name: 
"${sidecarClientName}", version: "${sidecarVersion}")
+        implementation(group: 'org.lz4', name: 'lz4-java', version: '1.8.0')
 
         if ("${scalaMajorVersion}" == "2.11") {
             implementation(group: 'org.scala-lang.modules', name: 
"scala-java8-compat_2.11", version: '1.0.1', transitive: false)
diff --git 
a/cassandra-analytics-core/src/main/java/org/apache/cassandra/spark/bulkwriter/BulkSparkConf.java
 
b/cassandra-analytics-core/src/main/java/org/apache/cassandra/spark/bulkwriter/BulkSparkConf.java
index 022c1db..f079558 100644
--- 
a/cassandra-analytics-core/src/main/java/org/apache/cassandra/spark/bulkwriter/BulkSparkConf.java
+++ 
b/cassandra-analytics-core/src/main/java/org/apache/cassandra/spark/bulkwriter/BulkSparkConf.java
@@ -130,6 +130,7 @@ public class BulkSparkConf implements Serializable
     protected boolean useOpenSsl;
     protected int ringRetryCount;
     protected final Set<String> blockedInstances;
+    protected final DigestAlgorithmSupplier digestAlgorithmSupplier;
 
     public BulkSparkConf(SparkConf conf, Map<String, String> options)
     {
@@ -157,6 +158,7 @@ public class BulkSparkConf implements Serializable
         this.truststoreBase64Encoded = MapUtils.getOrDefault(options, 
WriterOptions.TRUSTSTORE_BASE64_ENCODED.name(), null);
         this.truststoreType = MapUtils.getOrDefault(options, 
WriterOptions.TRUSTSTORE_TYPE.name(), null);
         this.writeMode = MapUtils.getEnumOption(options, 
WriterOptions.WRITE_MODE.name(), WriteMode.INSERT, "write mode");
+        this.digestAlgorithmSupplier = 
digestAlgorithmSupplierFromOptions(options);
         // For backwards-compatibility with port settings, use writer option 
if available,
         // else fall back to props, and then default if neither specified
         this.useOpenSsl = getBoolean(USE_OPENSSL, true);
@@ -168,7 +170,19 @@ public class BulkSparkConf implements Serializable
         validateEnvironment();
     }
 
-    private Set<String> buildBlockedInstances(Map<String, String> options)
+    /**
+     * Returns the supplier for the digest algorithm from the configured 
{@code options}.
+     *
+     * @param options a key-value map with options for the bulk write job
+     * @return the configured {@link DigestAlgorithmSupplier}
+     */
+    @NotNull
+    protected DigestAlgorithmSupplier 
digestAlgorithmSupplierFromOptions(Map<String, String> options)
+    {
+        return MapUtils.getEnumOption(options, WriterOptions.DIGEST.name(), 
DigestAlgorithms.XXHASH32, "digest type");
+    }
+
+    protected Set<String> buildBlockedInstances(Map<String, String> options)
     {
         String blockedInstances = MapUtils.getOrDefault(options, 
WriterOptions.BLOCKED_CASSANDRA_INSTANCES.name(), "");
         return Arrays.stream(blockedInstances.split(","))
diff --git 
a/cassandra-analytics-core/src/main/java/org/apache/cassandra/spark/bulkwriter/CassandraJobInfo.java
 
b/cassandra-analytics-core/src/main/java/org/apache/cassandra/spark/bulkwriter/CassandraJobInfo.java
index b385289..f91b495 100644
--- 
a/cassandra-analytics-core/src/main/java/org/apache/cassandra/spark/bulkwriter/CassandraJobInfo.java
+++ 
b/cassandra-analytics-core/src/main/java/org/apache/cassandra/spark/bulkwriter/CassandraJobInfo.java
@@ -109,4 +109,11 @@ public class CassandraJobInfo implements JobInfo
     {
         return conf.table;
     }
+
+    @NotNull
+    @Override
+    public DigestAlgorithmSupplier digestAlgorithmSupplier()
+    {
+        return conf.digestAlgorithmSupplier;
+    }
 }
diff --git 
a/cassandra-analytics-core/src/main/java/org/apache/cassandra/spark/bulkwriter/DataTransferApi.java
 
b/cassandra-analytics-core/src/main/java/org/apache/cassandra/spark/bulkwriter/DataTransferApi.java
index c0786a2..f36ed73 100644
--- 
a/cassandra-analytics-core/src/main/java/org/apache/cassandra/spark/bulkwriter/DataTransferApi.java
+++ 
b/cassandra-analytics-core/src/main/java/org/apache/cassandra/spark/bulkwriter/DataTransferApi.java
@@ -23,7 +23,7 @@ import java.io.Serializable;
 import java.nio.file.Path;
 import java.util.List;
 
-import org.apache.cassandra.spark.common.MD5Hash;
+import org.apache.cassandra.spark.common.Digest;
 import org.apache.cassandra.spark.common.client.ClientException;
 import org.apache.cassandra.spark.common.model.CassandraInstance;
 import org.jetbrains.annotations.Nullable;
@@ -59,5 +59,5 @@ public interface DataTransferApi extends Serializable
                                 int ssTableIdx,
                                 CassandraInstance instance,
                                 String sessionID,
-                                MD5Hash fileHash) throws ClientException;
+                                Digest digest) throws ClientException;
 }
diff --git 
a/cassandra-analytics-core/src/test/java/org/apache/cassandra/spark/bulkwriter/NonValidatingTestSSTableWriter.java
 
b/cassandra-analytics-core/src/main/java/org/apache/cassandra/spark/bulkwriter/DigestAlgorithmSupplier.java
similarity index 67%
copy from 
cassandra-analytics-core/src/test/java/org/apache/cassandra/spark/bulkwriter/NonValidatingTestSSTableWriter.java
copy to 
cassandra-analytics-core/src/main/java/org/apache/cassandra/spark/bulkwriter/DigestAlgorithmSupplier.java
index 1be5c2c..89c4772 100644
--- 
a/cassandra-analytics-core/src/test/java/org/apache/cassandra/spark/bulkwriter/NonValidatingTestSSTableWriter.java
+++ 
b/cassandra-analytics-core/src/main/java/org/apache/cassandra/spark/bulkwriter/DigestAlgorithmSupplier.java
@@ -19,20 +19,13 @@
 
 package org.apache.cassandra.spark.bulkwriter;
 
-import java.nio.file.Path;
+import java.util.function.Supplier;
 
-import org.jetbrains.annotations.NotNull;
+import org.apache.cassandra.spark.utils.DigestAlgorithm;
 
-class NonValidatingTestSSTableWriter extends SSTableWriter
+/**
+ * An interface that defines a {@link DigestAlgorithm} for a concrete digest 
type
+ */
+public interface DigestAlgorithmSupplier extends Supplier<DigestAlgorithm>
 {
-    NonValidatingTestSSTableWriter(MockTableWriter tableWriter, Path path)
-    {
-        super(tableWriter, path);
-    }
-
-    @Override
-    public void validateSSTables(@NotNull BulkWriterContext writerContext, int 
partitionId)
-    {
-        // Skip validation for these tests
-    }
 }
diff --git 
a/cassandra-analytics-core/src/test/java/org/apache/cassandra/spark/bulkwriter/NonValidatingTestSSTableWriter.java
 
b/cassandra-analytics-core/src/main/java/org/apache/cassandra/spark/bulkwriter/DigestAlgorithms.java
similarity index 50%
copy from 
cassandra-analytics-core/src/test/java/org/apache/cassandra/spark/bulkwriter/NonValidatingTestSSTableWriter.java
copy to 
cassandra-analytics-core/src/main/java/org/apache/cassandra/spark/bulkwriter/DigestAlgorithms.java
index 1be5c2c..3a70e89 100644
--- 
a/cassandra-analytics-core/src/test/java/org/apache/cassandra/spark/bulkwriter/NonValidatingTestSSTableWriter.java
+++ 
b/cassandra-analytics-core/src/main/java/org/apache/cassandra/spark/bulkwriter/DigestAlgorithms.java
@@ -19,20 +19,37 @@
 
 package org.apache.cassandra.spark.bulkwriter;
 
-import java.nio.file.Path;
+import org.apache.cassandra.spark.utils.DigestAlgorithm;
+import org.apache.cassandra.spark.utils.MD5DigestAlgorithm;
+import org.apache.cassandra.spark.utils.XXHash32DigestAlgorithm;
 
-import org.jetbrains.annotations.NotNull;
-
-class NonValidatingTestSSTableWriter extends SSTableWriter
+/**
+ * Represents the user-provided digest type configuration to be used to 
validate SSTable files during bulk writes
+ */
+public enum DigestAlgorithms implements DigestAlgorithmSupplier
 {
-    NonValidatingTestSSTableWriter(MockTableWriter tableWriter, Path path)
+    /**
+     * Represents an MD5 digest type option. This option is supported for 
legacy reasons, but its use
+     * is strongly discouraged.
+     */
+    MD5
     {
-        super(tableWriter, path);
-    }
+        @Override
+        public DigestAlgorithm get()
+        {
+            return new MD5DigestAlgorithm();
+        }
+    },
 
-    @Override
-    public void validateSSTables(@NotNull BulkWriterContext writerContext, int 
partitionId)
+    /**
+     * Represents an xxhash32 digest type option
+     */
+    XXHASH32
     {
-        // Skip validation for these tests
-    }
+        @Override
+        public DigestAlgorithm get()
+        {
+            return new XXHash32DigestAlgorithm();
+        }
+    };
 }
diff --git 
a/cassandra-analytics-core/src/main/java/org/apache/cassandra/spark/bulkwriter/JobInfo.java
 
b/cassandra-analytics-core/src/main/java/org/apache/cassandra/spark/bulkwriter/JobInfo.java
index 91da29a..10635b1 100644
--- 
a/cassandra-analytics-core/src/main/java/org/apache/cassandra/spark/bulkwriter/JobInfo.java
+++ 
b/cassandra-analytics-core/src/main/java/org/apache/cassandra/spark/bulkwriter/JobInfo.java
@@ -60,4 +60,10 @@ public interface JobInfo extends Serializable
     }
 
     boolean getSkipClean();
+
+    /**
+     * @return the digest type provider for the bulk job, used to 
calculate digests for SSTable components
+     */
+    @NotNull
+    DigestAlgorithmSupplier digestAlgorithmSupplier();
 }
diff --git 
a/cassandra-analytics-core/src/main/java/org/apache/cassandra/spark/bulkwriter/RecordWriter.java
 
b/cassandra-analytics-core/src/main/java/org/apache/cassandra/spark/bulkwriter/RecordWriter.java
index 908ecba..5ea1205 100644
--- 
a/cassandra-analytics-core/src/main/java/org/apache/cassandra/spark/bulkwriter/RecordWriter.java
+++ 
b/cassandra-analytics-core/src/main/java/org/apache/cassandra/spark/bulkwriter/RecordWriter.java
@@ -38,7 +38,6 @@ import java.util.Map;
 import java.util.Set;
 import java.util.UUID;
 import java.util.concurrent.ExecutionException;
-import java.util.function.BiFunction;
 import java.util.function.Supplier;
 import java.util.stream.Collectors;
 import java.util.stream.Stream;
@@ -52,6 +51,7 @@ import org.slf4j.LoggerFactory;
 import org.apache.cassandra.sidecar.common.data.TimeSkewResponse;
 import org.apache.cassandra.spark.bulkwriter.token.ReplicaAwareFailureHandler;
 import org.apache.cassandra.spark.bulkwriter.token.TokenRangeMapping;
+import org.apache.cassandra.spark.utils.DigestAlgorithm;
 import org.apache.spark.InterruptibleIterator;
 import org.apache.spark.TaskContext;
 import scala.Tuple2;
@@ -62,14 +62,16 @@ import static 
org.apache.cassandra.spark.utils.ScalaConversionUtils.asScalaItera
 public class RecordWriter implements Serializable
 {
     private static final Logger LOGGER = 
LoggerFactory.getLogger(RecordWriter.class);
-
+    private static final long serialVersionUID = 3746578054834640428L;
     private final BulkWriterContext writerContext;
     private final String[] columnNames;
-    private final BiFunction<BulkWriterContext, Path, SSTableWriter> 
tableWriterSupplier;
+    private final SSTableWriterFactory tableWriterFactory;
+    private final DigestAlgorithm digestAlgorithm;
+
     private final BulkWriteValidator writeValidator;
     private final ReplicaAwareFailureHandler<RingInstance> failureHandler;
 
-    private Supplier<TaskContext> taskContextSupplier;
+    private final Supplier<TaskContext> taskContextSupplier;
     private SSTableWriter sstableWriter = null;
     private int outputSequence = 0; // sub-folder for possible subrange splits
 
@@ -82,14 +84,15 @@ public class RecordWriter implements Serializable
     RecordWriter(BulkWriterContext writerContext,
                  String[] columnNames,
                  Supplier<TaskContext> taskContextSupplier,
-                 BiFunction<BulkWriterContext, Path, SSTableWriter> 
tableWriterSupplier)
+                 SSTableWriterFactory tableWriterFactory)
     {
         this.writerContext = writerContext;
         this.columnNames = columnNames;
         this.taskContextSupplier = taskContextSupplier;
-        this.tableWriterSupplier = tableWriterSupplier;
+        this.tableWriterFactory = tableWriterFactory;
         this.failureHandler = new 
ReplicaAwareFailureHandler<>(writerContext.cluster().getPartitioner());
         this.writeValidator = new BulkWriteValidator(writerContext, 
failureHandler);
+        this.digestAlgorithm = 
this.writerContext.job().digestAlgorithmSupplier().get();
 
         writerContext.cluster().startupValidate();
     }
@@ -360,7 +363,7 @@ public class RecordWriter implements Serializable
             Path outDir = Paths.get(baseDir.toString(), 
Integer.toString(outputSequence++));
             Files.createDirectories(outDir);
 
-            sstableWriter = tableWriterSupplier.apply(writerContext, outDir);
+            sstableWriter = tableWriterFactory.create(writerContext, outDir, 
digestAlgorithm);
             LOGGER.info("[{}] Created new SSTable writer", partitionId);
         }
     }
@@ -400,4 +403,23 @@ public class RecordWriter implements Serializable
         LOGGER.info("[{}] Creating stream session for range={}", 
taskContext.partitionId(), tokenRange);
         return new StreamSession(writerContext, getStreamId(taskContext), 
tokenRange, failureHandler);
     }
+
+    /**
+     * Functional interface that helps with creating {@link SSTableWriter} 
instances.
+     */
+    public interface SSTableWriterFactory
+    {
+        /**
+         * Creates a new instance of the {@link SSTableWriter} with the 
provided {@code writerContext},
+         * {@code outDir}, and {@code digestAlgorithm} parameters.
+         *
+         * @param writerContext  the context for the bulk writer job
+         * @param outDir         an output directory where SSTable components 
will be written to
+         * @param digestAlgorithm a digest provider to calculate digests for 
every SSTable component
+         * @return a new {@link SSTableWriter}
+         */
+        SSTableWriter create(BulkWriterContext writerContext,
+                             Path outDir,
+                             DigestAlgorithm digestAlgorithm);
+    }
 }
diff --git 
a/cassandra-analytics-core/src/main/java/org/apache/cassandra/spark/bulkwriter/SSTableWriter.java
 
b/cassandra-analytics-core/src/main/java/org/apache/cassandra/spark/bulkwriter/SSTableWriter.java
index 18021f2..acffca0 100644
--- 
a/cassandra-analytics-core/src/main/java/org/apache/cassandra/spark/bulkwriter/SSTableWriter.java
+++ 
b/cassandra-analytics-core/src/main/java/org/apache/cassandra/spark/bulkwriter/SSTableWriter.java
@@ -20,31 +20,29 @@
 package org.apache.cassandra.spark.bulkwriter;
 
 import java.io.IOException;
-import java.io.InputStream;
 import java.math.BigInteger;
 import java.nio.file.DirectoryStream;
 import java.nio.file.Files;
 import java.nio.file.Path;
-import java.security.MessageDigest;
 import java.util.Collections;
 import java.util.HashMap;
 import java.util.Map;
 
 import com.google.common.annotations.VisibleForTesting;
 import com.google.common.collect.Range;
-import org.apache.commons.codec.digest.DigestUtils;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
 import org.apache.cassandra.bridge.CassandraBridgeFactory;
 import org.apache.cassandra.bridge.CassandraVersion;
 import org.apache.cassandra.bridge.CassandraVersionFeatures;
-import org.apache.cassandra.spark.common.MD5Hash;
+import org.apache.cassandra.spark.common.Digest;
 import org.apache.cassandra.spark.common.SSTables;
 import org.apache.cassandra.spark.data.DataLayer;
 import org.apache.cassandra.spark.data.LocalDataLayer;
 import org.apache.cassandra.spark.reader.Rid;
 import org.apache.cassandra.spark.reader.StreamScanner;
+import org.apache.cassandra.spark.utils.DigestAlgorithm;
 import org.jetbrains.annotations.NotNull;
 
 @SuppressWarnings("WeakerAccess")
@@ -58,17 +56,21 @@ public class SSTableWriter
     private final org.apache.cassandra.bridge.SSTableWriter cqlSSTableWriter;
     private BigInteger minToken = null;
     private BigInteger maxToken = null;
-    private final Map<Path, MD5Hash> fileHashes = new HashMap<>();
+    private final Map<Path, Digest> fileDigestMap = new HashMap<>();
+    private final DigestAlgorithm digestAlgorithm;
 
-    public SSTableWriter(org.apache.cassandra.bridge.SSTableWriter 
tableWriter, Path outDir)
+    public SSTableWriter(org.apache.cassandra.bridge.SSTableWriter 
tableWriter, Path outDir,
+                         DigestAlgorithm digestAlgorithm)
     {
         cqlSSTableWriter = tableWriter;
         this.outDir = outDir;
+        this.digestAlgorithm = digestAlgorithm;
     }
 
-    public SSTableWriter(BulkWriterContext writerContext, Path outDir)
+    public SSTableWriter(BulkWriterContext writerContext, Path outDir, 
DigestAlgorithm digestAlgorithm)
     {
         this.outDir = outDir;
+        this.digestAlgorithm = digestAlgorithm;
 
         String lowestCassandraVersion = 
writerContext.cluster().getLowestCassandraVersion();
         String packageVersion = getPackageVersion(lowestCassandraVersion);
@@ -108,7 +110,7 @@ public class SSTableWriter
             // NOTE: We calculate file hashes before re-reading so that we 
know what we hashed
             //       is what we validated. Then we send these along with the 
files and the
             //       receiving end re-hashes the files to make sure they still 
match.
-            fileHashes.putAll(calculateFileHashes(dataFile));
+            fileDigestMap.putAll(calculateFileDigestMap(dataFile));
         }
         validateSSTables(writerContext, partitionId);
     }
@@ -146,29 +148,22 @@ public class SSTableWriter
         return Files.newDirectoryStream(getOutDir(), "*Data.db");
     }
 
-    private Map<Path, MD5Hash> calculateFileHashes(Path dataFile) throws 
IOException
+    private Map<Path, Digest> calculateFileDigestMap(Path dataFile) throws 
IOException
     {
-        Map<Path, MD5Hash> fileHashes = new HashMap<>();
+        Map<Path, Digest> fileHashes = new HashMap<>();
         try (DirectoryStream<Path> filesToHash =
              Files.newDirectoryStream(dataFile.getParent(), 
SSTables.getSSTableBaseName(dataFile) + "*"))
         {
             for (Path path : filesToHash)
             {
-                fileHashes.put(path, calculateFileHash(path));
+                Digest digest = digestAlgorithm.calculateFileDigest(path);
+                fileHashes.put(path, digest);
+                LOGGER.debug("Calculated digest={} for path={}", digest, path);
             }
         }
         return fileHashes;
     }
 
-    private MD5Hash calculateFileHash(Path path) throws IOException
-    {
-        try (InputStream is = Files.newInputStream(path))
-        {
-            MessageDigest computedMd5 = 
DigestUtils.updateDigest(DigestUtils.getMd5Digest(), is);
-            return MD5Hash.fromDigest(computedMd5);
-        }
-    }
-
     public Range<BigInteger> getTokenRange()
     {
         return Range.closed(minToken, maxToken);
@@ -179,8 +174,11 @@ public class SSTableWriter
         return outDir;
     }
 
-    public Map<Path, MD5Hash> getFileHashes()
+    /**
+     * @return a view of the file digest map
+     */
+    public Map<Path, Digest> fileDigestMap()
     {
-        return fileHashes;
+        return Collections.unmodifiableMap(fileDigestMap);
     }
 }
diff --git 
a/cassandra-analytics-core/src/main/java/org/apache/cassandra/spark/bulkwriter/SidecarDataTransferApi.java
 
b/cassandra-analytics-core/src/main/java/org/apache/cassandra/spark/bulkwriter/SidecarDataTransferApi.java
index b6a492b..46f8f11 100644
--- 
a/cassandra-analytics-core/src/main/java/org/apache/cassandra/spark/bulkwriter/SidecarDataTransferApi.java
+++ 
b/cassandra-analytics-core/src/main/java/org/apache/cassandra/spark/bulkwriter/SidecarDataTransferApi.java
@@ -31,9 +31,8 @@ import org.apache.cassandra.bridge.CassandraBridge;
 import org.apache.cassandra.sidecar.client.SidecarClient;
 import org.apache.cassandra.sidecar.client.SidecarInstanceImpl;
 import org.apache.cassandra.sidecar.client.request.ImportSSTableRequest;
-import org.apache.cassandra.sidecar.common.data.MD5Digest;
 import org.apache.cassandra.sidecar.common.data.SSTableImportResponse;
-import org.apache.cassandra.spark.common.MD5Hash;
+import org.apache.cassandra.spark.common.Digest;
 import org.apache.cassandra.spark.common.client.ClientException;
 import org.apache.cassandra.spark.common.model.CassandraInstance;
 
@@ -69,7 +68,7 @@ public class SidecarDataTransferApi implements DataTransferApi
                                        int ssTableIdx,
                                        CassandraInstance instance,
                                        String sessionID,
-                                       MD5Hash fileHash) throws ClientException
+                                       Digest digest) throws ClientException
     {
         String componentName = updateComponentName(componentFile, ssTableIdx);
         String uploadId = getUploadId(sessionID, job.getId().toString());
@@ -80,7 +79,7 @@ public class SidecarDataTransferApi implements DataTransferApi
                                                maybeQuotedIdentifier(bridge, 
conf.quoteIdentifiers, conf.table),
                                                uploadId,
                                                componentName,
-                                               new 
MD5Digest(fileHash.toString()),
+                                               digest.toSidecarDigest(),
                                                
componentFile.toAbsolutePath().toString())
                          .get();
         }
diff --git 
a/cassandra-analytics-core/src/main/java/org/apache/cassandra/spark/bulkwriter/StreamSession.java
 
b/cassandra-analytics-core/src/main/java/org/apache/cassandra/spark/bulkwriter/StreamSession.java
index 044f1a8..4035c11 100644
--- 
a/cassandra-analytics-core/src/main/java/org/apache/cassandra/spark/bulkwriter/StreamSession.java
+++ 
b/cassandra-analytics-core/src/main/java/org/apache/cassandra/spark/bulkwriter/StreamSession.java
@@ -48,12 +48,14 @@ import org.slf4j.LoggerFactory;
 
 import org.apache.cassandra.spark.bulkwriter.token.ReplicaAwareFailureHandler;
 import org.apache.cassandra.spark.bulkwriter.token.TokenRangeMapping;
-import org.apache.cassandra.spark.common.MD5Hash;
+import org.apache.cassandra.spark.common.Digest;
 import org.apache.cassandra.spark.common.SSTables;
 
 public class StreamSession
 {
     private static final Logger LOGGER = 
LoggerFactory.getLogger(StreamSession.class);
+    private static final String WRITE_PHASE = "UploadAndCommit";
+
     private final BulkWriterContext writerContext;
     private final String sessionID;
     private final Range<BigInteger> tokenRange;
@@ -64,7 +66,6 @@ public class StreamSession
     private final ExecutorService executor;
     private final List<Future<?>> futures = new ArrayList<>();
     private final TokenRangeMapping<RingInstance> tokenRangeMapping;
-    private static final String WRITE_PHASE = "UploadAndCommit";
 
     public StreamSession(final BulkWriterContext writerContext,
                          final String sessionID,
@@ -244,7 +245,7 @@ public class StreamSession
     {
         try
         {
-            sendSSTableToReplica(writerContext, dataFile, ssTableIdx, replica, 
ssTableWriter.getFileHashes());
+            sendSSTableToReplica(writerContext, dataFile, ssTableIdx, replica, 
ssTableWriter.fileDigestMap());
             return true;
         }
         catch (Exception exception)
@@ -273,7 +274,7 @@ public class StreamSession
                                       Path dataFile,
                                       int ssTableIdx,
                                       RingInstance instance,
-                                      Map<Path, MD5Hash> fileHashes) throws 
Exception
+                                      Map<Path, Digest> fileDigestMap) throws 
Exception
     {
         try (DirectoryStream<Path> componentFileStream = 
Files.newDirectoryStream(dataFile.getParent(), 
SSTables.getSSTableBaseName(dataFile) + "*"))
         {
@@ -283,9 +284,9 @@ public class StreamSession
                 {
                     continue;
                 }
-                sendSSTableComponent(writerContext, componentFile, ssTableIdx, 
instance, fileHashes.get(componentFile));
+                sendSSTableComponent(writerContext, componentFile, ssTableIdx, 
instance, fileDigestMap.get(componentFile));
             }
-            sendSSTableComponent(writerContext, dataFile, ssTableIdx, 
instance, fileHashes.get(dataFile));
+            sendSSTableComponent(writerContext, dataFile, ssTableIdx, 
instance, fileDigestMap.get(dataFile));
         }
     }
 
@@ -293,12 +294,12 @@ public class StreamSession
                                       Path componentFile,
                                       int ssTableIdx,
                                       RingInstance instance,
-                                      MD5Hash fileHash) throws Exception
+                                      Digest digest) throws Exception
     {
-        Preconditions.checkNotNull(fileHash, "All files must have a hash. 
SSTableWriter should have calculated these. This is a bug.");
+        Preconditions.checkNotNull(digest, "All files must have a hash. 
SSTableWriter should have calculated these. This is a bug.");
         long fileSize = Files.size(componentFile);
         LOGGER.info("[{}]: Uploading {} to {}: Size is {}", sessionID, 
componentFile, instance.nodeName(), fileSize);
-        writerContext.transfer().uploadSSTableComponent(componentFile, 
ssTableIdx, instance, sessionID, fileHash);
+        writerContext.transfer().uploadSSTableComponent(componentFile, 
ssTableIdx, instance, sessionID, digest);
     }
 
     public static void clean(BulkWriterContext writerContext, RingInstance 
instance, String sessionID)
diff --git 
a/cassandra-analytics-core/src/main/java/org/apache/cassandra/spark/bulkwriter/WriterOptions.java
 
b/cassandra-analytics-core/src/main/java/org/apache/cassandra/spark/bulkwriter/WriterOptions.java
index 3dd68dd..fd04d97 100644
--- 
a/cassandra-analytics-core/src/main/java/org/apache/cassandra/spark/bulkwriter/WriterOptions.java
+++ 
b/cassandra-analytics-core/src/main/java/org/apache/cassandra/spark/bulkwriter/WriterOptions.java
@@ -56,4 +56,9 @@ public enum WriterOptions implements WriterOption
      * the write consistency level validations.
      */
     BLOCKED_CASSANDRA_INSTANCES,
+    /**
+     * Option that specifies the type of digest to compute when uploading 
SSTables for checksum validation.
+     * If unspecified, it defaults to {@code XXHash32} digests. The legacy 
{@code MD5} digest is also supported.
+     */
+    DIGEST,
 }
diff --git 
a/cassandra-analytics-core/src/main/java/org/apache/cassandra/spark/common/MD5Hash.java
 
b/cassandra-analytics-core/src/main/java/org/apache/cassandra/spark/common/Digest.java
similarity index 66%
copy from 
cassandra-analytics-core/src/main/java/org/apache/cassandra/spark/common/MD5Hash.java
copy to 
cassandra-analytics-core/src/main/java/org/apache/cassandra/spark/common/Digest.java
index df8c29e..07d7c22 100644
--- 
a/cassandra-analytics-core/src/main/java/org/apache/cassandra/spark/common/MD5Hash.java
+++ 
b/cassandra-analytics-core/src/main/java/org/apache/cassandra/spark/common/Digest.java
@@ -19,25 +19,18 @@
 
 package org.apache.cassandra.spark.common;
 
-import java.security.MessageDigest;
-import java.util.Base64;
-
-public final class MD5Hash
+/**
+ * Interface that represents the computed digest
+ */
+public interface Digest
 {
-    private final String value;
-
-    private MD5Hash(MessageDigest digest)
-    {
-        value = Base64.getEncoder().encodeToString(digest.digest());
-    }
-
-    public static MD5Hash fromDigest(MessageDigest messageDigest)
-    {
-        return new MD5Hash(messageDigest);
-    }
+    /**
+     * @return the string representation of the digest
+     */
+    String value();
 
-    public String toString()
-    {
-        return value;
-    }
+    /**
+     * @return the digest translated to Sidecar digest
+     */
+    org.apache.cassandra.sidecar.common.data.Digest toSidecarDigest();
 }
diff --git 
a/cassandra-analytics-core/src/main/java/org/apache/cassandra/spark/common/MD5Hash.java
 
b/cassandra-analytics-core/src/main/java/org/apache/cassandra/spark/common/MD5Digest.java
similarity index 52%
rename from 
cassandra-analytics-core/src/main/java/org/apache/cassandra/spark/common/MD5Hash.java
rename to 
cassandra-analytics-core/src/main/java/org/apache/cassandra/spark/common/MD5Digest.java
index df8c29e..3c62244 100644
--- 
a/cassandra-analytics-core/src/main/java/org/apache/cassandra/spark/common/MD5Hash.java
+++ 
b/cassandra-analytics-core/src/main/java/org/apache/cassandra/spark/common/MD5Digest.java
@@ -19,25 +19,51 @@
 
 package org.apache.cassandra.spark.common;
 
-import java.security.MessageDigest;
-import java.util.Base64;
 
-public final class MD5Hash
+import java.util.Objects;
+
+import org.jetbrains.annotations.NotNull;
+
+/**
+ * An implementation of {@link Digest} that represents an MD5 digest
+ */
+public class MD5Digest implements Digest
 {
-    private final String value;
+    private final @NotNull String value;
+
+    /**
+     * Constructs a new MD5Digest with the provided MD5 {@code value}
+     *
+     * @param value the MD5 value
+     */
+    public MD5Digest(@NotNull String value)
+    {
+        this.value = Objects.requireNonNull(value, "value is required");
+    }
 
-    private MD5Hash(MessageDigest digest)
+    /**
+     * {@inheritDoc}
+     */
+    @Override
+    public String value()
     {
-        value = Base64.getEncoder().encodeToString(digest.digest());
+        return value;
     }
 
-    public static MD5Hash fromDigest(MessageDigest messageDigest)
+    /**
+     * {@inheritDoc}
+     */
+    @Override
+    public org.apache.cassandra.sidecar.common.data.Digest toSidecarDigest()
     {
-        return new MD5Hash(messageDigest);
+        return new org.apache.cassandra.sidecar.common.data.MD5Digest(value);
     }
 
+    @Override
     public String toString()
     {
-        return value;
+        return "MD5Digest{" +
+               "value='" + value + '\'' +
+               '}';
     }
 }
diff --git 
a/cassandra-analytics-core/src/main/java/org/apache/cassandra/spark/common/XXHash32Digest.java
 
b/cassandra-analytics-core/src/main/java/org/apache/cassandra/spark/common/XXHash32Digest.java
new file mode 100644
index 0000000..1ad8547
--- /dev/null
+++ 
b/cassandra-analytics-core/src/main/java/org/apache/cassandra/spark/common/XXHash32Digest.java
@@ -0,0 +1,79 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.cassandra.spark.common;
+
+import java.util.Objects;
+
+import org.jetbrains.annotations.NotNull;
+import org.jetbrains.annotations.Nullable;
+
+/**
+ * An implementation of {@link Digest} that represents an XXHash32 digest
+ */
+public class XXHash32Digest implements Digest
+{
+    private final @NotNull String value;
+    private final @NotNull String seedHex;
+
+    /**
+     * Constructs a new XXHash32Digest with the provided XXHash32 {@code value} 
and the seed value represented as
+     * a hexadecimal string
+     *
+     * @param value the xxhash value
+     * @param seed  the value of the seed used to calculate the digest
+     */
+    public XXHash32Digest(@NotNull String value, int seed)
+    {
+        this.value = Objects.requireNonNull(value, "value is required");
+        this.seedHex = Integer.toHexString(seed);
+    }
+
+    /**
+     * @return the string representation of the digest
+     */
+    @Override
+    public String value()
+    {
+        return value;
+    }
+
+    /**
+     * @return the optional seed in hexadecimal format
+     */
+    public @Nullable String seedHex()
+    {
+        return seedHex;
+    }
+
+    @Override
+    public org.apache.cassandra.sidecar.common.data.Digest toSidecarDigest()
+    {
+        return new 
org.apache.cassandra.sidecar.common.data.XXHash32Digest(value, seedHex);
+    }
+
+    @Override
+    public String toString()
+    {
+        return "XXHash32Digest{" +
+               "value='" + value + '\'' +
+               ", seedHex='" + seedHex + '\'' +
+               '}';
+    }
+}
diff --git 
a/cassandra-analytics-core/src/test/java/org/apache/cassandra/spark/bulkwriter/NonValidatingTestSSTableWriter.java
 
b/cassandra-analytics-core/src/main/java/org/apache/cassandra/spark/utils/DigestAlgorithm.java
similarity index 59%
copy from 
cassandra-analytics-core/src/test/java/org/apache/cassandra/spark/bulkwriter/NonValidatingTestSSTableWriter.java
copy to 
cassandra-analytics-core/src/main/java/org/apache/cassandra/spark/utils/DigestAlgorithm.java
index 1be5c2c..dac1254 100644
--- 
a/cassandra-analytics-core/src/test/java/org/apache/cassandra/spark/bulkwriter/NonValidatingTestSSTableWriter.java
+++ 
b/cassandra-analytics-core/src/main/java/org/apache/cassandra/spark/utils/DigestAlgorithm.java
@@ -17,22 +17,24 @@
  * under the License.
  */
 
-package org.apache.cassandra.spark.bulkwriter;
+package org.apache.cassandra.spark.utils;
 
+import java.io.IOException;
 import java.nio.file.Path;
 
-import org.jetbrains.annotations.NotNull;
+import org.apache.cassandra.spark.common.Digest;
 
-class NonValidatingTestSSTableWriter extends SSTableWriter
+/**
+ * Interface that computes a {@link Digest}
+ */
+public interface DigestAlgorithm
 {
-    NonValidatingTestSSTableWriter(MockTableWriter tableWriter, Path path)
-    {
-        super(tableWriter, path);
-    }
-
-    @Override
-    public void validateSSTables(@NotNull BulkWriterContext writerContext, int 
partitionId)
-    {
-        // Skip validation for these tests
-    }
+    /**
+     * Calculates the {@link Digest} for the given file in the {@code path}.
+     *
+     * @param path the path of the file
+     * @return the calculated digest for the given {@code path}
+     * @throws IOException when an error occurs while reading the file or 
calculating the digest
+     */
+    Digest calculateFileDigest(Path path) throws IOException;
 }
diff --git 
a/cassandra-analytics-core/src/main/java/org/apache/cassandra/spark/utils/MD5DigestAlgorithm.java
 
b/cassandra-analytics-core/src/main/java/org/apache/cassandra/spark/utils/MD5DigestAlgorithm.java
new file mode 100644
index 0000000..b7665da
--- /dev/null
+++ 
b/cassandra-analytics-core/src/main/java/org/apache/cassandra/spark/utils/MD5DigestAlgorithm.java
@@ -0,0 +1,55 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.cassandra.spark.utils;
+
+import java.io.IOException;
+import java.io.InputStream;
+import java.nio.file.Files;
+import java.nio.file.Path;
+import java.security.MessageDigest;
+import java.util.Base64;
+
+import org.apache.commons.codec.digest.DigestUtils;
+
+import org.apache.cassandra.spark.common.Digest;
+import org.apache.cassandra.spark.common.MD5Digest;
+
+/**
+ * A {@link DigestAlgorithm} implementation that computes MD5 digests
+ */
+public class MD5DigestAlgorithm implements DigestAlgorithm
+{
+    /**
+     * Calculates the {@link Digest} for the given file in the {@code path}.
+     *
+     * @param path the path of the file
+     * @return the calculated digest for the given {@code path}
+     * @throws IOException when an error occurs while reading the file or 
calculating the digest
+     */
+    @Override
+    public Digest calculateFileDigest(Path path) throws IOException
+    {
+        try (InputStream is = Files.newInputStream(path))
+        {
+            MessageDigest computedMd5 = 
DigestUtils.updateDigest(DigestUtils.getMd5Digest(), is);
+            return new 
MD5Digest(Base64.getEncoder().encodeToString(computedMd5.digest()));
+        }
+    }
+}
diff --git 
a/cassandra-analytics-core/src/main/java/org/apache/cassandra/spark/utils/XXHash32DigestAlgorithm.java
 
b/cassandra-analytics-core/src/main/java/org/apache/cassandra/spark/utils/XXHash32DigestAlgorithm.java
new file mode 100644
index 0000000..724125b
--- /dev/null
+++ 
b/cassandra-analytics-core/src/main/java/org/apache/cassandra/spark/utils/XXHash32DigestAlgorithm.java
@@ -0,0 +1,67 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.cassandra.spark.utils;
+
+import java.io.IOException;
+import java.io.InputStream;
+import java.nio.file.Files;
+import java.nio.file.Path;
+
+import net.jpountz.xxhash.StreamingXXHash32;
+import net.jpountz.xxhash.XXHashFactory;
+import org.apache.cassandra.spark.common.Digest;
+import org.apache.cassandra.spark.common.XXHash32Digest;
+
+/**
+ * A {@link DigestAlgorithm} implementation that computes XXHash32 digests
+ */
+public class XXHash32DigestAlgorithm implements DigestAlgorithm
+{
+    private static final int KIB_512 = 512 * 1024;
+    /**
+     * A seed used to calculate the XXHash32 digest
+     */
+    private static final int SEED = 0;
+
+    /**
+     * Calculates the {@link org.apache.cassandra.spark.common.XXHash32Digest} 
for the given file in the {@code path}.
+     *
+     * @param path the path of the file
+     * @return the calculated digest for the given {@code path}
+     * @throws IOException when an error occurs while reading the file or 
calculating the digest
+     */
+    @Override
+    public Digest calculateFileDigest(Path path) throws IOException
+    {
+        // might have shared hashers with ThreadLocal
+        XXHashFactory factory = XXHashFactory.safeInstance();
+        try (InputStream inputStream = Files.newInputStream(path);
+             StreamingXXHash32 hasher = factory.newStreamingHash32(SEED))
+        {
+            int len;
+            byte[] buffer = new byte[KIB_512];
+            while ((len = inputStream.read(buffer)) != -1)
+            {
+                hasher.update(buffer, 0, len);
+            }
+            return new XXHash32Digest(Long.toHexString(hasher.getValue()), 
SEED);
+        }
+    }
+}
diff --git 
a/cassandra-analytics-core/src/test/java/org/apache/cassandra/spark/bulkwriter/MockBulkWriterContext.java
 
b/cassandra-analytics-core/src/test/java/org/apache/cassandra/spark/bulkwriter/MockBulkWriterContext.java
index e864dac..e059170 100644
--- 
a/cassandra-analytics-core/src/test/java/org/apache/cassandra/spark/bulkwriter/MockBulkWriterContext.java
+++ 
b/cassandra-analytics-core/src/test/java/org/apache/cassandra/spark/bulkwriter/MockBulkWriterContext.java
@@ -43,7 +43,7 @@ import org.apache.cassandra.bridge.CassandraBridgeFactory;
 import org.apache.cassandra.sidecar.common.data.TimeSkewResponse;
 import org.apache.cassandra.spark.bulkwriter.token.ConsistencyLevel;
 import org.apache.cassandra.spark.bulkwriter.token.TokenRangeMapping;
-import org.apache.cassandra.spark.common.MD5Hash;
+import org.apache.cassandra.spark.common.Digest;
 import org.apache.cassandra.spark.common.client.ClientException;
 import org.apache.cassandra.spark.common.client.InstanceState;
 import org.apache.cassandra.spark.common.model.BulkFeatures;
@@ -55,6 +55,7 @@ import 
org.apache.cassandra.spark.data.partitioner.Partitioner;
 import org.apache.cassandra.spark.validation.StartupValidator;
 import org.apache.spark.sql.types.DataTypes;
 import org.apache.spark.sql.types.StructType;
+import org.jetbrains.annotations.NotNull;
 
 import static org.apache.cassandra.spark.bulkwriter.SqlToCqlTypeConverter.DATE;
 import static org.apache.cassandra.spark.bulkwriter.SqlToCqlTypeConverter.INT;
@@ -245,6 +246,13 @@ public class MockBulkWriterContext implements 
BulkWriterContext, ClusterInfo, Jo
         return skipClean;
     }
 
+    @NotNull
+    @Override
+    public DigestAlgorithmSupplier digestAlgorithmSupplier()
+    {
+        return DigestAlgorithms.XXHASH32;
+    }
+
     public void setSkipCleanOnFailures(boolean skipClean)
     {
         this.skipClean = skipClean;
@@ -326,7 +334,7 @@ public class MockBulkWriterContext implements 
BulkWriterContext, ClusterInfo, Jo
                                        int ssTableIdx,
                                        CassandraInstance instance,
                                        String sessionID,
-                                       MD5Hash fileHash) throws ClientException
+                                       Digest digest) throws ClientException
     {
         boolean uploadSucceeded = uploadRequestConsumer.test(instance);
         uploads.compute(instance, (k, pathList) -> {
@@ -334,7 +342,7 @@ public class MockBulkWriterContext implements 
BulkWriterContext, ClusterInfo, Jo
             {
                 pathList = new ArrayList<>();
             }
-            pathList.add(new UploadRequest(componentFile, ssTableIdx, 
instance, sessionID, fileHash, uploadSucceeded));
+            pathList.add(new UploadRequest(componentFile, ssTableIdx, 
instance, sessionID, digest, uploadSucceeded));
             return pathList;
         });
         if (!uploadSucceeded)
diff --git 
a/cassandra-analytics-core/src/test/java/org/apache/cassandra/spark/bulkwriter/NonValidatingTestSSTableWriter.java
 
b/cassandra-analytics-core/src/test/java/org/apache/cassandra/spark/bulkwriter/NonValidatingTestSSTableWriter.java
index 1be5c2c..08ae58a 100644
--- 
a/cassandra-analytics-core/src/test/java/org/apache/cassandra/spark/bulkwriter/NonValidatingTestSSTableWriter.java
+++ 
b/cassandra-analytics-core/src/test/java/org/apache/cassandra/spark/bulkwriter/NonValidatingTestSSTableWriter.java
@@ -21,13 +21,14 @@ package org.apache.cassandra.spark.bulkwriter;
 
 import java.nio.file.Path;
 
+import org.apache.cassandra.spark.utils.DigestAlgorithm;
 import org.jetbrains.annotations.NotNull;
 
 class NonValidatingTestSSTableWriter extends SSTableWriter
 {
-    NonValidatingTestSSTableWriter(MockTableWriter tableWriter, Path path)
+    NonValidatingTestSSTableWriter(MockTableWriter tableWriter, Path path, 
DigestAlgorithm digestAlgorithm)
     {
-        super(tableWriter, path);
+        super(tableWriter, path, digestAlgorithm);
     }
 
     @Override
diff --git 
a/cassandra-analytics-core/src/test/java/org/apache/cassandra/spark/bulkwriter/RecordWriterTest.java
 
b/cassandra-analytics-core/src/test/java/org/apache/cassandra/spark/bulkwriter/RecordWriterTest.java
index 92fda4c..05f7b74 100644
--- 
a/cassandra-analytics-core/src/test/java/org/apache/cassandra/spark/bulkwriter/RecordWriterTest.java
+++ 
b/cassandra-analytics-core/src/test/java/org/apache/cassandra/spark/bulkwriter/RecordWriterTest.java
@@ -47,6 +47,8 @@ import 
org.apache.cassandra.spark.bulkwriter.token.ConsistencyLevel;
 import org.apache.cassandra.spark.bulkwriter.token.TokenRangeMapping;
 import org.apache.cassandra.spark.common.model.CassandraInstance;
 import org.apache.cassandra.spark.data.CqlField;
+import org.apache.cassandra.spark.utils.DigestAlgorithm;
+import org.apache.cassandra.spark.utils.XXHash32DigestAlgorithm;
 import org.apache.spark.sql.types.DataType;
 import org.apache.spark.sql.types.DataTypes;
 import org.apache.spark.sql.types.StructType;
@@ -84,7 +86,7 @@ public class RecordWriterTest
     };
 
     @TempDir
-    public Path folder; // CHECKSTYLE IGNORE: Public mutable field for 
parameterized testing
+    private Path folder;
 
     private TokenRangeMapping<RingInstance> tokenRangeMapping;
     private RecordWriter rw;
@@ -93,10 +95,12 @@ public class RecordWriterTest
     private Range<BigInteger> range;
     private MockBulkWriterContext writerContext;
     private TestTaskContext tc;
+    private DigestAlgorithm digestAlgorithm;
 
     @BeforeEach
     public void setUp()
     {
+        digestAlgorithm = new XXHash32DigestAlgorithm();
         tw = new MockTableWriter(folder.getRoot());
         tokenRangeMapping = 
TokenRangeMappingUtils.buildTokenRangeMapping(100000, ImmutableMap.of("DC1", 
3), 12);
         writerContext = new MockBulkWriterContext(tokenRangeMapping);
@@ -237,7 +241,7 @@ public class RecordWriterTest
         List<UploadRequest> requests = 
uploads.values().stream().flatMap(List::stream).collect(Collectors.toList());
         for (UploadRequest ur : requests)
         {
-            assertNotNull(ur.fileHash);
+            assertNotNull(ur.digest);
         }
     }
 
@@ -312,7 +316,7 @@ public class RecordWriterTest
         List<UploadRequest> requests = 
uploads.values().stream().flatMap(List::stream).collect(Collectors.toList());
         for (UploadRequest ur : requests)
         {
-            assertNotNull(ur.fileHash);
+            assertNotNull(ur.digest);
         }
     }
 
@@ -342,7 +346,7 @@ public class RecordWriterTest
         List<UploadRequest> requests = 
uploads.values().stream().flatMap(List::stream).collect(Collectors.toList());
         for (UploadRequest ur : requests)
         {
-            assertNotNull(ur.fileHash);
+            assertNotNull(ur.digest);
         }
     }
 
@@ -377,14 +381,14 @@ public class RecordWriterTest
         List<UploadRequest> requests = 
uploads.values().stream().flatMap(List::stream).collect(Collectors.toList());
         for (UploadRequest ur : requests)
         {
-            assertNotNull(ur.fileHash);
+            assertNotNull(ur.digest);
         }
     }
 
     @Test
     public void testCorruptSSTable()
     {
-        rw = new RecordWriter(writerContext, COLUMN_NAMES, () -> tc, (wc, 
path) -> new SSTableWriter(tw.setOutDir(path), path));
+        rw = new RecordWriter(writerContext, COLUMN_NAMES, () -> tc, (wc, 
path, dp) -> new SSTableWriter(tw.setOutDir(path), path, digestAlgorithm));
         Iterator<Tuple2<DecoratedKey, Object[]>> data = generateData();
         // TODO: Add better error handling with human-readable exception 
messages in SSTableReader::new
         // That way we can assert on the exception thrown here
@@ -394,7 +398,7 @@ public class RecordWriterTest
     @Test
     public void testWriteWithOutOfRangeTokenFails()
     {
-        rw = new RecordWriter(writerContext, COLUMN_NAMES, () -> tc, (wc, 
path) -> new SSTableWriter(tw, folder));
+        rw = new RecordWriter(writerContext, COLUMN_NAMES, () -> tc, (wc, 
path, dp) -> new SSTableWriter(tw, folder, digestAlgorithm));
         Iterator<Tuple2<DecoratedKey, Object[]>> data = generateData(5, 
Range.all(), false, false, false);
         RuntimeException ex = assertThrows(RuntimeException.class, () -> 
rw.write(data));
         String expectedErr = "java.lang.IllegalStateException: Received Token 
" +
@@ -405,7 +409,7 @@ public class RecordWriterTest
     @Test
     public void testAddRowThrowingFails()
     {
-        rw = new RecordWriter(writerContext, COLUMN_NAMES, () -> tc, (wc, 
path) -> new SSTableWriter(tw, folder));
+        rw = new RecordWriter(writerContext, COLUMN_NAMES, () -> tc, (wc, 
path, dp) -> new SSTableWriter(tw, folder, digestAlgorithm));
         tw.setAddRowThrows(true);
         Iterator<Tuple2<DecoratedKey, Object[]>> data = generateData();
         RuntimeException ex = assertThrows(RuntimeException.class, () -> 
rw.write(data));
@@ -417,7 +421,7 @@ public class RecordWriterTest
     {
         // Mock context returns a 60-minute allowable time skew, so we use 
something just outside the limits
         long sixtyOneMinutesInMillis = TimeUnit.MINUTES.toMillis(61);
-        rw = new RecordWriter(writerContext, COLUMN_NAMES, () -> tc, (wc, 
path) -> new SSTableWriter(tw, folder));
+        rw = new RecordWriter(writerContext, COLUMN_NAMES, () -> tc, (wc, 
path, dp) -> new SSTableWriter(tw, folder, digestAlgorithm));
         writerContext.setTimeProvider(() -> System.currentTimeMillis() - 
sixtyOneMinutesInMillis);
         Iterator<Tuple2<DecoratedKey, Object[]>> data = generateData();
         RuntimeException ex = assertThrows(RuntimeException.class, () -> 
rw.write(data));
@@ -463,7 +467,7 @@ public class RecordWriterTest
         List<UploadRequest> requests = 
uploads.values().stream().flatMap(List::stream).collect(Collectors.toList());
         for (UploadRequest ur : requests)
         {
-            assertNotNull(ur.fileHash);
+            assertNotNull(ur.digest);
         }
     }
 
diff --git 
a/cassandra-analytics-core/src/test/java/org/apache/cassandra/spark/bulkwriter/SSTableWriterTest.java
 
b/cassandra-analytics-core/src/test/java/org/apache/cassandra/spark/bulkwriter/SSTableWriterTest.java
index 7155f12..58dc982 100644
--- 
a/cassandra-analytics-core/src/test/java/org/apache/cassandra/spark/bulkwriter/SSTableWriterTest.java
+++ 
b/cassandra-analytics-core/src/test/java/org/apache/cassandra/spark/bulkwriter/SSTableWriterTest.java
@@ -39,6 +39,7 @@ import org.apache.cassandra.bridge.CassandraVersion;
 import org.apache.cassandra.bridge.CassandraVersionFeatures;
 import org.apache.cassandra.spark.bulkwriter.token.ConsistencyLevel;
 import org.apache.cassandra.spark.bulkwriter.token.TokenRangeMapping;
+import org.apache.cassandra.spark.utils.XXHash32DigestAlgorithm;
 
 import static org.junit.jupiter.api.Assertions.assertEquals;
 
@@ -53,7 +54,8 @@ public class SSTableWriterTest
                      .collect(Collectors.toList());
     }
 
-    private @NotNull TokenRangeMapping<RingInstance> tokenRangeMapping = 
TokenRangeMappingUtils.buildTokenRangeMapping(0, ImmutableMap.of("DC1", 3), 12);
+    @NotNull
+    private final TokenRangeMapping<RingInstance> tokenRangeMapping = 
TokenRangeMappingUtils.buildTokenRangeMapping(0, ImmutableMap.of("DC1", 3), 12);
 
     @BeforeAll
     public static void setProps()
@@ -76,7 +78,7 @@ public class SSTableWriterTest
     }
 
     @TempDir
-    public Path tmpDir; // CHECKSTYLE IGNORE: Public mutable field for testing
+    private Path tmpDir;
 
 
     @ParameterizedTest
@@ -84,7 +86,7 @@ public class SSTableWriterTest
     public void canCreateWriterForVersion(String version) throws IOException
     {
         MockBulkWriterContext writerContext = new 
MockBulkWriterContext(tokenRangeMapping, version, 
ConsistencyLevel.CL.LOCAL_QUORUM);
-        SSTableWriter tw = new SSTableWriter(writerContext, tmpDir);
+        SSTableWriter tw = new SSTableWriter(writerContext, tmpDir, new 
XXHash32DigestAlgorithm());
         tw.addRow(BigInteger.ONE, ImmutableMap.of("id", 1, "date", 1, 
"course", "foo", "marks", 1));
         tw.close(writerContext, 1);
         try (DirectoryStream<Path> dataFileStream = 
Files.newDirectoryStream(tw.getOutDir(), "*Data.db"))
diff --git 
a/cassandra-analytics-core/src/test/java/org/apache/cassandra/spark/bulkwriter/StreamSessionConsistencyTest.java
 
b/cassandra-analytics-core/src/test/java/org/apache/cassandra/spark/bulkwriter/StreamSessionConsistencyTest.java
index 25147ca..b809f88 100644
--- 
a/cassandra-analytics-core/src/test/java/org/apache/cassandra/spark/bulkwriter/StreamSessionConsistencyTest.java
+++ 
b/cassandra-analytics-core/src/test/java/org/apache/cassandra/spark/bulkwriter/StreamSessionConsistencyTest.java
@@ -45,6 +45,8 @@ import 
org.apache.cassandra.spark.bulkwriter.token.ConsistencyLevel;
 import org.apache.cassandra.spark.bulkwriter.token.ReplicaAwareFailureHandler;
 import org.apache.cassandra.spark.bulkwriter.token.TokenRangeMapping;
 import org.apache.cassandra.spark.common.model.CassandraInstance;
+import org.apache.cassandra.spark.utils.DigestAlgorithm;
+import org.apache.cassandra.spark.utils.XXHash32DigestAlgorithm;
 
 import static org.hamcrest.MatcherAssert.assertThat;
 import static org.hamcrest.Matchers.containsInAnyOrder;
@@ -64,11 +66,12 @@ public class StreamSessionConsistencyTest
     private static final Map<String, Object> COLUMN_BIND_VALUES = 
ImmutableMap.of("id", 0, "date", 1, "course", "course", "marks", 2);
 
     @TempDir
-    public Path folder; // CHECKSTYLE IGNORE: Public mutable field for testing
+    private Path folder;
     private MockTableWriter tableWriter;
     private StreamSession streamSession;
     private MockBulkWriterContext writerContext;
     private final MockScheduledExecutorService executor = new 
MockScheduledExecutorService();
+    private DigestAlgorithm digestAlgorithm;
 
     public static Collection<Object[]> data()
     {
@@ -81,6 +84,7 @@ public class StreamSessionConsistencyTest
 
     private void setup(ConsistencyLevel.CL consistencyLevel, List<Integer> 
failuresPerDc)
     {
+        digestAlgorithm = new XXHash32DigestAlgorithm();
         tableWriter = new MockTableWriter(folder);
         writerContext = new MockBulkWriterContext(TOKEN_RANGE_MAPPING, 
"cassandra-4.0.0", consistencyLevel);
         streamSession = new StreamSession(writerContext,
@@ -118,7 +122,7 @@ public class StreamSessionConsistencyTest
                 return new DataTransferApi.RemoteCommitResult(true, 
Collections.emptyList(), uuids, "");
             }
         });
-        SSTableWriter tr = new NonValidatingTestSSTableWriter(tableWriter, 
folder);
+        SSTableWriter tr = new NonValidatingTestSSTableWriter(tableWriter, 
folder, digestAlgorithm);
         tr.addRow(BigInteger.valueOf(102L), COLUMN_BIND_VALUES);
         tr.close(writerContext, 1);
         streamSession.scheduleStream(tr);
@@ -163,7 +167,7 @@ public class StreamSessionConsistencyTest
         ImmutableMap<String, AtomicInteger> dcFailures = 
ImmutableMap.of("DC1", dc1Failures, "DC2", dc2Failures);
         boolean shouldFail = calculateFailure(consistencyLevel, 
dc1Failures.get(), dc2Failures.get());
         writerContext.setUploadSupplier(instance -> 
dcFailures.get(instance.datacenter()).getAndDecrement() <= 0);
-        SSTableWriter tr = new NonValidatingTestSSTableWriter(tableWriter, 
folder);
+        SSTableWriter tr = new NonValidatingTestSSTableWriter(tableWriter, 
folder, digestAlgorithm);
         tr.addRow(BigInteger.valueOf(102L), COLUMN_BIND_VALUES);
         tr.close(writerContext, 1);
         streamSession.scheduleStream(tr);
diff --git 
a/cassandra-analytics-core/src/test/java/org/apache/cassandra/spark/bulkwriter/StreamSessionTest.java
 
b/cassandra-analytics-core/src/test/java/org/apache/cassandra/spark/bulkwriter/StreamSessionTest.java
index 3a71640..82924cb 100644
--- 
a/cassandra-analytics-core/src/test/java/org/apache/cassandra/spark/bulkwriter/StreamSessionTest.java
+++ 
b/cassandra-analytics-core/src/test/java/org/apache/cassandra/spark/bulkwriter/StreamSessionTest.java
@@ -40,6 +40,8 @@ import org.junit.jupiter.api.io.TempDir;
 import org.apache.cassandra.spark.bulkwriter.token.ReplicaAwareFailureHandler;
 import org.apache.cassandra.spark.bulkwriter.token.TokenRangeMapping;
 import org.apache.cassandra.spark.common.model.CassandraInstance;
+import org.apache.cassandra.spark.utils.DigestAlgorithm;
+import org.apache.cassandra.spark.utils.XXHash32DigestAlgorithm;
 import org.jetbrains.annotations.NotNull;
 
 import static org.hamcrest.MatcherAssert.assertThat;
@@ -58,7 +60,7 @@ public class StreamSessionTest
     public static final String LOAD_RANGE_ERROR_PREFIX = "Failed to load 1 
ranges with LOCAL_QUORUM";
     private static final Map<String, Object> COLUMN_BOUND_VALUES = 
ImmutableMap.of("id", 0, "date", 1, "course", "course", "marks", 2);
     @TempDir
-    public Path folder; // CHECKSTYLE IGNORE: Public mutable field for testing
+    private Path folder;
     private static final int FILES_PER_SSTABLE = 8;
     private static final int RF = 3;
 
@@ -69,10 +71,12 @@ public class StreamSessionTest
     private MockScheduledExecutorService executor;
     private MockTableWriter tableWriter;
     private Range<BigInteger> range;
+    private DigestAlgorithm digestAlgorithm;
 
     @BeforeEach
     public void setup()
     {
+        digestAlgorithm = new XXHash32DigestAlgorithm();
         range = Range.range(BigInteger.valueOf(101L), BoundType.CLOSED, 
BigInteger.valueOf(199L), BoundType.CLOSED);
         tokenRangeMapping = TokenRangeMappingUtils.buildTokenRangeMapping(0, 
ImmutableMap.of("DC1", 3), 12);
         writerContext = getBulkWriterContext();
@@ -94,7 +98,7 @@ public class StreamSessionTest
     @Test
     public void testScheduleStreamSendsCorrectFilesToCorrectInstances() throws 
IOException, ExecutionException, InterruptedException
     {
-        SSTableWriter tr = new NonValidatingTestSSTableWriter(tableWriter, 
folder);
+        SSTableWriter tr = new NonValidatingTestSSTableWriter(tableWriter, 
folder, digestAlgorithm);
         tr.addRow(BigInteger.valueOf(102L), COLUMN_BOUND_VALUES);
         tr.close(writerContext, 1);
         ss.scheduleStream(tr);
@@ -109,7 +113,7 @@ public class StreamSessionTest
     @Test
     public void testMismatchedTokenRangeFails() throws IOException
     {
-        SSTableWriter tr = new NonValidatingTestSSTableWriter(tableWriter, 
folder);
+        SSTableWriter tr = new NonValidatingTestSSTableWriter(tableWriter, 
folder, digestAlgorithm);
         tr.addRow(BigInteger.valueOf(9999L), COLUMN_BOUND_VALUES);
         tr.close(writerContext, 1);
         IllegalStateException illegalStateException = 
assertThrows(IllegalStateException.class,
@@ -158,7 +162,7 @@ public class StreamSessionTest
     public void testOutDirCreationFailureCleansAllReplicas()
     {
         assertThrows(RuntimeException.class, () -> {
-            SSTableWriter tr = new NonValidatingTestSSTableWriter(tableWriter, 
tableWriter.getOutDir());
+            SSTableWriter tr = new NonValidatingTestSSTableWriter(tableWriter, 
tableWriter.getOutDir(), digestAlgorithm);
             tr.addRow(BigInteger.valueOf(102L), COLUMN_BOUND_VALUES);
             tr.close(writerContext, 1);
             tableWriter.removeOutDir();
@@ -208,7 +212,7 @@ public class StreamSessionTest
                 return new DataTransferApi.RemoteCommitResult(true, null, 
uuids, "");
             }
         });
-        SSTableWriter tr = new NonValidatingTestSSTableWriter(tableWriter, 
folder);
+        SSTableWriter tr = new NonValidatingTestSSTableWriter(tableWriter, 
folder, digestAlgorithm);
         tr.addRow(BigInteger.valueOf(102L), COLUMN_BOUND_VALUES);
         tr.close(writerContext, 1);
         ss.scheduleStream(tr);
@@ -236,7 +240,7 @@ public class StreamSessionTest
             }
         });
 
-        SSTableWriter tr = new NonValidatingTestSSTableWriter(tableWriter, 
folder);
+        SSTableWriter tr = new NonValidatingTestSSTableWriter(tableWriter, 
folder, digestAlgorithm);
         tr.addRow(BigInteger.valueOf(102L), COLUMN_BOUND_VALUES);
         tr.close(writerContext, 1);
         ss.scheduleStream(tr);
@@ -252,7 +256,7 @@ public class StreamSessionTest
     private void runFailedUpload() throws IOException, ExecutionException, 
InterruptedException
     {
         writerContext.setUploadSupplier(instance -> false);
-        SSTableWriter tr = new NonValidatingTestSSTableWriter(tableWriter, 
folder);
+        SSTableWriter tr = new NonValidatingTestSSTableWriter(tableWriter, 
folder, digestAlgorithm);
         tr.addRow(BigInteger.valueOf(102L), COLUMN_BOUND_VALUES);
         tr.close(writerContext, 1);
         ss.scheduleStream(tr);
diff --git 
a/cassandra-analytics-core/src/test/java/org/apache/cassandra/spark/bulkwriter/UploadRequest.java
 
b/cassandra-analytics-core/src/test/java/org/apache/cassandra/spark/bulkwriter/UploadRequest.java
index 637ad2d..c011fc3 100644
--- 
a/cassandra-analytics-core/src/test/java/org/apache/cassandra/spark/bulkwriter/UploadRequest.java
+++ 
b/cassandra-analytics-core/src/test/java/org/apache/cassandra/spark/bulkwriter/UploadRequest.java
@@ -21,7 +21,7 @@ package org.apache.cassandra.spark.bulkwriter;
 
 import java.nio.file.Path;
 
-import org.apache.cassandra.spark.common.MD5Hash;
+import org.apache.cassandra.spark.common.Digest;
 import org.apache.cassandra.spark.common.model.CassandraInstance;
 
 public class UploadRequest
@@ -29,22 +29,22 @@ public class UploadRequest
     public final Path path;
     public final int ssTableIdx;
     public final CassandraInstance instance;
-    public final String sesssionId;
-    public final MD5Hash fileHash;
+    public final String sessionId;
+    public final Digest digest;
     public final boolean uploadSucceeded;
 
     public UploadRequest(Path path,
                          int ssTableIdx,
                          CassandraInstance instance,
-                         String sesssionId,
-                         MD5Hash fileHash,
+                         String sessionId,
+                         Digest digest,
                          boolean uploadSucceeded)
     {
         this.path = path;
         this.ssTableIdx = ssTableIdx;
         this.instance = instance;
-        this.sesssionId = sesssionId;
-        this.fileHash = fileHash;
+        this.sessionId = sessionId;
+        this.digest = digest;
         this.uploadSucceeded = uploadSucceeded;
     }
 }
diff --git 
a/cassandra-analytics-core/src/test/java/org/apache/cassandra/spark/utils/MD5DigestAlgorithmTest.java
 
b/cassandra-analytics-core/src/test/java/org/apache/cassandra/spark/utils/MD5DigestAlgorithmTest.java
new file mode 100644
index 0000000..153bc6d
--- /dev/null
+++ 
b/cassandra-analytics-core/src/test/java/org/apache/cassandra/spark/utils/MD5DigestAlgorithmTest.java
@@ -0,0 +1,68 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.cassandra.spark.utils;
+
+import java.io.IOException;
+import java.nio.file.Path;
+
+import org.junit.jupiter.api.io.TempDir;
+import org.junit.jupiter.params.ParameterizedTest;
+import org.junit.jupiter.params.provider.CsvSource;
+
+import org.apache.cassandra.spark.common.Digest;
+import org.apache.cassandra.spark.common.MD5Digest;
+
+import static 
org.apache.cassandra.spark.utils.ResourceUtils.writeResourceToPath;
+import static org.assertj.core.api.Assertions.assertThat;
+
+/**
+ * Unit tests for {@link MD5DigestAlgorithm}
+ */
+class MD5DigestAlgorithmTest
+{
+    @TempDir
+    private Path tempPath;
+
+    // To generate test files I used:
+    // $ base64 -i /dev/urandom | head -c 1048576 > file1.txt
+    // $ base64 -i /dev/urandom | head -c 524288 > file2.txt
+    // $ base64 -i /dev/urandom | head -c 131072 > file3.txt
+    // To calculate MD5 I used:
+    // $ cat file1.txt | openssl dgst -md5 -binary | openssl enc -base64 # -> 
VqSURYiCXjZIgP+CO9IkLQ==
+    // $ cat file2.txt | openssl dgst -md5 -binary | openssl enc -base64 # -> 
vFoVTqVngw7JRj8yJfk3UA==
+    // $ cat file3.txt | openssl dgst -md5 -binary | openssl enc -base64 # -> 
RXASCHthSSrMt7YOKJ6ODQ==
+
+    @ParameterizedTest(name = "{index} fileName={0} expectedMd5={1}")
+    @CsvSource({
+    "file1.txt,VqSURYiCXjZIgP+CO9IkLQ==",
+    "file2.txt,vFoVTqVngw7JRj8yJfk3UA==",
+    "file3.txt,RXASCHthSSrMt7YOKJ6ODQ==",
+    })
+    void testMD5Provider(String fileName, String expectedMd5) throws 
IOException
+    {
+        ClassLoader classLoader = 
MD5DigestAlgorithmTest.class.getClassLoader();
+        Path path = writeResourceToPath(classLoader, tempPath, "digest/" + 
fileName);
+        assertThat(path).exists();
+
+        Digest digest = new MD5DigestAlgorithm().calculateFileDigest(path);
+        assertThat(digest).isInstanceOf(MD5Digest.class);
+        assertThat(digest.value()).isEqualTo(expectedMd5);
+    }
+}
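
The comments in the test above describe generating the fixtures with base64/openssl. For reference, the same base64-encoded MD5 can be computed with nothing but the JDK; the sketch below (the Md5Sketch class is hypothetical, not part of the codebase) only shows the equivalent computation, not the MD5DigestAlgorithm implementation itself:

```
import java.io.InputStream;
import java.nio.file.Files;
import java.nio.file.Path;
import java.nio.file.Paths;
import java.security.MessageDigest;
import java.util.Base64;

// Hypothetical sketch: stream a file through MD5 and base64-encode the raw digest,
// which is what the openssl pipeline in the test comments produces.
final class Md5Sketch
{
    static String base64Md5(Path path) throws Exception
    {
        MessageDigest md5 = MessageDigest.getInstance("MD5");
        byte[] buffer = new byte[8192];
        try (InputStream in = Files.newInputStream(path))
        {
            int read;
            while ((read = in.read(buffer)) != -1)
            {
                md5.update(buffer, 0, read);
            }
        }
        return Base64.getEncoder().encodeToString(md5.digest());
    }

    public static void main(String[] args) throws Exception
    {
        System.out.println(base64Md5(Paths.get("digest/file1.txt")));
    }
}
```

Run against digest/file1.txt this should print VqSURYiCXjZIgP+CO9IkLQ==, matching the first CsvSource row.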
diff --git 
a/cassandra-analytics-core/src/test/java/org/apache/cassandra/spark/utils/ResourceUtils.java
 
b/cassandra-analytics-core/src/test/java/org/apache/cassandra/spark/utils/ResourceUtils.java
new file mode 100644
index 0000000..1d6d68f
--- /dev/null
+++ 
b/cassandra-analytics-core/src/test/java/org/apache/cassandra/spark/utils/ResourceUtils.java
@@ -0,0 +1,79 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.cassandra.spark.utils;
+
+import java.io.IOException;
+import java.io.InputStream;
+import java.io.OutputStream;
+import java.nio.file.Files;
+import java.nio.file.Path;
+
+import static org.assertj.core.api.Assertions.assertThat;
+import static org.assertj.core.api.Assertions.fail;
+
+/**
+ * A utilities class to handle resources for tests
+ */
+public final class ResourceUtils
+{
+    private ResourceUtils()
+    {
+        throw new IllegalStateException(getClass() + " is a static utility class and shall not be instantiated");
+    }
+
+    /**
+     * Writes a resource with {@code resourceName} loaded from the {@link 
ClassLoader classLoader} into the
+     * {@code destinationPath}
+     *
+     * @param classLoader     the class loader for the resource
+     * @param destinationPath the destination path to write the file
+     * @param resourceName    the name of the resource to be loaded
+     * @return the {@link Path} to the created resource
+     */
+    public static Path writeResourceToPath(ClassLoader classLoader, Path 
destinationPath, String resourceName)
+    {
+        try
+        {
+            Path resourcePath = destinationPath.resolve(resourceName);
+
+            // ensure parent directory is created
+            Files.createDirectories(resourcePath.getParent());
+
+            try (InputStream inputStream = 
classLoader.getResourceAsStream(resourceName);
+                 OutputStream outputStream = 
Files.newOutputStream(resourcePath))
+            {
+                assertThat(inputStream).isNotNull();
+
+                int length;
+                byte[] buffer = new byte[1024];
+                while ((length = inputStream.read(buffer)) != -1)
+                {
+                    outputStream.write(buffer, 0, length);
+                }
+            }
+            return resourcePath;
+        }
+        catch (IOException exception)
+        {
+            String failureMessage = "Unable to create resource " + 
resourceName;
+            fail(failureMessage, exception);
+            throw new RuntimeException(failureMessage, exception);
+        }
+    }
+}
diff --git 
a/cassandra-analytics-core/src/test/java/org/apache/cassandra/spark/utils/SSTableInputStreamHttpTests.java
 
b/cassandra-analytics-core/src/test/java/org/apache/cassandra/spark/utils/SSTableInputStreamHttpTest.java
similarity index 97%
rename from 
cassandra-analytics-core/src/test/java/org/apache/cassandra/spark/utils/SSTableInputStreamHttpTests.java
rename to 
cassandra-analytics-core/src/test/java/org/apache/cassandra/spark/utils/SSTableInputStreamHttpTest.java
index b7bbf10..81ad0fc 100644
--- 
a/cassandra-analytics-core/src/test/java/org/apache/cassandra/spark/utils/SSTableInputStreamHttpTests.java
+++ 
b/cassandra-analytics-core/src/test/java/org/apache/cassandra/spark/utils/SSTableInputStreamHttpTest.java
@@ -64,7 +64,7 @@ import static 
org.quicktheories.generators.SourceDSL.arbitrary;
  * Test the {@link SSTableInputStream} by firing up an in-test HTTP server and 
reading the files with an HTTP client
  * Compares the MD5s to verify file bytes match bytes returned by {@link 
SSTableInputStream}.
  */
-public class SSTableInputStreamHttpTests
+public class SSTableInputStreamHttpTest
 {
     static final ExecutorService HTTP_EXECUTOR =
             Executors.newFixedThreadPool(4, new 
ThreadFactoryBuilder().setNameFormat("http-server-%d")
@@ -74,10 +74,10 @@ public class SSTableInputStreamHttpTests
             Executors.newFixedThreadPool(4, new 
ThreadFactoryBuilder().setNameFormat("http-client-%d")
                                                                       
.setDaemon(true)
                                                                       
.build());
-    private static final Logger LOGGER = 
LoggerFactory.getLogger(SSTableInputStreamHttpTests.class);
+    private static final Logger LOGGER = 
LoggerFactory.getLogger(SSTableInputStreamHttpTest.class);
 
     @TempDir
-    public static Path DIRECTORY;  // CHECKSTYLE IGNORE: Constant cannot be 
made final
+    private static Path directory;
     private static final String HOST = "localhost";
     private static final int PORT = 8001;
     private static final int HTTP_CLIENT_CHUNK_SIZE = 8192;
@@ -95,7 +95,7 @@ public class SSTableInputStreamHttpTests
             try
             {
                 String uri = 
exchange.getRequestURI().getPath().replaceFirst("/", "");
-                Path path = DIRECTORY.resolve(uri);
+                Path path = directory.resolve(uri);
 
                 // Extract Range from header
                 long size = Files.size(path);
@@ -266,7 +266,7 @@ public class SSTableInputStreamHttpTests
     {
         try
         {
-            Path path = Files.createTempFile(DIRECTORY, null, null);
+            Path path = Files.createTempFile(directory, null, null);
             MessageDigest digest = DigestUtils.getMd5Digest();
             try (BufferedOutputStream out = new 
BufferedOutputStream(Files.newOutputStream(path)))
             {
diff --git 
a/cassandra-analytics-core/src/test/java/org/apache/cassandra/spark/utils/XXHash32DigestAlgorithmTest.java
 
b/cassandra-analytics-core/src/test/java/org/apache/cassandra/spark/utils/XXHash32DigestAlgorithmTest.java
new file mode 100644
index 0000000..92b32e9
--- /dev/null
+++ 
b/cassandra-analytics-core/src/test/java/org/apache/cassandra/spark/utils/XXHash32DigestAlgorithmTest.java
@@ -0,0 +1,71 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.cassandra.spark.utils;
+
+import java.io.IOException;
+import java.nio.file.Path;
+
+import org.junit.jupiter.api.io.TempDir;
+import org.junit.jupiter.params.ParameterizedTest;
+import org.junit.jupiter.params.provider.CsvSource;
+
+import org.apache.cassandra.spark.common.Digest;
+import org.apache.cassandra.spark.common.XXHash32Digest;
+
+import static 
org.apache.cassandra.spark.utils.ResourceUtils.writeResourceToPath;
+import static org.assertj.core.api.Assertions.assertThat;
+
+/**
+ * Unit tests for {@link XXHash32DigestAlgorithm}
+ */
+class XXHash32DigestAlgorithmTest
+{
+    @TempDir
+    private Path tempPath;
+
+    // To generate test files I used:
+    // $ base64 -i /dev/urandom | head -c 1048576 > file1.txt
+    // $ base64 -i /dev/urandom | head -c 524288 > file2.txt
+    // $ base64 -i /dev/urandom | head -c 131072 > file3.txt
+    // To calculate xxhash I installed xxhash (brew install xxhash):
+    // $ xxh32sum file1.txt # -> d76a44a5
+    // $ xxh32sum file2.txt # -> ef976cbe
+    // $ xxh32sum file3.txt # -> 08321e1e
+
+    @ParameterizedTest(name = "{index} fileName={0} expectedXXHash32={1}")
+    @CsvSource({
+    "file1.txt,ffffffffd76a44a5",
+    "file2.txt,ffffffffef976cbe",
+    "file3.txt,8321e1e",
+    })
+    void testXXHash32Provider(String fileName, String expectedXXHash32) throws 
IOException
+    {
+        ClassLoader classLoader = XXHash32DigestAlgorithmTest.class.getClassLoader();
+        Path path = writeResourceToPath(classLoader, tempPath, "digest/" + 
fileName);
+        assertThat(path).exists();
+
+        Digest digest = new 
XXHash32DigestAlgorithm().calculateFileDigest(path);
+        assertThat(digest).isInstanceOf(XXHash32Digest.class);
+
+        XXHash32Digest xxHash32Digest = (XXHash32Digest) digest;
+        assertThat(xxHash32Digest.value()).isEqualTo(expectedXXHash32);
+        assertThat(xxHash32Digest.seedHex()).isEqualTo("0");
+    }
+}
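
The expected values in the test above (e.g. ffffffffd76a44a5 where xxh32sum prints d76a44a5, and 8321e1e where it prints 08321e1e) suggest the 32-bit hash is widened to a long before hex formatting. A minimal sketch, assuming the digest streams the file through lz4-java's net.jpountz.xxhash with seed 0 (the XXHash32Sketch class below is hypothetical, and the production XXHash32DigestAlgorithm may be implemented differently):

```
import java.io.InputStream;
import java.nio.file.Files;
import java.nio.file.Path;
import java.nio.file.Paths;

import net.jpountz.xxhash.StreamingXXHash32;
import net.jpountz.xxhash.XXHashFactory;

// Hypothetical sketch: stream a file through XXHash32 with seed 0 and render the
// 32-bit value as hex via an int-to-long widening, so negative hashes pick up a
// leading "ffffffff" and positive hashes drop leading zeros.
final class XXHash32Sketch
{
    static String xxhash32Hex(Path path) throws Exception
    {
        StreamingXXHash32 hash = XXHashFactory.fastestInstance().newStreamingHash32(0);
        byte[] buffer = new byte[8192];
        try (InputStream in = Files.newInputStream(path))
        {
            int read;
            while ((read = in.read(buffer)) != -1)
            {
                hash.update(buffer, 0, read);
            }
        }
        return Long.toHexString(hash.getValue()); // e.g. "8321e1e" for digest/file3.txt
    }

    public static void main(String[] args) throws Exception
    {
        System.out.println(xxhash32Hex(Paths.get("digest/file3.txt")));
    }
}
```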
diff --git a/cassandra-analytics-core/src/test/resources/digest/file1.txt 
b/cassandra-analytics-core/src/test/resources/digest/file1.txt
new file mode 100644
index 0000000..6a5c5cb
--- /dev/null
+++ b/cassandra-analytics-core/src/test/resources/digest/file1.txt
@@ -0,0 +1 @@
+MGz17YVmWKTHp3Q1eKm34XBSSVZmBogGnFOC//tXbny83crrQ256VuXF6copnLayBe18eOQos9ZIWlhBP/sGB2Dl+JIUKhnZ9fc3eBSYv0V9cnqh+ImC02k4XijF7T0H27VKf+EcbNCRmERnROuIeX6oLDlUeKpsoErOQgVW3BjwBKf08TrK8lgc4BTZfF77uTmAGJJz9AA5N0ZV3LQ6qn2W7I8MiMqllak37VNI2J2aI5V46lq40/w3a4au3AvtD16UM+lHvsxyOvbEPKd6fqdFU80IVjZXfrtA6eRpuVc03w8ptFyODHmy7Lpy8aVFtI3tXQODDbeC5kWMYP7KSgTVG5fO1P/W3WcPKPchj6h3SXRlW46wXtEmi6y16eBNxVT2tjinS4tZheXwQBeTV03P9PJg3pDnarKGyRaRaWYN+yvK8DAYlfxyy27kvSTx/6e4Pou8GJW00RJMQ+oQHffIjLnubPPH35K5/5Dwu8XO
 [...]
\ No newline at end of file
diff --git a/cassandra-analytics-core/src/test/resources/digest/file2.txt 
b/cassandra-analytics-core/src/test/resources/digest/file2.txt
new file mode 100644
index 0000000..3deea33
--- /dev/null
+++ b/cassandra-analytics-core/src/test/resources/digest/file2.txt
@@ -0,0 +1 @@
+l9a7GQdACyjuRrhsZqNkTBvl+1kfYLrSpn9RH/jyP0eAjS6Sl/VQxfsNAgPHFOOc0wtj0VwntcJN2loQSXdyQakhSOJpyT/EybE4hYzRqU6yCQmNl6Oi9P8No+hA0PBnlZIJT5rmKpj05CcHNdMADbj2K9+Oa0fJGMYhL7hzSzdOcimp4LnwURIWxzI7jDK8ve0Mo7BUfHuLSideMBBjfNcCny3dY43dL0PIwGtWnrBLZF81Dvjc4csGkz4OTJ31biW0ivHqVCsMRKx2eAXw0FB2/0s7iarPcpkyHk20BnIsi2tjOv0btnQC9p7PL+HArealU3rZW/y1uKupKjdFPGdD9+1kCQdVR+J5qi+udp5Bj22qfioTXd3BU2xWxDgxtHUB9Z8k4CFx6/dD46cz+1dlCtXi2HWvgsyUVdvL3rJJWP2yyzvsdn/3KmCj/uc1lYD7yvFsIInGzjRy8vCcwg2cNFNlNIuxZnXB4GONKUVp
 [...]
\ No newline at end of file
diff --git a/cassandra-analytics-core/src/test/resources/digest/file3.txt 
b/cassandra-analytics-core/src/test/resources/digest/file3.txt
new file mode 100644
index 0000000..e20cf24
--- /dev/null
+++ b/cassandra-analytics-core/src/test/resources/digest/file3.txt
@@ -0,0 +1 @@
+Mfwzkymsh9jm1FZSEXzTDelmCU69gjd9fzrUQcQtfYxNS4rOZWBbZGvEXOzUumuF2fW6xCZ3o7ZZuO4cvL5ZqqO+fPT+CRlt4KwDeRXyZMApiXdMXiVdoRwaQXIjlIf10dEn2KMbenVb9ClZlvWV8JcOWZxoXHs+P/LJPeq06mYY4/SzdCJOQdzyTVWASsHaMtNRgcjEWjigvPsxctXPGX7Lp9Du7E9WRFm5xmoeJGgYslOI6aU54GYfDAavV5cIdwCHawbu7VH1aTDj3VImC+rHlUm+/ZCsczqpjJSe37KCd/SrL1vqlYe0zVf4WyX4R6zJCvmh0/A2XaRkMYeM9wxxAP7+dw43dGI7gceh+zWzyFk/vnZj1ZSvdrI38WgnC0NWP2r0ilvB1OQemSv6XnL2m5n92s0aRTiw6SxFYSSUJ4yu9kNudPdVWGhEVm9S1P3VWYvfsQsdIKwh9j26ascLZHJ0Si3Tdnsz3mqT/8sM
 [...]
\ No newline at end of file
diff --git 
a/cassandra-analytics-integration-framework/src/main/java/org/apache/cassandra/distributed/impl/AbstractClusterUtils.java
 
b/cassandra-analytics-integration-framework/src/main/java/org/apache/cassandra/distributed/impl/AbstractClusterUtils.java
index 29e949c..05dd7e2 100644
--- 
a/cassandra-analytics-integration-framework/src/main/java/org/apache/cassandra/distributed/impl/AbstractClusterUtils.java
+++ 
b/cassandra-analytics-integration-framework/src/main/java/org/apache/cassandra/distributed/impl/AbstractClusterUtils.java
@@ -34,7 +34,7 @@ public class AbstractClusterUtils
      * @param nodeNumber the number of the node for which a configuration 
should be created
      * @return the instance configuration for the specified node number
      */
-    public static InstanceConfig createInstanceConfig(AbstractCluster cluster, 
int nodeNumber)
+    public static InstanceConfig createInstanceConfig(AbstractCluster<?> 
cluster, int nodeNumber)
     {
         return cluster.createInstanceConfig(nodeNumber);
     }
diff --git 
a/cassandra-analytics-integration-framework/src/main/java/org/apache/cassandra/sidecar/testing/SharedClusterIntegrationTestBase.java
 
b/cassandra-analytics-integration-framework/src/main/java/org/apache/cassandra/sidecar/testing/SharedClusterIntegrationTestBase.java
index cdc0708..e9b9542 100644
--- 
a/cassandra-analytics-integration-framework/src/main/java/org/apache/cassandra/sidecar/testing/SharedClusterIntegrationTestBase.java
+++ 
b/cassandra-analytics-integration-framework/src/main/java/org/apache/cassandra/sidecar/testing/SharedClusterIntegrationTestBase.java
@@ -53,6 +53,7 @@ import io.vertx.core.Vertx;
 import io.vertx.junit5.VertxExtension;
 import io.vertx.junit5.VertxTestContext;
 import org.apache.cassandra.distributed.UpgradeableCluster;
+import org.apache.cassandra.distributed.api.ConsistencyLevel;
 import org.apache.cassandra.distributed.api.IInstance;
 import org.apache.cassandra.distributed.api.IInstanceConfig;
 import org.apache.cassandra.distributed.impl.AbstractCluster;
@@ -322,6 +323,29 @@ public abstract class SharedClusterIntegrationTestBase
                  .collect(Collectors.joining(","));
     }
 
+    /**
+     * Convenience method to query all data from the provided {@code table} at 
consistency level {@code LOCAL_QUORUM}.
+     *
+     * @param table the qualified Cassandra table name
+     * @return all the data queried from the table
+     */
+    protected Object[][] queryAllData(QualifiedName table)
+    {
+        return queryAllData(table, ConsistencyLevel.LOCAL_QUORUM);
+    }
+
+    /**
+     * Convenience method to query all data from the provided {@code table} at 
the specified consistency level.
+     *
+     * @param table            the qualified Cassandra table name
+     * @param consistencyLevel the consistency level to use for querying the 
data
+     * @return all the data queried from the table
+     */
+    protected Object[][] queryAllData(QualifiedName table, ConsistencyLevel 
consistencyLevel)
+    {
+        return cluster.coordinator(1).execute(String.format("SELECT * FROM 
%s;", table), consistencyLevel);
+    }
+
     static class IntegrationTestModule extends AbstractModule
     {
         private final AbstractCluster<? extends IInstance> cluster;
diff --git 
a/cassandra-analytics-integration-tests/src/test/java/org/apache/cassandra/analytics/QuoteIdentifiersWriteTest.java
 
b/cassandra-analytics-integration-tests/src/test/java/org/apache/cassandra/analytics/QuoteIdentifiersWriteTest.java
index 3acbcb1..2650cd7 100644
--- 
a/cassandra-analytics-integration-tests/src/test/java/org/apache/cassandra/analytics/QuoteIdentifiersWriteTest.java
+++ 
b/cassandra-analytics-integration-tests/src/test/java/org/apache/cassandra/analytics/QuoteIdentifiersWriteTest.java
@@ -20,15 +20,8 @@
 package org.apache.cassandra.analytics;
 
 import java.io.IOException;
-import java.nio.ByteBuffer;
-import java.nio.charset.StandardCharsets;
 import java.util.Arrays;
-import java.util.Iterator;
 import java.util.List;
-import java.util.Set;
-import java.util.stream.Collectors;
-import java.util.stream.IntStream;
-import java.util.stream.LongStream;
 import java.util.stream.Stream;
 
 import org.junit.jupiter.params.ParameterizedTest;
@@ -37,7 +30,6 @@ import org.junit.jupiter.params.provider.MethodSource;
 
 import com.vdurmont.semver4j.Semver;
 import org.apache.cassandra.distributed.UpgradeableCluster;
-import org.apache.cassandra.distributed.api.ConsistencyLevel;
 import org.apache.cassandra.distributed.api.Feature;
 import org.apache.cassandra.distributed.api.TokenSupplier;
 import org.apache.cassandra.distributed.shared.Versions;
@@ -45,24 +37,16 @@ import 
org.apache.cassandra.sidecar.testing.JvmDTestSharedClassesPredicate;
 import org.apache.cassandra.sidecar.testing.QualifiedName;
 import org.apache.cassandra.spark.bulkwriter.WriterOptions;
 import org.apache.cassandra.testing.TestVersion;
-import org.apache.spark.SparkContext;
-import org.apache.spark.api.java.JavaRDD;
-import org.apache.spark.api.java.JavaSparkContext;
-import org.apache.spark.api.java.function.Function2;
 import org.apache.spark.sql.Dataset;
 import org.apache.spark.sql.Row;
-import org.apache.spark.sql.RowFactory;
-import org.apache.spark.sql.SQLContext;
 import org.apache.spark.sql.SparkSession;
-import org.apache.spark.sql.types.StructType;
 
+import static 
org.apache.cassandra.analytics.DataGenerationUtils.generateCourseData;
+import static org.apache.cassandra.analytics.SparkTestUtils.validateWrites;
 import static org.apache.cassandra.testing.TestUtils.DC1_RF1;
 import static org.apache.cassandra.testing.TestUtils.ROW_COUNT;
 import static org.apache.cassandra.testing.TestUtils.TEST_KEYSPACE;
 import static org.apache.cassandra.testing.TestUtils.uniqueTestTableFullName;
-import static org.apache.spark.sql.types.DataTypes.BinaryType;
-import static org.apache.spark.sql.types.DataTypes.LongType;
-import static org.assertj.core.api.Assertions.assertThat;
 
 /**
  * Tests the bulk writer behavior when requiring quoted identifiers for 
keyspace, table name, and column names.
@@ -83,47 +67,14 @@ class QuoteIdentifiersWriteTest extends 
SharedClusterSparkIntegrationTestBase
     void testQuoteIdentifiersBulkWrite(QualifiedName tableName)
     {
         SparkSession spark = getOrCreateSparkSession();
-        SparkContext sc = spark.sparkContext();
-        JavaSparkContext javaSparkContext = 
JavaSparkContext.fromSparkContext(sc);
-        SQLContext sql = spark.sqlContext();
-
-        int parallelism = sc.defaultParallelism();
-        JavaRDD<Row> rows = genDataset(javaSparkContext, ROW_COUNT, 
parallelism);
-        Dataset<Row> df = sql.createDataFrame(rows, writeSchema());
-
+        // Generate course data and rename the dataframe columns to use
+        // case-sensitive and reserved words
+        Dataset<Row> df = generateCourseData(spark, 
ROW_COUNT).toDF("IdEnTiFiEr", // case-sensitive struct
+                                                                    "course",
+                                                                    "limit"); 
// limit is a reserved word in Cassandra
         bulkWriterDataFrameWriter(df, 
tableName).option(WriterOptions.QUOTE_IDENTIFIERS.name(), "true")
                                                 .save();
-        validateWrites(tableName, rows);
-    }
-
-    void validateWrites(QualifiedName tableName, JavaRDD<Row> rowsWritten)
-    {
-        // build a set of entries read from Cassandra into a set
-        Set<String> actualEntries = Arrays.stream(cluster.coordinator(1)
-                                                         
.execute(String.format("SELECT * FROM %s;", tableName), 
ConsistencyLevel.LOCAL_QUORUM))
-                                          .map((Object[] columns) -> 
String.format("%s:%s:%s",
-                                                                               
    new String(((ByteBuffer) columns[1]).array(), StandardCharsets.UTF_8),
-                                                                               
    columns[0],
-                                                                               
    columns[2]))
-                                          .collect(Collectors.toSet());
-
-        // Number of entries in Cassandra must match the original datasource
-        assertThat(actualEntries.size()).isEqualTo(rowsWritten.count());
-
-        // remove from actual entries to make sure that the data read is the 
same as the data written
-        rowsWritten.collect()
-                   .forEach(row -> {
-                       String key = String.format("%s:%d:%d",
-                                                  new String((byte[]) 
row.get(0), StandardCharsets.UTF_8),
-                                                  row.getLong(1),
-                                                  row.getLong(2));
-                       assertThat(actualEntries.remove(key)).as(key + " is 
expected to exist in the actual entries")
-                                                            .isTrue();
-                   });
-
-        // If this fails, it means there was more data in the database than we 
expected
-        assertThat(actualEntries).as("All entries are expected to be read from 
database")
-                                 .isEmpty();
+        validateWrites(df.collectAsList(), queryAllData(tableName));
     }
 
     static Stream<Arguments> testInputs()
@@ -159,7 +110,7 @@ class QuoteIdentifiersWriteTest extends 
SharedClusterSparkIntegrationTestBase
     protected void initializeSchemaForTest()
     {
         String createTableStatement = "CREATE TABLE IF NOT EXISTS %s " +
-                                      "(\"IdEnTiFiEr\" bigint, course blob, 
\"limit\" bigint," +
+                                      "(\"IdEnTiFiEr\" int, course text, 
\"limit\" int," +
                                       " PRIMARY KEY(\"IdEnTiFiEr\"));";
 
         TABLE_NAMES.forEach(name -> {
@@ -167,28 +118,4 @@ class QuoteIdentifiersWriteTest extends 
SharedClusterSparkIntegrationTestBase
             createTestTable(name, createTableStatement);
         });
     }
-
-    static StructType writeSchema()
-    {
-        return new StructType()
-               .add("course", BinaryType, false)
-               .add("IdEnTiFiEr", LongType, false) // case-sensitive struct
-               .add("limit", LongType, false); // limit is a reserved word in 
Cassandra
-    }
-
-    static JavaRDD<Row> genDataset(JavaSparkContext sc, int recordCount, 
Integer parallelism)
-    {
-        long recordsPerPartition = recordCount / parallelism;
-        long remainder = recordCount - (recordsPerPartition * parallelism);
-        List<Integer> seq = IntStream.range(0, 
parallelism).boxed().collect(Collectors.toList());
-        return sc.parallelize(seq, parallelism).mapPartitionsWithIndex(
-        (Function2<Integer, Iterator<Integer>, Iterator<Row>>) (index, 
integerIterator) -> {
-            long firstRecordNumber = index * recordsPerPartition;
-            long recordsToGenerate = index.equals(parallelism) ? remainder : 
recordsPerPartition;
-            return LongStream.range(0, recordsToGenerate).mapToObj(offset -> {
-                long limit = firstRecordNumber + offset;
-                return RowFactory.create(("course-" + limit).getBytes(), 
limit, limit);
-            }).iterator();
-        }, false);
-    }
 }
diff --git 
a/cassandra-analytics-integration-tests/src/test/java/org/apache/cassandra/analytics/SparkTestUtils.java
 
b/cassandra-analytics-integration-tests/src/test/java/org/apache/cassandra/analytics/SparkTestUtils.java
index 3697956..53806cb 100644
--- 
a/cassandra-analytics-integration-tests/src/test/java/org/apache/cassandra/analytics/SparkTestUtils.java
+++ 
b/cassandra-analytics-integration-tests/src/test/java/org/apache/cassandra/analytics/SparkTestUtils.java
@@ -19,7 +19,11 @@
 
 package org.apache.cassandra.analytics;
 
+import java.util.Arrays;
+import java.util.List;
+import java.util.Set;
 import java.util.UUID;
+import java.util.stream.Collectors;
 
 import org.apache.cassandra.sidecar.testing.QualifiedName;
 import org.apache.cassandra.spark.KryoRegister;
@@ -33,6 +37,8 @@ import org.apache.spark.sql.Row;
 import org.apache.spark.sql.SQLContext;
 import org.apache.spark.sql.SparkSession;
 
+import static org.assertj.core.api.Assertions.assertThat;
+
 /**
  * Helper methods for Spark tests
  */
@@ -127,4 +133,32 @@ public final class SparkTestUtils
         KryoRegister.setup(sparkConf);
         return sparkConf;
     }
+
+    public static void validateWrites(List<Row> sourceData, Object[][] 
queriedData)
+    {
+        // build a set of entries read from Cassandra into a set
+        Set<String> actualEntries = Arrays.stream(queriedData)
+                                          .map((Object[] columns) -> 
String.format("%s:%s:%s",
+                                                                               
    columns[0],
+                                                                               
    columns[1],
+                                                                               
    columns[2]))
+                                          .collect(Collectors.toSet());
+
+        // Number of entries in Cassandra must match the original datasource
+        assertThat(actualEntries.size()).isEqualTo(sourceData.size());
+
+        // remove from actual entries to make sure that the data read is the 
same as the data written
+        sourceData.forEach(row -> {
+            String key = String.format("%d:%s:%d",
+                                       row.getInt(0),
+                                       row.getString(1),
+                                       row.getInt(2));
+            assertThat(actualEntries.remove(key)).as(key + " is expected to 
exist in the actual entries")
+                                                 .isTrue();
+        });
+
+        // If this fails, it means there was more data in the database than we 
expected
+        assertThat(actualEntries).as("All entries are expected to be read from 
database")
+                                 .isEmpty();
+    }
 }
diff --git 
a/cassandra-analytics-integration-tests/src/test/java/org/apache/cassandra/analytics/WriterDigestIntegrationTest.java
 
b/cassandra-analytics-integration-tests/src/test/java/org/apache/cassandra/analytics/WriterDigestIntegrationTest.java
new file mode 100644
index 0000000..8589a19
--- /dev/null
+++ 
b/cassandra-analytics-integration-tests/src/test/java/org/apache/cassandra/analytics/WriterDigestIntegrationTest.java
@@ -0,0 +1,122 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.cassandra.analytics;
+
+import java.io.IOException;
+import java.util.Arrays;
+import java.util.List;
+
+import org.junit.jupiter.api.Test;
+
+import com.vdurmont.semver4j.Semver;
+import org.apache.cassandra.distributed.UpgradeableCluster;
+import org.apache.cassandra.distributed.api.Feature;
+import org.apache.cassandra.distributed.api.TokenSupplier;
+import org.apache.cassandra.distributed.shared.Versions;
+import org.apache.cassandra.sidecar.testing.QualifiedName;
+import org.apache.cassandra.spark.bulkwriter.WriterOptions;
+import org.apache.cassandra.testing.TestVersion;
+import org.apache.spark.sql.Dataset;
+import org.apache.spark.sql.Row;
+import org.apache.spark.sql.SparkSession;
+
+import static org.apache.cassandra.analytics.SparkTestUtils.validateWrites;
+import static 
org.apache.cassandra.testing.TestUtils.CREATE_TEST_TABLE_STATEMENT;
+import static org.apache.cassandra.testing.TestUtils.DC1_RF1;
+import static org.apache.cassandra.testing.TestUtils.ROW_COUNT;
+import static org.apache.cassandra.testing.TestUtils.uniqueTestTableFullName;
+import static 
org.assertj.core.api.Assertions.assertThatIllegalArgumentException;
+
+/**
+ * Tests bulk writes with different digest options
+ */
+class WriterDigestIntegrationTest extends SharedClusterSparkIntegrationTestBase
+{
+    static final QualifiedName DEFAULT_DIGEST_TABLE = 
uniqueTestTableFullName("default_digest");
+    static final QualifiedName MD5_DIGEST_TABLE = 
uniqueTestTableFullName("md5_digest");
+    static final QualifiedName CORRUPT_SSTABLE_TABLE = 
uniqueTestTableFullName("corrupt_sstable");
+    static final List<QualifiedName> TABLE_NAMES = 
Arrays.asList(DEFAULT_DIGEST_TABLE, MD5_DIGEST_TABLE,
+                                                                 
CORRUPT_SSTABLE_TABLE);
+    Dataset<Row> df;
+
+    @Test
+    void testDefaultDigest()
+    {
+        bulkWriterDataFrameWriter(df, DEFAULT_DIGEST_TABLE).save();
+        validateWrites(df.collectAsList(), queryAllData(DEFAULT_DIGEST_TABLE));
+    }
+
+    @Test
+    void testMD5Digest()
+    {
+        SparkSession spark = getOrCreateSparkSession();
+        Dataset<Row> df = DataGenerationUtils.generateCourseData(spark, 
ROW_COUNT);
+        bulkWriterDataFrameWriter(df, 
MD5_DIGEST_TABLE).option(WriterOptions.DIGEST.name(), "MD5").save();
+        validateWrites(df.collectAsList(), queryAllData(MD5_DIGEST_TABLE));
+    }
+
+    @Test
+    void failsOnInvalidDigestOption()
+    {
+        assertThatIllegalArgumentException()
+        .isThrownBy(() -> bulkWriterDataFrameWriter(df, 
DEFAULT_DIGEST_TABLE).option(WriterOptions.DIGEST.name(), "invalid")
+                                                                             
.save())
+        .withMessageContaining("Key digest type with value invalid is not a 
valid Enum of type class 
org.apache.cassandra.spark.bulkwriter.DigestAlgorithms");
+    }
+
+    @Override
+    protected void beforeTestStart()
+    {
+        SparkSession spark = getOrCreateSparkSession();
+        df = DataGenerationUtils.generateCourseData(spark, ROW_COUNT);
+    }
+
+    @Override
+    protected UpgradeableCluster provisionCluster(TestVersion testVersion) 
throws IOException
+    {
+        // spin up a C* cluster using the in-jvm dtest
+        Versions versions = Versions.find();
+        Versions.Version requestedVersion = versions.getLatest(new 
Semver(testVersion.version(), Semver.SemverType.LOOSE));
+
+        UpgradeableCluster.Builder clusterBuilder =
+        UpgradeableCluster.build(1)
+                          .withDynamicPortAllocation(true)
+                          .withVersion(requestedVersion)
+                          .withDCs(1)
+                          .withDataDirCount(1)
+                          .withConfig(config -> 
config.with(Feature.NATIVE_PROTOCOL)
+                                                      .with(Feature.GOSSIP)
+                                                      .with(Feature.JMX));
+        TokenSupplier tokenSupplier = TokenSupplier.evenlyDistributedTokens(1, 
clusterBuilder.getTokenCount());
+        clusterBuilder.withTokenSupplier(tokenSupplier);
+        UpgradeableCluster cluster = clusterBuilder.createWithoutStarting();
+        cluster.startup();
+        return cluster;
+    }
+
+    @Override
+    protected void initializeSchemaForTest()
+    {
+        TABLE_NAMES.forEach(name -> {
+            createTestKeyspace(name, DC1_RF1);
+            createTestTable(name, CREATE_TEST_TABLE_STATEMENT);
+        });
+    }
+}
diff --git 
a/cassandra-bridge/src/main/java/org/apache/cassandra/spark/utils/MapUtils.java 
b/cassandra-bridge/src/main/java/org/apache/cassandra/spark/utils/MapUtils.java
index cd06425..b2fa27c 100644
--- 
a/cassandra-bridge/src/main/java/org/apache/cassandra/spark/utils/MapUtils.java
+++ 
b/cassandra-bridge/src/main/java/org/apache/cassandra/spark/utils/MapUtils.java
@@ -202,7 +202,7 @@ public final class MapUtils
         String value = options.get(lowerCaseKey(key));
         try
         {
-            return value != null ? (T) Enum.valueOf(defaultValue.getClass(), 
value) : defaultValue;
+            return value != null ? (T) 
Enum.valueOf(defaultValue.getDeclaringClass(), value) : defaultValue;
         }
         catch (IllegalArgumentException exception)
         {
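
The MapUtils change above swaps getClass() for getDeclaringClass() when resolving the enum option. The distinction matters as soon as an enum constant has a body: that constant's runtime class is an anonymous subclass, which Enum.valueOf rejects, while getDeclaringClass() always returns the enum type itself. A minimal, self-contained illustration (the Shape enum below is hypothetical, not part of the codebase):

```
// Illustrative only: why Enum.valueOf needs the declaring class rather than the
// runtime class when enum constants have bodies.
enum Shape
{
    SQUARE,
    CIRCLE
    {
        @Override
        public String toString()
        {
            return "circle";
        }
    };

    public static void main(String[] args)
    {
        Shape fallback = Shape.CIRCLE;
        // fallback.getClass() is the anonymous class Shape$1, so resolving the value
        // against it (as the pre-fix code effectively did) fails at runtime with
        // "... is not an enum type"; the declaring class is always Shape.
        Shape parsed = Enum.valueOf(fallback.getDeclaringClass(), "SQUARE");
        System.out.println(parsed); // SQUARE
    }
}
```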
diff --git a/scripts/build-sidecar.sh b/scripts/build-sidecar.sh
index eb1767f..cc3f13f 100755
--- a/scripts/build-sidecar.sh
+++ b/scripts/build-sidecar.sh
@@ -24,7 +24,7 @@ else
   SCRIPT_DIR=$( dirname -- "$( readlink -f -- "$0"; )"; )
   
SIDECAR_REPO="${SIDECAR_REPO:-https://github.com/apache/cassandra-sidecar.git}";
   SIDECAR_BRANCH="${SIDECAR_BRANCH:-trunk}"
-  SIDECAR_COMMIT="${SIDECAR_COMMIT:-2eb3474d7037a2887bcd9dee1f64c2a36a7e8d26}"
+  SIDECAR_COMMIT="${SIDECAR_COMMIT:-2914d57f0428643b3a92b6af8f4da1b209d80c2a}"
   SIDECAR_JAR_DIR="$(dirname "${SCRIPT_DIR}/")/dependencies"
   SIDECAR_JAR_DIR=${CASSANDRA_DEP_DIR:-$SIDECAR_JAR_DIR}
   SIDECAR_BUILD_DIR="${SIDECAR_JAR_DIR}/sidecar-build"


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscr...@cassandra.apache.org
For additional commands, e-mail: commits-h...@cassandra.apache.org
