This is an automated email from the ASF dual-hosted git repository.

alamb pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/arrow-rs.git


The following commit(s) were added to refs/heads/master by this push:
     new d5a655d21f Add option to `FlightDataEncoder` to always resend batch 
dictionaries (#4896)
d5a655d21f is described below

commit d5a655d21fe14e6c08e72ba3233909e414dfb6b6
Author: Alex Wilcoxson <[email protected]>
AuthorDate: Thu Oct 12 05:47:55 2023 -0500

    Add option to `FlightDataEncoder` to always resend batch dictionaries 
(#4896)
    
    * Add option to FlightDataEncoder to always resend batch dictionaries
    
    * Replace send_dictionaries on FlightDataEncoder with DictionaryHandling 
Enum
    
    * Improve docs
    
    ---------
    
    Co-authored-by: Andrew Lamb <[email protected]>
---
 arrow-flight/src/encode.rs | 197 +++++++++++++++++++++++++++++++++++++--------
 1 file changed, 163 insertions(+), 34 deletions(-)

diff --git a/arrow-flight/src/encode.rs b/arrow-flight/src/encode.rs
index 28c181c0d5..9ae7f16379 100644
--- a/arrow-flight/src/encode.rs
+++ b/arrow-flight/src/encode.rs
@@ -36,9 +36,11 @@ use futures::{ready, stream::BoxStream, Stream, StreamExt};
 /// several have already been successfully produced.
 ///
 /// # Caveats
-///   1. [`DictionaryArray`](arrow_array::array::DictionaryArray)s
-///   are converted to their underlying types prior to transport, due to
-///   <https://github.com/apache/arrow-rs/issues/3389>.
+///   1. When [`DictionaryHandling`] is [`DictionaryHandling::Hydrate`], 
[`DictionaryArray`](arrow_array::array::DictionaryArray)s
+///   are converted to their underlying types prior to transport.
+///   When [`DictionaryHandling`] is [`DictionaryHandling::Resend`], 
Dictionary [`FlightData`] is sent with every
+///   [`RecordBatch`] that contains a 
[`DictionaryArray`](arrow_array::array::DictionaryArray).
+///   See <https://github.com/apache/arrow-rs/issues/3389>.
 ///
 /// # Example
 /// ```no_run
@@ -112,6 +114,9 @@ pub struct FlightDataEncoderBuilder {
     schema: Option<SchemaRef>,
     /// Optional flight descriptor, if known before data.
     descriptor: Option<FlightDescriptor>,
+    /// Determines how `DictionaryArray`s are encoded for transport.
+    /// See [`DictionaryHandling`] for more information.
+    dictionary_handling: DictionaryHandling,
 }
 
 /// Default target size for encoded [`FlightData`].
@@ -128,6 +133,7 @@ impl Default for FlightDataEncoderBuilder {
             app_metadata: Bytes::new(),
             schema: None,
             descriptor: None,
+            dictionary_handling: DictionaryHandling::Hydrate,
         }
     }
 }
@@ -152,6 +158,15 @@ impl FlightDataEncoderBuilder {
         self
     }
 
+    /// Set [`DictionaryHandling`] for encoder
+    pub fn with_dictionary_handling(
+        mut self,
+        dictionary_handling: DictionaryHandling,
+    ) -> Self {
+        self.dictionary_handling = dictionary_handling;
+        self
+    }
+
     /// Specify application specific metadata included in the
     /// [`FlightData::app_metadata`] field of the first Schema
     /// message
@@ -198,6 +213,7 @@ impl FlightDataEncoderBuilder {
             app_metadata,
             schema,
             descriptor,
+            dictionary_handling,
         } = self;
 
         FlightDataEncoder::new(
@@ -207,6 +223,7 @@ impl FlightDataEncoderBuilder {
             options,
             app_metadata,
             descriptor,
+            dictionary_handling,
         )
     }
 }
