This is an automated email from the ASF dual-hosted git repository.
frankgh pushed a commit to branch trunk
in repository https://gitbox.apache.org/repos/asf/cassandra-sidecar.git
The following commit(s) were added to refs/heads/trunk by this push:
new e329f32 CASSANDRASC-97: Add support for additional digest validation
during SSTable upload
e329f32 is described below
commit e329f3232fae867879aba2cd0a766404aaf9a427
Author: Francisco Guerrero <[email protected]>
AuthorDate: Thu Jan 25 10:30:52 2024 -0800
CASSANDRASC-97: Add support for additional digest validation during SSTable
upload
In this commit we add the ability to support additional digest algorithms
for verification
during SSTable uploads. We introduce the `DigestVerifierFactory`, which
supports
XXHash32 and MD5 `DigestVerifier`s.
This commit also adds support for XXHash32 digests. Clients can now send
the XXHash32 digest
instead of MD5, giving both the clients and the server the
flexibility to utilize a more
performant algorithm.
Patch by Francisco Guerrero; Reviewed by Yifan Cai for CASSANDRASC-97
---
CHANGES.txt | 1 +
.../cassandra/sidecar/client/RequestContext.java | 7 +-
.../cassandra/sidecar/client/SidecarClient.java | 7 +-
.../client/request/UploadSSTableRequest.java | 14 +-
.../sidecar/client/SidecarClientTest.java | 80 ++++++++++-
.../cassandra/sidecar/common/data/Digest.java | 28 ++--
.../cassandra/sidecar/common/data/MD5Digest.java | 68 +++++++++
.../sidecar/common/data/XXHash32Digest.java | 110 +++++++++++++++
.../common/http/SidecarHttpHeaderNames.java | 22 ++-
.../sidecar/data/SSTableUploadRequest.java | 19 +--
.../cassandra/sidecar/restore/RestoreJobUtil.java | 14 +-
.../sstableuploads/SSTableUploadHandler.java | 21 ++-
.../cassandra/sidecar/server/MainModule.java | 9 --
.../sidecar/utils/AsyncFileDigestVerifier.java | 106 ++++++++++++++
.../{ChecksumVerifier.java => DigestVerifier.java} | 15 +-
.../sidecar/utils/DigestVerifierFactory.java | 76 ++++++++++
.../sidecar/utils/MD5ChecksumVerifier.java | 108 --------------
.../cassandra/sidecar/utils/MD5DigestVerifier.java | 72 ++++++++++
.../cassandra/sidecar/utils/SSTableUploader.java | 51 ++-----
.../sidecar/utils/XXHash32DigestVerifier.java | 84 +++++++++++
.../sstableuploads/SSTableUploadHandlerTest.java | 129 ++++++++++-------
.../sidecar/utils/DigestVerifierFactoryTest.java | 89 ++++++++++++
...erifierTest.java => MD5DigestVerifierTest.java} | 63 ++++-----
.../cassandra/sidecar/utils/TestFileUtils.java | 61 ++++++++
.../sidecar/utils/XXHash32DigestVerifierTest.java | 155 +++++++++++++++++++++
25 files changed, 1092 insertions(+), 317 deletions(-)
diff --git a/CHANGES.txt b/CHANGES.txt
index 6b5a9b9..3f1b4cc 100644
--- a/CHANGES.txt
+++ b/CHANGES.txt
@@ -1,5 +1,6 @@
1.0.0
-----
+ * Add support for additional digest validation during SSTable upload
(CASSANDRASC-97)
* Add sidecar client changes for restore from S3 (CASSANDRASC-95)
* Add restore SSTables from S3 into Cassandra feature to Cassandra Sidecar
(CASSANDRASC-92)
* Define routing order for http routes (CASSANDRASC-93)
diff --git
a/client/src/main/java/org/apache/cassandra/sidecar/client/RequestContext.java
b/client/src/main/java/org/apache/cassandra/sidecar/client/RequestContext.java
index 9433e3a..7af57e9 100644
---
a/client/src/main/java/org/apache/cassandra/sidecar/client/RequestContext.java
+++
b/client/src/main/java/org/apache/cassandra/sidecar/client/RequestContext.java
@@ -44,6 +44,7 @@ import
org.apache.cassandra.sidecar.client.retry.NoRetryPolicy;
import org.apache.cassandra.sidecar.client.retry.RetryPolicy;
import org.apache.cassandra.sidecar.client.selection.InstanceSelectionPolicy;
import
org.apache.cassandra.sidecar.client.selection.SingleInstanceSelectionPolicy;
+import org.apache.cassandra.sidecar.common.data.Digest;
import org.apache.cassandra.sidecar.common.utils.HttpRange;
import org.jetbrains.annotations.Nullable;
@@ -449,14 +450,14 @@ public class RequestContext
* @param tableName the table name in Cassandra
* @param uploadId an identifier for the upload
* @param component SSTable component being uploaded
- * @param checksum hash value to check integrity of SSTable component
uploaded
+ * @param digest digest value to check integrity of SSTable
component uploaded
* @param filename the path to the file to be uploaded
* @return a reference to this Builder
*/
public Builder uploadSSTableRequest(String keyspace, String tableName,
String uploadId, String component,
- String checksum, String filename)
+ Digest digest, String filename)
{
- return request(new UploadSSTableRequest(keyspace, tableName,
uploadId, component, checksum, filename));
+ return request(new UploadSSTableRequest(keyspace, tableName,
uploadId, component, digest, filename));
}
/**
diff --git
a/client/src/main/java/org/apache/cassandra/sidecar/client/SidecarClient.java
b/client/src/main/java/org/apache/cassandra/sidecar/client/SidecarClient.java
index 3048808..42b7093 100644
---
a/client/src/main/java/org/apache/cassandra/sidecar/client/SidecarClient.java
+++
b/client/src/main/java/org/apache/cassandra/sidecar/client/SidecarClient.java
@@ -45,6 +45,7 @@ import org.apache.cassandra.sidecar.common.NodeSettings;
import org.apache.cassandra.sidecar.common.data.CreateRestoreJobRequestPayload;
import
org.apache.cassandra.sidecar.common.data.CreateRestoreJobResponsePayload;
import org.apache.cassandra.sidecar.common.data.CreateSliceRequestPayload;
+import org.apache.cassandra.sidecar.common.data.Digest;
import org.apache.cassandra.sidecar.common.data.GossipInfoResponse;
import org.apache.cassandra.sidecar.common.data.HealthResponse;
import org.apache.cassandra.sidecar.common.data.ListSnapshotFilesResponse;
@@ -391,7 +392,7 @@ public class SidecarClient implements AutoCloseable,
SidecarClientBlobRestoreExt
* @param table the table name in Cassandra
* @param uploadId the unique identifier for the upload
* @param componentName the name of the SSTable component
- * @param checksum hash value to check integrity of SSTable component
uploaded
+ * @param digest digest value to check integrity of SSTable
component uploaded
* @param filename the path to the file to be uploaded
* @return a completable future for the request
*/
@@ -400,7 +401,7 @@ public class SidecarClient implements AutoCloseable,
SidecarClientBlobRestoreExt
String table,
String uploadId,
String componentName,
- String checksum,
+ Digest digest,
String filename)
{
return
executor.executeRequestAsync(requestBuilder().singleInstanceSelectionPolicy(instance)
@@ -408,7 +409,7 @@ public class SidecarClient implements AutoCloseable,
SidecarClientBlobRestoreExt
table,
uploadId,
componentName,
-
checksum,
+
digest,
filename)
.build());
}
diff --git
a/client/src/main/java/org/apache/cassandra/sidecar/client/request/UploadSSTableRequest.java
b/client/src/main/java/org/apache/cassandra/sidecar/client/request/UploadSSTableRequest.java
index ea0fe98..b93c82a 100644
---
a/client/src/main/java/org/apache/cassandra/sidecar/client/request/UploadSSTableRequest.java
+++
b/client/src/main/java/org/apache/cassandra/sidecar/client/request/UploadSSTableRequest.java
@@ -25,16 +25,16 @@ import java.util.HashMap;
import java.util.Map;
import java.util.Objects;
-import io.netty.handler.codec.http.HttpHeaderNames;
import io.netty.handler.codec.http.HttpMethod;
import org.apache.cassandra.sidecar.common.ApiEndpointsV1;
+import org.apache.cassandra.sidecar.common.data.Digest;
/**
* Represents a request to upload an SSTable component
*/
public class UploadSSTableRequest extends Request implements UploadableRequest
{
- private final String expectedChecksum;
+ private final Digest digest;
private final String filename;
/**
@@ -44,14 +44,14 @@ public class UploadSSTableRequest extends Request
implements UploadableRequest
* @param table the table name in Cassandra
* @param uploadId an identifier for the upload
* @param component SSTable component being uploaded
- * @param checksum hash value to check integrity of SSTable component
uploaded
+ * @param digest digest value to check integrity of SSTable component
uploaded
* @param filename the path to the file to be uploaded
*/
public UploadSSTableRequest(String keyspace, String table, String
uploadId, String component,
- String checksum, String filename)
+ Digest digest, String filename)
{
super(requestURI(keyspace, table, uploadId, component));
- this.expectedChecksum = checksum;
+ this.digest = digest;
this.filename = Objects.requireNonNull(filename, "the filename is must
be non-null");
if (!Files.exists(Paths.get(filename)))
@@ -72,12 +72,12 @@ public class UploadSSTableRequest extends Request
implements UploadableRequest
@Override
public Map<String, String> headers()
{
- if (expectedChecksum == null)
+ if (digest == null)
{
return super.headers();
}
Map<String, String> headers = new HashMap<>(super.headers());
- headers.put(HttpHeaderNames.CONTENT_MD5.toString(), expectedChecksum);
+ headers.putAll(digest.headers());
return Collections.unmodifiableMap(headers);
}
diff --git
a/client/src/testFixtures/java/org/apache/cassandra/sidecar/client/SidecarClientTest.java
b/client/src/testFixtures/java/org/apache/cassandra/sidecar/client/SidecarClientTest.java
index fb1102c..e06b709 100644
---
a/client/src/testFixtures/java/org/apache/cassandra/sidecar/client/SidecarClientTest.java
+++
b/client/src/testFixtures/java/org/apache/cassandra/sidecar/client/SidecarClientTest.java
@@ -63,6 +63,7 @@ import
org.apache.cassandra.sidecar.common.data.CreateRestoreJobResponsePayload;
import org.apache.cassandra.sidecar.common.data.GossipInfoResponse;
import org.apache.cassandra.sidecar.common.data.HealthResponse;
import org.apache.cassandra.sidecar.common.data.ListSnapshotFilesResponse;
+import org.apache.cassandra.sidecar.common.data.MD5Digest;
import org.apache.cassandra.sidecar.common.data.RestoreJobSecrets;
import org.apache.cassandra.sidecar.common.data.RingEntry;
import org.apache.cassandra.sidecar.common.data.RingResponse;
@@ -70,6 +71,7 @@ import
org.apache.cassandra.sidecar.common.data.SSTableImportResponse;
import org.apache.cassandra.sidecar.common.data.SchemaResponse;
import org.apache.cassandra.sidecar.common.data.TimeSkewResponse;
import org.apache.cassandra.sidecar.common.data.TokenRangeReplicasResponse;
+import org.apache.cassandra.sidecar.common.data.XXHash32Digest;
import org.apache.cassandra.sidecar.common.utils.HttpRange;
import org.apache.cassandra.sidecar.foundation.RestoreJobSecretsGen;
@@ -78,6 +80,8 @@ import static
io.netty.handler.codec.http.HttpResponseStatus.BAD_REQUEST;
import static
io.netty.handler.codec.http.HttpResponseStatus.INTERNAL_SERVER_ERROR;
import static io.netty.handler.codec.http.HttpResponseStatus.OK;
import static io.netty.handler.codec.http.HttpResponseStatus.PARTIAL_CONTENT;
+import static
org.apache.cassandra.sidecar.common.http.SidecarHttpHeaderNames.CONTENT_XXHASH32;
+import static
org.apache.cassandra.sidecar.common.http.SidecarHttpHeaderNames.CONTENT_XXHASH32_SEED;
import static org.assertj.core.api.Assertions.assertThat;
import static org.assertj.core.api.Assertions.assertThatException;
import static org.assertj.core.api.Assertions.assertThatExceptionOfType;
@@ -689,7 +693,7 @@ abstract class SidecarClientTest
}
@Test
- void testUploadSSTableWithoutChecksum(@TempDir Path tempDirectory) throws
Exception
+ void testUploadSSTableWithoutDigest(@TempDir Path tempDirectory) throws
Exception
{
Path fileToUpload = prepareFile(tempDirectory);
try (MockWebServer server = new MockWebServer())
@@ -722,7 +726,7 @@ abstract class SidecarClientTest
}
@Test
- void testUploadSSTableWithChecksum(@TempDir Path tempDirectory) throws
Exception
+ void testUploadSSTableWithMD5Digest(@TempDir Path tempDirectory) throws
Exception
{
Path fileToUpload = prepareFile(tempDirectory);
try (MockWebServer server = new MockWebServer())
@@ -735,7 +739,7 @@ abstract class SidecarClientTest
"cyclist_name",
"0000-0000",
"nb-1-big-TOC.txt",
- "15a69dc6501aa5ae17af037fe053f610",
+ new
MD5Digest("15a69dc6501aa5ae17af037fe053f610"),
fileToUpload.toString())
.get(30, TimeUnit.SECONDS);
@@ -755,6 +759,76 @@ abstract class SidecarClientTest
}
}
+ @Test
+ void testUploadSSTableWithXXHashDigest(@TempDir Path tempDirectory) throws
Exception
+ {
+ Path fileToUpload = prepareFile(tempDirectory);
+ try (MockWebServer server = new MockWebServer())
+ {
+ server.enqueue(new MockResponse().setResponseCode(OK.code()));
+
+ SidecarInstanceImpl sidecarInstance =
RequestExecutorTest.newSidecarInstance(server);
+ client.uploadSSTableRequest(sidecarInstance,
+ "cycling",
+ "cyclist_name",
+ "0000-0000",
+ "nb-1-big-TOC.txt",
+ new
XXHash32Digest("15a69dc6501aa5ae17af037fe053f610"),
+ fileToUpload.toString())
+ .get(30, TimeUnit.SECONDS);
+
+ assertThat(server.getRequestCount()).isEqualTo(1);
+ RecordedRequest request = server.takeRequest();
+ assertThat(request.getPath())
+ .isEqualTo(ApiEndpointsV1.SSTABLE_UPLOAD_ROUTE
+ .replaceAll(ApiEndpointsV1.UPLOAD_ID_PATH_PARAM,
"0000-0000")
+ .replaceAll(ApiEndpointsV1.KEYSPACE_PATH_PARAM,
"cycling")
+ .replaceAll(ApiEndpointsV1.TABLE_PATH_PARAM,
"cyclist_name")
+ .replaceAll(ApiEndpointsV1.COMPONENT_PATH_PARAM,
"nb-1-big-TOC.txt"));
+ assertThat(request.getMethod()).isEqualTo("PUT");
+ assertThat(request.getHeader(CONTENT_XXHASH32))
+ .isEqualTo("15a69dc6501aa5ae17af037fe053f610");
+ assertThat(request.getHeader(CONTENT_XXHASH32_SEED)).isNull();
+
assertThat(request.getHeader(HttpHeaderNames.CONTENT_LENGTH.toString())).isEqualTo("80");
+ assertThat(request.getBodySize()).isEqualTo(80);
+ }
+ }
+
+ @Test
+ void testUploadSSTableWithXXHashDigestAndSeed(@TempDir Path tempDirectory)
throws Exception
+ {
+ Path fileToUpload = prepareFile(tempDirectory);
+ try (MockWebServer server = new MockWebServer())
+ {
+ server.enqueue(new MockResponse().setResponseCode(OK.code()));
+
+ SidecarInstanceImpl sidecarInstance =
RequestExecutorTest.newSidecarInstance(server);
+ client.uploadSSTableRequest(sidecarInstance,
+ "cycling",
+ "cyclist_name",
+ "0000-0000",
+ "nb-1-big-TOC.txt",
+ new
XXHash32Digest("15a69dc6501aa5ae17af037fe053f610", "123456"),
+ fileToUpload.toString())
+ .get(30, TimeUnit.SECONDS);
+
+ assertThat(server.getRequestCount()).isEqualTo(1);
+ RecordedRequest request = server.takeRequest();
+ assertThat(request.getPath())
+ .isEqualTo(ApiEndpointsV1.SSTABLE_UPLOAD_ROUTE
+ .replaceAll(ApiEndpointsV1.UPLOAD_ID_PATH_PARAM,
"0000-0000")
+ .replaceAll(ApiEndpointsV1.KEYSPACE_PATH_PARAM,
"cycling")
+ .replaceAll(ApiEndpointsV1.TABLE_PATH_PARAM,
"cyclist_name")
+ .replaceAll(ApiEndpointsV1.COMPONENT_PATH_PARAM,
"nb-1-big-TOC.txt"));
+ assertThat(request.getMethod()).isEqualTo("PUT");
+ assertThat(request.getHeader(CONTENT_XXHASH32))
+ .isEqualTo("15a69dc6501aa5ae17af037fe053f610");
+
assertThat(request.getHeader(CONTENT_XXHASH32_SEED)).isEqualTo("123456");
+
assertThat(request.getHeader(HttpHeaderNames.CONTENT_LENGTH.toString())).isEqualTo("80");
+ assertThat(request.getBodySize()).isEqualTo(80);
+ }
+ }
+
@Test
void testStreamSSTableComponentWithNoRange() throws Exception
{
diff --git
a/src/main/java/org/apache/cassandra/sidecar/utils/ChecksumVerifier.java
b/common/src/main/java/org/apache/cassandra/sidecar/common/data/Digest.java
similarity index 51%
copy from src/main/java/org/apache/cassandra/sidecar/utils/ChecksumVerifier.java
copy to
common/src/main/java/org/apache/cassandra/sidecar/common/data/Digest.java
index e338642..1d4f432 100644
--- a/src/main/java/org/apache/cassandra/sidecar/utils/ChecksumVerifier.java
+++ b/common/src/main/java/org/apache/cassandra/sidecar/common/data/Digest.java
@@ -16,23 +16,27 @@
* limitations under the License.
*/
-package org.apache.cassandra.sidecar.utils;
+package org.apache.cassandra.sidecar.common.data;
-import io.vertx.core.Future;
+import java.util.Map;
/**
- * Interface to verify integrity of SSTables uploaded.
- * <p>
- * Note: If checksum calculations of multiple files are happening at the same
time, we would want to limit concurrent
- * checksum calculations. Since {@link ChecksumVerifier} is currently used
only by upload handler, we are not
- * introducing another limit here. Concurrent uploads limit should limit
concurrent checksum calculations as well.
+ * Interface that represents a checksum digest
*/
-public interface ChecksumVerifier
+public interface Digest
{
/**
- * @param checksum expected checksum value
- * @param filePath path to SSTable component
- * @return String component path, if verification is a success, else a
failed future is returned
+ * @return headers to be used in the HTTP request
*/
- Future<String> verify(String checksum, String filePath);
+ Map<String, String> headers();
+
+ /**
+ * @return the string representation of the digest
+ */
+ String value();
+
+ /**
+ * @return the name of the digest's algorithm
+ */
+ String algorithm();
}
diff --git
a/common/src/main/java/org/apache/cassandra/sidecar/common/data/MD5Digest.java
b/common/src/main/java/org/apache/cassandra/sidecar/common/data/MD5Digest.java
new file mode 100644
index 0000000..ba8b7b5
--- /dev/null
+++
b/common/src/main/java/org/apache/cassandra/sidecar/common/data/MD5Digest.java
@@ -0,0 +1,68 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.cassandra.sidecar.common.data;
+
+import java.util.Collections;
+import java.util.Map;
+import java.util.Objects;
+
+import io.netty.handler.codec.http.HttpHeaderNames;
+import org.jetbrains.annotations.NotNull;
+
+/**
+ * Implements the MD5 checksum digest
+ */
+public class MD5Digest implements Digest
+{
+ private final @NotNull String value;
+
+ /**
+ * Constructs a new MD5Digest with the provided MD5 {@code value}
+ *
+ * @param value the MD5 value
+ */
+ public MD5Digest(@NotNull String value)
+ {
+ this.value = Objects.requireNonNull(value, "value is required");
+ }
+
+ /**
+ * {@inheritDoc}
+ */
+ @Override
+ public String value()
+ {
+ return value;
+ }
+
+ @Override
+ public String algorithm()
+ {
+ return "MD5";
+ }
+
+ /**
+ * @return MD5 headers for the Digest
+ */
+ @Override
+ public Map<String, String> headers()
+ {
+ return
Collections.singletonMap(HttpHeaderNames.CONTENT_MD5.toString(), value);
+ }
+}
diff --git
a/common/src/main/java/org/apache/cassandra/sidecar/common/data/XXHash32Digest.java
b/common/src/main/java/org/apache/cassandra/sidecar/common/data/XXHash32Digest.java
new file mode 100644
index 0000000..48127e4
--- /dev/null
+++
b/common/src/main/java/org/apache/cassandra/sidecar/common/data/XXHash32Digest.java
@@ -0,0 +1,110 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.cassandra.sidecar.common.data;
+
+import java.util.HashMap;
+import java.util.Map;
+import java.util.Objects;
+
+import org.jetbrains.annotations.NotNull;
+import org.jetbrains.annotations.Nullable;
+
+import static
org.apache.cassandra.sidecar.common.http.SidecarHttpHeaderNames.CONTENT_XXHASH32;
+import static
org.apache.cassandra.sidecar.common.http.SidecarHttpHeaderNames.CONTENT_XXHASH32_SEED;
+
+/**
+ * Implements the XXHash32 Digest
+ */
+public class XXHash32Digest implements Digest
+{
+ private final @NotNull String value;
+ private final @Nullable String seedHex;
+
+ /**
+ * Constructs a new XXHashDigest with the provided XXHash {@code value}
+ *
+ * @param value the xxhash value
+ */
+ public XXHash32Digest(String value)
+ {
+ this(value, null);
+ }
+
+ /**
+ * Constructs a new instance with the provided XXHash {@code value} and
the {@code seed} value.
+ *
+ * @param value the xxhash value
+ * @param seed the seed
+ */
+ public XXHash32Digest(String value, int seed)
+ {
+ this(value, Integer.toHexString(seed));
+ }
+
+ /**
+ * Constructs a new XXHashDigest with the provided XXHash {@code value}
and the seed value represented as
+ * a hexadecimal string
+ *
+ * @param value the xxhash value
+ * @param seedHex the value of the seed represented as a hexadecimal value
+ */
+ public XXHash32Digest(@NotNull String value, @Nullable String seedHex)
+ {
+ this.value = Objects.requireNonNull(value, "value is required");
+ this.seedHex = seedHex;
+ }
+
+ /**
+ * {@inheritDoc}
+ */
+ @Override
+ public String value()
+ {
+ return value;
+ }
+
+ /**
+ * @return the optional seed in hexadecimal format
+ */
+ public @Nullable String seedHex()
+ {
+ return seedHex;
+ }
+
+ @Override
+ public String algorithm()
+ {
+ return "XXHash32";
+ }
+
+ /**
+ * @return XXHash headers for the digest
+ */
+ @Override
+ public Map<String, String> headers()
+ {
+ Map<String, String> headers = new HashMap<>();
+ headers.put(CONTENT_XXHASH32, value);
+ if (seedHex != null)
+ {
+ headers.put(CONTENT_XXHASH32_SEED, seedHex);
+ }
+ return headers;
+ }
+}
diff --git
a/src/main/java/org/apache/cassandra/sidecar/utils/ChecksumVerifier.java
b/common/src/main/java/org/apache/cassandra/sidecar/common/http/SidecarHttpHeaderNames.java
similarity index 51%
copy from src/main/java/org/apache/cassandra/sidecar/utils/ChecksumVerifier.java
copy to
common/src/main/java/org/apache/cassandra/sidecar/common/http/SidecarHttpHeaderNames.java
index e338642..347ed59 100644
--- a/src/main/java/org/apache/cassandra/sidecar/utils/ChecksumVerifier.java
+++
b/common/src/main/java/org/apache/cassandra/sidecar/common/http/SidecarHttpHeaderNames.java
@@ -16,23 +16,19 @@
* limitations under the License.
*/
-package org.apache.cassandra.sidecar.utils;
-
-import io.vertx.core.Future;
+package org.apache.cassandra.sidecar.common.http;
/**
- * Interface to verify integrity of SSTables uploaded.
- * <p>
- * Note: If checksum calculations of multiple files are happening at the same
time, we would want to limit concurrent
- * checksum calculations. Since {@link ChecksumVerifier} is currently used
only by upload handler, we are not
- * introducing another limit here. Concurrent uploads limit should limit
concurrent checksum calculations as well.
+ * Custom header names for sidecar
*/
-public interface ChecksumVerifier
+public final class SidecarHttpHeaderNames
{
/**
- * @param checksum expected checksum value
- * @param filePath path to SSTable component
- * @return String component path, if verification is a success, else a
failed future is returned
+ * {@code "cassandra-content-xxhash32"}
+ */
+ public static final String CONTENT_XXHASH32 = "cassandra-content-xxhash32";
+ /**
+ * {@code "cassandra-content-xxhash32-seed"}
*/
- Future<String> verify(String checksum, String filePath);
+ public static final String CONTENT_XXHASH32_SEED =
"cassandra-content-xxhash32-seed";
}
diff --git
a/src/main/java/org/apache/cassandra/sidecar/data/SSTableUploadRequest.java
b/src/main/java/org/apache/cassandra/sidecar/data/SSTableUploadRequest.java
index cef1491..60cfbd2 100644
--- a/src/main/java/org/apache/cassandra/sidecar/data/SSTableUploadRequest.java
+++ b/src/main/java/org/apache/cassandra/sidecar/data/SSTableUploadRequest.java
@@ -18,7 +18,6 @@
package org.apache.cassandra.sidecar.data;
-import io.netty.handler.codec.http.HttpHeaderNames;
import io.vertx.ext.web.RoutingContext;
import org.apache.cassandra.sidecar.common.data.QualifiedTableName;
import org.apache.cassandra.sidecar.common.data.SSTableUploads;
@@ -29,7 +28,6 @@ import
org.apache.cassandra.sidecar.common.data.SSTableUploads;
public class SSTableUploadRequest extends SSTableUploads
{
private final String component;
- private final String expectedChecksum;
/**
* Constructs an SSTableUploadRequest
@@ -37,16 +35,13 @@ public class SSTableUploadRequest extends SSTableUploads
* @param qualifiedTableName the qualified table name in Cassandra
* @param uploadId an identifier for the upload
* @param component SSTable component being uploaded
- * @param expectedChecksum expected hash value to check integrity of
SSTable component uploaded
*/
public SSTableUploadRequest(QualifiedTableName qualifiedTableName,
String uploadId,
- String component,
- String expectedChecksum)
+ String component)
{
super(qualifiedTableName, uploadId);
this.component = component;
- this.expectedChecksum = expectedChecksum;
}
/**
@@ -57,14 +52,6 @@ public class SSTableUploadRequest extends SSTableUploads
return this.component;
}
- /**
- * @return expected checksum value of SSTable component
- */
- public String expectedChecksum()
- {
- return this.expectedChecksum;
- }
-
/**
* {@inheritDoc}
*/
@@ -75,7 +62,6 @@ public class SSTableUploadRequest extends SSTableUploads
", keyspace='" + keyspace() + '\'' +
", tableName='" + table() + '\'' +
", component='" + component + '\'' +
- ", expectedChecksum='" + expectedChecksum + '\'' +
'}';
}
@@ -90,7 +76,6 @@ public class SSTableUploadRequest extends SSTableUploads
{
return new SSTableUploadRequest(qualifiedTableName,
context.pathParam("uploadId"),
- context.pathParam("component"),
-
context.request().getHeader(HttpHeaderNames.CONTENT_MD5.toString()));
+ context.pathParam("component"));
}
}
diff --git
a/src/main/java/org/apache/cassandra/sidecar/restore/RestoreJobUtil.java
b/src/main/java/org/apache/cassandra/sidecar/restore/RestoreJobUtil.java
index 918d8c0..8828c8f 100644
--- a/src/main/java/org/apache/cassandra/sidecar/restore/RestoreJobUtil.java
+++ b/src/main/java/org/apache/cassandra/sidecar/restore/RestoreJobUtil.java
@@ -114,12 +114,22 @@ public class RestoreJobUtil
* @return the checksum hex string of the file's content. XXHash32 is
employed as the hash algorithm.
*/
public static String checksum(File file) throws IOException
+ {
+ int seed = 0x9747b28c; // random seed for initializing
+ return checksum(file, seed);
+ }
+
+ /**
+ * @param file the file to use to perform the checksum
+ * @param seed the seed to use for the hasher
+ * @return the checksum hex string of the file's content. XXHash32 is
employed as the hash algorithm.
+ */
+ public static String checksum(File file, int seed) throws IOException
{
try (FileInputStream fis = new FileInputStream(file))
{
// might have shared hashers with ThreadLocal
XXHashFactory factory = XXHashFactory.safeInstance();
- int seed = 0x9747b28c; // random seed for initializing
try (StreamingXXHash32 hasher = factory.newStreamingHash32(seed))
{
byte[] buffer = new byte[KB_512];
@@ -128,7 +138,7 @@ public class RestoreJobUtil
{
hasher.update(buffer, 0, len);
}
- return Long.toHexString(hasher.getValue());
+ return Long.toHexString(hasher.getValue());
}
}
}
diff --git
a/src/main/java/org/apache/cassandra/sidecar/routes/sstableuploads/SSTableUploadHandler.java
b/src/main/java/org/apache/cassandra/sidecar/routes/sstableuploads/SSTableUploadHandler.java
index dc23141..9347f7f 100644
---
a/src/main/java/org/apache/cassandra/sidecar/routes/sstableuploads/SSTableUploadHandler.java
+++
b/src/main/java/org/apache/cassandra/sidecar/routes/sstableuploads/SSTableUploadHandler.java
@@ -42,6 +42,8 @@ import org.apache.cassandra.sidecar.routes.AbstractHandler;
import org.apache.cassandra.sidecar.stats.SSTableStats;
import org.apache.cassandra.sidecar.stats.SidecarStats;
import org.apache.cassandra.sidecar.utils.CassandraInputValidator;
+import org.apache.cassandra.sidecar.utils.DigestVerifier;
+import org.apache.cassandra.sidecar.utils.DigestVerifierFactory;
import org.apache.cassandra.sidecar.utils.InstanceMetadataFetcher;
import org.apache.cassandra.sidecar.utils.MetadataUtils;
import org.apache.cassandra.sidecar.utils.SSTableUploader;
@@ -62,6 +64,7 @@ public class SSTableUploadHandler extends
AbstractHandler<SSTableUploadRequest>
private final SSTableUploadsPathBuilder uploadPathBuilder;
private final ConcurrencyLimiter limiter;
private final SSTableStats stats;
+ private final DigestVerifierFactory digestVerifierFactory;
/**
* Constructs a handler with the provided params.
@@ -74,6 +77,7 @@ public class SSTableUploadHandler extends
AbstractHandler<SSTableUploadRequest>
* @param executorPools executor pools for blocking executions
* @param validator a validator instance to validate
Cassandra-specific input
* @param sidecarStats an interface holding all stats related to
main sidecar process
+ * @param digestVerifierFactory a factory of checksum verifiers
*/
@Inject
protected SSTableUploadHandler(Vertx vertx,
@@ -83,7 +87,8 @@ public class SSTableUploadHandler extends
AbstractHandler<SSTableUploadRequest>
SSTableUploadsPathBuilder uploadPathBuilder,
ExecutorPools executorPools,
CassandraInputValidator validator,
- SidecarStats sidecarStats)
+ SidecarStats sidecarStats,
+ DigestVerifierFactory digestVerifierFactory)
{
super(metadataFetcher, executorPools, validator);
this.fs = vertx.fileSystem();
@@ -92,6 +97,7 @@ public class SSTableUploadHandler extends
AbstractHandler<SSTableUploadRequest>
this.uploadPathBuilder = uploadPathBuilder;
this.limiter = new
ConcurrencyLimiter(configuration::concurrentUploadsLimit);
this.stats = sidecarStats.ssTableStats();
+ this.digestVerifierFactory = digestVerifierFactory;
}
/**
@@ -124,11 +130,14 @@ public class SSTableUploadHandler extends
AbstractHandler<SSTableUploadRequest>
.compose(validRequest ->
uploadPathBuilder.resolveStagingDirectory(host))
.compose(this::ensureSufficientSpaceAvailable)
.compose(v -> uploadPathBuilder.build(host, request))
- .compose(uploadDirectory -> uploader.uploadComponent(httpRequest,
- uploadDirectory,
-
request.component(),
-
request.expectedChecksum(),
-
configuration.filePermissions()))
+ .compose(uploadDirectory -> {
+ DigestVerifier digestVerifier =
digestVerifierFactory.verifier(httpRequest.headers());
+ return uploader.uploadComponent(httpRequest,
+ uploadDirectory,
+ request.component(),
+ digestVerifier,
+ configuration.filePermissions());
+ })
.compose(fs::props)
.onSuccess(fileProps -> {
long serviceTimeMillis =
TimeUnit.NANOSECONDS.toMillis(System.nanoTime() - startTimeInNanos);
diff --git a/src/main/java/org/apache/cassandra/sidecar/server/MainModule.java
b/src/main/java/org/apache/cassandra/sidecar/server/MainModule.java
index b1af4b7..31ab7b1 100644
--- a/src/main/java/org/apache/cassandra/sidecar/server/MainModule.java
+++ b/src/main/java/org/apache/cassandra/sidecar/server/MainModule.java
@@ -96,8 +96,6 @@ import org.apache.cassandra.sidecar.stats.RestoreJobStats;
import org.apache.cassandra.sidecar.stats.SidecarSchemaStats;
import org.apache.cassandra.sidecar.stats.SidecarStats;
import org.apache.cassandra.sidecar.utils.CassandraVersionProvider;
-import org.apache.cassandra.sidecar.utils.ChecksumVerifier;
-import org.apache.cassandra.sidecar.utils.MD5ChecksumVerifier;
import org.apache.cassandra.sidecar.utils.TimeProvider;
import static
org.apache.cassandra.sidecar.server.SidecarServerEvents.ON_SERVER_STOP;
@@ -428,13 +426,6 @@ public class MainModule extends AbstractModule
return DnsResolver.DEFAULT;
}
- @Provides
- @Singleton
- public ChecksumVerifier checksumVerifier(Vertx vertx)
- {
- return new MD5ChecksumVerifier(vertx.fileSystem());
- }
-
@Provides
@Singleton
public SidecarVersionProvider sidecarVersionProvider()
diff --git
a/src/main/java/org/apache/cassandra/sidecar/utils/AsyncFileDigestVerifier.java
b/src/main/java/org/apache/cassandra/sidecar/utils/AsyncFileDigestVerifier.java
new file mode 100644
index 0000000..790b54c
--- /dev/null
+++
b/src/main/java/org/apache/cassandra/sidecar/utils/AsyncFileDigestVerifier.java
@@ -0,0 +1,106 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.cassandra.sidecar.utils;
+
+import java.util.Objects;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import io.vertx.core.Future;
+import io.vertx.core.Handler;
+import io.vertx.core.Promise;
+import io.vertx.core.buffer.Buffer;
+import io.vertx.core.file.AsyncFile;
+import io.vertx.core.file.FileSystem;
+import io.vertx.core.file.OpenOptions;
+import io.vertx.ext.web.handler.HttpException;
+import org.apache.cassandra.sidecar.common.data.Digest;
+
+import static
org.apache.cassandra.sidecar.common.http.SidecarHttpResponseStatus.CHECKSUM_MISMATCH;
+
+/**
+ * Provides basic functionality to perform digest validations using {@link
AsyncFile}
+ *
+ * @param <D> the Digest type
+ */
+public abstract class AsyncFileDigestVerifier<D extends Digest> implements
DigestVerifier
+{
+ public static final int DEFAULT_READ_BUFFER_SIZE = 512 * 1024; // 512KiB
+ protected final Logger logger = LoggerFactory.getLogger(this.getClass());
+ protected final FileSystem fs;
+ protected final D digest;
+
+ protected AsyncFileDigestVerifier(FileSystem fs, D digest)
+ {
+ this.fs = fs;
+ this.digest = Objects.requireNonNull(digest, "digest is required");
+ }
+
+ /**
+ * @param filePath path to SSTable component
+ * @return a future String with the component path if verification is a
success, otherwise a failed future
+ */
+ @Override
+ public Future<String> verify(String filePath)
+ {
+ logger.debug("Validating {}. expected_digest={}", digest.algorithm(),
digest.value());
+
+ return fs.open(filePath, new OpenOptions())
+ .compose(this::calculateDigest)
+ .compose(computedDigest -> {
+ if (!computedDigest.equals(digest.value()))
+ {
+ logger.error("Digest mismatch. computed_digest={},
expected_digest={}, algorithm=MD5",
+ computedDigest, digest.value());
+ return Future.failedFuture(new
HttpException(CHECKSUM_MISMATCH.code(),
+
String.format("Digest mismatch. "
+
+ "expected_digest=%s, "
+
+ "algorithm=%s",
+
digest.value(),
+
digest.algorithm())));
+ }
+ return Future.succeededFuture(filePath);
+ });
+ }
+
+ /**
+ * Returns a future with the calculated digest for the provided {@link
AsyncFile file}.
+ *
+ * @param asyncFile the async file to use for digest calculation
+ * @return a future with the computed digest for the provided {@link
AsyncFile file}
+ */
+ protected abstract Future<String> calculateDigest(AsyncFile asyncFile);
+
+ protected void readFile(AsyncFile file, Promise<String> result,
Handler<Buffer> onBufferAvailable,
+ Handler<Void> onReadComplete)
+ {
+ // Make sure to close the file when complete
+ result.future().onComplete(ignored -> file.end());
+ file.pause()
+ .setReadBufferSize(DEFAULT_READ_BUFFER_SIZE)
+ .handler(onBufferAvailable)
+ .endHandler(onReadComplete)
+ .exceptionHandler(cause -> {
+ logger.error("Error while calculating the {} digest",
digest.algorithm(), cause);
+ result.fail(cause);
+ })
+ .resume();
+ }
+}
diff --git
a/src/main/java/org/apache/cassandra/sidecar/utils/ChecksumVerifier.java
b/src/main/java/org/apache/cassandra/sidecar/utils/DigestVerifier.java
similarity index 64%
rename from
src/main/java/org/apache/cassandra/sidecar/utils/ChecksumVerifier.java
rename to src/main/java/org/apache/cassandra/sidecar/utils/DigestVerifier.java
index e338642..07bb741 100644
--- a/src/main/java/org/apache/cassandra/sidecar/utils/ChecksumVerifier.java
+++ b/src/main/java/org/apache/cassandra/sidecar/utils/DigestVerifier.java
@@ -23,16 +23,15 @@ import io.vertx.core.Future;
/**
* Interface to verify integrity of SSTables uploaded.
* <p>
- * Note: If checksum calculations of multiple files are happening at the same
time, we would want to limit concurrent
- * checksum calculations. Since {@link ChecksumVerifier} is currently used
only by upload handler, we are not
- * introducing another limit here. Concurrent uploads limit should limit
concurrent checksum calculations as well.
+ * Note: If digest calculations of multiple files are happening at the same
time, we would want to limit concurrent
+ * digest calculations. Since {@link DigestVerifier} is currently used only by
upload handler, we are not
+ * introducing another limit here. Concurrent uploads limit should limit
concurrent digest calculations as well.
*/
-public interface ChecksumVerifier
+public interface DigestVerifier
{
/**
- * @param checksum expected checksum value
- * @param filePath path to SSTable component
- * @return String component path, if verification is a success, else a
failed future is returned
+ * @param filePath path to SSTable component
+ * @return a future String with the component path if verification is a
success, otherwise a failed future
*/
- Future<String> verify(String checksum, String filePath);
+ Future<String> verify(String filePath);
}
diff --git
a/src/main/java/org/apache/cassandra/sidecar/utils/DigestVerifierFactory.java
b/src/main/java/org/apache/cassandra/sidecar/utils/DigestVerifierFactory.java
new file mode 100644
index 0000000..3dd64a2
--- /dev/null
+++
b/src/main/java/org/apache/cassandra/sidecar/utils/DigestVerifierFactory.java
@@ -0,0 +1,76 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.cassandra.sidecar.utils;
+
+import com.google.inject.Inject;
+import com.google.inject.Singleton;
+import io.netty.handler.codec.http.HttpHeaderNames;
+import io.vertx.core.Future;
+import io.vertx.core.MultiMap;
+import io.vertx.core.Vertx;
+import io.vertx.core.file.FileSystem;
+import org.jetbrains.annotations.VisibleForTesting;
+
+import static
org.apache.cassandra.sidecar.common.http.SidecarHttpHeaderNames.CONTENT_XXHASH32;
+
+/**
+ * A factory class that returns the {@link DigestVerifier} instance.
+ */
+@Singleton
+public class DigestVerifierFactory
+{
+ @VisibleForTesting
+ static final DigestVerifier FALLBACK_VERIFIER = Future::succeededFuture;
+ private final FileSystem fs;
+
+
+ /**
+ * Constructs a new factory
+ *
+ * @param vertx the vertx instance
+ */
+ @Inject
+ public DigestVerifierFactory(Vertx vertx)
+ {
+ this.fs = vertx.fileSystem();
+ }
+
+ /***
+ * Returns the first match for a {@link DigestVerifier} from the
registered list of verifiers. If none of the
+ * verifiers matches, a no-op validator is returned.
+ *
+ * @param headers the request headers used to test whether a {@link
DigestVerifier} can be used to verify the
+ * request
+ * @return the first match for a {@link DigestVerifier} from the
registered list of verifiers, or a no-op
+ * verifier if none match
+ */
+ public DigestVerifier verifier(MultiMap headers)
+ {
+ if (headers.contains(CONTENT_XXHASH32))
+ {
+ return XXHash32DigestVerifier.create(fs, headers);
+ }
+ else if (headers.contains(HttpHeaderNames.CONTENT_MD5.toString()))
+ {
+ return MD5DigestVerifier.create(fs, headers);
+ }
+ // Fallback to no-op validator
+ return FALLBACK_VERIFIER;
+ }
+}
diff --git
a/src/main/java/org/apache/cassandra/sidecar/utils/MD5ChecksumVerifier.java
b/src/main/java/org/apache/cassandra/sidecar/utils/MD5ChecksumVerifier.java
deleted file mode 100644
index 132ff10..0000000
--- a/src/main/java/org/apache/cassandra/sidecar/utils/MD5ChecksumVerifier.java
+++ /dev/null
@@ -1,108 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.cassandra.sidecar.utils;
-
-import java.security.MessageDigest;
-import java.security.NoSuchAlgorithmException;
-import java.util.Base64;
-
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-import io.vertx.core.Future;
-import io.vertx.core.Promise;
-import io.vertx.core.file.AsyncFile;
-import io.vertx.core.file.FileSystem;
-import io.vertx.core.file.OpenOptions;
-import io.vertx.ext.web.handler.HttpException;
-import org.jetbrains.annotations.VisibleForTesting;
-
-import static
org.apache.cassandra.sidecar.common.http.SidecarHttpResponseStatus.CHECKSUM_MISMATCH;
-
-/**
- * Implementation of {@link ChecksumVerifier}. Here we use MD5 implementation
of {@link java.security.MessageDigest}
- * for calculating checksum. And match the calculated checksum with expected
checksum obtained from request.
- */
-public class MD5ChecksumVerifier implements ChecksumVerifier
-{
- private static final Logger LOGGER =
LoggerFactory.getLogger(MD5ChecksumVerifier.class);
- public static final int DEFAULT_READ_BUFFER_SIZE = 64 * 1024; // 64KiB
- private final FileSystem fs;
-
- public MD5ChecksumVerifier(FileSystem fs)
- {
- this.fs = fs;
- }
-
- public Future<String> verify(String expectedChecksum, String filePath)
- {
- if (expectedChecksum == null)
- {
- return Future.succeededFuture(filePath);
- }
-
- LOGGER.debug("Validating MD5. expected_checksum={}", expectedChecksum);
-
- return fs.open(filePath, new OpenOptions())
- .compose(this::calculateMD5)
- .compose(computedChecksum -> {
- if (!expectedChecksum.equals(computedChecksum))
- {
- LOGGER.error("Checksum mismatch.
computed_checksum={}, expected_checksum={}, algorithm=MD5",
- computedChecksum, expectedChecksum);
- return Future.failedFuture(new
HttpException(CHECKSUM_MISMATCH.code(),
-
String.format("Checksum mismatch. "
-
+ "expected_checksum=%s, "
-
+ "algorithm=MD5",
-
expectedChecksum)));
- }
- return Future.succeededFuture(filePath);
- });
- }
-
- @VisibleForTesting
- Future<String> calculateMD5(AsyncFile file)
- {
- MessageDigest digest;
- try
- {
- digest = MessageDigest.getInstance("MD5");
- }
- catch (NoSuchAlgorithmException e)
- {
- return Future.failedFuture(e);
- }
-
- Promise<String> result = Promise.promise();
- file.pause()
- .setReadBufferSize(DEFAULT_READ_BUFFER_SIZE)
- .handler(buf -> digest.update(buf.getBytes()))
- .endHandler(_v -> {
-
result.complete(Base64.getEncoder().encodeToString(digest.digest()));
- file.end();
- })
- .exceptionHandler(cause -> {
- LOGGER.error("Error while calculating MD5 checksum", cause);
- result.fail(cause);
- file.end();
- })
- .resume();
- return result.future();
- }
-}
diff --git
a/src/main/java/org/apache/cassandra/sidecar/utils/MD5DigestVerifier.java
b/src/main/java/org/apache/cassandra/sidecar/utils/MD5DigestVerifier.java
new file mode 100644
index 0000000..ed4218d
--- /dev/null
+++ b/src/main/java/org/apache/cassandra/sidecar/utils/MD5DigestVerifier.java
@@ -0,0 +1,72 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.cassandra.sidecar.utils;
+
+import java.security.MessageDigest;
+import java.security.NoSuchAlgorithmException;
+import java.util.Base64;
+
+import io.netty.handler.codec.http.HttpHeaderNames;
+import io.vertx.core.Future;
+import io.vertx.core.MultiMap;
+import io.vertx.core.Promise;
+import io.vertx.core.file.AsyncFile;
+import io.vertx.core.file.FileSystem;
+import org.apache.cassandra.sidecar.common.data.MD5Digest;
+import org.jetbrains.annotations.VisibleForTesting;
+
+/**
+ * Implementation of {@link DigestVerifier}. Here we use MD5 implementation of
{@link java.security.MessageDigest}
+ * for calculating digest and match the calculated digest with expected digest
obtained from request.
+ */
+public class MD5DigestVerifier extends AsyncFileDigestVerifier<MD5Digest>
+{
+ protected MD5DigestVerifier(FileSystem fs, MD5Digest digest)
+ {
+ super(fs, digest);
+ }
+
+ public static DigestVerifier create(FileSystem fs, MultiMap headers)
+ {
+ MD5Digest md5Digest = new
MD5Digest(headers.get(HttpHeaderNames.CONTENT_MD5.toString()));
+ return new MD5DigestVerifier(fs, md5Digest);
+ }
+
+ @Override
+ @VisibleForTesting
+ protected Future<String> calculateDigest(AsyncFile file)
+ {
+ MessageDigest digest;
+ try
+ {
+ digest = MessageDigest.getInstance("MD5");
+ }
+ catch (NoSuchAlgorithmException e)
+ {
+ return Future.failedFuture(e);
+ }
+
+ Promise<String> result = Promise.promise();
+
+ readFile(file, result, buf -> digest.update(buf.getBytes()),
+ _v ->
result.complete(Base64.getEncoder().encodeToString(digest.digest())));
+
+ return result.future();
+ }
+}
diff --git
a/src/main/java/org/apache/cassandra/sidecar/utils/SSTableUploader.java
b/src/main/java/org/apache/cassandra/sidecar/utils/SSTableUploader.java
index 465f0a4..e7f59a4 100644
--- a/src/main/java/org/apache/cassandra/sidecar/utils/SSTableUploader.java
+++ b/src/main/java/org/apache/cassandra/sidecar/utils/SSTableUploader.java
@@ -39,6 +39,7 @@ import io.vertx.core.file.FileSystem;
import io.vertx.core.file.OpenOptions;
import io.vertx.core.streams.ReadStream;
import io.vertx.core.streams.WriteStream;
+import org.apache.cassandra.sidecar.exceptions.ThrowableUtils;
/**
* A class that handles SSTable Uploads
@@ -50,23 +51,19 @@ public class SSTableUploader
private static final String DEFAULT_TEMP_SUFFIX = ".tmp";
private final FileSystem fs;
- private final ChecksumVerifier checksumVerifier;
private final SidecarRateLimiter rateLimiter;
/**
* Constructs an instance of {@link SSTableUploader} with provided params
for uploading an SSTable component.
*
- * @param vertx Vertx reference
- * @param checksumVerifier verifier for checking integrity of upload
- * @param rateLimiter rate limiter for uploading SSTable components
+ * @param vertx Vertx reference
+ * @param rateLimiter rate limiter for uploading SSTable components
*/
@Inject
public SSTableUploader(Vertx vertx,
- ChecksumVerifier checksumVerifier,
@Named("IngressFileRateLimiter") SidecarRateLimiter
rateLimiter)
{
this.fs = vertx.fileSystem();
- this.checksumVerifier = checksumVerifier;
this.rateLimiter = rateLimiter;
}
@@ -76,14 +73,14 @@ public class SSTableUploader
* @param readStream server request from which file upload is
acquired
* @param uploadDirectory the absolute path to the upload directory in
the target {@code fs}
* @param componentFileName the file name of the component
- * @param expectedChecksum for verifying upload integrity, passed in
through request
+ * @param digestVerifier the digest verifier instance
* @param filePermissions specifies the posix file permissions used to
create the SSTable file
* @return path of SSTable component to which data was uploaded
*/
public Future<String> uploadComponent(ReadStream<Buffer> readStream,
String uploadDirectory,
String componentFileName,
- String expectedChecksum,
+ DigestVerifier digestVerifier,
String filePermissions)
{
@@ -92,15 +89,16 @@ public class SSTableUploader
return fs.mkdirs(uploadDirectory) // ensure the parent directory is
created
.compose(v -> createTempFile(uploadDirectory,
componentFileName, filePermissions))
- .compose(tempFilePath -> streamAndVerify(readStream,
tempFilePath, expectedChecksum))
+ .compose(tempFilePath -> streamAndVerify(readStream,
tempFilePath, digestVerifier))
.compose(verifiedTempFilePath ->
moveAtomicallyWithFallBack(verifiedTempFilePath, targetPath));
}
- private Future<String> streamAndVerify(ReadStream<Buffer> readStream,
String tempFilePath, String expectedChecksum)
+ private Future<String> streamAndVerify(ReadStream<Buffer> readStream,
String tempFilePath,
+ DigestVerifier digestVerifier)
{
// pipe read stream to temp file
return streamToFile(readStream, tempFilePath)
- .compose(v -> checksumVerifier.verify(expectedChecksum,
tempFilePath))
+ .compose(v -> digestVerifier.verify(tempFilePath))
.onFailure(throwable -> fs.delete(tempFilePath));
}
@@ -128,7 +126,9 @@ public class SSTableUploader
LOGGER.debug("Moving from={} to={}", source, target);
return fs.move(source, target, new CopyOptions().setAtomicMove(true))
.recover(cause -> {
- if (hasCause(cause,
AtomicMoveNotSupportedException.class, 10))
+ Exception atomicMoveNotSupportedException =
+ ThrowableUtils.getCause(cause,
AtomicMoveNotSupportedException.class);
+ if (atomicMoveNotSupportedException != null)
{
LOGGER.warn("Failed to perform atomic move from={}
to={}", source, target, cause);
return fs.move(source, target, new
CopyOptions().setAtomicMove(false));
@@ -138,33 +138,6 @@ public class SSTableUploader
.compose(v -> Future.succeededFuture(target));
}
- /**
- * Returns true if a cause of type {@code type} is found in the stack
trace before exceeding the {@code depth}
- *
- * @param cause the original cause
- * @param type the exception type to test
- * @param depth the maximum depth to check in the stack trace
- * @return true if the exception of type {@code type} exists in the
stacktrace, false otherwise
- */
- private static boolean hasCause(Throwable cause, Class<? extends
Throwable> type, int depth)
- {
- int i = 0;
- while (i < depth)
- {
- if (cause == null)
- return false;
-
- if (type.isInstance(cause))
- return true;
-
- cause = cause.getCause();
-
- i++;
- }
- return false;
- }
-
-
/**
* A {@link WriteStream} implementation that supports rate limiting.
*/
diff --git
a/src/main/java/org/apache/cassandra/sidecar/utils/XXHash32DigestVerifier.java
b/src/main/java/org/apache/cassandra/sidecar/utils/XXHash32DigestVerifier.java
new file mode 100644
index 0000000..b1944c6
--- /dev/null
+++
b/src/main/java/org/apache/cassandra/sidecar/utils/XXHash32DigestVerifier.java
@@ -0,0 +1,84 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.cassandra.sidecar.utils;
+
+import io.vertx.core.Future;
+import io.vertx.core.MultiMap;
+import io.vertx.core.Promise;
+import io.vertx.core.file.AsyncFile;
+import io.vertx.core.file.FileSystem;
+import net.jpountz.xxhash.StreamingXXHash32;
+import net.jpountz.xxhash.XXHashFactory;
+import org.apache.cassandra.sidecar.common.data.XXHash32Digest;
+import org.jetbrains.annotations.VisibleForTesting;
+
+import static
org.apache.cassandra.sidecar.common.http.SidecarHttpHeaderNames.CONTENT_XXHASH32;
+import static
org.apache.cassandra.sidecar.common.http.SidecarHttpHeaderNames.CONTENT_XXHASH32_SEED;
+
/**
 * Implementation of {@link DigestVerifier} to calculate the digest and match the calculated digest
 * with the expected digest.
 */
public class XXHash32DigestVerifier extends AsyncFileDigestVerifier<XXHash32Digest>
{
    protected XXHash32DigestVerifier(FileSystem fs, XXHash32Digest digest)
    {
        super(fs, digest);
    }

    /**
     * Creates a verifier from the request {@code headers}, reading the expected digest from the
     * XXHash32 content header and the optional hex-encoded seed from the seed header.
     *
     * @param fs      the (vertx) file system
     * @param headers the request headers
     * @return a new {@link XXHash32DigestVerifier}
     */
    public static XXHash32DigestVerifier create(FileSystem fs, MultiMap headers)
    {
        XXHash32Digest digest = new XXHash32Digest(headers.get(CONTENT_XXHASH32), headers.get(CONTENT_XXHASH32_SEED));
        return new XXHash32DigestVerifier(fs, digest);
    }

    @Override
    @VisibleForTesting
    protected Future<String> calculateDigest(AsyncFile file)
    {
        Promise<String> result = Promise.promise();
        Future<String> future = result.future();

        // might have shared hashers with ThreadLocal
        XXHashFactory factory = XXHashFactory.safeInstance();

        int seed = maybeGetSeedOrDefault();
        StreamingXXHash32 hasher = factory.newStreamingHash32(seed);

        // release the streaming hasher once the computation completes (success or failure)
        future.onComplete(ignored -> hasher.close());

        readFile(file, result, buf -> {
            byte[] bytes = buf.getBytes();
            hasher.update(bytes, 0, bytes.length);
        },
        // NOTE(review): hasher.getValue() returns an int; Long.toHexString sign-extends negative
        // hashes into a 16-character hex string (leading "ffffffff..."), so clients must encode
        // their digests the same way for the comparison to succeed — confirm against client encoding
        _v -> result.complete(Long.toHexString(hasher.getValue())));

        return future;
    }

    /**
     * @return the seed parsed from the hex-encoded seed header when present, otherwise a fixed default seed
     */
    protected int maybeGetSeedOrDefault()
    {
        String seedHex = digest.seedHex();
        if (seedHex != null)
        {
            // NOTE(review): a non-hex seed header throws NumberFormatException here, surfacing as a
            // failed upload rather than an explicit 400 Bad Request — confirm this is intended
            return (int) Long.parseLong(seedHex, 16);
        }
        return 0x9747b28c; // random seed for initializing
    }
}
diff --git
a/src/test/java/org/apache/cassandra/sidecar/routes/sstableuploads/SSTableUploadHandlerTest.java
b/src/test/java/org/apache/cassandra/sidecar/routes/sstableuploads/SSTableUploadHandlerTest.java
index 49554c6..2694fb4 100644
---
a/src/test/java/org/apache/cassandra/sidecar/routes/sstableuploads/SSTableUploadHandlerTest.java
+++
b/src/test/java/org/apache/cassandra/sidecar/routes/sstableuploads/SSTableUploadHandlerTest.java
@@ -19,7 +19,6 @@
package org.apache.cassandra.sidecar.routes.sstableuploads;
import java.io.IOException;
-import java.io.OutputStream;
import java.nio.file.Files;
import java.nio.file.Path;
import java.nio.file.Paths;
@@ -27,7 +26,6 @@ import java.nio.file.attribute.PosixFilePermission;
import java.util.Set;
import java.util.UUID;
import java.util.concurrent.CountDownLatch;
-import java.util.concurrent.ThreadLocalRandom;
import java.util.concurrent.TimeUnit;
import java.util.function.Consumer;
@@ -45,6 +43,9 @@ import io.vertx.ext.web.client.HttpResponse;
import io.vertx.ext.web.client.WebClient;
import io.vertx.junit5.VertxExtension;
import io.vertx.junit5.VertxTestContext;
+import org.apache.cassandra.sidecar.common.data.Digest;
+import org.apache.cassandra.sidecar.common.data.MD5Digest;
+import org.apache.cassandra.sidecar.common.data.XXHash32Digest;
import org.apache.cassandra.sidecar.common.http.SidecarHttpResponseStatus;
import org.apache.cassandra.sidecar.snapshots.SnapshotUtils;
import org.assertj.core.data.Percentage;
@@ -58,6 +59,7 @@ import static
java.nio.file.attribute.PosixFilePermission.OTHERS_WRITE;
import static java.nio.file.attribute.PosixFilePermission.OWNER_EXECUTE;
import static java.nio.file.attribute.PosixFilePermission.OWNER_READ;
import static java.nio.file.attribute.PosixFilePermission.OWNER_WRITE;
+import static org.apache.cassandra.sidecar.utils.TestFileUtils.prepareTestFile;
import static org.assertj.core.api.Assertions.assertThat;
import static org.mockito.Mockito.when;
@@ -75,7 +77,7 @@ class SSTableUploadHandlerTest extends BaseUploadsHandlerTest
void testUploadWithoutMd5_expectSuccessfulUpload(VertxTestContext context)
throws IOException
{
UUID uploadId = UUID.randomUUID();
- sendUploadRequestAndVerify(context, uploadId, "ks", "tbl",
"without-md5.db", "",
+ sendUploadRequestAndVerify(context, uploadId, "ks", "tbl",
"without-md5.db", null,
Files.size(Paths.get(FILE_TO_BE_UPLOADED)),
HttpResponseStatus.OK.code(), false);
}
@@ -83,15 +85,63 @@ class SSTableUploadHandlerTest extends
BaseUploadsHandlerTest
void testUploadWithCorrectMd5_expectSuccessfulUpload(VertxTestContext
context) throws IOException
{
UUID uploadId = UUID.randomUUID();
- sendUploadRequestAndVerify(context, uploadId, "ks", "tbl",
"with-correct-md5.db", "jXd/OF09/siBXSD3SWAm3A==",
- Files.size(Paths.get(FILE_TO_BE_UPLOADED)),
HttpResponseStatus.OK.code(), false);
+ sendUploadRequestAndVerify(context, uploadId, "ks", "tbl",
"with-correct-md5.db",
+ new MD5Digest("jXd/OF09/siBXSD3SWAm3A=="),
+ Files.size(Paths.get(FILE_TO_BE_UPLOADED)),
+ HttpResponseStatus.OK.code(),
+ false);
+ }
+
+ @Test
+ void testUploadWithCorrectXXHash_expectSuccessfulUpload(VertxTestContext
context) throws IOException
+ {
+ UUID uploadId = UUID.randomUUID();
+ sendUploadRequestAndVerify(context, uploadId, "ks", "tbl",
"with-correct-xxhash.db",
+ new XXHash32Digest("7a28edc0"),
+ Files.size(Paths.get(FILE_TO_BE_UPLOADED)),
+ HttpResponseStatus.OK.code(),
+ false);
+ }
+
+ @Test
+ void
testUploadWithCorrectXXHashAndCustomSeed_expectSuccessfulUpload(VertxTestContext
context) throws IOException
+ {
+ UUID uploadId = UUID.randomUUID();
+ sendUploadRequestAndVerify(context, uploadId, "ks", "tbl",
"with-correct-xxhash.db",
+ new XXHash32Digest("ffffffffb9510d6b",
"55555555"),
+ Files.size(Paths.get(FILE_TO_BE_UPLOADED)),
+ HttpResponseStatus.OK.code(),
+ false);
}
@Test
void testUploadWithIncorrectMd5_expectErrorCode(VertxTestContext context)
throws IOException
{
UUID uploadId = UUID.randomUUID();
- sendUploadRequestAndVerify(context, uploadId, "ks", "tbl",
"with-incorrect-md5.db", "incorrectMd5",
+ sendUploadRequestAndVerify(context, uploadId, "ks", "tbl",
"with-incorrect-md5.db",
+ new MD5Digest("incorrectMd5"),
+ Files.size(Paths.get(FILE_TO_BE_UPLOADED)),
+
SidecarHttpResponseStatus.CHECKSUM_MISMATCH.code(),
+ false);
+ }
+
+ @Test
+ void testUploadWithIncorrectXXHash_expectErrorCode(VertxTestContext
context) throws IOException
+ {
+ UUID uploadId = UUID.randomUUID();
+ sendUploadRequestAndVerify(context, uploadId, "ks", "tbl",
"with-incorrect-xxhash.db",
+ new XXHash32Digest("incorrectXXHash"),
+ Files.size(Paths.get(FILE_TO_BE_UPLOADED)),
+
SidecarHttpResponseStatus.CHECKSUM_MISMATCH.code(),
+ false);
+ }
+
+ @Test
+ void
testUploadWithIncorrectXXHashAndCustomSeed_expectErrorCode(VertxTestContext
context) throws IOException
+ {
+ UUID uploadId = UUID.randomUUID();
+ sendUploadRequestAndVerify(context, uploadId, "ks", "tbl",
"with-incorrect-xxhash.db",
+ new XXHash32Digest("7a28edc0", "bad"),
Files.size(Paths.get(FILE_TO_BE_UPLOADED)),
SidecarHttpResponseStatus.CHECKSUM_MISMATCH.code(),
false);
@@ -101,26 +151,27 @@ class SSTableUploadHandlerTest extends
BaseUploadsHandlerTest
void testInvalidFileName_expectErrorCode(VertxTestContext context) throws
IOException
{
UUID uploadId = UUID.randomUUID();
- sendUploadRequestAndVerify(context, uploadId, "ks", "tbl",
"ks$tbl-me-4-big-Data.db", "",
+ sendUploadRequestAndVerify(context, uploadId, "ks", "tbl",
"ks$tbl-me-4-big-Data.db", null,
Files.size(Paths.get(FILE_TO_BE_UPLOADED)),
HttpResponseStatus.BAD_REQUEST.code(),
false);
}
@Test
- void
testUploadWithoutContentLength_expectSuccessfulUpload(VertxTestContext context)
throws IOException
+ void
testUploadWithoutContentLength_expectSuccessfulUpload(VertxTestContext context)
{
UUID uploadId = UUID.randomUUID();
sendUploadRequestAndVerify(context, uploadId, "ks", "tbl",
"without-content-length.db",
- "jXd/OF09/siBXSD3SWAm3A==", 0,
HttpResponseStatus.OK.code(), false);
+ new MD5Digest("jXd/OF09/siBXSD3SWAm3A=="),
0, HttpResponseStatus.OK.code(), false);
}
@Test
- void testUploadTimeout_expectTimeoutError(VertxTestContext context) throws
IOException
+ void testUploadTimeout_expectTimeoutError(VertxTestContext context)
{
// if we send more than actual length, vertx goes hung, probably
looking for more data than exists in the file,
// we should see timeout error in this case
UUID uploadId = UUID.randomUUID();
- sendUploadRequestAndVerify(context, uploadId, "ks", "tbl",
"with-higher-content-length.db", "", 1000, -1, true);
+ sendUploadRequestAndVerify(context, uploadId, "ks", "tbl",
"with-higher-content-length.db", null, 1000, -1,
+ true);
}
@Test
@@ -128,14 +179,14 @@ class SSTableUploadHandlerTest extends
BaseUploadsHandlerTest
{
UUID uploadId = UUID.randomUUID();
sendUploadRequestAndVerify(context, uploadId, "ks", "tbl",
"with-lesser-content-length.db",
- "",
Files.size(Paths.get(FILE_TO_BE_UPLOADED)) - 2, HttpResponseStatus.OK.code(),
+ null,
Files.size(Paths.get(FILE_TO_BE_UPLOADED)) - 2, HttpResponseStatus.OK.code(),
false);
}
@Test
void testInvalidUploadId(VertxTestContext context) throws IOException
{
- sendUploadRequestAndVerify(null, context, "foo", "ks", "tbl",
"with-lesser-content-length.db", "",
+ sendUploadRequestAndVerify(null, context, "foo", "ks", "tbl",
"with-lesser-content-length.db", null,
Files.size(Paths.get(FILE_TO_BE_UPLOADED)),
HttpResponseStatus.BAD_REQUEST.code(),
false, response -> {
JsonObject error = response.bodyAsJsonObject();
@@ -149,7 +200,7 @@ class SSTableUploadHandlerTest extends
BaseUploadsHandlerTest
void testInvalidKeyspace(VertxTestContext context) throws IOException
{
UUID uploadId = UUID.randomUUID();
- sendUploadRequestAndVerify(context, uploadId, "invalidKeyspace",
"tbl", "with-lesser-content-length.db", "",
+ sendUploadRequestAndVerify(context, uploadId, "invalidKeyspace",
"tbl", "with-lesser-content-length.db", null,
Files.size(Paths.get(FILE_TO_BE_UPLOADED)),
HttpResponseStatus.BAD_REQUEST.code(),
false);
}
@@ -158,7 +209,7 @@ class SSTableUploadHandlerTest extends
BaseUploadsHandlerTest
void testInvalidTable(VertxTestContext context) throws IOException
{
UUID uploadId = UUID.randomUUID();
- sendUploadRequestAndVerify(context, uploadId, "ks",
"invalidTableName", "with-lesser-content-length.db", "",
+ sendUploadRequestAndVerify(context, uploadId, "ks",
"invalidTableName", "with-lesser-content-length.db", null,
Files.size(Paths.get(FILE_TO_BE_UPLOADED)),
HttpResponseStatus.BAD_REQUEST.code(),
false);
}
@@ -169,7 +220,7 @@ class SSTableUploadHandlerTest extends
BaseUploadsHandlerTest
when(mockSSTableUploadConfiguration.minimumSpacePercentageRequired()).thenReturn(100F);
UUID uploadId = UUID.randomUUID();
- sendUploadRequestAndVerify(context, uploadId, "ks", "tbl",
"without-md5.db", "",
+ sendUploadRequestAndVerify(context, uploadId, "ks", "tbl",
"without-md5.db", null,
Files.size(Paths.get(FILE_TO_BE_UPLOADED)),
HttpResponseStatus.INSUFFICIENT_STORAGE.code(), false);
}
@@ -180,7 +231,7 @@ class SSTableUploadHandlerTest extends
BaseUploadsHandlerTest
when(mockSSTableUploadConfiguration.concurrentUploadsLimit()).thenReturn(0);
UUID uploadId = UUID.randomUUID();
- sendUploadRequestAndVerify(context, uploadId, "ks", "tbl",
"without-md5.db", "",
+ sendUploadRequestAndVerify(context, uploadId, "ks", "tbl",
"without-md5.db", null,
Files.size(Paths.get(FILE_TO_BE_UPLOADED)),
HttpResponseStatus.TOO_MANY_REQUESTS.code(), false);
}
@@ -193,13 +244,13 @@ class SSTableUploadHandlerTest extends
BaseUploadsHandlerTest
UUID uploadId = UUID.randomUUID();
CountDownLatch latch = new CountDownLatch(1);
sendUploadRequestAndVerify(latch, context, uploadId.toString(),
"invalidKeyspace", "tbl",
- "without-md5.db", "",
Files.size(Paths.get(FILE_TO_BE_UPLOADED)),
+ "without-md5.db", null,
Files.size(Paths.get(FILE_TO_BE_UPLOADED)),
HttpResponseStatus.BAD_REQUEST.code(),
false);
assertThat(latch.await(30, TimeUnit.SECONDS)).isTrue();
// checking if permits were released after bad requests
- sendUploadRequestAndVerify(context, uploadId, "ks", "tbl",
"without-md5.db", "",
+ sendUploadRequestAndVerify(context, uploadId, "ks", "tbl",
"without-md5.db", null,
Files.size(Paths.get(FILE_TO_BE_UPLOADED)),
HttpResponseStatus.OK.code(), false);
}
@@ -209,7 +260,7 @@ class SSTableUploadHandlerTest extends
BaseUploadsHandlerTest
String uploadId = UUID.randomUUID().toString();
when(mockSSTableUploadConfiguration.filePermissions()).thenReturn("rwxr-xr-x");
- sendUploadRequestAndVerify(null, context, uploadId, "ks", "tbl",
"without-md5.db", "",
+ sendUploadRequestAndVerify(null, context, uploadId, "ks", "tbl",
"without-md5.db", null,
Files.size(Paths.get(FILE_TO_BE_UPLOADED)),
HttpResponseStatus.OK.code(),
false, response -> {
@@ -247,7 +298,7 @@ class SSTableUploadHandlerTest extends
BaseUploadsHandlerTest
long startTime = System.nanoTime();
String uploadId = UUID.randomUUID().toString();
- sendUploadRequestAndVerify(null, context, uploadId, "ks", "tbl",
"1MB-File-Data.db", "",
+ sendUploadRequestAndVerify(null, context, uploadId, "ks", "tbl",
"1MB-File-Data.db", null,
Files.size(largeFilePath),
HttpResponseStatus.OK.code(),
false, response -> {
@@ -269,7 +320,7 @@ class SSTableUploadHandlerTest extends
BaseUploadsHandlerTest
long startTime = System.nanoTime();
String uploadId = UUID.randomUUID().toString();
- sendUploadRequestAndVerify(null, context, uploadId, "ks", "tbl",
"1MB-File-Data.db", "",
+ sendUploadRequestAndVerify(null, context, uploadId, "ks", "tbl",
"1MB-File-Data.db", null,
Files.size(largeFilePath),
HttpResponseStatus.OK.code(),
false, response -> {
@@ -286,13 +337,13 @@ class SSTableUploadHandlerTest extends
BaseUploadsHandlerTest
String keyspace,
String tableName,
String targetFileName,
- String expectedMd5,
+ Digest expectedDigest,
long fileLength,
int expectedRetCode,
boolean expectTimeout)
{
sendUploadRequestAndVerify(null, context, uploadId.toString(),
keyspace, tableName, targetFileName,
- expectedMd5, fileLength, expectedRetCode,
expectTimeout);
+ expectedDigest, fileLength,
expectedRetCode, expectTimeout);
}
private void sendUploadRequestAndVerify(CountDownLatch latch,
@@ -301,7 +352,7 @@ class SSTableUploadHandlerTest extends
BaseUploadsHandlerTest
String keyspace,
String tableName,
String targetFileName,
- String expectedMd5,
+ Digest expectedDigest,
long fileLength,
int expectedRetCode,
boolean expectTimeout)
@@ -312,7 +363,7 @@ class SSTableUploadHandlerTest extends
BaseUploadsHandlerTest
keyspace,
tableName,
targetFileName,
- expectedMd5,
+ expectedDigest,
fileLength,
expectedRetCode,
expectTimeout,
@@ -326,7 +377,7 @@ class SSTableUploadHandlerTest extends
BaseUploadsHandlerTest
String keyspace,
String tableName,
String targetFileName,
- String expectedMd5,
+ Digest expectedDigest,
long fileLength,
int expectedRetCode,
boolean expectTimeout,
@@ -337,9 +388,9 @@ class SSTableUploadHandlerTest extends
BaseUploadsHandlerTest
String testRoute = "/api/v1/uploads/" + uploadId + "/keyspaces/" +
keyspace
+ "/tables/" + tableName + "/components/" +
targetFileName;
HttpRequest<Buffer> req = client.put(server.actualPort(), "localhost",
testRoute);
- if (!expectedMd5.isEmpty())
+ if (expectedDigest != null)
{
- req.putHeader(HttpHeaderNames.CONTENT_MD5.toString(), expectedMd5);
+ req.headers().addAll(expectedDigest.headers());
}
if (fileLength != 0)
{
@@ -383,24 +434,4 @@ class SSTableUploadHandlerTest extends
BaseUploadsHandlerTest
client.close();
});
}
-
- static Path prepareTestFile(Path directory, String fileName, long
sizeInBytes) throws IOException
- {
- Path filePath = directory.resolve(fileName);
- Files.deleteIfExists(filePath);
-
- byte[] buffer = new byte[1024];
- try (OutputStream outputStream = Files.newOutputStream(filePath))
- {
- int written = 0;
- while (written < sizeInBytes)
- {
- ThreadLocalRandom.current().nextBytes(buffer);
- int toWrite = (int) Math.min(buffer.length, sizeInBytes -
written);
- outputStream.write(buffer, 0, toWrite);
- written += toWrite;
- }
- }
- return filePath;
- }
}
diff --git
a/src/test/java/org/apache/cassandra/sidecar/utils/DigestVerifierFactoryTest.java
b/src/test/java/org/apache/cassandra/sidecar/utils/DigestVerifierFactoryTest.java
new file mode 100644
index 0000000..810e015
--- /dev/null
+++
b/src/test/java/org/apache/cassandra/sidecar/utils/DigestVerifierFactoryTest.java
@@ -0,0 +1,89 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.cassandra.sidecar.utils;
+
+import org.junit.jupiter.api.BeforeEach;
+import org.junit.jupiter.api.Test;
+
+import io.vertx.core.MultiMap;
+import io.vertx.core.Vertx;
+import io.vertx.core.http.impl.headers.HeadersMultiMap;
+
+import static
org.apache.cassandra.sidecar.common.http.SidecarHttpHeaderNames.CONTENT_XXHASH32;
+import static
org.apache.cassandra.sidecar.utils.DigestVerifierFactory.FALLBACK_VERIFIER;
+import static org.assertj.core.api.Assertions.assertThat;
+
+/**
+ * Unit tests for {@link DigestVerifierFactory}
+ */
+class DigestVerifierFactoryTest
+{
+ MultiMap options;
+ Vertx vertx = Vertx.vertx();
+
+ @BeforeEach
+ void setup()
+ {
+ options = new HeadersMultiMap();
+ }
+
+
+ @Test
+ void testEmptyFactoryReturnsFallbackVerifier()
+ {
+ DigestVerifier verifier = new
DigestVerifierFactory(vertx).verifier(options);
+ assertThat(verifier).as("should fallback to the fallback verifier when
no verifiers are configured")
+ .isNotNull()
+ .isSameAs(FALLBACK_VERIFIER);
+ }
+
+ @Test
+ void testMd5Verifier()
+ {
+ options.set("content-md5", "md5-header");
+ DigestVerifier verifier = new
DigestVerifierFactory(vertx).verifier(options);
+
+ assertThat(verifier).as("MD5DigestVerifier can verify MD5 content
headers")
+ .isNotNull()
+ .isInstanceOf(MD5DigestVerifier.class);
+ }
+
+ @Test
+ void testXXHashVerifier()
+ {
+ options.set(CONTENT_XXHASH32, "xxhash-header");
+ DigestVerifier verifier = new
DigestVerifierFactory(vertx).verifier(options);
+
+ assertThat(verifier).as("XXHashDigestVerifier can verify XXHash
content headers")
+ .isNotNull()
+ .isInstanceOf(XXHash32DigestVerifier.class);
+ }
+
+ @Test
+ void testFirstVerifierTakesPrecedence()
+ {
+ options.set("content-md5", "md5-header")
+ .set(CONTENT_XXHASH32, "xxhash-header");
+ DigestVerifier verifier = new
DigestVerifierFactory(vertx).verifier(options);
+
+ assertThat(verifier).as("XXHashDigestVerifier is selected when both
headers are present")
+ .isNotNull()
+ .isInstanceOf(XXHash32DigestVerifier.class);
+ }
+}
diff --git
a/src/test/java/org/apache/cassandra/sidecar/utils/MD5ChecksumVerifierTest.java
b/src/test/java/org/apache/cassandra/sidecar/utils/MD5DigestVerifierTest.java
similarity index 55%
rename from
src/test/java/org/apache/cassandra/sidecar/utils/MD5ChecksumVerifierTest.java
rename to
src/test/java/org/apache/cassandra/sidecar/utils/MD5DigestVerifierTest.java
index 8eb5680..550ef8e 100644
---
a/src/test/java/org/apache/cassandra/sidecar/utils/MD5ChecksumVerifierTest.java
+++
b/src/test/java/org/apache/cassandra/sidecar/utils/MD5DigestVerifierTest.java
@@ -19,13 +19,12 @@
package org.apache.cassandra.sidecar.utils;
import java.io.IOException;
-import java.io.RandomAccessFile;
+import java.nio.file.Files;
import java.nio.file.Path;
import java.security.MessageDigest;
import java.security.NoSuchAlgorithmException;
import java.util.Base64;
import java.util.concurrent.CountDownLatch;
-import java.util.concurrent.ThreadLocalRandom;
import java.util.concurrent.TimeUnit;
import org.junit.jupiter.api.BeforeAll;
@@ -36,17 +35,17 @@ import io.vertx.core.Future;
import io.vertx.core.Vertx;
import io.vertx.core.file.AsyncFile;
import io.vertx.core.file.FileSystem;
+import org.apache.cassandra.sidecar.common.data.MD5Digest;
import static org.assertj.core.api.Assertions.assertThat;
import static org.assertj.core.api.Assertions.assertThatThrownBy;
/**
- * Unit tests for {@link MD5ChecksumVerifier}
+ * Unit tests for {@link MD5DigestVerifier}
*/
-class MD5ChecksumVerifierTest
+class MD5DigestVerifierTest
{
static Vertx vertx;
- static ExposeAsyncFileMD5ChecksumVerifier verifier;
@TempDir
Path tempDir;
@@ -55,33 +54,33 @@ class MD5ChecksumVerifierTest
static void setup()
{
vertx = Vertx.vertx();
- verifier = new ExposeAsyncFileMD5ChecksumVerifier(vertx.fileSystem());
}
@Test
- void testFileDescriptorsClosedWithValidChecksum() throws IOException,
NoSuchAlgorithmException,
-
InterruptedException
+ void testFileDescriptorsClosedWithValidDigest() throws IOException,
NoSuchAlgorithmException,
+ InterruptedException
{
- byte[] randomBytes = generateRandomBytes();
- Path randomFilePath = writeBytesToRandomFile(randomBytes);
- String expectedChecksum = Base64.getEncoder()
-
.encodeToString(MessageDigest.getInstance("MD5")
-
.digest(randomBytes));
+ Path randomFilePath = TestFileUtils.prepareTestFile(tempDir,
"random-file.txt", 1024);
+ byte[] randomBytes = Files.readAllBytes(randomFilePath);
+ String expectedDigest = Base64.getEncoder()
+
.encodeToString(MessageDigest.getInstance("MD5")
+
.digest(randomBytes));
- runTestScenario(randomFilePath, expectedChecksum);
+ runTestScenario(randomFilePath, expectedDigest);
}
@Test
- void testFileDescriptorsClosedWithInvalidChecksum() throws IOException,
InterruptedException
+ void testFileDescriptorsClosedWithInvalidDigest() throws IOException,
InterruptedException
{
- Path randomFilePath = writeBytesToRandomFile(generateRandomBytes());
+ Path randomFilePath = TestFileUtils.prepareTestFile(tempDir,
"random-file.txt", 1024);
runTestScenario(randomFilePath, "invalid");
}
- private void runTestScenario(Path filePath, String checksum) throws
InterruptedException
+ private void runTestScenario(Path filePath, String digest) throws
InterruptedException
{
CountDownLatch latch = new CountDownLatch(1);
- verifier.verify(checksum, filePath.toAbsolutePath().toString())
+ ExposeAsyncFileMD5DigestVerifier verifier = newVerifier(new
MD5Digest(digest));
+ verifier.verify(filePath.toAbsolutePath().toString())
.onComplete(complete -> latch.countDown());
assertThat(latch.await(2, TimeUnit.SECONDS)).isTrue();
@@ -93,41 +92,29 @@ class MD5ChecksumVerifierTest
.hasMessageContaining("File handle is closed");
}
- private byte[] generateRandomBytes()
+ static ExposeAsyncFileMD5DigestVerifier newVerifier(MD5Digest digest)
{
- byte[] bytes = new byte[1024];
- ThreadLocalRandom.current().nextBytes(bytes);
- return bytes;
- }
-
- private Path writeBytesToRandomFile(byte[] bytes) throws IOException
- {
- Path tempPath = tempDir.resolve("random-file.txt");
- try (RandomAccessFile writer = new RandomAccessFile(tempPath.toFile(),
"rw"))
- {
- writer.write(bytes);
- }
- return tempPath;
+ return new ExposeAsyncFileMD5DigestVerifier(vertx.fileSystem(),
digest);
}
/**
- * Class that extends from {@link MD5ChecksumVerifier} for testing
purposes and holds a reference to the
+ * Class that extends from {@link MD5DigestVerifier} for testing purposes
and holds a reference to the
* {@link AsyncFile} to ensure that the file has been closed.
*/
- static class ExposeAsyncFileMD5ChecksumVerifier extends MD5ChecksumVerifier
+ static class ExposeAsyncFileMD5DigestVerifier extends MD5DigestVerifier
{
AsyncFile file;
- public ExposeAsyncFileMD5ChecksumVerifier(FileSystem fs)
+ public ExposeAsyncFileMD5DigestVerifier(FileSystem fs, MD5Digest
md5Digest)
{
- super(fs);
+ super(fs, md5Digest);
}
@Override
- Future<String> calculateMD5(AsyncFile file)
+ protected Future<String> calculateDigest(AsyncFile file)
{
this.file = file;
- return super.calculateMD5(file);
+ return super.calculateDigest(file);
}
}
}
diff --git
a/src/test/java/org/apache/cassandra/sidecar/utils/TestFileUtils.java
b/src/test/java/org/apache/cassandra/sidecar/utils/TestFileUtils.java
new file mode 100644
index 0000000..195ef34
--- /dev/null
+++ b/src/test/java/org/apache/cassandra/sidecar/utils/TestFileUtils.java
@@ -0,0 +1,61 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.cassandra.sidecar.utils;
+
+import java.io.IOException;
+import java.io.OutputStream;
+import java.nio.file.Files;
+import java.nio.file.Path;
+import java.util.concurrent.ThreadLocalRandom;
+
+/**
+ * File utilities for tests
+ */
+public class TestFileUtils
+{
+ /**
+ * Writes random data to a file with name {@code filename} under the
specified {@code directory} with
+ * the specified size in bytes.
+ *
+ * @param directory the directory in which to create the file
+ * @param fileName the name of the desired file to create
+ * @param sizeInBytes the size of the file in bytes
+ * @return the path of the file that was recently created
+ * @throws IOException when file creation or writing to the file fails
+ */
+ public static Path prepareTestFile(Path directory, String fileName, long
sizeInBytes) throws IOException
+ {
+ Path filePath = directory.resolve(fileName);
+ Files.deleteIfExists(filePath);
+
+ byte[] buffer = new byte[1024];
+ try (OutputStream outputStream = Files.newOutputStream(filePath))
+ {
+ int written = 0;
+ while (written < sizeInBytes)
+ {
+ ThreadLocalRandom.current().nextBytes(buffer);
+ int toWrite = (int) Math.min(buffer.length, sizeInBytes -
written);
+ outputStream.write(buffer, 0, toWrite);
+ written += toWrite;
+ }
+ }
+ return filePath;
+ }
+}
diff --git
a/src/test/java/org/apache/cassandra/sidecar/utils/XXHash32DigestVerifierTest.java
b/src/test/java/org/apache/cassandra/sidecar/utils/XXHash32DigestVerifierTest.java
new file mode 100644
index 0000000..ff7566e
--- /dev/null
+++
b/src/test/java/org/apache/cassandra/sidecar/utils/XXHash32DigestVerifierTest.java
@@ -0,0 +1,155 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.cassandra.sidecar.utils;
+
+import java.io.IOException;
+import java.nio.file.Path;
+import java.util.concurrent.CountDownLatch;
+import java.util.concurrent.TimeUnit;
+
+import org.junit.jupiter.api.BeforeAll;
+import org.junit.jupiter.api.Test;
+import org.junit.jupiter.api.io.TempDir;
+
+import io.vertx.core.Future;
+import io.vertx.core.Vertx;
+import io.vertx.core.file.AsyncFile;
+import io.vertx.core.file.FileSystem;
+import io.vertx.ext.web.handler.HttpException;
+import org.apache.cassandra.sidecar.common.data.XXHash32Digest;
+import org.assertj.core.api.InstanceOfAssertFactories;
+
+import static org.apache.cassandra.sidecar.restore.RestoreJobUtil.checksum;
+import static org.assertj.core.api.Assertions.as;
+import static org.assertj.core.api.Assertions.assertThat;
+import static org.assertj.core.api.Assertions.assertThatNullPointerException;
+import static org.assertj.core.api.Assertions.assertThatThrownBy;
+import static org.assertj.core.api.Assertions.from;
+
+/**
+ * Unit tests for {@link XXHash32DigestVerifier}
+ */
+class XXHash32DigestVerifierTest
+{
+ static Vertx vertx;
+
+ @TempDir
+ static Path tempDir;
+ static Path randomFilePath;
+
+ @BeforeAll
+ static void setup() throws IOException
+ {
+ vertx = Vertx.vertx();
+
+ randomFilePath = TestFileUtils.prepareTestFile(tempDir,
"random-file.txt", 1024);
+ }
+
+ @Test
+ void failsWhenDigestIsNull()
+ {
+ assertThatNullPointerException().isThrownBy(() ->
newVerifier(null)).withMessage("digest is required");
+ }
+
+ @Test
+ void testFileDescriptorsClosedWithValidDigest() throws IOException,
InterruptedException
+ {
+ XXHash32Digest digest = new
XXHash32Digest(checksum(randomFilePath.toFile()));
+ runTestScenario(randomFilePath, digest, false);
+ }
+
+ @Test
+ void failsWithNonDefaultSeedAndSeedIsNotPassedAsAnOption() throws
IOException, InterruptedException
+ {
+ XXHash32Digest digest = new
XXHash32Digest(checksum(randomFilePath.toFile(), 0x55555555));
+ runTestScenario(randomFilePath, digest, true);
+ }
+
+ @Test
+ void testWithCustomSeed() throws IOException, InterruptedException
+ {
+ int seed = 0x55555555;
+ XXHash32Digest digest = new
XXHash32Digest(checksum(randomFilePath.toFile(), seed), seed);
+ runTestScenario(randomFilePath, digest, false);
+ }
+
+ @Test
+ void testFileDescriptorsClosedWithInvalidDigest() throws
InterruptedException
+ {
+ runTestScenario(randomFilePath, new XXHash32Digest("invalid"), true);
+ }
+
+ private void runTestScenario(Path filePath, XXHash32Digest digest,
+ boolean errorExpectedDuringValidation) throws
InterruptedException
+ {
+ CountDownLatch latch = new CountDownLatch(1);
+ ExposeAsyncFileXXHash32DigestVerifier verifier = newVerifier(digest);
+ verifier.verify(filePath.toAbsolutePath().toString())
+ .onComplete(complete -> {
+ if (errorExpectedDuringValidation)
+ {
+ assertThat(complete.failed()).isTrue();
+ assertThat(complete.cause())
+ .isInstanceOf(HttpException.class)
+ .extracting(from(t -> ((HttpException)
t).getPayload()), as(InstanceOfAssertFactories.STRING))
+ .contains("Digest mismatch. expected_digest=" +
digest.value());
+ }
+ else
+ {
+ assertThat(complete.failed()).isFalse();
+
assertThat(complete.result()).endsWith("random-file.txt");
+ }
+ latch.countDown();
+ });
+
+ assertThat(latch.await(2, TimeUnit.SECONDS)).isTrue();
+
+ assertThat(verifier.file).isNotNull();
+ // we can't close the file if it's already closed, so we expect the
exception here
+ assertThatThrownBy(() -> verifier.file.end())
+ .isInstanceOf(IllegalStateException.class)
+ .hasMessageContaining("File handle is closed");
+ }
+
+ static ExposeAsyncFileXXHash32DigestVerifier newVerifier(XXHash32Digest
digest)
+ {
+ return new ExposeAsyncFileXXHash32DigestVerifier(vertx.fileSystem(),
digest);
+ }
+
+ /**
+ * Class that extends from {@link XXHash32DigestVerifier} for testing
purposes and holds a reference to the
+ * {@link AsyncFile} to ensure that the file has been closed.
+ */
+ static class ExposeAsyncFileXXHash32DigestVerifier extends
XXHash32DigestVerifier
+ {
+ AsyncFile file;
+
+ public ExposeAsyncFileXXHash32DigestVerifier(FileSystem fs,
XXHash32Digest digest)
+ {
+ super(fs, digest);
+ }
+
+ @Override
+ protected Future<String> calculateDigest(AsyncFile file)
+ {
+ this.file = file;
+ return super.calculateDigest(file);
+ }
+ }
+}
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]