This is an automated email from the ASF dual-hosted git repository.
mbutrovich pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/datafusion-comet.git
The following commit(s) were added to refs/heads/main by this push:
new 4bd664e5e perf: optimize complex-type hash implementations (#3140)
4bd664e5e is described below
commit 4bd664e5ee2614417a333a1c2704dbdba064e001
Author: Matt Butrovich <[email protected]>
AuthorDate: Thu Jan 15 10:28:28 2026 -0500
perf: optimize complex-type hash implementations (#3140)
---
native/spark-expr/src/hash_funcs/utils.rs | 599 +++++++++++++++++++++++++-----
1 file changed, 497 insertions(+), 102 deletions(-)
diff --git a/native/spark-expr/src/hash_funcs/utils.rs b/native/spark-expr/src/hash_funcs/utils.rs
index f72d42221..d634b27bf 100644
--- a/native/spark-expr/src/hash_funcs/utils.rs
+++ b/native/spark-expr/src/hash_funcs/utils.rs
@@ -31,13 +31,15 @@ macro_rules! hash_array {
)
});
if array.null_count() == 0 {
- for (i, hash) in $hashes.iter_mut().enumerate() {
- *hash = $hash_method(&array.value(i), *hash);
+ // Fast path: no nulls, use direct indexing
+ for i in 0..$hashes.len() {
+ $hashes[i] = $hash_method(&array.value(i), $hashes[i]);
}
} else {
- for (i, hash) in $hashes.iter_mut().enumerate() {
+ // Slow path: check nulls
+ for i in 0..$hashes.len() {
if !array.is_null(i) {
- *hash = $hash_method(&array.value(i), *hash);
+ $hashes[i] = $hash_method(&array.value(i), $hashes[i]);
}
}
}
@@ -58,14 +60,21 @@ macro_rules! hash_array_boolean {
)
});
if array.null_count() == 0 {
- for (i, hash) in $hashes.iter_mut().enumerate() {
- *hash =
$hash_method($hash_input_type::from(array.value(i)).to_le_bytes(), *hash);
+ // Fast path: no nulls, use direct indexing
+ for i in 0..$hashes.len() {
+ $hashes[i] = $hash_method(
+ $hash_input_type::from(array.value(i)).to_le_bytes(),
+ $hashes[i],
+ );
}
} else {
- for (i, hash) in $hashes.iter_mut().enumerate() {
+ // Slow path: check nulls
+ for i in 0..$hashes.len() {
if !array.is_null(i) {
- *hash =
-
$hash_method($hash_input_type::from(array.value(i)).to_le_bytes(), *hash);
+ $hashes[i] = $hash_method(
+ $hash_input_type::from(array.value(i)).to_le_bytes(),
+ $hashes[i],
+ );
}
}
}
@@ -88,13 +97,15 @@ macro_rules! hash_array_primitive {
let values = array.values();
if array.null_count() == 0 {
- for (hash, value) in $hashes.iter_mut().zip(values.iter()) {
- *hash = $hash_method((*value as $ty).to_le_bytes(), *hash);
+ // Fast path: no nulls, use direct indexing
+ for i in 0..values.len() {
+ $hashes[i] = $hash_method((values[i] as $ty).to_le_bytes(),
$hashes[i]);
}
} else {
- for (i, (hash, value)) in
$hashes.iter_mut().zip(values.iter()).enumerate() {
+ // Slow path: check nulls
+ for i in 0..values.len() {
if !array.is_null(i) {
- *hash = $hash_method((*value as $ty).to_le_bytes(), *hash);
+ $hashes[i] = $hash_method((values[i] as
$ty).to_le_bytes(), $hashes[i]);
}
}
}
@@ -117,22 +128,26 @@ macro_rules! hash_array_primitive_float {
let values = array.values();
if array.null_count() == 0 {
- for (hash, value) in $hashes.iter_mut().zip(values.iter()) {
+ // Fast path: no nulls, use direct indexing
+ for i in 0..values.len() {
+ let value = values[i];
// Spark uses 0 as hash for -0.0, see `Murmur3Hash` expression.
- if *value == 0.0 && value.is_sign_negative() {
- *hash = $hash_method((0 as $ty2).to_le_bytes(), *hash);
+ if value == 0.0 && value.is_sign_negative() {
+ $hashes[i] = $hash_method((0 as $ty2).to_le_bytes(),
$hashes[i]);
} else {
- *hash = $hash_method((*value as $ty).to_le_bytes(), *hash);
+ $hashes[i] = $hash_method((value as $ty).to_le_bytes(),
$hashes[i]);
}
}
} else {
- for (i, (hash, value)) in
$hashes.iter_mut().zip(values.iter()).enumerate() {
+ // Slow path: check nulls
+ for i in 0..values.len() {
if !array.is_null(i) {
+ let value = values[i];
// Spark uses 0 as hash for -0.0, see `Murmur3Hash`
expression.
- if *value == 0.0 && value.is_sign_negative() {
- *hash = $hash_method((0 as $ty2).to_le_bytes(), *hash);
+ if value == 0.0 && value.is_sign_negative() {
+ $hashes[i] = $hash_method((0 as $ty2).to_le_bytes(),
$hashes[i]);
} else {
- *hash = $hash_method((*value as $ty).to_le_bytes(),
*hash);
+ $hashes[i] = $hash_method((value as
$ty).to_le_bytes(), $hashes[i]);
}
}
}
@@ -155,22 +170,24 @@ macro_rules! hash_array_small_decimal {
});
if array.null_count() == 0 {
- for (i, hash) in $hashes.iter_mut().enumerate() {
- *hash = $hash_method(
+ // Fast path: no nulls, use direct indexing
+ for i in 0..$hashes.len() {
+ $hashes[i] = $hash_method(
i64::try_from(array.value(i))
.map(|v| v.to_le_bytes())
.map_err(|e|
DataFusionError::Execution(e.to_string()))?,
- *hash,
+ $hashes[i],
);
}
} else {
- for (i, hash) in $hashes.iter_mut().enumerate() {
+ // Slow path: check nulls
+ for i in 0..$hashes.len() {
if !array.is_null(i) {
- *hash = $hash_method(
+ $hashes[i] = $hash_method(
i64::try_from(array.value(i))
.map(|v| v.to_le_bytes())
.map_err(|e|
DataFusionError::Execution(e.to_string()))?,
- *hash,
+ $hashes[i],
);
}
}
@@ -193,13 +210,72 @@ macro_rules! hash_array_decimal {
});
if array.null_count() == 0 {
- for (i, hash) in $hashes.iter_mut().enumerate() {
- *hash = $hash_method(array.value(i).to_le_bytes(), *hash);
+ // Fast path: no nulls, use direct indexing
+ for i in 0..$hashes.len() {
+ $hashes[i] = $hash_method(array.value(i).to_le_bytes(),
$hashes[i]);
}
} else {
- for (i, hash) in $hashes.iter_mut().enumerate() {
+ // Slow path: check nulls
+ for i in 0..$hashes.len() {
if !array.is_null(i) {
- *hash = $hash_method(array.value(i).to_le_bytes(), *hash);
+ $hashes[i] = $hash_method(array.value(i).to_le_bytes(),
$hashes[i]);
+ }
+ }
+ }
+ };
+}
+
+/// Hash a list array with primitive elements by directly accessing the
underlying buffer.
+/// This avoids the overhead of slicing and recursive calls for common cases.
+/// Supports both variable-length lists (with offsets) and fixed-size lists.
+#[macro_export]
+macro_rules! hash_list_primitive {
+ // Variable-length list variant (List/LargeList)
+ (offsets: $offsets:expr, $list_array:ident, $elem_array:ident,
$hashes:ident, $hash_method:ident, $value_transform:expr) => {
+ if $list_array.null_count() == 0 && $elem_array.null_count() == 0 {
+ for (row_idx, hash) in $hashes.iter_mut().enumerate() {
+ let start = $offsets[row_idx] as usize;
+ let end = $offsets[row_idx + 1] as usize;
+ for elem_idx in start..end {
+ let value = $elem_array.value(elem_idx);
+ *hash = $hash_method($value_transform(value), *hash);
+ }
+ }
+ } else {
+ for (row_idx, hash) in $hashes.iter_mut().enumerate() {
+ if !$list_array.is_null(row_idx) {
+ let start = $offsets[row_idx] as usize;
+ let end = $offsets[row_idx + 1] as usize;
+ for elem_idx in start..end {
+ if !$elem_array.is_null(elem_idx) {
+ let value = $elem_array.value(elem_idx);
+ *hash = $hash_method($value_transform(value),
*hash);
+ }
+ }
+ }
+ }
+ }
+ };
+ // Fixed-size list variant
+ (fixed_size: $list_size:expr, $list_array:ident, $elem_array:ident,
$hashes:ident, $hash_method:ident, $value_transform:expr) => {
+ if $list_array.null_count() == 0 && $elem_array.null_count() == 0 {
+ for (row_idx, hash) in $hashes.iter_mut().enumerate() {
+ let start = row_idx * $list_size;
+ for elem_idx in 0..$list_size {
+ let value = $elem_array.value(start + elem_idx);
+ *hash = $hash_method($value_transform(value), *hash);
+ }
+ }
+ } else {
+ for (row_idx, hash) in $hashes.iter_mut().enumerate() {
+ if !$list_array.is_null(row_idx) {
+ let start = row_idx * $list_size;
+ for elem_idx in 0..$list_size {
+ if !$elem_array.is_null(start + elem_idx) {
+ let value = $elem_array.value(start + elem_idx);
+ *hash = $hash_method($value_transform(value),
*hash);
+ }
+ }
}
}
}
@@ -211,6 +287,220 @@ macro_rules! hash_array_decimal {
/// Spark hashes arrays by recursively hashing each element, where each
/// element's hash is computed using the previous element's hash as the seed.
/// This creates a chain: hash(elem_n, hash(elem_n-1, ... hash(elem_0,
seed)...))
+/// Dispatches hash operations for List/LargeList/FixedSizeList arrays with
primitive element types.
+/// This macro eliminates duplication by handling the type-to-array mapping
for all supported primitives.
+#[macro_export]
+macro_rules! hash_list_with_primitive_elements {
+ // Variant for List/LargeList with offsets
+ (offsets: $list_array_type:ident, $list_array:ident, $values:ident,
$offsets:ident, $field:expr, $hashes_buffer:ident, $hash_method:ident,
$recursive_hash_method:ident, $fallback_offset_type:ty, $col:ident) => {
+ match $field.data_type() {
+ DataType::Int8 => {
+ let elem_array =
$values.as_any().downcast_ref::<Int8Array>().unwrap();
+ $crate::hash_list_primitive!(offsets: $offsets, $list_array,
elem_array, $hashes_buffer, $hash_method, |v: i8| (v as i32).to_le_bytes());
+ }
+ DataType::Int16 => {
+ let elem_array =
$values.as_any().downcast_ref::<Int16Array>().unwrap();
+ $crate::hash_list_primitive!(offsets: $offsets, $list_array,
elem_array, $hashes_buffer, $hash_method, |v: i16| (v as i32).to_le_bytes());
+ }
+ DataType::Int32 => {
+ let elem_array =
$values.as_any().downcast_ref::<Int32Array>().unwrap();
+ $crate::hash_list_primitive!(offsets: $offsets, $list_array,
elem_array, $hashes_buffer, $hash_method, |v: i32| v.to_le_bytes());
+ }
+ DataType::Int64 => {
+ let elem_array =
$values.as_any().downcast_ref::<Int64Array>().unwrap();
+ $crate::hash_list_primitive!(offsets: $offsets, $list_array,
elem_array, $hashes_buffer, $hash_method, |v: i64| v.to_le_bytes());
+ }
+ DataType::Float32 => {
+ let elem_array =
$values.as_any().downcast_ref::<Float32Array>().unwrap();
+ $crate::hash_list_primitive!(offsets: $offsets, $list_array,
elem_array, $hashes_buffer, $hash_method,
+ |v: f32| if v == 0.0 && v.is_sign_negative() {
(0_i32).to_le_bytes() } else { v.to_le_bytes() });
+ }
+ DataType::Float64 => {
+ let elem_array =
$values.as_any().downcast_ref::<Float64Array>().unwrap();
+ $crate::hash_list_primitive!(offsets: $offsets, $list_array,
elem_array, $hashes_buffer, $hash_method,
+ |v: f64| if v == 0.0 && v.is_sign_negative() {
(0_i64).to_le_bytes() } else { v.to_le_bytes() });
+ }
+ DataType::Boolean => {
+ let elem_array =
$values.as_any().downcast_ref::<BooleanArray>().unwrap();
+ $crate::hash_list_primitive!(offsets: $offsets, $list_array,
elem_array, $hashes_buffer, $hash_method, |v: bool|
(i32::from(v)).to_le_bytes());
+ }
+ DataType::Utf8 => {
+ let elem_array =
$values.as_any().downcast_ref::<StringArray>().unwrap();
+ if $list_array.null_count() == 0 && elem_array.null_count() ==
0 {
+ for (row_idx, hash) in
$hashes_buffer.iter_mut().enumerate() {
+ let start = $offsets[row_idx] as usize;
+ let end = $offsets[row_idx + 1] as usize;
+ for elem_idx in start..end {
+ *hash = $hash_method(elem_array.value(elem_idx),
*hash);
+ }
+ }
+ } else {
+ for (row_idx, hash) in
$hashes_buffer.iter_mut().enumerate() {
+ if !$list_array.is_null(row_idx) {
+ let start = $offsets[row_idx] as usize;
+ let end = $offsets[row_idx + 1] as usize;
+ for elem_idx in start..end {
+ if !elem_array.is_null(elem_idx) {
+ *hash =
$hash_method(elem_array.value(elem_idx), *hash);
+ }
+ }
+ }
+ }
+ }
+ }
+ DataType::Binary => {
+ let elem_array =
$values.as_any().downcast_ref::<BinaryArray>().unwrap();
+ if $list_array.null_count() == 0 && elem_array.null_count() ==
0 {
+ for (row_idx, hash) in
$hashes_buffer.iter_mut().enumerate() {
+ let start = $offsets[row_idx] as usize;
+ let end = $offsets[row_idx + 1] as usize;
+ for elem_idx in start..end {
+ *hash = $hash_method(elem_array.value(elem_idx),
*hash);
+ }
+ }
+ } else {
+ for (row_idx, hash) in
$hashes_buffer.iter_mut().enumerate() {
+ if !$list_array.is_null(row_idx) {
+ let start = $offsets[row_idx] as usize;
+ let end = $offsets[row_idx + 1] as usize;
+ for elem_idx in start..end {
+ if !elem_array.is_null(elem_idx) {
+ *hash =
$hash_method(elem_array.value(elem_idx), *hash);
+ }
+ }
+ }
+ }
+ }
+ }
+ DataType::Date32 => {
+ let elem_array =
$values.as_any().downcast_ref::<Date32Array>().unwrap();
+ $crate::hash_list_primitive!(offsets: $offsets, $list_array,
elem_array, $hashes_buffer, $hash_method, |v: i32| v.to_le_bytes());
+ }
+ DataType::Timestamp(TimeUnit::Microsecond, _) => {
+ let elem_array =
$values.as_any().downcast_ref::<TimestampMicrosecondArray>().unwrap();
+ $crate::hash_list_primitive!(offsets: $offsets, $list_array,
elem_array, $hashes_buffer, $hash_method, |v: i64| v.to_le_bytes());
+ }
+ _ => {
+ // Fall back to recursive approach for complex element types
+ $crate::hash_list_array!($list_array_type,
$fallback_offset_type, $col, $hashes_buffer, $recursive_hash_method);
+ }
+ }
+ };
+ // Variant for FixedSizeList with fixed size
+ (fixed_size: $list_array:ident, $values:ident, $list_size:ident,
$field:expr, $hashes_buffer:ident, $hash_method:ident,
$recursive_hash_method:ident) => {
+ match $field.data_type() {
+ DataType::Int8 => {
+ let elem_array =
$values.as_any().downcast_ref::<Int8Array>().unwrap();
+ $crate::hash_list_primitive!(fixed_size: $list_size,
$list_array, elem_array, $hashes_buffer, $hash_method, |v: i8| (v as
i32).to_le_bytes());
+ }
+ DataType::Int16 => {
+ let elem_array =
$values.as_any().downcast_ref::<Int16Array>().unwrap();
+ $crate::hash_list_primitive!(fixed_size: $list_size,
$list_array, elem_array, $hashes_buffer, $hash_method, |v: i16| (v as
i32).to_le_bytes());
+ }
+ DataType::Int32 => {
+ let elem_array =
$values.as_any().downcast_ref::<Int32Array>().unwrap();
+ $crate::hash_list_primitive!(fixed_size: $list_size,
$list_array, elem_array, $hashes_buffer, $hash_method, |v: i32|
v.to_le_bytes());
+ }
+ DataType::Int64 => {
+ let elem_array =
$values.as_any().downcast_ref::<Int64Array>().unwrap();
+ $crate::hash_list_primitive!(fixed_size: $list_size,
$list_array, elem_array, $hashes_buffer, $hash_method, |v: i64|
v.to_le_bytes());
+ }
+ DataType::Float32 => {
+ let elem_array =
$values.as_any().downcast_ref::<Float32Array>().unwrap();
+ $crate::hash_list_primitive!(fixed_size: $list_size,
$list_array, elem_array, $hashes_buffer, $hash_method,
+ |v: f32| if v == 0.0 && v.is_sign_negative() {
(0_i32).to_le_bytes() } else { v.to_le_bytes() });
+ }
+ DataType::Float64 => {
+ let elem_array =
$values.as_any().downcast_ref::<Float64Array>().unwrap();
+ $crate::hash_list_primitive!(fixed_size: $list_size,
$list_array, elem_array, $hashes_buffer, $hash_method,
+ |v: f64| if v == 0.0 && v.is_sign_negative() {
(0_i64).to_le_bytes() } else { v.to_le_bytes() });
+ }
+ DataType::Boolean => {
+ let elem_array =
$values.as_any().downcast_ref::<BooleanArray>().unwrap();
+ $crate::hash_list_primitive!(fixed_size: $list_size,
$list_array, elem_array, $hashes_buffer, $hash_method, |v: bool|
(i32::from(v)).to_le_bytes());
+ }
+ DataType::Utf8 => {
+ let elem_array =
$values.as_any().downcast_ref::<StringArray>().unwrap();
+ if $list_array.null_count() == 0 && elem_array.null_count() ==
0 {
+ for (row_idx, hash) in
$hashes_buffer.iter_mut().enumerate() {
+ let start = row_idx * $list_size;
+ for elem_idx in 0..$list_size {
+ *hash = $hash_method(elem_array.value(start +
elem_idx), *hash);
+ }
+ }
+ } else {
+ for (row_idx, hash) in
$hashes_buffer.iter_mut().enumerate() {
+ if !$list_array.is_null(row_idx) {
+ let start = row_idx * $list_size;
+ for elem_idx in 0..$list_size {
+ if !elem_array.is_null(start + elem_idx) {
+ *hash =
$hash_method(elem_array.value(start + elem_idx), *hash);
+ }
+ }
+ }
+ }
+ }
+ }
+ DataType::Binary => {
+ let elem_array =
$values.as_any().downcast_ref::<BinaryArray>().unwrap();
+ if $list_array.null_count() == 0 && elem_array.null_count() ==
0 {
+ for (row_idx, hash) in
$hashes_buffer.iter_mut().enumerate() {
+ let start = row_idx * $list_size;
+ for elem_idx in 0..$list_size {
+ *hash = $hash_method(elem_array.value(start +
elem_idx), *hash);
+ }
+ }
+ } else {
+ for (row_idx, hash) in
$hashes_buffer.iter_mut().enumerate() {
+ if !$list_array.is_null(row_idx) {
+ let start = row_idx * $list_size;
+ for elem_idx in 0..$list_size {
+ if !elem_array.is_null(start + elem_idx) {
+ *hash =
$hash_method(elem_array.value(start + elem_idx), *hash);
+ }
+ }
+ }
+ }
+ }
+ }
+ DataType::Date32 => {
+ let elem_array =
$values.as_any().downcast_ref::<Date32Array>().unwrap();
+ $crate::hash_list_primitive!(fixed_size: $list_size,
$list_array, elem_array, $hashes_buffer, $hash_method, |v: i32|
v.to_le_bytes());
+ }
+ DataType::Timestamp(TimeUnit::Microsecond, _) => {
+ let elem_array =
$values.as_any().downcast_ref::<TimestampMicrosecondArray>().unwrap();
+ $crate::hash_list_primitive!(fixed_size: $list_size,
$list_array, elem_array, $hashes_buffer, $hash_method, |v: i64|
v.to_le_bytes());
+ }
+ _ => {
+ // Fall back to recursive approach for complex element types
+ if $list_array.null_count() == 0 {
+ for (row_idx, hash) in
$hashes_buffer.iter_mut().enumerate() {
+ let start = row_idx * $list_size;
+ for elem_idx in 0..$list_size {
+ let elem_array = $values.slice(start + elem_idx,
1);
+ let mut single_hash = [*hash];
+ $recursive_hash_method(&[elem_array], &mut
single_hash)?;
+ *hash = single_hash[0];
+ }
+ }
+ } else {
+ for (row_idx, hash) in
$hashes_buffer.iter_mut().enumerate() {
+ if !$list_array.is_null(row_idx) {
+ let start = row_idx * $list_size;
+ for elem_idx in 0..$list_size {
+ let elem_array = $values.slice(start +
elem_idx, 1);
+ let mut single_hash = [*hash];
+ $recursive_hash_method(&[elem_array], &mut
single_hash)?;
+ *hash = single_hash[0];
+ }
+ }
+ }
+ }
+ }
+ }
+ };
+}
+
#[macro_export]
macro_rules! hash_list_array {
($array_type:ident, $offset_type:ty, $column: ident, $hashes: ident,
$recursive_hash_method: ident) => {
@@ -482,44 +772,26 @@ macro_rules! create_hashes_internal {
)))
}
},
- DataType::List(_) => {
- $crate::hash_list_array!(ListArray, i32, col,
$hashes_buffer, $recursive_hash_method);
+ DataType::List(field) => {
+ let list_array =
col.as_any().downcast_ref::<ListArray>().unwrap();
+ let values = list_array.values();
+ let offsets = list_array.offsets();
+
+ $crate::hash_list_with_primitive_elements!(offsets:
ListArray, list_array, values, offsets, field, $hashes_buffer, $hash_method,
$recursive_hash_method, i32, col);
}
- DataType::LargeList(_) => {
- $crate::hash_list_array!(LargeListArray, i64, col,
$hashes_buffer, $recursive_hash_method);
+ DataType::LargeList(field) => {
+ let list_array =
col.as_any().downcast_ref::<LargeListArray>().unwrap();
+ let values = list_array.values();
+ let offsets = list_array.offsets();
+
+ $crate::hash_list_with_primitive_elements!(offsets:
LargeListArray, list_array, values, offsets, field, $hashes_buffer,
$hash_method, $recursive_hash_method, i64, col);
}
- DataType::FixedSizeList(_, size) => {
+ DataType::FixedSizeList(field, size) => {
let list_array =
col.as_any().downcast_ref::<FixedSizeListArray>().unwrap();
let values = list_array.values();
let list_size = *size as usize;
- if list_array.null_count() == 0 {
- // Fast path: no nulls, skip null checks
- for (row_idx, hash) in
$hashes_buffer.iter_mut().enumerate() {
- let start = row_idx * list_size;
- // Hash each element in sequence, chaining the
hash values
- for elem_idx in 0..list_size {
- let elem_array = values.slice(start +
elem_idx, 1);
- let mut single_hash = [*hash];
- $recursive_hash_method(&[elem_array], &mut
single_hash)?;
- *hash = single_hash[0];
- }
- }
- } else {
- // Slow path: array has nulls, check each row
- for (row_idx, hash) in
$hashes_buffer.iter_mut().enumerate() {
- if !list_array.is_null(row_idx) {
- let start = row_idx * list_size;
- // Hash each element in sequence, chaining the
hash values
- for elem_idx in 0..list_size {
- let elem_array = values.slice(start +
elem_idx, 1);
- let mut single_hash = [*hash];
- $recursive_hash_method(&[elem_array], &mut
single_hash)?;
- *hash = single_hash[0];
- }
- }
- }
- }
+ $crate::hash_list_with_primitive_elements!(fixed_size:
list_array, values, list_size, field, $hashes_buffer, $hash_method,
$recursive_hash_method);
}
DataType::Struct(_) => {
let struct_array =
col.as_any().downcast_ref::<StructArray>().unwrap();
@@ -529,56 +801,179 @@ macro_rules! create_hashes_internal {
$recursive_hash_method(&columns, $hashes_buffer)?;
}
}
- DataType::Map(_, _) => {
+ DataType::Map(field, _) => {
let map_array =
col.as_any().downcast_ref::<MapArray>().unwrap();
- // For maps, Spark hashes by iterating through (key,
value) pairs
- // For each entry, hash the key then the value
let keys = map_array.keys();
let values = map_array.values();
let offsets = map_array.offsets();
- if map_array.null_count() == 0 {
- // Fast path: no nulls, skip null checks
- for (row_idx, hash) in
$hashes_buffer.iter_mut().enumerate() {
- let start = offsets[row_idx] as usize;
- let end = offsets[row_idx + 1] as usize;
- // Hash each key-value pair in sequence
- for entry_idx in start..end {
- // Hash the key
- let key_array = keys.slice(entry_idx, 1);
- let mut single_hash = [*hash];
- $recursive_hash_method(&[key_array], &mut
single_hash)?;
- *hash = single_hash[0];
+ // Get key and value types from the struct field
+ if let DataType::Struct(fields) = field.data_type() {
+ let key_type = &fields[0].data_type();
+ let value_type = &fields[1].data_type();
- // Hash the value
- let value_array = values.slice(entry_idx, 1);
- single_hash = [*hash];
- $recursive_hash_method(&[value_array], &mut
single_hash)?;
- *hash = single_hash[0];
+ // Specialize for common map key/value combinations
+ match (key_type, value_type) {
+ (DataType::Utf8, DataType::Int32) => {
+ let key_array =
keys.as_any().downcast_ref::<StringArray>().unwrap();
+ let value_array =
values.as_any().downcast_ref::<Int32Array>().unwrap();
+ if map_array.null_count() == 0 &&
key_array.null_count() == 0 && value_array.null_count() == 0 {
+ for (row_idx, hash) in
$hashes_buffer.iter_mut().enumerate() {
+ let start = offsets[row_idx] as usize;
+ let end = offsets[row_idx + 1] as
usize;
+ for entry_idx in start..end {
+ *hash =
$hash_method(key_array.value(entry_idx), *hash);
+ *hash =
$hash_method(value_array.value(entry_idx).to_le_bytes(), *hash);
+ }
+ }
+ } else {
+ for (row_idx, hash) in
$hashes_buffer.iter_mut().enumerate() {
+ if !map_array.is_null(row_idx) {
+ let start = offsets[row_idx] as
usize;
+ let end = offsets[row_idx + 1] as
usize;
+ for entry_idx in start..end {
+ if
!key_array.is_null(entry_idx) {
+ *hash =
$hash_method(key_array.value(entry_idx), *hash);
+ }
+ if
!value_array.is_null(entry_idx) {
+ *hash =
$hash_method(value_array.value(entry_idx).to_le_bytes(), *hash);
+ }
+ }
+ }
+ }
+ }
}
- }
- } else {
- // Slow path: array has nulls, check each row
- for (row_idx, hash) in
$hashes_buffer.iter_mut().enumerate() {
- if !map_array.is_null(row_idx) {
- let start = offsets[row_idx] as usize;
- let end = offsets[row_idx + 1] as usize;
- // Hash each key-value pair in sequence
- for entry_idx in start..end {
- // Hash the key
- let key_array = keys.slice(entry_idx, 1);
- let mut single_hash = [*hash];
- $recursive_hash_method(&[key_array], &mut
single_hash)?;
- *hash = single_hash[0];
+ (DataType::Int32, DataType::Utf8) => {
+ let key_array =
keys.as_any().downcast_ref::<Int32Array>().unwrap();
+ let value_array =
values.as_any().downcast_ref::<StringArray>().unwrap();
+ if map_array.null_count() == 0 &&
key_array.null_count() == 0 && value_array.null_count() == 0 {
+ for (row_idx, hash) in
$hashes_buffer.iter_mut().enumerate() {
+ let start = offsets[row_idx] as usize;
+ let end = offsets[row_idx + 1] as
usize;
+ for entry_idx in start..end {
+ *hash =
$hash_method(key_array.value(entry_idx).to_le_bytes(), *hash);
+ *hash =
$hash_method(value_array.value(entry_idx), *hash);
+ }
+ }
+ } else {
+ for (row_idx, hash) in
$hashes_buffer.iter_mut().enumerate() {
+ if !map_array.is_null(row_idx) {
+ let start = offsets[row_idx] as
usize;
+ let end = offsets[row_idx + 1] as
usize;
+ for entry_idx in start..end {
+ if
!key_array.is_null(entry_idx) {
+ *hash =
$hash_method(key_array.value(entry_idx).to_le_bytes(), *hash);
+ }
+ if
!value_array.is_null(entry_idx) {
+ *hash =
$hash_method(value_array.value(entry_idx), *hash);
+ }
+ }
+ }
+ }
+ }
+ }
+ (DataType::Utf8, DataType::Utf8) => {
+ let key_array =
keys.as_any().downcast_ref::<StringArray>().unwrap();
+ let value_array =
values.as_any().downcast_ref::<StringArray>().unwrap();
+ if map_array.null_count() == 0 &&
key_array.null_count() == 0 && value_array.null_count() == 0 {
+ for (row_idx, hash) in
$hashes_buffer.iter_mut().enumerate() {
+ let start = offsets[row_idx] as usize;
+ let end = offsets[row_idx + 1] as
usize;
+ for entry_idx in start..end {
+ *hash =
$hash_method(key_array.value(entry_idx), *hash);
+ *hash =
$hash_method(value_array.value(entry_idx), *hash);
+ }
+ }
+ } else {
+ for (row_idx, hash) in
$hashes_buffer.iter_mut().enumerate() {
+ if !map_array.is_null(row_idx) {
+ let start = offsets[row_idx] as
usize;
+ let end = offsets[row_idx + 1] as
usize;
+ for entry_idx in start..end {
+ if
!key_array.is_null(entry_idx) {
+ *hash =
$hash_method(key_array.value(entry_idx), *hash);
+ }
+ if
!value_array.is_null(entry_idx) {
+ *hash =
$hash_method(value_array.value(entry_idx), *hash);
+ }
+ }
+ }
+ }
+ }
+ }
+ (DataType::Int32, DataType::Int32) => {
+ let key_array =
keys.as_any().downcast_ref::<Int32Array>().unwrap();
+ let value_array =
values.as_any().downcast_ref::<Int32Array>().unwrap();
+ if map_array.null_count() == 0 &&
key_array.null_count() == 0 && value_array.null_count() == 0 {
+ for (row_idx, hash) in
$hashes_buffer.iter_mut().enumerate() {
+ let start = offsets[row_idx] as usize;
+ let end = offsets[row_idx + 1] as
usize;
+ for entry_idx in start..end {
+ *hash =
$hash_method(key_array.value(entry_idx).to_le_bytes(), *hash);
+ *hash =
$hash_method(value_array.value(entry_idx).to_le_bytes(), *hash);
+ }
+ }
+ } else {
+ for (row_idx, hash) in
$hashes_buffer.iter_mut().enumerate() {
+ if !map_array.is_null(row_idx) {
+ let start = offsets[row_idx] as
usize;
+ let end = offsets[row_idx + 1] as
usize;
+ for entry_idx in start..end {
+ if
!key_array.is_null(entry_idx) {
+ *hash =
$hash_method(key_array.value(entry_idx).to_le_bytes(), *hash);
+ }
+ if
!value_array.is_null(entry_idx) {
+ *hash =
$hash_method(value_array.value(entry_idx).to_le_bytes(), *hash);
+ }
+ }
+ }
+ }
+ }
+ }
+ _ => {
+ // Fall back to recursive approach for other
type combinations
+ if map_array.null_count() == 0 {
+ for (row_idx, hash) in
$hashes_buffer.iter_mut().enumerate() {
+ let start = offsets[row_idx] as usize;
+ let end = offsets[row_idx + 1] as
usize;
+ for entry_idx in start..end {
+ let key_array =
keys.slice(entry_idx, 1);
+ let mut single_hash = [*hash];
+
$recursive_hash_method(&[key_array], &mut single_hash)?;
+ *hash = single_hash[0];
+
+ let value_array =
values.slice(entry_idx, 1);
+ single_hash = [*hash];
+
$recursive_hash_method(&[value_array], &mut single_hash)?;
+ *hash = single_hash[0];
+ }
+ }
+ } else {
+ for (row_idx, hash) in
$hashes_buffer.iter_mut().enumerate() {
+ if !map_array.is_null(row_idx) {
+ let start = offsets[row_idx] as
usize;
+ let end = offsets[row_idx + 1] as
usize;
+ for entry_idx in start..end {
+ let key_array =
keys.slice(entry_idx, 1);
+ let mut single_hash = [*hash];
+
$recursive_hash_method(&[key_array], &mut single_hash)?;
+ *hash = single_hash[0];
- // Hash the value
- let value_array = values.slice(entry_idx,
1);
- single_hash = [*hash];
- $recursive_hash_method(&[value_array],
&mut single_hash)?;
- *hash = single_hash[0];
+ let value_array =
values.slice(entry_idx, 1);
+ single_hash = [*hash];
+
$recursive_hash_method(&[value_array], &mut single_hash)?;
+ *hash = single_hash[0];
+ }
+ }
+ }
}
}
}
+ } else {
+ return Err(DataFusionError::Internal(format!(
+ "Map field type must be a struct, got: {}",
+ field.data_type()
+ )));
}
}
_ => {
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]