This is an automated email from the ASF dual-hosted git repository.
tustvold pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/arrow-rs.git
The following commit(s) were added to refs/heads/master by this push:
new bce0b418b6 Add distinct kernels (#960) (#4438) (#4716)
bce0b418b6 is described below
commit bce0b418b69dcf0dab1fca3edbe5db1f5ca122a2
Author: Raphael Taylor-Davies <[email protected]>
AuthorDate: Mon Aug 21 15:25:26 2023 +0100
Add distinct kernels (#960) (#4438) (#4716)
* Add distinct kernels (#960) (#4438)
* Fixes
* Add tests
* Handle NullArray
* Fix comparisons between scalar and empty array
* Clippy
* Review feedback
---
arrow-array/src/array/boolean_array.rs | 9 +
arrow-ord/src/cmp.rs | 304 ++++++++++++++++++++++++++++-----
arrow-ord/src/partition.rs | 20 +--
3 files changed, 274 insertions(+), 59 deletions(-)
diff --git a/arrow-array/src/array/boolean_array.rs b/arrow-array/src/array/boolean_array.rs
index 0d9a1044be..4d19babe3e 100644
--- a/arrow-array/src/array/boolean_array.rs
+++ b/arrow-array/src/array/boolean_array.rs
@@ -437,6 +437,15 @@ impl<Ptr: std::borrow::Borrow<Option<bool>>> FromIterator<Ptr> for BooleanArray
}
}
+impl From<BooleanBuffer> for BooleanArray {
+ fn from(values: BooleanBuffer) -> Self {
+ Self {
+ values,
+ nulls: None, // a bare BooleanBuffer carries no validity bitmap, so the array has no nulls
+ }
+ }
+}
+
#[cfg(test)]
mod tests {
use super::*;
diff --git a/arrow-ord/src/cmp.rs b/arrow-ord/src/cmp.rs
index aad61fa8f0..96f5aafd86 100644
--- a/arrow-ord/src/cmp.rs
+++ b/arrow-ord/src/cmp.rs
@@ -33,6 +33,7 @@ use arrow_buffer::bit_util::ceil;
use arrow_buffer::{BooleanBuffer, MutableBuffer, NullBuffer};
use arrow_schema::ArrowError;
use arrow_select::take::take;
+use std::ops::Not;
#[derive(Debug, Copy, Clone)]
enum Op {
@@ -42,6 +43,8 @@ enum Op {
LessEqual,
Greater,
GreaterEqual,
+ Distinct,
+ NotDistinct,
}
impl std::fmt::Display for Op {
@@ -53,6 +56,8 @@ impl std::fmt::Display for Op {
Op::LessEqual => write!(f, "<="),
Op::Greater => write!(f, ">"),
Op::GreaterEqual => write!(f, ">="),
+ Op::Distinct => write!(f, "IS DISTINCT FROM"),
+ Op::NotDistinct => write!(f, "IS NOT DISTINCT FROM"),
}
}
}
@@ -129,7 +134,43 @@ pub fn gt_eq(lhs: &dyn Datum, rhs: &dyn Datum) -> Result<BooleanArray, ArrowErro
compare_op(Op::GreaterEqual, lhs, rhs)
}
+/// Perform `left IS DISTINCT FROM right` operation on two [`Datum`]
+///
+/// [`distinct`] is similar to [`neq`], only differing in null handling. In particular, two
+/// operands are considered DISTINCT if they have a different value or if one of them is NULL
+/// and the other isn't. The result of [`distinct`] is never NULL.
+///
+/// For floating values like f32 and f64, values are compared for equality in accordance with
+/// the totalOrder predicate as defined in the IEEE 754 (2008 revision) floating point standard.
+/// Note that totalOrder treats positive and negative zeros as different. If it is necessary
+/// to treat them as equal, please normalize zeros before calling this kernel.
+///
+/// Please refer to [`f32::total_cmp`] and [`f64::total_cmp`]
+pub fn distinct(lhs: &dyn Datum, rhs: &dyn Datum) -> Result<BooleanArray, ArrowError> {
+ compare_op(Op::Distinct, lhs, rhs)
+}
+
+/// Perform `left IS NOT DISTINCT FROM right` operation on two [`Datum`]
+///
+/// [`not_distinct`] is similar to [`eq`], only differing in null handling. In particular, two
+/// operands are considered `NOT DISTINCT` if they have the same value or if both of them
+/// are NULL. The result of [`not_distinct`] is never NULL.
+///
+/// For floating values like f32 and f64, values are compared for equality in accordance with
+/// the totalOrder predicate as defined in the IEEE 754 (2008 revision) floating point standard.
+/// Note that totalOrder treats positive and negative zeros as different. If it is necessary
+/// to treat them as equal, please normalize zeros before calling this kernel.
+///
+/// Please refer to [`f32::total_cmp`] and [`f64::total_cmp`]
+pub fn not_distinct(
+ lhs: &dyn Datum,
+ rhs: &dyn Datum,
+) -> Result<BooleanArray, ArrowError> {
+ compare_op(Op::NotDistinct, lhs, rhs)
+}
+
/// Perform `op` on the provided `Datum`
+#[inline(never)]
fn compare_op(
op: Op,
lhs: &dyn Datum,
@@ -141,51 +182,114 @@ fn compare_op(
let l_len = l.len();
let r_len = r.len();
- let l_nulls = l.logical_nulls();
- let r_nulls = r.logical_nulls();
- let (len, nulls) = match (l_s, r_s) {
- (true, true) | (false, false) => {
- if l_len != r_len {
- return Err(ArrowError::InvalidArgumentError(format!(
- "Cannot compare arrays of different lengths, got {l_len}
vs {r_len}"
- )));
- }
- (l_len, NullBuffer::union(l_nulls.as_ref(), r_nulls.as_ref()))
- }
- (true, false) => match l_nulls.map(|x| x.null_count() !=
0).unwrap_or_default() {
- true => (r_len, Some(NullBuffer::new_null(r_len))),
- false => (r_len, r_nulls), // Left is scalar and not null
- },
- (false, true) => match r_nulls.map(|x| x.null_count() !=
0).unwrap_or_default() {
- true => (l_len, Some(NullBuffer::new_null(l_len))),
- false => (l_len, l_nulls), // Right is scalar and not null
- },
+ if l_len != r_len && !l_s && !r_s {
+ return Err(ArrowError::InvalidArgumentError(format!(
+ "Cannot compare arrays of different lengths, got {l_len} vs
{r_len}"
+ )));
+ }
+
+ let len = match l_s {
+ true => r_len,
+ false => l_len,
};
+ let l_nulls = l.logical_nulls();
+ let r_nulls = r.logical_nulls();
+
let l_v = l.as_any_dictionary_opt();
let l = l_v.map(|x| x.values().as_ref()).unwrap_or(l);
+ let l_t = l.data_type();
let r_v = r.as_any_dictionary_opt();
let r = r_v.map(|x| x.values().as_ref()).unwrap_or(r);
+ let r_t = r.data_type();
+
+ if l_t != r_t || l_t.is_nested() {
+ return Err(ArrowError::InvalidArgumentError(format!(
+ "Invalid comparison operation: {l_t} {op} {r_t}"
+ )));
+ }
+
+ // Defer computation as may not be necessary
+ let values = || -> BooleanBuffer {
+ let d = downcast_primitive_array! {
+ (l, r) => apply(op, l.values().as_ref(), l_s, l_v,
r.values().as_ref(), r_s, r_v),
+ (Boolean, Boolean) => apply(op, l.as_boolean(), l_s, l_v,
r.as_boolean(), r_s, r_v),
+ (Utf8, Utf8) => apply(op, l.as_string::<i32>(), l_s, l_v,
r.as_string::<i32>(), r_s, r_v),
+ (LargeUtf8, LargeUtf8) => apply(op, l.as_string::<i64>(), l_s,
l_v, r.as_string::<i64>(), r_s, r_v),
+ (Binary, Binary) => apply(op, l.as_binary::<i32>(), l_s, l_v,
r.as_binary::<i32>(), r_s, r_v),
+ (LargeBinary, LargeBinary) => apply(op, l.as_binary::<i64>(), l_s,
l_v, r.as_binary::<i64>(), r_s, r_v),
+ (FixedSizeBinary(_), FixedSizeBinary(_)) => apply(op,
l.as_fixed_size_binary(), l_s, l_v, r.as_fixed_size_binary(), r_s, r_v),
+ (Null, Null) => None,
+ _ => unreachable!(),
+ };
+ d.unwrap_or_else(|| BooleanBuffer::new_unset(len))
+ };
- let values = downcast_primitive_array! {
- (l, r) => apply(op, l.values().as_ref(), l_s, l_v,
r.values().as_ref(), r_s, r_v),
- (Boolean, Boolean) => apply(op, l.as_boolean(), l_s, l_v,
r.as_boolean(), r_s, r_v),
- (Utf8, Utf8) => apply(op, l.as_string::<i32>(), l_s, l_v,
r.as_string::<i32>(), r_s, r_v),
- (LargeUtf8, LargeUtf8) => apply(op, l.as_string::<i64>(), l_s, l_v,
r.as_string::<i64>(), r_s, r_v),
- (Binary, Binary) => apply(op, l.as_binary::<i32>(), l_s, l_v,
r.as_binary::<i32>(), r_s, r_v),
- (LargeBinary, LargeBinary) => apply(op, l.as_binary::<i64>(), l_s,
l_v, r.as_binary::<i64>(), r_s, r_v),
- (FixedSizeBinary(_), FixedSizeBinary(_)) => apply(op,
l.as_fixed_size_binary(), l_s, l_v, r.as_fixed_size_binary(), r_s, r_v),
- (l_t, r_t) => return
Err(ArrowError::InvalidArgumentError(format!("Invalid comparison operation:
{l_t} {op} {r_t}"))),
- }.unwrap_or_else(|| {
- let count = nulls.as_ref().map(|x| x.null_count()).unwrap_or_default();
- assert_eq!(count, len); // Sanity check
- BooleanBuffer::new_unset(len)
- });
-
- assert_eq!(values.len(), len); // Sanity check
- Ok(BooleanArray::new(values, nulls))
+ let l_nulls = l_nulls.filter(|n| n.null_count() > 0);
+ let r_nulls = r_nulls.filter(|n| n.null_count() > 0);
+ Ok(match (l_nulls, l_s, r_nulls, r_s) {
+ (Some(l), true, Some(r), true) | (Some(l), false, Some(r), false) => {
+ // Either both sides are scalar or neither side is scalar
+ match op {
+ Op::Distinct => {
+ let values = values();
+ let l = l.inner().bit_chunks().iter_padded();
+ let r = r.inner().bit_chunks().iter_padded();
+ let ne = values.bit_chunks().iter_padded();
+
+ let c = |((l, r), n)| ((l ^ r) | (l & r & n));
+ let buffer = l.zip(r).zip(ne).map(c).collect();
+ BooleanBuffer::new(buffer, 0, len).into()
+ }
+ Op::NotDistinct => {
+ let values = values();
+ let l = l.inner().bit_chunks().iter_padded();
+ let r = r.inner().bit_chunks().iter_padded();
+ let e = values.bit_chunks().iter_padded();
+
+ let c = |((l, r), e)| u64::not(l | r) | (l & r & e);
+ let buffer = l.zip(r).zip(e).map(c).collect();
+ BooleanBuffer::new(buffer, 0, len).into()
+ }
+ _ => BooleanArray::new(values(), NullBuffer::union(Some(&l),
Some(&r))),
+ }
+ }
+ (Some(_), true, Some(a), false) | (Some(a), false, Some(_), true) => {
+ // Scalar is null, other side is non-scalar and nullable
+ match op {
+ Op::Distinct => a.into_inner().into(),
+ Op::NotDistinct => a.into_inner().not().into(),
+ _ => BooleanArray::new_null(len),
+ }
+ }
+ (Some(nulls), is_scalar, None, _) | (None, _, Some(nulls), is_scalar)
=> {
+ // Only one side is nullable
+ match is_scalar {
+ true => match op {
+ // Scalar is null, other side is not nullable
+ Op::Distinct => BooleanBuffer::new_set(len).into(),
+ Op::NotDistinct => BooleanBuffer::new_unset(len).into(),
+ _ => BooleanArray::new_null(len),
+ },
+ false => match op {
+ Op::Distinct => {
+ let values = values();
+ let l = nulls.inner().bit_chunks().iter_padded();
+ let ne = values.bit_chunks().iter_padded();
+ let c = |(l, n)| u64::not(l) | n;
+ let buffer = l.zip(ne).map(c).collect();
+ BooleanBuffer::new(buffer, 0, len).into()
+ }
+ Op::NotDistinct => (nulls.inner() & &values()).into(),
+ _ => BooleanArray::new(values(), Some(nulls)),
+ },
+ }
+ }
+ // Neither side is nullable
+ (None, _, None, _) => BooleanArray::new(values(), None),
+ })
}
/// Perform a potentially vectored `op` on the provided `ArrayOrd`
@@ -215,8 +319,12 @@ fn apply<T: ArrayOrd>(
assert_eq!(l_v.len(), r_v.len()); // Sanity check
Some(match op {
- Op::Equal => apply_op_vectored(l, &l_v, r, &r_v, false, T::is_eq),
- Op::NotEqual => apply_op_vectored(l, &l_v, r, &r_v, true,
T::is_eq),
+ Op::Equal | Op::NotDistinct => {
+ apply_op_vectored(l, &l_v, r, &r_v, false, T::is_eq)
+ }
+ Op::NotEqual | Op::Distinct => {
+ apply_op_vectored(l, &l_v, r, &r_v, true, T::is_eq)
+ }
Op::Less => apply_op_vectored(l, &l_v, r, &r_v, false, T::is_lt),
Op::LessEqual => apply_op_vectored(r, &r_v, l, &l_v, true,
T::is_lt),
Op::Greater => apply_op_vectored(r, &r_v, l, &l_v, false,
T::is_lt),
@@ -227,8 +335,8 @@ fn apply<T: ArrayOrd>(
let r_s = r_s.then(|| r_v.map(|x|
x.normalized_keys()[0]).unwrap_or_default());
let buffer = match op {
- Op::Equal => apply_op(l, l_s, r, r_s, false, T::is_eq),
- Op::NotEqual => apply_op(l, l_s, r, r_s, true, T::is_eq),
+ Op::Equal | Op::NotDistinct => apply_op(l, l_s, r, r_s, false,
T::is_eq),
+ Op::NotEqual | Op::Distinct => apply_op(l, l_s, r, r_s, true,
T::is_eq),
Op::Less => apply_op(l, l_s, r, r_s, false, T::is_lt),
Op::LessEqual => apply_op(r, r_s, l, l_s, true, T::is_lt),
Op::Greater => apply_op(r, r_s, l, l_s, false, T::is_lt),
@@ -293,6 +401,8 @@ fn collect_bool(len: usize, neg: bool, f: impl Fn(usize) -> bool) -> BooleanBuff
///
/// If l is scalar `l_s` will be `Some(idx)` where `idx` is the index of the
scalar value in `l`
/// If r is scalar `r_s` will be `Some(idx)` where `idx` is the index of the
scalar value in `r`
+///
+/// If `neg` is true the result of `op` will be negated
fn apply_op<T: ArrayOrd>(
l: T,
l_s: Option<usize>,
@@ -311,7 +421,7 @@ fn apply_op<T: ArrayOrd>(
(Some(l_s), Some(r_s)) => {
let a = l.value(l_s);
let b = r.value(r_s);
- std::iter::once(op(a, b)).collect()
+ std::iter::once(op(a, b) ^ neg).collect()
}
(Some(l_s), None) => {
let v = l.value(l_s);
@@ -486,4 +596,116 @@ mod tests {
let r = eq(&a, &Scalar::new(&scalar)).unwrap();
assert_eq!(r.null_count(), 3);
}
+
+ #[test]
+ fn is_distinct_from_non_nulls() { // without nulls, distinct behaves like neq and not_distinct like eq
+ let left_int_array = Int32Array::from(vec![0, 1, 2, 3, 4]);
+ let right_int_array = Int32Array::from(vec![4, 3, 2, 1, 0]); // only index 2 holds equal values
+
+ assert_eq!(
+ BooleanArray::from(vec![true, true, false, true, true,]),
+ distinct(&left_int_array, &right_int_array).unwrap()
+ );
+ assert_eq!(
+ BooleanArray::from(vec![false, false, true, false, false,]),
+ not_distinct(&left_int_array, &right_int_array).unwrap()
+ );
+ }
+
+ #[test]
+ fn is_distinct_from_nulls() {
+ // [0, 0, NULL, 3, 0, 0]
+ let left_int_array = Int32Array::new(
+ vec![0, 0, 1, 3, 0, 0].into(),
+ Some(NullBuffer::from(vec![true, true, false, true, true, true])),
+ );
+ // [0, NULL, NULL, NULL, 0, NULL]
+ let right_int_array = Int32Array::new(
+ vec![0; 6].into(),
+ Some(NullBuffer::from(vec![
+ true, false, false, false, true, false,
+ ])),
+ );
+
+ assert_eq!(
+ BooleanArray::from(vec![false, true, false, true, false, true,]), // true exactly where one side is NULL and the other is not
+ distinct(&left_int_array, &right_int_array).unwrap()
+ );
+
+ assert_eq!(
+ BooleanArray::from(vec![true, false, true, false, true, false,]),
+ not_distinct(&left_int_array, &right_int_array).unwrap()
+ );
+ }
+
+ #[test]
+ fn test_distinct_scalar() {
+ let a = Int32Array::new_scalar(12);
+ let b = Int32Array::new_scalar(12);
+ assert!(!distinct(&a, &b).unwrap().value(0));
+ assert!(not_distinct(&a, &b).unwrap().value(0));
+
+ let a = Int32Array::new_scalar(12);
+ let b = Int32Array::new_null(1); // one-element all-null array, not wrapped as a Scalar
+ assert!(distinct(&a, &b).unwrap().value(0));
+ assert!(!not_distinct(&a, &b).unwrap().value(0));
+ assert!(distinct(&b, &a).unwrap().value(0));
+ assert!(!not_distinct(&b, &a).unwrap().value(0));
+
+ let b = Scalar::new(b); // the same null value, now passed as a null scalar
+ assert!(distinct(&a, &b).unwrap().value(0));
+ assert!(!not_distinct(&a, &b).unwrap().value(0));
+
+ assert!(!distinct(&b, &b).unwrap().value(0)); // NULL IS NOT DISTINCT FROM NULL
+ assert!(not_distinct(&b, &b).unwrap().value(0));
+
+ let a = Int32Array::new( // [NULL, NULL, 2, 3]
+ vec![0, 1, 2, 3].into(),
+ Some(vec![false, false, true, true].into()),
+ );
+ let expected = BooleanArray::from(vec![false, false, true, true]);
+ assert_eq!(distinct(&a, &b).unwrap(), expected);
+ assert_eq!(distinct(&b, &a).unwrap(), expected);
+
+ let expected = BooleanArray::from(vec![true, true, false, false]);
+ assert_eq!(not_distinct(&a, &b).unwrap(), expected);
+ assert_eq!(not_distinct(&b, &a).unwrap(), expected);
+
+ let b = Int32Array::new_scalar(1); // non-null scalar equal to no valid element of `a`
+ let expected = BooleanArray::from(vec![true; 4]);
+ assert_eq!(distinct(&a, &b).unwrap(), expected);
+ assert_eq!(distinct(&b, &a).unwrap(), expected);
+ let expected = BooleanArray::from(vec![false; 4]);
+ assert_eq!(not_distinct(&a, &b).unwrap(), expected);
+ assert_eq!(not_distinct(&b, &a).unwrap(), expected);
+
+ let b = Int32Array::new_scalar(3); // non-null scalar equal only to a[3]
+ let expected = BooleanArray::from(vec![true, true, true, false]);
+ assert_eq!(distinct(&a, &b).unwrap(), expected);
+ assert_eq!(distinct(&b, &a).unwrap(), expected);
+ let expected = BooleanArray::from(vec![false, false, false, true]);
+ assert_eq!(not_distinct(&a, &b).unwrap(), expected);
+ assert_eq!(not_distinct(&b, &a).unwrap(), expected);
+ }
+
+ #[test]
+ fn test_scalar_negation() {
+ let a = Int32Array::new_scalar(54);
+ let b = Int32Array::new_scalar(54);
+ let r = eq(&a, &b).unwrap();
+ assert!(r.value(0));
+
+ let r = neq(&a, &b).unwrap(); // scalar-vs-scalar path must apply the `neg` flag for neq
+ assert!(!r.value(0))
+ }
+
+ #[test]
+ fn test_scalar_empty() {
+ let a = Int32Array::new_null(0); // empty array: result length follows the non-scalar side
+ let b = Int32Array::new_scalar(23);
+ let r = eq(&a, &b).unwrap();
+ assert_eq!(r.len(), 0);
+ let r = eq(&b, &a).unwrap();
+ assert_eq!(r.len(), 0);
+ }
}
diff --git a/arrow-ord/src/partition.rs b/arrow-ord/src/partition.rs
index 52aa5ee8d0..0b8447989b 100644
--- a/arrow-ord/src/partition.rs
+++ b/arrow-ord/src/partition.rs
@@ -23,7 +23,7 @@ use arrow_array::{Array, ArrayRef};
use arrow_buffer::BooleanBuffer;
use arrow_schema::ArrowError;
-use crate::cmp::neq;
+use crate::cmp::distinct;
use crate::sort::SortColumn;
/// A computed set of partitions, see [`partition`]
@@ -157,23 +157,7 @@ fn find_boundaries(v: &dyn Array) -> Result<BooleanBuffer, ArrowError> {
let slice_len = v.len() - 1;
let v1 = v.slice(0, slice_len);
let v2 = v.slice(1, slice_len);
-
- let array_ne = neq(&v1, &v2)?;
- // Set if values have different non-NULL values
- let values_ne = match array_ne.nulls().filter(|n| n.null_count() > 0) {
- Some(n) => n.inner() & array_ne.values(),
- None => array_ne.values().clone(),
- };
-
- Ok(match v.nulls().filter(|x| x.null_count() > 0) {
- Some(n) => {
- let n1 = n.inner().slice(0, slice_len);
- let n2 = n.inner().slice(1, slice_len);
- // Set if values_ne or the nullability has changed
- &(&n1 ^ &n2) | &values_ne
- }
- None => values_ne,
- })
+ Ok(distinct(&v1, &v2)?.values().clone())
}
/// Given a list of already sorted columns, find partition ranges that would
partition