This is an automated email from the ASF dual-hosted git repository.

frankgh pushed a commit to branch trunk
in repository https://gitbox.apache.org/repos/asf/cassandra-sidecar.git


The following commit(s) were added to refs/heads/trunk by this push:
     new e329f32  CASSANDRASC-97: Add support for additional digest validation 
during SSTable upload
e329f32 is described below

commit e329f3232fae867879aba2cd0a766404aaf9a427
Author: Francisco Guerrero <[email protected]>
AuthorDate: Thu Jan 25 10:30:52 2024 -0800

    CASSANDRASC-97: Add support for additional digest validation during SSTable 
upload
    
    In this commit we add the ability to support additional digest algorithms 
for verification
    during SSTable uploads. We introduce the `DigestVerifierFactory` which now 
supports
    XXHash32 and MD5 `DigestVerifier`s.
    
    This commit also adds support for XXHash32 digests. Clients can now send 
the XXHash32 digest
    instead of MD5. This would allow both the clients and server the 
flexibility to utilize a more
    performant algorithm.
    
    Patch by Francisco Guerrero; Reviewed by Yifan Cai for CASSANDRASC-97
---
 CHANGES.txt                                        |   1 +
 .../cassandra/sidecar/client/RequestContext.java   |   7 +-
 .../cassandra/sidecar/client/SidecarClient.java    |   7 +-
 .../client/request/UploadSSTableRequest.java       |  14 +-
 .../sidecar/client/SidecarClientTest.java          |  80 ++++++++++-
 .../cassandra/sidecar/common/data/Digest.java      |  28 ++--
 .../cassandra/sidecar/common/data/MD5Digest.java   |  68 +++++++++
 .../sidecar/common/data/XXHash32Digest.java        | 110 +++++++++++++++
 .../common/http/SidecarHttpHeaderNames.java        |  22 ++-
 .../sidecar/data/SSTableUploadRequest.java         |  19 +--
 .../cassandra/sidecar/restore/RestoreJobUtil.java  |  14 +-
 .../sstableuploads/SSTableUploadHandler.java       |  21 ++-
 .../cassandra/sidecar/server/MainModule.java       |   9 --
 .../sidecar/utils/AsyncFileDigestVerifier.java     | 106 ++++++++++++++
 .../{ChecksumVerifier.java => DigestVerifier.java} |  15 +-
 .../sidecar/utils/DigestVerifierFactory.java       |  76 ++++++++++
 .../sidecar/utils/MD5ChecksumVerifier.java         | 108 --------------
 .../cassandra/sidecar/utils/MD5DigestVerifier.java |  72 ++++++++++
 .../cassandra/sidecar/utils/SSTableUploader.java   |  51 ++-----
 .../sidecar/utils/XXHash32DigestVerifier.java      |  84 +++++++++++
 .../sstableuploads/SSTableUploadHandlerTest.java   | 129 ++++++++++-------
 .../sidecar/utils/DigestVerifierFactoryTest.java   |  89 ++++++++++++
 ...erifierTest.java => MD5DigestVerifierTest.java} |  63 ++++-----
 .../cassandra/sidecar/utils/TestFileUtils.java     |  61 ++++++++
 .../sidecar/utils/XXHash32DigestVerifierTest.java  | 155 +++++++++++++++++++++
 25 files changed, 1092 insertions(+), 317 deletions(-)

diff --git a/CHANGES.txt b/CHANGES.txt
index 6b5a9b9..3f1b4cc 100644
--- a/CHANGES.txt
+++ b/CHANGES.txt
@@ -1,5 +1,6 @@
 1.0.0
 -----
+ * Add support for additional digest validation during SSTable upload 
(CASSANDRASC-97)
  * Add sidecar client changes for restore from S3 (CASSANDRASC-95)
  * Add restore SSTables from S3 into Cassandra feature to Cassandra Sidecar 
(CASSANDRASC-92)
  * Define routing order for http routes (CASSANDRASC-93)
diff --git 
a/client/src/main/java/org/apache/cassandra/sidecar/client/RequestContext.java 
b/client/src/main/java/org/apache/cassandra/sidecar/client/RequestContext.java
index 9433e3a..7af57e9 100644
--- 
a/client/src/main/java/org/apache/cassandra/sidecar/client/RequestContext.java
+++ 
b/client/src/main/java/org/apache/cassandra/sidecar/client/RequestContext.java
@@ -44,6 +44,7 @@ import 
org.apache.cassandra.sidecar.client.retry.NoRetryPolicy;
 import org.apache.cassandra.sidecar.client.retry.RetryPolicy;
 import org.apache.cassandra.sidecar.client.selection.InstanceSelectionPolicy;
 import 
org.apache.cassandra.sidecar.client.selection.SingleInstanceSelectionPolicy;
+import org.apache.cassandra.sidecar.common.data.Digest;
 import org.apache.cassandra.sidecar.common.utils.HttpRange;
 import org.jetbrains.annotations.Nullable;
 
@@ -449,14 +450,14 @@ public class RequestContext
          * @param tableName the table name in Cassandra
          * @param uploadId  an identifier for the upload
          * @param component SSTable component being uploaded
-         * @param checksum  hash value to check integrity of SSTable component 
uploaded
+         * @param digest    digest value to check integrity of SSTable 
component uploaded
          * @param filename  the path to the file to be uploaded
          * @return a reference to this Builder
          */
         public Builder uploadSSTableRequest(String keyspace, String tableName, 
String uploadId, String component,
-                                            String checksum, String filename)
+                                            Digest digest, String filename)
         {
-            return request(new UploadSSTableRequest(keyspace, tableName, 
uploadId, component, checksum, filename));
+            return request(new UploadSSTableRequest(keyspace, tableName, 
uploadId, component, digest, filename));
         }
 
         /**
diff --git 
a/client/src/main/java/org/apache/cassandra/sidecar/client/SidecarClient.java 
b/client/src/main/java/org/apache/cassandra/sidecar/client/SidecarClient.java
index 3048808..42b7093 100644
--- 
a/client/src/main/java/org/apache/cassandra/sidecar/client/SidecarClient.java
+++ 
b/client/src/main/java/org/apache/cassandra/sidecar/client/SidecarClient.java
@@ -45,6 +45,7 @@ import org.apache.cassandra.sidecar.common.NodeSettings;
 import org.apache.cassandra.sidecar.common.data.CreateRestoreJobRequestPayload;
 import 
org.apache.cassandra.sidecar.common.data.CreateRestoreJobResponsePayload;
 import org.apache.cassandra.sidecar.common.data.CreateSliceRequestPayload;
+import org.apache.cassandra.sidecar.common.data.Digest;
 import org.apache.cassandra.sidecar.common.data.GossipInfoResponse;
 import org.apache.cassandra.sidecar.common.data.HealthResponse;
 import org.apache.cassandra.sidecar.common.data.ListSnapshotFilesResponse;
@@ -391,7 +392,7 @@ public class SidecarClient implements AutoCloseable, 
SidecarClientBlobRestoreExt
      * @param table         the table name in Cassandra
      * @param uploadId      the unique identifier for the upload
      * @param componentName the name of the SSTable component
-     * @param checksum      hash value to check integrity of SSTable component 
uploaded
+     * @param digest        digest value to check integrity of SSTable 
component uploaded
      * @param filename      the path to the file to be uploaded
      * @return a completable future for the request
      */
@@ -400,7 +401,7 @@ public class SidecarClient implements AutoCloseable, 
SidecarClientBlobRestoreExt
                                                         String table,
                                                         String uploadId,
                                                         String componentName,
-                                                        String checksum,
+                                                        Digest digest,
                                                         String filename)
     {
         return 
executor.executeRequestAsync(requestBuilder().singleInstanceSelectionPolicy(instance)
@@ -408,7 +409,7 @@ public class SidecarClient implements AutoCloseable, 
SidecarClientBlobRestoreExt
                                                                                
   table,
                                                                                
   uploadId,
                                                                                
   componentName,
-                                                                               
   checksum,
+                                                                               
   digest,
                                                                                
   filename)
                                                             .build());
     }
diff --git 
a/client/src/main/java/org/apache/cassandra/sidecar/client/request/UploadSSTableRequest.java
 
b/client/src/main/java/org/apache/cassandra/sidecar/client/request/UploadSSTableRequest.java
index ea0fe98..b93c82a 100644
--- 
a/client/src/main/java/org/apache/cassandra/sidecar/client/request/UploadSSTableRequest.java
+++ 
b/client/src/main/java/org/apache/cassandra/sidecar/client/request/UploadSSTableRequest.java
@@ -25,16 +25,16 @@ import java.util.HashMap;
 import java.util.Map;
 import java.util.Objects;
 
-import io.netty.handler.codec.http.HttpHeaderNames;
 import io.netty.handler.codec.http.HttpMethod;
 import org.apache.cassandra.sidecar.common.ApiEndpointsV1;
+import org.apache.cassandra.sidecar.common.data.Digest;
 
 /**
  * Represents a request to upload an SSTable component
  */
 public class UploadSSTableRequest extends Request implements UploadableRequest
 {
-    private final String expectedChecksum;
+    private final Digest digest;
     private final String filename;
 
     /**
@@ -44,14 +44,14 @@ public class UploadSSTableRequest extends Request 
implements UploadableRequest
      * @param table     the table name in Cassandra
      * @param uploadId  an identifier for the upload
      * @param component SSTable component being uploaded
-     * @param checksum  hash value to check integrity of SSTable component 
uploaded
+     * @param digest    digest value to check integrity of SSTable component 
uploaded
      * @param filename  the path to the file to be uploaded
      */
     public UploadSSTableRequest(String keyspace, String table, String 
uploadId, String component,
-                                String checksum, String filename)
+                                Digest digest, String filename)
     {
         super(requestURI(keyspace, table, uploadId, component));
-        this.expectedChecksum = checksum;
+        this.digest = digest;
         this.filename = Objects.requireNonNull(filename, "the filename is must 
be non-null");
 
         if (!Files.exists(Paths.get(filename)))
@@ -72,12 +72,12 @@ public class UploadSSTableRequest extends Request 
implements UploadableRequest
     @Override
     public Map<String, String> headers()
     {
-        if (expectedChecksum == null)
+        if (digest == null)
         {
             return super.headers();
         }
         Map<String, String> headers = new HashMap<>(super.headers());
-        headers.put(HttpHeaderNames.CONTENT_MD5.toString(), expectedChecksum);
+        headers.putAll(digest.headers());
         return Collections.unmodifiableMap(headers);
     }
 
diff --git 
a/client/src/testFixtures/java/org/apache/cassandra/sidecar/client/SidecarClientTest.java
 
b/client/src/testFixtures/java/org/apache/cassandra/sidecar/client/SidecarClientTest.java
index fb1102c..e06b709 100644
--- 
a/client/src/testFixtures/java/org/apache/cassandra/sidecar/client/SidecarClientTest.java
+++ 
b/client/src/testFixtures/java/org/apache/cassandra/sidecar/client/SidecarClientTest.java
@@ -63,6 +63,7 @@ import 
org.apache.cassandra.sidecar.common.data.CreateRestoreJobResponsePayload;
 import org.apache.cassandra.sidecar.common.data.GossipInfoResponse;
 import org.apache.cassandra.sidecar.common.data.HealthResponse;
 import org.apache.cassandra.sidecar.common.data.ListSnapshotFilesResponse;
+import org.apache.cassandra.sidecar.common.data.MD5Digest;
 import org.apache.cassandra.sidecar.common.data.RestoreJobSecrets;
 import org.apache.cassandra.sidecar.common.data.RingEntry;
 import org.apache.cassandra.sidecar.common.data.RingResponse;
@@ -70,6 +71,7 @@ import 
org.apache.cassandra.sidecar.common.data.SSTableImportResponse;
 import org.apache.cassandra.sidecar.common.data.SchemaResponse;
 import org.apache.cassandra.sidecar.common.data.TimeSkewResponse;
 import org.apache.cassandra.sidecar.common.data.TokenRangeReplicasResponse;
+import org.apache.cassandra.sidecar.common.data.XXHash32Digest;
 import org.apache.cassandra.sidecar.common.utils.HttpRange;
 import org.apache.cassandra.sidecar.foundation.RestoreJobSecretsGen;
 
@@ -78,6 +80,8 @@ import static 
io.netty.handler.codec.http.HttpResponseStatus.BAD_REQUEST;
 import static 
io.netty.handler.codec.http.HttpResponseStatus.INTERNAL_SERVER_ERROR;
 import static io.netty.handler.codec.http.HttpResponseStatus.OK;
 import static io.netty.handler.codec.http.HttpResponseStatus.PARTIAL_CONTENT;
+import static 
org.apache.cassandra.sidecar.common.http.SidecarHttpHeaderNames.CONTENT_XXHASH32;
+import static 
org.apache.cassandra.sidecar.common.http.SidecarHttpHeaderNames.CONTENT_XXHASH32_SEED;
 import static org.assertj.core.api.Assertions.assertThat;
 import static org.assertj.core.api.Assertions.assertThatException;
 import static org.assertj.core.api.Assertions.assertThatExceptionOfType;
@@ -689,7 +693,7 @@ abstract class SidecarClientTest
     }
 
     @Test
-    void testUploadSSTableWithoutChecksum(@TempDir Path tempDirectory) throws 
Exception
+    void testUploadSSTableWithoutDigest(@TempDir Path tempDirectory) throws 
Exception
     {
         Path fileToUpload = prepareFile(tempDirectory);
         try (MockWebServer server = new MockWebServer())
@@ -722,7 +726,7 @@ abstract class SidecarClientTest
     }
 
     @Test
-    void testUploadSSTableWithChecksum(@TempDir Path tempDirectory) throws 
Exception
+    void testUploadSSTableWithMD5Digest(@TempDir Path tempDirectory) throws 
Exception
     {
         Path fileToUpload = prepareFile(tempDirectory);
         try (MockWebServer server = new MockWebServer())
@@ -735,7 +739,7 @@ abstract class SidecarClientTest
                                         "cyclist_name",
                                         "0000-0000",
                                         "nb-1-big-TOC.txt",
-                                        "15a69dc6501aa5ae17af037fe053f610",
+                                        new 
MD5Digest("15a69dc6501aa5ae17af037fe053f610"),
                                         fileToUpload.toString())
                   .get(30, TimeUnit.SECONDS);
 
@@ -755,6 +759,76 @@ abstract class SidecarClientTest
         }
     }
 
