alamb commented on code in PR #2593:
URL: https://github.com/apache/arrow-rs/pull/2593#discussion_r967630518


##########
arrow/src/row/mod.rs:
##########
@@ -0,0 +1,815 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+//! A comparable row-oriented representation of a collection of [`Array`]
+
+use crate::array::{
+    as_boolean_array, as_generic_binary_array, as_largestring_array, as_string_array,
+    Array, ArrayRef, Decimal128Array, Decimal256Array,
+};
+use crate::compute::SortOptions;
+use crate::datatypes::*;
+use crate::row::interner::{Interned, OrderPreservingInterner};
+use crate::{downcast_dictionary_array, downcast_primitive_array};
+
+mod fixed;
+mod interner;
+mod variable;
+
+/// Converts arrays into a row-oriented format that is [normalized for sorting].
+///
+/// In particular, a byte-wise comparison of the rows, e.g. [`memcmp`], is sufficient
+/// to establish the ordering of two rows, allowing for extremely fast comparisons,
+/// and permitting the use of [non-comparison sorts] such as [radix sort].
+///
+/// Comparing [`Rows`] generated by different [`RowConverter`]s is not guaranteed to
+/// yield a meaningful ordering.
+///
+/// # Format
+///
+/// The encoding of the row format should not be considered stable, but is documented here
+/// for reference.
+///
+/// ## Unsigned Integer Encoding
+///
+/// A null integer is encoded as a `0_u8`, followed by a number of zeroed bytes corresponding
+/// to the integer's length
+///
+/// A valid integer is encoded as `1_u8`, followed by the big-endian representation of the
+/// integer
+///
+/// ## Signed Integer Encoding
+///
+/// Signed integers have their most significant sign bit flipped, and are then encoded in the
+/// same manner as an unsigned integer
+///
+/// ## Float Encoding
+///
+/// Floats are converted from IEEE 754 representation to a signed integer representation
+/// by flipping all bits except the sign bit if they are negative.
+///
+/// They are then encoded in the same manner as a signed integer
+///
+/// ## Variable Length Bytes Encoding
+///
+/// A null is encoded as a `0_u8`
+///
+/// An empty byte array is encoded as `1_u8`
+///
+/// A non-null, non-empty byte array is encoded as `2_u8` followed by the byte array
+/// encoded using a block based scheme described below.
+///
+/// The byte array is broken up into 32-byte blocks. Each block other than the last is
+/// written in turn to the output, followed by `0xFF_u8`. The final block is padded to
+/// 32 bytes with `0_u8` and written to the output, followed by the un-padded length in
+/// bytes of this final block as a `u8`
+///
+/// This is loosely inspired by [COBS] encoding, and chosen over more traditional
+/// [byte stuffing] as it is more amenable to vectorisation, in particular AVX-256.
+///
+/// ## Dictionary Encoding
+///
+/// [`RowConverter`] needs to support converting dictionary encoded arrays with unsorted, and
+/// potentially distinct dictionaries. One simple mechanism to handle this would be to reverse
+/// the dictionary encoding, and encode the array values directly, however, this would lose
+/// the benefits of dictionary encoding to reduce memory and CPU consumption.
+///
+/// As such the [`RowConverter`] maintains an order-preserving dictionary encoding for each
+/// dictionary encoded column. As this is a variable-length encoding, new dictionary values
+/// can be added whilst preserving the sort order.
+///
+/// A null dictionary value is encoded as `0_u8`.
+///
+/// A non-null dictionary value is encoded as `1_u8` followed by a null-terminated byte array
+/// key determined by the order-preserving dictionary encoding
+///
+/// # Ordering
+///
+/// ## Float Ordering
+///
+/// Floats are totally ordered in accordance with the `totalOrder` predicate as defined
+/// in the IEEE 754 (2008 revision) floating point standard.
+///
+/// The ordering established by this does not always agree with the
+/// [`PartialOrd`] and [`PartialEq`] implementations of `f32`. For example,
+/// they consider negative and positive zero equal, while this does not
+///
+/// ## Null Ordering
+///
+/// The encoding described above will order nulls first; this can be inverted by representing
+/// nulls as `0xFF_u8` instead of `0_u8`
+///
+/// ## Reverse Column Ordering
+///
+/// The order of a given column can be reversed by bitwise negating the encoded bytes of
+/// non-null values
+///
+/// ## Reconstruction
+///
+/// Given a schema it would theoretically be possible to reconstruct the columnar data from
+/// the row format, however, this is currently not supported. It is recommended that the row
+/// format is instead used to obtain a sorted list of row indices, which can then be used
+/// with [`take`](crate::compute::take) to obtain a sorted [`Array`]
+///
+/// [non-comparison sorts]: https://en.wikipedia.org/wiki/Sorting_algorithm#Non-comparison_sorts
+/// [radix sort]: https://en.wikipedia.org/wiki/Radix_sort
+/// [normalized for sorting]: https://citeseerx.ist.psu.edu/viewdoc/download?doi=10.1.1.83.1080&rep=rep1&type=pdf
+/// [`memcmp`]: https://www.man7.org/linux/man-pages/man3/memcmp.3.html
+/// [COBS]: https://en.wikipedia.org/wiki/Consistent_Overhead_Byte_Stuffing
+/// [byte stuffing]: https://en.wikipedia.org/wiki/High-Level_Data_Link_Control#Asynchronous_framing
+#[derive(Debug)]
+pub struct RowConverter {
+    options: Vec<SortOptions>,
+    dictionaries: Vec<Option<Box<OrderPreservingInterner>>>,
+}
+
+impl RowConverter {
+    /// Create a new [`RowConverter`] with the provided [`SortOptions`], one per column
+    pub fn new(options: Vec<SortOptions>) -> Self {
+        let dictionaries = (0..options.len()).map(|_| None).collect();
+        Self {
+            dictionaries,
+            options,
+        }
+    }
+
+    /// Convert `arrays` into [`Rows`]
+    ///
+    /// # Panics
+    ///
+    /// Panics if the number of columns in `arrays` does not match the number of options
+    /// provided to [`RowConverter::new`]
+    pub fn convert(&mut self, arrays: &[ArrayRef]) -> Rows {
+        assert_eq!(arrays.len(), self.options.len(), "column count mismatch");
+
+        let dictionaries: Vec<_> = arrays
+            .iter()
+            .zip(&mut self.dictionaries)
+            .map(|(array, dictionary)| {
+                let values = downcast_dictionary_array! {
+                    array => array.values(),
+                    _ => return None
+                };
+
+                let interner = dictionary.get_or_insert_with(Default::default);
+
+                let mapping: Vec<_> = compute_dictionary_mapping(interner, values)
+                    .into_iter()
+                    .map(|maybe_interned| {
+                        maybe_interned.map(|interned| interner.normalized_key(interned))
+                    })
+                    .collect();
+
+                Some(mapping)
+            })
+            .collect();
+
+        let mut rows = new_empty_rows(arrays, &dictionaries);
+
+        for ((array, options), dictionary) in
+            arrays.iter().zip(&self.options).zip(dictionaries)
+        {
+            // We encode a column at a time to minimise dispatch overheads
+            encode_column(&mut rows, array, *options, dictionary.as_deref())
+        }
+
+        if cfg!(debug_assertions) {
+            assert_eq!(*rows.offsets.last().unwrap(), rows.buffer.len());
+            rows.offsets
+                .windows(2)
+                .for_each(|w| assert!(w[0] < w[1], "offsets should be monotonic"));
+        }
+
+        rows
+    }
+}
+
+/// A row-oriented representation of arrow data, that is normalized for comparison
+///
+/// See [`RowConverter`]
+#[derive(Debug)]
+pub struct Rows {
+    buffer: Box<[u8]>,
+    offsets: Box<[usize]>,
+}
+
+impl Rows {
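+    /// Returns the [`Row`] at index `row`
+    ///
+    /// # Panics
+    ///
+    /// Panics if `row >= self.num_rows()`
+    pub fn row(&self, row: usize) -> Row<'_> {
+        let end = self.offsets[row + 1];
+        let start = self.offsets[row];
+        Row(&self.buffer[start..end])
+    }
+
+    /// Returns the number of rows
+    pub fn num_rows(&self) -> usize {
+        self.offsets.len() - 1
+    }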
+}
+
+/// A comparable representation of a row, see [`Rows`]
+#[derive(Debug, Copy, Clone, PartialEq, Eq, PartialOrd, Ord, Hash)]
+pub struct Row<'a>(&'a [u8]);
+
+impl<'a> AsRef<[u8]> for Row<'a> {
+    fn as_ref(&self) -> &[u8] {
+        self.0
+    }
+}
+
+/// Computes the dictionary mapping for the given dictionary values
+fn compute_dictionary_mapping(
+    interner: &mut OrderPreservingInterner,
+    values: &ArrayRef,
+) -> Vec<Option<Interned>> {
+    use fixed::FixedLengthEncoding;
+    downcast_primitive_array! {
+        values => interner
+            .intern(values.iter().map(|x| x.map(|x| x.encode()))),
+        DataType::Binary => {
+            // `Binary` uses `i32` offsets; `i64` is only valid for `LargeBinary`
+            let iter = as_generic_binary_array::<i32>(values).iter();
+            interner.intern(iter)
+        }
+        DataType::LargeBinary => {
+            let iter = as_generic_binary_array::<i64>(values).iter();
+            interner.intern(iter)
+        }
+        DataType::Utf8 => {
+            let iter = as_string_array(values).iter().map(|x| x.map(|x| x.as_bytes()));
+            interner.intern(iter)
+        }
+        DataType::LargeUtf8 => {
+            let iter = as_largestring_array(values).iter().map(|x| x.map(|x| x.as_bytes()));
+            interner.intern(iter)
+        }
+        t => unreachable!("dictionary value {} is not supported", t)
+    }
+}
+
+/// Computes the length of each encoded [`Rows`] and returns an empty [`Rows`]
+fn new_empty_rows(
+    cols: &[ArrayRef],
+    dictionaries: &[Option<Vec<Option<&[u8]>>>],
+) -> Rows {
+    let num_rows = cols.first().map(|x| x.len()).unwrap_or(0);
+    let mut lengths = vec![0; num_rows];
+
+    for (array, dict) in cols.iter().zip(dictionaries) {
+        downcast_primitive_array! {
+            array => lengths.iter_mut().for_each(|x| *x += fixed::encoded_len(array)),
+            DataType::Null => lengths.iter_mut().for_each(|x| *x += 1),
+            DataType::Boolean => lengths.iter_mut().for_each(|x| *x += 2),
+            DataType::Decimal128(_, _) => lengths.iter_mut().for_each(|x| *x += 17),
+            DataType::Decimal256(_, _) => lengths.iter_mut().for_each(|x| *x += 33),
+            DataType::Binary => as_generic_binary_array::<i32>(array)
+                .iter()
+                .zip(lengths.iter_mut())
+                .for_each(|(slice, length)| *length += variable::encoded_len(slice)),
+            DataType::LargeBinary => as_generic_binary_array::<i64>(array)
+                .iter()
+                .zip(lengths.iter_mut())
+                .for_each(|(slice, length)| *length += variable::encoded_len(slice)),
+            DataType::Utf8 => as_string_array(array)
+                .iter()
+                .zip(lengths.iter_mut())
+                .for_each(|(slice, length)| {
+                    *length += variable::encoded_len(slice.map(|x| x.as_bytes()))
+                }),
+            DataType::LargeUtf8 => as_largestring_array(array)
+                .iter()
+                .zip(lengths.iter_mut())
+                .for_each(|(slice, length)| {
+                    *length += variable::encoded_len(slice.map(|x| x.as_bytes()))
+                }),
+            DataType::Dictionary(_, _) => downcast_dictionary_array! {
+                array => {
+                    let dict = dict.as_ref().unwrap();
+                    for (v, length) in array.keys().iter().zip(lengths.iter_mut()) {
+                        match v.and_then(|v| dict[v as usize]) {
+                            Some(k) => *length += k.len() + 1,
+                            None => *length += 1,
+                        }
+                    }
+                }
+                _ => unreachable!(),
+            }
+            t => unimplemented!("not yet implemented: {}", t)
+        }
+    }
+
+    let mut offsets = Vec::with_capacity(num_rows + 1);
+    offsets.push(0);
+
+    let mut cur_offset = 0_usize;
+    for l in lengths {
+        offsets.push(cur_offset);

Review Comment:
   Thank you
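
   For anyone following along, here is a minimal sketch of how I read the offset handling. The helper names `layout` and `encode_fixed` are made up for illustration and are not part of this PR, and the tail of the loop (the `checked_add` and the zeroed buffer) is my assumption based on how `variable::encode` advances `out.offsets`. The point is that each entry of `offsets[1..]` starts out as the start of its row and acts as a write cursor, so once every column has been encoded it has become the end of that row.

```rust
/// Hypothetical helper: turns per-row encoded lengths into offsets and a zeroed buffer.
/// `offsets[i + 1]` initially holds the *start* of row `i`.
fn layout(lengths: &[usize]) -> (Vec<usize>, Vec<u8>) {
    let mut offsets = Vec::with_capacity(lengths.len() + 1);
    offsets.push(0);

    let mut cur_offset = 0_usize;
    for l in lengths {
        offsets.push(cur_offset);
        // Assumption: the real code also guards against overflow here
        cur_offset = cur_offset.checked_add(*l).expect("overflow");
    }
    (offsets, vec![0_u8; cur_offset])
}

/// Hypothetical stand-in for a column encoder: writes `width` bytes of `fill`
/// into every row, advancing each row's write cursor as it goes.
fn encode_fixed(offsets: &mut [usize], buffer: &mut [u8], width: usize, fill: u8) {
    for offset in offsets.iter_mut().skip(1) {
        buffer[*offset..*offset + width].fill(fill);
        *offset += width;
    }
}
```

   With lengths `[3, 3]`, `layout` yields offsets `[0, 0, 3]`. After encoding a 1-byte column and then a 2-byte column they read `[0, 3, 6]`, which is what the debug assertions at the end of `convert` check.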



##########
arrow/src/row/mod.rs:
##########
@@ -0,0 +1,815 @@
+impl RowConverter {
+    /// Create a new [`RowConverter`] with the provided [`SortOptions`], one per column
+    pub fn new(options: Vec<SortOptions>) -> Self {
+        let dictionaries = (0..options.len()).map(|_| None).collect();
+        Self {
+            dictionaries,
+            options,
+        }
+    }
+
+    /// Convert `arrays` into [`Rows`]
+    ///
+    /// # Panics
+    ///
+    /// Panics if the number of columns in `arrays` does not match the number of options
+    /// provided to [`RowConverter::new`]
+    pub fn convert(&mut self, arrays: &[ArrayRef]) -> Rows {
+        assert_eq!(arrays.len(), self.options.len(), "column count mismatch");

Review Comment:
   This logic makes sense to me
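
   As an aside, the fixed-width encodings documented on `RowConverter` can be sketched roughly as follows. This only illustrates the documented format (ascending order, nulls first) for 32-bit types. It is not the code in `fixed.rs`, and the function names are hypothetical.

```rust
/// Unsigned: `1_u8` plus big-endian bytes if valid, all zeroes if null.
fn encode_u32(v: Option<u32>) -> [u8; 5] {
    let mut out = [0_u8; 5];
    if let Some(v) = v {
        out[0] = 1;
        out[1..].copy_from_slice(&v.to_be_bytes());
    }
    out
}

/// Signed: flip the sign bit, then encode as unsigned.
fn encode_i32(v: Option<i32>) -> [u8; 5] {
    encode_u32(v.map(|v| (v as u32) ^ 0x8000_0000))
}

/// Float: if negative, flip every bit except the sign bit, then encode as signed.
fn encode_f32(v: Option<f32>) -> [u8; 5] {
    encode_i32(v.map(|v| {
        let bits = v.to_bits() as i32;
        // `bits >> 31` is all ones for negative inputs and zero otherwise,
        // so the mask is 0x7FFF_FFFF exactly when the sign bit is set
        bits ^ ((((bits >> 31) as u32) >> 1) as i32)
    }))
}
```

   Comparing the resulting byte arrays lexicographically then gives nulls first, `-1 < 0 < 1` for integers, and `-1.0 < -0.0 < 0.0 < 1.0` for floats, in line with the `totalOrder` behaviour described in the docs.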



##########
arrow/src/row/variable.rs:
##########
@@ -0,0 +1,97 @@
+use crate::compute::SortOptions;
+use crate::row::Rows;
+use crate::util::bit_util::ceil;
+
+/// The block size of the variable length encoding
+pub const BLOCK_SIZE: usize = 32;
+
+/// Returns the length of the encoded representation of a byte array
+pub fn encoded_len(a: Option<&[u8]>) -> usize {
+    match a {
+        Some(a) => 1 + ceil(a.len(), BLOCK_SIZE) * (BLOCK_SIZE + 1),
+        None => 1,
+    }
+}
+
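+/// Variable length values are encoded as
+///
+/// - a single `0_u8` if null
+/// - a single `1_u8` if empty
+/// - `2_u8` if non-empty, followed by the value split into [`BLOCK_SIZE`] byte blocks,
+///   each followed by `0xFF_u8`, except the final block, which is zero-padded and
+///   followed by its un-padded length as a `u8`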
+pub fn encode<'a, I: Iterator<Item = Option<&'a [u8]>>>(
+    out: &mut Rows,
+    i: I,
+    opts: SortOptions,
+) {
+    for (offset, maybe_val) in out.offsets.iter_mut().skip(1).zip(i) {
+        match maybe_val {
+            Some(val) if val.is_empty() => {
+                out.buffer[*offset] = match opts.descending {
+                    true => !1,
+                    false => 1,
+                };
+                *offset += 1;
+            }
+            Some(val) => {
+                let block_count = ceil(val.len(), BLOCK_SIZE);
+                let end_offset = *offset + 1 + block_count * (BLOCK_SIZE + 1);
+                let to_write = &mut out.buffer[*offset..end_offset];
+
+                // Set validity
+                to_write[0] = 2;
+
+                let chunks = val.chunks_exact(BLOCK_SIZE);
+                let remainder = chunks.remainder();
+                for (input, output) in chunks
+                    .clone()
+                    .zip(to_write[1..].chunks_exact_mut(BLOCK_SIZE + 1))
+                {
+                    let input: &[u8; BLOCK_SIZE] = input.try_into().unwrap();
+                    let out_block: &mut [u8; BLOCK_SIZE] =
+                        (&mut output[..BLOCK_SIZE]).try_into().unwrap();
+
+                    *out_block = *input;
+                    output[BLOCK_SIZE] = u8::MAX;
+                }
+
+                if !remainder.is_empty() {
+                    let start_offset = 1 + (block_count - 1) * (BLOCK_SIZE + 1);
+                    to_write[start_offset..start_offset + remainder.len()]
+                        .copy_from_slice(remainder);
+                    *to_write.last_mut().unwrap() = remainder.len() as u8;
+                } else {
+                    *to_write.last_mut().unwrap() = BLOCK_SIZE as u8;

Review Comment:
   Ah, the idea here is that this is the length just written 👍 
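
   To spell that out, here is a rough sketch of the block scheme as I understand it from the docs and this function. `encode_bytes` is a hypothetical, allocating stand-in that only covers ascending order. It is not the PR's implementation, which writes into the pre-sized `Rows` buffer instead.

```rust
const BLOCK_SIZE: usize = 32;

/// Hypothetical helper: encodes one optional byte array into a fresh `Vec<u8>`.
fn encode_bytes(val: Option<&[u8]>) -> Vec<u8> {
    match val {
        None => vec![0],
        Some(v) if v.is_empty() => vec![1],
        Some(v) => {
            let mut out = vec![2];
            let mut chunks = v.chunks(BLOCK_SIZE).peekable();
            while let Some(chunk) = chunks.next() {
                out.extend_from_slice(chunk);
                if chunks.peek().is_some() {
                    // More data follows: mark the block with a continuation byte
                    out.push(0xFF);
                } else {
                    // Final block: zero-pad to BLOCK_SIZE, then store the length just written
                    out.resize(out.len() + BLOCK_SIZE - chunk.len(), 0);
                    out.push(chunk.len() as u8);
                }
            }
            out
        }
    }
}
```

   So a 3-byte value ends in `3`, a 32-byte value ends in `32` (the `else` branch here), and a 40-byte value has `0xFF` after its first block and ends in `8`. Since `32 < 0xFF`, a value that stops at a block boundary still sorts before any longer value sharing that prefix.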



##########
arrow/src/row/interner.rs:
##########
@@ -0,0 +1,451 @@
+use hashbrown::hash_map::RawEntryMut;
+use hashbrown::HashMap;
+use std::cmp::Ordering;
+use std::num::NonZeroU32;
+use std::ops::Index;
+
+/// An interned value
+#[derive(Debug, Copy, Clone, PartialEq, Eq)]
+pub struct Interned(NonZeroU32); // We use NonZeroU32 so that `Option<Interned>` is 32 bits
+
+/// A byte array interner that generates normalized keys that are sorted with respect
+/// to the interned values, e.g. `intern(a) < intern(b) => a < b`
+#[derive(Debug, Default)]
+pub struct OrderPreservingInterner {
+    /// Provides a lookup from [`Interned`] to the normalized key
+    keys: InternBuffer,
+    /// Provides a lookup from [`Interned`] to the normalized value
+    values: InternBuffer,
+    /// Key allocation data structure
+    bucket: Box<Bucket>,
+
+    // A hash table used to perform faster re-keying, and detect duplicates
+    hasher: ahash::RandomState,
+    lookup: HashMap<Interned, (), ()>,
+}
+
+impl OrderPreservingInterner {
+    /// Interns an iterator of values returning a list of [`Interned`] which can be
+    /// used with [`Self::normalized_key`] to retrieve the normalized keys with a
+    /// lifetime not tied to the mutable borrow passed to this method
+    pub fn intern<I, V>(&mut self, input: I) -> Vec<Option<Interned>>
+    where
+        I: IntoIterator<Item = Option<V>>,
+        V: AsRef<[u8]>,
+    {
+        let iter = input.into_iter();
+        let capacity = iter.size_hint().0;
+        let mut out = Vec::with_capacity(capacity);
+
+        // (index in output, hash value, value)
+        let mut to_intern: Vec<(usize, u64, V)> = Vec::with_capacity(capacity);
+        let mut to_intern_len = 0;
+
+        for (idx, item) in iter.enumerate() {
+            let value: V = match item {
+                Some(value) => value,
+                None => {
+                    out.push(None);
+                    continue;
+                }
+            };
+
+            let v = value.as_ref();
+            let hash = self.hasher.hash_one(v);
+            let entry = self
+                .lookup
+                .raw_entry_mut()
+                .from_hash(hash, |a| &self.values[*a] == v);
+
+            match entry {
+                RawEntryMut::Occupied(o) => out.push(Some(*o.key())),
+                RawEntryMut::Vacant(_) => {
+                    // Push placeholder
+                    out.push(None);
+                    to_intern_len += v.len();
+                    to_intern.push((idx, hash, value));
+                }
+            };
+        }
+
+        to_intern.sort_unstable_by(|(_, _, a), (_, _, b)| a.as_ref().cmp(b.as_ref()));
+
+        self.keys.offsets.reserve(to_intern.len());
+        self.keys.values.reserve(to_intern.len()); // Approximation
+        self.values.offsets.reserve(to_intern.len());
+        self.values.values.reserve(to_intern_len);
+
+        for (idx, hash, value) in to_intern {
+            let val = value.as_ref();
+
+            let entry = self
+                .lookup
+                .raw_entry_mut()
+                .from_hash(hash, |a| &self.values[*a] == val);
+
+            match entry {
+                RawEntryMut::Occupied(o) => {
+                    out[idx] = Some(*o.key());
+                }
+                RawEntryMut::Vacant(v) => {
+                    let val = value.as_ref();
+                    self.bucket
+                        .insert(&mut self.values, val, &mut self.keys.values);
+                    self.keys.values.push(0);
+                    let interned = self.keys.append();
+
+                    let hasher = &mut self.hasher;
+                    let values = &self.values;
+                    v.insert_with_hasher(hash, interned, (), |key| {
+                        hasher.hash_one(&values[*key])
+                    });
+                    out[idx] = Some(interned);
+                }
+            }
+        }
+
+        out
+    }
+
+    /// Returns a null-terminated byte array that can be compared against other normalized keys
+    /// returned by this instance, to establish ordering of the interned values
+    pub fn normalized_key(&self, key: Interned) -> &[u8] {
+        &self.keys[key]
+    }
+}
+
+/// A buffer of `[u8]` indexed by [`Interned`]
+#[derive(Debug)]
+struct InternBuffer {
+    /// Raw values
+    values: Vec<u8>,
+    /// The ith value is `&values[offsets[i]..offsets[i+1]]`
+    offsets: Vec<usize>,
+}
+
+impl Default for InternBuffer {
+    fn default() -> Self {
+        Self {
+            values: Default::default(),
+            offsets: vec![0],
+        }
+    }
+}
+
+impl InternBuffer {
+    /// Insert `data` returning the corresponding [`Interned`]
+    fn insert(&mut self, data: &[u8]) -> Interned {
+        self.values.extend_from_slice(data);
+        self.append()
+    }
+
+    /// Appends the next value based on data written to `self.values`
+    /// returning the corresponding [`Interned`]
+    fn append(&mut self) -> Interned {
+        let idx: u32 = self.offsets.len().try_into().unwrap();
+        let key = Interned(NonZeroU32::new(idx).unwrap());
+        self.offsets.push(self.values.len());
+        key
+    }
+}
+
+impl Index<Interned> for InternBuffer {
+    type Output = [u8];
+
+    fn index(&self, key: Interned) -> &Self::Output {
+        let index = key.0.get() as usize;
+        let end = self.offsets[index];
+        let start = self.offsets[index - 1];
+        // SAFETY:
+        // self.values is never reduced in size and values appended
+        // to self.offsets never exceed self.values.len() at the time
+        unsafe { self.values.get_unchecked(start..end) }
+    }
+}
+
+/// A slot corresponds to a single byte-value in the generated normalized key
+///
+/// It may contain a value, if not the first slot, and may contain a child [`Bucket`] representing
+/// the next byte in the generated normalized key
+#[derive(Debug, Default, Clone)]
+struct Slot {
+    value: Option<Interned>,
+    /// Child values less than `self.value` if any
+    child: Option<Box<Bucket>>,
+}
+
+/// Bucket is the root of the data-structure used to allocate normalized keys
+///
+/// In particular it needs to generate keys that
+///
+/// * Contain no `0` bytes other than the null terminator
+/// * Compare lexicographically in the same manner as the encoded `data`
+///
+/// The data structure consists of 255 slots, each of which can store a value.
+/// Additionally each slot may contain a child bucket, containing values smaller
+/// than the value within the slot
+///
+/// # Allocation Strategy
+///
+/// To find the insertion point within a Bucket we perform a binary search of the slots, but
+/// capping the search range at 4. Visualizing this as a search tree, the root would have 64
+/// children, with subsequent non-leaf nodes each containing two children.
+///
+/// The insertion point is the first empty slot we encounter, otherwise it is the first slot
+/// that contains a value greater than the value being inserted
+///
+/// For example, initially all slots are empty
+///
+/// ```ignore
+/// 0:
+/// 1:
+/// .
+/// .
+/// 254:
+/// ```
+///
+/// Insert `1000`
+///
+/// ```ignore
+/// 0:
+/// 1:
+/// 2:
+/// 3: 1000 <- 1. slot is empty, insert here
+/// 4:
+/// .
+/// .
+/// 254:
+/// ```
+///
+/// Insert `500`
+///
+/// ```ignore
+/// 0:
+/// 1: 500 <- 2. slot is empty, insert here
+/// 2:
+/// 3: 1000 <- 1. compare against slot value
+/// 4:
+/// .
+/// .
+/// 254:
+/// ```
+///
+/// Insert `600`
+///
+/// ```ignore
+/// 0:
+/// 1: 500 <- 2. compare against slot value
+/// 2: 600 <- 3. slot is empty, insert here
+/// 3: 1000 <- 1. compare against slot value
+/// 4:
+/// .
+/// .
+/// 254:
+/// ```
+///
+/// Insert `400`
+///
+/// ```ignore
+/// 0: 400 <- 3. slot is empty, insert here
+/// 1: 500 <- 2. compare against slot value
+/// 2: 600
+/// 3: 1000 <- 1. compare against slot value
+/// 4:
+/// .
+/// .
+/// 254:
+/// ```
+///
+/// Insert `700`
+///
+/// ```ignore
+/// 0: 400
+/// 1: 500 <- 2. compare against slot value
+/// 2: 600 <- 3. slot is occupied and end of search
+/// 3: 1000 <- 1. compare against slot value
+/// 4:
+/// .
+/// .
+/// 254:
+/// ```
+///
+/// In this case we reach the end of our search and need to insert a value between
+/// slots 2 and 3. To do this we create a new bucket under slot 3, and repeat
+/// the process for that bucket.
+///
+/// The final key will consist of the slot indexes visited incremented by 1,
+/// with the final value incremented by 2, followed by a null terminator.
+///
+/// So in the above example we would have
+///
+/// ```ignore
+/// 400: &[2, 0]
+/// 500: &[3, 0]
+/// 600: &[4, 0]
+/// 700: &[4, 5, 0]
+/// 1000: &[5, 0]
+/// ```
+///
+#[derive(Debug, Clone)]
+struct Bucket {
+    slots: Box<[Slot]>,
+}
+
+impl Default for Bucket {
+    fn default() -> Self {
+        let slots = (0..255).map(|_| Slot::default()).collect::<Vec<_>>().into();
+        Self { slots }
+    }
+}
+
+impl Bucket {
+    /// Perform a skewed binary search to find the first slot that is empty or less
+    ///
+    /// Returns `Ok(idx)` if an exact match is found, otherwise returns `Err(idx)`
+    /// containing the slot index to insert at
+    fn insert_pos(&self, values_buf: &InternBuffer, data: &[u8]) -> Result<usize, usize> {
+        let mut size = self.slots.len() - 1;
+        let mut left = 0;
+        let mut right = size;
+        while left < right {
+            // Skew binary search to leave gaps of at most 3 elements
+            let mid = left + (size / 2).min(3);
+
+            let slot = &self.slots[mid];
+            let val = match slot.value {
+                Some(val) => val,
+                None => return Err(mid),
+            };
+
+            let cmp = values_buf[val].cmp(data);
+            if cmp == Ordering::Less {
+                left = mid + 1;
+            } else if cmp == Ordering::Greater {
+                right = mid;
+            } else {
+                return Ok(mid);
+            }
+
+            size = right - left;
+        }
+        Err(left)
+    }
+
+    /// Insert `data` into this bucket or one of its children, appending the
+    /// normalized key to `out` as it is constructed
+    ///
+    /// # Panics
+    ///
+    /// Panics if the value already exists
+    fn insert(&mut self, values_buf: &mut InternBuffer, data: &[u8], out: &mut Vec<u8>) {
+        match self.insert_pos(values_buf, data) {
+            Ok(_) => unreachable!("value already exists"),
+            Err(idx) => {
+                let slot = &mut self.slots[idx];
+                // Cannot insert a value into slot 254 as would overflow byte, but also
+                // would prevent inserting any larger values, as the child bucket can
+                // only contain values less than the slot
+                if idx != 254 && slot.value.is_none() {
+                    out.push(idx as u8 + 2);
+                    slot.value = Some(values_buf.insert(data))
+                } else {
+                    out.push(idx as u8 + 1);
+                    slot.child
+                        .get_or_insert_with(Default::default)
+                        .insert(values_buf, data, out);
+                }
+            }
+        }
+    }
+}
+
+#[cfg(test)]
+mod tests {
+    use super::*;
+    use rand::prelude::*;
+
+    // Clippy isn't smart enough to understand dropping mutability
+    #[allow(clippy::needless_collect)]
+    fn test_intern_values(values: &[u64]) {
+        let mut interner = OrderPreservingInterner::default();
+
+        // Intern a single value at a time to check ordering
+        let interned: Vec<_> = values
+            .iter()
+            .flat_map(|v| interner.intern([Some(&v.to_be_bytes())]))
+            .map(Option::unwrap)
+            .collect();
+
+        let interned: Vec<_> = interned
+            .into_iter()
+            .map(|x| interner.normalized_key(x))
+            .collect();
+
+        for (i, a) in interned.iter().enumerate() {
+            for (j, b) in interned.iter().enumerate() {
+                let interned_cmp = a.cmp(b);
+                let values_cmp = values[i].cmp(&values[j]);
+                assert_eq!(
+                    interned_cmp, values_cmp,
+                    "({:?} vs {:?}) vs ({} vs {})",
+                    a, b, values[i], values[j]
+                )
+            }
+        }
+    }
+
+    #[test]
+    #[cfg_attr(miri, ignore)]
+    fn test_interner() {
+        test_intern_values(&[8, 6, 5, 7]);
+
+        let mut values: Vec<_> = (0_u64..2000).collect();

Review Comment:
   I wonder if there is any value (as a separate integration test perhaps) in
inserting enough values to produce at least two bucket levels (so a bucket has
a slot whose child bucket has a slot that itself has a child). Or maybe 2000
values is already enough to do this.
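
One rough way to check this is a standalone model that copies the skewed search from the diff but stores plain `u64` values instead of interned byte slices. `ModelSlot`, `ModelBucket`, and `max_depth` are hypothetical names used only for illustration; they are not part of the PR, and the sketch only says something about the insertion order it is given.

```rust
use std::cmp::Ordering;

/// Simplified stand-in for `Slot`: holds a `u64` rather than an `Interned`.
#[derive(Default)]
struct ModelSlot {
    value: Option<u64>,
    child: Option<Box<ModelBucket>>,
}

/// Simplified stand-in for `Bucket`, with the same 255 slots.
struct ModelBucket {
    slots: Vec<ModelSlot>,
}

impl Default for ModelBucket {
    fn default() -> Self {
        Self {
            slots: (0..255).map(|_| ModelSlot::default()).collect(),
        }
    }
}

impl ModelBucket {
    /// Same skewed binary search as `Bucket::insert_pos` in the diff.
    fn insert_pos(&self, data: u64) -> Result<usize, usize> {
        let mut size = self.slots.len() - 1;
        let mut left = 0;
        let mut right = size;
        while left < right {
            let mid = left + (size / 2).min(3);
            let val = match self.slots[mid].value {
                Some(val) => val,
                None => return Err(mid),
            };
            match val.cmp(&data) {
                Ordering::Less => left = mid + 1,
                Ordering::Greater => right = mid,
                Ordering::Equal => return Ok(mid),
            }
            size = right - left;
        }
        Err(left)
    }

    /// Inserts `data` and returns the number of bucket levels visited.
    fn insert(&mut self, data: u64) -> usize {
        let idx = match self.insert_pos(data) {
            Ok(_) => panic!("value already exists"),
            Err(idx) => idx,
        };
        let slot = &mut self.slots[idx];
        if idx != 254 && slot.value.is_none() {
            slot.value = Some(data);
            1
        } else {
            // Occupied slot (or slot 254): descend into the child bucket
            1 + slot.child.get_or_insert_with(Default::default).insert(data)
        }
    }
}

/// Deepest bucket level reached while inserting `values` in the given order.
fn max_depth(values: impl IntoIterator<Item = u64>) -> usize {
    let mut root = ModelBucket::default();
    values.into_iter().map(|v| root.insert(v)).max().unwrap_or(0)
}
```

In this model, ascending input fills 64 slots per bucket before spilling into the child of slot 254, so `max_depth(0..2000)` nests well past two levels (roughly 2000 / 64 by my arithmetic). A shuffled input, if that is what the test uses, may behave differently.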
   
   



##########
arrow/src/row/variable.rs:
##########
@@ -0,0 +1,97 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+use crate::compute::SortOptions;
+use crate::row::Rows;
+use crate::util::bit_util::ceil;
+
+/// The block size of the variable length encoding
+pub const BLOCK_SIZE: usize = 32;
+
+/// Returns the length of the encoded representation of a byte array
+pub fn encoded_len(a: Option<&[u8]>) -> usize {
+    match a {
+        Some(a) => 1 + ceil(a.len(), BLOCK_SIZE) * (BLOCK_SIZE + 1),
+        None => 1,
+    }
+}
+
+/// Variable length values are encoded as
+///
+/// - leading `0` bit if null otherwise `1`
+/// - 31 bits big endian length
+/// - length bytes of value data
+pub fn encode<'a, I: Iterator<Item = Option<&'a [u8]>>>(
+    out: &mut Rows,
+    i: I,
+    opts: SortOptions,
+) {
+    for (offset, maybe_val) in out.offsets.iter_mut().skip(1).zip(i) {
+        match maybe_val {
+            Some(val) if val.is_empty() => {
+                out.buffer[*offset] = match opts.descending {
+                    true => !1,
+                    false => 1,
+                };
+                *offset += 1;
+            }
+            Some(val) => {
+                let block_count = ceil(val.len(), BLOCK_SIZE);
+                let end_offset = *offset + 1 + block_count * (BLOCK_SIZE + 1);
+                let to_write = &mut out.buffer[*offset..end_offset];
+
+                // Set validity
+                to_write[0] = 2;

Review Comment:
   It is still 2 -- I read the comments as saying the validity byte is a `1`. 
Sorry for my ignorance but I don't understand this particular constant -- can 
you elaborate?
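
My tentative reading, not confirmed by the author, is that the leading byte acts as a three-way sentinel so that an empty value sorts before any non-empty one. The `1` and `2` below are taken from the diff; the `0` for null is an assumption, since that branch is not shown here. Only the ascending case is modelled, and `leading_byte` is a hypothetical helper, not a function in the PR.

```rust
/// Block size used by the variable length encoding in the diff.
const BLOCK_SIZE: usize = 32;

/// Leading byte for an ascending sort.
/// `1` (empty) and `2` (non-empty) come from the diff; `0` for null is assumed.
fn leading_byte(value: Option<&[u8]>) -> u8 {
    match value {
        None => 0,
        Some(v) if v.is_empty() => 1,
        Some(_) => 2,
    }
}

/// Mirrors `encoded_len` from the diff, with the ceiling division written out.
fn encoded_len(value: Option<&[u8]>) -> usize {
    match value {
        Some(v) => {
            let blocks = (v.len() + BLOCK_SIZE - 1) / BLOCK_SIZE;
            1 + blocks * (BLOCK_SIZE + 1)
        }
        None => 1,
    }
}
```

If that reading is right, a byte-wise comparison of the first byte alone already orders null, empty, and non-empty values, and an empty value occupies a single byte, the same as a null. The doc comment on `encode` would then need updating to match.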



##########
arrow/src/row/interner.rs:
##########
@@ -0,0 +1,451 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+use hashbrown::hash_map::RawEntryMut;
+use hashbrown::HashMap;
+use std::cmp::Ordering;
+use std::num::NonZeroU32;
+use std::ops::Index;
+
+/// An interned value
+#[derive(Debug, Copy, Clone, PartialEq, Eq)]
+pub struct Interned(NonZeroU32); // We use NonZeroU32 so that `Option<Interned>` is 32 bits
+
+/// A byte array interner that generates normalized keys that are sorted with respect
+/// to the interned values, e.g. `intern(a) < intern(b) => a < b`
+#[derive(Debug, Default)]
+pub struct OrderPreservingInterner {
+    /// Provides a lookup from [`Interned`] to the normalized key
+    keys: InternBuffer,
+    /// Provides a lookup from [`Interned`] to the normalized value
+    values: InternBuffer,
+    /// Key allocation data structure
+    bucket: Box<Bucket>,
+
+    // A hash table used to perform faster re-keying, and detect duplicates
+    hasher: ahash::RandomState,
+    lookup: HashMap<Interned, (), ()>,
+}
+
+impl OrderPreservingInterner {
+    /// Interns an iterator of values returning a list of [`Interned`] which can be
+    /// used with [`Self::normalized_key`] to retrieve the normalized keys with a
+    /// lifetime not tied to the mutable borrow passed to this method
+    pub fn intern<I, V>(&mut self, input: I) -> Vec<Option<Interned>>
+    where
+        I: IntoIterator<Item = Option<V>>,
+        V: AsRef<[u8]>,
+    {
+        let iter = input.into_iter();
+        let capacity = iter.size_hint().0;
+        let mut out = Vec::with_capacity(capacity);
+
+        // (index in output, hash value, value)
+        let mut to_intern: Vec<(usize, u64, V)> = Vec::with_capacity(capacity);
+        let mut to_intern_len = 0;
+
+        for (idx, item) in iter.enumerate() {
+            let value: V = match item {
+                Some(value) => value,
+                None => {
+                    out.push(None);
+                    continue;
+                }
+            };
+
+            let v = value.as_ref();
+            let hash = self.hasher.hash_one(v);
+            let entry = self
+                .lookup
+                .raw_entry_mut()
+                .from_hash(hash, |a| &self.values[*a] == v);
+
+            match entry {
+                RawEntryMut::Occupied(o) => out.push(Some(*o.key())),
+                RawEntryMut::Vacant(_) => {
+                    // Push placeholder
+                    out.push(None);
+                    to_intern_len += v.len();
+                    to_intern.push((idx, hash, value));
+                }
+            };
+        }
+
+        to_intern.sort_unstable_by(|(_, _, a), (_, _, b)| a.as_ref().cmp(b.as_ref()));
+
+        self.keys.offsets.reserve(to_intern.len());
+        self.keys.values.reserve(to_intern.len()); // Approximation
+        self.values.offsets.reserve(to_intern.len());
+        self.values.values.reserve(to_intern_len);
+
+        for (idx, hash, value) in to_intern {
+            let val = value.as_ref();
+
+            let entry = self
+                .lookup
+                .raw_entry_mut()
+                .from_hash(hash, |a| &self.values[*a] == val);
+
+            match entry {
+                RawEntryMut::Occupied(o) => {
+                    out[idx] = Some(*o.key());
+                }
+                RawEntryMut::Vacant(v) => {
+                    let val = value.as_ref();
+                    self.bucket
+                        .insert(&mut self.values, val, &mut self.keys.values);
+                    self.keys.values.push(0);
+                    let interned = self.keys.append();
+
+                    let hasher = &mut self.hasher;
+                    let values = &self.values;
+                    v.insert_with_hasher(hash, interned, (), |key| {
+                        hasher.hash_one(&values[*key])
+                    });
+                    out[idx] = Some(interned);
+                }
+            }
+        }
+
+        out
+    }
+
+    /// Returns a null-terminated byte array that can be compared against other normalized_key
+    /// returned by this instance, to establish ordering of the interned values
+    pub fn normalized_key(&self, key: Interned) -> &[u8] {
+        &self.keys[key]
+    }
+}
+
+/// A buffer of `[u8]` indexed by [`Interned`]
+#[derive(Debug)]
+struct InternBuffer {
+    /// Raw values
+    values: Vec<u8>,
+    /// The ith value is `&values[offsets[i]..offsets[i+1]]`
+    offsets: Vec<usize>,
+}
+
+impl Default for InternBuffer {
+    fn default() -> Self {
+        Self {
+            values: Default::default(),
+            offsets: vec![0],
+        }
+    }
+}
+
+impl InternBuffer {
+    /// Insert `data` returning the corresponding [`Interned`]
+    fn insert(&mut self, data: &[u8]) -> Interned {
+        self.values.extend_from_slice(data);
+        self.append()
+    }
+
+    /// Appends the next value based on data written to `self.values`
+    /// returning the corresponding [`Interned`]
+    fn append(&mut self) -> Interned {
+        let idx: u32 = self.offsets.len().try_into().unwrap();
+        let key = Interned(NonZeroU32::new(idx).unwrap());
+        self.offsets.push(self.values.len());
+        key
+    }
+}
+
+impl Index<Interned> for InternBuffer {
+    type Output = [u8];
+
+    fn index(&self, key: Interned) -> &Self::Output {
+        let index = key.0.get() as usize;
+        let end = self.offsets[index];
+        let start = self.offsets[index - 1];
+        // SAFETY:
+        // self.values is never reduced in size and values appended
+        // to self.offsets never exceed self.values.len() at the time
+        unsafe { self.values.get_unchecked(start..end) }
+    }
+}
+
+/// A slot corresponds to a single byte-value in the generated normalized key
+///
+/// It may contain a value, if not the first slot, and may contain a child [`Bucket`] representing
+/// the next byte in the generated normalized key
+#[derive(Debug, Default, Clone)]
+struct Slot {
+    value: Option<Interned>,
+    /// Child values less than `self.value` if any
+    child: Option<Box<Bucket>>,
+}
+
+/// Bucket is the root of the data-structure used to allocate normalized keys
+///
+/// In particular it needs to generate keys that
+///
+/// * Contain no `0` bytes other than the null terminator
+/// * Compare lexicographically in the same manner as the encoded `data`
+///
+/// The data structure consists of 255 slots, each of which can store a value.
+/// Additionally each slot may contain a child bucket, containing values smaller
+/// than the value within the slot
+///
+/// # Allocation Strategy
+///
+/// To find the insertion point within a Bucket we perform a binary search of the slots, but
+/// capping the search range at 4. Visualizing this as a search tree, the root would have 64
+/// children, with subsequent non-leaf nodes each containing two children.
+///
+/// The insertion point is the first empty slot we encounter, otherwise it is the first slot
+/// that contains a value greater than the value being inserted
+///
+/// For example, initially all slots are empty
+///
+/// ```ignore
+/// 0:
+/// 1:
+/// .
+/// .
+/// 254:
+/// ```
+///
+/// Insert `1000`
+///
+/// ```ignore
+/// 0:
+/// 1:
+/// 2:
+/// 3: 1000 <- 1. slot is empty, insert here
+/// 4:
+/// .
+/// .
+/// 254:
+/// ```
+///
+/// Insert `500`
+///
+/// ```ignore
+/// 0:
+/// 1: 500 <- 2. slot is empty, insert here
+/// 2:
+/// 3: 1000 <- 1. compare against slot value
+/// 4:
+/// .
+/// .
+/// 254:
+/// ```
+///
+/// Insert `600`
+///
+/// ```ignore
+/// 0:
+/// 1: 500 <- 2. compare against slot value
+/// 2: 600 <- 3. slot is empty, insert here
+/// 3: 1000 <- 1. compare against slot value
+/// 4:
+/// .
+/// .
+/// 254:
+/// ```
+///
+/// Insert `400`
+///
+/// ```ignore
+/// 0: 400 <- 3. slot is empty, insert here
+/// 1: 500 <- 2. compare against slot value
+/// 2: 600
+/// 3: 1000 <- 1. compare against slot value
+/// 4:
+/// .
+/// .
+/// 254:
+/// ```
+///
+/// Insert `700`
+///
+/// ```ignore
+/// 0: 400
+/// 1: 500 <- 2. compare against slot value
+/// 2: 600 <- 3. slot is occupied and end of search
+/// 3: 1000 <- 1. compare against slot value
+/// 4:
+/// .
+/// .
+/// 254:
+/// ```
+///
+/// In this case we reach the end of our search and need to insert a value between
+/// slots 2 and 3. To do this we create a new bucket under slot 3, and repeat
+/// the process for that bucket.
+///
+/// The final key will consist of the slot indexes visited incremented by 1,
+/// with the final value incremented by 2, followed by a null terminator.

Review Comment:
   Ah, the null terminator idea is the key thing I was missing in my initial 
reading. This is a very clever idea 👍 
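
To spell out why I think the terminator matters, here is a small sketch using the example keys from the doc comment. It assumes (my assumption, not something stated in this hunk) that a normalized key is laid out back to back with the bytes of a following column inside a row; `row` and `doc_example_keys` are hypothetical helpers for illustration only.

```rust
/// Concatenates a normalized key with the bytes of a following column,
/// the way a row format might lay fields out back to back (an assumption).
fn row(key: &[u8], next_column: &[u8]) -> Vec<u8> {
    let mut out = key.to_vec();
    out.extend_from_slice(next_column);
    out
}

/// The (value, key) pairs from the doc comment example, in value order.
fn doc_example_keys() -> Vec<(u64, Vec<u8>)> {
    vec![
        (400, vec![2, 0]),
        (500, vec![3, 0]),
        (600, vec![4, 0]),
        (700, vec![4, 5, 0]),
        (1000, vec![5, 0]),
    ]
}
```

With the terminator, the row for 600 (`[4, 0]` plus a trailing byte) still sorts before the row for 700 (`[4, 5, 0]` plus a trailing byte), because `0` is smaller than any slot byte. Without it, the trailing column's bytes would be compared against the second byte of the longer key, and the order could flip.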



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
