This is an automated email from the ASF dual-hosted git repository.

alamb pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/arrow-datafusion.git


The following commit(s) were added to refs/heads/master by this push:
     new 1173479  Move `hash_array` into hash_utils.rs (#807)
1173479 is described below

commit 11734799b4c5e0a8627deba18ec40a0f47fea421
Author: Andrew Lamb <[email protected]>
AuthorDate: Sun Aug 1 08:49:13 2021 -0400

    Move `hash_array` into hash_utils.rs (#807)
---
 .../core/src/execution_plans/shuffle_writer.rs     |   2 +-
 datafusion/src/physical_plan/hash_join.rs          | 365 +-------------------
 datafusion/src/physical_plan/hash_utils.rs         | 368 ++++++++++++++++++++-
 datafusion/src/physical_plan/repartition.rs        |   3 +-
 4 files changed, 374 insertions(+), 364 deletions(-)

diff --git a/ballista/rust/core/src/execution_plans/shuffle_writer.rs b/ballista/rust/core/src/execution_plans/shuffle_writer.rs
index 8081dab..b1db21f 100644
--- a/ballista/rust/core/src/execution_plans/shuffle_writer.rs
+++ b/ballista/rust/core/src/execution_plans/shuffle_writer.rs
@@ -44,7 +44,7 @@ use datafusion::arrow::ipc::reader::FileReader;
 use datafusion::arrow::ipc::writer::FileWriter;
 use datafusion::arrow::record_batch::RecordBatch;
 use datafusion::error::{DataFusionError, Result};
-use datafusion::physical_plan::hash_join::create_hashes;
+use datafusion::physical_plan::hash_utils::create_hashes;
 use datafusion::physical_plan::repartition::RepartitionExec;
 use datafusion::physical_plan::Partitioning::RoundRobinBatch;
 use datafusion::physical_plan::{
diff --git a/datafusion/src/physical_plan/hash_join.rs b/datafusion/src/physical_plan/hash_join.rs
index 00ca153..1a174bb 100644
--- a/datafusion/src/physical_plan/hash_join.rs
+++ b/datafusion/src/physical_plan/hash_join.rs
@@ -18,18 +18,15 @@
 //! Defines the join plan for executing partitions in parallel and then joining the results
 //! into a set of partitions.
 
-use ahash::CallHasher;
 use ahash::RandomState;
 
 use arrow::{
     array::{
-        ArrayData, ArrayRef, BooleanArray, Date32Array, Date64Array, 
Float32Array,
-        Float64Array, LargeStringArray, PrimitiveArray, 
TimestampMicrosecondArray,
-        TimestampMillisecondArray, TimestampNanosecondArray, 
UInt32BufferBuilder,
-        UInt32Builder, UInt64BufferBuilder, UInt64Builder,
+        ArrayData, ArrayRef, BooleanArray, LargeStringArray, PrimitiveArray,
+        UInt32BufferBuilder, UInt32Builder, UInt64BufferBuilder, UInt64Builder,
     },
     compute,
-    datatypes::{TimeUnit, UInt32Type, UInt64Type},
+    datatypes::{UInt32Type, UInt64Type},
 };
 use smallvec::{smallvec, SmallVec};
 use std::{any::Any, usize};
@@ -53,6 +50,7 @@ use arrow::array::{
 };
 
 use super::expressions::Column;
+use super::hash_utils::create_hashes;
 use super::{
     coalesce_partitions::CoalescePartitionsExec,
     hash_utils::{build_join_schema, check_join_is_valid, JoinOn},
@@ -790,13 +788,6 @@ impl BuildHasher for IdHashBuilder {
     }
 }
 
-// Combines two hashes into one hash
-#[inline]
-fn combine_hashes(l: u64, r: u64) -> u64 {
-    let hash = (17 * 37u64).wrapping_add(l);
-    hash.wrapping_mul(37).wrapping_add(r)
-}
-
 macro_rules! equal_rows_elem {
     ($array_type:ident, $l: ident, $r: ident, $left: ident, $right: ident) => 
{{
         let left_array = $l.as_any().downcast_ref::<$array_type>().unwrap();
@@ -848,338 +839,6 @@ fn equal_rows(
     err.unwrap_or(Ok(res))
 }
 
-macro_rules! hash_array {
-    ($array_type:ident, $column: ident, $ty: ident, $hashes: ident, 
$random_state: ident, $multi_col: ident) => {
-        let array = $column.as_any().downcast_ref::<$array_type>().unwrap();
-        if array.null_count() == 0 {
-            if $multi_col {
-                for (i, hash) in $hashes.iter_mut().enumerate() {
-                    *hash = combine_hashes(
-                        $ty::get_hash(&array.value(i), $random_state),
-                        *hash,
-                    );
-                }
-            } else {
-                for (i, hash) in $hashes.iter_mut().enumerate() {
-                    *hash = $ty::get_hash(&array.value(i), $random_state);
-                }
-            }
-        } else {
-            if $multi_col {
-                for (i, hash) in $hashes.iter_mut().enumerate() {
-                    if !array.is_null(i) {
-                        *hash = combine_hashes(
-                            $ty::get_hash(&array.value(i), $random_state),
-                            *hash,
-                        );
-                    }
-                }
-            } else {
-                for (i, hash) in $hashes.iter_mut().enumerate() {
-                    if !array.is_null(i) {
-                        *hash = $ty::get_hash(&array.value(i), $random_state);
-                    }
-                }
-            }
-        }
-    };
-}
-
-macro_rules! hash_array_primitive {
-    ($array_type:ident, $column: ident, $ty: ident, $hashes: ident, 
$random_state: ident, $multi_col: ident) => {
-        let array = $column.as_any().downcast_ref::<$array_type>().unwrap();
-        let values = array.values();
-
-        if array.null_count() == 0 {
-            if $multi_col {
-                for (hash, value) in $hashes.iter_mut().zip(values.iter()) {
-                    *hash = combine_hashes($ty::get_hash(value, 
$random_state), *hash);
-                }
-            } else {
-                for (hash, value) in $hashes.iter_mut().zip(values.iter()) {
-                    *hash = $ty::get_hash(value, $random_state)
-                }
-            }
-        } else {
-            if $multi_col {
-                for (i, (hash, value)) in
-                    $hashes.iter_mut().zip(values.iter()).enumerate()
-                {
-                    if !array.is_null(i) {
-                        *hash =
-                            combine_hashes($ty::get_hash(value, 
$random_state), *hash);
-                    }
-                }
-            } else {
-                for (i, (hash, value)) in
-                    $hashes.iter_mut().zip(values.iter()).enumerate()
-                {
-                    if !array.is_null(i) {
-                        *hash = $ty::get_hash(value, $random_state);
-                    }
-                }
-            }
-        }
-    };
-}
-
-macro_rules! hash_array_float {
-    ($array_type:ident, $column: ident, $ty: ident, $hashes: ident, 
$random_state: ident, $multi_col: ident) => {
-        let array = $column.as_any().downcast_ref::<$array_type>().unwrap();
-        let values = array.values();
-
-        if array.null_count() == 0 {
-            if $multi_col {
-                for (hash, value) in $hashes.iter_mut().zip(values.iter()) {
-                    *hash = combine_hashes(
-                        $ty::get_hash(
-                            &$ty::from_le_bytes(value.to_le_bytes()),
-                            $random_state,
-                        ),
-                        *hash,
-                    );
-                }
-            } else {
-                for (hash, value) in $hashes.iter_mut().zip(values.iter()) {
-                    *hash = $ty::get_hash(
-                        &$ty::from_le_bytes(value.to_le_bytes()),
-                        $random_state,
-                    )
-                }
-            }
-        } else {
-            if $multi_col {
-                for (i, (hash, value)) in
-                    $hashes.iter_mut().zip(values.iter()).enumerate()
-                {
-                    if !array.is_null(i) {
-                        *hash = combine_hashes(
-                            $ty::get_hash(
-                                &$ty::from_le_bytes(value.to_le_bytes()),
-                                $random_state,
-                            ),
-                            *hash,
-                        );
-                    }
-                }
-            } else {
-                for (i, (hash, value)) in
-                    $hashes.iter_mut().zip(values.iter()).enumerate()
-                {
-                    if !array.is_null(i) {
-                        *hash = $ty::get_hash(
-                            &$ty::from_le_bytes(value.to_le_bytes()),
-                            $random_state,
-                        );
-                    }
-                }
-            }
-        }
-    };
-}
-
-/// Creates hash values for every element in the row based on the values in the columns
-pub fn create_hashes<'a>(
-    arrays: &[ArrayRef],
-    random_state: &RandomState,
-    hashes_buffer: &'a mut Vec<u64>,
-) -> Result<&'a mut Vec<u64>> {
-    // combine hashes with `combine_hashes` if we have more than 1 column
-    let multi_col = arrays.len() > 1;
-
-    for col in arrays {
-        match col.data_type() {
-            DataType::UInt8 => {
-                hash_array_primitive!(
-                    UInt8Array,
-                    col,
-                    u8,
-                    hashes_buffer,
-                    random_state,
-                    multi_col
-                );
-            }
-            DataType::UInt16 => {
-                hash_array_primitive!(
-                    UInt16Array,
-                    col,
-                    u16,
-                    hashes_buffer,
-                    random_state,
-                    multi_col
-                );
-            }
-            DataType::UInt32 => {
-                hash_array_primitive!(
-                    UInt32Array,
-                    col,
-                    u32,
-                    hashes_buffer,
-                    random_state,
-                    multi_col
-                );
-            }
-            DataType::UInt64 => {
-                hash_array_primitive!(
-                    UInt64Array,
-                    col,
-                    u64,
-                    hashes_buffer,
-                    random_state,
-                    multi_col
-                );
-            }
-            DataType::Int8 => {
-                hash_array_primitive!(
-                    Int8Array,
-                    col,
-                    i8,
-                    hashes_buffer,
-                    random_state,
-                    multi_col
-                );
-            }
-            DataType::Int16 => {
-                hash_array_primitive!(
-                    Int16Array,
-                    col,
-                    i16,
-                    hashes_buffer,
-                    random_state,
-                    multi_col
-                );
-            }
-            DataType::Int32 => {
-                hash_array_primitive!(
-                    Int32Array,
-                    col,
-                    i32,
-                    hashes_buffer,
-                    random_state,
-                    multi_col
-                );
-            }
-            DataType::Int64 => {
-                hash_array_primitive!(
-                    Int64Array,
-                    col,
-                    i64,
-                    hashes_buffer,
-                    random_state,
-                    multi_col
-                );
-            }
-            DataType::Float32 => {
-                hash_array_float!(
-                    Float32Array,
-                    col,
-                    u32,
-                    hashes_buffer,
-                    random_state,
-                    multi_col
-                );
-            }
-            DataType::Float64 => {
-                hash_array_float!(
-                    Float64Array,
-                    col,
-                    u64,
-                    hashes_buffer,
-                    random_state,
-                    multi_col
-                );
-            }
-            DataType::Timestamp(TimeUnit::Millisecond, None) => {
-                hash_array_primitive!(
-                    TimestampMillisecondArray,
-                    col,
-                    i64,
-                    hashes_buffer,
-                    random_state,
-                    multi_col
-                );
-            }
-            DataType::Timestamp(TimeUnit::Microsecond, None) => {
-                hash_array_primitive!(
-                    TimestampMicrosecondArray,
-                    col,
-                    i64,
-                    hashes_buffer,
-                    random_state,
-                    multi_col
-                );
-            }
-            DataType::Timestamp(TimeUnit::Nanosecond, None) => {
-                hash_array_primitive!(
-                    TimestampNanosecondArray,
-                    col,
-                    i64,
-                    hashes_buffer,
-                    random_state,
-                    multi_col
-                );
-            }
-            DataType::Date32 => {
-                hash_array_primitive!(
-                    Date32Array,
-                    col,
-                    i32,
-                    hashes_buffer,
-                    random_state,
-                    multi_col
-                );
-            }
-            DataType::Date64 => {
-                hash_array_primitive!(
-                    Date64Array,
-                    col,
-                    i64,
-                    hashes_buffer,
-                    random_state,
-                    multi_col
-                );
-            }
-            DataType::Boolean => {
-                hash_array!(
-                    BooleanArray,
-                    col,
-                    u8,
-                    hashes_buffer,
-                    random_state,
-                    multi_col
-                );
-            }
-            DataType::Utf8 => {
-                hash_array!(
-                    StringArray,
-                    col,
-                    str,
-                    hashes_buffer,
-                    random_state,
-                    multi_col
-                );
-            }
-            DataType::LargeUtf8 => {
-                hash_array!(
-                    LargeStringArray,
-                    col,
-                    str,
-                    hashes_buffer,
-                    random_state,
-                    multi_col
-                );
-            }
-            _ => {
-                // This is internal because we should have caught this before.
-                return Err(DataFusionError::Internal(
-                    "Unsupported data type in hasher".to_string(),
-                ));
-            }
-        }
-    }
-    Ok(hashes_buffer)
-}
-
 // Produces a batch for left-side rows that have/have not been matched during the whole join
 fn produce_from_matched(
     visited_left_side: &[bool],
@@ -2116,22 +1775,6 @@ mod tests {
     }
 
     #[test]
-    fn create_hashes_for_float_arrays() -> Result<()> {
-        let f32_arr = Arc::new(Float32Array::from(vec![0.12, 0.5, 1f32, 
444.7]));
-        let f64_arr = Arc::new(Float64Array::from(vec![0.12, 0.5, 1f64, 
444.7]));
-
-        let random_state = RandomState::with_seeds(0, 0, 0, 0);
-        let hashes_buff = &mut vec![0; f32_arr.len()];
-        let hashes = create_hashes(&[f32_arr], &random_state, hashes_buff)?;
-        assert_eq!(hashes.len(), 4,);
-
-        let hashes = create_hashes(&[f64_arr], &random_state, hashes_buff)?;
-        assert_eq!(hashes.len(), 4,);
-
-        Ok(())
-    }
-
-    #[test]
     fn join_with_hash_collision() -> Result<()> {
         let mut hashmap_left = HashMap::with_capacity_and_hasher(2, 
IdHashBuilder {});
         let left = build_table_i32(
diff --git a/datafusion/src/physical_plan/hash_utils.rs b/datafusion/src/physical_plan/hash_utils.rs
index 9243aff..e937b4e 100644
--- a/datafusion/src/physical_plan/hash_utils.rs
+++ b/datafusion/src/physical_plan/hash_utils.rs
@@ -18,7 +18,14 @@
 //! Functionality used both on logical and physical plans
 
 use crate::error::{DataFusionError, Result};
-use arrow::datatypes::{Field, Schema};
+use ahash::{CallHasher, RandomState};
+use arrow::array::{
+    Array, ArrayRef, BooleanArray, Date32Array, Date64Array, Float32Array, 
Float64Array,
+    Int16Array, Int32Array, Int64Array, Int8Array, LargeStringArray, 
StringArray,
+    TimestampMicrosecondArray, TimestampMillisecondArray, 
TimestampNanosecondArray,
+    UInt16Array, UInt32Array, UInt64Array, UInt8Array,
+};
+use arrow::datatypes::{DataType, Field, Schema, TimeUnit};
 use std::collections::HashSet;
 
 use crate::logical_plan::JoinType;
@@ -101,8 +108,351 @@ pub fn build_join_schema(left: &Schema, right: &Schema, join_type: &JoinType) ->
     Schema::new(fields)
 }
 
+// Combines two hashes into one hash
+#[inline]
+fn combine_hashes(l: u64, r: u64) -> u64 {
+    let hash = (17 * 37u64).wrapping_add(l);
+    hash.wrapping_mul(37).wrapping_add(r)
+}
+
+macro_rules! hash_array {
+    ($array_type:ident, $column: ident, $ty: ident, $hashes: ident, 
$random_state: ident, $multi_col: ident) => {
+        let array = $column.as_any().downcast_ref::<$array_type>().unwrap();
+        if array.null_count() == 0 {
+            if $multi_col {
+                for (i, hash) in $hashes.iter_mut().enumerate() {
+                    *hash = combine_hashes(
+                        $ty::get_hash(&array.value(i), $random_state),
+                        *hash,
+                    );
+                }
+            } else {
+                for (i, hash) in $hashes.iter_mut().enumerate() {
+                    *hash = $ty::get_hash(&array.value(i), $random_state);
+                }
+            }
+        } else {
+            if $multi_col {
+                for (i, hash) in $hashes.iter_mut().enumerate() {
+                    if !array.is_null(i) {
+                        *hash = combine_hashes(
+                            $ty::get_hash(&array.value(i), $random_state),
+                            *hash,
+                        );
+                    }
+                }
+            } else {
+                for (i, hash) in $hashes.iter_mut().enumerate() {
+                    if !array.is_null(i) {
+                        *hash = $ty::get_hash(&array.value(i), $random_state);
+                    }
+                }
+            }
+        }
+    };
+}
+
+macro_rules! hash_array_primitive {
+    ($array_type:ident, $column: ident, $ty: ident, $hashes: ident, 
$random_state: ident, $multi_col: ident) => {
+        let array = $column.as_any().downcast_ref::<$array_type>().unwrap();
+        let values = array.values();
+
+        if array.null_count() == 0 {
+            if $multi_col {
+                for (hash, value) in $hashes.iter_mut().zip(values.iter()) {
+                    *hash = combine_hashes($ty::get_hash(value, 
$random_state), *hash);
+                }
+            } else {
+                for (hash, value) in $hashes.iter_mut().zip(values.iter()) {
+                    *hash = $ty::get_hash(value, $random_state)
+                }
+            }
+        } else {
+            if $multi_col {
+                for (i, (hash, value)) in
+                    $hashes.iter_mut().zip(values.iter()).enumerate()
+                {
+                    if !array.is_null(i) {
+                        *hash =
+                            combine_hashes($ty::get_hash(value, 
$random_state), *hash);
+                    }
+                }
+            } else {
+                for (i, (hash, value)) in
+                    $hashes.iter_mut().zip(values.iter()).enumerate()
+                {
+                    if !array.is_null(i) {
+                        *hash = $ty::get_hash(value, $random_state);
+                    }
+                }
+            }
+        }
+    };
+}
+
+macro_rules! hash_array_float {
+    ($array_type:ident, $column: ident, $ty: ident, $hashes: ident, 
$random_state: ident, $multi_col: ident) => {
+        let array = $column.as_any().downcast_ref::<$array_type>().unwrap();
+        let values = array.values();
+
+        if array.null_count() == 0 {
+            if $multi_col {
+                for (hash, value) in $hashes.iter_mut().zip(values.iter()) {
+                    *hash = combine_hashes(
+                        $ty::get_hash(
+                            &$ty::from_le_bytes(value.to_le_bytes()),
+                            $random_state,
+                        ),
+                        *hash,
+                    );
+                }
+            } else {
+                for (hash, value) in $hashes.iter_mut().zip(values.iter()) {
+                    *hash = $ty::get_hash(
+                        &$ty::from_le_bytes(value.to_le_bytes()),
+                        $random_state,
+                    )
+                }
+            }
+        } else {
+            if $multi_col {
+                for (i, (hash, value)) in
+                    $hashes.iter_mut().zip(values.iter()).enumerate()
+                {
+                    if !array.is_null(i) {
+                        *hash = combine_hashes(
+                            $ty::get_hash(
+                                &$ty::from_le_bytes(value.to_le_bytes()),
+                                $random_state,
+                            ),
+                            *hash,
+                        );
+                    }
+                }
+            } else {
+                for (i, (hash, value)) in
+                    $hashes.iter_mut().zip(values.iter()).enumerate()
+                {
+                    if !array.is_null(i) {
+                        *hash = $ty::get_hash(
+                            &$ty::from_le_bytes(value.to_le_bytes()),
+                            $random_state,
+                        );
+                    }
+                }
+            }
+        }
+    };
+}
+
+/// Creates hash values for every row, based on the values in the columns
+///
+/// This implements so-called "vectorized hashing"
+pub fn create_hashes<'a>(
+    arrays: &[ArrayRef],
+    random_state: &RandomState,
+    hashes_buffer: &'a mut Vec<u64>,
+) -> Result<&'a mut Vec<u64>> {
+    // combine hashes with `combine_hashes` if we have more than 1 column
+    let multi_col = arrays.len() > 1;
+
+    for col in arrays {
+        match col.data_type() {
+            DataType::UInt8 => {
+                hash_array_primitive!(
+                    UInt8Array,
+                    col,
+                    u8,
+                    hashes_buffer,
+                    random_state,
+                    multi_col
+                );
+            }
+            DataType::UInt16 => {
+                hash_array_primitive!(
+                    UInt16Array,
+                    col,
+                    u16,
+                    hashes_buffer,
+                    random_state,
+                    multi_col
+                );
+            }
+            DataType::UInt32 => {
+                hash_array_primitive!(
+                    UInt32Array,
+                    col,
+                    u32,
+                    hashes_buffer,
+                    random_state,
+                    multi_col
+                );
+            }
+            DataType::UInt64 => {
+                hash_array_primitive!(
+                    UInt64Array,
+                    col,
+                    u64,
+                    hashes_buffer,
+                    random_state,
+                    multi_col
+                );
+            }
+            DataType::Int8 => {
+                hash_array_primitive!(
+                    Int8Array,
+                    col,
+                    i8,
+                    hashes_buffer,
+                    random_state,
+                    multi_col
+                );
+            }
+            DataType::Int16 => {
+                hash_array_primitive!(
+                    Int16Array,
+                    col,
+                    i16,
+                    hashes_buffer,
+                    random_state,
+                    multi_col
+                );
+            }
+            DataType::Int32 => {
+                hash_array_primitive!(
+                    Int32Array,
+                    col,
+                    i32,
+                    hashes_buffer,
+                    random_state,
+                    multi_col
+                );
+            }
+            DataType::Int64 => {
+                hash_array_primitive!(
+                    Int64Array,
+                    col,
+                    i64,
+                    hashes_buffer,
+                    random_state,
+                    multi_col
+                );
+            }
+            DataType::Float32 => {
+                hash_array_float!(
+                    Float32Array,
+                    col,
+                    u32,
+                    hashes_buffer,
+                    random_state,
+                    multi_col
+                );
+            }
+            DataType::Float64 => {
+                hash_array_float!(
+                    Float64Array,
+                    col,
+                    u64,
+                    hashes_buffer,
+                    random_state,
+                    multi_col
+                );
+            }
+            DataType::Timestamp(TimeUnit::Millisecond, None) => {
+                hash_array_primitive!(
+                    TimestampMillisecondArray,
+                    col,
+                    i64,
+                    hashes_buffer,
+                    random_state,
+                    multi_col
+                );
+            }
+            DataType::Timestamp(TimeUnit::Microsecond, None) => {
+                hash_array_primitive!(
+                    TimestampMicrosecondArray,
+                    col,
+                    i64,
+                    hashes_buffer,
+                    random_state,
+                    multi_col
+                );
+            }
+            DataType::Timestamp(TimeUnit::Nanosecond, None) => {
+                hash_array_primitive!(
+                    TimestampNanosecondArray,
+                    col,
+                    i64,
+                    hashes_buffer,
+                    random_state,
+                    multi_col
+                );
+            }
+            DataType::Date32 => {
+                hash_array_primitive!(
+                    Date32Array,
+                    col,
+                    i32,
+                    hashes_buffer,
+                    random_state,
+                    multi_col
+                );
+            }
+            DataType::Date64 => {
+                hash_array_primitive!(
+                    Date64Array,
+                    col,
+                    i64,
+                    hashes_buffer,
+                    random_state,
+                    multi_col
+                );
+            }
+            DataType::Boolean => {
+                hash_array!(
+                    BooleanArray,
+                    col,
+                    u8,
+                    hashes_buffer,
+                    random_state,
+                    multi_col
+                );
+            }
+            DataType::Utf8 => {
+                hash_array!(
+                    StringArray,
+                    col,
+                    str,
+                    hashes_buffer,
+                    random_state,
+                    multi_col
+                );
+            }
+            DataType::LargeUtf8 => {
+                hash_array!(
+                    LargeStringArray,
+                    col,
+                    str,
+                    hashes_buffer,
+                    random_state,
+                    multi_col
+                );
+            }
+            _ => {
+                // This is internal because we should have caught this before.
+                return Err(DataFusionError::Internal(
+                    "Unsupported data type in hasher".to_string(),
+                ));
+            }
+        }
+    }
+    Ok(hashes_buffer)
+}
+
 #[cfg(test)]
 mod tests {
+    use std::sync::Arc;
+
     use super::*;
 
     fn check(left: &[Column], right: &[Column], on: &[(Column, Column)]) -> 
Result<()> {
@@ -163,4 +513,20 @@ mod tests {
 
         assert!(check(&left, &right, on).is_ok());
     }
+
+    #[test]
+    fn create_hashes_for_float_arrays() -> Result<()> {
+        let f32_arr = Arc::new(Float32Array::from(vec![0.12, 0.5, 1f32, 
444.7]));
+        let f64_arr = Arc::new(Float64Array::from(vec![0.12, 0.5, 1f64, 
444.7]));
+
+        let random_state = RandomState::with_seeds(0, 0, 0, 0);
+        let hashes_buff = &mut vec![0; f32_arr.len()];
+        let hashes = create_hashes(&[f32_arr], &random_state, hashes_buff)?;
+        assert_eq!(hashes.len(), 4,);
+
+        let hashes = create_hashes(&[f64_arr], &random_state, hashes_buff)?;
+        assert_eq!(hashes.len(), 4,);
+
+        Ok(())
+    }
 }
diff --git a/datafusion/src/physical_plan/repartition.rs b/datafusion/src/physical_plan/repartition.rs
index e67e4c2..b59071a 100644
--- a/datafusion/src/physical_plan/repartition.rs
+++ b/datafusion/src/physical_plan/repartition.rs
@@ -25,13 +25,14 @@ use std::time::Instant;
 use std::{any::Any, vec};
 
 use crate::error::{DataFusionError, Result};
+use crate::physical_plan::hash_utils::create_hashes;
 use crate::physical_plan::{DisplayFormatType, ExecutionPlan, Partitioning, 
SQLMetric};
 use arrow::record_batch::RecordBatch;
 use arrow::{array::Array, error::Result as ArrowResult};
 use arrow::{compute::take, datatypes::SchemaRef};
 use tokio_stream::wrappers::UnboundedReceiverStream;
 
-use super::{hash_join::create_hashes, RecordBatchStream, 
SendableRecordBatchStream};
+use super::{RecordBatchStream, SendableRecordBatchStream};
 use async_trait::async_trait;
 
 use futures::stream::Stream;

Reply via email to