This is an automated email from the ASF dual-hosted git repository.
parthc pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/datafusion-comet.git
The following commit(s) were added to refs/heads/main by this push:
new 28bd4bc79 feat: Implement CRC32C algorithm (#3822)
28bd4bc79 is described below
commit 28bd4bc79609be75ef200b470f9f756e3516e11a
Author: Steve Vaughan <[email protected]>
AuthorDate: Mon Mar 30 18:05:04 2026 -0400
feat: Implement CRC32C algorithm (#3822)
Co-authored-by: Steve Vaughan Jr <[email protected]>
Co-authored-by: Matt Butrovich <[email protected]>
---
native/Cargo.lock | 1 +
native/shuffle/Cargo.toml | 1 +
native/shuffle/src/lib.rs | 1 +
native/shuffle/src/spark_crc32c_hasher.rs | 84 ++++++++++++++++++++++
native/shuffle/src/writers/checksum.rs | 77 ++++++++++++++++++--
.../sql/comet/execution/shuffle/SpillWriter.java | 4 +-
6 files changed, 163 insertions(+), 5 deletions(-)
diff --git a/native/Cargo.lock b/native/Cargo.lock
index 598b18d58..0cf1f2031 100644
--- a/native/Cargo.lock
+++ b/native/Cargo.lock
@@ -1953,6 +1953,7 @@ dependencies = [
"arrow",
"async-trait",
"bytes",
+ "crc32c",
"crc32fast",
"criterion",
"datafusion",
diff --git a/native/shuffle/Cargo.toml b/native/shuffle/Cargo.toml
index 94ed5f30a..5cd7cd43e 100644
--- a/native/shuffle/Cargo.toml
+++ b/native/shuffle/Cargo.toml
@@ -32,6 +32,7 @@ publish = false
arrow = { workspace = true }
async-trait = { workspace = true }
bytes = { workspace = true }
+crc32c = "0.6.8"
crc32fast = "1.3.2"
datafusion = { workspace = true }
datafusion-comet-common = { workspace = true }
diff --git a/native/shuffle/src/lib.rs b/native/shuffle/src/lib.rs
index f29588f2e..dd3b90027 100644
--- a/native/shuffle/src/lib.rs
+++ b/native/shuffle/src/lib.rs
@@ -20,6 +20,7 @@ pub mod ipc;
pub(crate) mod metrics;
pub(crate) mod partitioners;
mod shuffle_writer;
+mod spark_crc32c_hasher;
pub mod spark_unsafe;
pub(crate) mod writers;
diff --git a/native/shuffle/src/spark_crc32c_hasher.rs b/native/shuffle/src/spark_crc32c_hasher.rs
new file mode 100644
index 000000000..82d61132b
--- /dev/null
+++ b/native/shuffle/src/spark_crc32c_hasher.rs
@@ -0,0 +1,84 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements. See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership. The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License. You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied. See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+//! Provide a CRC-32C implementor of [Hasher].
+use std::hash::Hasher;
+
+use crc32c::crc32c_append;
+
+/// Implementor of [Hasher] for CRC-32C.
+///
+/// Note that CRC-32C produces a 32-bit hash (as [u32]),
+/// but the trait requires that the output value be [u64].
+///
+/// This implementation is necessary because the existing [Hasher] implementation does not support
+/// [Clone].
+#[derive(Default, Clone)]
+pub struct SparkCrc32cHasher {
+ checksum: u32,
+}
+
+impl SparkCrc32cHasher {
+ /// Create the [Hasher] pre-loaded with a particular checksum.
+ ///
+ /// Use the [Default::default()] constructor for a clean start.
+ pub fn new(initial: u32) -> Self {
+ Self { checksum: initial }
+ }
+
+ pub fn finalize(&self) -> u32 {
+ self.checksum
+ }
+}
+
+impl Hasher for SparkCrc32cHasher {
+ fn finish(&self) -> u64 {
+ self.checksum as u64
+ }
+
+ fn write(&mut self, bytes: &[u8]) {
+ self.checksum = crc32c_append(self.checksum, bytes);
+ }
+}
+
+#[cfg(test)]
+mod tests {
+ use super::*;
+
+ const TEST_STRING: &[u8] =
+ b"This is a very long string which is used to test the CRC-32-Castagnoli function.";
+ const CHECKSUM: u32 = 0x20_CB_1E_59;
+
+ #[test]
+ fn can_hash() {
+ let mut hasher = SparkCrc32cHasher::default();
+ hasher.write(TEST_STRING);
+ assert_eq!(hasher.finish(), CHECKSUM as u64);
+ }
+
+ /// Demonstrate writing in multiple chunks by splitting the [TEST_STRING] and getting the same
+ /// [CHECKSUM].
+ #[test]
+ fn can_hash_in_chunks() {
+ let (head, tail) = TEST_STRING.split_at(20);
+
+ let mut hasher = SparkCrc32cHasher::default();
+ hasher.write(head);
+ hasher.write(tail);
+ assert_eq!(hasher.finish(), CHECKSUM as u64);
+ }
+}
diff --git a/native/shuffle/src/writers/checksum.rs b/native/shuffle/src/writers/checksum.rs
index b240302e6..1dfd15eb3 100644
--- a/native/shuffle/src/writers/checksum.rs
+++ b/native/shuffle/src/writers/checksum.rs
@@ -15,19 +15,23 @@
// specific language governing permissions and limitations
// under the License.
+use crate::spark_crc32c_hasher::SparkCrc32cHasher;
use bytes::Buf;
-use crc32fast::Hasher;
use datafusion_comet_jni_bridge::errors::{CometError, CometResult};
use simd_adler32::Adler32;
+use std::default::Default;
+use std::hash::Hasher;
use std::io::{Cursor, SeekFrom};
/// Checksum algorithms for writing IPC bytes.
#[derive(Clone)]
pub(crate) enum Checksum {
/// CRC32 checksum algorithm.
- CRC32(Hasher),
+ CRC32(crc32fast::Hasher),
/// Adler32 checksum algorithm.
Adler32(Adler32),
+ /// CRC32C checksum algorithm.
+ CRC32C(SparkCrc32cHasher),
}
impl Checksum {
@@ -35,9 +39,9 @@ impl Checksum {
match algo {
0 => {
let hasher = if let Some(initial) = initial_opt {
- Hasher::new_with_initial(initial)
+ crc32fast::Hasher::new_with_initial(initial)
} else {
- Hasher::new()
+ crc32fast::Hasher::new()
};
Ok(Checksum::CRC32(hasher))
}
@@ -51,6 +55,14 @@ impl Checksum {
};
Ok(Checksum::Adler32(hasher))
}
+ 2 => {
+ let hasher = if let Some(initial) = initial_opt {
+ SparkCrc32cHasher::new(initial)
+ } else {
+ Default::default()
+ };
+ Ok(Checksum::CRC32C(hasher))
+ }
_ => Err(CometError::Internal(
"Unsupported checksum algorithm".to_string(),
)),
@@ -69,6 +81,11 @@ impl Checksum {
hasher.write(cursor.chunk());
Ok(())
}
+ Checksum::CRC32C(hasher) => {
+ std::io::Seek::seek(cursor, SeekFrom::Start(0))?;
+ hasher.write(cursor.chunk());
+ Ok(())
+ }
}
}
@@ -76,6 +93,58 @@ impl Checksum {
match self {
Checksum::CRC32(hasher) => hasher.finalize(),
Checksum::Adler32(hasher) => hasher.finish(),
+ Checksum::CRC32C(hasher) => hasher.finalize(),
}
}
}
+
+#[cfg(test)]
+mod tests {
+ use super::*;
+ use std::io::Cursor;
+
+ #[test]
+ fn test_crc32() {
+ let mut checksum = Checksum::try_new(0, None).unwrap();
+ let message = b"123456789";
+
+ let mut vector: Vec<u8> = message.to_vec();
+ let mut buff = Cursor::new(&mut vector);
+
+ checksum.update(&mut buff).unwrap();
+ let result = checksum.finalize();
+
+ let expected_crc = 0xcbf43926u32;
+ assert_eq!(result, expected_crc)
+ }
+
+ #[test]
+ fn test_adler32() {
+ let mut checksum = Checksum::try_new(1, None).unwrap();
+ let message = b"123456789";
+
+ let mut vector: Vec<u8> = message.to_vec();
+ let mut buff = Cursor::new(&mut vector);
+
+ checksum.update(&mut buff).unwrap();
+ let result = checksum.finalize();
+
+ let expected_crc = 0x091e01deu32;
+ assert_eq!(result, expected_crc)
+ }
+
+ #[test]
+ fn test_crc32c() {
+ let mut checksum = Checksum::try_new(2, None).unwrap();
+ let message = b"123456789";
+
+ let mut vector: Vec<u8> = message.to_vec();
+ let mut buff = Cursor::new(&mut vector);
+
+ checksum.update(&mut buff).unwrap();
+ let result = checksum.finalize();
+
+ let expected_crc = 0xe3069283u32;
+ assert_eq!(result, expected_crc)
+ }
+}
diff --git a/spark/src/main/java/org/apache/spark/sql/comet/execution/shuffle/SpillWriter.java b/spark/src/main/java/org/apache/spark/sql/comet/execution/shuffle/SpillWriter.java
index 044c7842f..fd3ff65a0 100644
--- a/spark/src/main/java/org/apache/spark/sql/comet/execution/shuffle/SpillWriter.java
+++ b/spark/src/main/java/org/apache/spark/sql/comet/execution/shuffle/SpillWriter.java
@@ -68,7 +68,7 @@ public abstract class SpillWriter {
protected byte[][] dataTypes;
- // 0: CRC32, 1: Adler32. Spark uses Adler32 by default.
+ // 0: CRC32, 1: Adler32, or 2: CRC32C. Spark uses Adler32 by default.
protected int checksumAlgo = 1;
protected long checksum = -1;
@@ -98,6 +98,8 @@ public abstract class SpillWriter {
this.checksumAlgo = 0;
} else if (algo.equals("adler32")) {
this.checksumAlgo = 1;
+ } else if (algo.equals("crc32c")) {
+ this.checksumAlgo = 2;
} else {
throw new UnsupportedOperationException(
"Unsupported shuffle checksum algorithm: " + checksumAlgo);
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]