thinkharderdev commented on code in PR #5488:
URL: https://github.com/apache/arrow-rs/pull/5488#discussion_r1518627039


##########
arrow-flight/src/encode.rs:
##########
@@ -388,29 +388,39 @@ impl Stream for FlightDataEncoder {
 /// Defines how a [`FlightDataEncoder`] encodes [`DictionaryArray`]s
 ///
 /// [`DictionaryArray`]: arrow_array::DictionaryArray
+///
+/// In the arrow flight protocol dictionary values and keys are sent as two separate messages.
+/// When a sender is encoding a [`RecordBatch`] containing [`DictionaryArray`] columns, it will
+/// first send a dictionary batch (a batch with header `MessageHeader::DictionaryBatch`) containing
+/// the dictionary values. The receiver is responsible for reading this batch and maintaining state that associates
+/// those dictionary values with the corresponding array using the `dict_id` as a key.
+///
+/// After sending the dictionary batch the sender will send the array data in a batch with header `MessageHeader::RecordBatch`.
+/// For any dictionary array batches in this message, the encoded flight message will only contain the dictionary keys. The receiver
+/// is then responsible for rebuilding the `DictionaryArray` on the client side using the dictionary values from the DictionaryBatch message
+/// and the keys from the RecordBatch message.
+///
+/// For example, if we have a batch with a `TypedDictionaryArray<'_, UInt32Type, Utf8Type>` (a dictionary array where the keys are `u32` and the
+/// values are `String`), then the DictionaryBatch will contain a `StringArray` and the RecordBatch will contain a `UInt32Array`.
+///
+/// Note that since `dict_id` defined in the `Schema` is used as a key to associate dictionary values to their arrays it is required that each
+/// `DictionaryArray` in a `RecordBatch` have a unique `dict_id`.
 #[derive(Debug, PartialEq)]
 pub enum DictionaryHandling {
-    /// Expands to the underlying type (default). This likely sends more data
-    /// over the network but requires less memory (dictionaries are not tracked)
-    /// and is more compatible with other arrow flight client implementations
-    /// that may not support `DictionaryEncoding`
-    ///
-    /// An IPC response, streaming or otherwise, defines its schema up front
-    /// which defines the mapping from dictionary IDs. It then sends these
-    /// dictionaries over the wire.
+    /// This method should be used if all batches over the entire lifetime of the flight stream

Review Comment:
   Hmm, I think you're right, but then the current implementation is actually
   broken because it doesn't take nested dictionary fields into account:
   
   ```
   fn prepare_schema_for_flight(schema: &Schema, send_dictionaries: bool) -> Schema {
       let fields: Fields = schema
           .fields()
           .iter()
           .map(|field| match field.data_type() {
               DataType::Dictionary(_, value_type) if !send_dictionaries => Field::new(
                   field.name(),
                   value_type.as_ref().clone(),
                   field.is_nullable(),
               )
               .with_metadata(field.metadata().clone()),
               _ => field.as_ref().clone(),
           })
           .collect();
   
       Schema::new(fields).with_metadata(schema.metadata().clone())
   }
   ```
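
   To make the nested case concrete, here is a minimal sketch. It uses a toy
   `ToyType` enum as a stand-in for arrow's `DataType`; the enum and both
   functions are hypothetical names for illustration only, not arrow-rs APIs.
   `hydrate_top_level_only` mirrors the shape of the match above, while
   `hydrate` shows one way the replacement could recurse into child types:

   ```rust
   /// Toy stand-in for arrow's `DataType` (assumption: only the variants needed here).
   #[derive(Debug, Clone, PartialEq)]
   enum ToyType {
       Utf8,
       UInt32,
       /// Dictionary(key type, value type)
       Dictionary(Box<ToyType>, Box<ToyType>),
       /// List(item type)
       List(Box<ToyType>),
       /// Struct(named child fields)
       Struct(Vec<(String, ToyType)>),
   }

   /// Same shape as the match in the quoted function: only a dictionary at the
   /// top level of a field is replaced by its value type.
   fn hydrate_top_level_only(ty: &ToyType) -> ToyType {
       match ty {
           ToyType::Dictionary(_, value) => (**value).clone(),
           other => other.clone(),
       }
   }

   /// Recursive variant: replaces dictionaries at any nesting depth.
   fn hydrate(ty: &ToyType) -> ToyType {
       match ty {
           // Recurse into the value type too, since it may itself be nested.
           ToyType::Dictionary(_, value) => hydrate(value),
           ToyType::List(item) => ToyType::List(Box::new(hydrate(item))),
           ToyType::Struct(children) => ToyType::Struct(
               children
                   .iter()
                   .map(|(name, child)| (name.clone(), hydrate(child)))
                   .collect(),
           ),
           other => other.clone(),
       }
   }
   ```

   With a struct field containing a list of dictionary-encoded strings,
   `hydrate_top_level_only` returns the type unchanged, so the dictionary
   survives in the schema, whereas `hydrate` yields a struct containing a
   list of plain `Utf8`.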
   
   Then, if you have `DictionaryHandling::Hydrate` (the default), it will break
   on dictionary replacement of the nested field:
   
   
   ```
       pub fn insert(&mut self, dict_id: i64, column: &ArrayRef) -> Result<bool, ArrowError> {
           let dict_data = column.to_data();
           let dict_values = &dict_data.child_data()[0];
   
           // If a dictionary with this id was already emitted, check if it was the same.
           if let Some(last) = self.written.get(&dict_id) {
               if ArrayData::ptr_eq(&last.child_data()[0], dict_values) {
                   // Same dictionary values => no need to emit it again
                   return Ok(false);
               }
               if self.error_on_replacement {
                   // If error on replacement perform a logical comparison
                   if last.child_data()[0] == *dict_values {
                       // Same dictionary values => no need to emit it again
                       return Ok(false);
                   }
                   return Err(ArrowError::InvalidArgumentError(
                       "Dictionary replacement detected when writing IPC file format. \
                        Arrow IPC files only support a single dictionary for a given field \
                        across all batches."
                           .to_string(),
                   ));
               }
           }
   
           self.written.insert(dict_id, dict_data);
           Ok(true)
       }
   ```
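
   The replacement check above can be modelled in isolation. The following
   is a toy sketch, not the arrow-rs `DictionaryTracker`: `ToyTracker` is a
   hypothetical type, and dictionary values are simplified to
   `Rc<Vec<String>>` so that pointer equality and logical equality can both
   be shown:

   ```rust
   use std::collections::HashMap;
   use std::rc::Rc;

   /// Toy model of the tracker (assumption: values are just shared string vectors).
   struct ToyTracker {
       written: HashMap<i64, Rc<Vec<String>>>,
       error_on_replacement: bool,
   }

   impl ToyTracker {
       fn new(error_on_replacement: bool) -> Self {
           ToyTracker {
               written: HashMap::new(),
               error_on_replacement,
           }
       }

       /// Returns Ok(true) if the dictionary has to be emitted, Ok(false) if an
       /// identical one was already sent, and Err on a disallowed replacement.
       fn insert(&mut self, dict_id: i64, values: &Rc<Vec<String>>) -> Result<bool, String> {
           if let Some(last) = self.written.get(&dict_id) {
               // Cheap check first: same allocation means same dictionary.
               if Rc::ptr_eq(last, values) {
                   return Ok(false);
               }
               if self.error_on_replacement {
                   // Fall back to a logical comparison before reporting an error.
                   if **last == **values {
                       return Ok(false);
                   }
                   return Err(format!(
                       "dictionary replacement detected for dict_id {}",
                       dict_id
                   ));
               }
           }
           self.written.insert(dict_id, Rc::clone(values));
           Ok(true)
       }
   }
   ```

   In this model, two batches that carry different values under the same
   `dict_id` either trigger the error (strict mode) or cause the dictionary
   to be emitted again (lenient mode). A nested dictionary left in the
   schema would reach this kind of check on every batch.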


