alamb commented on code in PR #4575:
URL: https://github.com/apache/arrow-rs/pull/4575#discussion_r1283652015


##########
arrow-ord/src/partition.rs:
##########
@@ -95,348 +124,209 @@ use std::ops::Range;
 ///
 /// assert_eq!(ranges, expected);
 /// ```
-pub fn lexicographical_partition_ranges(
-    columns: &[SortColumn],
-) -> Result<impl Iterator<Item = Range<usize>> + '_, ArrowError> {
-    LexicographicalPartitionIterator::try_new(columns)
-}
+pub fn partition(columns: &[ArrayRef]) -> Result<Partitions, ArrowError> {
+    if columns.is_empty() {
+        return Err(ArrowError::InvalidArgumentError(
+            "Partition requires at least one column".to_string(),
+        ));
+    }
+    let num_rows = columns[0].len();
+    if columns.iter().any(|item| item.len() != num_rows) {
+        return Err(ArrowError::InvalidArgumentError(
+            "Partition columns have different row counts".to_string(),
+        ));
+    };
 
-struct LexicographicalPartitionIterator<'a> {
-    comparator: LexicographicalComparator<'a>,
-    num_rows: usize,
-    previous_partition_point: usize,
-    partition_point: usize,
-}
+    match num_rows {
+        0 => return Ok(Partitions(None)),
+        1 => return Ok(Partitions(Some(BooleanBuffer::new_unset(0)))),
+        _ => {}
+    }
 
-impl<'a> LexicographicalPartitionIterator<'a> {
-    fn try_new(
-        columns: &'a [SortColumn],
-    ) -> Result<LexicographicalPartitionIterator, ArrowError> {
-        if columns.is_empty() {
-            return Err(ArrowError::InvalidArgumentError(
-                "Sort requires at least one column".to_string(),
-            ));
-        }
-        let num_rows = columns[0].values.len();
-        if columns.iter().any(|item| item.values.len() != num_rows) {
-            return Err(ArrowError::ComputeError(
-                "Lexical sort columns have different row counts".to_string(),
-            ));
-        };
+    let acc = find_boundaries(&columns[0])?;
+    let acc = columns
+        .iter()
+        .skip(1)
+        .try_fold(acc, |acc, c| find_boundaries(c.as_ref()).map(|b| &acc | 
&b))?;
 
-        let comparator = LexicographicalComparator::try_new(columns)?;
-        Ok(LexicographicalPartitionIterator {
-            comparator,
-            num_rows,
-            previous_partition_point: 0,
-            partition_point: 0,
-        })
-    }
+    Ok(Partitions(Some(acc)))
 }
 
-/// Returns the next partition point of the range `start..end` according to 
the given comparator.
-/// The return value is the index of the first element of the second partition,
-/// and is guaranteed to be between `start..=end` (inclusive).
-///
-/// The values corresponding to those indices are assumed to be partitioned 
according to the given comparator.
-///
-/// Exponential search is to remedy for the case when array size and 
cardinality are both large.
-/// In these cases the partition point would be near the beginning of the 
range and
-/// plain binary search would be doing some unnecessary iterations on each 
call.
-///
-/// see <https://en.wikipedia.org/wiki/Exponential_search>
-#[inline]
-fn exponential_search_next_partition_point(
-    start: usize,
-    end: usize,
-    comparator: &LexicographicalComparator<'_>,
-) -> usize {
-    let target = start;
-    let mut bound = 1;
-    while bound + start < end
-        && comparator.compare(bound + start, target) != Ordering::Greater
-    {
-        bound *= 2;
-    }
+/// Returns a mask with bits set whenever the value or nullability changes
+fn find_boundaries(v: &dyn Array) -> Result<BooleanBuffer, ArrowError> {
+    let slice_len = v.len() - 1;
+    let v1 = v.slice(0, slice_len);
+    let v2 = v.slice(1, slice_len);
 
-    // invariant after while loop:
-    // (start + bound / 2) <= target < min(end, start + bound + 1)
-    // where <= and < are defined by the comparator;
-    // note here we have right = min(end, start + bound + 1) because (start + 
bound) might
-    // actually be considered and must be included.
-    partition_point(start + bound / 2, end.min(start + bound + 1), |idx| {
-        comparator.compare(idx, target) != Ordering::Greater
-    })
-}
+    let array_ne = neq_dyn(v1.as_ref(), v2.as_ref())?;
+    // Set if values have different non-NULL values
+    let values_ne = match array_ne.nulls().filter(|n| n.null_count() > 0) {
+        Some(n) => n.inner() & array_ne.values(),
+        None => array_ne.values().clone(),
+    };
 