+    @Test
+    void testUploadSSTableWithXXHashDigest(@TempDir Path tempDirectory) throws 
Exception
+    {
+        Path fileToUpload = prepareFile(tempDirectory);
+        try (MockWebServer server = new MockWebServer())
+        {
+            server.enqueue(new MockResponse().setResponseCode(OK.code()));
+
+            SidecarInstanceImpl sidecarInstance = 
RequestExecutorTest.newSidecarInstance(server);
+            client.uploadSSTableRequest(sidecarInstance,
+                                        "cycling",
+                                        "cyclist_name",
+                                        "0000-0000",
+                                        "nb-1-big-TOC.txt",
+                                        new 
XXHash32Digest("15a69dc6501aa5ae17af037fe053f610"),
+                                        fileToUpload.toString())
+                  .get(30, TimeUnit.SECONDS);
+
+            assertThat(server.getRequestCount()).isEqualTo(1);
+            RecordedRequest request = server.takeRequest();
+            assertThat(request.getPath())
+            .isEqualTo(ApiEndpointsV1.SSTABLE_UPLOAD_ROUTE
+                       .replaceAll(ApiEndpointsV1.UPLOAD_ID_PATH_PARAM, 
"0000-0000")
+                       .replaceAll(ApiEndpointsV1.KEYSPACE_PATH_PARAM, 
"cycling")
+                       .replaceAll(ApiEndpointsV1.TABLE_PATH_PARAM, 
"cyclist_name")
+                       .replaceAll(ApiEndpointsV1.COMPONENT_PATH_PARAM, 
"nb-1-big-TOC.txt"));
+            assertThat(request.getMethod()).isEqualTo("PUT");
+            assertThat(request.getHeader(CONTENT_XXHASH32))
+            .isEqualTo("15a69dc6501aa5ae17af037fe053f610");
+            assertThat(request.getHeader(CONTENT_XXHASH32_SEED)).isNull();
+            
assertThat(request.getHeader(HttpHeaderNames.CONTENT_LENGTH.toString())).isEqualTo("80");
+            assertThat(request.getBodySize()).isEqualTo(80);
+        }
+    }
+
+    @Test
+    void testUploadSSTableWithXXHashDigestAndSeed(@TempDir Path tempDirectory) 
throws Exception
+    {
+        Path fileToUpload = prepareFile(tempDirectory);
+        try (MockWebServer server = new MockWebServer())
+        {
+            server.enqueue(new MockResponse().setResponseCode(OK.code()));
+
+            SidecarInstanceImpl sidecarInstance = 
RequestExecutorTest.newSidecarInstance(server);
+            client.uploadSSTableRequest(sidecarInstance,
+                                        "cycling",
+                                        "cyclist_name",
+                                        "0000-0000",
+                                        "nb-1-big-TOC.txt",
+                                        new 
XXHash32Digest("15a69dc6501aa5ae17af037fe053f610", "123456"),
+                                        fileToUpload.toString())
+                  .get(30, TimeUnit.SECONDS);
+
+            assertThat(server.getRequestCount()).isEqualTo(1);
+            RecordedRequest request = server.takeRequest();
+            assertThat(request.getPath())
+            .isEqualTo(ApiEndpointsV1.SSTABLE_UPLOAD_ROUTE
+                       .replaceAll(ApiEndpointsV1.UPLOAD_ID_PATH_PARAM, 
"0000-0000")
+                       .replaceAll(ApiEndpointsV1.KEYSPACE_PATH_PARAM, 
"cycling")
+                       .replaceAll(ApiEndpointsV1.TABLE_PATH_PARAM, 
"cyclist_name")
+                       .replaceAll(ApiEndpointsV1.COMPONENT_PATH_PARAM, 
"nb-1-big-TOC.txt"));
+            assertThat(request.getMethod()).isEqualTo("PUT");
+            assertThat(request.getHeader(CONTENT_XXHASH32))
+            .isEqualTo("15a69dc6501aa5ae17af037fe053f610");
+            
assertThat(request.getHeader(CONTENT_XXHASH32_SEED)).isEqualTo("123456");
+            
assertThat(request.getHeader(HttpHeaderNames.CONTENT_LENGTH.toString())).isEqualTo("80");
+            assertThat(request.getBodySize()).isEqualTo(80);
+        }
+    }
+
     @Test
     void testStreamSSTableComponentWithNoRange() throws Exception
     {
diff --git 
a/src/main/java/org/apache/cassandra/sidecar/utils/ChecksumVerifier.java 
b/common/src/main/java/org/apache/cassandra/sidecar/common/data/Digest.java
similarity index 51%
copy from src/main/java/org/apache/cassandra/sidecar/utils/ChecksumVerifier.java
copy to 
common/src/main/java/org/apache/cassandra/sidecar/common/data/Digest.java
index e338642..1d4f432 100644
--- a/src/main/java/org/apache/cassandra/sidecar/utils/ChecksumVerifier.java
+++ b/common/src/main/java/org/apache/cassandra/sidecar/common/data/Digest.java
@@ -16,23 +16,27 @@
  * limitations under the License.
  */
 
-package org.apache.cassandra.sidecar.utils;
+package org.apache.cassandra.sidecar.common.data;
 
-import io.vertx.core.Future;
+import java.util.Map;
 
 /**
- * Interface to verify integrity of SSTables uploaded.
- * <p>
- * Note: If checksum calculations of multiple files are happening at the same 
time, we would want to limit concurrent
- * checksum calculations. Since {@link ChecksumVerifier} is currently used 
only by upload handler, we are not
- * introducing another limit here. Concurrent uploads limit should limit 
concurrent checksum calculations as well.
+ * Interface that represents a checksum digest
  */
-public interface ChecksumVerifier
+public interface Digest
 {
     /**
-     * @param checksum  expected checksum value
-     * @param filePath  path to SSTable component
-     * @return String   component path, if verification is a success, else a 
failed future is returned
+     * @return headers to be used in the HTTP request
      */
-    Future<String> verify(String checksum, String filePath);
+    Map<String, String> headers();
+
+    /**
+     * @return the string representation of the digest
+     */
+    String value();
+
+    /**
+     * @return the name of the digest's algorithm
+     */
+    String algorithm();
 }
diff --git 
a/common/src/main/java/org/apache/cassandra/sidecar/common/data/MD5Digest.java 
b/common/src/main/java/org/apache/cassandra/sidecar/common/data/MD5Digest.java
new file mode 100644
index 0000000..ba8b7b5
--- /dev/null
+++ 
b/common/src/main/java/org/apache/cassandra/sidecar/common/data/MD5Digest.java
@@ -0,0 +1,68 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.cassandra.sidecar.common.data;
+
+import java.util.Collections;
+import java.util.Map;
+import java.util.Objects;
+
+import io.netty.handler.codec.http.HttpHeaderNames;
+import org.jetbrains.annotations.NotNull;
+
+/**
+ * Implements the MD5 checksum digest
+ */
+public class MD5Digest implements Digest
+{
+    private final @NotNull String value;
+
+    /**
+     * Constructs a new MD5Digest with the provided MD5 {@code value}
+     *
+     * @param value the MD5 value
+     */
+    public MD5Digest(@NotNull String value)
+    {
+        this.value = Objects.requireNonNull(value, "value is required");
+    }
+
+    /**
+     * {@inheritDoc}
+     */
+    @Override
+    public String value()
+    {
+        return value;
+    }
+
+    @Override
+    public String algorithm()
+    {
+        return "MD5";
+    }
+
+    /**
+     * @return MD5 headers for the Digest
+     */
+    @Override
+    public Map<String, String> headers()
+    {
+        return 
Collections.singletonMap(HttpHeaderNames.CONTENT_MD5.toString(), value);
+    }
+}
diff --git 
a/common/src/main/java/org/apache/cassandra/sidecar/common/data/XXHash32Digest.java
 
b/common/src/main/java/org/apache/cassandra/sidecar/common/data/XXHash32Digest.java
new file mode 100644
index 0000000..48127e4
--- /dev/null
+++ 
b/common/src/main/java/org/apache/cassandra/sidecar/common/data/XXHash32Digest.java
@@ -0,0 +1,110 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.cassandra.sidecar.common.data;
+
+import java.util.HashMap;
+import java.util.Map;
+import java.util.Objects;
+
+import org.jetbrains.annotations.NotNull;
+import org.jetbrains.annotations.Nullable;
+
+import static 
org.apache.cassandra.sidecar.common.http.SidecarHttpHeaderNames.CONTENT_XXHASH32;
+import static 
org.apache.cassandra.sidecar.common.http.SidecarHttpHeaderNames.CONTENT_XXHASH32_SEED;
+
+/**
+ * Implements the XXHash32 Digest
+ */
+public class XXHash32Digest implements Digest
+{
+    private final @NotNull String value;
+    private final @Nullable String seedHex;
+
+    /**
+     * Constructs a new XXHashDigest with the provided XXHash {@code value}
+     *
+     * @param value the xxhash value
+     */
+    public XXHash32Digest(String value)
+    {
+        this(value, null);
+    }
+
+    /**
+     * Constructs a new instance with the provided XXHash {@code value} and 
the {@code seed} value.
+     *
+     * @param value the xxhash value
+     * @param seed  the seed
+     */
+    public XXHash32Digest(String value, int seed)
+    {
+        this(value, Integer.toHexString(seed));
+    }
+
+    /**
+     * Constructs a new XXHashDigest with the provided XXHash {@code value} 
and the seed value represented as
+     * a hexadecimal string
+     *
+     * @param value   the xxhash value
+     * @param seedHex the value of the seed represented as a hexadecimal value
+     */
+    public XXHash32Digest(@NotNull String value, @Nullable String seedHex)
+    {
+        this.value = Objects.requireNonNull(value, "value is required");
+        this.seedHex = seedHex;
+    }
+
+    /**
+     * {@inheritDoc}
+     */
+    @Override
+    public String value()
+    {
+        return value;
+    }
+
+    /**
+     * @return the optional seed in hexadecimal format
+     */
+    public @Nullable String seedHex()
+    {
+        return seedHex;
+    }
+
+    @Override
+    public String algorithm()
+    {
+        return "XXHash32";
+    }
+
+    /**
+     * @return XXHash headers for the digest
+     */
+    @Override
+    public Map<String, String> headers()
+    {
+        Map<String, String> headers = new HashMap<>();
+        headers.put(CONTENT_XXHASH32, value);
+        if (seedHex != null)
+        {
+            headers.put(CONTENT_XXHASH32_SEED, seedHex);
+        }
+        return headers;
+    }
+}
diff --git 
a/src/main/java/org/apache/cassandra/sidecar/utils/ChecksumVerifier.java 
b/common/src/main/java/org/apache/cassandra/sidecar/common/http/SidecarHttpHeaderNames.java
similarity index 51%
copy from src/main/java/org/apache/cassandra/sidecar/utils/ChecksumVerifier.java
copy to 
common/src/main/java/org/apache/cassandra/sidecar/common/http/SidecarHttpHeaderNames.java
index e338642..347ed59 100644
--- a/src/main/java/org/apache/cassandra/sidecar/utils/ChecksumVerifier.java
+++ 
b/common/src/main/java/org/apache/cassandra/sidecar/common/http/SidecarHttpHeaderNames.java
@@ -16,23 +16,19 @@
  * limitations under the License.
  */
 
-package org.apache.cassandra.sidecar.utils;
-
-import io.vertx.core.Future;
+package org.apache.cassandra.sidecar.common.http;
 
 /**
- * Interface to verify integrity of SSTables uploaded.
- * <p>
- * Note: If checksum calculations of multiple files are happening at the same 
time, we would want to limit concurrent
- * checksum calculations. Since {@link ChecksumVerifier} is currently used 
only by upload handler, we are not
- * introducing another limit here. Concurrent uploads limit should limit 
concurrent checksum calculations as well.
+ * Custom header names for sidecar
  */
-public interface ChecksumVerifier
+public final class SidecarHttpHeaderNames
 {
     /**
-     * @param checksum  expected checksum value
-     * @param filePath  path to SSTable component
-     * @return String   component path, if verification is a success, else a 
failed future is returned
+     * {@code "cassandra-content-xxhash32"}
+     */
+    public static final String CONTENT_XXHASH32 = "cassandra-content-xxhash32";
+    /**
+     * {@code "cassandra-content-xxhash32-seed"}
      */
-    Future<String> verify(String checksum, String filePath);
+    public static final String CONTENT_XXHASH32_SEED = 
"cassandra-content-xxhash32-seed";
 }
diff --git 
a/src/main/java/org/apache/cassandra/sidecar/data/SSTableUploadRequest.java 
b/src/main/java/org/apache/cassandra/sidecar/data/SSTableUploadRequest.java
index cef1491..60cfbd2 100644
--- a/src/main/java/org/apache/cassandra/sidecar/data/SSTableUploadRequest.java
+++ b/src/main/java/org/apache/cassandra/sidecar/data/SSTableUploadRequest.java
@@ -18,7 +18,6 @@
 
 package org.apache.cassandra.sidecar.data;
 
-import io.netty.handler.codec.http.HttpHeaderNames;
 import io.vertx.ext.web.RoutingContext;
 import org.apache.cassandra.sidecar.common.data.QualifiedTableName;
 import org.apache.cassandra.sidecar.common.data.SSTableUploads;
@@ -29,7 +28,6 @@ import 
org.apache.cassandra.sidecar.common.data.SSTableUploads;
 public class SSTableUploadRequest extends SSTableUploads
 {
     private final String component;
-    private final String expectedChecksum;
 
     /**
      * Constructs an SSTableUploadRequest
@@ -37,16 +35,13 @@ public class SSTableUploadRequest extends SSTableUploads
      * @param qualifiedTableName the qualified table name in Cassandra
      * @param uploadId           an identifier for the upload
      * @param component          SSTable component being uploaded
-     * @param expectedChecksum   expected hash value to check integrity of 
SSTable component uploaded
      */
     public SSTableUploadRequest(QualifiedTableName qualifiedTableName,
                                 String uploadId,
-                                String component,
-                                String expectedChecksum)
+                                String component)
     {
         super(qualifiedTableName, uploadId);
         this.component = component;
-        this.expectedChecksum = expectedChecksum;
     }
 
     /**
@@ -57,14 +52,6 @@ public class SSTableUploadRequest extends SSTableUploads
         return this.component;
     }
 
-    /**
-     * @return expected checksum value of SSTable component
-     */
-    public String expectedChecksum()
-    {
-        return this.expectedChecksum;
-    }
-
     /**
      * {@inheritDoc}
      */
@@ -75,7 +62,6 @@ public class SSTableUploadRequest extends SSTableUploads
                ", keyspace='" + keyspace() + '\'' +
                ", tableName='" + table() + '\'' +
                ", component='" + component + '\'' +
-               ", expectedChecksum='" + expectedChecksum + '\'' +
                '}';
     }
 
