This is an automated email from the ASF dual-hosted git repository.
crepererum pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/arrow-rs-object-store.git
The following commit(s) were added to refs/heads/main by this push:
new 6f1c9ac Add CRC64NVME checksum support (#633)
6f1c9ac is described below
commit 6f1c9acc94636e059a2b3aef5c75e815e065cc89
Author: Koen Denecker <[email protected]>
AuthorDate: Mon May 11 17:56:39 2026 +0200
Add CRC64NVME checksum support (#633)
* Add CRC64NVME checksum support
* make enum non-exhaustive and more
* fixup test bucket syntax
* simplify with_payload SHA256 logic
* avoid test interference
* do not strip checksum responses
---
Cargo.toml | 3 +-
src/aws/checksum.rs | 5 ++
src/aws/client.rs | 72 +++++++++++++++++++--------
src/aws/mod.rs | 138 +++++++++++++++++++++++++++++++++++-----------------
src/client/s3.rs | 9 ++++
5 files changed, 162 insertions(+), 65 deletions(-)
diff --git a/Cargo.toml b/Cargo.toml
index b92e4d7..8ada104 100644
--- a/Cargo.toml
+++ b/Cargo.toml
@@ -47,6 +47,7 @@ walkdir = { version = "2", optional = true }
# Cloud storage support
base64 = { version = "0.22", default-features = false, features = ["std"], optional = true }
+crc-fast = { version = "1.6" , optional = true }
form_urlencoded = { version = "1.2", optional = true }
http-body-util = { version = "0.1.2", optional = true }
httparse = { version = "1.8.0", default-features = false, features = ["std"], optional = true }
@@ -85,7 +86,7 @@ cloud = ["serde", "serde_json", "quick-xml", "hyper", "reqwest", "reqwest/stream
azure = ["cloud", "httparse"]
fs = ["walkdir", "tokio", "nix", "windows-sys"]
gcp = ["cloud", "rustls-pki-types"]
-aws = ["cloud", "md-5"]
+aws = ["cloud", "crc-fast", "md-5"]
http = ["cloud"]
tls-webpki-roots = ["reqwest?/rustls-tls-webpki-roots"]
integration = ["rand", "tokio"]
diff --git a/src/aws/checksum.rs b/src/aws/checksum.rs
index d15bbf0..2c22081 100644
--- a/src/aws/checksum.rs
+++ b/src/aws/checksum.rs
@@ -20,16 +20,20 @@ use std::str::FromStr;
#[allow(non_camel_case_types)]
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
+#[non_exhaustive]
/// Enum representing checksum algorithm supported by S3.
pub enum Checksum {
/// SHA-256 algorithm.
SHA256,
+ /// CRC64-NVME algorithm.
+ CRC64NVME,
}
impl std::fmt::Display for Checksum {
fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
match &self {
Self::SHA256 => write!(f, "sha256"),
+ Self::CRC64NVME => write!(f, "crc64nvme"),
}
}
}
@@ -40,6 +44,7 @@ impl FromStr for Checksum {
fn from_str(s: &str) -> Result<Self, Self::Err> {
match s.to_lowercase().as_str() {
"sha256" => Ok(Self::SHA256),
+ "crc64nvme" => Ok(Self::CRC64NVME),
_ => Err(()),
}
}
diff --git a/src/aws/client.rs b/src/aws/client.rs
index cd2f7bb..6716bb0 100644
--- a/src/aws/client.rs
+++ b/src/aws/client.rs
@@ -59,6 +59,7 @@ use std::sync::Arc;
const VERSION_HEADER: &str = "x-amz-version-id";
const SHA256_CHECKSUM: &str = "x-amz-checksum-sha256";
+const CRC64NVME_CHECKSUM: &str = "x-amz-checksum-crc64nvme";
const USER_DEFINED_METADATA_HEADER_PREFIX: &str = "x-amz-meta-";
const ALGORITHM: &str = "x-amz-checksum-algorithm";
const STORAGE_CLASS: &str = "x-amz-storage-class";
@@ -398,19 +399,38 @@ impl Request<'_> {
}
pub(crate) fn with_payload(mut self, payload: PutPayload) -> Self {
- if (!self.config.skip_signature && self.config.sign_payload)
- || self.config.checksum.is_some()
- {
+ use std::cell::LazyCell;
+
+ let sha256_digest = LazyCell::new(|| {
let mut sha256 = Context::new(&digest::SHA256);
- payload.iter().for_each(|x| sha256.update(x));
- let payload_sha256 = sha256.finish();
+ for part in &payload {
+ sha256.update(part);
+ }
+ sha256.finish()
+ });
- if let Some(Checksum::SHA256) = self.config.checksum {
+ if !self.config.skip_signature && self.config.sign_payload {
+ self.payload_sha256 = Some(*sha256_digest);
+ }
+
+ match self.config.checksum {
+ Some(Checksum::SHA256) => {
self.builder = self
.builder
- .header(SHA256_CHECKSUM, BASE64_STANDARD.encode(payload_sha256));
+ .header(SHA256_CHECKSUM, BASE64_STANDARD.encode(*sha256_digest));
+ }
+ Some(Checksum::CRC64NVME) => {
+ let crc_algo = crc_fast::CrcAlgorithm::Crc64Nvme;
+ let mut digest = crc_fast::Digest::new(crc_algo);
+ payload.iter().for_each(|x| digest.update(x));
+ let checksum = digest.finalize();
+
+ self.builder = self.builder.header(
+ CRC64NVME_CHECKSUM,
+ BASE64_STANDARD.encode(checksum.to_be_bytes()),
+ )
}
- self.payload_sha256 = Some(payload_sha256);
+ None => {}
}
let content_length = payload.content_length();
@@ -658,6 +678,9 @@ impl S3Client {
Checksum::SHA256 => {
request = request.header(ALGORITHM, "SHA256");
}
+ Checksum::CRC64NVME => {
+ request = request.header(ALGORITHM, "CRC64NVME");
+ }
}
}
let response = request
@@ -715,14 +738,18 @@ impl S3Client {
}
let (parts, body) = request.send().await?.into_parts();
- let (e_tag, checksum_sha256) = if is_copy {
+ let (e_tag, checksum_sha256, checksum_crc64nvme) = if is_copy {
let response = body
.bytes()
.await
.map_err(|source| Error::CreateMultipartResponseBody { source })?;
let response: CopyPartResult =
quick_xml::de::from_reader(response.reader())
.map_err(|source| Error::InvalidMultipartResponse { source })?;
- (response.e_tag, response.checksum_sha256)
+ (
+ response.e_tag,
+ response.checksum_sha256,
+ response.checksum_crc64nvme,
+ )
} else {
let e_tag = get_etag(&parts.headers).map_err(|source| Error::Metadata { source })?;
let checksum_sha256 = parts
@@ -730,17 +757,24 @@ impl S3Client {
.get(SHA256_CHECKSUM)
.and_then(|v| v.to_str().ok())
.map(|v| v.to_string());
- (e_tag, checksum_sha256)
+ let checksum_crc64nvme = parts
+ .headers
+ .get(CRC64NVME_CHECKSUM)
+ .and_then(|v| v.to_str().ok())
+ .map(|v| v.to_string());
+ (e_tag, checksum_sha256, checksum_crc64nvme)
};
- let content_id = if self.config.checksum == Some(Checksum::SHA256) {
- let meta = PartMetadata {
- e_tag,
- checksum_sha256,
- };
- quick_xml::se::to_string(&meta).unwrap()
- } else {
- e_tag
+ let content_id = match self.config.checksum {
+ Some(_) => {
+ let meta = PartMetadata {
+ e_tag,
+ checksum_sha256,
+ checksum_crc64nvme,
+ };
+ quick_xml::se::to_string(&meta).unwrap()
+ }
+ None => e_tag,
};
Ok(PartId { content_id })
diff --git a/src/aws/mod.rs b/src/aws/mod.rs
index b3804e2..935c653 100644
--- a/src/aws/mod.rs
+++ b/src/aws/mod.rs
@@ -531,30 +531,32 @@ mod tests {
maybe_skip_integration!();
let bucket = "test-bucket-for-checksum";
- let store = AmazonS3Builder::from_env()
- .with_bucket_name(bucket)
- .with_checksum_algorithm(Checksum::SHA256)
- .build()
- .unwrap();
+ for checksum in [Checksum::SHA256, Checksum::CRC64NVME] {
+ let store = AmazonS3Builder::from_env()
+ .with_bucket_name(bucket)
+ .with_checksum_algorithm(checksum)
+ .build()
+ .unwrap();
- let str = "test.bin";
- let path = Path::parse(str).unwrap();
- let opts = PutMultipartOptions::default();
- let mut upload = store.put_multipart_opts(&path, opts).await.unwrap();
+ let str = "test.bin";
+ let path = Path::parse(str).unwrap();
+ let opts = PutMultipartOptions::default();
+ let mut upload = store.put_multipart_opts(&path, opts).await.unwrap();
- upload
- .put_part(PutPayload::from(vec![0u8; 10_000_000]))
- .await
- .unwrap();
- upload
- .put_part(PutPayload::from(vec![0u8; 5_000_000]))
- .await
- .unwrap();
+ upload
+ .put_part(PutPayload::from(vec![0u8; 10_000_000]))
+ .await
+ .unwrap();
+ upload
+ .put_part(PutPayload::from(vec![0u8; 5_000_000]))
+ .await
+ .unwrap();
- let res = upload.complete().await.unwrap();
- assert!(res.e_tag.is_some(), "Should have valid etag");
+ let res = upload.complete().await.unwrap();
+ assert!(res.e_tag.is_some(), "Should have valid etag");
- store.delete(&path).await.unwrap();
+ store.delete(&path).await.unwrap();
+ }
}
#[tokio::test]
@@ -562,15 +564,46 @@ mod tests {
maybe_skip_integration!();
let bucket = "test-bucket-for-copy-if-not-exists";
+ for checksum in [Checksum::SHA256, Checksum::CRC64NVME] {
+ let store = AmazonS3Builder::from_env()
+ .with_bucket_name(bucket)
+ .with_checksum_algorithm(checksum)
+ .with_copy_if_not_exists(S3CopyIfNotExists::Multipart)
+ .build()
+ .unwrap();
+
+ let src = Path::parse("src.bin").unwrap();
+ let dst = Path::parse("dst.bin").unwrap();
+ store
+ .put(&src, PutPayload::from(vec![0u8; 100_000]))
+ .await
+ .unwrap();
+ if store.head(&dst).await.is_ok() {
+ store.delete(&dst).await.unwrap();
+ }
+ store.copy_if_not_exists(&src, &dst).await.unwrap();
+ store.delete(&src).await.unwrap();
+ store.delete(&dst).await.unwrap();
+ }
+ }
+
+ #[tokio::test]
+ async fn copy_multipart_file_with_signature_change_checksum() {
+ maybe_skip_integration!();
+
+ let bucket = "test-bucket-for-copy-if-not-exists";
+ let checksum_src = Checksum::SHA256;
+ let checksum_dst = Checksum::CRC64NVME;
+
+ let src = Path::parse("change_checksum_src.bin").unwrap();
+ let dst = Path::parse("change_checksum_dst.bin").unwrap();
+
let store = AmazonS3Builder::from_env()
.with_bucket_name(bucket)
- .with_checksum_algorithm(Checksum::SHA256)
- .with_copy_if_not_exists(S3CopyIfNotExists::Multipart)
+ .with_checksum_algorithm(checksum_src)
.build()
.unwrap();
- let src = Path::parse("src.bin").unwrap();
- let dst = Path::parse("dst.bin").unwrap();
store
.put(&src, PutPayload::from(vec![0u8; 100_000]))
.await
@@ -578,6 +611,14 @@ mod tests {
if store.head(&dst).await.is_ok() {
store.delete(&dst).await.unwrap();
}
+
+ let store = AmazonS3Builder::from_env()
+ .with_bucket_name(bucket)
+ .with_checksum_algorithm(checksum_dst)
+ .with_copy_if_not_exists(S3CopyIfNotExists::Multipart)
+ .build()
+ .unwrap();
+
store.copy_if_not_exists(&src, &dst).await.unwrap();
store.delete(&src).await.unwrap();
store.delete(&dst).await.unwrap();
@@ -587,31 +628,33 @@ mod tests {
async fn write_multipart_file_with_signature_object_lock() {
maybe_skip_integration!();
- let bucket = "test-object-lock";
- let store = AmazonS3Builder::from_env()
- .with_bucket_name(bucket)
- .with_checksum_algorithm(Checksum::SHA256)
- .build()
- .unwrap();
+ for checksum in [Checksum::SHA256, Checksum::CRC64NVME] {
+ let bucket = "test-object-lock";
+ let store = AmazonS3Builder::from_env()
+ .with_bucket_name(bucket)
+ .with_checksum_algorithm(checksum)
+ .build()
+ .unwrap();
- let str = "test.bin";
- let path = Path::parse(str).unwrap();
- let opts = PutMultipartOptions::default();
- let mut upload = store.put_multipart_opts(&path, opts).await.unwrap();
+ let str = "test.bin";
+ let path = Path::parse(str).unwrap();
+ let opts = PutMultipartOptions::default();
+ let mut upload = store.put_multipart_opts(&path, opts).await.unwrap();
- upload
- .put_part(PutPayload::from(vec![0u8; 10_000_000]))
- .await
- .unwrap();
- upload
- .put_part(PutPayload::from(vec![0u8; 5_000_000]))
- .await
- .unwrap();
+ upload
+ .put_part(PutPayload::from(vec![0u8; 10_000_000]))
+ .await
+ .unwrap();
+ upload
+ .put_part(PutPayload::from(vec![0u8; 5_000_000]))
+ .await
+ .unwrap();
- let res = upload.complete().await.unwrap();
- assert!(res.e_tag.is_some(), "Should have valid etag");
+ let res = upload.complete().await.unwrap();
+ assert!(res.e_tag.is_some(), "Should have valid etag");
- store.delete(&path).await.unwrap();
+ store.delete(&path).await.unwrap();
+ }
}
#[tokio::test]
@@ -671,6 +714,11 @@ mod tests {
let builder = AmazonS3Builder::from_env().with_checksum_algorithm(Checksum::SHA256);
let integration = builder.build().unwrap();
put_get_delete_list(&integration).await;
+
+ // run integration test with checksum set to crc64nvme
+ let builder = AmazonS3Builder::from_env().with_checksum_algorithm(Checksum::CRC64NVME);
+ let integration = builder.build().unwrap();
+ put_get_delete_list(&integration).await;
}
#[tokio::test]
diff --git a/src/client/s3.rs b/src/client/s3.rs
index a1b113e..2beec61 100644
--- a/src/client/s3.rs
+++ b/src/client/s3.rs
@@ -100,6 +100,8 @@ pub(crate) struct CopyPartResult {
pub e_tag: String,
#[serde(default, rename = "ChecksumSHA256")]
pub checksum_sha256: Option<String>,
+ #[serde(default, rename = "ChecksumCRC64NVME")]
+ pub checksum_crc64nvme: Option<String>,
}
#[derive(Debug, Serialize)]
@@ -113,6 +115,8 @@ pub(crate) struct PartMetadata {
pub e_tag: String,
#[serde(skip_serializing_if = "Option::is_none")]
pub checksum_sha256: Option<String>,
+ #[serde(skip_serializing_if = "Option::is_none")]
+ pub checksum_crc64nvme: Option<String>,
}
impl From<Vec<PartId>> for CompleteMultipartUpload {
@@ -127,12 +131,14 @@ impl From<Vec<PartId>> for CompleteMultipartUpload {
Err(_) => PartMetadata {
e_tag: part.content_id.clone(),
checksum_sha256: None,
+ checksum_crc64nvme: None,
},
};
MultipartPart {
e_tag: md.e_tag,
part_number: part_idx + 1,
checksum_sha256: md.checksum_sha256,
+ checksum_crc64nvme: md.checksum_crc64nvme,
}
})
.collect();
@@ -149,6 +155,9 @@ pub(crate) struct MultipartPart {
#[serde(rename = "ChecksumSHA256")]
#[serde(skip_serializing_if = "Option::is_none")]
pub checksum_sha256: Option<String>,
+ #[serde(rename = "ChecksumCRC64NVME")]
+ #[serde(skip_serializing_if = "Option::is_none")]
+ pub checksum_crc64nvme: Option<String>,
}
#[derive(Debug, Deserialize)]