This is an automated email from the ASF dual-hosted git repository.

parthc pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/datafusion-comet.git


The following commit(s) were added to refs/heads/main by this push:
     new 28bd4bc79 feat: Implement CRC32C algorithm (#3822)
28bd4bc79 is described below

commit 28bd4bc79609be75ef200b470f9f756e3516e11a
Author: Steve Vaughan <[email protected]>
AuthorDate: Mon Mar 30 18:05:04 2026 -0400

    feat: Implement CRC32C algorithm (#3822)
    
    Co-authored-by: Steve Vaughan Jr <[email protected]>
    Co-authored-by: Matt Butrovich <[email protected]>
---
 native/Cargo.lock                                  |  1 +
 native/shuffle/Cargo.toml                          |  1 +
 native/shuffle/src/lib.rs                          |  1 +
 native/shuffle/src/spark_crc32c_hasher.rs          | 84 ++++++++++++++++++++++
 native/shuffle/src/writers/checksum.rs             | 77 ++++++++++++++++++--
 .../sql/comet/execution/shuffle/SpillWriter.java   |  4 +-
 6 files changed, 163 insertions(+), 5 deletions(-)

diff --git a/native/Cargo.lock b/native/Cargo.lock
index 598b18d58..0cf1f2031 100644
--- a/native/Cargo.lock
+++ b/native/Cargo.lock
@@ -1953,6 +1953,7 @@ dependencies = [
  "arrow",
  "async-trait",
  "bytes",
+ "crc32c",
  "crc32fast",
  "criterion",
  "datafusion",
diff --git a/native/shuffle/Cargo.toml b/native/shuffle/Cargo.toml
index 94ed5f30a..5cd7cd43e 100644
--- a/native/shuffle/Cargo.toml
+++ b/native/shuffle/Cargo.toml
@@ -32,6 +32,7 @@ publish = false
 arrow = { workspace = true }
 async-trait = { workspace = true }
 bytes = { workspace = true }
+crc32c = "0.6.8"
 crc32fast = "1.3.2"
 datafusion = { workspace = true }
 datafusion-comet-common = { workspace = true }
diff --git a/native/shuffle/src/lib.rs b/native/shuffle/src/lib.rs
index f29588f2e..dd3b90027 100644
--- a/native/shuffle/src/lib.rs
+++ b/native/shuffle/src/lib.rs
@@ -20,6 +20,7 @@ pub mod ipc;
 pub(crate) mod metrics;
 pub(crate) mod partitioners;
 mod shuffle_writer;
+mod spark_crc32c_hasher;
 pub mod spark_unsafe;
 pub(crate) mod writers;
 
diff --git a/native/shuffle/src/spark_crc32c_hasher.rs b/native/shuffle/src/spark_crc32c_hasher.rs
new file mode 100644
index 000000000..82d61132b
--- /dev/null
+++ b/native/shuffle/src/spark_crc32c_hasher.rs
@@ -0,0 +1,84 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+//! Provide a CRC-32C implementor of [Hasher].
+use std::hash::Hasher;
+
+use crc32c::crc32c_append;
+
+/// Implementor of [Hasher] for CRC-32C.
+///
+/// Note that CRC-32C produces a 32-bit hash (as [u32]),
+/// but the trait requires that the output value be [u64].
+///
+/// This implementation is necessary because the existing [Hasher] implementation does not support
+/// [Clone].
+#[derive(Default, Clone)]
+pub struct SparkCrc32cHasher {
+    checksum: u32,
+}
+
+impl SparkCrc32cHasher {
+    /// Create the [Hasher] pre-loaded with a particular checksum.
+    ///
+    /// Use the [Default::default()] constructor for a clean start.
+    pub fn new(initial: u32) -> Self {
+        Self { checksum: initial }
+    }
+
+    pub fn finalize(&self) -> u32 {
+        self.checksum
+    }
+}
+
+impl Hasher for SparkCrc32cHasher {
+    fn finish(&self) -> u64 {
+        self.checksum as u64
+    }
+
+    fn write(&mut self, bytes: &[u8]) {
+        self.checksum = crc32c_append(self.checksum, bytes);
+    }
+}
+
+#[cfg(test)]
+mod tests {
+    use super::*;
+
+    const TEST_STRING: &[u8] =
+        b"This is a very long string which is used to test the CRC-32-Castagnoli function.";
+    const CHECKSUM: u32 = 0x20_CB_1E_59;
+
+    #[test]
+    fn can_hash() {
+        let mut hasher = SparkCrc32cHasher::default();
+        hasher.write(TEST_STRING);
+        assert_eq!(hasher.finish(), CHECKSUM as u64);
+    }
+
+    /// Demonstrate writing in multiple chunks by splitting the [TEST_STRING] and getting the same
+    /// [CHECKSUM].
+    #[test]
+    fn can_hash_in_chunks() {
+        let (head, tail) = TEST_STRING.split_at(20);
+
+        let mut hasher = SparkCrc32cHasher::default();
+        hasher.write(head);
+        hasher.write(tail);
+        assert_eq!(hasher.finish(), CHECKSUM as u64);
+    }
+}
diff --git a/native/shuffle/src/writers/checksum.rs b/native/shuffle/src/writers/checksum.rs
index b240302e6..1dfd15eb3 100644
--- a/native/shuffle/src/writers/checksum.rs
+++ b/native/shuffle/src/writers/checksum.rs
@@ -15,19 +15,23 @@
 // specific language governing permissions and limitations
 // under the License.
 
+use crate::spark_crc32c_hasher::SparkCrc32cHasher;
 use bytes::Buf;
-use crc32fast::Hasher;
 use datafusion_comet_jni_bridge::errors::{CometError, CometResult};
 use simd_adler32::Adler32;
+use std::default::Default;
+use std::hash::Hasher;
 use std::io::{Cursor, SeekFrom};
 
 /// Checksum algorithms for writing IPC bytes.
 #[derive(Clone)]
 pub(crate) enum Checksum {
     /// CRC32 checksum algorithm.
-    CRC32(Hasher),
+    CRC32(crc32fast::Hasher),
     /// Adler32 checksum algorithm.
     Adler32(Adler32),
+    /// CRC32C checksum algorithm.
+    CRC32C(SparkCrc32cHasher),
 }
 
 impl Checksum {
@@ -35,9 +39,9 @@ impl Checksum {
         match algo {
             0 => {
                 let hasher = if let Some(initial) = initial_opt {
-                    Hasher::new_with_initial(initial)
+                    crc32fast::Hasher::new_with_initial(initial)
                 } else {
-                    Hasher::new()
+                    crc32fast::Hasher::new()
                 };
                 Ok(Checksum::CRC32(hasher))
             }
@@ -51,6 +55,14 @@ impl Checksum {
                 };
                 Ok(Checksum::Adler32(hasher))
             }
+            2 => {
+                let hasher = if let Some(initial) = initial_opt {
+                    SparkCrc32cHasher::new(initial)
+                } else {
+                    Default::default()
+                };
+                Ok(Checksum::CRC32C(hasher))
+            }
             _ => Err(CometError::Internal(
                 "Unsupported checksum algorithm".to_string(),
             )),
@@ -69,6 +81,11 @@ impl Checksum {
                 hasher.write(cursor.chunk());
                 Ok(())
             }
+            Checksum::CRC32C(hasher) => {
+                std::io::Seek::seek(cursor, SeekFrom::Start(0))?;
+                hasher.write(cursor.chunk());
+                Ok(())
+            }
         }
     }
 
@@ -76,6 +93,58 @@ impl Checksum {
         match self {
             Checksum::CRC32(hasher) => hasher.finalize(),
             Checksum::Adler32(hasher) => hasher.finish(),
+            Checksum::CRC32C(hasher) => hasher.finalize(),
         }
     }
 }
+
+#[cfg(test)]
+mod tests {
+    use super::*;
+    use std::io::Cursor;
+
+    #[test]
+    fn test_crc32() {
+        let mut checksum = Checksum::try_new(0, None).unwrap();
+        let message = b"123456789";
+
+        let mut vector: Vec<u8> = message.to_vec();
+        let mut buff = Cursor::new(&mut vector);
+
+        checksum.update(&mut buff).unwrap();
+        let result = checksum.finalize();
+
+        let expected_crc = 0xcbf43926u32;
+        assert_eq!(result, expected_crc)
+    }
+
+    #[test]
+    fn test_adler32() {
+        let mut checksum = Checksum::try_new(1, None).unwrap();
+        let message = b"123456789";
+
+        let mut vector: Vec<u8> = message.to_vec();
+        let mut buff = Cursor::new(&mut vector);
+
+        checksum.update(&mut buff).unwrap();
+        let result = checksum.finalize();
+
+        let expected_crc = 0x091e01deu32;
+        assert_eq!(result, expected_crc)
+    }
+
+    #[test]
+    fn test_crc32c() {
+        let mut checksum = Checksum::try_new(2, None).unwrap();
+        let message = b"123456789";
+
+        let mut vector: Vec<u8> = message.to_vec();
+        let mut buff = Cursor::new(&mut vector);
+
+        checksum.update(&mut buff).unwrap();
+        let result = checksum.finalize();
+
+        let expected_crc = 0xe3069283u32;
+        assert_eq!(result, expected_crc)
+    }
+}
diff --git a/spark/src/main/java/org/apache/spark/sql/comet/execution/shuffle/SpillWriter.java b/spark/src/main/java/org/apache/spark/sql/comet/execution/shuffle/SpillWriter.java
index 044c7842f..fd3ff65a0 100644
--- a/spark/src/main/java/org/apache/spark/sql/comet/execution/shuffle/SpillWriter.java
+++ b/spark/src/main/java/org/apache/spark/sql/comet/execution/shuffle/SpillWriter.java
@@ -68,7 +68,7 @@ public abstract class SpillWriter {
 
   protected byte[][] dataTypes;
 
-  // 0: CRC32, 1: Adler32. Spark uses Adler32 by default.
+  // 0: CRC32, 1: Adler32, or 2: CRC32C. Spark uses Adler32 by default.
   protected int checksumAlgo = 1;
   protected long checksum = -1;
 
@@ -98,6 +98,8 @@ public abstract class SpillWriter {
       this.checksumAlgo = 0;
     } else if (algo.equals("adler32")) {
       this.checksumAlgo = 1;
+    } else if (algo.equals("crc32c")) {
+      this.checksumAlgo = 2;
     } else {
       throw new UnsupportedOperationException(
           "Unsupported shuffle checksum algorithm: " + checksumAlgo);


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to