This is an automated email from the ASF dual-hosted git repository.

tustvold pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/arrow-rs.git


The following commit(s) were added to refs/heads/master by this push:
     new bce0b418b6 Add distinct kernels (#960) (#4438) (#4716)
bce0b418b6 is described below

commit bce0b418b69dcf0dab1fca3edbe5db1f5ca122a2
Author: Raphael Taylor-Davies <[email protected]>
AuthorDate: Mon Aug 21 15:25:26 2023 +0100

    Add distinct kernels (#960) (#4438) (#4716)
    
    * Add distinct kernels (#960) (#4438)
    
    * Fixes
    
    * Add tests
    
    * Handle NullArray
    
    * Fix comparisons between scalar and empty array
    
    * Clippy
    
    * Review feedback
---
 arrow-array/src/array/boolean_array.rs |   9 +
 arrow-ord/src/cmp.rs                   | 304 ++++++++++++++++++++++++++++-----
 arrow-ord/src/partition.rs             |  20 +--
 3 files changed, 274 insertions(+), 59 deletions(-)

diff --git a/arrow-array/src/array/boolean_array.rs 
b/arrow-array/src/array/boolean_array.rs
index 0d9a1044be..4d19babe3e 100644
--- a/arrow-array/src/array/boolean_array.rs
+++ b/arrow-array/src/array/boolean_array.rs
@@ -437,6 +437,15 @@ impl<Ptr: std::borrow::Borrow<Option<bool>>> 
FromIterator<Ptr> for BooleanArray
     }
 }
 
+impl From<BooleanBuffer> for BooleanArray {
+    fn from(values: BooleanBuffer) -> Self {
+        Self {
+            values,
+            nulls: None,
+        }
+    }
+}
+
 #[cfg(test)]
 mod tests {
     use super::*;
diff --git a/arrow-ord/src/cmp.rs b/arrow-ord/src/cmp.rs
index aad61fa8f0..96f5aafd86 100644
--- a/arrow-ord/src/cmp.rs
+++ b/arrow-ord/src/cmp.rs
@@ -33,6 +33,7 @@ use arrow_buffer::bit_util::ceil;
 use arrow_buffer::{BooleanBuffer, MutableBuffer, NullBuffer};
 use arrow_schema::ArrowError;
 use arrow_select::take::take;
+use std::ops::Not;
 
 #[derive(Debug, Copy, Clone)]
 enum Op {
@@ -42,6 +43,8 @@ enum Op {
     LessEqual,
     Greater,
     GreaterEqual,
+    Distinct,
+    NotDistinct,
 }
 
 impl std::fmt::Display for Op {
@@ -53,6 +56,8 @@ impl std::fmt::Display for Op {
             Op::LessEqual => write!(f, "<="),
             Op::Greater => write!(f, ">"),
             Op::GreaterEqual => write!(f, ">="),
+            Op::Distinct => write!(f, "IS DISTINCT FROM"),
+            Op::NotDistinct => write!(f, "IS NOT DISTINCT FROM"),
         }
     }
 }
@@ -129,7 +134,43 @@ pub fn gt_eq(lhs: &dyn Datum, rhs: &dyn Datum) -> 
Result<BooleanArray, ArrowErro
     compare_op(Op::GreaterEqual, lhs, rhs)
 }
 
+/// Perform `left IS DISTINCT FROM right` operation on two [`Datum`]
+///
+/// [`distinct`] is similar to [`neq`], only differing in null handling. In 
particular, two
+/// operands are considered DISTINCT if they have a different value or if one 
of them is NULL
+/// and the other isn't. The result of [`distinct`] is never NULL.
+///
+/// For floating values like f32 and f64, this comparison produces an ordering 
in accordance with
+/// the totalOrder predicate as defined in the IEEE 754 (2008 revision) 
floating point standard.
+/// Note that totalOrder treats positive and negative zeros as different. If 
it is necessary
+/// to treat them as equal, please normalize zeros before calling this kernel.
+///
+/// Please refer to [`f32::total_cmp`] and [`f64::total_cmp`]
+pub fn distinct(lhs: &dyn Datum, rhs: &dyn Datum) -> Result<BooleanArray, 
ArrowError> {
+    compare_op(Op::Distinct, lhs, rhs)
+}
+
+/// Perform `left IS NOT DISTINCT FROM right` operation on two [`Datum`]
+///
+/// [`not_distinct`] is similar to [`eq`], only differing in null handling. In 
particular, two
+/// operands are considered `NOT DISTINCT` if they have the same value or if 
both of them
+/// are NULL. The result of [`not_distinct`] is never NULL.
+///
+/// For floating values like f32 and f64, this comparison produces an ordering 
in accordance with
+/// the totalOrder predicate as defined in the IEEE 754 (2008 revision) 
floating point standard.
+/// Note that totalOrder treats positive and negative zeros as different. If 
it is necessary
+/// to treat them as equal, please normalize zeros before calling this kernel.
+///
+/// Please refer to [`f32::total_cmp`] and [`f64::total_cmp`]
+pub fn not_distinct(
+    lhs: &dyn Datum,
+    rhs: &dyn Datum,
+) -> Result<BooleanArray, ArrowError> {
+    compare_op(Op::NotDistinct, lhs, rhs)
+}
+
 /// Perform `op` on the provided `Datum`
+#[inline(never)]
 fn compare_op(
     op: Op,
     lhs: &dyn Datum,
@@ -141,51 +182,114 @@ fn compare_op(
 
     let l_len = l.len();
     let r_len = r.len();
-    let l_nulls = l.logical_nulls();
-    let r_nulls = r.logical_nulls();
 
-    let (len, nulls) = match (l_s, r_s) {
-        (true, true) | (false, false) => {
-            if l_len != r_len {
-                return Err(ArrowError::InvalidArgumentError(format!(
-                    "Cannot compare arrays of different lengths, got {l_len} 
vs {r_len}"
-                )));
-            }
-            (l_len, NullBuffer::union(l_nulls.as_ref(), r_nulls.as_ref()))
-        }
-        (true, false) => match l_nulls.map(|x| x.null_count() != 
0).unwrap_or_default() {
-            true => (r_len, Some(NullBuffer::new_null(r_len))),
-            false => (r_len, r_nulls), // Left is scalar and not null
-        },
-        (false, true) => match r_nulls.map(|x| x.null_count() != 
0).unwrap_or_default() {
-            true => (l_len, Some(NullBuffer::new_null(l_len))),
-            false => (l_len, l_nulls), // Right is scalar and not null
-        },
+    if l_len != r_len && !l_s && !r_s {
+        return Err(ArrowError::InvalidArgumentError(format!(
+            "Cannot compare arrays of different lengths, got {l_len} vs 
{r_len}"
+        )));
+    }
+
+    let len = match l_s {
+        true => r_len,
+        false => l_len,
     };
 
+    let l_nulls = l.logical_nulls();
+    let r_nulls = r.logical_nulls();
+
     let l_v = l.as_any_dictionary_opt();
     let l = l_v.map(|x| x.values().as_ref()).unwrap_or(l);
+    let l_t = l.data_type();
 
     let r_v = r.as_any_dictionary_opt();
     let r = r_v.map(|x| x.values().as_ref()).unwrap_or(r);
+    let r_t = r.data_type();
+
+    if l_t != r_t || l_t.is_nested() {
+        return Err(ArrowError::InvalidArgumentError(format!(
+            "Invalid comparison operation: {l_t} {op} {r_t}"
+        )));
+    }
+
+    // Defer computation as may not be necessary
+    let values = || -> BooleanBuffer {
+        let d = downcast_primitive_array! {
+            (l, r) => apply(op, l.values().as_ref(), l_s, l_v, 
r.values().as_ref(), r_s, r_v),
+            (Boolean, Boolean) => apply(op, l.as_boolean(), l_s, l_v, 
r.as_boolean(), r_s, r_v),
+            (Utf8, Utf8) => apply(op, l.as_string::<i32>(), l_s, l_v, 
r.as_string::<i32>(), r_s, r_v),
+            (LargeUtf8, LargeUtf8) => apply(op, l.as_string::<i64>(), l_s, 
l_v, r.as_string::<i64>(), r_s, r_v),
+            (Binary, Binary) => apply(op, l.as_binary::<i32>(), l_s, l_v, 
r.as_binary::<i32>(), r_s, r_v),
+            (LargeBinary, LargeBinary) => apply(op, l.as_binary::<i64>(), l_s, 
l_v, r.as_binary::<i64>(), r_s, r_v),
+            (FixedSizeBinary(_), FixedSizeBinary(_)) => apply(op, 
l.as_fixed_size_binary(), l_s, l_v, r.as_fixed_size_binary(), r_s, r_v),
+            (Null, Null) => None,
+            _ => unreachable!(),
+        };
+        d.unwrap_or_else(|| BooleanBuffer::new_unset(len))
+    };
 
-    let values = downcast_primitive_array! {
-        (l, r) => apply(op, l.values().as_ref(), l_s, l_v, 
r.values().as_ref(), r_s, r_v),
-        (Boolean, Boolean) => apply(op, l.as_boolean(), l_s, l_v, 
r.as_boolean(), r_s, r_v),
-        (Utf8, Utf8) => apply(op, l.as_string::<i32>(), l_s, l_v, 
r.as_string::<i32>(), r_s, r_v),
-        (LargeUtf8, LargeUtf8) => apply(op, l.as_string::<i64>(), l_s, l_v, 
r.as_string::<i64>(), r_s, r_v),
-        (Binary, Binary) => apply(op, l.as_binary::<i32>(), l_s, l_v, 
r.as_binary::<i32>(), r_s, r_v),
-        (LargeBinary, LargeBinary) => apply(op, l.as_binary::<i64>(), l_s, 
l_v, r.as_binary::<i64>(), r_s, r_v),
-        (FixedSizeBinary(_), FixedSizeBinary(_)) => apply(op, 
l.as_fixed_size_binary(), l_s, l_v, r.as_fixed_size_binary(), r_s, r_v),
-        (l_t, r_t) => return 
Err(ArrowError::InvalidArgumentError(format!("Invalid comparison operation: 
{l_t} {op} {r_t}"))),
-    }.unwrap_or_else(|| {
-        let count = nulls.as_ref().map(|x| x.null_count()).unwrap_or_default();
-        assert_eq!(count, len); // Sanity check
-        BooleanBuffer::new_unset(len)
-    });
-
-    assert_eq!(values.len(), len); // Sanity check
-    Ok(BooleanArray::new(values, nulls))
+    let l_nulls = l_nulls.filter(|n| n.null_count() > 0);
+    let r_nulls = r_nulls.filter(|n| n.null_count() > 0);
+    Ok(match (l_nulls, l_s, r_nulls, r_s) {
+        (Some(l), true, Some(r), true) | (Some(l), false, Some(r), false) => {
+            // Either both sides are scalar or neither side is scalar
+            match op {
+                Op::Distinct => {
+                    let values = values();
+                    let l = l.inner().bit_chunks().iter_padded();
+                    let r = r.inner().bit_chunks().iter_padded();
+                    let ne = values.bit_chunks().iter_padded();
+
+                    let c = |((l, r), n)| ((l ^ r) | (l & r & n));
+                    let buffer = l.zip(r).zip(ne).map(c).collect();
+                    BooleanBuffer::new(buffer, 0, len).into()
+                }
+                Op::NotDistinct => {
+                    let values = values();
+                    let l = l.inner().bit_chunks().iter_padded();
+                    let r = r.inner().bit_chunks().iter_padded();
+                    let e = values.bit_chunks().iter_padded();
+
+                    let c = |((l, r), e)| u64::not(l | r) | (l & r & e);
+                    let buffer = l.zip(r).zip(e).map(c).collect();
+                    BooleanBuffer::new(buffer, 0, len).into()
+                }
+                _ => BooleanArray::new(values(), NullBuffer::union(Some(&l), 
Some(&r))),
+            }
+        }
+        (Some(_), true, Some(a), false) | (Some(a), false, Some(_), true) => {
+            // Scalar is null, other side is non-scalar and nullable
+            match op {
+                Op::Distinct => a.into_inner().into(),
+                Op::NotDistinct => a.into_inner().not().into(),
+                _ => BooleanArray::new_null(len),
+            }
+        }
+        (Some(nulls), is_scalar, None, _) | (None, _, Some(nulls), is_scalar) 
=> {
+            // Only one side is nullable
+            match is_scalar {
+                true => match op {
+                    // Scalar is null, other side is not nullable
+                    Op::Distinct => BooleanBuffer::new_set(len).into(),
+                    Op::NotDistinct => BooleanBuffer::new_unset(len).into(),
+                    _ => BooleanArray::new_null(len),
+                },
+                false => match op {
+                    Op::Distinct => {
+                        let values = values();
+                        let l = nulls.inner().bit_chunks().iter_padded();
+                        let ne = values.bit_chunks().iter_padded();
+                        let c = |(l, n)| u64::not(l) | n;
+                        let buffer = l.zip(ne).map(c).collect();
+                        BooleanBuffer::new(buffer, 0, len).into()
+                    }
+                    Op::NotDistinct => (nulls.inner() & &values()).into(),
+                    _ => BooleanArray::new(values(), Some(nulls)),
+                },
+            }
+        }
+        // Neither side is nullable
+        (None, _, None, _) => BooleanArray::new(values(), None),
+    })
 }
 
 /// Perform a potentially vectored `op` on the provided `ArrayOrd`
@@ -215,8 +319,12 @@ fn apply<T: ArrayOrd>(
         assert_eq!(l_v.len(), r_v.len()); // Sanity check
 
         Some(match op {
-            Op::Equal => apply_op_vectored(l, &l_v, r, &r_v, false, T::is_eq),
-            Op::NotEqual => apply_op_vectored(l, &l_v, r, &r_v, true, 
T::is_eq),
+            Op::Equal | Op::NotDistinct => {
+                apply_op_vectored(l, &l_v, r, &r_v, false, T::is_eq)
+            }
+            Op::NotEqual | Op::Distinct => {
+                apply_op_vectored(l, &l_v, r, &r_v, true, T::is_eq)
+            }
             Op::Less => apply_op_vectored(l, &l_v, r, &r_v, false, T::is_lt),
             Op::LessEqual => apply_op_vectored(r, &r_v, l, &l_v, true, 
T::is_lt),
             Op::Greater => apply_op_vectored(r, &r_v, l, &l_v, false, 
T::is_lt),
@@ -227,8 +335,8 @@ fn apply<T: ArrayOrd>(
         let r_s = r_s.then(|| r_v.map(|x| 
x.normalized_keys()[0]).unwrap_or_default());
 
         let buffer = match op {
-            Op::Equal => apply_op(l, l_s, r, r_s, false, T::is_eq),
-            Op::NotEqual => apply_op(l, l_s, r, r_s, true, T::is_eq),
+            Op::Equal | Op::NotDistinct => apply_op(l, l_s, r, r_s, false, 
T::is_eq),
+            Op::NotEqual | Op::Distinct => apply_op(l, l_s, r, r_s, true, 
T::is_eq),
             Op::Less => apply_op(l, l_s, r, r_s, false, T::is_lt),
             Op::LessEqual => apply_op(r, r_s, l, l_s, true, T::is_lt),
             Op::Greater => apply_op(r, r_s, l, l_s, false, T::is_lt),
@@ -293,6 +401,8 @@ fn collect_bool(len: usize, neg: bool, f: impl Fn(usize) -> 
bool) -> BooleanBuff
 ///
 /// If l is scalar `l_s` will be `Some(idx)` where `idx` is the index of the 
scalar value in `l`
 /// If r is scalar `r_s` will be `Some(idx)` where `idx` is the index of the 
scalar value in `r`
+///
+/// If `neg` is true the result of `op` will be negated
 fn apply_op<T: ArrayOrd>(
     l: T,
     l_s: Option<usize>,
@@ -311,7 +421,7 @@ fn apply_op<T: ArrayOrd>(
         (Some(l_s), Some(r_s)) => {
             let a = l.value(l_s);
             let b = r.value(r_s);
-            std::iter::once(op(a, b)).collect()
+            std::iter::once(op(a, b) ^ neg).collect()
         }
         (Some(l_s), None) => {
             let v = l.value(l_s);
@@ -486,4 +596,116 @@ mod tests {
         let r = eq(&a, &Scalar::new(&scalar)).unwrap();
         assert_eq!(r.null_count(), 3);
     }
+
+    #[test]
+    fn is_distinct_from_non_nulls() {
+        let left_int_array = Int32Array::from(vec![0, 1, 2, 3, 4]);
+        let right_int_array = Int32Array::from(vec![4, 3, 2, 1, 0]);
+
+        assert_eq!(
+            BooleanArray::from(vec![true, true, false, true, true,]),
+            distinct(&left_int_array, &right_int_array).unwrap()
+        );
+        assert_eq!(
+            BooleanArray::from(vec![false, false, true, false, false,]),
+            not_distinct(&left_int_array, &right_int_array).unwrap()
+        );
+    }
+
+    #[test]
+    fn is_distinct_from_nulls() {
+        // [0, 0, NULL, 0, 0, 0]
+        let left_int_array = Int32Array::new(
+            vec![0, 0, 1, 3, 0, 0].into(),
+            Some(NullBuffer::from(vec![true, true, false, true, true, true])),
+        );
+        // [0, NULL, NULL, NULL, 0, NULL]
+        let right_int_array = Int32Array::new(
+            vec![0; 6].into(),
+            Some(NullBuffer::from(vec![
+                true, false, false, false, true, false,
+            ])),
+        );
+
+        assert_eq!(
+            BooleanArray::from(vec![false, true, false, true, false, true,]),
+            distinct(&left_int_array, &right_int_array).unwrap()
+        );
+
+        assert_eq!(
+            BooleanArray::from(vec![true, false, true, false, true, false,]),
+            not_distinct(&left_int_array, &right_int_array).unwrap()
+        );
+    }
+
+    #[test]
+    fn test_distinct_scalar() {
+        let a = Int32Array::new_scalar(12);
+        let b = Int32Array::new_scalar(12);
+        assert!(!distinct(&a, &b).unwrap().value(0));
+        assert!(not_distinct(&a, &b).unwrap().value(0));
+
+        let a = Int32Array::new_scalar(12);
+        let b = Int32Array::new_null(1);
+        assert!(distinct(&a, &b).unwrap().value(0));
+        assert!(!not_distinct(&a, &b).unwrap().value(0));
+        assert!(distinct(&b, &a).unwrap().value(0));
+        assert!(!not_distinct(&b, &a).unwrap().value(0));
+
+        let b = Scalar::new(b);
+        assert!(distinct(&a, &b).unwrap().value(0));
+        assert!(!not_distinct(&a, &b).unwrap().value(0));
+
+        assert!(!distinct(&b, &b).unwrap().value(0));
+        assert!(not_distinct(&b, &b).unwrap().value(0));
+
+        let a = Int32Array::new(
+            vec![0, 1, 2, 3].into(),
+            Some(vec![false, false, true, true].into()),
+        );
+        let expected = BooleanArray::from(vec![false, false, true, true]);
+        assert_eq!(distinct(&a, &b).unwrap(), expected);
+        assert_eq!(distinct(&b, &a).unwrap(), expected);
+
+        let expected = BooleanArray::from(vec![true, true, false, false]);
+        assert_eq!(not_distinct(&a, &b).unwrap(), expected);
+        assert_eq!(not_distinct(&b, &a).unwrap(), expected);
+
+        let b = Int32Array::new_scalar(1);
+        let expected = BooleanArray::from(vec![true; 4]);
+        assert_eq!(distinct(&a, &b).unwrap(), expected);
+        assert_eq!(distinct(&b, &a).unwrap(), expected);
+        let expected = BooleanArray::from(vec![false; 4]);
+        assert_eq!(not_distinct(&a, &b).unwrap(), expected);
+        assert_eq!(not_distinct(&b, &a).unwrap(), expected);
+
+        let b = Int32Array::new_scalar(3);
+        let expected = BooleanArray::from(vec![true, true, true, false]);
+        assert_eq!(distinct(&a, &b).unwrap(), expected);
+        assert_eq!(distinct(&b, &a).unwrap(), expected);
+        let expected = BooleanArray::from(vec![false, false, false, true]);
+        assert_eq!(not_distinct(&a, &b).unwrap(), expected);
+        assert_eq!(not_distinct(&b, &a).unwrap(), expected);
+    }
+
+    #[test]
+    fn test_scalar_negation() {
+        let a = Int32Array::new_scalar(54);
+        let b = Int32Array::new_scalar(54);
+        let r = eq(&a, &b).unwrap();
+        assert!(r.value(0));
+
+        let r = neq(&a, &b).unwrap();
+        assert!(!r.value(0))
+    }
+
+    #[test]
+    fn test_scalar_empty() {
+        let a = Int32Array::new_null(0);
+        let b = Int32Array::new_scalar(23);
+        let r = eq(&a, &b).unwrap();
+        assert_eq!(r.len(), 0);
+        let r = eq(&b, &a).unwrap();
+        assert_eq!(r.len(), 0);
+    }
 }
diff --git a/arrow-ord/src/partition.rs b/arrow-ord/src/partition.rs
index 52aa5ee8d0..0b8447989b 100644
--- a/arrow-ord/src/partition.rs
+++ b/arrow-ord/src/partition.rs
@@ -23,7 +23,7 @@ use arrow_array::{Array, ArrayRef};
 use arrow_buffer::BooleanBuffer;
 use arrow_schema::ArrowError;
 
-use crate::cmp::neq;
+use crate::cmp::distinct;
 use crate::sort::SortColumn;
 
 /// A computed set of partitions, see [`partition`]
@@ -157,23 +157,7 @@ fn find_boundaries(v: &dyn Array) -> Result<BooleanBuffer, 
ArrowError> {
     let slice_len = v.len() - 1;
     let v1 = v.slice(0, slice_len);
     let v2 = v.slice(1, slice_len);
-
-    let array_ne = neq(&v1, &v2)?;
-    // Set if values have different non-NULL values
-    let values_ne = match array_ne.nulls().filter(|n| n.null_count() > 0) {
-        Some(n) => n.inner() & array_ne.values(),
-        None => array_ne.values().clone(),
-    };
-
-    Ok(match v.nulls().filter(|x| x.null_count() > 0) {
-        Some(n) => {
-            let n1 = n.inner().slice(0, slice_len);
-            let n2 = n.inner().slice(1, slice_len);
-            // Set if values_ne or the nullability has changed
-            &(&n1 ^ &n2) | &values_ne
-        }
-        None => values_ne,
-    })
+    Ok(distinct(&v1, &v2)?.values().clone())
 }
 
 /// Given a list of already sorted columns, find partition ranges that would 
partition

Reply via email to