tustvold commented on code in PR #2593: URL: https://github.com/apache/arrow-rs/pull/2593#discussion_r966425493
########## arrow/src/row/mod.rs: ########## @@ -0,0 +1,815 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +//! A comparable row-oriented representation of a collection of [`Array`] + +use crate::array::{ + as_boolean_array, as_generic_binary_array, as_largestring_array, as_string_array, + Array, ArrayRef, Decimal128Array, Decimal256Array, +}; +use crate::compute::SortOptions; +use crate::datatypes::*; +use crate::row::interner::{Interned, OrderPreservingInterner}; +use crate::{downcast_dictionary_array, downcast_primitive_array}; + +mod fixed; +mod interner; +mod variable; + +/// Converts arrays into a row-oriented format that is [normalized for sorting]. +/// +/// In particular, a byte-wise comparison of the rows, e.g. [`memcmp`], is sufficient +/// to establish the ordering of two rows, allowing for extremely fast comparisons, +/// and permitting the use of [non-comparison sorts] such as [radix sort] +/// +/// Comparing [`Rows`] generated by different [`RowConverter`] is not guaranteed to +/// yield a meaningful ordering +/// +/// # Format +/// +/// The encoding of the row format should not be considered stable, but is documented here +/// for reference. 
+/// +/// ## Unsigned Integer Encoding +/// +/// A null integer is encoded as a `0_u8`, followed by a zero-ed number of bytes corresponding +/// to the integer's length +/// +/// A valid integer is encoded as `1_u8`, followed by the big-endian representation of the +/// integer +/// +/// ## Signed Integer Encoding +/// +/// Signed integers have their most significant sign bit flipped, and are then encoded in the +/// same manner as an unsigned integer +/// +/// ## Float Encoding +/// +/// Floats are converted from IEEE 754 representation to a signed integer representation +/// by flipping all bar the sign bit if they are negative. +/// +/// They are then encoded in the same manner as a signed integer +/// +/// ## Variable Length Bytes Encoding +/// +/// A null is encoded as a `0_u8` +/// +/// An empty byte array is encoded as `1_u8` +/// +/// A non-null, non-empty byte array is encoded as `2_u8` followed by the byte array +/// encoded using a block based scheme described below. +/// +/// The byte array is broken up into 32-byte blocks, each block is written in turn +/// to the output, followed by `0xFF_u8`. The final block is padded to 32-bytes +/// with `0_u8` and written to the output, followed by the un-padded length in bytes +/// of this final block as a `u8` +/// +/// This is loosely inspired by [COBS] encoding, and chosen over more traditional +/// [byte stuffing] as it is more amenable to vectorisation, in particular AVX-256. +/// +/// ## Dictionary Encoding +/// +/// [`RowConverter`] needs to support converting dictionary encoded arrays with unsorted, and +/// potentially distinct dictionaries. One simple mechanism to avoid this would be to reverse +/// the dictionary encoding, and encode the array values directly, however, this would lose +/// the benefits of dictionary encoding to reduce memory and CPU consumption. +/// +/// As such the [`RowConverter`] maintains an order-preserving dictionary encoding for each +/// dictionary encoded column. 
As this is a variable-length encoding, new dictionary values +/// can be added whilst preserving the sort order. + +/// A null dictionary value is encoded as `0_u8`. + +/// A non-null dictionary value is encoded as `1_u8` followed by a null-terminated byte array +/// key determined by the order-preserving dictionary encoding + +/// # Ordering + +/// ## Float Ordering + +/// Floats are totally ordered in accordance with the `totalOrder` predicate as defined +/// in the IEEE 754 (2008 revision) floating point standard. + +/// The ordering established by this does not always agree with the +/// [`PartialOrd`] and [`PartialEq`] implementations of `f32`. For example, +/// they consider negative and positive zero equal, while this does not + +/// ## Null Ordering + +/// The encoding described above will order nulls first; this can be inverted by representing +/// nulls as `0xFF_u8` instead of `0_u8` + +/// ## Reverse Column Ordering + +/// The order of a given column can be reversed by negating the encoded bytes of non-null values + +/// ## Reconstruction + +/// Given a schema it would theoretically be possible to reconstruct the columnar data from +/// the row format, however, this is currently not supported. 
It is recommended that the row +/// format is instead used to obtain a sorted list of row indices, which can then be used +/// with [`take`](crate::compute::take) to obtain a sorted [`Array`] +/// +/// [non-comparison sorts]:[https://en.wikipedia.org/wiki/Sorting_algorithm#Non-comparison_sorts] +/// [radix sort]:[https://en.wikipedia.org/wiki/Radix_sort] +/// [normalized for sorting]:[https://citeseerx.ist.psu.edu/viewdoc/download?doi=10.1.1.83.1080&rep=rep1&type=pdf] +/// [`memcmp`]:[https://www.man7.org/linux/man-pages/man3/memcmp.3.html] +/// [COBS]:[https://en.wikipedia.org/wiki/Consistent_Overhead_Byte_Stuffing] +/// [byte stuffing]:[https://en.wikipedia.org/wiki/High-Level_Data_Link_Control#Asynchronous_framing] +#[derive(Debug)] +pub struct RowConverter { + options: Vec<SortOptions>, + dictionaries: Vec<Option<Box<OrderPreservingInterner>>>, +} + +impl RowConverter { + /// Create a new [`RowConverter`] with the following schema and options + pub fn new(options: Vec<SortOptions>) -> Self { + let dictionaries = (0..options.len()).map(|_| None).collect(); + Self { + dictionaries, + options, + } + } + + /// Convert `cols` into [`Rows`] + /// + /// # Panics + /// + /// Panics if the schema of `cols` does not match that provided to [`RowConverter::new`] + pub fn convert(&mut self, arrays: &[ArrayRef]) -> Rows { + assert_eq!(arrays.len(), self.options.len(), "column count mismatch"); + + let dictionaries: Vec<_> = arrays + .iter() + .zip(&mut self.dictionaries) + .map(|(array, dictionary)| { + let values = downcast_dictionary_array! 
{ + array => array.values(), + _ => return None + }; + + let interner = dictionary.get_or_insert_with(Default::default); + + let mapping: Vec<_> = compute_dictionary_mapping(interner, values) + .into_iter() + .map(|maybe_interned| { + maybe_interned.map(|interned| interner.normalized_key(interned)) + }) + .collect(); + + Some(mapping) + }) + .collect(); + + let mut rows = new_empty_rows(arrays, &dictionaries); + + for ((array, options), dictionary) in + arrays.iter().zip(&self.options).zip(dictionaries) + { + // We encode a column at a time to minimise dispatch overheads + encode_column(&mut rows, array, *options, dictionary.as_deref()) + } + + if cfg!(debug_assertions) { + assert_eq!(*rows.offsets.last().unwrap(), rows.buffer.len()); + rows.offsets + .windows(2) + .for_each(|w| assert!(w[0] < w[1], "offsets should be monotonic")); + } + + rows + } +} + +/// A row-oriented representation of arrow data, that is normalized for comparison +/// +/// See [`RowConverter`] +#[derive(Debug)] +pub struct Rows { + buffer: Box<[u8]>, + offsets: Box<[usize]>, +} + +impl Rows { + pub fn row(&self, row: usize) -> Row<'_> { + let end = self.offsets[row + 1]; + let start = self.offsets[row]; + Row(&self.buffer[start..end]) + } + + pub fn num_rows(&self) -> usize { + self.offsets.len() - 1 + } +} + +/// A comparable representation of a row, see [`Rows`] +#[derive(Debug, Copy, Clone, PartialEq, Eq, PartialOrd, Ord, Hash)] +pub struct Row<'a>(&'a [u8]); + +impl<'a> AsRef<[u8]> for Row<'a> { + fn as_ref(&self) -> &[u8] { + self.0 + } +} + +/// Computes the dictionary mapping for the given dictionary values +fn compute_dictionary_mapping( + interner: &mut OrderPreservingInterner, + values: &ArrayRef, +) -> Vec<Option<Interned>> { + use fixed::FixedLengthEncoding; + downcast_primitive_array! 
{ + values => interner + .intern(values.iter().map(|x| x.map(|x| x.encode()))), + DataType::Binary => { + let iter = as_generic_binary_array::<i64>(values).iter(); + interner.intern(iter) + } + DataType::LargeBinary => { + let iter = as_generic_binary_array::<i64>(values).iter(); + interner.intern(iter) + } + DataType::Utf8 => { + let iter = as_string_array(values).iter().map(|x| x.map(|x| x.as_bytes())); + interner.intern(iter) + } + DataType::LargeUtf8 => { + let iter = as_largestring_array(values).iter().map(|x| x.map(|x| x.as_bytes())); + interner.intern(iter) + } + t => unreachable!("dictionary value {} is not supported", t) + } +} + +/// Computes the length of each encoded [`Rows`] and returns an empty [`Rows`] +fn new_empty_rows( + cols: &[ArrayRef], + dictionaries: &[Option<Vec<Option<&[u8]>>>], +) -> Rows { + let num_rows = cols.first().map(|x| x.len()).unwrap_or(0); + let mut lengths = vec![0; num_rows]; + + for (array, dict) in cols.iter().zip(dictionaries) { + downcast_primitive_array! 
{ + array => lengths.iter_mut().for_each(|x| *x += fixed::encoded_len(array)), + DataType::Null => lengths.iter_mut().for_each(|x| *x += 1), + DataType::Boolean => lengths.iter_mut().for_each(|x| *x += 2), + DataType::Decimal128(_, _) => lengths.iter_mut().for_each(|x| *x += 17), + DataType::Decimal256(_, _) => lengths.iter_mut().for_each(|x| *x += 33), + DataType::Binary => as_generic_binary_array::<i32>(array) + .iter() + .zip(lengths.iter_mut()) + .for_each(|(slice, length)| *length += variable::encoded_len(slice)), + DataType::LargeBinary => as_generic_binary_array::<i64>(array) + .iter() + .zip(lengths.iter_mut()) + .for_each(|(slice, length)| *length += variable::encoded_len(slice)), + DataType::Utf8 => as_string_array(array) + .iter() + .zip(lengths.iter_mut()) + .for_each(|(slice, length)| { + *length += variable::encoded_len(slice.map(|x| x.as_bytes())) + }), + DataType::LargeUtf8 => as_largestring_array(array) + .iter() + .zip(lengths.iter_mut()) + .for_each(|(slice, length)| { + *length += variable::encoded_len(slice.map(|x| x.as_bytes())) + }), + DataType::Dictionary(_, _) => downcast_dictionary_array! { + array => { + let dict = dict.as_ref().unwrap(); + for (v, length) in array.keys().iter().zip(lengths.iter_mut()) { + match v.and_then(|v| dict[v as usize]) { + Some(k) => *length += k.len() + 1, + None => *length += 1, + } + } + } + _ => unreachable!(), + } + t => unimplemented!("not yet implemented: {}", t) + } + } + + let mut offsets = Vec::with_capacity(num_rows + 1); + offsets.push(0); + + let mut cur_offset = 0_usize; + for l in lengths { + offsets.push(cur_offset); + cur_offset = cur_offset.checked_add(l).expect("overflow"); + } + + let buffer = vec![0_u8; cur_offset]; + + Rows { + buffer: buffer.into(), + offsets: offsets.into(), Review Comment: To prevent resizing 😄 -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. 
To unsubscribe, e-mail: [email protected] For queries about this service, please contact Infrastructure at: [email protected]
