This is an automated email from the ASF dual-hosted git repository.

tustvold pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/arrow-rs.git


The following commit(s) were added to refs/heads/master by this push:
     new 69c04db96 Make dictionary preservation optional in row encoding (#3831)
69c04db96 is described below

commit 69c04db962b915b8a2d3783853da0ce95a94c0ef
Author: Raphael Taylor-Davies <[email protected]>
AuthorDate: Fri Mar 10 14:46:31 2023 +0100

    Make dictionary preservation optional in row encoding (#3831)
    
    * Make dictionary preservation optional in row encoding
    
    * Review feedback
---
 arrow-row/src/dictionary.rs |  20 ++++-
 arrow-row/src/lib.rs        | 208 +++++++++++++++++++++++++++++++++++++-------
 arrow/benches/row_format.rs |  39 ++++++---
 3 files changed, 223 insertions(+), 44 deletions(-)

diff --git a/arrow-row/src/dictionary.rs b/arrow-row/src/dictionary.rs
index e332e1131..bacc116ca 100644
--- a/arrow-row/src/dictionary.rs
+++ b/arrow-row/src/dictionary.rs
@@ -17,7 +17,7 @@
 
 use crate::fixed::{FixedLengthEncoding, FromSlice};
 use crate::interner::{Interned, OrderPreservingInterner};
-use crate::{null_sentinel, Rows};
+use crate::{null_sentinel, Row, Rows};
 use arrow_array::builder::*;
 use arrow_array::cast::*;
 use arrow_array::types::*;
@@ -56,6 +56,24 @@ pub fn compute_dictionary_mapping(
     }
 }
 
+/// Encode dictionary values not preserving the dictionary encoding
+pub fn encode_dictionary_values<K: ArrowDictionaryKeyType>(
+    out: &mut Rows,
+    column: &DictionaryArray<K>,
+    values: &Rows,
+    null: &Row<'_>,
+) {
+    for (offset, k) in out.offsets.iter_mut().skip(1).zip(column.keys()) {
+        let row = match k {
+            Some(k) => values.row(k.as_usize()).data,
+            None => null.data,
+        };
+        let end_offset = *offset + row.len();
+        out.buffer[*offset..end_offset].copy_from_slice(row);
+        *offset = end_offset;
+    }
+}
+
 /// Dictionary types are encoded as
 ///
 /// - single `0_u8` if null
diff --git a/arrow-row/src/lib.rs b/arrow-row/src/lib.rs
index 2e489c974..e4b02fbf2 100644
--- a/arrow-row/src/lib.rs
+++ b/arrow-row/src/lib.rs
@@ -137,6 +137,7 @@ use arrow_schema::*;
 
 use crate::dictionary::{
     compute_dictionary_mapping, decode_dictionary, encode_dictionary,
+    encode_dictionary_values,
 };
 use crate::fixed::{decode_bool, decode_fixed_size_binary, decode_primitive};
 use crate::interner::OrderPreservingInterner;
@@ -426,7 +427,14 @@ enum Codec {
     /// No additional codec state is necessary
     Stateless,
     /// The interner used to encode dictionary values
+    ///
+    /// Used when preserving the dictionary encoding
     Dictionary(OrderPreservingInterner),
+    /// A row converter for the dictionary values
+    /// and the encoding of a row containing only nulls
+    ///
+    /// Used when not preserving dictionary encoding
+    DictionaryValues(RowConverter, OwnedRow),
     /// A row converter for the child fields
     /// and the encoding of a row containing only nulls
     Struct(RowConverter, OwnedRow),
@@ -437,7 +445,25 @@ enum Codec {
 impl Codec {
     fn new(sort_field: &SortField) -> Result<Self, ArrowError> {
         match &sort_field.data_type {
-            DataType::Dictionary(_, _) => 
Ok(Self::Dictionary(Default::default())),
+            DataType::Dictionary(_, values) => match 
sort_field.preserve_dictionaries {
+                true => Ok(Self::Dictionary(Default::default())),
+                false => {
+                    let sort_field = SortField::new_with_options(
+                        values.as_ref().clone(),
+                        sort_field.options,
+                    );
+
+                    let mut converter = RowConverter::new(vec![sort_field])?;
+                    let null_array = new_null_array(values.as_ref(), 1);
+                    let nulls = converter.convert_columns(&[null_array])?;
+
+                    let owned = OwnedRow {
+                        data: nulls.buffer,
+                        config: nulls.config,
+                    };
+                    Ok(Self::DictionaryValues(converter, owned))
+                }
+            },
             d if !d.is_nested() => Ok(Self::Stateless),
             DataType::List(f) | DataType::LargeList(f) => {
                 // The encoded contents will be inverted if descending is set 
to true
@@ -501,6 +527,15 @@ impl Codec {
 
                 Ok(Encoder::Dictionary(mapping))
             }
+            Codec::DictionaryValues(converter, nulls) => {
+                let values = downcast_dictionary_array! {
+                    array => array.values(),
+                    _ => unreachable!()
+                };
+
+                let rows = converter.convert_columns(&[values.clone()])?;
+                Ok(Encoder::DictionaryValues(rows, nulls.row()))
+            }
             Codec::Struct(converter, null) => {
                 let v = as_struct_array(array);
                 let rows = converter.convert_columns(v.columns())?;
@@ -522,6 +557,9 @@ impl Codec {
         match self {
             Codec::Stateless => 0,
             Codec::Dictionary(interner) => interner.size(),
+            Codec::DictionaryValues(converter, nulls) => {
+                converter.size() + nulls.data.len()
+            }
             Codec::Struct(converter, nulls) => converter.size() + 
nulls.data.len(),
             Codec::List(converter) => converter.size(),
         }
@@ -534,6 +572,8 @@ enum Encoder<'a> {
     Stateless,
     /// The mapping from dictionary keys to normalized keys
     Dictionary(Vec<Option<&'a [u8]>>),
+    /// The encoding of the child array and the encoding of a null row
+    DictionaryValues(Rows, Row<'a>),
     /// The row encoding of the child arrays and the encoding of a null row
     ///
     /// It is necessary to encode to a temporary [`Rows`] to avoid serializing
@@ -551,6 +591,8 @@ pub struct SortField {
     options: SortOptions,
     /// Data type
     data_type: DataType,
+    /// Preserve dictionaries
+    preserve_dictionaries: bool,
 }
 
 impl SortField {
@@ -561,7 +603,30 @@ impl SortField {
 
     /// Create a new column with the given data type and [`SortOptions`]
     pub fn new_with_options(data_type: DataType, options: SortOptions) -> Self 
{
-        Self { options, data_type }
+        Self {
+            options,
+            data_type,
+            preserve_dictionaries: true,
+        }
+    }
+
+    /// By default dictionaries are preserved as described on [`RowConverter`]
+    ///
+    /// However, this process requires maintaining and incrementally updating
+    /// an order-preserving mapping of dictionary values. This is relatively 
expensive
+    /// computationally but reduces the size of the encoded rows, minimising 
memory
+    /// usage and potentially yielding faster comparisons.
+    ///
+    /// Some applications may wish to trade off space efficiency for 
improved
+    /// encoding performance by encoding the dictionary values directly.
+    ///
+    /// When `preserve_dictionaries` is false, fields will instead be encoded 
as their
+    /// underlying value, reversing any dictionary encoding
+    pub fn preserve_dictionaries(self, preserve_dictionaries: bool) -> Self {
+        Self {
+            preserve_dictionaries,
+            ..self
+        }
     }
 
     /// Return size of this instance in bytes.
@@ -1045,6 +1110,19 @@ fn new_empty_rows(cols: &[ArrayRef], encoders: 
&[Encoder], config: RowConfig) ->
                     _ => unreachable!(),
                 }
             }
+            Encoder::DictionaryValues(values, null) => {
+                downcast_dictionary_array! {
+                    array => {
+                        for (v, length) in 
array.keys().iter().zip(lengths.iter_mut()) {
+                            *length += match v {
+                                Some(k) => values.row(k.as_usize()).data.len(),
+                                None => null.data.len(),
+                            }
+                        }
+                    }
+                    _ => unreachable!(),
+                }
+            }
             Encoder::Struct(rows, null) => {
                 let array = as_struct_array(array);
                 lengths.iter_mut().enumerate().for_each(|(idx, length)| {
@@ -1143,6 +1221,12 @@ fn encode_column(
                 _ => unreachable!()
             }
         }
+        Encoder::DictionaryValues(values, nulls) => {
+            downcast_dictionary_array! {
+                column => encode_dictionary_values(out, column, values, nulls),
+                _ => unreachable!()
+            }
+        }
         Encoder::Struct(rows, null) => {
             let array = as_struct_array(column);
             let null_sentinel = null_sentinel(opts);
@@ -1221,6 +1305,10 @@ unsafe fn decode_column(
                 _ => unreachable!()
             }
         }
+        Codec::DictionaryValues(converter, _) => {
+            let cols = converter.convert_raw(rows, validate_utf8)?;
+            cols.into_iter().next().unwrap()
+        }
         Codec::Struct(converter, _) => {
             let (null_count, nulls) = fixed::decode_nulls(rows);
             rows.iter_mut().for_each(|row| *row = &row[1..]);
@@ -1557,8 +1645,25 @@ mod tests {
         assert_eq!(&cols[0], &col);
     }
 
+    /// If `exact` is false performs a logical comparison between a and 
dictionary-encoded b
+    fn dictionary_eq(exact: bool, a: &dyn Array, b: &dyn Array) {
+        match b.data_type() {
+            DataType::Dictionary(_, v) if !exact => {
+                assert_eq!(a.data_type(), v.as_ref());
+                let b = arrow_cast::cast(b, v).unwrap();
+                assert_eq!(a.data(), b.data())
+            }
+            _ => assert_eq!(a.data(), b.data()),
+        }
+    }
+
     #[test]
     fn test_string_dictionary() {
+        test_string_dictionary_impl(false);
+        test_string_dictionary_impl(true);
+    }
+
+    fn test_string_dictionary_impl(preserve: bool) {
         let a = Arc::new(DictionaryArray::<Int32Type>::from_iter([
             Some("foo"),
             Some("hello"),
@@ -1570,8 +1675,8 @@ mod tests {
             Some("hello"),
         ])) as ArrayRef;
 
-        let mut converter =
-            
RowConverter::new(vec![SortField::new(a.data_type().clone())]).unwrap();
+        let field = 
SortField::new(a.data_type().clone()).preserve_dictionaries(preserve);
+        let mut converter = RowConverter::new(vec![field]).unwrap();
         let rows_a = converter.convert_columns(&[Arc::clone(&a)]).unwrap();
 
         assert!(rows_a.row(3) < rows_a.row(5));
@@ -1584,7 +1689,7 @@ mod tests {
         assert_eq!(rows_a.row(1), rows_a.row(7));
 
         let cols = converter.convert_rows(&rows_a).unwrap();
-        assert_eq!(&cols[0], &a);
+        dictionary_eq(preserve, &cols[0], &a);
 
         let b = Arc::new(DictionaryArray::<Int32Type>::from_iter([
             Some("hello"),
@@ -1598,7 +1703,7 @@ mod tests {
         assert!(rows_b.row(2) < rows_a.row(0));
 
         let cols = converter.convert_rows(&rows_b).unwrap();
-        assert_eq!(&cols[0], &b);
+        dictionary_eq(preserve, &cols[0], &b);
 
         let mut converter = RowConverter::new(vec![SortField::new_with_options(
             a.data_type().clone(),
@@ -1606,7 +1711,8 @@ mod tests {
                 descending: true,
                 nulls_first: false,
             },
-        )])
+        )
+        .preserve_dictionaries(preserve)])
         .unwrap();
 
         let rows_c = converter.convert_columns(&[Arc::clone(&a)]).unwrap();
@@ -1616,7 +1722,7 @@ mod tests {
         assert!(rows_c.row(3) > rows_c.row(0));
 
         let cols = converter.convert_rows(&rows_c).unwrap();
-        assert_eq!(&cols[0], &a);
+        dictionary_eq(preserve, &cols[0], &a);
 
         let mut converter = RowConverter::new(vec![SortField::new_with_options(
             a.data_type().clone(),
@@ -1624,7 +1730,8 @@ mod tests {
                 descending: true,
                 nulls_first: true,
             },
-        )])
+        )
+        .preserve_dictionaries(preserve)])
         .unwrap();
 
         let rows_c = converter.convert_columns(&[Arc::clone(&a)]).unwrap();
@@ -1634,7 +1741,7 @@ mod tests {
         assert!(rows_c.row(3) < rows_c.row(0));
 
         let cols = converter.convert_rows(&rows_c).unwrap();
-        assert_eq!(&cols[0], &a);
+        dictionary_eq(preserve, &cols[0], &a);
     }
 
     #[test]
@@ -1694,15 +1801,19 @@ mod tests {
         builder.append(-1).unwrap();
 
         let a = builder.finish();
-
-        let mut converter =
-            
RowConverter::new(vec![SortField::new(a.data_type().clone())]).unwrap();
-        let rows = converter.convert_columns(&[Arc::new(a)]).unwrap();
-        assert!(rows.row(0) < rows.row(1));
-        assert!(rows.row(2) < rows.row(0));
-        assert!(rows.row(3) < rows.row(2));
-        assert!(rows.row(6) < rows.row(2));
-        assert!(rows.row(3) < rows.row(6));
+        let data_type = a.data_type().clone();
+        let columns = [Arc::new(a) as ArrayRef];
+
+        for preserve in [true, false] {
+            let field = 
SortField::new(data_type.clone()).preserve_dictionaries(preserve);
+            let mut converter = RowConverter::new(vec![field]).unwrap();
+            let rows = converter.convert_columns(&columns).unwrap();
+            assert!(rows.row(0) < rows.row(1));
+            assert!(rows.row(2) < rows.row(0));
+            assert!(rows.row(3) < rows.row(2));
+            assert!(rows.row(6) < rows.row(2));
+            assert!(rows.row(3) < rows.row(6));
+        }
     }
 
     #[test]
@@ -1722,15 +1833,17 @@ mod tests {
             .build()
             .unwrap();
 
-        let mut converter = 
RowConverter::new(vec![SortField::new(data_type)]).unwrap();
-        let rows = converter
-            
.convert_columns(&[Arc::new(DictionaryArray::<Int32Type>::from(data))])
-            .unwrap();
+        let columns = [Arc::new(DictionaryArray::<Int32Type>::from(data)) as 
ArrayRef];
+        for preserve in [true, false] {
+            let field = 
SortField::new(data_type.clone()).preserve_dictionaries(preserve);
+            let mut converter = RowConverter::new(vec![field]).unwrap();
+            let rows = converter.convert_columns(&columns).unwrap();
 
-        assert_eq!(rows.row(0), rows.row(1));
-        assert_eq!(rows.row(3), rows.row(4));
-        assert_eq!(rows.row(4), rows.row(5));
-        assert!(rows.row(3) < rows.row(0));
+            assert_eq!(rows.row(0), rows.row(1));
+            assert_eq!(rows.row(3), rows.row(4));
+            assert_eq!(rows.row(4), rows.row(5));
+            assert!(rows.row(3) < rows.row(0));
+        }
     }
 
     #[test]
@@ -1974,6 +2087,35 @@ mod tests {
         test_nested_list::<i64>();
     }
 
+    #[test]
+    fn test_dictionary_preserving() {
+        let mut dict = StringDictionaryBuilder::<Int32Type>::new();
+        dict.append_value("foo");
+        dict.append_value("foo");
+        dict.append_value("bar");
+        dict.append_value("bar");
+        dict.append_value("bar");
+        dict.append_value("bar");
+
+        let array = Arc::new(dict.finish()) as ArrayRef;
+        let preserve = SortField::new(array.data_type().clone());
+        let non_preserve = preserve.clone().preserve_dictionaries(false);
+
+        let mut c1 = RowConverter::new(vec![preserve]).unwrap();
+        let r1 = c1.convert_columns(&[array.clone()]).unwrap();
+
+        let mut c2 = RowConverter::new(vec![non_preserve]).unwrap();
+        let r2 = c2.convert_columns(&[array.clone()]).unwrap();
+
+        for r in r1.iter() {
+            assert_eq!(r.data.len(), 3);
+        }
+
+        for r in r2.iter() {
+            assert_eq!(r.data.len(), 34);
+        }
+    }
+
     fn generate_primitive_array<K>(len: usize, valid_percent: f64) -> 
PrimitiveArray<K>
     where
         K: ArrowPrimitiveType,
@@ -2129,12 +2271,18 @@ mod tests {
                 })
                 .collect();
 
+            let preserve: Vec<_> = (0..num_columns).map(|_| 
rng.gen_bool(0.5)).collect();
+
             let comparator = 
LexicographicalComparator::try_new(&sort_columns).unwrap();
 
             let columns = options
                 .into_iter()
                 .zip(&arrays)
-                .map(|(o, a)| 
SortField::new_with_options(a.data_type().clone(), o))
+                .zip(&preserve)
+                .map(|((o, a), p)| {
+                    SortField::new_with_options(a.data_type().clone(), o)
+                        .preserve_dictionaries(*p)
+                })
                 .collect();
 
             let mut converter = RowConverter::new(columns).unwrap();
@@ -2160,9 +2308,9 @@ mod tests {
             }
 
             let back = converter.convert_rows(&rows).unwrap();
-            for (actual, expected) in back.iter().zip(&arrays) {
+            for ((actual, expected), preserve) in 
back.iter().zip(&arrays).zip(preserve) {
                 actual.data().validate_full().unwrap();
-                assert_eq!(actual, expected)
+                dictionary_eq(preserve, actual, expected)
             }
         }
     }
diff --git a/arrow/benches/row_format.rs b/arrow/benches/row_format.rs
index 961cf07de..12ce71764 100644
--- a/arrow/benches/row_format.rs
+++ b/arrow/benches/row_format.rs
@@ -30,10 +30,18 @@ use arrow_array::Array;
 use criterion::{black_box, Criterion};
 use std::sync::Arc;
 
-fn do_bench(c: &mut Criterion, name: &str, cols: Vec<ArrayRef>) {
+fn do_bench(
+    c: &mut Criterion,
+    name: &str,
+    cols: Vec<ArrayRef>,
+    preserve_dictionaries: bool,
+) {
     let fields: Vec<_> = cols
         .iter()
-        .map(|x| SortField::new(x.data_type().clone()))
+        .map(|x| {
+            SortField::new(x.data_type().clone())
+                .preserve_dictionaries(preserve_dictionaries)
+        })
         .collect();
 
     c.bench_function(&format!("convert_columns {name}"), |b| {
@@ -57,42 +65,46 @@ fn do_bench(c: &mut Criterion, name: &str, cols: 
Vec<ArrayRef>) {
 
 fn row_bench(c: &mut Criterion) {
     let cols = vec![Arc::new(create_primitive_array::<UInt64Type>(4096, 0.)) 
as ArrayRef];
-    do_bench(c, "4096 u64(0)", cols);
+    do_bench(c, "4096 u64(0)", cols, true);
 
     let cols = vec![Arc::new(create_primitive_array::<Int64Type>(4096, 0.)) as 
ArrayRef];
-    do_bench(c, "4096 i64(0)", cols);
+    do_bench(c, "4096 i64(0)", cols, true);
 
     let cols =
         vec![Arc::new(create_string_array_with_len::<i32>(4096, 0., 10)) as 
ArrayRef];
-    do_bench(c, "4096 string(10, 0)", cols);
+    do_bench(c, "4096 string(10, 0)", cols, true);
 
     let cols =
         vec![Arc::new(create_string_array_with_len::<i32>(4096, 0., 30)) as 
ArrayRef];
-    do_bench(c, "4096 string(30, 0)", cols);
+    do_bench(c, "4096 string(30, 0)", cols, true);
 
     let cols =
         vec![Arc::new(create_string_array_with_len::<i32>(4096, 0., 100)) as 
ArrayRef];
-    do_bench(c, "4096 string(100, 0)", cols);
+    do_bench(c, "4096 string(100, 0)", cols, true);
 
     let cols =
         vec![Arc::new(create_string_array_with_len::<i32>(4096, 0.5, 100)) as 
ArrayRef];
-    do_bench(c, "4096 string(100, 0.5)", cols);
+    do_bench(c, "4096 string(100, 0.5)", cols, true);
 
     let cols =
         vec![Arc::new(create_string_dict_array::<Int32Type>(4096, 0., 10)) as 
ArrayRef];
-    do_bench(c, "4096 string_dictionary(10, 0)", cols);
+    do_bench(c, "4096 string_dictionary(10, 0)", cols, true);
 
     let cols =
         vec![Arc::new(create_string_dict_array::<Int32Type>(4096, 0., 30)) as 
ArrayRef];
-    do_bench(c, "4096 string_dictionary(30, 0)", cols);
+    do_bench(c, "4096 string_dictionary(30, 0)", cols, true);
 
     let cols =
         vec![Arc::new(create_string_dict_array::<Int32Type>(4096, 0., 100)) as 
ArrayRef];
-    do_bench(c, "4096 string_dictionary(100, 0)", cols);
+    do_bench(c, "4096 string_dictionary(100, 0)", cols.clone(), true);
+    let name = "4096 string_dictionary_non_preserving(100, 0)";
+    do_bench(c, name, cols, false);
 
     let cols =
         vec![Arc::new(create_string_dict_array::<Int32Type>(4096, 0.5, 100)) 
as ArrayRef];
-    do_bench(c, "4096 string_dictionary(100, 0.5)", cols);
+    do_bench(c, "4096 string_dictionary(100, 0.5)", cols.clone(), true);
+    let name = "4096 string_dictionary_non_preserving(100, 0.5)";
+    do_bench(c, name, cols, false);
 
     let cols = vec![
         Arc::new(create_string_array_with_len::<i32>(4096, 0.5, 20)) as 
ArrayRef,
@@ -104,6 +116,7 @@ fn row_bench(c: &mut Criterion) {
         c,
         "4096 string(20, 0.5), string(30, 0), string(100, 0), i64(0)",
         cols,
+        false,
     );
 
     let cols = vec![
@@ -112,7 +125,7 @@ fn row_bench(c: &mut Criterion) {
         Arc::new(create_string_dict_array::<Int32Type>(4096, 0., 100)) as 
ArrayRef,
         Arc::new(create_primitive_array::<Int64Type>(4096, 0.)) as ArrayRef,
     ];
-    do_bench(c, "4096 4096 string_dictionary(20, 0.5), string_dictionary(30, 
0), string_dictionary(100, 0), i64(0)", cols);
+    do_bench(c, "4096 4096 string_dictionary(20, 0.5), string_dictionary(30, 
0), string_dictionary(100, 0), i64(0)", cols, false);
 }
 
 criterion_group!(benches, row_bench);

Reply via email to