@@ -90,7 +76,6 @@ public class SSTableUploadRequest extends SSTableUploads
     {
         return new SSTableUploadRequest(qualifiedTableName,
                                         context.pathParam("uploadId"),
-                                        context.pathParam("component"),
-                                        
context.request().getHeader(HttpHeaderNames.CONTENT_MD5.toString()));
+                                        context.pathParam("component"));
     }
 }
diff --git 
a/src/main/java/org/apache/cassandra/sidecar/restore/RestoreJobUtil.java 
b/src/main/java/org/apache/cassandra/sidecar/restore/RestoreJobUtil.java
index 918d8c0..8828c8f 100644
--- a/src/main/java/org/apache/cassandra/sidecar/restore/RestoreJobUtil.java
+++ b/src/main/java/org/apache/cassandra/sidecar/restore/RestoreJobUtil.java
@@ -114,12 +114,22 @@ public class RestoreJobUtil
      * @return the checksum hex string of the file's content. XXHash32 is 
employed as the hash algorithm.
      */
     public static String checksum(File file) throws IOException
+    {
+        int seed = 0x9747b28c; // random seed for initializing
+        return checksum(file, seed);
+    }
+
+    /**
+     * @param file the file to use to perform the checksum
+     * @param seed the seed to use for the hasher
+     * @return the checksum hex string of the file's content. XXHash32 is 
employed as the hash algorithm.
+     */
+    public static String checksum(File file, int seed) throws IOException
     {
         try (FileInputStream fis = new FileInputStream(file))
         {
             // might have shared hashers with ThreadLocal
             XXHashFactory factory = XXHashFactory.safeInstance();
-            int seed = 0x9747b28c; // random seed for initializing
             try (StreamingXXHash32 hasher = factory.newStreamingHash32(seed))
             {
                 byte[] buffer = new byte[KB_512];
@@ -128,7 +138,7 @@ public class RestoreJobUtil
                 {
                     hasher.update(buffer, 0, len);
                 }
-                return Long.toHexString(hasher.getValue()); 
+                return Long.toHexString(hasher.getValue());
             }
         }
     }
diff --git 
a/src/main/java/org/apache/cassandra/sidecar/routes/sstableuploads/SSTableUploadHandler.java
 
b/src/main/java/org/apache/cassandra/sidecar/routes/sstableuploads/SSTableUploadHandler.java
index dc23141..9347f7f 100644
--- 
a/src/main/java/org/apache/cassandra/sidecar/routes/sstableuploads/SSTableUploadHandler.java
+++ 
b/src/main/java/org/apache/cassandra/sidecar/routes/sstableuploads/SSTableUploadHandler.java
@@ -42,6 +42,8 @@ import org.apache.cassandra.sidecar.routes.AbstractHandler;
 import org.apache.cassandra.sidecar.stats.SSTableStats;
 import org.apache.cassandra.sidecar.stats.SidecarStats;
 import org.apache.cassandra.sidecar.utils.CassandraInputValidator;
+import org.apache.cassandra.sidecar.utils.DigestVerifier;
+import org.apache.cassandra.sidecar.utils.DigestVerifierFactory;
 import org.apache.cassandra.sidecar.utils.InstanceMetadataFetcher;
 import org.apache.cassandra.sidecar.utils.MetadataUtils;
 import org.apache.cassandra.sidecar.utils.SSTableUploader;
@@ -62,6 +64,7 @@ public class SSTableUploadHandler extends 
AbstractHandler<SSTableUploadRequest>
     private final SSTableUploadsPathBuilder uploadPathBuilder;
     private final ConcurrencyLimiter limiter;
     private final SSTableStats stats;