-/// Returns the partition point of the range `start..end` according to the 
given predicate.
-/// The return value is the index of the first element of the second partition,
-/// and is guaranteed to be between `start..=end` (inclusive).
-///
-/// The algorithm is similar to a binary search.
-///
-/// The values corresponding to those indices are assumed to be partitioned 
according to the given predicate.
-///
-/// See [`slice::partition_point`]
-#[inline]
-fn partition_point<P: Fn(usize) -> bool>(start: usize, end: usize, pred: P) -> 
usize {
-    let mut left = start;
-    let mut right = end;
-    let mut size = right - left;
-    while left < right {
-        let mid = left + size / 2;
-
-        let less = pred(mid);
-
-        if less {
-            left = mid + 1;
-        } else {
-            right = mid;
+    Ok(match v.nulls().filter(|x| x.null_count() > 0) {
+        Some(n) => {
+            let n1 = n.inner().slice(0, slice_len);
+            let n2 = n.inner().slice(1, slice_len);
+            // Set if values_ne or the nullability has changed
+            &(&n1 ^ &n2) | &values_ne
         }
-
-        size = right - left;
-    }
-    left
+        None => values_ne,
+    })
 }
 
-impl<'a> Iterator for LexicographicalPartitionIterator<'a> {
-    type Item = Range<usize>;
-
-    fn next(&mut self) -> Option<Self::Item> {
-        if self.partition_point < self.num_rows {
-            // invariant:
-            // in the range [0..previous_partition_point] all values are <= 
the value at [previous_partition_point]
-            // so in order to save time we can do binary search on the range 
[previous_partition_point..num_rows]
-            // and find the index where any value is greater than the value at 
[previous_partition_point]
-            self.partition_point = exponential_search_next_partition_point(
-                self.partition_point,
-                self.num_rows,
-                &self.comparator,
-            );
-            let start = self.previous_partition_point;
-            let end = self.partition_point;
-            self.previous_partition_point = self.partition_point;
-            Some(Range { start, end })
-        } else {
-            None
-        }
-    }
+/// Given a list of already sorted columns, find partition ranges that would 
partition
+/// lexicographically equal values across columns.
+///
+/// The returned vec would be of size k where k is cardinality of the sorted 
values; Consecutive
+/// values will be connected: (a, b) and (b, c), where start = 0 and end = n 
for the first and last
+/// range.
+#[deprecated(note = "Use partition")]
+pub fn lexicographical_partition_ranges(
+    columns: &[SortColumn],
+) -> Result<impl Iterator<Item = Range<usize>> + '_, ArrowError> {
+    let cols: Vec<_> = columns.iter().map(|x| x.values.clone()).collect();
+    Ok(partition(&cols)?.ranges().into_iter())
 }
 
 #[cfg(test)]
 mod tests {
-    use super::*;
-    use crate::sort::SortOptions;
+    use std::sync::Arc;
+
     use arrow_array::*;
     use arrow_schema::DataType;
-    use std::sync::Arc;
 
-    #[test]
-    fn test_partition_point() {
-        let input = &[1, 1, 1, 2, 2, 2, 2, 2, 2, 3, 3, 3, 4];
-        {
-            let median = input[input.len() / 2];
-            assert_eq!(
-                9,
-                partition_point(0, input.len(), |i: usize| 
input[i].cmp(&median)
-                    != Ordering::Greater)
-            );
-        }
-        {
-            let search = input[9];
-            assert_eq!(
-                12,
-                partition_point(9, input.len(), |i: usize| 
input[i].cmp(&search)
-                    != Ordering::Greater)
-            );
-        }
-        {
-            let search = input[0];
-            assert_eq!(
-                3,
-                partition_point(0, 9, |i: usize| input[i].cmp(&search)
-                    != Ordering::Greater)
-            );
-        }
-        let input = &[1, 2, 2, 2, 2, 2, 2, 2, 9];
-        {
-            let search = input[5];
-            assert_eq!(
-                8,
-                partition_point(5, 9, |i: usize| input[i].cmp(&search)
-                    != Ordering::Greater)
-            );
-        }
-    }
+    use super::*;
 
     #[test]
-    fn test_lexicographical_partition_ranges_empty() {
-        let input = vec![];
-        assert!(
-            lexicographical_partition_ranges(&input).is_err(),
-            "lexicographical_partition_ranges should reject columns with empty 
rows"
+    fn test_partition_empty() {
+        let err = partition(&[]).unwrap_err();
+        assert_eq!(
+            err.to_string(),
+            "Invalid argument error: Partition requires at least one column"
         );
     }
 
     #[test]
-    fn test_lexicographical_partition_ranges_unaligned_rows() {
+    fn test_partition_unaligned_rows() {
         let input = vec![
-            SortColumn {
-                values: Arc::new(Int64Array::from(vec![None, Some(-1)])) as 
ArrayRef,
-                options: None,
-            },
-            SortColumn {
-                values: Arc::new(StringArray::from(vec![Some("foo")])) as 
ArrayRef,
-                options: None,
-            },
+            Arc::new(Int64Array::from(vec![None, Some(-1)])) as _,
+            Arc::new(StringArray::from(vec![Some("foo")])) as _,
         ];
-        assert!(
-            lexicographical_partition_ranges(&input).is_err(),
-            "lexicographical_partition_ranges should reject columns with 
different row counts"
-        );
+        let err = partition(&input).unwrap_err();
+        assert_eq!(
+            err.to_string(),
+            "Invalid argument error: Partition columns have different row 
counts"
+        )
     }
 
     #[test]
-    fn test_lexicographical_partition_single_column() {
-        let input = vec![SortColumn {
-            values: Arc::new(Int64Array::from(vec![1, 2, 2, 2, 2, 2, 2, 2, 9]))
-                as ArrayRef,
-            options: Some(SortOptions {
-                descending: false,
-                nulls_first: true,
-            }),
-        }];
-        let results = lexicographical_partition_ranges(&input).unwrap();
+    fn test_partition_small() {
+        let results = partition(&[
+            Arc::new(Int32Array::new(vec![].into(), None)) as _,
+            Arc::new(Int32Array::new(vec![].into(), None)) as _,
+            Arc::new(Int32Array::new(vec![].into(), None)) as _,
+        ])
+        .unwrap();
+        assert_eq!(results.len(), 0);
+        assert!(results.is_empty());
+
+        let results = partition(&[
+            Arc::new(Int32Array::from(vec![1])) as _,
+            Arc::new(Int32Array::from(vec![1])) as _,
+        ])
+        .unwrap();
+        assert_eq!(results.ranges(), &[0..1]);
+    }
+
+    #[test]
+    fn test_partition_single_column() {
+        let a = Int64Array::from(vec![1, 2, 2, 2, 2, 2, 2, 2, 9]);
+        let input = vec![Arc::new(a) as _];
         assert_eq!(
-            vec![(0_usize..1_usize), (1_usize..8_usize), (8_usize..9_usize)],
-            results.collect::<Vec<_>>()
+            partition(&input).unwrap().ranges(),
+            vec![(0..1), (1..8), (8..9)],
         );
     }
 
     #[test]
-    fn test_lexicographical_partition_all_equal_values() {
-        let input = vec![SortColumn {
-            values: Arc::new(Int64Array::from_value(1, 1000)) as ArrayRef,
-            options: Some(SortOptions {
-                descending: false,
-                nulls_first: true,
-            }),
-        }];
-
-        let results = lexicographical_partition_ranges(&input).unwrap();
-        assert_eq!(vec![(0_usize..1000_usize)], results.collect::<Vec<_>>());
+    fn test_partition_all_equal_values() {
+        let a = Int64Array::from_value(1, 1000);
+        let input = vec![Arc::new(a) as _];
+        assert_eq!(partition(&input).unwrap().ranges(), vec![(0..1000)]);
+    }
+
+    #[test]
+    fn test_partition_all_null_values() {
+        let input = vec![
+            new_null_array(&DataType::Int8, 1000),
+            new_null_array(&DataType::UInt16, 1000),
+        ];
+        assert_eq!(partition(&input).unwrap().ranges(), vec![(0..1000)]);
     }
 
     #[test]
-    fn test_lexicographical_partition_all_null_values() {
+    fn test_partition_unique_column_1() {
         let input = vec![
-            SortColumn {
-                values: new_null_array(&DataType::Int8, 1000),
-                options: Some(SortOptions {
-                    descending: false,
-                    nulls_first: true,
-                }),
-            },
-            SortColumn {
-                values: new_null_array(&DataType::UInt16, 1000),
-                options: Some(SortOptions {
-                    descending: false,
-                    nulls_first: false,
-                }),
-            },
+            Arc::new(Int64Array::from(vec![None, Some(-1)])) as _,
+            Arc::new(StringArray::from(vec![Some("foo"), Some("bar")])) as _,
         ];
-        let results = lexicographical_partition_ranges(&input).unwrap();
-        assert_eq!(vec![(0_usize..1000_usize)], results.collect::<Vec<_>>());
+        assert_eq!(partition(&input).unwrap().ranges(), vec![(0..1), (1..2)],);
     }
 
     #[test]
-    fn test_lexicographical_partition_unique_column_1() {
+    fn test_partition_unique_column_2() {
         let input = vec![
-            SortColumn {
-                values: Arc::new(Int64Array::from(vec![None, Some(-1)])) as 
ArrayRef,
-                options: Some(SortOptions {
-                    descending: false,
-                    nulls_first: true,
-                }),
-            },
-            SortColumn {
-                values: Arc::new(StringArray::from(vec![Some("foo"), 
Some("bar")]))
-                    as ArrayRef,
-                options: Some(SortOptions {
-                    descending: true,
-                    nulls_first: true,
-                }),
-            },
+            Arc::new(Int64Array::from(vec![None, Some(-1), Some(-1)])) as _,
+            Arc::new(StringArray::from(vec![
+                Some("foo"),
+                Some("bar"),
+                Some("apple"),
+            ])) as _,
         ];
-        let results = lexicographical_partition_ranges(&input).unwrap();
         assert_eq!(
-            vec![(0_usize..1_usize), (1_usize..2_usize)],
-            results.collect::<Vec<_>>()
+            partition(&input).unwrap().ranges(),
+            vec![(0..1), (1..2), (2..3),],
         );
     }
 
     #[test]
-    fn test_lexicographical_partition_unique_column_2() {
+    fn test_partition_non_unique_column_1() {
         let input = vec![
-            SortColumn {
-                values: Arc::new(Int64Array::from(vec![None, Some(-1), 
Some(-1)]))
-                    as ArrayRef,
-                options: Some(SortOptions {
-                    descending: false,
-                    nulls_first: true,
-                }),
-            },
-            SortColumn {
-                values: Arc::new(StringArray::from(vec![
-                    Some("foo"),
-                    Some("bar"),
-                    Some("apple"),
-                ])) as ArrayRef,
-                options: Some(SortOptions {
-                    descending: true,
-                    nulls_first: true,
-                }),
-            },
+            Arc::new(Int64Array::from(vec![None, Some(-1), Some(-1), 
Some(1)])) as _,
+            Arc::new(StringArray::from(vec![
+                Some("foo"),
+                Some("bar"),
+                Some("bar"),
+                Some("bar"),
+            ])) as _,
         ];
-        let results = lexicographical_partition_ranges(&input).unwrap();
         assert_eq!(
-            vec![(0_usize..1_usize), (1_usize..2_usize), (2_usize..3_usize),],
-            results.collect::<Vec<_>>()
+            partition(&input).unwrap().ranges(),
+            vec![(0..1), (1..3), (3..4),],
         );
     }
 
     #[test]
-    fn test_lexicographical_partition_non_unique_column_1() {
+    fn test_partition_masked_nulls() {
         let input = vec![
-            SortColumn {
-                values: Arc::new(Int64Array::from(vec![
-                    None,
-                    Some(-1),
-                    Some(-1),
-                    Some(1),
-                ])) as ArrayRef,
-                options: Some(SortOptions {
-                    descending: false,
-                    nulls_first: true,
-                }),
-            },
-            SortColumn {
-                values: Arc::new(StringArray::from(vec![
-                    Some("foo"),
-                    Some("bar"),
-                    Some("bar"),
-                    Some("bar"),
-                ])) as ArrayRef,
-                options: Some(SortOptions {
-                    descending: true,
-                    nulls_first: true,
-                }),
-            },
+            Arc::new(Int64Array::new(vec![1; 9].into(), None)) as _,
+            Arc::new(Int64Array::new(
+                vec![1, 1, 2, 2, 2, 3, 3, 3, 3].into(),

Review Comment:
   I think it would help to also have a test like this where the value stored 
under a null slot is actually wrong / different
   
   ```suggestion
                   vec![1, 1, 2, 2, 2, 3, 0, 3, 3].into(),
   ```
   
   However, I also made that change and it passed, so 👍 



##########
arrow-ord/src/partition.rs:
##########
@@ -17,22 +17,64 @@
 
 //! Defines partition kernel for `ArrayRef`
 
-use crate::sort::{LexicographicalComparator, SortColumn};
-use arrow_schema::ArrowError;
-use std::cmp::Ordering;
 use std::ops::Range;
 
-/// Given a list of already sorted columns, returns [`Range`]es that
-/// partition the input such that each partition has equal values
-/// across sort columns.
+use arrow_array::{Array, ArrayRef};
+use arrow_buffer::BooleanBuffer;
+use arrow_schema::ArrowError;
+
+use crate::comparison::neq_dyn;
+use crate::sort::SortColumn;
+
+/// A computed set of partitions, see [`partition`]
+#[derive(Debug, Clone)]
+pub struct Partitions(Option<BooleanBuffer>);
+
+impl Partitions {
+    /// Returns the range of each partition
+    ///
+    /// Consecutive ranges will be contiguous: i.e [`(a, b)` and `(b, c)`], and
+    /// `start = 0` and `end = self.len()` for the first and last range 
respectively
+    pub fn ranges(&self) -> Vec<Range<usize>> {

Review Comment:
   One potential difference between this implementation and main that I 
noticed is that it requires memory (a Vec) to store the partition ranges, 
whereas the previous implementation just iterated over them. I think it would 
be possible to implement an iterator for this as well to avoid that regression. 
I'll see if I can make a PR to do so.



##########
arrow-ord/src/partition.rs:
##########
@@ -95,348 +124,209 @@ use std::ops::Range;
 ///
 /// assert_eq!(ranges, expected);
 /// ```
-pub fn lexicographical_partition_ranges(
-    columns: &[SortColumn],
-) -> Result<impl Iterator<Item = Range<usize>> + '_, ArrowError> {
-    LexicographicalPartitionIterator::try_new(columns)
-}
+pub fn partition(columns: &[ArrayRef]) -> Result<Partitions, ArrowError> {
+    if columns.is_empty() {
+        return Err(ArrowError::InvalidArgumentError(
+            "Partition requires at least one column".to_string(),
+        ));
+    }
+    let num_rows = columns[0].len();
+    if columns.iter().any(|item| item.len() != num_rows) {
+        return Err(ArrowError::InvalidArgumentError(
+            "Partition columns have different row counts".to_string(),
+        ));
+    };
 
-struct LexicographicalPartitionIterator<'a> {
-    comparator: LexicographicalComparator<'a>,
-    num_rows: usize,
-    previous_partition_point: usize,
-    partition_point: usize,
-}
+    match num_rows {
+        0 => return Ok(Partitions(None)),
+        1 => return Ok(Partitions(Some(BooleanBuffer::new_unset(0)))),
+        _ => {}
+    }
 
-impl<'a> LexicographicalPartitionIterator<'a> {
-    fn try_new(
-        columns: &'a [SortColumn],
-    ) -> Result<LexicographicalPartitionIterator, ArrowError> {
-        if columns.is_empty() {
-            return Err(ArrowError::InvalidArgumentError(
-                "Sort requires at least one column".to_string(),
-            ));
-        }
-        let num_rows = columns[0].values.len();
-        if columns.iter().any(|item| item.values.len() != num_rows) {
-            return Err(ArrowError::ComputeError(
-                "Lexical sort columns have different row counts".to_string(),
-            ));
-        };
+    let acc = find_boundaries(&columns[0])?;
+    let acc = columns
+        .iter()
+        .skip(1)
+        .try_fold(acc, |acc, c| find_boundaries(c.as_ref()).map(|b| &acc | 
&b))?;
 
-        let comparator = LexicographicalComparator::try_new(columns)?;
-        Ok(LexicographicalPartitionIterator {
-            comparator,
-            num_rows,
-            previous_partition_point: 0,
-            partition_point: 0,
-        })
-    }
+    Ok(Partitions(Some(acc)))
 }
 
-/// Returns the next partition point of the range `start..end` according to 
the given comparator.
-/// The return value is the index of the first element of the second partition,
-/// and is guaranteed to be between `start..=end` (inclusive).
-///
-/// The values corresponding to those indices are assumed to be partitioned 
according to the given comparator.
-///
-/// Exponential search is to remedy for the case when array size and 
cardinality are both large.
-/// In these cases the partition point would be near the beginning of the 
range and
-/// plain binary search would be doing some unnecessary iterations on each 
call.
-///
-/// see <https://en.wikipedia.org/wiki/Exponential_search>
-#[inline]
-fn exponential_search_next_partition_point(
-    start: usize,
-    end: usize,
-    comparator: &LexicographicalComparator<'_>,
-) -> usize {
-    let target = start;
-    let mut bound = 1;
-    while bound + start < end
-        && comparator.compare(bound + start, target) != Ordering::Greater
-    {
-        bound *= 2;
-    }
+/// Returns a mask with bits set whenever the value or nullability changes
+fn find_boundaries(v: &dyn Array) -> Result<BooleanBuffer, ArrowError> {
+    let slice_len = v.len() - 1;
+    let v1 = v.slice(0, slice_len);
+    let v2 = v.slice(1, slice_len);
 
-    // invariant after while loop:
-    // (start + bound / 2) <= target < min(end, start + bound + 1)
-    // where <= and < are defined by the comparator;
-    // note here we have right = min(end, start + bound + 1) because (start + 
bound) might
-    // actually be considered and must be included.
-    partition_point(start + bound / 2, end.min(start + bound + 1), |idx| {
-        comparator.compare(idx, target) != Ordering::Greater
-    })
-}
+    let array_ne = neq_dyn(v1.as_ref(), v2.as_ref())?;
+    // Set if values have different non-NULL values
+    let values_ne = match array_ne.nulls().filter(|n| n.null_count() > 0) {
+        Some(n) => n.inner() & array_ne.values(),
+        None => array_ne.values().clone(),
+    };
 
-/// Returns the partition point of the range `start..end` according to the 
given predicate.
-/// The return value is the index of the first element of the second partition,
-/// and is guaranteed to be between `start..=end` (inclusive).
-///
-/// The algorithm is similar to a binary search.
-///
-/// The values corresponding to those indices are assumed to be partitioned 
according to the given predicate.
-///
-/// See [`slice::partition_point`]
-#[inline]
-fn partition_point<P: Fn(usize) -> bool>(start: usize, end: usize, pred: P) -> 
usize {
-    let mut left = start;
-    let mut right = end;
-    let mut size = right - left;
-    while left < right {
-        let mid = left + size / 2;
-
-        let less = pred(mid);
-
-        if less {
-            left = mid + 1;
-        } else {
-            right = mid;
+    Ok(match v.nulls().filter(|x| x.null_count() > 0) {
+        Some(n) => {
+            let n1 = n.inner().slice(0, slice_len);
+            let n2 = n.inner().slice(1, slice_len);
+            // Set if values_ne or the nullability has changed
+            &(&n1 ^ &n2) | &values_ne
         }
-
-        size = right - left;
-    }
-    left
+        None => values_ne,
+    })
 }
 
-impl<'a> Iterator for LexicographicalPartitionIterator<'a> {
-    type Item = Range<usize>;
-
-    fn next(&mut self) -> Option<Self::Item> {
-        if self.partition_point < self.num_rows {
-            // invariant:
-            // in the range [0..previous_partition_point] all values are <= 
the value at [previous_partition_point]
-            // so in order to save time we can do binary search on the range 
[previous_partition_point..num_rows]
-            // and find the index where any value is greater than the value at 
[previous_partition_point]
-            self.partition_point = exponential_search_next_partition_point(
-                self.partition_point,
-                self.num_rows,
-                &self.comparator,
-            );
-            let start = self.previous_partition_point;
-            let end = self.partition_point;
-            self.previous_partition_point = self.partition_point;
-            Some(Range { start, end })
-        } else {
-            None
-        }
-    }
+/// Given a list of already sorted columns, find partition ranges that would 
partition
+/// lexicographically equal values across columns.
+///
+/// The returned vec would be of size k where k is cardinality of the sorted 
values; Consecutive
+/// values will be connected: (a, b) and (b, c), where start = 0 and end = n 
for the first and last
+/// range.
+#[deprecated(note = "Use partition")]
+pub fn lexicographical_partition_ranges(
+    columns: &[SortColumn],
+) -> Result<impl Iterator<Item = Range<usize>> + '_, ArrowError> {
+    let cols: Vec<_> = columns.iter().map(|x| x.values.clone()).collect();
+    Ok(partition(&cols)?.ranges().into_iter())
 }
 
 #[cfg(test)]
 mod tests {
-    use super::*;
-    use crate::sort::SortOptions;
+    use std::sync::Arc;
+
     use arrow_array::*;
     use arrow_schema::DataType;
-    use std::sync::Arc;
 
-    #[test]
-    fn test_partition_point() {
-        let input = &[1, 1, 1, 2, 2, 2, 2, 2, 2, 3, 3, 3, 4];
-        {
-            let median = input[input.len() / 2];
-            assert_eq!(
-                9,
-                partition_point(0, input.len(), |i: usize| 
input[i].cmp(&median)
-                    != Ordering::Greater)
-            );
-        }
-        {
-            let search = input[9];
-            assert_eq!(
-                12,
-                partition_point(9, input.len(), |i: usize| 
input[i].cmp(&search)
-                    != Ordering::Greater)
-            );
-        }
-        {
-            let search = input[0];
-            assert_eq!(
-                3,
-                partition_point(0, 9, |i: usize| input[i].cmp(&search)
-                    != Ordering::Greater)
-            );
-        }
-        let input = &[1, 2, 2, 2, 2, 2, 2, 2, 9];
-        {
-            let search = input[5];
-            assert_eq!(
-                8,
-                partition_point(5, 9, |i: usize| input[i].cmp(&search)
-                    != Ordering::Greater)
-            );
-        }
-    }
+    use super::*;
 
     #[test]
-    fn test_lexicographical_partition_ranges_empty() {
-        let input = vec![];
-        assert!(
-            lexicographical_partition_ranges(&input).is_err(),
-            "lexicographical_partition_ranges should reject columns with empty 
rows"
+    fn test_partition_empty() {
+        let err = partition(&[]).unwrap_err();
+        assert_eq!(
+            err.to_string(),
+            "Invalid argument error: Partition requires at least one column"
         );
     }
 
     #[test]
-    fn test_lexicographical_partition_ranges_unaligned_rows() {
+    fn test_partition_unaligned_rows() {
         let input = vec![
-            SortColumn {
-                values: Arc::new(Int64Array::from(vec![None, Some(-1)])) as 
ArrayRef,
-                options: None,
-            },
-            SortColumn {
-                values: Arc::new(StringArray::from(vec![Some("foo")])) as 
ArrayRef,
-                options: None,
-            },
+            Arc::new(Int64Array::from(vec![None, Some(-1)])) as _,
+            Arc::new(StringArray::from(vec![Some("foo")])) as _,
         ];
-        assert!(
-            lexicographical_partition_ranges(&input).is_err(),
-            "lexicographical_partition_ranges should reject columns with 
different row counts"
-        );
+        let err = partition(&input).unwrap_err();
+        assert_eq!(
+            err.to_string(),
+            "Invalid argument error: Partition columns have different row 
counts"
+        )
     }
 
     #[test]
-    fn test_lexicographical_partition_single_column() {
-        let input = vec![SortColumn {
-            values: Arc::new(Int64Array::from(vec![1, 2, 2, 2, 2, 2, 2, 2, 9]))
-                as ArrayRef,
-            options: Some(SortOptions {
-                descending: false,
-                nulls_first: true,
-            }),
-        }];
-        let results = lexicographical_partition_ranges(&input).unwrap();
+    fn test_partition_small() {
+        let results = partition(&[
+            Arc::new(Int32Array::new(vec![].into(), None)) as _,
+            Arc::new(Int32Array::new(vec![].into(), None)) as _,
+            Arc::new(Int32Array::new(vec![].into(), None)) as _,
+        ])
+        .unwrap();
+        assert_eq!(results.len(), 0);
+        assert!(results.is_empty());
+
+        let results = partition(&[
+            Arc::new(Int32Array::from(vec![1])) as _,
+            Arc::new(Int32Array::from(vec![1])) as _,
+        ])
+        .unwrap();
+        assert_eq!(results.ranges(), &[0..1]);
+    }
+
+    #[test]
+    fn test_partition_single_column() {
+        let a = Int64Array::from(vec![1, 2, 2, 2, 2, 2, 2, 2, 9]);
+        let input = vec![Arc::new(a) as _];
         assert_eq!(
-            vec![(0_usize..1_usize), (1_usize..8_usize), (8_usize..9_usize)],
-            results.collect::<Vec<_>>()
+            partition(&input).unwrap().ranges(),
+            vec![(0..1), (1..8), (8..9)],
         );
     }
 
     #[test]
-    fn test_lexicographical_partition_all_equal_values() {
-        let input = vec![SortColumn {
-            values: Arc::new(Int64Array::from_value(1, 1000)) as ArrayRef,
-            options: Some(SortOptions {
-                descending: false,
-                nulls_first: true,
-            }),
-        }];
-
-        let results = lexicographical_partition_ranges(&input).unwrap();
-        assert_eq!(vec![(0_usize..1000_usize)], results.collect::<Vec<_>>());
+    fn test_partition_all_equal_values() {
+        let a = Int64Array::from_value(1, 1000);
+        let input = vec![Arc::new(a) as _];
+        assert_eq!(partition(&input).unwrap().ranges(), vec![(0..1000)]);
+    }
+
+    #[test]
+    fn test_partition_all_null_values() {
+        let input = vec![
+            new_null_array(&DataType::Int8, 1000),
+            new_null_array(&DataType::UInt16, 1000),
+        ];
+        assert_eq!(partition(&input).unwrap().ranges(), vec![(0..1000)]);
     }
 
     #[test]
-    fn test_lexicographical_partition_all_null_values() {
+    fn test_partition_unique_column_1() {
         let input = vec![
-            SortColumn {
-                values: new_null_array(&DataType::Int8, 1000),
-                options: Some(SortOptions {
-                    descending: false,
-                    nulls_first: true,
-                }),
-            },
-            SortColumn {
-                values: new_null_array(&DataType::UInt16, 1000),
-                options: Some(SortOptions {
-                    descending: false,
-                    nulls_first: false,
-                }),
-            },
+            Arc::new(Int64Array::from(vec![None, Some(-1)])) as _,
+            Arc::new(StringArray::from(vec![Some("foo"), Some("bar")])) as _,
         ];
-        let results = lexicographical_partition_ranges(&input).unwrap();
-        assert_eq!(vec![(0_usize..1000_usize)], results.collect::<Vec<_>>());
+        assert_eq!(partition(&input).unwrap().ranges(), vec![(0..1), (1..2)],);
     }
 
     #[test]
-    fn test_lexicographical_partition_unique_column_1() {
+    fn test_partition_unique_column_2() {
         let input = vec![
-            SortColumn {
-                values: Arc::new(Int64Array::from(vec![None, Some(-1)])) as 
ArrayRef,
-                options: Some(SortOptions {
-                    descending: false,
-                    nulls_first: true,
-                }),
-            },
-            SortColumn {
-                values: Arc::new(StringArray::from(vec![Some("foo"), 
Some("bar")]))
-                    as ArrayRef,
-                options: Some(SortOptions {
-                    descending: true,
-                    nulls_first: true,
-                }),
-            },
+            Arc::new(Int64Array::from(vec![None, Some(-1), Some(-1)])) as _,
+            Arc::new(StringArray::from(vec![
+                Some("foo"),
+                Some("bar"),
+                Some("apple"),
+            ])) as _,
         ];
-        let results = lexicographical_partition_ranges(&input).unwrap();
         assert_eq!(
-            vec![(0_usize..1_usize), (1_usize..2_usize)],
-            results.collect::<Vec<_>>()
+            partition(&input).unwrap().ranges(),
+            vec![(0..1), (1..2), (2..3),],
         );
     }
 
     #[test]
-    fn test_lexicographical_partition_unique_column_2() {
+    fn test_partition_non_unique_column_1() {
         let input = vec![
-            SortColumn {
-                values: Arc::new(Int64Array::from(vec![None, Some(-1), 
Some(-1)]))
-                    as ArrayRef,
-                options: Some(SortOptions {
-                    descending: false,
-                    nulls_first: true,
-                }),
-            },
-            SortColumn {
-                values: Arc::new(StringArray::from(vec![
-                    Some("foo"),
-                    Some("bar"),
-                    Some("apple"),
-                ])) as ArrayRef,
-                options: Some(SortOptions {
-                    descending: true,
-                    nulls_first: true,
-                }),
-            },
+            Arc::new(Int64Array::from(vec![None, Some(-1), Some(-1), 
Some(1)])) as _,
+            Arc::new(StringArray::from(vec![
+                Some("foo"),
+                Some("bar"),
+                Some("bar"),
+                Some("bar"),
+            ])) as _,
         ];
-        let results = lexicographical_partition_ranges(&input).unwrap();
         assert_eq!(
-            vec![(0_usize..1_usize), (1_usize..2_usize), (2_usize..3_usize),],
-            results.collect::<Vec<_>>()
+            partition(&input).unwrap().ranges(),
+            vec![(0..1), (1..3), (3..4),],
         );
     }
 
     #[test]
-    fn test_lexicographical_partition_non_unique_column_1() {
+    fn test_partition_masked_nulls() {
         let input = vec![
-            SortColumn {
-                values: Arc::new(Int64Array::from(vec![
-                    None,
-                    Some(-1),
-                    Some(-1),
-                    Some(1),
-                ])) as ArrayRef,
-                options: Some(SortOptions {
-                    descending: false,
-                    nulls_first: true,
-                }),
-            },
-            SortColumn {
-                values: Arc::new(StringArray::from(vec![
-                    Some("foo"),
-                    Some("bar"),
-                    Some("bar"),
-                    Some("bar"),
-                ])) as ArrayRef,
-                options: Some(SortOptions {
-                    descending: true,
-                    nulls_first: true,
-                }),
-            },
+            Arc::new(Int64Array::new(vec![1; 9].into(), None)) as _,
+            Arc::new(Int64Array::new(
+                vec![1, 1, 2, 2, 2, 3, 3, 3, 3].into(),

Review Comment:
   I think it would help to also have a test like this where the underlying
   value in the null slot would actually be wrong / different
   
   ```suggestion
                   vec![1, 1, 2, 2, 2, 3, 0, 3, 3].into(),
   ```
   
   However I also made that change and it passed so 👍 



##########
arrow-ord/src/partition.rs:
##########
@@ -17,146 +17,74 @@
 
 //! Defines partition kernel for `ArrayRef`
 
-use crate::sort::{LexicographicalComparator, SortColumn};
+use crate::comparison::neq_dyn;
+use crate::sort::SortColumn;
+use arrow_array::Array;
+use arrow_buffer::BooleanBuffer;
 use arrow_schema::ArrowError;
-use std::cmp::Ordering;
 use std::ops::Range;
 
 /// Given a list of already sorted columns, find partition ranges that would 
partition
 /// lexicographically equal values across columns.
 ///
-/// Here LexicographicalComparator is used in conjunction with binary
-/// search so the columns *MUST* be pre-sorted already.
-///
 /// The returned vec would be of size k where k is cardinality of the sorted 
values; Consecutive
 /// values will be connected: (a, b) and (b, c), where start = 0 and end = n 
for the first and last
 /// range.
 pub fn lexicographical_partition_ranges(
     columns: &[SortColumn],
 ) -> Result<impl Iterator<Item = Range<usize>> + '_, ArrowError> {
-    LexicographicalPartitionIterator::try_new(columns)
-}
-
-struct LexicographicalPartitionIterator<'a> {
-    comparator: LexicographicalComparator<'a>,
-    num_rows: usize,
-    previous_partition_point: usize,
-    partition_point: usize,
-}
+    if columns.is_empty() {
+        return Err(ArrowError::InvalidArgumentError(
+            "Sort requires at least one column".to_string(),
+        ));
+    }
+    let num_rows = columns[0].values.len();
+    if columns.iter().any(|item| item.values.len() != num_rows) {
+        return Err(ArrowError::ComputeError(
+            "Lexical sort columns have different row counts".to_string(),
+        ));
+    };
 
-impl<'a> LexicographicalPartitionIterator<'a> {
-    fn try_new(
-        columns: &'a [SortColumn],
-    ) -> Result<LexicographicalPartitionIterator, ArrowError> {
-        if columns.is_empty() {
-            return Err(ArrowError::InvalidArgumentError(
-                "Sort requires at least one column".to_string(),
-            ));
-        }
-        let num_rows = columns[0].values.len();
-        if columns.iter().any(|item| item.values.len() != num_rows) {
-            return Err(ArrowError::ComputeError(
-                "Lexical sort columns have different row counts".to_string(),
-            ));
-        };
+    let acc = find_boundaries(&columns[0])?;
+    let acc = columns
+        .iter()
+        .skip(1)
+        .try_fold(acc, |acc, c| find_boundaries(c).map(|b| &acc | &b))?;
 
-        let comparator = LexicographicalComparator::try_new(columns)?;
-        Ok(LexicographicalPartitionIterator {
-            comparator,
-            num_rows,
-            previous_partition_point: 0,
-            partition_point: 0,
-        })
+    let mut out = vec![];
+    let mut current = 0;
+    for idx in acc.set_indices() {
+        let t = current;
+        current = idx + 1;
+        out.push(t..current)
     }
-}
-
-/// Returns the next partition point of the range `start..end` according to 
the given comparator.
-/// The return value is the index of the first element of the second partition,
-/// and is guaranteed to be between `start..=end` (inclusive).
-///
-/// The values corresponding to those indices are assumed to be partitioned 
according to the given comparator.
-///
-/// Exponential search is to remedy for the case when array size and 
cardinality are both large.
-/// In these cases the partition point would be near the beginning of the 
range and
-/// plain binary search would be doing some unnecessary iterations on each 
call.
-///
-/// see <https://en.wikipedia.org/wiki/Exponential_search>
-#[inline]
-fn exponential_search_next_partition_point(
-    start: usize,
-    end: usize,
-    comparator: &LexicographicalComparator<'_>,
-) -> usize {
-    let target = start;
-    let mut bound = 1;
-    while bound + start < end
-        && comparator.compare(bound + start, target) != Ordering::Greater
-    {
-        bound *= 2;
+    if current != num_rows {
+        out.push(current..num_rows)
     }
-
-    // invariant after while loop:
-    // (start + bound / 2) <= target < min(end, start + bound + 1)
-    // where <= and < are defined by the comparator;
-    // note here we have right = min(end, start + bound + 1) because (start + 
bound) might
-    // actually be considered and must be included.
-    partition_point(start + bound / 2, end.min(start + bound + 1), |idx| {
-        comparator.compare(idx, target) != Ordering::Greater
-    })
+    Ok(out.into_iter())
 }
 
-/// Returns the partition point of the range `start..end` according to the 
given predicate.
-/// The return value is the index of the first element of the second partition,
-/// and is guaranteed to be between `start..=end` (inclusive).
-///
-/// The algorithm is similar to a binary search.
-///
-/// The values corresponding to those indices are assumed to be partitioned 
according to the given predicate.
-///
-/// See [`slice::partition_point`]
-#[inline]
-fn partition_point<P: Fn(usize) -> bool>(start: usize, end: usize, pred: P) -> 
usize {
-    let mut left = start;
-    let mut right = end;
-    let mut size = right - left;
-    while left < right {
-        let mid = left + size / 2;
+/// Returns a mask with bits set whenever the value or nullability changes
+fn find_boundaries(col: &SortColumn) -> Result<BooleanBuffer, ArrowError> {
+    let v = &col.values;
+    let slice_len = v.len().saturating_sub(1);
+    let v1 = v.slice(0, slice_len);
+    let v2 = v.slice(1, slice_len);
 
-        let less = pred(mid);
+    let array_ne = neq_dyn(v1.as_ref(), v2.as_ref())?;
+    let values_ne = match array_ne.nulls().filter(|n| n.null_count() > 0) {
+        Some(n) => n.inner() & array_ne.values(),
+        None => array_ne.values().clone(),
+    };
 
-        if less {
-            left = mid + 1;
-        } else {
-            right = mid;
+    Ok(match v.nulls().filter(|x| x.null_count() > 0) {
+        Some(n) => {
+            let n1 = n.inner().slice(0, slice_len);
+            let n2 = n.inner().slice(1, slice_len);
+            &(&n1 ^ &n2) | &values_ne

Review Comment:
   this is quite clever



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]


Reply via email to