This is an automated email from the ASF dual-hosted git repository.
alamb pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/arrow-rs.git
The following commit(s) were added to refs/heads/master by this push:
new d5a655d21f Add option to `FlightDataEncoder` to always resend batch
dictionaries (#4896)
d5a655d21f is described below
commit d5a655d21fe14e6c08e72ba3233909e414dfb6b6
Author: Alex Wilcoxson <[email protected]>
AuthorDate: Thu Oct 12 05:47:55 2023 -0500
Add option to `FlightDataEncoder` to always resend batch dictionaries
(#4896)
* Add option to FlightDataEncoder to always resend batch dictionaries
* Replace send_dictionaries on FlightDataEncoder with DictionaryHandling
Enum
* Improve docs
---------
Co-authored-by: Andrew Lamb <[email protected]>
---
arrow-flight/src/encode.rs | 197 +++++++++++++++++++++++++++++++++++++--------
1 file changed, 163 insertions(+), 34 deletions(-)
diff --git a/arrow-flight/src/encode.rs b/arrow-flight/src/encode.rs
index 28c181c0d5..9ae7f16379 100644
--- a/arrow-flight/src/encode.rs
+++ b/arrow-flight/src/encode.rs
@@ -36,9 +36,11 @@ use futures::{ready, stream::BoxStream, Stream, StreamExt};
/// several have already been successfully produced.
///
/// # Caveats
-/// 1. [`DictionaryArray`](arrow_array::array::DictionaryArray)s
-/// are converted to their underlying types prior to transport, due to
-/// <https://github.com/apache/arrow-rs/issues/3389>.
+/// 1. When [`DictionaryHandling`] is [`DictionaryHandling::Hydrate`],
[`DictionaryArray`](arrow_array::array::DictionaryArray)s
+/// are converted to their underlying types prior to transport.
+/// When [`DictionaryHandling`] is [`DictionaryHandling::Resend`],
Dictionary [`FlightData`] is sent with every
+/// [`RecordBatch`] that contains a
[`DictionaryArray`](arrow_array::array::DictionaryArray).
+/// See <https://github.com/apache/arrow-rs/issues/3389>.
///
/// # Example
/// ```no_run
@@ -112,6 +114,9 @@ pub struct FlightDataEncoderBuilder {
schema: Option<SchemaRef>,
/// Optional flight descriptor, if known before data.
descriptor: Option<FlightDescriptor>,
+ /// Determines how `DictionaryArray`s are encoded for transport.
+ /// See [`DictionaryHandling`] for more information.
+ dictionary_handling: DictionaryHandling,
}
/// Default target size for encoded [`FlightData`].
@@ -128,6 +133,7 @@ impl Default for FlightDataEncoderBuilder {
app_metadata: Bytes::new(),
schema: None,
descriptor: None,
+ dictionary_handling: DictionaryHandling::Hydrate,
}
}
}
@@ -152,6 +158,15 @@ impl FlightDataEncoderBuilder {
self
}
+ /// Set [`DictionaryHandling`] for encoder
+ pub fn with_dictionary_handling(
+ mut self,
+ dictionary_handling: DictionaryHandling,
+ ) -> Self {
+ self.dictionary_handling = dictionary_handling;
+ self
+ }
+
/// Specify application specific metadata included in the
/// [`FlightData::app_metadata`] field of the first Schema
/// message
@@ -198,6 +213,7 @@ impl FlightDataEncoderBuilder {
app_metadata,
schema,
descriptor,
+ dictionary_handling,
} = self;
FlightDataEncoder::new(
@@ -207,6 +223,7 @@ impl FlightDataEncoderBuilder {
options,
app_metadata,
descriptor,
+ dictionary_handling,
)
}
}
@@ -232,6 +249,9 @@ pub struct FlightDataEncoder {
done: bool,
/// cleared after the first FlightData message is sent
descriptor: Option<FlightDescriptor>,
+ /// Determines how `DictionaryArray`s are encoded for transport.
+ /// See [`DictionaryHandling`] for more information.
+ dictionary_handling: DictionaryHandling,
}
impl FlightDataEncoder {
@@ -242,16 +262,21 @@ impl FlightDataEncoder {
options: IpcWriteOptions,
app_metadata: Bytes,
descriptor: Option<FlightDescriptor>,
+ dictionary_handling: DictionaryHandling,
) -> Self {
let mut encoder = Self {
inner,
schema: None,
max_flight_data_size,
- encoder: FlightIpcEncoder::new(options),
+ encoder: FlightIpcEncoder::new(
+ options,
+ dictionary_handling != DictionaryHandling::Resend,
+ ),
app_metadata: Some(app_metadata),
queue: VecDeque::new(),
done: false,
descriptor,
+ dictionary_handling,
};
// If schema is known up front, enqueue it immediately
@@ -282,7 +307,8 @@ impl FlightDataEncoder {
fn encode_schema(&mut self, schema: &SchemaRef) -> SchemaRef {
// The first message is the schema message, and all
// batches have the same schema
- let schema = Arc::new(prepare_schema_for_flight(schema));
+ let send_dictionaries = self.dictionary_handling ==
DictionaryHandling::Resend;
+ let schema = Arc::new(prepare_schema_for_flight(schema,
send_dictionaries));
let mut schema_flight_data = self.encoder.encode_schema(&schema);
// attach any metadata requested
@@ -304,7 +330,8 @@ impl FlightDataEncoder {
};
// encode the batch
- let batch = prepare_batch_for_flight(&batch, schema)?;
+ let send_dictionaries = self.dictionary_handling ==
DictionaryHandling::Resend;
+ let batch = prepare_batch_for_flight(&batch, schema,
send_dictionaries)?;
for batch in split_batch_for_grpc_response(batch,
self.max_flight_data_size) {
let (flight_dictionaries, flight_batch) =
@@ -365,17 +392,46 @@ impl Stream for FlightDataEncoder {
}
}
+/// Defines how a [`FlightDataEncoder`] encodes [`DictionaryArray`]s
+///
+/// [`DictionaryArray`]: arrow_array::DictionaryArray
+#[derive(Debug, PartialEq)]
+pub enum DictionaryHandling {
+ /// Expands to the underlying type (default). This likely sends more data
+ /// over the network but requires less memory (dictionaries are not
tracked)
+ /// and is more compatible with other arrow flight client implementations
+ /// that may not support `DictionaryEncoding`
+ ///
+ /// An IPC response, streaming or otherwise, defines its schema up front
+ /// which defines the mapping from dictionary IDs. It then sends these
+ /// dictionaries over the wire.
+ ///
+ /// This requires identifying the different dictionaries in use, assigning
+ /// them IDs, and sending new dictionaries, delta or otherwise, when needed
+ ///
+ /// See also:
+ /// * <https://github.com/apache/arrow-rs/issues/1206>
+ Hydrate,
+ /// Send dictionary FlightData with every RecordBatch that contains a
+ /// [`DictionaryArray`]. See [`Self::Hydrate`] for more tradeoffs. No
+ /// attempt is made to skip sending the same (logical) dictionary values
+ /// twice.
+ ///
+ /// [`DictionaryArray`]: arrow_array::DictionaryArray
+ Resend,
+}
+
/// Prepare an arrow Schema for transport over the Arrow Flight protocol
///
/// Convert dictionary types to underlying types
///
/// See hydrate_dictionary for more information
-fn prepare_schema_for_flight(schema: &Schema) -> Schema {
+fn prepare_schema_for_flight(schema: &Schema, send_dictionaries: bool) ->
Schema {
let fields: Fields = schema
.fields()
.iter()
.map(|field| match field.data_type() {
- DataType::Dictionary(_, value_type) => Field::new(
+ DataType::Dictionary(_, value_type) if !send_dictionaries =>
Field::new(
field.name(),
value_type.as_ref().clone(),
field.is_nullable(),
@@ -434,8 +490,7 @@ struct FlightIpcEncoder {
}
impl FlightIpcEncoder {
- fn new(options: IpcWriteOptions) -> Self {
- let error_on_replacement = true;
+ fn new(options: IpcWriteOptions, error_on_replacement: bool) -> Self {
Self {
options,
data_gen: IpcDataGenerator::default(),
@@ -478,12 +533,14 @@ impl FlightIpcEncoder {
fn prepare_batch_for_flight(
batch: &RecordBatch,
schema: SchemaRef,
+ send_dictionaries: bool,
) -> Result<RecordBatch> {
let columns = batch
.columns()
.iter()
- .map(hydrate_dictionary)
+ .map(|c| hydrate_dictionary(c, send_dictionaries))
.collect::<Result<Vec<_>>>()?;
+
let options =
RecordBatchOptions::new().with_row_count(Some(batch.num_rows()));
Ok(RecordBatch::try_new_with_options(
@@ -491,35 +548,28 @@ fn prepare_batch_for_flight(
)?)
}
-/// Hydrates a dictionary to its underlying type
-///
-/// An IPC response, streaming or otherwise, defines its schema up front
-/// which defines the mapping from dictionary IDs. It then sends these
-/// dictionaries over the wire.
-///
-/// This requires identifying the different dictionaries in use, assigning
-/// them IDs, and sending new dictionaries, delta or otherwise, when needed
-///
-/// See also:
-/// * <https://github.com/apache/arrow-rs/issues/1206>
-///
-/// For now we just hydrate the dictionaries to their underlying type
-fn hydrate_dictionary(array: &ArrayRef) -> Result<ArrayRef> {
- let arr = if let DataType::Dictionary(_, value) = array.data_type() {
- arrow_cast::cast(array, value)?
- } else {
- Arc::clone(array)
+/// Hydrates a dictionary to its underlying type if send_dictionaries is
false. If send_dictionaries
+/// is true, dictionaries are sent with every batch which is not as optimal as
described in [DictionaryHandling::Hydrate] above,
+/// but does enable sending `DictionaryArray`s via Flight.
+fn hydrate_dictionary(array: &ArrayRef, send_dictionaries: bool) ->
Result<ArrayRef> {
+ let arr = match array.data_type() {
+ DataType::Dictionary(_, value) if !send_dictionaries => {
+ arrow_cast::cast(array, value)?
+ }
+ _ => Arc::clone(array),
};
Ok(arr)
}
#[cfg(test)]
mod tests {
- use arrow_array::types::*;
use arrow_array::*;
+ use arrow_array::{cast::downcast_array, types::*};
use arrow_cast::pretty::pretty_format_batches;
use std::collections::HashMap;
+ use crate::decode::{DecodedPayload, FlightDataDecoder};
+
use super::*;
#[test]
@@ -537,7 +587,7 @@ mod tests {
let big_batch = batch.slice(0, batch.num_rows() - 1);
let optimized_big_batch =
- prepare_batch_for_flight(&big_batch, Arc::clone(&schema))
+ prepare_batch_for_flight(&big_batch, Arc::clone(&schema), false)
.expect("failed to optimize");
let (_, optimized_big_flight_batch) =
make_flight_data(&optimized_big_batch, &options);
@@ -549,7 +599,7 @@ mod tests {
let small_batch = batch.slice(0, 1);
let optimized_small_batch =
- prepare_batch_for_flight(&small_batch, Arc::clone(&schema))
+ prepare_batch_for_flight(&small_batch, Arc::clone(&schema), false)
.expect("failed to optimize");
let (_, optimized_small_flight_batch) =
make_flight_data(&optimized_small_batch, &options);
@@ -560,6 +610,84 @@ mod tests {
);
}
+ #[tokio::test]
+ async fn test_dictionary_hydration() {
+ let arr: DictionaryArray<UInt16Type> = vec!["a", "a",
"b"].into_iter().collect();
+ let schema = Arc::new(Schema::new(vec![Field::new_dictionary(
+ "dict",
+ DataType::UInt16,
+ DataType::Utf8,
+ false,
+ )]));
+ let batch = RecordBatch::try_new(schema, vec![Arc::new(arr)]).unwrap();
+ let encoder = FlightDataEncoderBuilder::default()
+ .build(futures::stream::once(async { Ok(batch) }));
+ let mut decoder = FlightDataDecoder::new(encoder);
+ let expected_schema =
+ Schema::new(vec![Field::new("dict", DataType::Utf8, false)]);
+ let expected_schema = Arc::new(expected_schema);
+ while let Some(decoded) = decoder.next().await {
+ let decoded = decoded.unwrap();
+ match decoded.payload {
+ DecodedPayload::None => {}
+ DecodedPayload::Schema(s) => assert_eq!(s, expected_schema),
+ DecodedPayload::RecordBatch(b) => {
+ assert_eq!(b.schema(), expected_schema);
+ let expected_array = StringArray::from(vec!["a", "a",
"b"]);
+ let actual_array = b.column_by_name("dict").unwrap();
+ let actual_array =
downcast_array::<StringArray>(actual_array);
+
+ assert_eq!(actual_array, expected_array);
+ }
+ }
+ }
+ }
+
+ #[tokio::test]
+ async fn test_send_dictionaries() {
+ let schema = Arc::new(Schema::new(vec![Field::new_dictionary(
+ "dict",
+ DataType::UInt16,
+ DataType::Utf8,
+ false,
+ )]));
+
+ let arr_one: Arc<DictionaryArray<UInt16Type>> =
+ Arc::new(vec!["a", "a", "b"].into_iter().collect());
+ let arr_two: Arc<DictionaryArray<UInt16Type>> =
+ Arc::new(vec!["b", "a", "c"].into_iter().collect());
+ let batch_one =
+ RecordBatch::try_new(schema.clone(),
vec![arr_one.clone()]).unwrap();
+ let batch_two =
+ RecordBatch::try_new(schema.clone(),
vec![arr_two.clone()]).unwrap();
+
+ let encoder = FlightDataEncoderBuilder::default()
+ .with_dictionary_handling(DictionaryHandling::Resend)
+ .build(futures::stream::iter(vec![Ok(batch_one), Ok(batch_two)]));
+
+ let mut decoder = FlightDataDecoder::new(encoder);
+ let mut expected_array = arr_one;
+ while let Some(decoded) = decoder.next().await {
+ let decoded = decoded.unwrap();
+ match decoded.payload {
+ DecodedPayload::None => {}
+ DecodedPayload::Schema(s) => assert_eq!(s, schema),
+ DecodedPayload::RecordBatch(b) => {
+ assert_eq!(b.schema(), schema);
+
+ let actual_array =
+ Arc::new(downcast_array::<DictionaryArray<UInt16Type>>(
+ b.column_by_name("dict").unwrap(),
+ ));
+
+ assert_eq!(actual_array, expected_array);
+
+ expected_array = arr_two.clone();
+ }
+ }
+ }
+ }
+
#[test]
fn test_schema_metadata_encoded() {
let schema =
@@ -567,7 +695,7 @@ mod tests {
HashMap::from([("some_key".to_owned(),
"some_value".to_owned())]),
);
- let got = prepare_schema_for_flight(&schema);
+ let got = prepare_schema_for_flight(&schema, false);
assert!(got.metadata().contains_key("some_key"));
}
@@ -580,7 +708,8 @@ mod tests {
)
.expect("cannot create record batch");
- prepare_batch_for_flight(&batch, batch.schema()).expect("failed to
optimize");
+ prepare_batch_for_flight(&batch, batch.schema(), false)
+ .expect("failed to optimize");
}
pub fn make_flight_data(