+    private final DigestVerifierFactory digestVerifierFactory;
 
     /**
      * Constructs a handler with the provided params.
@@ -74,6 +77,7 @@ public class SSTableUploadHandler extends 
AbstractHandler<SSTableUploadRequest>
      * @param executorPools        executor pools for blocking executions
      * @param validator            a validator instance to validate 
Cassandra-specific input
      * @param sidecarStats         an interface holding all stats related to 
main sidecar process
+     * @param digestVerifierFactory a factory of checksum verifiers
      */
     @Inject
     protected SSTableUploadHandler(Vertx vertx,
@@ -83,7 +87,8 @@ public class SSTableUploadHandler extends 
AbstractHandler<SSTableUploadRequest>
                                    SSTableUploadsPathBuilder uploadPathBuilder,
                                    ExecutorPools executorPools,
                                    CassandraInputValidator validator,
-                                   SidecarStats sidecarStats)
+                                   SidecarStats sidecarStats,
+                                   DigestVerifierFactory digestVerifierFactory)
     {
         super(metadataFetcher, executorPools, validator);
         this.fs = vertx.fileSystem();
@@ -92,6 +97,7 @@ public class SSTableUploadHandler extends 
AbstractHandler<SSTableUploadRequest>
         this.uploadPathBuilder = uploadPathBuilder;
         this.limiter = new 
ConcurrencyLimiter(configuration::concurrentUploadsLimit);
         this.stats = sidecarStats.ssTableStats();
+        this.digestVerifierFactory = digestVerifierFactory;
     }
 
     /**
@@ -124,11 +130,14 @@ public class SSTableUploadHandler extends 
AbstractHandler<SSTableUploadRequest>
         .compose(validRequest -> 
uploadPathBuilder.resolveStagingDirectory(host))
         .compose(this::ensureSufficientSpaceAvailable)
         .compose(v -> uploadPathBuilder.build(host, request))
-        .compose(uploadDirectory -> uploader.uploadComponent(httpRequest,
-                                                             uploadDirectory,
-                                                             
request.component(),
-                                                             
request.expectedChecksum(),
-                                                             
configuration.filePermissions()))
+        .compose(uploadDirectory -> {
+            DigestVerifier digestVerifier = 
digestVerifierFactory.verifier(httpRequest.headers());
+            return uploader.uploadComponent(httpRequest,
+                                            uploadDirectory,
+                                            request.component(),
+                                            digestVerifier,
+                                            configuration.filePermissions());
+        })
         .compose(fs::props)
         .onSuccess(fileProps -> {
             long serviceTimeMillis = 
TimeUnit.NANOSECONDS.toMillis(System.nanoTime() - startTimeInNanos);
diff --git a/src/main/java/org/apache/cassandra/sidecar/server/MainModule.java 
b/src/main/java/org/apache/cassandra/sidecar/server/MainModule.java
index b1af4b7..31ab7b1 100644
--- a/src/main/java/org/apache/cassandra/sidecar/server/MainModule.java
+++ b/src/main/java/org/apache/cassandra/sidecar/server/MainModule.java
@@ -96,8 +96,6 @@ import org.apache.cassandra.sidecar.stats.RestoreJobStats;
 import org.apache.cassandra.sidecar.stats.SidecarSchemaStats;
 import org.apache.cassandra.sidecar.stats.SidecarStats;
 import org.apache.cassandra.sidecar.utils.CassandraVersionProvider;
-import org.apache.cassandra.sidecar.utils.ChecksumVerifier;
-import org.apache.cassandra.sidecar.utils.MD5ChecksumVerifier;
 import org.apache.cassandra.sidecar.utils.TimeProvider;
 
 import static 
org.apache.cassandra.sidecar.server.SidecarServerEvents.ON_SERVER_STOP;
@@ -428,13 +426,6 @@ public class MainModule extends AbstractModule
         return DnsResolver.DEFAULT;
     }
 
-    @Provides
-    @Singleton
-    public ChecksumVerifier checksumVerifier(Vertx vertx)
-    {
-        return new MD5ChecksumVerifier(vertx.fileSystem());
-    }
-
     @Provides
     @Singleton
     public SidecarVersionProvider sidecarVersionProvider()
diff --git 
a/src/main/java/org/apache/cassandra/sidecar/utils/AsyncFileDigestVerifier.java 
b/src/main/java/org/apache/cassandra/sidecar/utils/AsyncFileDigestVerifier.java
new file mode 100644
index 0000000..790b54c
--- /dev/null
+++ 
b/src/main/java/org/apache/cassandra/sidecar/utils/AsyncFileDigestVerifier.java
@@ -0,0 +1,106 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.cassandra.sidecar.utils;
+
+import java.util.Objects;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import io.vertx.core.Future;
+import io.vertx.core.Handler;
+import io.vertx.core.Promise;
+import io.vertx.core.buffer.Buffer;
+import io.vertx.core.file.AsyncFile;
+import io.vertx.core.file.FileSystem;
+import io.vertx.core.file.OpenOptions;
+import io.vertx.ext.web.handler.HttpException;
+import org.apache.cassandra.sidecar.common.data.Digest;
+
+import static 
org.apache.cassandra.sidecar.common.http.SidecarHttpResponseStatus.CHECKSUM_MISMATCH;
+
+/**
+ * Provides basic functionality to perform digest validations using {@link 
AsyncFile}
+ *
+ * @param <D> the Digest type
+ */
+public abstract class AsyncFileDigestVerifier<D extends Digest> implements 
DigestVerifier
+{
+    public static final int DEFAULT_READ_BUFFER_SIZE = 512 * 1024; // 512KiB
+    protected final Logger logger = LoggerFactory.getLogger(this.getClass());
+    protected final FileSystem fs;
+    protected final D digest;
+
+    protected AsyncFileDigestVerifier(FileSystem fs, D digest)
+    {
+        this.fs = fs;
+        this.digest = Objects.requireNonNull(digest, "digest is required");
+    }
+
+    /**
+     * @param filePath path to SSTable component
+     * @return a future String with the component path if verification is a 
success, otherwise a failed future
+     */
+    @Override
+    public Future<String> verify(String filePath)
+    {
+        logger.debug("Validating {}. expected_digest={}", digest.algorithm(), 
digest.value());
+
+        return fs.open(filePath, new OpenOptions())
+                 .compose(this::calculateDigest)
+                 .compose(computedDigest -> {
+                     if (!computedDigest.equals(digest.value()))
+                     {
+                         logger.error("Digest mismatch. computed_digest={}, 
expected_digest={}, algorithm=MD5",
+                                      computedDigest, digest.value());
+                         return Future.failedFuture(new 
HttpException(CHECKSUM_MISMATCH.code(),
+                                                                      
String.format("Digest mismatch. "
+                                                                               
     + "expected_digest=%s, "
+                                                                               
     + "algorithm=%s",
+                                                                               
     digest.value(),
+                                                                               
     digest.algorithm())));
+                     }
+                     return Future.succeededFuture(filePath);
+                 });
+    }
+
+    /**
+     * Returns a future with the calculated digest for the provided {@link 
AsyncFile file}.
+     *
+     * @param asyncFile the async file to use for digest calculation
+     * @return a future with the computed digest for the provided {@link 
AsyncFile file}
+     */
+    protected abstract Future<String> calculateDigest(AsyncFile asyncFile);
+
+    protected void readFile(AsyncFile file, Promise<String> result, 
Handler<Buffer> onBufferAvailable,
+                            Handler<Void> onReadComplete)
+    {
+        // Make sure to close the file when complete
+        result.future().onComplete(ignored -> file.end());
+        file.pause()
+            .setReadBufferSize(DEFAULT_READ_BUFFER_SIZE)
+            .handler(onBufferAvailable)
+            .endHandler(onReadComplete)
+            .exceptionHandler(cause -> {
+                logger.error("Error while calculating the {} digest", 
digest.algorithm(), cause);
+                result.fail(cause);
+            })
+            .resume();
+    }
+}
diff --git 
a/src/main/java/org/apache/cassandra/sidecar/utils/ChecksumVerifier.java 
b/src/main/java/org/apache/cassandra/sidecar/utils/DigestVerifier.java
similarity index 64%
rename from 
src/main/java/org/apache/cassandra/sidecar/utils/ChecksumVerifier.java
rename to src/main/java/org/apache/cassandra/sidecar/utils/DigestVerifier.java
index e338642..07bb741 100644
--- a/src/main/java/org/apache/cassandra/sidecar/utils/ChecksumVerifier.java
+++ b/src/main/java/org/apache/cassandra/sidecar/utils/DigestVerifier.java
@@ -23,16 +23,15 @@ import io.vertx.core.Future;
 /**
  * Interface to verify integrity of SSTables uploaded.
  * <p>
- * Note: If checksum calculations of multiple files are happening at the same 
time, we would want to limit concurrent
- * checksum calculations. Since {@link ChecksumVerifier} is currently used 
only by upload handler, we are not
- * introducing another limit here. Concurrent uploads limit should limit 
concurrent checksum calculations as well.
+ * Note: If digest calculations of multiple files are happening at the same 
time, we would want to limit concurrent
+ * digest calculations. Since {@link DigestVerifier} is currently used only by 
upload handler, we are not
+ * introducing another limit here. Concurrent uploads limit should limit 
concurrent digest calculations as well.
  */
-public interface ChecksumVerifier
+public interface DigestVerifier
 {
     /**
-     * @param checksum  expected checksum value
-     * @param filePath  path to SSTable component
-     * @return String   component path, if verification is a success, else a 
failed future is returned
+     * @param filePath path to SSTable component
+     * @return a future String with the component path if verification is a 
success, otherwise a failed future
      */
-    Future<String> verify(String checksum, String filePath);
+    Future<String> verify(String filePath);
 }
diff --git 
a/src/main/java/org/apache/cassandra/sidecar/utils/DigestVerifierFactory.java 
b/src/main/java/org/apache/cassandra/sidecar/utils/DigestVerifierFactory.java
new file mode 100644
index 0000000..3dd64a2
--- /dev/null
+++ 
b/src/main/java/org/apache/cassandra/sidecar/utils/DigestVerifierFactory.java
@@ -0,0 +1,76 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.cassandra.sidecar.utils;
+
+import com.google.inject.Inject;
+import com.google.inject.Singleton;
+import io.netty.handler.codec.http.HttpHeaderNames;
+import io.vertx.core.Future;
+import io.vertx.core.MultiMap;
+import io.vertx.core.Vertx;
+import io.vertx.core.file.FileSystem;
+import org.jetbrains.annotations.VisibleForTesting;
+
+import static 
org.apache.cassandra.sidecar.common.http.SidecarHttpHeaderNames.CONTENT_XXHASH32;
+
+/**
+ * A factory class that returns the {@link DigestVerifier} instance.
+ */
+@Singleton
+public class DigestVerifierFactory
+{
+    @VisibleForTesting
+    static final DigestVerifier FALLBACK_VERIFIER = Future::succeededFuture;
+    private final FileSystem fs;
+
+
+    /**
+     * Constructs a new factory
+     *
+     * @param vertx the vertx instance
+     */
+    @Inject
+    public DigestVerifierFactory(Vertx vertx)
+    {
+        this.fs = vertx.fileSystem();
+    }
+
+    /***
+     * Returns the first match for a {@link DigestVerifier} from the 
registered list of verifiers. If none of the
+     * verifiers matches, a no-op validator is returned.
+     *
+     * @param headers the request headers used to test whether a {@link 
DigestVerifier} can be used to verify the
+     *                request
+     * @return the first match for a {@link DigestVerifier} from the 
registered list of verifiers, or a no-op
+     * verifier if none match
+     */
+    public DigestVerifier verifier(MultiMap headers)
+    {
+        if (headers.contains(CONTENT_XXHASH32))
+        {
+            return XXHash32DigestVerifier.create(fs, headers);
+        }
+        else if (headers.contains(HttpHeaderNames.CONTENT_MD5.toString()))
+        {
+            return MD5DigestVerifier.create(fs, headers);
+        }
+        // Fallback to no-op validator
+        return FALLBACK_VERIFIER;
+    }
+}
diff --git 
a/src/main/java/org/apache/cassandra/sidecar/utils/MD5ChecksumVerifier.java 
b/src/main/java/org/apache/cassandra/sidecar/utils/MD5ChecksumVerifier.java
deleted file mode 100644
index 132ff10..0000000
--- a/src/main/java/org/apache/cassandra/sidecar/utils/MD5ChecksumVerifier.java
+++ /dev/null
@@ -1,108 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.cassandra.sidecar.utils;
-
-import java.security.MessageDigest;
-import java.security.NoSuchAlgorithmException;
-import java.util.Base64;
-
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-import io.vertx.core.Future;
-import io.vertx.core.Promise;
-import io.vertx.core.file.AsyncFile;
-import io.vertx.core.file.FileSystem;
-import io.vertx.core.file.OpenOptions;
-import io.vertx.ext.web.handler.HttpException;
-import org.jetbrains.annotations.VisibleForTesting;
-
-import static 
org.apache.cassandra.sidecar.common.http.SidecarHttpResponseStatus.CHECKSUM_MISMATCH;
-
-/**
- * Implementation of {@link ChecksumVerifier}. Here we use MD5 implementation 
of {@link java.security.MessageDigest}
- * for calculating checksum. And match the calculated checksum with expected 
checksum obtained from request.
- */
-public class MD5ChecksumVerifier implements ChecksumVerifier
-{
-    private static final Logger LOGGER = 
LoggerFactory.getLogger(MD5ChecksumVerifier.class);
-    public static final int DEFAULT_READ_BUFFER_SIZE = 64 * 1024; // 64KiB
-    private final FileSystem fs;
-
-    public MD5ChecksumVerifier(FileSystem fs)
-    {
-        this.fs = fs;
-    }
-
-    public Future<String> verify(String expectedChecksum, String filePath)
-    {
-        if (expectedChecksum == null)
-        {
-            return Future.succeededFuture(filePath);
-        }
-
-        LOGGER.debug("Validating MD5. expected_checksum={}", expectedChecksum);
-
-        return fs.open(filePath, new OpenOptions())
-                 .compose(this::calculateMD5)
-                 .compose(computedChecksum -> {
-                     if (!expectedChecksum.equals(computedChecksum))
-                     {
-                         LOGGER.error("Checksum mismatch. 
computed_checksum={}, expected_checksum={}, algorithm=MD5",
-                                      computedChecksum, expectedChecksum);
-                         return Future.failedFuture(new 
HttpException(CHECKSUM_MISMATCH.code(),
-                                                                      
String.format("Checksum mismatch. "
-                                                                               
     + "expected_checksum=%s, "
-                                                                               
     + "algorithm=MD5",
-                                                                               
     expectedChecksum)));
-                     }
-                     return Future.succeededFuture(filePath);
-                 });
-    }
-
-    @VisibleForTesting
-    Future<String> calculateMD5(AsyncFile file)
-    {
-        MessageDigest digest;
-        try
-        {
-            digest = MessageDigest.getInstance("MD5");
-        }
-        catch (NoSuchAlgorithmException e)
-        {
-            return Future.failedFuture(e);
-        }
-
-        Promise<String> result = Promise.promise();
-        file.pause()
-            .setReadBufferSize(DEFAULT_READ_BUFFER_SIZE)
-            .handler(buf -> digest.update(buf.getBytes()))
-            .endHandler(_v -> {
-                
result.complete(Base64.getEncoder().encodeToString(digest.digest()));
-                file.end();
-            })
-            .exceptionHandler(cause -> {
-                LOGGER.error("Error while calculating MD5 checksum", cause);
-                result.fail(cause);
-                file.end();
-            })
-            .resume();
-        return result.future();
-    }
-}
diff --git 
a/src/main/java/org/apache/cassandra/sidecar/utils/MD5DigestVerifier.java 
b/src/main/java/org/apache/cassandra/sidecar/utils/MD5DigestVerifier.java
new file mode 100644
index 0000000..ed4218d
--- /dev/null
+++ b/src/main/java/org/apache/cassandra/sidecar/utils/MD5DigestVerifier.java
@@ -0,0 +1,72 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.cassandra.sidecar.utils;
+
+import java.security.MessageDigest;
+import java.security.NoSuchAlgorithmException;
+import java.util.Base64;
+
+import io.netty.handler.codec.http.HttpHeaderNames;
+import io.vertx.core.Future;
+import io.vertx.core.MultiMap;
+import io.vertx.core.Promise;
+import io.vertx.core.file.AsyncFile;
+import io.vertx.core.file.FileSystem;
+import org.apache.cassandra.sidecar.common.data.MD5Digest;
+import org.jetbrains.annotations.VisibleForTesting;
+
+/**
+ * Implementation of {@link DigestVerifier}. Here we use MD5 implementation of 
{@link java.security.MessageDigest}
+ * for calculating digest and match the calculated digest with expected digest 
obtained from request.
+ */
+public class MD5DigestVerifier extends AsyncFileDigestVerifier<MD5Digest>
+{
+    protected MD5DigestVerifier(FileSystem fs, MD5Digest digest)
+    {
+        super(fs, digest);
+    }
+
+    public static DigestVerifier create(FileSystem fs, MultiMap headers)
+    {
+        MD5Digest md5Digest = new 
MD5Digest(headers.get(HttpHeaderNames.CONTENT_MD5.toString()));
+        return new MD5DigestVerifier(fs, md5Digest);
+    }
+
+    @Override
+    @VisibleForTesting
+    protected Future<String> calculateDigest(AsyncFile file)
+    {
+        MessageDigest digest;
+        try
+        {
+            digest = MessageDigest.getInstance("MD5");
+        }
+        catch (NoSuchAlgorithmException e)
+        {
+            return Future.failedFuture(e);
+        }
+
+        Promise<String> result = Promise.promise();
+
+        readFile(file, result, buf -> digest.update(buf.getBytes()),
+                 _v -> 
result.complete(Base64.getEncoder().encodeToString(digest.digest())));
+
+        return result.future();
+    }
+}
diff --git 
a/src/main/java/org/apache/cassandra/sidecar/utils/SSTableUploader.java 
b/src/main/java/org/apache/cassandra/sidecar/utils/SSTableUploader.java
index 465f0a4..e7f59a4 100644
--- a/src/main/java/org/apache/cassandra/sidecar/utils/SSTableUploader.java
+++ b/src/main/java/org/apache/cassandra/sidecar/utils/SSTableUploader.java
@@ -39,6 +39,7 @@ import io.vertx.core.file.FileSystem;
 import io.vertx.core.file.OpenOptions;
 import io.vertx.core.streams.ReadStream;
 import io.vertx.core.streams.WriteStream;