@@ -232,6 +249,9 @@ pub struct FlightDataEncoder {
     done: bool,
     /// cleared after the first FlightData message is sent
     descriptor: Option<FlightDescriptor>,
+    /// Determines how `DictionaryArray`s are encoded for transport.
+    /// See [`DictionaryHandling`] for more information.
+    dictionary_handling: DictionaryHandling,
 }
 
 impl FlightDataEncoder {
@@ -242,16 +262,21 @@ impl FlightDataEncoder {
         options: IpcWriteOptions,
         app_metadata: Bytes,
         descriptor: Option<FlightDescriptor>,
+        dictionary_handling: DictionaryHandling,
     ) -> Self {
         let mut encoder = Self {
             inner,
             schema: None,
             max_flight_data_size,
-            encoder: FlightIpcEncoder::new(options),
+            encoder: FlightIpcEncoder::new(
+                options,
+                dictionary_handling != DictionaryHandling::Resend,
+            ),
             app_metadata: Some(app_metadata),
             queue: VecDeque::new(),
             done: false,
             descriptor,
+            dictionary_handling,
         };
 
         // If schema is known up front, enqueue it immediately
@@ -282,7 +307,8 @@ impl FlightDataEncoder {
     fn encode_schema(&mut self, schema: &SchemaRef) -> SchemaRef {
         // The first message is the schema message, and all
         // batches have the same schema
-        let schema = Arc::new(prepare_schema_for_flight(schema));
+        let send_dictionaries = self.dictionary_handling == 
DictionaryHandling::Resend;
+        let schema = Arc::new(prepare_schema_for_flight(schema, 
send_dictionaries));
         let mut schema_flight_data = self.encoder.encode_schema(&schema);
 
         // attach any metadata requested
@@ -304,7 +330,8 @@ impl FlightDataEncoder {
         };
 
         // encode the batch
-        let batch = prepare_batch_for_flight(&batch, schema)?;
+        let send_dictionaries = self.dictionary_handling == 
DictionaryHandling::Resend;
+        let batch = prepare_batch_for_flight(&batch, schema, 
send_dictionaries)?;
 
         for batch in split_batch_for_grpc_response(batch, 
self.max_flight_data_size) {
             let (flight_dictionaries, flight_batch) =
@@ -365,17 +392,46 @@ impl Stream for FlightDataEncoder {
     }
 }
 
+/// Defines how a [`FlightDataEncoder`] encodes [`DictionaryArray`]s
+///
+/// [`DictionaryArray`]: arrow_array::DictionaryArray
+#[derive(Debug, PartialEq)]
+pub enum DictionaryHandling {
+    /// Expands to the underlying type (default). This likely sends more data
+    /// over the network but requires less memory (dictionaries are not 
tracked)
+    /// and is more compatible with other arrow flight client implementations
+    /// that may not support `DictionaryEncoding`
+    ///
+    /// An IPC response, streaming or otherwise, defines its schema up front
+    /// which defines the mapping from dictionary IDs. It then sends these
+    /// dictionaries over the wire.
+    ///
+    /// This requires identifying the different dictionaries in use, assigning
+    /// them IDs, and sending new dictionaries, delta or otherwise, when needed
+    ///
+    /// See also:
+    /// * <https://github.com/apache/arrow-rs/issues/1206>
+    Hydrate,
+    /// Send dictionary FlightData with every RecordBatch that contains a
+    /// [`DictionaryArray`]. See [`Self::Hydrate`] for more tradeoffs. No
+    /// attempt is made to skip sending the same (logical) dictionary values
+    /// twice.
+    ///
+    /// [`DictionaryArray`]: arrow_array::DictionaryArray
+    Resend,
+}
+
 /// Prepare an arrow Schema for transport over the Arrow Flight protocol
 ///
 /// Convert dictionary types to underlying types
 ///
 /// See hydrate_dictionary for more information
-fn prepare_schema_for_flight(schema: &Schema) -> Schema {
+fn prepare_schema_for_flight(schema: &Schema, send_dictionaries: bool) -> 
Schema {
     let fields: Fields = schema
         .fields()
         .iter()
         .map(|field| match field.data_type() {
-            DataType::Dictionary(_, value_type) => Field::new(
+            DataType::Dictionary(_, value_type) if !send_dictionaries => 
Field::new(
                 field.name(),
                 value_type.as_ref().clone(),
                 field.is_nullable(),
@@ -434,8 +490,7 @@ struct FlightIpcEncoder {
 }
 
 impl FlightIpcEncoder {
-    fn new(options: IpcWriteOptions) -> Self {
-        let error_on_replacement = true;
+    fn new(options: IpcWriteOptions, error_on_replacement: bool) -> Self {
         Self {
             options,
             data_gen: IpcDataGenerator::default(),
@@ -478,12 +533,14 @@ impl FlightIpcEncoder {
 fn prepare_batch_for_flight(
     batch: &RecordBatch,
     schema: SchemaRef,
+    send_dictionaries: bool,
 ) -> Result<RecordBatch> {
     let columns = batch
         .columns()
         .iter()
-        .map(hydrate_dictionary)
+        .map(|c| hydrate_dictionary(c, send_dictionaries))
         .collect::<Result<Vec<_>>>()?;
+
     let options = 
RecordBatchOptions::new().with_row_count(Some(batch.num_rows()));
 
     Ok(RecordBatch::try_new_with_options(
@@ -491,35 +548,28 @@ fn prepare_batch_for_flight(
     )?)
 }
 
-/// Hydrates a dictionary to its underlying type
-///
-/// An IPC response, streaming or otherwise, defines its schema up front
-/// which defines the mapping from dictionary IDs. It then sends these
-/// dictionaries over the wire.
-///
-/// This requires identifying the different dictionaries in use, assigning
-/// them IDs, and sending new dictionaries, delta or otherwise, when needed
-///
-/// See also:
-/// * <https://github.com/apache/arrow-rs/issues/1206>
-///
-/// For now we just hydrate the dictionaries to their underlying type
-fn hydrate_dictionary(array: &ArrayRef) -> Result<ArrayRef> {
-    let arr = if let DataType::Dictionary(_, value) = array.data_type() {
-        arrow_cast::cast(array, value)?
-    } else {
-        Arc::clone(array)
+/// Hydrates a dictionary to its underlying type if send_dictionaries is 
false. If send_dictionaries
+/// is true, dictionaries are sent with every batch which is not as optimal as 
described in [DictionaryHandling::Hydrate] above,
+/// but does enable sending DictionaryArray's via Flight.
+fn hydrate_dictionary(array: &ArrayRef, send_dictionaries: bool) -> 
Result<ArrayRef> {
+    let arr = match array.data_type() {
+        DataType::Dictionary(_, value) if !send_dictionaries => {
+            arrow_cast::cast(array, value)?
+        }
+        _ => Arc::clone(array),
     };
     Ok(arr)
 }
 
 #[cfg(test)]
 mod tests {
-    use arrow_array::types::*;
     use arrow_array::*;
+    use arrow_array::{cast::downcast_array, types::*};
     use arrow_cast::pretty::pretty_format_batches;
     use std::collections::HashMap;
 
+    use crate::decode::{DecodedPayload, FlightDataDecoder};
+
     use super::*;
 
     #[test]
@@ -537,7 +587,7 @@ mod tests {
 
         let big_batch = batch.slice(0, batch.num_rows() - 1);
         let optimized_big_batch =
-            prepare_batch_for_flight(&big_batch, Arc::clone(&schema))
+            prepare_batch_for_flight(&big_batch, Arc::clone(&schema), false)
                 .expect("failed to optimize");
         let (_, optimized_big_flight_batch) =
             make_flight_data(&optimized_big_batch, &options);
@@ -549,7 +599,7 @@ mod tests {
 
         let small_batch = batch.slice(0, 1);
         let optimized_small_batch =
-            prepare_batch_for_flight(&small_batch, Arc::clone(&schema))
+            prepare_batch_for_flight(&small_batch, Arc::clone(&schema), false)
                 .expect("failed to optimize");
         let (_, optimized_small_flight_batch) =
             make_flight_data(&optimized_small_batch, &options);
@@ -560,6 +610,84 @@ mod tests {
         );
     }
 
+    #[tokio::test]
+    async fn test_dictionary_hydration() {
+        let arr: DictionaryArray<UInt16Type> = vec!["a", "a", 
"b"].into_iter().collect();
+        let schema = Arc::new(Schema::new(vec![Field::new_dictionary(
+            "dict",
+            DataType::UInt16,
+            DataType::Utf8,
+            false,
+        )]));
+        let batch = RecordBatch::try_new(schema, vec![Arc::new(arr)]).unwrap();
+        let encoder = FlightDataEncoderBuilder::default()
+            .build(futures::stream::once(async { Ok(batch) }));
+        let mut decoder = FlightDataDecoder::new(encoder);
+        let expected_schema =
+            Schema::new(vec![Field::new("dict", DataType::Utf8, false)]);
+        let expected_schema = Arc::new(expected_schema);
+        while let Some(decoded) = decoder.next().await {
+            let decoded = decoded.unwrap();
+            match decoded.payload {
+                DecodedPayload::None => {}
+                DecodedPayload::Schema(s) => assert_eq!(s, expected_schema),
+                DecodedPayload::RecordBatch(b) => {
+                    assert_eq!(b.schema(), expected_schema);
+                    let expected_array = StringArray::from(vec!["a", "a", 
"b"]);
+                    let actual_array = b.column_by_name("dict").unwrap();
+                    let actual_array = 
downcast_array::<StringArray>(actual_array);
+
+                    assert_eq!(actual_array, expected_array);
+                }
+            }
+        }
+    }
+
+    #[tokio::test]
+    async fn test_send_dictionaries() {
+        let schema = Arc::new(Schema::new(vec![Field::new_dictionary(
+            "dict",
+            DataType::UInt16,
+            DataType::Utf8,
+            false,
+        )]));
+
+        let arr_one: Arc<DictionaryArray<UInt16Type>> =
+            Arc::new(vec!["a", "a", "b"].into_iter().collect());
+        let arr_two: Arc<DictionaryArray<UInt16Type>> =
+            Arc::new(vec!["b", "a", "c"].into_iter().collect());
+        let batch_one =
+            RecordBatch::try_new(schema.clone(), 
vec![arr_one.clone()]).unwrap();
+        let batch_two =
+            RecordBatch::try_new(schema.clone(), 
vec![arr_two.clone()]).unwrap();
+
+        let encoder = FlightDataEncoderBuilder::default()
+            .with_dictionary_handling(DictionaryHandling::Resend)
+            .build(futures::stream::iter(vec![Ok(batch_one), Ok(batch_two)]));
+
+        let mut decoder = FlightDataDecoder::new(encoder);
+        let mut expected_array = arr_one;
+        while let Some(decoded) = decoder.next().await {
+            let decoded = decoded.unwrap();
+            match decoded.payload {
+                DecodedPayload::None => {}
+                DecodedPayload::Schema(s) => assert_eq!(s, schema),
+                DecodedPayload::RecordBatch(b) => {
+                    assert_eq!(b.schema(), schema);
+
+                    let actual_array =
+                        Arc::new(downcast_array::<DictionaryArray<UInt16Type>>(
+                            b.column_by_name("dict").unwrap(),
+                        ));
+
+                    assert_eq!(actual_array, expected_array);
+
+                    expected_array = arr_two.clone();
+                }
+            }
+        }
+    }
+
     #[test]
     fn test_schema_metadata_encoded() {
         let schema =
@@ -567,7 +695,7 @@ mod tests {
                 HashMap::from([("some_key".to_owned(), 
"some_value".to_owned())]),
             );
 
-        let got = prepare_schema_for_flight(&schema);
+        let got = prepare_schema_for_flight(&schema, false);
         assert!(got.metadata().contains_key("some_key"));
     }
 
@@ -580,7 +708,8 @@ mod tests {
         )
         .expect("cannot create record batch");
 
-        prepare_batch_for_flight(&batch, batch.schema()).expect("failed to 
optimize");
+        prepare_batch_for_flight(&batch, batch.schema(), false)
+            .expect("failed to optimize");
     }
 
     pub fn make_flight_data(

Reply via email to