alamb commented on code in PR #2593: URL: https://github.com/apache/arrow-rs/pull/2593#discussion_r967630518
########## arrow/src/row/mod.rs: ########## @@ -0,0 +1,815 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +//! A comparable row-oriented representation of a collection of [`Array`] + +use crate::array::{ + as_boolean_array, as_generic_binary_array, as_largestring_array, as_string_array, + Array, ArrayRef, Decimal128Array, Decimal256Array, +}; +use crate::compute::SortOptions; +use crate::datatypes::*; +use crate::row::interner::{Interned, OrderPreservingInterner}; +use crate::{downcast_dictionary_array, downcast_primitive_array}; + +mod fixed; +mod interner; +mod variable; + +/// Converts arrays into a row-oriented format that are [normalized for sorting]. +/// +/// In particular, a byte-wise comparison of the rows, e.g. [`memcmp`], is sufficient +/// to establish the ordering of two rows, allowing for extremely fast comparisons, +/// and permitting the use of [non-comparison sorts] such as [radix sort] +/// +/// Comparing [`Rows`] generated by different [`RowConverter`] is not guaranteed to +/// yield a meaningful ordering +/// +/// # Format +/// +/// The encoding of the row format should not be considered stable, but is documented here +/// for reference. +/// +/// ## Unsigned Integer Encoding +/// +/// A null integer is encoded as a `0_u8`, followed by a zero-ed number of bytes corresponding +/// to the integer's length +/// +/// A valid integer is encoded as `1_u8`, followed by the big-endian representation of the +/// integer +/// +/// ## Signed Integer Encoding +/// +/// Signed integers have their most significant sign bit flipped, and are then encoded in the +/// same manner as an unsigned integer +/// +/// ## Float Encoding +/// +/// Floats are converted from IEEE 754 representation to a signed integer representation +/// by flipping all bar the sign bit if they are negative. +/// +/// They are then encoded in the same manner as a signed integer +/// +/// ## Variable Length Bytes Encoding +/// +/// A null is encoded as a `0_u8` +/// +/// An empty byte array is encoded as `1_u8` +/// +/// A non-null, non-empty byte array is encoded as `2_u8` followed by the byte array +/// encoded using a block based scheme described below. +/// +/// The byte array is broken up into 32-byte blocks, each block is written in turn +/// to the output, followed by `0xFF_u8`. The final block is padded to 32-bytes +/// with `0_u8` and written to the output, followed by the un-padded length in bytes +/// of this final block as a `u8` +/// +/// This is loosely inspired by [COBS] encoding, and chosen over more traditional +/// [byte stuffing] as it is more amenable to vectorisation, in particular AVX-256. +/// +/// ## Dictionary Encoding +/// +/// [`RowConverter`] needs to support converting dictionary encoded arrays with unsorted, and +/// potentially distinct dictionaries. One simple mechanism to avoid this would be to reverse +/// the dictionary encoding, and encode the array values directly, however, this would lose +/// the benefits of dictionary encoding to reduce memory and CPU consumption. +/// +/// As such the [`RowConverter`] maintains an order-preserving dictionary encoding for each +/// dictionary encoded column. As this is a variable-length encoding, new dictionary values +/// can be added whilst preserving the sort order. +/// +/// A null dictionary value is encoded as `0_u8`. +/// +/// A non-null dictionary value is encoded as `1_u8` followed by a null-terminated byte array +/// key determined by the order-preserving dictionary encoding +/// +/// # Ordering +/// +/// ## Float Ordering +/// +/// Floats are totally ordered in accordance to the `totalOrder` predicate as defined +/// in the IEEE 754 (2008 revision) floating point standard. +/// +/// The ordering established by this does not always agree with the +/// [`PartialOrd`] and [`PartialEq`] implementations of `f32`. For example, +/// they consider negative and positive zero equal, while this does not +/// +/// ## Null Ordering +/// +/// The encoding described above will order nulls first, this can be inverted by representing +/// nulls as `0xFF_u8` instead of `0_u8` +/// +/// ## Reverse Column Ordering +/// +/// The order of a given column can be reversed by negating the encoded bytes of non-null values +/// +/// ## Reconstruction +/// +/// Given a schema it would theoretically be possible to reconstruct the columnar data from +/// the row format, however, this is currently not supported. It is recommended that the row +/// format is instead used to obtain a sorted list of row indices, which can then be used +/// with [`take`](crate::compute::take) to obtain a sorted [`Array`] +/// +/// [non-comparison sorts]:[https://en.wikipedia.org/wiki/Sorting_algorithm#Non-comparison_sorts] +/// [radix sort]:[https://en.wikipedia.org/wiki/Radix_sort] +/// [normalized for sorting]:[https://citeseerx.ist.psu.edu/viewdoc/download?doi=10.1.1.83.1080&rep=rep1&type=pdf] +/// [`memcmp`]:[https://www.man7.org/linux/man-pages/man3/memcmp.3.html] +/// [COBS]:[https://en.wikipedia.org/wiki/Consistent_Overhead_Byte_Stuffing] +/// [byte stuffing]:[https://en.wikipedia.org/wiki/High-Level_Data_Link_Control#Asynchronous_framing] +#[derive(Debug)] +pub struct RowConverter { + options: Vec<SortOptions>, + dictionaries: Vec<Option<Box<OrderPreservingInterner>>>, +} + +impl RowConverter { + /// Create a new [`RowConverter`] with the following schema and options + pub fn new(options: Vec<SortOptions>) -> Self { + let dictionaries = (0..options.len()).map(|_| None).collect(); + Self { + dictionaries, + options, + } + } + + /// Convert `cols` into [`Rows`] + /// + /// # Panics + /// + /// Panics if the schema of `cols` does not match that provided to [`RowConverter::new`] + pub fn convert(&mut self, arrays: &[ArrayRef]) -> Rows { + assert_eq!(arrays.len(), self.options.len(), "column count mismatch"); + + let dictionaries: Vec<_> = arrays + .iter() + .zip(&mut self.dictionaries) + .map(|(array, dictionary)| { + let values = downcast_dictionary_array! { + array => array.values(), + _ => return None + }; + + let interner = dictionary.get_or_insert_with(Default::default); + + let mapping: Vec<_> = compute_dictionary_mapping(interner, values) + .into_iter() + .map(|maybe_interned| { + maybe_interned.map(|interned| interner.normalized_key(interned)) + }) + .collect(); + + Some(mapping) + }) + .collect(); + + let mut rows = new_empty_rows(arrays, &dictionaries); + + for ((array, options), dictionary) in + arrays.iter().zip(&self.options).zip(dictionaries) + { + // We encode a column at a time to minimise dispatch overheads + encode_column(&mut rows, array, *options, dictionary.as_deref()) + } + + if cfg!(debug_assertions) { + assert_eq!(*rows.offsets.last().unwrap(), rows.buffer.len()); + rows.offsets + .windows(2) + .for_each(|w| assert!(w[0] < w[1], "offsets should be monotonic")); + } + + rows + } +} + +/// A row-oriented representation of arrow data, that is normalized for comparison +/// +/// See [`RowConverter`] +#[derive(Debug)] +pub struct Rows { + buffer: Box<[u8]>, + offsets: Box<[usize]>, +} + +impl Rows { + pub fn row(&self, row: usize) -> Row<'_> { + let end = self.offsets[row + 1]; + let start = self.offsets[row]; + Row(&self.buffer[start..end]) + } + + pub fn num_rows(&self) -> usize { + self.offsets.len() - 1 + } +} + +/// A comparable representation of a row, see [`Rows`] +#[derive(Debug, Copy, Clone, PartialEq, Eq, PartialOrd, Ord, Hash)] +pub struct Row<'a>(&'a [u8]); + +impl<'a> AsRef<[u8]> for Row<'a> { + fn as_ref(&self) -> &[u8] { + self.0 + } +} + +/// Computes the dictionary mapping for the given dictionary values +fn compute_dictionary_mapping( + interner: &mut OrderPreservingInterner, + values: &ArrayRef, +) -> Vec<Option<Interned>> { + use fixed::FixedLengthEncoding; + downcast_primitive_array! { + values => interner + .intern(values.iter().map(|x| x.map(|x| x.encode()))), + DataType::Binary => { + let iter = as_generic_binary_array::<i64>(values).iter(); + interner.intern(iter) + } + DataType::LargeBinary => { + let iter = as_generic_binary_array::<i64>(values).iter(); + interner.intern(iter) + } + DataType::Utf8 => { + let iter = as_string_array(values).iter().map(|x| x.map(|x| x.as_bytes())); + interner.intern(iter) + } + DataType::LargeUtf8 => { + let iter = as_largestring_array(values).iter().map(|x| x.map(|x| x.as_bytes())); + interner.intern(iter) + } + t => unreachable!("dictionary value {} is not supported", t) + } +} + +/// Computes the length of each encoded [`Rows`] and returns an empty [`Rows`] +fn new_empty_rows( + cols: &[ArrayRef], + dictionaries: &[Option<Vec<Option<&[u8]>>>], +) -> Rows { + let num_rows = cols.first().map(|x| x.len()).unwrap_or(0); + let mut lengths = vec![0; num_rows]; + + for (array, dict) in cols.iter().zip(dictionaries) { + downcast_primitive_array! { + array => lengths.iter_mut().for_each(|x| *x += fixed::encoded_len(array)), + DataType::Null => lengths.iter_mut().for_each(|x| *x += 1), + DataType::Boolean => lengths.iter_mut().for_each(|x| *x += 2), + DataType::Decimal128(_, _) => lengths.iter_mut().for_each(|x| *x += 17), + DataType::Decimal256(_, _) => lengths.iter_mut().for_each(|x| *x += 33), + DataType::Binary => as_generic_binary_array::<i32>(array) + .iter() + .zip(lengths.iter_mut()) + .for_each(|(slice, length)| *length += variable::encoded_len(slice)), + DataType::LargeBinary => as_generic_binary_array::<i64>(array) + .iter() + .zip(lengths.iter_mut()) + .for_each(|(slice, length)| *length += variable::encoded_len(slice)), + DataType::Utf8 => as_string_array(array) + .iter() + .zip(lengths.iter_mut()) + .for_each(|(slice, length)| { + *length += variable::encoded_len(slice.map(|x| x.as_bytes())) + }), + DataType::LargeUtf8 => as_largestring_array(array) + .iter() + .zip(lengths.iter_mut()) + .for_each(|(slice, length)| { + *length += variable::encoded_len(slice.map(|x| x.as_bytes())) + }), + DataType::Dictionary(_, _) => downcast_dictionary_array! { + array => { + let dict = dict.as_ref().unwrap(); + for (v, length) in array.keys().iter().zip(lengths.iter_mut()) { + match v.and_then(|v| dict[v as usize]) { + Some(k) => *length += k.len() + 1, + None => *length += 1, + } + } + } + _ => unreachable!(), + } + t => unimplemented!("not yet implemented: {}", t) + } + } + + let mut offsets = Vec::with_capacity(num_rows + 1); + offsets.push(0); + + let mut cur_offset = 0_usize; + for l in lengths { + offsets.push(cur_offset); Review Comment: Thank you ########## arrow/src/row/mod.rs: ########## @@ -0,0 +1,815 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +//! A comparable row-oriented representation of a collection of [`Array`] + +use crate::array::{ + as_boolean_array, as_generic_binary_array, as_largestring_array, as_string_array, + Array, ArrayRef, Decimal128Array, Decimal256Array, +}; +use crate::compute::SortOptions; +use crate::datatypes::*; +use crate::row::interner::{Interned, OrderPreservingInterner}; +use crate::{downcast_dictionary_array, downcast_primitive_array}; + +mod fixed; +mod interner; +mod variable; + +/// Converts arrays into a row-oriented format that are [normalized for sorting]. +/// +/// In particular, a byte-wise comparison of the rows, e.g. [`memcmp`], is sufficient +/// to establish the ordering of two rows, allowing for extremely fast comparisons, +/// and permitting the use of [non-comparison sorts] such as [radix sort] +/// +/// Comparing [`Rows`] generated by different [`RowConverter`] is not guaranteed to +/// yield a meaningful ordering +/// +/// # Format +/// +/// The encoding of the row format should not be considered stable, but is documented here +/// for reference. +/// +/// ## Unsigned Integer Encoding +/// +/// A null integer is encoded as a `0_u8`, followed by a zero-ed number of bytes corresponding +/// to the integer's length +/// +/// A valid integer is encoded as `1_u8`, followed by the big-endian representation of the +/// integer +/// +/// ## Signed Integer Encoding +/// +/// Signed integers have their most significant sign bit flipped, and are then encoded in the +/// same manner as an unsigned integer +/// +/// ## Float Encoding +/// +/// Floats are converted from IEEE 754 representation to a signed integer representation +/// by flipping all bar the sign bit if they are negative. +/// +/// They are then encoded in the same manner as a signed integer +/// +/// ## Variable Length Bytes Encoding +/// +/// A null is encoded as a `0_u8` +/// +/// An empty byte array is encoded as `1_u8` +/// +/// A non-null, non-empty byte array is encoded as `2_u8` followed by the byte array +/// encoded using a block based scheme described below. +/// +/// The byte array is broken up into 32-byte blocks, each block is written in turn +/// to the output, followed by `0xFF_u8`. The final block is padded to 32-bytes +/// with `0_u8` and written to the output, followed by the un-padded length in bytes +/// of this final block as a `u8` +/// +/// This is loosely inspired by [COBS] encoding, and chosen over more traditional +/// [byte stuffing] as it is more amenable to vectorisation, in particular AVX-256. +/// +/// ## Dictionary Encoding +/// +/// [`RowConverter`] needs to support converting dictionary encoded arrays with unsorted, and +/// potentially distinct dictionaries. One simple mechanism to avoid this would be to reverse +/// the dictionary encoding, and encode the array values directly, however, this would lose +/// the benefits of dictionary encoding to reduce memory and CPU consumption. +/// +/// As such the [`RowConverter`] maintains an order-preserving dictionary encoding for each +/// dictionary encoded column. As this is a variable-length encoding, new dictionary values +/// can be added whilst preserving the sort order. +/// +/// A null dictionary value is encoded as `0_u8`. +/// +/// A non-null dictionary value is encoded as `1_u8` followed by a null-terminated byte array +/// key determined by the order-preserving dictionary encoding +/// +/// # Ordering +/// +/// ## Float Ordering +/// +/// Floats are totally ordered in accordance to the `totalOrder` predicate as defined +/// in the IEEE 754 (2008 revision) floating point standard. +/// +/// The ordering established by this does not always agree with the +/// [`PartialOrd`] and [`PartialEq`] implementations of `f32`. For example, +/// they consider negative and positive zero equal, while this does not +/// +/// ## Null Ordering +/// +/// The encoding described above will order nulls first, this can be inverted by representing +/// nulls as `0xFF_u8` instead of `0_u8` +/// +/// ## Reverse Column Ordering +/// +/// The order of a given column can be reversed by negating the encoded bytes of non-null values +/// +/// ## Reconstruction +/// +/// Given a schema it would theoretically be possible to reconstruct the columnar data from +/// the row format, however, this is currently not supported. It is recommended that the row +/// format is instead used to obtain a sorted list of row indices, which can then be used +/// with [`take`](crate::compute::take) to obtain a sorted [`Array`] +/// +/// [non-comparison sorts]:[https://en.wikipedia.org/wiki/Sorting_algorithm#Non-comparison_sorts] +/// [radix sort]:[https://en.wikipedia.org/wiki/Radix_sort] +/// [normalized for sorting]:[https://citeseerx.ist.psu.edu/viewdoc/download?doi=10.1.1.83.1080&rep=rep1&type=pdf] +/// [`memcmp`]:[https://www.man7.org/linux/man-pages/man3/memcmp.3.html] +/// [COBS]:[https://en.wikipedia.org/wiki/Consistent_Overhead_Byte_Stuffing] +/// [byte stuffing]:[https://en.wikipedia.org/wiki/High-Level_Data_Link_Control#Asynchronous_framing] +#[derive(Debug)] +pub struct RowConverter { + options: Vec<SortOptions>, + dictionaries: Vec<Option<Box<OrderPreservingInterner>>>, +} + +impl RowConverter { + /// Create a new [`RowConverter`] with the following schema and options + pub fn new(options: Vec<SortOptions>) -> Self { + let dictionaries = (0..options.len()).map(|_| None).collect(); + Self { + dictionaries, + options, + } + } + + /// Convert `cols` into [`Rows`] + /// + /// # Panics + /// + /// Panics if the schema of `cols` does not match that provided to [`RowConverter::new`] + pub fn convert(&mut self, arrays: &[ArrayRef]) -> Rows { + assert_eq!(arrays.len(), self.options.len(), "column count mismatch"); Review Comment: This logic makes sense to me ########## arrow/src/row/variable.rs: ########## @@ -0,0 +1,97 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +use crate::compute::SortOptions; +use crate::row::Rows; +use crate::util::bit_util::ceil; + +/// The block size of the variable length encoding +pub const BLOCK_SIZE: usize = 32; + +/// Returns the length of the encoded representation of a byte array +pub fn encoded_len(a: Option<&[u8]>) -> usize { + match a { + Some(a) => 1 + ceil(a.len(), BLOCK_SIZE) * (BLOCK_SIZE + 1), + None => 1, + } +} + +/// Variable length values are encoded as +/// +/// - leading `0` bit if null otherwise `1` +/// - 31 bits big endian length +/// - length bytes of value data +pub fn encode<'a, I: Iterator<Item = Option<&'a [u8]>>>( + out: &mut Rows, + i: I, + opts: SortOptions, +) { + for (offset, maybe_val) in out.offsets.iter_mut().skip(1).zip(i) { + match maybe_val { + Some(val) if val.is_empty() => { + out.buffer[*offset] = match opts.descending { + true => !1, + false => 1, + }; + *offset += 1; + } + Some(val) => { + let block_count = ceil(val.len(), BLOCK_SIZE); + let end_offset = *offset + 1 + block_count * (BLOCK_SIZE + 1); + let to_write = &mut out.buffer[*offset..end_offset]; + + // Set validity + to_write[0] = 2; + + let chunks = val.chunks_exact(BLOCK_SIZE); + let remainder = chunks.remainder(); + for (input, output) in chunks + .clone() + .zip(to_write[1..].chunks_exact_mut(BLOCK_SIZE + 1)) + { + let input: &[u8; BLOCK_SIZE] = input.try_into().unwrap(); + let out_block: &mut [u8; BLOCK_SIZE] = + (&mut output[..BLOCK_SIZE]).try_into().unwrap(); + + *out_block = *input; + output[BLOCK_SIZE] = u8::MAX; + } + + if !remainder.is_empty() { + let start_offset = 1 + (block_count - 1) * (BLOCK_SIZE + 1); + to_write[start_offset..start_offset + remainder.len()] + .copy_from_slice(remainder); + *to_write.last_mut().unwrap() = remainder.len() as u8; + } else { + *to_write.last_mut().unwrap() = BLOCK_SIZE as u8; Review Comment: Ah, the idea here is that this is the length just written 👍 ########## arrow/src/row/interner.rs: ########## @@ -0,0 +1,451 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +use hashbrown::hash_map::RawEntryMut; +use hashbrown::HashMap; +use std::cmp::Ordering; +use std::num::NonZeroU32; +use std::ops::Index; + +/// An interned value +#[derive(Debug, Copy, Clone, PartialEq, Eq)] +pub struct Interned(NonZeroU32); // We use NonZeroU32 so that `Option<Interned>` is 32 bits + +/// A byte array interner that generates normalized keys that are sorted with respect +/// to the interned values, e.g. `inter(a) < intern(b) => a < b` +#[derive(Debug, Default)] +pub struct OrderPreservingInterner { + /// Provides a lookup from [`Interned`] to the normalized key + keys: InternBuffer, + /// Provides a lookup from [`Interned`] to the normalized value + values: InternBuffer, + /// Key allocation data structure + bucket: Box<Bucket>, + + // A hash table used to perform faster re-keying, and detect duplicates + hasher: ahash::RandomState, + lookup: HashMap<Interned, (), ()>, +} + +impl OrderPreservingInterner { + /// Interns an iterator of values returning a list of [`Interned`] which can be + /// used with [`Self::normalized_key`] to retrieve the normalized keys with a + /// lifetime not tied to the mutable borrow passed to this method + pub fn intern<I, V>(&mut self, input: I) -> Vec<Option<Interned>> + where + I: IntoIterator<Item = Option<V>>, + V: AsRef<[u8]>, + { + let iter = input.into_iter(); + let capacity = iter.size_hint().0; + let mut out = Vec::with_capacity(capacity); + + // (index in output, hash value, value) + let mut to_intern: Vec<(usize, u64, V)> = Vec::with_capacity(capacity); + let mut to_intern_len = 0; + + for (idx, item) in iter.enumerate() { + let value: V = match item { + Some(value) => value, + None => { + out.push(None); + continue; + } + }; + + let v = value.as_ref(); + let hash = self.hasher.hash_one(v); + let entry = self + .lookup + .raw_entry_mut() + .from_hash(hash, |a| &self.values[*a] == v); + + match entry { + RawEntryMut::Occupied(o) => out.push(Some(*o.key())), + RawEntryMut::Vacant(_) => { + // Push placeholder + out.push(None); + to_intern_len += v.len(); + to_intern.push((idx, hash, value)); + } + }; + } + + to_intern.sort_unstable_by(|(_, _, a), (_, _, b)| a.as_ref().cmp(b.as_ref())); + + self.keys.offsets.reserve(to_intern.len()); + self.keys.values.reserve(to_intern.len()); // Approximation + self.values.offsets.reserve(to_intern.len()); + self.values.values.reserve(to_intern_len); + + for (idx, hash, value) in to_intern { + let val = value.as_ref(); + + let entry = self + .lookup + .raw_entry_mut() + .from_hash(hash, |a| &self.values[*a] == val); + + match entry { + RawEntryMut::Occupied(o) => { + out[idx] = Some(*o.key()); + } + RawEntryMut::Vacant(v) => { + let val = value.as_ref(); + self.bucket + .insert(&mut self.values, val, &mut self.keys.values); + self.keys.values.push(0); + let interned = self.keys.append(); + + let hasher = &mut self.hasher; + let values = &self.values; + v.insert_with_hasher(hash, interned, (), |key| { + hasher.hash_one(&values[*key]) + }); + out[idx] = Some(interned); + } + } + } + + out + } + + /// Returns a null-terminated byte array that can be compared against other normalized_key + /// returned by this instance, to establish ordering of the interned values + pub fn normalized_key(&self, key: Interned) -> &[u8] { + &self.keys[key] + } +} + +/// A buffer of `[u8]` indexed by `[Interned]` +#[derive(Debug)] +struct InternBuffer { + /// Raw values + values: Vec<u8>, + /// The ith value is `&values[offsets[i]..offsets[i+1]]` + offsets: Vec<usize>, +} + +impl Default for InternBuffer { + fn default() -> Self { + Self { + values: Default::default(), + offsets: vec![0], + } + } +} + +impl InternBuffer { + /// Insert `data` returning the corresponding [`Interned`] + fn insert(&mut self, data: &[u8]) -> Interned { + self.values.extend_from_slice(data); + self.append() + } + + /// Appends the next value based on data written to `self.values` + /// returning the corresponding [`Interned`] + fn append(&mut self) -> Interned { + let idx: u32 = self.offsets.len().try_into().unwrap(); + let key = Interned(NonZeroU32::new(idx).unwrap()); + self.offsets.push(self.values.len()); + key + } +} + +impl Index<Interned> for InternBuffer { + type Output = [u8]; + + fn index(&self, key: Interned) -> &Self::Output { + let index = key.0.get() as usize; + let end = self.offsets[index]; + let start = self.offsets[index - 1]; + // SAFETY: + // self.values is never reduced in size and values appended + // to self.offsets are always less than self.values at the time + unsafe { self.values.get_unchecked(start..end) } + } +} + +/// A slot corresponds to a single byte-value in the generated normalized key +/// +/// It may contain a value, if not the first slot, and may contain a child [`Bucket`] representing +/// the next byte in the generated normalized key +#[derive(Debug, Default, Clone)] +struct Slot { + value: Option<Interned>, + /// Child values less than `self.value` if any + child: Option<Box<Bucket>>, +} + +/// Bucket is the root of the data-structure used to allocate normalized keys +/// +/// In particular it needs to generate keys that +/// +/// * Contain no `0` bytes other than the null terminator +/// * Compare lexicographically in the same manner as the encoded `data` +/// +/// The data structure consists of 255 slots, each of which can store a value. +/// Additionally each slot may contain a child bucket, containing values smaller +/// than the value within the slot +/// +/// # Allocation Strategy +/// +/// To find the insertion point within a Bucket we perform a binary search of the slots, but +/// capping the search range at 4. Visualizing this as a search tree, the root would have 64 +/// children, with subsequent non-leaf nodes each containing two children. +/// +/// The insertion point is the first empty slot we encounter, otherwise it is the first slot +/// that contains a value greater than the value being inserted +/// +/// For example, initially all slots are empty +/// +/// ```ignore +/// 0: +/// 1: +/// . +/// . +/// 254: +/// ``` +/// +/// Insert `1000` +/// +/// ```ignore +/// 0: +/// 1: +/// 2: +/// 3: 1000 <- 1. slot is empty, insert here +/// 4: +/// . +/// . +/// 254: +/// ``` +/// +/// Insert `500` +/// +/// ```ignore +/// 0: +/// 1: 500 <- 2. slot is empty, insert here +/// 2: +/// 3: 1000 <- 1. compare against slot value +/// 4. +/// . +/// . +/// 254: +/// ``` +/// +/// Insert `600` +/// +/// ```ignore +/// 0: +/// 1: 500 <- 2. compare against slot value +/// 2: 600 <- 3. slot is empty, insert here +/// 3: 1000 <- 1. compare against slot value +/// 4. +/// . +/// . +/// 254: +/// ``` +/// +/// Insert `400` +/// +/// ```ignore +/// 0: 400 <- 3. slot is empty, insert here +/// 1: 500 <- 2. compare against slot value +/// 2: 600 +/// 3: 1000 <- 1. compare against slot value +/// 4. +/// . +/// . +/// 254: +/// ``` +/// +/// Insert `700` +/// +/// ```ignore +/// 0: 400 +/// 1: 500 <- 2. compare against slot value +/// 2: 600 <- 3. slot is occupied and end of search +/// 3: 1000 <- 1. compare against slot value +/// 4. +/// . +/// . +/// 254: +/// ``` +/// +/// In this case we reach the end of our search and need to insert a value between +/// slots 2 and 3. To do this we create a new bucket under slot 3, and repeat +/// the process for that bucket. +/// +/// The final key will consists of the slot indexes visited incremented by 1, +/// with the final value incremented by 2, followed by a null terminator. +/// +/// So in the above example we would have +/// +/// ```ignore +/// 400: &[2, 0] +/// 500: &[3, 0] +/// 600: &[4, 0] +/// 700: &[4, 5, 0] +/// 1000: &[5, 0] +/// ``` +/// +#[derive(Debug, Clone)] +struct Bucket { + slots: Box<[Slot]>, +} + +impl Default for Bucket { + fn default() -> Self { + let slots = (0..255).map(|_| Slot::default()).collect::<Vec<_>>().into(); + Self { slots } + } +} + +impl Bucket { + /// Perform a skewed binary search to find the first slot that is empty or less + /// + /// Returns `Ok(idx)` if an exact match is found, otherwise returns `Err(idx)` + /// containing the slot index to insert at + fn insert_pos(&self, values_buf: &InternBuffer, data: &[u8]) -> Result<usize, usize> { + let mut size = self.slots.len() - 1; + let mut left = 0; + let mut right = size; + while left < right { + // Skew binary search to leave gaps of at most 3 elements + let mid = left + (size / 2).min(3); + + let slot = &self.slots[mid]; + let val = match slot.value { + Some(val) => val, + None => return Err(mid), + }; + + let cmp = values_buf[val].cmp(data); + if cmp == Ordering::Less { + left = mid + 1; + } else if cmp == Ordering::Greater { + right = mid; + } else { + return Ok(mid); + } + + size = right - left; + } + Err(left) + } + + /// Insert `data` into this bucket or one of its children, appending the + /// normalized key to `out` as it is constructed + /// + /// # Panics + /// + /// Panics if the value already exists + fn insert(&mut self, values_buf: &mut InternBuffer, data: &[u8], out: &mut Vec<u8>) { + match self.insert_pos(values_buf, data) { + Ok(_) => unreachable!("value already exists"), + Err(idx) => { + let slot = &mut self.slots[idx]; + // Cannot insert a value into slot 254 as would overflow byte, but also + // would prevent inserting any larger values, as the child bucket can + // only contain values less than the slot + if idx != 254 && slot.value.is_none() { + out.push(idx as u8 + 2); + slot.value = Some(values_buf.insert(data)) + } else { + out.push(idx as u8 + 1); + slot.child + .get_or_insert_with(Default::default) + .insert(values_buf, data, out); + } + } + } + } +} + +#[cfg(test)] +mod tests { + use super::*; + use rand::prelude::*; + + // Clippy isn't smart enough to understand dropping mutability + #[allow(clippy::needless_collect)] + fn test_intern_values(values: &[u64]) { + let mut interner = OrderPreservingInterner::default(); + + // Intern a single value at a time to check ordering + let interned: Vec<_> = values + .iter() + .flat_map(|v| interner.intern([Some(&v.to_be_bytes())])) + .map(Option::unwrap) + .collect(); + + let interned: Vec<_> = interned + .into_iter() + .map(|x| interner.normalized_key(x)) + .collect(); + + for (i, a) in interned.iter().enumerate() { + for (j, b) in interned.iter().enumerate() { + let interned_cmp = a.cmp(b); + let values_cmp = values[i].cmp(&values[j]); + assert_eq!( + interned_cmp, values_cmp, + "({:?} vs {:?}) vs ({} vs {})", + a, b, values[i], values[j] + ) + } + } + } + + #[test] + #[cfg_attr(miri, ignore)] + fn test_interner() { + test_intern_values(&[8, 6, 5, 7]); + + let mut values: Vec<_> = (0_u64..2000).collect(); Review Comment: I wonder if there is any value (as a separate integration test perhaps) for inserting enough values to result in at least two bucket levels (so a bucket has an entry that is a child that has an entry that has a child). Or maybe 2000 is enough to do this already ########## arrow/src/row/variable.rs: ########## @@ -0,0 +1,97 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +use crate::compute::SortOptions; +use crate::row::Rows; +use crate::util::bit_util::ceil; + +/// The block size of the variable length encoding +pub const BLOCK_SIZE: usize = 32; + +/// Returns the length of the encoded representation of a byte array +pub fn encoded_len(a: Option<&[u8]>) -> usize { + match a { + Some(a) => 1 + ceil(a.len(), BLOCK_SIZE) * (BLOCK_SIZE + 1), + None => 1, + } +} + +/// Variable length values are encoded as +/// +/// - leading `0` bit if null otherwise `1` +/// - 31 bits big endian length +/// - length bytes of value data +pub fn encode<'a, I: Iterator<Item = Option<&'a [u8]>>>( + out: &mut Rows, + i: I, + opts: SortOptions, +) { + for (offset, maybe_val) in out.offsets.iter_mut().skip(1).zip(i) { + match maybe_val { + Some(val) if val.is_empty() => { + out.buffer[*offset] = match opts.descending { + true => !1, + false => 1, + }; + *offset += 1; + } + Some(val) => { + let block_count = ceil(val.len(), BLOCK_SIZE); + let end_offset = *offset + 1 + block_count * (BLOCK_SIZE + 1); + let to_write = &mut out.buffer[*offset..end_offset]; + + // Set validity + to_write[0] = 2; Review Comment: It is still 2 -- I read the comments as saying the validity byte is a `1`. Sorry for my ignorance but I don't understand this particular constant -- can you elaborate? ########## arrow/src/row/interner.rs: ########## @@ -0,0 +1,451 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +use hashbrown::hash_map::RawEntryMut; +use hashbrown::HashMap; +use std::cmp::Ordering; +use std::num::NonZeroU32; +use std::ops::Index; + +/// An interned value +#[derive(Debug, Copy, Clone, PartialEq, Eq)] +pub struct Interned(NonZeroU32); // We use NonZeroU32 so that `Option<Interned>` is 32 bits + +/// A byte array interner that generates normalized keys that are sorted with respect +/// to the interned values, e.g. `inter(a) < intern(b) => a < b` +#[derive(Debug, Default)] +pub struct OrderPreservingInterner { + /// Provides a lookup from [`Interned`] to the normalized key + keys: InternBuffer, + /// Provides a lookup from [`Interned`] to the normalized value + values: InternBuffer, + /// Key allocation data structure + bucket: Box<Bucket>, + + // A hash table used to perform faster re-keying, and detect duplicates + hasher: ahash::RandomState, + lookup: HashMap<Interned, (), ()>, +} + +impl OrderPreservingInterner { + /// Interns an iterator of values returning a list of [`Interned`] which can be + /// used with [`Self::normalized_key`] to retrieve the normalized keys with a + /// lifetime not tied to the mutable borrow passed to this method + pub fn intern<I, V>(&mut self, input: I) -> Vec<Option<Interned>> + where + I: IntoIterator<Item = Option<V>>, + V: AsRef<[u8]>, + { + let iter = input.into_iter(); + let capacity = iter.size_hint().0; + let mut out = Vec::with_capacity(capacity); + + // (index in output, hash value, value) + let mut to_intern: Vec<(usize, u64, V)> = Vec::with_capacity(capacity); + let mut to_intern_len = 0; + + for (idx, item) in iter.enumerate() { + let value: V = match item { + Some(value) => value, + None => { + out.push(None); + continue; + } + }; + + let v = value.as_ref(); + let hash = self.hasher.hash_one(v); + let entry = self + .lookup + .raw_entry_mut() + .from_hash(hash, |a| &self.values[*a] == v); + + match entry { + RawEntryMut::Occupied(o) => out.push(Some(*o.key())), + RawEntryMut::Vacant(_) => { + // Push placeholder + out.push(None); + to_intern_len += v.len(); + to_intern.push((idx, hash, value)); + } + }; + } + + to_intern.sort_unstable_by(|(_, _, a), (_, _, b)| a.as_ref().cmp(b.as_ref())); + + self.keys.offsets.reserve(to_intern.len()); + self.keys.values.reserve(to_intern.len()); // Approximation + self.values.offsets.reserve(to_intern.len()); + self.values.values.reserve(to_intern_len); + + for (idx, hash, value) in to_intern { + let val = value.as_ref(); + + let entry = self + .lookup + .raw_entry_mut() + .from_hash(hash, |a| &self.values[*a] == val); + + match entry { + RawEntryMut::Occupied(o) => { + out[idx] = Some(*o.key()); + } + RawEntryMut::Vacant(v) => { + let val = value.as_ref(); + self.bucket + .insert(&mut self.values, val, &mut self.keys.values); + self.keys.values.push(0); + let interned = self.keys.append(); + + let hasher = &mut self.hasher; + let values = &self.values; + v.insert_with_hasher(hash, interned, (), |key| { + hasher.hash_one(&values[*key]) + }); + out[idx] = Some(interned); + } + } + } + + out + } + + /// Returns a null-terminated byte array that can be compared against other normalized_key + /// returned by this instance, to establish ordering of the interned values + pub fn normalized_key(&self, key: Interned) -> &[u8] { + &self.keys[key] + } +} + +/// A buffer of `[u8]` indexed by `[Interned]` +#[derive(Debug)] +struct InternBuffer { + /// Raw values + values: Vec<u8>, + /// The ith value is `&values[offsets[i]..offsets[i+1]]` + offsets: Vec<usize>, +} + +impl Default for InternBuffer { + fn default() -> Self { + Self { + values: Default::default(), + offsets: vec![0], + } + } +} + +impl InternBuffer { + /// Insert `data` returning the corresponding [`Interned`] + fn insert(&mut self, data: &[u8]) -> Interned { + self.values.extend_from_slice(data); + self.append() + } + + /// Appends the next value based on data written to `self.values` + /// returning the corresponding [`Interned`] + fn append(&mut self) -> Interned { + let idx: u32 = self.offsets.len().try_into().unwrap(); + let key = Interned(NonZeroU32::new(idx).unwrap()); + self.offsets.push(self.values.len()); + key + } +} + +impl Index<Interned> for InternBuffer { + type Output = [u8]; + + fn index(&self, key: Interned) -> &Self::Output { + let index = key.0.get() as usize; + let end = self.offsets[index]; + let start = self.offsets[index - 1]; + // SAFETY: + // self.values is never reduced in size and values appended + // to self.offsets are always less than self.values at the time + unsafe { self.values.get_unchecked(start..end) } + } +} + +/// A slot corresponds to a single byte-value in the generated normalized key +/// +/// It may contain a value, if not the first slot, and may contain a child [`Bucket`] representing +/// the next byte in the generated normalized key +#[derive(Debug, Default, Clone)] +struct Slot { + value: Option<Interned>, + /// Child values less than `self.value` if any + child: Option<Box<Bucket>>, +} + +/// Bucket is the root of the data-structure used to allocate normalized keys +/// +/// In particular it needs to generate keys that +/// +/// * Contain no `0` bytes other than the null terminator +/// * Compare lexicographically in the same manner as the encoded `data` +/// +/// The data structure consists of 255 slots, each of which can store a value. +/// Additionally each slot may contain a child bucket, containing values smaller +/// than the value within the slot +/// +/// # Allocation Strategy +/// +/// To find the insertion point within a Bucket we perform a binary search of the slots, but +/// capping the search range at 4. Visualizing this as a search tree, the root would have 64 +/// children, with subsequent non-leaf nodes each containing two children. +/// +/// The insertion point is the first empty slot we encounter, otherwise it is the first slot +/// that contains a value greater than the value being inserted +/// +/// For example, initially all slots are empty +/// +/// ```ignore +/// 0: +/// 1: +/// . +/// . +/// 254: +/// ``` +/// +/// Insert `1000` +/// +/// ```ignore +/// 0: +/// 1: +/// 2: +/// 3: 1000 <- 1. slot is empty, insert here +/// 4: +/// . +/// . +/// 254: +/// ``` +/// +/// Insert `500` +/// +/// ```ignore +/// 0: +/// 1: 500 <- 2. slot is empty, insert here +/// 2: +/// 3: 1000 <- 1. compare against slot value +/// 4. +/// . +/// . +/// 254: +/// ``` +/// +/// Insert `600` +/// +/// ```ignore +/// 0: +/// 1: 500 <- 2. compare against slot value +/// 2: 600 <- 3. slot is empty, insert here +/// 3: 1000 <- 1. compare against slot value +/// 4. +/// . +/// . +/// 254: +/// ``` +/// +/// Insert `400` +/// +/// ```ignore +/// 0: 400 <- 3. slot is empty, insert here +/// 1: 500 <- 2. compare against slot value +/// 2: 600 +/// 3: 1000 <- 1. compare against slot value +/// 4. +/// . +/// . +/// 254: +/// ``` +/// +/// Insert `700` +/// +/// ```ignore +/// 0: 400 +/// 1: 500 <- 2. compare against slot value +/// 2: 600 <- 3. slot is occupied and end of search +/// 3: 1000 <- 1. compare against slot value +/// 4. +/// . +/// . +/// 254: +/// ``` +/// +/// In this case we reach the end of our search and need to insert a value between +/// slots 2 and 3. To do this we create a new bucket under slot 3, and repeat +/// the process for that bucket. +/// +/// The final key will consists of the slot indexes visited incremented by 1, +/// with the final value incremented by 2, followed by a null terminator. Review Comment: Ah, the null terminator idea is the key thing I was missing in my initial reading. This is a very clever idea 👍 -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: [email protected] For queries about this service, please contact Infrastructure at: [email protected]