+import org.apache.cassandra.sidecar.exceptions.ThrowableUtils;
 
 /**
  * A class that handles SSTable Uploads
@@ -50,23 +51,19 @@ public class SSTableUploader
     private static final String DEFAULT_TEMP_SUFFIX = ".tmp";
 
     private final FileSystem fs;
-    private final ChecksumVerifier checksumVerifier;
     private final SidecarRateLimiter rateLimiter;
 
     /**
      * Constructs an instance of {@link SSTableUploader} with provided params 
for uploading an SSTable component.
      *
-     * @param vertx            Vertx reference
-     * @param checksumVerifier verifier for checking integrity of upload
-     * @param rateLimiter      rate limiter for uploading SSTable components
+     * @param vertx       Vertx reference
+     * @param rateLimiter rate limiter for uploading SSTable components
      */
     @Inject
     public SSTableUploader(Vertx vertx,
-                           ChecksumVerifier checksumVerifier,
                            @Named("IngressFileRateLimiter") SidecarRateLimiter 
rateLimiter)
     {
         this.fs = vertx.fileSystem();
-        this.checksumVerifier = checksumVerifier;
         this.rateLimiter = rateLimiter;
     }
 
@@ -76,14 +73,14 @@ public class SSTableUploader
      * @param readStream        server request from which file upload is 
acquired
      * @param uploadDirectory   the absolute path to the upload directory in 
the target {@code fs}
      * @param componentFileName the file name of the component
-     * @param expectedChecksum  for verifying upload integrity, passed in 
through request
+     * @param digestVerifier    the digest verifier instance
      * @param filePermissions   specifies the posix file permissions used to 
create the SSTable file
      * @return path of SSTable component to which data was uploaded
      */
     public Future<String> uploadComponent(ReadStream<Buffer> readStream,
                                           String uploadDirectory,
                                           String componentFileName,
-                                          String expectedChecksum,
+                                          DigestVerifier digestVerifier,
                                           String filePermissions)
     {
 
@@ -92,15 +89,16 @@ public class SSTableUploader
 
         return fs.mkdirs(uploadDirectory) // ensure the parent directory is 
created
                  .compose(v -> createTempFile(uploadDirectory, 
componentFileName, filePermissions))
-                 .compose(tempFilePath -> streamAndVerify(readStream, 
tempFilePath, expectedChecksum))
+                 .compose(tempFilePath -> streamAndVerify(readStream, 
tempFilePath, digestVerifier))
                  .compose(verifiedTempFilePath -> 
moveAtomicallyWithFallBack(verifiedTempFilePath, targetPath));
     }
 
-    private Future<String> streamAndVerify(ReadStream<Buffer> readStream, 
String tempFilePath, String expectedChecksum)
+    private Future<String> streamAndVerify(ReadStream<Buffer> readStream, 
String tempFilePath,
+                                           DigestVerifier digestVerifier)
     {
         // pipe read stream to temp file
         return streamToFile(readStream, tempFilePath)
-               .compose(v -> checksumVerifier.verify(expectedChecksum, 
tempFilePath))
+               .compose(v -> digestVerifier.verify(tempFilePath))
                .onFailure(throwable -> fs.delete(tempFilePath));
     }
 
