This is an automated email from the ASF dual-hosted git repository.
tustvold pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/arrow-rs.git
The following commit(s) were added to refs/heads/master by this push:
new 69c04db96 Make dictionary preservation optional in row encoding (#3831)
69c04db96 is described below
commit 69c04db962b915b8a2d3783853da0ce95a94c0ef
Author: Raphael Taylor-Davies <[email protected]>
AuthorDate: Fri Mar 10 14:46:31 2023 +0100
Make dictionary preservation optional in row encoding (#3831)
* Make dictionary preservation optional in row encoding
* Review feedback
---
arrow-row/src/dictionary.rs | 20 ++++-
arrow-row/src/lib.rs | 208 +++++++++++++++++++++++++++++++++++++-------
arrow/benches/row_format.rs | 39 ++++++---
3 files changed, 223 insertions(+), 44 deletions(-)
diff --git a/arrow-row/src/dictionary.rs b/arrow-row/src/dictionary.rs
index e332e1131..bacc116ca 100644
--- a/arrow-row/src/dictionary.rs
+++ b/arrow-row/src/dictionary.rs
@@ -17,7 +17,7 @@
use crate::fixed::{FixedLengthEncoding, FromSlice};
use crate::interner::{Interned, OrderPreservingInterner};
-use crate::{null_sentinel, Rows};
+use crate::{null_sentinel, Row, Rows};
use arrow_array::builder::*;
use arrow_array::cast::*;
use arrow_array::types::*;
@@ -56,6 +56,24 @@ pub fn compute_dictionary_mapping(
}
}
+/// Encode dictionary values directly, without preserving the dictionary encoding
+pub fn encode_dictionary_values<K: ArrowDictionaryKeyType>(
+ out: &mut Rows,
+ column: &DictionaryArray<K>,
+ values: &Rows,
+ null: &Row<'_>,
+) {
+ for (offset, k) in out.offsets.iter_mut().skip(1).zip(column.keys()) {
+ let row = match k {
+ Some(k) => values.row(k.as_usize()).data,
+ None => null.data,
+ };
+ let end_offset = *offset + row.len();
+ out.buffer[*offset..end_offset].copy_from_slice(row);
+ *offset = end_offset;
+ }
+}
+
/// Dictionary types are encoded as
///
/// - single `0_u8` if null
diff --git a/arrow-row/src/lib.rs b/arrow-row/src/lib.rs
index 2e489c974..e4b02fbf2 100644
--- a/arrow-row/src/lib.rs
+++ b/arrow-row/src/lib.rs
@@ -137,6 +137,7 @@ use arrow_schema::*;
use crate::dictionary::{
compute_dictionary_mapping, decode_dictionary, encode_dictionary,
+ encode_dictionary_values,
};
use crate::fixed::{decode_bool, decode_fixed_size_binary, decode_primitive};
use crate::interner::OrderPreservingInterner;
@@ -426,7 +427,14 @@ enum Codec {
/// No additional codec state is necessary
Stateless,
/// The interner used to encode dictionary values
+ ///
+ /// Used when preserving the dictionary encoding
Dictionary(OrderPreservingInterner),
+ /// A row converter for the dictionary values
+ /// and the encoding of a row containing only nulls
+ ///
+ /// Used when not preserving dictionary encoding
+ DictionaryValues(RowConverter, OwnedRow),
/// A row converter for the child fields
/// and the encoding of a row containing only nulls
Struct(RowConverter, OwnedRow),
@@ -437,7 +445,25 @@ enum Codec {
impl Codec {
fn new(sort_field: &SortField) -> Result<Self, ArrowError> {
match &sort_field.data_type {
- DataType::Dictionary(_, _) =>
Ok(Self::Dictionary(Default::default())),
+ DataType::Dictionary(_, values) => match
sort_field.preserve_dictionaries {
+ true => Ok(Self::Dictionary(Default::default())),
+ false => {
+ let sort_field = SortField::new_with_options(
+ values.as_ref().clone(),
+ sort_field.options,
+ );
+
+ let mut converter = RowConverter::new(vec![sort_field])?;
+ let null_array = new_null_array(values.as_ref(), 1);
+ let nulls = converter.convert_columns(&[null_array])?;
+
+ let owned = OwnedRow {
+ data: nulls.buffer,
+ config: nulls.config,
+ };
+ Ok(Self::DictionaryValues(converter, owned))
+ }
+ },
d if !d.is_nested() => Ok(Self::Stateless),
DataType::List(f) | DataType::LargeList(f) => {
// The encoded contents will be inverted if descending is set
to true
@@ -501,6 +527,15 @@ impl Codec {
Ok(Encoder::Dictionary(mapping))
}
+ Codec::DictionaryValues(converter, nulls) => {
+ let values = downcast_dictionary_array! {
+ array => array.values(),
+ _ => unreachable!()
+ };
+
+ let rows = converter.convert_columns(&[values.clone()])?;
+ Ok(Encoder::DictionaryValues(rows, nulls.row()))
+ }
Codec::Struct(converter, null) => {
let v = as_struct_array(array);
let rows = converter.convert_columns(v.columns())?;
@@ -522,6 +557,9 @@ impl Codec {
match self {
Codec::Stateless => 0,
Codec::Dictionary(interner) => interner.size(),
+ Codec::DictionaryValues(converter, nulls) => {
+ converter.size() + nulls.data.len()
+ }
Codec::Struct(converter, nulls) => converter.size() +
nulls.data.len(),
Codec::List(converter) => converter.size(),
}
@@ -534,6 +572,8 @@ enum Encoder<'a> {
Stateless,
/// The mapping from dictionary keys to normalized keys
Dictionary(Vec<Option<&'a [u8]>>),
+ /// The row encoding of the dictionary values and the encoding of a null row
+ DictionaryValues(Rows, Row<'a>),
/// The row encoding of the child arrays and the encoding of a null row
///
/// It is necessary to encode to a temporary [`Rows`] to avoid serializing
@@ -551,6 +591,8 @@ pub struct SortField {
options: SortOptions,
/// Data type
data_type: DataType,
+ /// Whether dictionary-encoded columns keep their dictionary encoding
+ preserve_dictionaries: bool,
}
impl SortField {
@@ -561,7 +603,30 @@ impl SortField {
/// Create a new column with the given data type and [`SortOptions`]
pub fn new_with_options(data_type: DataType, options: SortOptions) -> Self
{
- Self { options, data_type }
+ Self {
+ options,
+ data_type,
+ preserve_dictionaries: true,
+ }
+ }
+
+ /// By default dictionaries are preserved as described on [`RowConverter`]
+ ///
+ /// However, this process requires maintaining and incrementally updating
+ /// an order-preserving mapping of dictionary values. This is relatively expensive
+ /// computationally but reduces the size of the encoded rows, minimising memory
+ /// usage and potentially yielding faster comparisons.
+ ///
+ /// Some applications may instead wish to trade off space efficiency for improved
+ /// encoding performance, by encoding dictionary values directly
+ ///
+ /// When `preserve_dictionaries` is false, fields will instead be encoded as their
+ /// underlying value, reversing any dictionary encoding
+ pub fn preserve_dictionaries(self, preserve_dictionaries: bool) -> Self {
+ Self {
+ preserve_dictionaries,
+ ..self
+ }
}
/// Return size of this instance in bytes.
@@ -1045,6 +1110,19 @@ fn new_empty_rows(cols: &[ArrayRef], encoders:
&[Encoder], config: RowConfig) ->
_ => unreachable!(),
}
}
+ Encoder::DictionaryValues(values, null) => {
+ downcast_dictionary_array! {
+ array => {
+ for (v, length) in
array.keys().iter().zip(lengths.iter_mut()) {
+ *length += match v {
+ Some(k) => values.row(k.as_usize()).data.len(),
+ None => null.data.len(),
+ }
+ }
+ }
+ _ => unreachable!(),
+ }
+ }
Encoder::Struct(rows, null) => {
let array = as_struct_array(array);
lengths.iter_mut().enumerate().for_each(|(idx, length)| {
@@ -1143,6 +1221,12 @@ fn encode_column(
_ => unreachable!()
}
}
+ Encoder::DictionaryValues(values, nulls) => {
+ downcast_dictionary_array! {
+ column => encode_dictionary_values(out, column, values, nulls),
+ _ => unreachable!()
+ }
+ }
Encoder::Struct(rows, null) => {
let array = as_struct_array(column);
let null_sentinel = null_sentinel(opts);
@@ -1221,6 +1305,10 @@ unsafe fn decode_column(
_ => unreachable!()
}
}
+ Codec::DictionaryValues(converter, _) => {
+ let cols = converter.convert_raw(rows, validate_utf8)?;
+ cols.into_iter().next().unwrap()
+ }
Codec::Struct(converter, _) => {
let (null_count, nulls) = fixed::decode_nulls(rows);
rows.iter_mut().for_each(|row| *row = &row[1..]);
@@ -1557,8 +1645,25 @@ mod tests {
assert_eq!(&cols[0], &col);
}
+ /// Asserts `a` equals `b`; if `exact` is false and `b` is a dictionary, `b` is first cast to its value type
+ fn dictionary_eq(exact: bool, a: &dyn Array, b: &dyn Array) {
+ match b.data_type() {
+ DataType::Dictionary(_, v) if !exact => {
+ assert_eq!(a.data_type(), v.as_ref());
+ let b = arrow_cast::cast(b, v).unwrap();
+ assert_eq!(a.data(), b.data())
+ }
+ _ => assert_eq!(a.data(), b.data()),
+ }
+ }
+
#[test]
fn test_string_dictionary() {
+ test_string_dictionary_impl(false);
+ test_string_dictionary_impl(true);
+ }
+
+ fn test_string_dictionary_impl(preserve: bool) {
let a = Arc::new(DictionaryArray::<Int32Type>::from_iter([
Some("foo"),
Some("hello"),
@@ -1570,8 +1675,8 @@ mod tests {
Some("hello"),
])) as ArrayRef;
- let mut converter =
-
RowConverter::new(vec![SortField::new(a.data_type().clone())]).unwrap();
+ let field =
SortField::new(a.data_type().clone()).preserve_dictionaries(preserve);
+ let mut converter = RowConverter::new(vec![field]).unwrap();
let rows_a = converter.convert_columns(&[Arc::clone(&a)]).unwrap();
assert!(rows_a.row(3) < rows_a.row(5));
@@ -1584,7 +1689,7 @@ mod tests {
assert_eq!(rows_a.row(1), rows_a.row(7));
let cols = converter.convert_rows(&rows_a).unwrap();
- assert_eq!(&cols[0], &a);
+ dictionary_eq(preserve, &cols[0], &a);
let b = Arc::new(DictionaryArray::<Int32Type>::from_iter([
Some("hello"),
@@ -1598,7 +1703,7 @@ mod tests {
assert!(rows_b.row(2) < rows_a.row(0));
let cols = converter.convert_rows(&rows_b).unwrap();
- assert_eq!(&cols[0], &b);
+ dictionary_eq(preserve, &cols[0], &b);
let mut converter = RowConverter::new(vec![SortField::new_with_options(
a.data_type().clone(),
@@ -1606,7 +1711,8 @@ mod tests {
descending: true,
nulls_first: false,
},
- )])
+ )
+ .preserve_dictionaries(preserve)])
.unwrap();
let rows_c = converter.convert_columns(&[Arc::clone(&a)]).unwrap();
@@ -1616,7 +1722,7 @@ mod tests {
assert!(rows_c.row(3) > rows_c.row(0));
let cols = converter.convert_rows(&rows_c).unwrap();
- assert_eq!(&cols[0], &a);
+ dictionary_eq(preserve, &cols[0], &a);
let mut converter = RowConverter::new(vec![SortField::new_with_options(
a.data_type().clone(),
@@ -1624,7 +1730,8 @@ mod tests {
descending: true,
nulls_first: true,
},
- )])
+ )
+ .preserve_dictionaries(preserve)])
.unwrap();
let rows_c = converter.convert_columns(&[Arc::clone(&a)]).unwrap();
@@ -1634,7 +1741,7 @@ mod tests {
assert!(rows_c.row(3) < rows_c.row(0));
let cols = converter.convert_rows(&rows_c).unwrap();
- assert_eq!(&cols[0], &a);
+ dictionary_eq(preserve, &cols[0], &a);
}
#[test]
@@ -1694,15 +1801,19 @@ mod tests {
builder.append(-1).unwrap();
let a = builder.finish();
-
- let mut converter =
-
RowConverter::new(vec![SortField::new(a.data_type().clone())]).unwrap();
- let rows = converter.convert_columns(&[Arc::new(a)]).unwrap();
- assert!(rows.row(0) < rows.row(1));
- assert!(rows.row(2) < rows.row(0));
- assert!(rows.row(3) < rows.row(2));
- assert!(rows.row(6) < rows.row(2));
- assert!(rows.row(3) < rows.row(6));
+ let data_type = a.data_type().clone();
+ let columns = [Arc::new(a) as ArrayRef];
+
+ for preserve in [true, false] {
+ let field =
SortField::new(data_type.clone()).preserve_dictionaries(preserve);
+ let mut converter = RowConverter::new(vec![field]).unwrap();
+ let rows = converter.convert_columns(&columns).unwrap();
+ assert!(rows.row(0) < rows.row(1));
+ assert!(rows.row(2) < rows.row(0));
+ assert!(rows.row(3) < rows.row(2));
+ assert!(rows.row(6) < rows.row(2));
+ assert!(rows.row(3) < rows.row(6));
+ }
}
#[test]
@@ -1722,15 +1833,17 @@ mod tests {
.build()
.unwrap();
- let mut converter =
RowConverter::new(vec![SortField::new(data_type)]).unwrap();
- let rows = converter
-
.convert_columns(&[Arc::new(DictionaryArray::<Int32Type>::from(data))])
- .unwrap();
+ let columns = [Arc::new(DictionaryArray::<Int32Type>::from(data)) as
ArrayRef];
+ for preserve in [true, false] {
+ let field =
SortField::new(data_type.clone()).preserve_dictionaries(preserve);
+ let mut converter = RowConverter::new(vec![field]).unwrap();
+ let rows = converter.convert_columns(&columns).unwrap();
- assert_eq!(rows.row(0), rows.row(1));
- assert_eq!(rows.row(3), rows.row(4));
- assert_eq!(rows.row(4), rows.row(5));
- assert!(rows.row(3) < rows.row(0));
+ assert_eq!(rows.row(0), rows.row(1));
+ assert_eq!(rows.row(3), rows.row(4));
+ assert_eq!(rows.row(4), rows.row(5));
+ assert!(rows.row(3) < rows.row(0));
+ }
}
#[test]
@@ -1974,6 +2087,35 @@ mod tests {
test_nested_list::<i64>();
}
+ #[test]
+ fn test_dictionary_preserving() {
+ let mut dict = StringDictionaryBuilder::<Int32Type>::new();
+ dict.append_value("foo");
+ dict.append_value("foo");
+ dict.append_value("bar");
+ dict.append_value("bar");
+ dict.append_value("bar");
+ dict.append_value("bar");
+
+ let array = Arc::new(dict.finish()) as ArrayRef;
+ let preserve = SortField::new(array.data_type().clone());
+ let non_preserve = preserve.clone().preserve_dictionaries(false);
+
+ let mut c1 = RowConverter::new(vec![preserve]).unwrap();
+ let r1 = c1.convert_columns(&[array.clone()]).unwrap();
+
+ let mut c2 = RowConverter::new(vec![non_preserve]).unwrap();
+ let r2 = c2.convert_columns(&[array.clone()]).unwrap();
+
+ for r in r1.iter() {
+ assert_eq!(r.data.len(), 3);
+ }
+
+ for r in r2.iter() {
+ assert_eq!(r.data.len(), 34);
+ }
+ }
+
fn generate_primitive_array<K>(len: usize, valid_percent: f64) ->
PrimitiveArray<K>
where
K: ArrowPrimitiveType,
@@ -2129,12 +2271,18 @@ mod tests {
})
.collect();
+ let preserve: Vec<_> = (0..num_columns).map(|_|
rng.gen_bool(0.5)).collect();
+
let comparator =
LexicographicalComparator::try_new(&sort_columns).unwrap();
let columns = options
.into_iter()
.zip(&arrays)
- .map(|(o, a)|
SortField::new_with_options(a.data_type().clone(), o))
+ .zip(&preserve)
+ .map(|((o, a), p)| {
+ SortField::new_with_options(a.data_type().clone(), o)
+ .preserve_dictionaries(*p)
+ })
.collect();
let mut converter = RowConverter::new(columns).unwrap();
@@ -2160,9 +2308,9 @@ mod tests {
}
let back = converter.convert_rows(&rows).unwrap();
- for (actual, expected) in back.iter().zip(&arrays) {
+ for ((actual, expected), preserve) in
back.iter().zip(&arrays).zip(preserve) {
actual.data().validate_full().unwrap();
- assert_eq!(actual, expected)
+ dictionary_eq(preserve, actual, expected)
}
}
}
diff --git a/arrow/benches/row_format.rs b/arrow/benches/row_format.rs
index 961cf07de..12ce71764 100644
--- a/arrow/benches/row_format.rs
+++ b/arrow/benches/row_format.rs
@@ -30,10 +30,18 @@ use arrow_array::Array;
use criterion::{black_box, Criterion};
use std::sync::Arc;
-fn do_bench(c: &mut Criterion, name: &str, cols: Vec<ArrayRef>) {
+fn do_bench(
+ c: &mut Criterion,
+ name: &str,
+ cols: Vec<ArrayRef>,
+ preserve_dictionaries: bool,
+) {
let fields: Vec<_> = cols
.iter()
- .map(|x| SortField::new(x.data_type().clone()))
+ .map(|x| {
+ SortField::new(x.data_type().clone())
+ .preserve_dictionaries(preserve_dictionaries)
+ })
.collect();
c.bench_function(&format!("convert_columns {name}"), |b| {
@@ -57,42 +65,46 @@ fn do_bench(c: &mut Criterion, name: &str, cols:
Vec<ArrayRef>) {
fn row_bench(c: &mut Criterion) {
let cols = vec![Arc::new(create_primitive_array::<UInt64Type>(4096, 0.))
as ArrayRef];
- do_bench(c, "4096 u64(0)", cols);
+ do_bench(c, "4096 u64(0)", cols, true);
let cols = vec![Arc::new(create_primitive_array::<Int64Type>(4096, 0.)) as
ArrayRef];
- do_bench(c, "4096 i64(0)", cols);
+ do_bench(c, "4096 i64(0)", cols, true);
let cols =
vec![Arc::new(create_string_array_with_len::<i32>(4096, 0., 10)) as
ArrayRef];
- do_bench(c, "4096 string(10, 0)", cols);
+ do_bench(c, "4096 string(10, 0)", cols, true);
let cols =
vec![Arc::new(create_string_array_with_len::<i32>(4096, 0., 30)) as
ArrayRef];
- do_bench(c, "4096 string(30, 0)", cols);
+ do_bench(c, "4096 string(30, 0)", cols, true);
let cols =
vec![Arc::new(create_string_array_with_len::<i32>(4096, 0., 100)) as
ArrayRef];
- do_bench(c, "4096 string(100, 0)", cols);
+ do_bench(c, "4096 string(100, 0)", cols, true);
let cols =
vec![Arc::new(create_string_array_with_len::<i32>(4096, 0.5, 100)) as
ArrayRef];
- do_bench(c, "4096 string(100, 0.5)", cols);
+ do_bench(c, "4096 string(100, 0.5)", cols, true);
let cols =
vec![Arc::new(create_string_dict_array::<Int32Type>(4096, 0., 10)) as
ArrayRef];
- do_bench(c, "4096 string_dictionary(10, 0)", cols);
+ do_bench(c, "4096 string_dictionary(10, 0)", cols, true);
let cols =
vec![Arc::new(create_string_dict_array::<Int32Type>(4096, 0., 30)) as
ArrayRef];
- do_bench(c, "4096 string_dictionary(30, 0)", cols);
+ do_bench(c, "4096 string_dictionary(30, 0)", cols, true);
let cols =
vec![Arc::new(create_string_dict_array::<Int32Type>(4096, 0., 100)) as
ArrayRef];
- do_bench(c, "4096 string_dictionary(100, 0)", cols);
+ do_bench(c, "4096 string_dictionary(100, 0)", cols.clone(), true);
+ let name = "4096 string_dictionary_non_preserving(100, 0)";
+ do_bench(c, name, cols, false);
let cols =
vec![Arc::new(create_string_dict_array::<Int32Type>(4096, 0.5, 100))
as ArrayRef];
- do_bench(c, "4096 string_dictionary(100, 0.5)", cols);
+ do_bench(c, "4096 string_dictionary(100, 0.5)", cols.clone(), true);
+ let name = "4096 string_dictionary_non_preserving(100, 0.5)";
+ do_bench(c, name, cols, false);
let cols = vec![
Arc::new(create_string_array_with_len::<i32>(4096, 0.5, 20)) as
ArrayRef,
@@ -104,6 +116,7 @@ fn row_bench(c: &mut Criterion) {
c,
"4096 string(20, 0.5), string(30, 0), string(100, 0), i64(0)",
cols,
+ false,
);
let cols = vec![
@@ -112,7 +125,7 @@ fn row_bench(c: &mut Criterion) {
Arc::new(create_string_dict_array::<Int32Type>(4096, 0., 100)) as
ArrayRef,
Arc::new(create_primitive_array::<Int64Type>(4096, 0.)) as ArrayRef,
];
- do_bench(c, "4096 4096 string_dictionary(20, 0.5), string_dictionary(30,
0), string_dictionary(100, 0), i64(0)", cols);
+ do_bench(c, "4096 4096 string_dictionary(20, 0.5), string_dictionary(30,
0), string_dictionary(100, 0), i64(0)", cols, false);
}
criterion_group!(benches, row_bench);