This is an automated email from the ASF dual-hosted git repository.

crepererum pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/arrow-rs-object-store.git


The following commit(s) were added to refs/heads/main by this push:
     new 6f1c9ac  Add CRC64NVME checksum support (#633)
6f1c9ac is described below

commit 6f1c9acc94636e059a2b3aef5c75e815e065cc89
Author: Koen Denecker <[email protected]>
AuthorDate: Mon May 11 17:56:39 2026 +0200

    Add CRC64NVME checksum support (#633)
    
    * Add CRC64NVME checksum support
    
    * make enum non-exhaustive and more
    
    * fixup test bucket syntax
    
    * simplify with_payload SHA256 logic
    
    * avoid test interference
    
    * do not strip checksum responses
---
 Cargo.toml          |   3 +-
 src/aws/checksum.rs |   5 ++
 src/aws/client.rs   |  72 +++++++++++++++++++--------
 src/aws/mod.rs      | 138 +++++++++++++++++++++++++++++++++++-----------------
 src/client/s3.rs    |   9 ++++
 5 files changed, 162 insertions(+), 65 deletions(-)

diff --git a/Cargo.toml b/Cargo.toml
index b92e4d7..8ada104 100644
--- a/Cargo.toml
+++ b/Cargo.toml
@@ -47,6 +47,7 @@ walkdir = { version = "2", optional = true }
 
 # Cloud storage support
 base64 = { version = "0.22", default-features = false, features = ["std"], optional = true }
+crc-fast = { version = "1.6" , optional = true }
 form_urlencoded = { version = "1.2", optional = true }
 http-body-util = { version = "0.1.2", optional = true }
 httparse = { version = "1.8.0", default-features = false, features = ["std"], optional = true }
@@ -85,7 +86,7 @@ cloud = ["serde", "serde_json", "quick-xml", "hyper", "reqwest", "reqwest/stream
 azure = ["cloud", "httparse"]
 fs = ["walkdir", "tokio", "nix", "windows-sys"]
 gcp = ["cloud", "rustls-pki-types"]
-aws = ["cloud", "md-5"]
+aws = ["cloud", "crc-fast", "md-5"]
 http = ["cloud"]
 tls-webpki-roots = ["reqwest?/rustls-tls-webpki-roots"]
 integration = ["rand", "tokio"]
diff --git a/src/aws/checksum.rs b/src/aws/checksum.rs
index d15bbf0..2c22081 100644
--- a/src/aws/checksum.rs
+++ b/src/aws/checksum.rs
@@ -20,16 +20,20 @@ use std::str::FromStr;
 
 #[allow(non_camel_case_types)]
 #[derive(Debug, Clone, Copy, PartialEq, Eq)]
+#[non_exhaustive]
 /// Enum representing checksum algorithm supported by S3.
 pub enum Checksum {
     /// SHA-256 algorithm.
     SHA256,
+    /// CRC64-NVME algorithm.
+    CRC64NVME,
 }
 
 impl std::fmt::Display for Checksum {
     fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
         match &self {
             Self::SHA256 => write!(f, "sha256"),
+            Self::CRC64NVME => write!(f, "crc64nvme"),
         }
     }
 }
@@ -40,6 +44,7 @@ impl FromStr for Checksum {
     fn from_str(s: &str) -> Result<Self, Self::Err> {
         match s.to_lowercase().as_str() {
             "sha256" => Ok(Self::SHA256),
+            "crc64nvme" => Ok(Self::CRC64NVME),
             _ => Err(()),
         }
     }
diff --git a/src/aws/client.rs b/src/aws/client.rs
index cd2f7bb..6716bb0 100644
--- a/src/aws/client.rs
+++ b/src/aws/client.rs
@@ -59,6 +59,7 @@ use std::sync::Arc;
 
 const VERSION_HEADER: &str = "x-amz-version-id";
 const SHA256_CHECKSUM: &str = "x-amz-checksum-sha256";
+const CRC64NVME_CHECKSUM: &str = "x-amz-checksum-crc64nvme";
 const USER_DEFINED_METADATA_HEADER_PREFIX: &str = "x-amz-meta-";
 const ALGORITHM: &str = "x-amz-checksum-algorithm";
 const STORAGE_CLASS: &str = "x-amz-storage-class";
@@ -398,19 +399,38 @@ impl Request<'_> {
     }
 
     pub(crate) fn with_payload(mut self, payload: PutPayload) -> Self {
-        if (!self.config.skip_signature && self.config.sign_payload)
-            || self.config.checksum.is_some()
-        {
+        use std::cell::LazyCell;
+
+        let sha256_digest = LazyCell::new(|| {
             let mut sha256 = Context::new(&digest::SHA256);
-            payload.iter().for_each(|x| sha256.update(x));
-            let payload_sha256 = sha256.finish();
+            for part in &payload {
+                sha256.update(part);
+            }
+            sha256.finish()
+        });
 
-            if let Some(Checksum::SHA256) = self.config.checksum {
+        if !self.config.skip_signature && self.config.sign_payload {
+            self.payload_sha256 = Some(*sha256_digest);
+        }
+
+        match self.config.checksum {
+            Some(Checksum::SHA256) => {
                 self.builder = self
                     .builder
-                    .header(SHA256_CHECKSUM, BASE64_STANDARD.encode(payload_sha256));
+                    .header(SHA256_CHECKSUM, BASE64_STANDARD.encode(*sha256_digest));
+            }
+            Some(Checksum::CRC64NVME) => {
+                let crc_algo = crc_fast::CrcAlgorithm::Crc64Nvme;
+                let mut digest = crc_fast::Digest::new(crc_algo);
+                payload.iter().for_each(|x| digest.update(x));
+                let checksum = digest.finalize();
+
+                self.builder = self.builder.header(
+                    CRC64NVME_CHECKSUM,
+                    BASE64_STANDARD.encode(checksum.to_be_bytes()),
+                )
             }
-            self.payload_sha256 = Some(payload_sha256);
+            None => {}
         }
 
         let content_length = payload.content_length();
@@ -658,6 +678,9 @@ impl S3Client {
                 Checksum::SHA256 => {
                     request = request.header(ALGORITHM, "SHA256");
                 }
+                Checksum::CRC64NVME => {
+                    request = request.header(ALGORITHM, "CRC64NVME");
+                }
             }
         }
         let response = request
@@ -715,14 +738,18 @@ impl S3Client {
         }
 
         let (parts, body) = request.send().await?.into_parts();
-        let (e_tag, checksum_sha256) = if is_copy {
+        let (e_tag, checksum_sha256, checksum_crc64nvme) = if is_copy {
             let response = body
                 .bytes()
                 .await
                 .map_err(|source| Error::CreateMultipartResponseBody { source })?;
             let response: CopyPartResult = quick_xml::de::from_reader(response.reader())
                 .map_err(|source| Error::InvalidMultipartResponse { source })?;
-            (response.e_tag, response.checksum_sha256)
+            (
+                response.e_tag,
+                response.checksum_sha256,
+                response.checksum_crc64nvme,
+            )
         } else {
             let e_tag = get_etag(&parts.headers).map_err(|source| Error::Metadata { source })?;
             let checksum_sha256 = parts
@@ -730,17 +757,24 @@ impl S3Client {
                 .get(SHA256_CHECKSUM)
                 .and_then(|v| v.to_str().ok())
                 .map(|v| v.to_string());
-            (e_tag, checksum_sha256)
+            let checksum_crc64nvme = parts
+                .headers
+                .get(CRC64NVME_CHECKSUM)
+                .and_then(|v| v.to_str().ok())
+                .map(|v| v.to_string());
+            (e_tag, checksum_sha256, checksum_crc64nvme)
         };
 
-        let content_id = if self.config.checksum == Some(Checksum::SHA256) {
-            let meta = PartMetadata {
-                e_tag,
-                checksum_sha256,
-            };
-            quick_xml::se::to_string(&meta).unwrap()
-        } else {
-            e_tag
+        let content_id = match self.config.checksum {
+            Some(_) => {
+                let meta = PartMetadata {
+                    e_tag,
+                    checksum_sha256,
+                    checksum_crc64nvme,
+                };
+                quick_xml::se::to_string(&meta).unwrap()
+            }
+            None => e_tag,
         };
 
         Ok(PartId { content_id })
diff --git a/src/aws/mod.rs b/src/aws/mod.rs
index b3804e2..935c653 100644
--- a/src/aws/mod.rs
+++ b/src/aws/mod.rs
@@ -531,30 +531,32 @@ mod tests {
         maybe_skip_integration!();
 
         let bucket = "test-bucket-for-checksum";
-        let store = AmazonS3Builder::from_env()
-            .with_bucket_name(bucket)
-            .with_checksum_algorithm(Checksum::SHA256)
-            .build()
-            .unwrap();
+        for checksum in [Checksum::SHA256, Checksum::CRC64NVME] {
+            let store = AmazonS3Builder::from_env()
+                .with_bucket_name(bucket)
+                .with_checksum_algorithm(checksum)
+                .build()
+                .unwrap();
 
-        let str = "test.bin";
-        let path = Path::parse(str).unwrap();
-        let opts = PutMultipartOptions::default();
-        let mut upload = store.put_multipart_opts(&path, opts).await.unwrap();
+            let str = "test.bin";
+            let path = Path::parse(str).unwrap();
+            let opts = PutMultipartOptions::default();
+            let mut upload = store.put_multipart_opts(&path, opts).await.unwrap();
 
-        upload
-            .put_part(PutPayload::from(vec![0u8; 10_000_000]))
-            .await
-            .unwrap();
-        upload
-            .put_part(PutPayload::from(vec![0u8; 5_000_000]))
-            .await
-            .unwrap();
+            upload
+                .put_part(PutPayload::from(vec![0u8; 10_000_000]))
+                .await
+                .unwrap();
+            upload
+                .put_part(PutPayload::from(vec![0u8; 5_000_000]))
+                .await
+                .unwrap();
 
-        let res = upload.complete().await.unwrap();
-        assert!(res.e_tag.is_some(), "Should have valid etag");
+            let res = upload.complete().await.unwrap();
+            assert!(res.e_tag.is_some(), "Should have valid etag");
 
-        store.delete(&path).await.unwrap();
+            store.delete(&path).await.unwrap();
+        }
     }
 
     #[tokio::test]
@@ -562,15 +564,46 @@ mod tests {
         maybe_skip_integration!();
 
         let bucket = "test-bucket-for-copy-if-not-exists";
+        for checksum in [Checksum::SHA256, Checksum::CRC64NVME] {
+            let store = AmazonS3Builder::from_env()
+                .with_bucket_name(bucket)
+                .with_checksum_algorithm(checksum)
+                .with_copy_if_not_exists(S3CopyIfNotExists::Multipart)
+                .build()
+                .unwrap();
+
+            let src = Path::parse("src.bin").unwrap();
+            let dst = Path::parse("dst.bin").unwrap();
+            store
+                .put(&src, PutPayload::from(vec![0u8; 100_000]))
+                .await
+                .unwrap();
+            if store.head(&dst).await.is_ok() {
+                store.delete(&dst).await.unwrap();
+            }
+            store.copy_if_not_exists(&src, &dst).await.unwrap();
+            store.delete(&src).await.unwrap();
+            store.delete(&dst).await.unwrap();
+        }
+    }
+
+    #[tokio::test]
+    async fn copy_multipart_file_with_signature_change_checksum() {
+        maybe_skip_integration!();
+
+        let bucket = "test-bucket-for-copy-if-not-exists";
+        let checksum_src = Checksum::SHA256;
+        let checksum_dst = Checksum::CRC64NVME;
+
+        let src = Path::parse("change_checksum_src.bin").unwrap();
+        let dst = Path::parse("change_checksum_dst.bin").unwrap();
+
         let store = AmazonS3Builder::from_env()
             .with_bucket_name(bucket)
-            .with_checksum_algorithm(Checksum::SHA256)
-            .with_copy_if_not_exists(S3CopyIfNotExists::Multipart)
+            .with_checksum_algorithm(checksum_src)
             .build()
             .unwrap();
 
-        let src = Path::parse("src.bin").unwrap();
-        let dst = Path::parse("dst.bin").unwrap();
         store
             .put(&src, PutPayload::from(vec![0u8; 100_000]))
             .await
@@ -578,6 +611,14 @@ mod tests {
         if store.head(&dst).await.is_ok() {
             store.delete(&dst).await.unwrap();
         }
+
+        let store = AmazonS3Builder::from_env()
+            .with_bucket_name(bucket)
+            .with_checksum_algorithm(checksum_dst)
+            .with_copy_if_not_exists(S3CopyIfNotExists::Multipart)
+            .build()
+            .unwrap();
+
         store.copy_if_not_exists(&src, &dst).await.unwrap();
         store.delete(&src).await.unwrap();
         store.delete(&dst).await.unwrap();
@@ -587,31 +628,33 @@ mod tests {
     async fn write_multipart_file_with_signature_object_lock() {
         maybe_skip_integration!();
 
-        let bucket = "test-object-lock";
-        let store = AmazonS3Builder::from_env()
-            .with_bucket_name(bucket)
-            .with_checksum_algorithm(Checksum::SHA256)
-            .build()
-            .unwrap();
+        for checksum in [Checksum::SHA256, Checksum::CRC64NVME] {
+            let bucket = "test-object-lock";
+            let store = AmazonS3Builder::from_env()
+                .with_bucket_name(bucket)
+                .with_checksum_algorithm(checksum)
+                .build()
+                .unwrap();
 
-        let str = "test.bin";
-        let path = Path::parse(str).unwrap();
-        let opts = PutMultipartOptions::default();
-        let mut upload = store.put_multipart_opts(&path, opts).await.unwrap();
+            let str = "test.bin";
+            let path = Path::parse(str).unwrap();
+            let opts = PutMultipartOptions::default();
+            let mut upload = store.put_multipart_opts(&path, opts).await.unwrap();
 
-        upload
-            .put_part(PutPayload::from(vec![0u8; 10_000_000]))
-            .await
-            .unwrap();
-        upload
-            .put_part(PutPayload::from(vec![0u8; 5_000_000]))
-            .await
-            .unwrap();
+            upload
+                .put_part(PutPayload::from(vec![0u8; 10_000_000]))
+                .await
+                .unwrap();
+            upload
+                .put_part(PutPayload::from(vec![0u8; 5_000_000]))
+                .await
+                .unwrap();
 
-        let res = upload.complete().await.unwrap();
-        assert!(res.e_tag.is_some(), "Should have valid etag");
+            let res = upload.complete().await.unwrap();
+            assert!(res.e_tag.is_some(), "Should have valid etag");
 
-        store.delete(&path).await.unwrap();
+            store.delete(&path).await.unwrap();
+        }
     }
 
     #[tokio::test]
@@ -671,6 +714,11 @@ mod tests {
         let builder = AmazonS3Builder::from_env().with_checksum_algorithm(Checksum::SHA256);
         let integration = builder.build().unwrap();
         put_get_delete_list(&integration).await;
+
+        // run integration test with checksum set to crc64nvme
+        let builder = AmazonS3Builder::from_env().with_checksum_algorithm(Checksum::CRC64NVME);
+        let integration = builder.build().unwrap();
+        put_get_delete_list(&integration).await;
     }
 
     #[tokio::test]
diff --git a/src/client/s3.rs b/src/client/s3.rs
index a1b113e..2beec61 100644
--- a/src/client/s3.rs
+++ b/src/client/s3.rs
@@ -100,6 +100,8 @@ pub(crate) struct CopyPartResult {
     pub e_tag: String,
     #[serde(default, rename = "ChecksumSHA256")]
     pub checksum_sha256: Option<String>,
+    #[serde(default, rename = "ChecksumCRC64NVME")]
+    pub checksum_crc64nvme: Option<String>,
 }
 
 #[derive(Debug, Serialize)]
@@ -113,6 +115,8 @@ pub(crate) struct PartMetadata {
     pub e_tag: String,
     #[serde(skip_serializing_if = "Option::is_none")]
     pub checksum_sha256: Option<String>,
+    #[serde(skip_serializing_if = "Option::is_none")]
+    pub checksum_crc64nvme: Option<String>,
 }
 
 impl From<Vec<PartId>> for CompleteMultipartUpload {
@@ -127,12 +131,14 @@ impl From<Vec<PartId>> for CompleteMultipartUpload {
                     Err(_) => PartMetadata {
                         e_tag: part.content_id.clone(),
                         checksum_sha256: None,
+                        checksum_crc64nvme: None,
                     },
                 };
                 MultipartPart {
                     e_tag: md.e_tag,
                     part_number: part_idx + 1,
                     checksum_sha256: md.checksum_sha256,
+                    checksum_crc64nvme: md.checksum_crc64nvme,
                 }
             })
             .collect();
@@ -149,6 +155,9 @@ pub(crate) struct MultipartPart {
     #[serde(rename = "ChecksumSHA256")]
     #[serde(skip_serializing_if = "Option::is_none")]
     pub checksum_sha256: Option<String>,
+    #[serde(rename = "ChecksumCRC64NVME")]
+    #[serde(skip_serializing_if = "Option::is_none")]
+    pub checksum_crc64nvme: Option<String>,
 }
 
 #[derive(Debug, Deserialize)]

Reply via email to