@@ -128,7 +126,9 @@ public class SSTableUploader
         LOGGER.debug("Moving from={} to={}", source, target);
         return fs.move(source, target, new CopyOptions().setAtomicMove(true))
                  .recover(cause -> {
-                     if (hasCause(cause, 
AtomicMoveNotSupportedException.class, 10))
+                     Exception atomicMoveNotSupportedException =
+                     ThrowableUtils.getCause(cause, 
AtomicMoveNotSupportedException.class);
+                     if (atomicMoveNotSupportedException != null)
                      {
                          LOGGER.warn("Failed to perform atomic move from={} 
to={}", source, target, cause);
                          return fs.move(source, target, new 
CopyOptions().setAtomicMove(false));
@@ -138,33 +138,6 @@ public class SSTableUploader
                  .compose(v -> Future.succeededFuture(target));
     }
 
-    /**
-     * Returns true if a cause of type {@code type} is found in the stack 
trace before exceeding the {@code depth}
-     *
-     * @param cause the original cause
-     * @param type  the exception type to test
-     * @param depth the maximum depth to check in the stack trace
-     * @return true if the exception of type {@code type} exists in the 
stacktrace, false otherwise
-     */
-    private static boolean hasCause(Throwable cause, Class<? extends 
Throwable> type, int depth)
-    {
-        int i = 0;
-        while (i < depth)
-        {
-            if (cause == null)
-                return false;
-
-            if (type.isInstance(cause))
-                return true;
-
-            cause = cause.getCause();
-
-            i++;
-        }
-        return false;
-    }
-
-
     /**
      * A {@link WriteStream} implementation that supports rate limiting.
      */
diff --git 
a/src/main/java/org/apache/cassandra/sidecar/utils/XXHash32DigestVerifier.java 
b/src/main/java/org/apache/cassandra/sidecar/utils/XXHash32DigestVerifier.java
new file mode 100644
index 0000000..b1944c6
--- /dev/null
+++ 
b/src/main/java/org/apache/cassandra/sidecar/utils/XXHash32DigestVerifier.java
@@ -0,0 +1,84 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.cassandra.sidecar.utils;
+
+import io.vertx.core.Future;
+import io.vertx.core.MultiMap;
+import io.vertx.core.Promise;
+import io.vertx.core.file.AsyncFile;
+import io.vertx.core.file.FileSystem;
+import net.jpountz.xxhash.StreamingXXHash32;
+import net.jpountz.xxhash.XXHashFactory;
+import org.apache.cassandra.sidecar.common.data.XXHash32Digest;
+import org.jetbrains.annotations.VisibleForTesting;
+
+import static 
org.apache.cassandra.sidecar.common.http.SidecarHttpHeaderNames.CONTENT_XXHASH32;
+import static 
org.apache.cassandra.sidecar.common.http.SidecarHttpHeaderNames.CONTENT_XXHASH32_SEED;
+
+/**
+ * Implementation of {@link DigestVerifier} to calculate the digest and match 
the calculated digest
+ * with the expected digest.
+ */
+public class XXHash32DigestVerifier extends 
AsyncFileDigestVerifier<XXHash32Digest>
+{
+    protected XXHash32DigestVerifier(FileSystem fs, XXHash32Digest digest)
+    {
+        super(fs, digest);
+    }
+
+    public static XXHash32DigestVerifier create(FileSystem fs, MultiMap 
headers)
+    {
+        XXHash32Digest digest = new 
XXHash32Digest(headers.get(CONTENT_XXHASH32), 
headers.get(CONTENT_XXHASH32_SEED));
+        return new XXHash32DigestVerifier(fs, digest);
+    }
+
+    @Override
+    @VisibleForTesting
+    protected Future<String> calculateDigest(AsyncFile file)
+    {
+        Promise<String> result = Promise.promise();
+        Future<String> future = result.future();
+
+        // might have shared hashers with ThreadLocal
+        XXHashFactory factory = XXHashFactory.safeInstance();
+
+        int seed = maybeGetSeedOrDefault();
+        StreamingXXHash32 hasher = factory.newStreamingHash32(seed);
+
+        future.onComplete(ignored -> hasher.close());
+
+        readFile(file, result, buf -> {
+                     byte[] bytes = buf.getBytes();
+                     hasher.update(bytes, 0, bytes.length);
+                 },
+                 _v -> result.complete(Long.toHexString(hasher.getValue())));
+
+        return future;
+    }
+
+    protected int maybeGetSeedOrDefault()
+    {
+        String seedHex = digest.seedHex();
+        if (seedHex != null)
+        {
+            return (int) Long.parseLong(seedHex, 16);
+        }
+        return 0x9747b28c; // random seed for initializing
+    }
+}
diff --git 
a/src/test/java/org/apache/cassandra/sidecar/routes/sstableuploads/SSTableUploadHandlerTest.java
 
b/src/test/java/org/apache/cassandra/sidecar/routes/sstableuploads/SSTableUploadHandlerTest.java
index 49554c6..2694fb4 100644
--- 
a/src/test/java/org/apache/cassandra/sidecar/routes/sstableuploads/SSTableUploadHandlerTest.java
+++ 
b/src/test/java/org/apache/cassandra/sidecar/routes/sstableuploads/SSTableUploadHandlerTest.java
@@ -19,7 +19,6 @@
 package org.apache.cassandra.sidecar.routes.sstableuploads;
 
 import java.io.IOException;
-import java.io.OutputStream;
 import java.nio.file.Files;
 import java.nio.file.Path;
 import java.nio.file.Paths;
@@ -27,7 +26,6 @@ import java.nio.file.attribute.PosixFilePermission;
 import java.util.Set;
 import java.util.UUID;
 import java.util.concurrent.CountDownLatch;
-import java.util.concurrent.ThreadLocalRandom;
 import java.util.concurrent.TimeUnit;
 import java.util.function.Consumer;
 
@@ -45,6 +43,9 @@ import io.vertx.ext.web.client.HttpResponse;
 import io.vertx.ext.web.client.WebClient;
 import io.vertx.junit5.VertxExtension;
 import io.vertx.junit5.VertxTestContext;
+import org.apache.cassandra.sidecar.common.data.Digest;
+import org.apache.cassandra.sidecar.common.data.MD5Digest;
+import org.apache.cassandra.sidecar.common.data.XXHash32Digest;
 import org.apache.cassandra.sidecar.common.http.SidecarHttpResponseStatus;
 import org.apache.cassandra.sidecar.snapshots.SnapshotUtils;
 import org.assertj.core.data.Percentage;
@@ -58,6 +59,7 @@ import static 
java.nio.file.attribute.PosixFilePermission.OTHERS_WRITE;
 import static java.nio.file.attribute.PosixFilePermission.OWNER_EXECUTE;
 import static java.nio.file.attribute.PosixFilePermission.OWNER_READ;
 import static java.nio.file.attribute.PosixFilePermission.OWNER_WRITE;
+import static org.apache.cassandra.sidecar.utils.TestFileUtils.prepareTestFile;
 import static org.assertj.core.api.Assertions.assertThat;
 import static org.mockito.Mockito.when;
 
@@ -75,7 +77,7 @@ class SSTableUploadHandlerTest extends BaseUploadsHandlerTest
     void testUploadWithoutMd5_expectSuccessfulUpload(VertxTestContext context) 
throws IOException
     {
         UUID uploadId = UUID.randomUUID();
-        sendUploadRequestAndVerify(context, uploadId, "ks", "tbl", 
"without-md5.db", "",
+        sendUploadRequestAndVerify(context, uploadId, "ks", "tbl", 
"without-md5.db", null,
                                    Files.size(Paths.get(FILE_TO_BE_UPLOADED)), 
HttpResponseStatus.OK.code(), false);
     }
 
@@ -83,15 +85,63 @@ class SSTableUploadHandlerTest extends 
BaseUploadsHandlerTest
     void testUploadWithCorrectMd5_expectSuccessfulUpload(VertxTestContext 
context) throws IOException
     {
         UUID uploadId = UUID.randomUUID();
-        sendUploadRequestAndVerify(context, uploadId, "ks", "tbl", 
"with-correct-md5.db", "jXd/OF09/siBXSD3SWAm3A==",
-                                   Files.size(Paths.get(FILE_TO_BE_UPLOADED)), 
HttpResponseStatus.OK.code(), false);
+        sendUploadRequestAndVerify(context, uploadId, "ks", "tbl", 
"with-correct-md5.db",
+                                   new MD5Digest("jXd/OF09/siBXSD3SWAm3A=="),
+                                   Files.size(Paths.get(FILE_TO_BE_UPLOADED)),
+                                   HttpResponseStatus.OK.code(),
+                                   false);
+    }
+
+    @Test
+    void testUploadWithCorrectXXHash_expectSuccessfulUpload(VertxTestContext 
context) throws IOException
+    {
+        UUID uploadId = UUID.randomUUID();
+        sendUploadRequestAndVerify(context, uploadId, "ks", "tbl", 
"with-correct-xxhash.db",
+                                   new XXHash32Digest("7a28edc0"),
+                                   Files.size(Paths.get(FILE_TO_BE_UPLOADED)),
+                                   HttpResponseStatus.OK.code(),
+                                   false);
+    }
+
+    @Test
+    void 
testUploadWithCorrectXXHashAndCustomSeed_expectSuccessfulUpload(VertxTestContext
 context) throws IOException
+    {
+        UUID uploadId = UUID.randomUUID();
+        sendUploadRequestAndVerify(context, uploadId, "ks", "tbl", 
"with-correct-xxhash.db",
+                                   new XXHash32Digest("ffffffffb9510d6b", 
"55555555"),
+                                   Files.size(Paths.get(FILE_TO_BE_UPLOADED)),
+                                   HttpResponseStatus.OK.code(),
+                                   false);
     }
 
     @Test
     void testUploadWithIncorrectMd5_expectErrorCode(VertxTestContext context) 
throws IOException
     {
         UUID uploadId = UUID.randomUUID();
-        sendUploadRequestAndVerify(context, uploadId, "ks", "tbl", 
"with-incorrect-md5.db", "incorrectMd5",
+        sendUploadRequestAndVerify(context, uploadId, "ks", "tbl", 
"with-incorrect-md5.db",
+                                   new MD5Digest("incorrectMd5"),
+                                   Files.size(Paths.get(FILE_TO_BE_UPLOADED)),
+                                   
SidecarHttpResponseStatus.CHECKSUM_MISMATCH.code(),
+                                   false);
+    }
+
+    @Test
+    void testUploadWithIncorrectXXHash_expectErrorCode(VertxTestContext 
context) throws IOException
+    {
+        UUID uploadId = UUID.randomUUID();
+        sendUploadRequestAndVerify(context, uploadId, "ks", "tbl", 
"with-incorrect-xxhash.db",
+                                   new XXHash32Digest("incorrectXXHash"),
+                                   Files.size(Paths.get(FILE_TO_BE_UPLOADED)),
+                                   
SidecarHttpResponseStatus.CHECKSUM_MISMATCH.code(),
+                                   false);
+    }
+
+    @Test
+    void 
testUploadWithIncorrectXXHashAndCustomSeed_expectErrorCode(VertxTestContext 
context) throws IOException
+    {
+        UUID uploadId = UUID.randomUUID();
+        sendUploadRequestAndVerify(context, uploadId, "ks", "tbl", 
"with-incorrect-xxhash.db",
+                                   new XXHash32Digest("7a28edc0", "bad"),
                                    Files.size(Paths.get(FILE_TO_BE_UPLOADED)),
                                    
SidecarHttpResponseStatus.CHECKSUM_MISMATCH.code(),
                                    false);
@@ -101,26 +151,27 @@ class SSTableUploadHandlerTest extends 
BaseUploadsHandlerTest
     void testInvalidFileName_expectErrorCode(VertxTestContext context) throws 
IOException
     {
         UUID uploadId = UUID.randomUUID();
-        sendUploadRequestAndVerify(context, uploadId, "ks", "tbl", 
"ks$tbl-me-4-big-Data.db", "",
+        sendUploadRequestAndVerify(context, uploadId, "ks", "tbl", 
"ks$tbl-me-4-big-Data.db", null,
                                    Files.size(Paths.get(FILE_TO_BE_UPLOADED)), 
HttpResponseStatus.BAD_REQUEST.code(),
                                    false);
     }
 
     @Test
-    void 
testUploadWithoutContentLength_expectSuccessfulUpload(VertxTestContext context) 
throws IOException
+    void 
testUploadWithoutContentLength_expectSuccessfulUpload(VertxTestContext context)
     {
         UUID uploadId = UUID.randomUUID();
         sendUploadRequestAndVerify(context, uploadId, "ks", "tbl", 
"without-content-length.db",
-                                   "jXd/OF09/siBXSD3SWAm3A==", 0, 
HttpResponseStatus.OK.code(), false);
+                                   new MD5Digest("jXd/OF09/siBXSD3SWAm3A=="), 
0, HttpResponseStatus.OK.code(), false);
     }
 
     @Test
-    void testUploadTimeout_expectTimeoutError(VertxTestContext context) throws 
IOException
+    void testUploadTimeout_expectTimeoutError(VertxTestContext context)
     {
         // if we send more than actual length, vertx goes hung, probably 
looking for more data than exists in the file,
         // we should see timeout error in this case
         UUID uploadId = UUID.randomUUID();
-        sendUploadRequestAndVerify(context, uploadId, "ks", "tbl", 
"with-higher-content-length.db", "", 1000, -1, true);
+        sendUploadRequestAndVerify(context, uploadId, "ks", "tbl", 
"with-higher-content-length.db", null, 1000, -1,
+                                   true);
     }
 
     @Test
@@ -128,14 +179,14 @@ class SSTableUploadHandlerTest extends 
BaseUploadsHandlerTest
     {
         UUID uploadId = UUID.randomUUID();
         sendUploadRequestAndVerify(context, uploadId, "ks", "tbl", 
"with-lesser-content-length.db",
-                                   "", 
Files.size(Paths.get(FILE_TO_BE_UPLOADED)) - 2, HttpResponseStatus.OK.code(),
+                                   null, 
Files.size(Paths.get(FILE_TO_BE_UPLOADED)) - 2, HttpResponseStatus.OK.code(),
                                    false);
     }
 
     @Test
     void testInvalidUploadId(VertxTestContext context) throws IOException
     {
-        sendUploadRequestAndVerify(null, context, "foo", "ks", "tbl", 
"with-lesser-content-length.db", "",
+        sendUploadRequestAndVerify(null, context, "foo", "ks", "tbl", 
"with-lesser-content-length.db", null,
                                    Files.size(Paths.get(FILE_TO_BE_UPLOADED)), 
HttpResponseStatus.BAD_REQUEST.code(),
                                    false, response -> {
             JsonObject error = response.bodyAsJsonObject();
@@ -149,7 +200,7 @@ class SSTableUploadHandlerTest extends 
BaseUploadsHandlerTest
     void testInvalidKeyspace(VertxTestContext context) throws IOException
     {
         UUID uploadId = UUID.randomUUID();
-        sendUploadRequestAndVerify(context, uploadId, "invalidKeyspace", 
"tbl", "with-lesser-content-length.db", "",
+        sendUploadRequestAndVerify(context, uploadId, "invalidKeyspace", 
"tbl", "with-lesser-content-length.db", null,
                                    Files.size(Paths.get(FILE_TO_BE_UPLOADED)), 
HttpResponseStatus.BAD_REQUEST.code(),
                                    false);
     }
@@ -158,7 +209,7 @@ class SSTableUploadHandlerTest extends 
BaseUploadsHandlerTest
     void testInvalidTable(VertxTestContext context) throws IOException
     {
         UUID uploadId = UUID.randomUUID();
-        sendUploadRequestAndVerify(context, uploadId, "ks", 
"invalidTableName", "with-lesser-content-length.db", "",
+        sendUploadRequestAndVerify(context, uploadId, "ks", 
"invalidTableName", "with-lesser-content-length.db", null,
                                    Files.size(Paths.get(FILE_TO_BE_UPLOADED)), 
HttpResponseStatus.BAD_REQUEST.code(),
                                    false);
     }
@@ -169,7 +220,7 @@ class SSTableUploadHandlerTest extends 
BaseUploadsHandlerTest
         
when(mockSSTableUploadConfiguration.minimumSpacePercentageRequired()).thenReturn(100F);
 
         UUID uploadId = UUID.randomUUID();
-        sendUploadRequestAndVerify(context, uploadId, "ks", "tbl", 
"without-md5.db", "",
+        sendUploadRequestAndVerify(context, uploadId, "ks", "tbl", 
"without-md5.db", null,
                                    Files.size(Paths.get(FILE_TO_BE_UPLOADED)),
                                    
HttpResponseStatus.INSUFFICIENT_STORAGE.code(), false);
     }
@@ -180,7 +231,7 @@ class SSTableUploadHandlerTest extends 
BaseUploadsHandlerTest
         
when(mockSSTableUploadConfiguration.concurrentUploadsLimit()).thenReturn(0);
 
         UUID uploadId = UUID.randomUUID();
-        sendUploadRequestAndVerify(context, uploadId, "ks", "tbl", 
"without-md5.db", "",
+        sendUploadRequestAndVerify(context, uploadId, "ks", "tbl", 
"without-md5.db", null,
                                    Files.size(Paths.get(FILE_TO_BE_UPLOADED)),
                                    
HttpResponseStatus.TOO_MANY_REQUESTS.code(), false);
     }
@@ -193,13 +244,13 @@ class SSTableUploadHandlerTest extends 
BaseUploadsHandlerTest
         UUID uploadId = UUID.randomUUID();
         CountDownLatch latch = new CountDownLatch(1);
         sendUploadRequestAndVerify(latch, context, uploadId.toString(), 
"invalidKeyspace", "tbl",
-                                   "without-md5.db", "", 
Files.size(Paths.get(FILE_TO_BE_UPLOADED)),
+                                   "without-md5.db", null, 
Files.size(Paths.get(FILE_TO_BE_UPLOADED)),
                                    HttpResponseStatus.BAD_REQUEST.code(), 
false);
 
         assertThat(latch.await(30, TimeUnit.SECONDS)).isTrue();
 
         // checking if permits were released after bad requests
-        sendUploadRequestAndVerify(context, uploadId, "ks", "tbl", 
"without-md5.db", "",
+        sendUploadRequestAndVerify(context, uploadId, "ks", "tbl", 
"without-md5.db", null,
                                    Files.size(Paths.get(FILE_TO_BE_UPLOADED)), 
HttpResponseStatus.OK.code(), false);
     }
 
@@ -209,7 +260,7 @@ class SSTableUploadHandlerTest extends 
BaseUploadsHandlerTest
         String uploadId = UUID.randomUUID().toString();
         
when(mockSSTableUploadConfiguration.filePermissions()).thenReturn("rwxr-xr-x");
 
-        sendUploadRequestAndVerify(null, context, uploadId, "ks", "tbl", 
"without-md5.db", "",
+        sendUploadRequestAndVerify(null, context, uploadId, "ks", "tbl", 
"without-md5.db", null,
                                    Files.size(Paths.get(FILE_TO_BE_UPLOADED)), 
HttpResponseStatus.OK.code(),
                                    false, response -> {
 
@@ -247,7 +298,7 @@ class SSTableUploadHandlerTest extends 
BaseUploadsHandlerTest
 
         long startTime = System.nanoTime();
         String uploadId = UUID.randomUUID().toString();
-        sendUploadRequestAndVerify(null, context, uploadId, "ks", "tbl", 
"1MB-File-Data.db", "",
+        sendUploadRequestAndVerify(null, context, uploadId, "ks", "tbl", 
"1MB-File-Data.db", null,
                                    Files.size(largeFilePath), 
HttpResponseStatus.OK.code(),
                                    false, response -> {
 
@@ -269,7 +320,7 @@ class SSTableUploadHandlerTest extends 
BaseUploadsHandlerTest
 
         long startTime = System.nanoTime();
         String uploadId = UUID.randomUUID().toString();
-        sendUploadRequestAndVerify(null, context, uploadId, "ks", "tbl", 
"1MB-File-Data.db", "",
+        sendUploadRequestAndVerify(null, context, uploadId, "ks", "tbl", 
"1MB-File-Data.db", null,
                                    Files.size(largeFilePath), 
HttpResponseStatus.OK.code(),
                                    false, response -> {
 
@@ -286,13 +337,13 @@ class SSTableUploadHandlerTest extends 
BaseUploadsHandlerTest
                                             String keyspace,
                                             String tableName,
                                             String targetFileName,
-                                            String expectedMd5,
+                                            Digest expectedDigest,
                                             long fileLength,
                                             int expectedRetCode,
                                             boolean expectTimeout)
     {
         sendUploadRequestAndVerify(null, context, uploadId.toString(), 
keyspace, tableName, targetFileName,
-                                   expectedMd5, fileLength, expectedRetCode, 
expectTimeout);
+                                   expectedDigest, fileLength, 
expectedRetCode, expectTimeout);
     }
 
     private void sendUploadRequestAndVerify(CountDownLatch latch,
@@ -301,7 +352,7 @@ class SSTableUploadHandlerTest extends 
BaseUploadsHandlerTest
                                             String keyspace,
                                             String tableName,
                                             String targetFileName,
-                                            String expectedMd5,
+                                            Digest expectedDigest,
                                             long fileLength,
                                             int expectedRetCode,
                                             boolean expectTimeout)
@@ -312,7 +363,7 @@ class SSTableUploadHandlerTest extends 
BaseUploadsHandlerTest
                                    keyspace,
                                    tableName,
                                    targetFileName,
-                                   expectedMd5,
+                                   expectedDigest,
                                    fileLength,
                                    expectedRetCode,
                                    expectTimeout,
@@ -326,7 +377,7 @@ class SSTableUploadHandlerTest extends 
BaseUploadsHandlerTest
                                             String keyspace,
                                             String tableName,
                                             String targetFileName,
-                                            String expectedMd5,
+                                            Digest expectedDigest,
                                             long fileLength,
                                             int expectedRetCode,
                                             boolean expectTimeout,
@@ -337,9 +388,9 @@ class SSTableUploadHandlerTest extends 
BaseUploadsHandlerTest
         String testRoute = "/api/v1/uploads/" + uploadId + "/keyspaces/" + 
keyspace
                            + "/tables/" + tableName + "/components/" + 
targetFileName;
         HttpRequest<Buffer> req = client.put(server.actualPort(), "localhost", 
testRoute);
-        if (!expectedMd5.isEmpty())
+        if (expectedDigest != null)
         {
-            req.putHeader(HttpHeaderNames.CONTENT_MD5.toString(), expectedMd5);
+            req.headers().addAll(expectedDigest.headers());
         }
         if (fileLength != 0)
         {
@@ -383,24 +434,4 @@ class SSTableUploadHandlerTest extends 
BaseUploadsHandlerTest
             client.close();
         });
     }
-
-    static Path prepareTestFile(Path directory, String fileName, long 
sizeInBytes) throws IOException
-    {
-        Path filePath = directory.resolve(fileName);
-        Files.deleteIfExists(filePath);
-
-        byte[] buffer = new byte[1024];
-        try (OutputStream outputStream = Files.newOutputStream(filePath))
-        {
-            int written = 0;
-            while (written < sizeInBytes)
-            {
-                ThreadLocalRandom.current().nextBytes(buffer);
-                int toWrite = (int) Math.min(buffer.length, sizeInBytes - 
written);
-                outputStream.write(buffer, 0, toWrite);
-                written += toWrite;
-            }
-        }
-        return filePath;
-    }
 }
diff --git 
a/src/test/java/org/apache/cassandra/sidecar/utils/DigestVerifierFactoryTest.java
 
b/src/test/java/org/apache/cassandra/sidecar/utils/DigestVerifierFactoryTest.java
new file mode 100644
index 0000000..810e015
--- /dev/null
+++ 
b/src/test/java/org/apache/cassandra/sidecar/utils/DigestVerifierFactoryTest.java
@@ -0,0 +1,89 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.cassandra.sidecar.utils;
+
+import org.junit.jupiter.api.BeforeEach;
+import org.junit.jupiter.api.Test;
+
+import io.vertx.core.MultiMap;
+import io.vertx.core.Vertx;
+import io.vertx.core.http.impl.headers.HeadersMultiMap;
+
+import static 
org.apache.cassandra.sidecar.common.http.SidecarHttpHeaderNames.CONTENT_XXHASH32;
+import static 
org.apache.cassandra.sidecar.utils.DigestVerifierFactory.FALLBACK_VERIFIER;
+import static org.assertj.core.api.Assertions.assertThat;
+
+/**
+ * Unit tests for {@link DigestVerifierFactory}
+ */
+class DigestVerifierFactoryTest
+{
+    MultiMap options;
+    Vertx vertx = Vertx.vertx();
+
+    @BeforeEach
+    void setup()
+    {
+        options = new HeadersMultiMap();
+    }
+
+
+    @Test
+    void testEmptyFactoryReturnsFallbackVerifier()
+    {
+        DigestVerifier verifier = new 
DigestVerifierFactory(vertx).verifier(options);
+        assertThat(verifier).as("should fallback to the fallback verifier when 
no verifiers are configured")
+                            .isNotNull()
+                            .isSameAs(FALLBACK_VERIFIER);
+    }
+
+    @Test
+    void testMd5Verifier()
+    {
+        options.set("content-md5", "md5-header");
+        DigestVerifier verifier = new 
DigestVerifierFactory(vertx).verifier(options);
+
+        assertThat(verifier).as("MD5DigestVerifier can verify MD5 content 
headers")
+                            .isNotNull()
+                            .isInstanceOf(MD5DigestVerifier.class);
+    }
+
+    @Test
+    void testXXHashVerifier()
+    {
+        options.set(CONTENT_XXHASH32, "xxhash-header");
+        DigestVerifier verifier = new 
DigestVerifierFactory(vertx).verifier(options);
+
+        assertThat(verifier).as("XXHashDigestVerifier can verify XXHash 
content headers")
+                            .isNotNull()
+                            .isInstanceOf(XXHash32DigestVerifier.class);
+    }
+
+    @Test
+    void testFirstVerifierTakesPrecedence()
+    {
+        options.set("content-md5", "md5-header")
+               .set(CONTENT_XXHASH32, "xxhash-header");
+        DigestVerifier verifier = new 
DigestVerifierFactory(vertx).verifier(options);
+
+        assertThat(verifier).as("XXHashDigestVerifier is selected when both 
headers are present")
+                            .isNotNull()
+                            .isInstanceOf(XXHash32DigestVerifier.class);
+    }
+}
diff --git 
a/src/test/java/org/apache/cassandra/sidecar/utils/MD5ChecksumVerifierTest.java 
b/src/test/java/org/apache/cassandra/sidecar/utils/MD5DigestVerifierTest.java
similarity index 55%
rename from 
src/test/java/org/apache/cassandra/sidecar/utils/MD5ChecksumVerifierTest.java
rename to 
src/test/java/org/apache/cassandra/sidecar/utils/MD5DigestVerifierTest.java
index 8eb5680..550ef8e 100644
--- 
a/src/test/java/org/apache/cassandra/sidecar/utils/MD5ChecksumVerifierTest.java
+++ 
b/src/test/java/org/apache/cassandra/sidecar/utils/MD5DigestVerifierTest.java
@@ -19,13 +19,12 @@
 package org.apache.cassandra.sidecar.utils;
 
 import java.io.IOException;
-import java.io.RandomAccessFile;
+import java.nio.file.Files;
 import java.nio.file.Path;
 import java.security.MessageDigest;
 import java.security.NoSuchAlgorithmException;
 import java.util.Base64;
 import java.util.concurrent.CountDownLatch;
-import java.util.concurrent.ThreadLocalRandom;
 import java.util.concurrent.TimeUnit;
 
 import org.junit.jupiter.api.BeforeAll;
@@ -36,17 +35,17 @@ import io.vertx.core.Future;
 import io.vertx.core.Vertx;
 import io.vertx.core.file.AsyncFile;
 import io.vertx.core.file.FileSystem;
+import org.apache.cassandra.sidecar.common.data.MD5Digest;
 
 import static org.assertj.core.api.Assertions.assertThat;
 import static org.assertj.core.api.Assertions.assertThatThrownBy;
 
 /**
- * Unit tests for {@link MD5ChecksumVerifier}
+ * Unit tests for {@link MD5DigestVerifier}
  */
-class MD5ChecksumVerifierTest
+class MD5DigestVerifierTest
 {
     static Vertx vertx;
-    static ExposeAsyncFileMD5ChecksumVerifier verifier;
 
     @TempDir
     Path tempDir;
@@ -55,33 +54,33 @@ class MD5ChecksumVerifierTest
     static void setup()
     {
         vertx = Vertx.vertx();
-        verifier = new ExposeAsyncFileMD5ChecksumVerifier(vertx.fileSystem());
     }
 
     @Test
-    void testFileDescriptorsClosedWithValidChecksum() throws IOException, 
NoSuchAlgorithmException,
-                                                             
InterruptedException
+    void testFileDescriptorsClosedWithValidDigest() throws IOException, 
NoSuchAlgorithmException,
+                                                           InterruptedException
     {
-        byte[] randomBytes = generateRandomBytes();
-        Path randomFilePath = writeBytesToRandomFile(randomBytes);
-        String expectedChecksum = Base64.getEncoder()
-                                        
.encodeToString(MessageDigest.getInstance("MD5")
-                                                                     
.digest(randomBytes));
+        Path randomFilePath = TestFileUtils.prepareTestFile(tempDir, 
"random-file.txt", 1024);
+        byte[] randomBytes = Files.readAllBytes(randomFilePath);
+        String expectedDigest = Base64.getEncoder()
+                                      
.encodeToString(MessageDigest.getInstance("MD5")
+                                                                   
.digest(randomBytes));
 
-        runTestScenario(randomFilePath, expectedChecksum);
+        runTestScenario(randomFilePath, expectedDigest);
     }
 
     @Test
-    void testFileDescriptorsClosedWithInvalidChecksum() throws IOException, 
InterruptedException
+    void testFileDescriptorsClosedWithInvalidDigest() throws IOException, 
InterruptedException
     {
-        Path randomFilePath = writeBytesToRandomFile(generateRandomBytes());
+        Path randomFilePath = TestFileUtils.prepareTestFile(tempDir, 
"random-file.txt", 1024);
         runTestScenario(randomFilePath, "invalid");
     }
 
-    private void runTestScenario(Path filePath, String checksum) throws 
InterruptedException
+    private void runTestScenario(Path filePath, String digest) throws 
InterruptedException
     {
         CountDownLatch latch = new CountDownLatch(1);
-        verifier.verify(checksum, filePath.toAbsolutePath().toString())
+        ExposeAsyncFileMD5DigestVerifier verifier = newVerifier(new 
MD5Digest(digest));
+        verifier.verify(filePath.toAbsolutePath().toString())
                 .onComplete(complete -> latch.countDown());
 
         assertThat(latch.await(2, TimeUnit.SECONDS)).isTrue();
@@ -93,41 +92,29 @@ class MD5ChecksumVerifierTest
         .hasMessageContaining("File handle is closed");
     }
 
-    private byte[] generateRandomBytes()
+    static ExposeAsyncFileMD5DigestVerifier newVerifier(MD5Digest digest)
     {
-        byte[] bytes = new byte[1024];
-        ThreadLocalRandom.current().nextBytes(bytes);
-        return bytes;
-    }
-
-    private Path writeBytesToRandomFile(byte[] bytes) throws IOException
-    {
-        Path tempPath = tempDir.resolve("random-file.txt");
-        try (RandomAccessFile writer = new RandomAccessFile(tempPath.toFile(), 
"rw"))
-        {
-            writer.write(bytes);
-        }
-        return tempPath;
+        return new ExposeAsyncFileMD5DigestVerifier(vertx.fileSystem(), 
digest);
     }
 
     /**
-     * Class that extends from {@link MD5ChecksumVerifier} for testing 
purposes and holds a reference to the
+     * Class that extends from {@link MD5DigestVerifier} for testing purposes 
and holds a reference to the
      * {@link AsyncFile} to ensure that the file has been closed.
      */
-    static class ExposeAsyncFileMD5ChecksumVerifier extends MD5ChecksumVerifier
+    static class ExposeAsyncFileMD5DigestVerifier extends MD5DigestVerifier
     {
         AsyncFile file;
 
-        public ExposeAsyncFileMD5ChecksumVerifier(FileSystem fs)
+        public ExposeAsyncFileMD5DigestVerifier(FileSystem fs, MD5Digest 
md5Digest)
         {
-            super(fs);
+            super(fs, md5Digest);
         }
 
         @Override
-        Future<String> calculateMD5(AsyncFile file)
+        protected Future<String> calculateDigest(AsyncFile file)
         {
             this.file = file;
-            return super.calculateMD5(file);
+            return super.calculateDigest(file);
         }
     }
 }
diff --git 
a/src/test/java/org/apache/cassandra/sidecar/utils/TestFileUtils.java 
b/src/test/java/org/apache/cassandra/sidecar/utils/TestFileUtils.java
new file mode 100644
index 0000000..195ef34
--- /dev/null
+++ b/src/test/java/org/apache/cassandra/sidecar/utils/TestFileUtils.java
@@ -0,0 +1,61 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.cassandra.sidecar.utils;
+
+import java.io.IOException;
+import java.io.OutputStream;
+import java.nio.file.Files;
+import java.nio.file.Path;
+import java.util.concurrent.ThreadLocalRandom;
+
+/**
+ * File utilities for tests
+ */
+public class TestFileUtils
+{
+    /**
+     * Writes random data to a file with name {@code filename} under the 
specified {@code directory} with
+     * the specified size in bytes.
+     *
+     * @param directory   the directory where to
+     * @param fileName    the name of the desired file to create
+     * @param sizeInBytes the size of the files in bytes
+     * @return the path of the file that was recently created
+     * @throws IOException when file creation or writing to the file fails
+     */
+    public static Path prepareTestFile(Path directory, String fileName, long 
sizeInBytes) throws IOException
+    {
+        Path filePath = directory.resolve(fileName);
+        Files.deleteIfExists(filePath);
+
+        byte[] buffer = new byte[1024];
+        try (OutputStream outputStream = Files.newOutputStream(filePath))
+        {
+            int written = 0;
+            while (written < sizeInBytes)
+            {
+                ThreadLocalRandom.current().nextBytes(buffer);
+                int toWrite = (int) Math.min(buffer.length, sizeInBytes - 
written);
+                outputStream.write(buffer, 0, toWrite);
+                written += toWrite;
+            }
+        }
+        return filePath;
+    }
+}
diff --git 
a/src/test/java/org/apache/cassandra/sidecar/utils/XXHash32DigestVerifierTest.java
 
b/src/test/java/org/apache/cassandra/sidecar/utils/XXHash32DigestVerifierTest.java
new file mode 100644
index 0000000..ff7566e
--- /dev/null
+++ 
b/src/test/java/org/apache/cassandra/sidecar/utils/XXHash32DigestVerifierTest.java
@@ -0,0 +1,155 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.cassandra.sidecar.utils;
+
+import java.io.IOException;
+import java.nio.file.Path;
+import java.util.concurrent.CountDownLatch;
+import java.util.concurrent.TimeUnit;
+
+import org.junit.jupiter.api.BeforeAll;
+import org.junit.jupiter.api.Test;
+import org.junit.jupiter.api.io.TempDir;
+
+import io.vertx.core.Future;
+import io.vertx.core.Vertx;
+import io.vertx.core.file.AsyncFile;
+import io.vertx.core.file.FileSystem;
+import io.vertx.ext.web.handler.HttpException;
+import org.apache.cassandra.sidecar.common.data.XXHash32Digest;
+import org.assertj.core.api.InstanceOfAssertFactories;
+
+import static org.apache.cassandra.sidecar.restore.RestoreJobUtil.checksum;
+import static org.assertj.core.api.Assertions.as;
+import static org.assertj.core.api.Assertions.assertThat;
+import static org.assertj.core.api.Assertions.assertThatNullPointerException;
+import static org.assertj.core.api.Assertions.assertThatThrownBy;
+import static org.assertj.core.api.Assertions.from;
+
+/**
+ * Unit tests for {@link XXHash32DigestVerifier}
+ */
+class XXHash32DigestVerifierTest
+{
+    static Vertx vertx;
+
+    @TempDir
+    static Path tempDir;
+    static Path randomFilePath;
+
+    @BeforeAll
+    static void setup() throws IOException
+    {
+        vertx = Vertx.vertx();
+
+        randomFilePath = TestFileUtils.prepareTestFile(tempDir, 
"random-file.txt", 1024);
+    }
+
+    @Test
+    void failsWhenDigestIsNull()
+    {
+        assertThatNullPointerException().isThrownBy(() -> 
newVerifier(null)).withMessage("digest is required");
+    }
+
+    @Test
+    void testFileDescriptorsClosedWithValidDigest() throws IOException, 
InterruptedException
+    {
+        XXHash32Digest digest = new 
XXHash32Digest(checksum(randomFilePath.toFile()));
+        runTestScenario(randomFilePath, digest, false);
+    }
+
+    @Test
+    void failsWithNonDefaultSeedAndSeedIsNotPassedAsAnOption() throws 
IOException, InterruptedException
+    {
+        XXHash32Digest digest = new 
XXHash32Digest(checksum(randomFilePath.toFile(), 0x55555555));
+        runTestScenario(randomFilePath, digest, true);
+    }
+
+    @Test
+    void testWithCustomSeed() throws IOException, InterruptedException
+    {
+        int seed = 0x55555555;
+        XXHash32Digest digest = new 
XXHash32Digest(checksum(randomFilePath.toFile(), seed), seed);
+        runTestScenario(randomFilePath, digest, false);
+    }
+
+    @Test
+    void testFileDescriptorsClosedWithInvalidDigest() throws 
InterruptedException
+    {
+        runTestScenario(randomFilePath, new XXHash32Digest("invalid"), true);
+    }
+
+    private void runTestScenario(Path filePath, XXHash32Digest digest,
+                                 boolean errorExpectedDuringValidation) throws 
InterruptedException
+    {
+        CountDownLatch latch = new CountDownLatch(1);
+        ExposeAsyncFileXXHash32DigestVerifier verifier = newVerifier(digest);
+        verifier.verify(filePath.toAbsolutePath().toString())
+                .onComplete(complete -> {
+                    if (errorExpectedDuringValidation)
+                    {
+                        assertThat(complete.failed()).isTrue();
+                        assertThat(complete.cause())
+                        .isInstanceOf(HttpException.class)
+                        .extracting(from(t -> ((HttpException) 
t).getPayload()), as(InstanceOfAssertFactories.STRING))
+                        .contains("Digest mismatch. expected_digest=" + 
digest.value());
+                    }
+                    else
+                    {
+                        assertThat(complete.failed()).isFalse();
+                        
assertThat(complete.result()).endsWith("random-file.txt");
+                    }
+                    latch.countDown();
+                });
+
+        assertThat(latch.await(2, TimeUnit.SECONDS)).isTrue();
+
+        assertThat(verifier.file).isNotNull();
+        // we can't close the file if it's already closed, so we expect the 
exception here
+        assertThatThrownBy(() -> verifier.file.end())
+        .isInstanceOf(IllegalStateException.class)
+        .hasMessageContaining("File handle is closed");
+    }
+
+    static ExposeAsyncFileXXHash32DigestVerifier newVerifier(XXHash32Digest 
digest)
+    {
+        return new ExposeAsyncFileXXHash32DigestVerifier(vertx.fileSystem(), 
digest);
+    }
+
+    /**
+     * Class that extends from {@link XXHash32DigestVerifier} for testing 
purposes and holds a reference to the
+     * {@link AsyncFile} to ensure that the file has been closed.
+     */
+    static class ExposeAsyncFileXXHash32DigestVerifier extends 
XXHash32DigestVerifier
+    {
+        AsyncFile file;
+
+        public ExposeAsyncFileXXHash32DigestVerifier(FileSystem fs, 
XXHash32Digest digest)
+        {
+            super(fs, digest);
+        }
+
+        @Override
+        protected Future<String> calculateDigest(AsyncFile file)
+        {
+            this.file = file;
+            return super.calculateDigest(file);
+        }
+    }
+}


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to