This is an automated email from the ASF dual-hosted git repository.
tustvold pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/arrow-rs.git
The following commit(s) were added to refs/heads/master by this push:
new 14bd53dc124 Support dictionary encoding in structures for
`FlightDataEncoder`, add documentation for `arrow_flight::encode::Dictionary`
(#5488)
14bd53dc124 is described below
commit 14bd53dc1240003f171c8655863eae188cd0880f
Author: Dan Harris <[email protected]>
AuthorDate: Thu Mar 14 22:47:48 2024 -0400
Support dictionary encoding in structures for `FlightDataEncoder`, add
documentation for `arrow_flight::encode::Dictionary` (#5488)
* Add more detailed documentation for
arrow_flight::encode::DictionaryHandling
* fix doc link
* Fix handling of nested dictionary arrays with DictionaryHandling::Hydrate
* clippy
* Handle large list and sparse unions
* use top-level fields
* PR comments
---
arrow-flight/src/encode.rs | 478 +++++++++++++++++++++++++++++++++++++++++----
1 file changed, 435 insertions(+), 43 deletions(-)
diff --git a/arrow-flight/src/encode.rs b/arrow-flight/src/encode.rs
index bb043681620..efd68812948 100644
--- a/arrow-flight/src/encode.rs
+++ b/arrow-flight/src/encode.rs
@@ -18,9 +18,11 @@
use std::{collections::VecDeque, fmt::Debug, pin::Pin, sync::Arc, task::Poll};
use crate::{error::Result, FlightData, FlightDescriptor, SchemaAsIpc};
-use arrow_array::{ArrayRef, RecordBatch, RecordBatchOptions};
+
+use arrow_array::{Array, ArrayRef, RecordBatch, RecordBatchOptions,
UnionArray};
use arrow_ipc::writer::{DictionaryTracker, IpcDataGenerator, IpcWriteOptions};
-use arrow_schema::{DataType, Field, Fields, Schema, SchemaRef};
+
+use arrow_schema::{DataType, Field, FieldRef, Fields, Schema, SchemaRef,
UnionMode};
use bytes::Bytes;
use futures::{ready, stream::BoxStream, Stream, StreamExt};
@@ -323,9 +325,10 @@ impl FlightDataEncoder {
None => self.encode_schema(batch.schema_ref()),
};
- // encode the batch
- let send_dictionaries = self.dictionary_handling ==
DictionaryHandling::Resend;
- let batch = prepare_batch_for_flight(&batch, schema,
send_dictionaries)?;
+ let batch = match self.dictionary_handling {
+ DictionaryHandling::Resend => batch,
+ DictionaryHandling::Hydrate => hydrate_dictionaries(&batch,
schema)?,
+ };
for batch in split_batch_for_grpc_response(batch,
self.max_flight_data_size) {
let (flight_dictionaries, flight_batch) =
self.encoder.encode_batch(&batch)?;
@@ -388,6 +391,31 @@ impl Stream for FlightDataEncoder {
/// Defines how a [`FlightDataEncoder`] encodes [`DictionaryArray`]s
///
/// [`DictionaryArray`]: arrow_array::DictionaryArray
+///
+/// In the arrow flight protocol dictionary values and keys are sent as two
separate messages.
+/// When a sender is encoding a [`RecordBatch`] containing [`DictionaryArray`]
columns, it will
+/// first send a dictionary batch (a batch with header
`MessageHeader::DictionaryBatch`) containing
+/// the dictionary values. The receiver is responsible for reading this batch
and maintaining state that associates
+/// those dictionary values with the corresponding array using the `dict_id`
as a key.
+///
+/// After sending the dictionary batch the sender will send the array data in
a batch with header `MessageHeader::RecordBatch`.
+/// For any dictionary array batches in this message, the encoded flight
message will only contain the dictionary keys. The receiver
+/// is then responsible for rebuilding the `DictionaryArray` on the client
side using the dictionary values from the DictionaryBatch message
+/// and the keys from the RecordBatch message.
+///
+/// For example, if we have a batch with a `TypedDictionaryArray<'_,
UInt32Type, Utf8Type>` (a dictionary array where the keys are `u32` and the
+/// values are `String`), then the DictionaryBatch will contain a
`StringArray` and the RecordBatch will contain a `UInt32Array`.
+///
+/// Note that since `dict_id` defined in the `Schema` is used as a key to
associate dictionary values to their arrays it is required that each
+/// `DictionaryArray` in a `RecordBatch` have a unique `dict_id`.
+///
+/// The current implementation does not support "delta" dictionaries so a new
dictionary batch will be sent each time the encoder sees a
+/// dictionary which is not pointer-equal to the previously observed
dictionary for a given `dict_id`.
+///
+/// For clients which may not support `DictionaryEncoding`, the
`DictionaryHandling::Hydrate` method will bypass the process defined above
+/// and "hydrate" any `DictionaryArray` in the batch to its underlying value
type (e.g. `TypedDictionaryArray<'_, UInt32Type, Utf8Type>` will
+/// be sent as a `StringArray`). With this method all data will be sent in
`MessageHeader::RecordBatch` messages and the batch schema
+/// will be adjusted so that all dictionary encoded fields are changed to
fields of the dictionary value type.
#[derive(Debug, PartialEq)]
pub enum DictionaryHandling {
/// Expands to the underlying type (default). This likely sends more data
@@ -395,13 +423,6 @@ pub enum DictionaryHandling {
/// and is more compatible with other arrow flight client implementations
/// that may not support `DictionaryEncoding`
///
- /// An IPC response, streaming or otherwise, defines its schema up front
- /// which defines the mapping from dictionary IDs. It then sends these
- /// dictionaries over the wire.
- ///
- /// This requires identifying the different dictionaries in use, assigning
- /// them IDs, and sending new dictionaries, delta or otherwise, when needed
- ///
/// See also:
/// * <https://github.com/apache/arrow-rs/issues/1206>
Hydrate,
@@ -411,9 +432,52 @@ pub enum DictionaryHandling {
/// twice.
///
/// [`DictionaryArray`]: arrow_array::DictionaryArray
+ ///
+ /// This requires identifying the different dictionaries in use and
assigning
+ /// them unique IDs
Resend,
}
+fn prepare_field_for_flight(field: &FieldRef, send_dictionaries: bool) ->
Field {
+ match field.data_type() {
+ DataType::List(inner) => Field::new_list(
+ field.name(),
+ prepare_field_for_flight(inner, send_dictionaries),
+ field.is_nullable(),
+ )
+ .with_metadata(field.metadata().clone()),
+ DataType::LargeList(inner) => Field::new_list(
+ field.name(),
+ prepare_field_for_flight(inner, send_dictionaries),
+ field.is_nullable(),
+ )
+ .with_metadata(field.metadata().clone()),
+ DataType::Struct(fields) => {
+ let new_fields: Vec<Field> = fields
+ .iter()
+ .map(|f| prepare_field_for_flight(f, send_dictionaries))
+ .collect();
+ Field::new_struct(field.name(), new_fields, field.is_nullable())
+ .with_metadata(field.metadata().clone())
+ }
+ DataType::Union(fields, mode) => {
+ let (type_ids, new_fields): (Vec<i8>, Vec<Field>) = fields
+ .iter()
+ .map(|(type_id, f)| (type_id, prepare_field_for_flight(f,
send_dictionaries)))
+ .unzip();
+
+ Field::new_union(field.name(), type_ids, new_fields, *mode)
+ }
+ DataType::Dictionary(_, value_type) if !send_dictionaries =>
Field::new(
+ field.name(),
+ value_type.as_ref().clone(),
+ field.is_nullable(),
+ )
+ .with_metadata(field.metadata().clone()),
+ _ => field.as_ref().clone(),
+ }
+}
+
/// Prepare an arrow Schema for transport over the Arrow Flight protocol
///
/// Convert dictionary types to underlying types
@@ -430,6 +494,7 @@ fn prepare_schema_for_flight(schema: &Schema,
send_dictionaries: bool) -> Schema
field.is_nullable(),
)
.with_metadata(field.metadata().clone()),
+ tpe if tpe.is_nested() => prepare_field_for_flight(field,
send_dictionaries),
_ => field.as_ref().clone(),
})
.collect();
@@ -509,22 +574,14 @@ impl FlightIpcEncoder {
}
}
-/// Prepares a RecordBatch for transport over the Arrow Flight protocol
-///
-/// This means:
-///
-/// 1. Hydrates any dictionaries to its underlying type. See
+/// Hydrates any dictionary arrays in `batch` to their underlying type. See
/// hydrate_dictionary for more information.
-///
-fn prepare_batch_for_flight(
- batch: &RecordBatch,
- schema: SchemaRef,
- send_dictionaries: bool,
-) -> Result<RecordBatch> {
- let columns = batch
- .columns()
+fn hydrate_dictionaries(batch: &RecordBatch, schema: SchemaRef) ->
Result<RecordBatch> {
+ let columns = schema
+ .fields()
.iter()
- .map(|c| hydrate_dictionary(c, send_dictionaries))
+ .zip(batch.columns())
+ .map(|(field, c)| hydrate_dictionary(c, field.data_type()))
.collect::<Result<Vec<_>>>()?;
let options =
RecordBatchOptions::new().with_row_count(Some(batch.num_rows()));
@@ -534,22 +591,43 @@ fn prepare_batch_for_flight(
)?)
}
-/// Hydrates a dictionary to its underlying type if send_dictionaries is
false. If send_dictionaries
-/// is true, dictionaries are sent with every batch which is not as optimal as
described in [DictionaryHandling::Hydrate] above,
-/// but does enable sending DictionaryArray's via Flight.
-fn hydrate_dictionary(array: &ArrayRef, send_dictionaries: bool) ->
Result<ArrayRef> {
- let arr = match array.data_type() {
- DataType::Dictionary(_, value) if !send_dictionaries =>
arrow_cast::cast(array, value)?,
- _ => Arc::clone(array),
+/// Hydrates a dictionary to its underlying type.
+fn hydrate_dictionary(array: &ArrayRef, data_type: &DataType) ->
Result<ArrayRef> {
+ let arr = match (array.data_type(), data_type) {
+ (DataType::Union(_, UnionMode::Sparse), DataType::Union(fields,
UnionMode::Sparse)) => {
+ let union_arr =
array.as_any().downcast_ref::<UnionArray>().unwrap();
+
+ let (type_ids, fields): (Vec<i8>, Vec<&FieldRef>) =
fields.iter().unzip();
+
+ Arc::new(UnionArray::try_new(
+ &type_ids,
+ union_arr.type_ids().inner().clone(),
+ None,
+ fields
+ .iter()
+ .enumerate()
+ .map(|(col, field)| {
+ Ok((
+ field.as_ref().clone(),
+ arrow_cast::cast(union_arr.child(col as i8),
field.data_type())?,
+ ))
+ })
+ .collect::<Result<Vec<_>>>()?,
+ )?)
+ }
+ (_, data_type) => arrow_cast::cast(array, data_type)?,
};
Ok(arr)
}
#[cfg(test)]
mod tests {
+ use arrow_array::builder::StringDictionaryBuilder;
use arrow_array::*;
use arrow_array::{cast::downcast_array, types::*};
+ use arrow_buffer::Buffer;
use arrow_cast::pretty::pretty_format_batches;
+ use arrow_schema::UnionMode;
use std::collections::HashMap;
use crate::decode::{DecodedPayload, FlightDataDecoder};
@@ -570,8 +648,8 @@ mod tests {
let (_, baseline_flight_batch) = make_flight_data(&batch, &options);
let big_batch = batch.slice(0, batch.num_rows() - 1);
- let optimized_big_batch = prepare_batch_for_flight(&big_batch,
Arc::clone(schema), false)
- .expect("failed to optimize");
+ let optimized_big_batch =
+ hydrate_dictionaries(&big_batch,
Arc::clone(schema)).expect("failed to optimize");
let (_, optimized_big_flight_batch) =
make_flight_data(&optimized_big_batch, &options);
assert_eq!(
@@ -581,8 +659,7 @@ mod tests {
let small_batch = batch.slice(0, 1);
let optimized_small_batch =
- prepare_batch_for_flight(&small_batch, Arc::clone(schema), false)
- .expect("failed to optimize");
+ hydrate_dictionaries(&small_batch,
Arc::clone(schema)).expect("failed to optimize");
let (_, optimized_small_flight_batch) =
make_flight_data(&optimized_small_batch, &options);
assert!(
@@ -592,19 +669,29 @@ mod tests {
#[tokio::test]
async fn test_dictionary_hydration() {
- let arr: DictionaryArray<UInt16Type> = vec!["a", "a",
"b"].into_iter().collect();
+ let arr1: DictionaryArray<UInt16Type> = vec!["a", "a",
"b"].into_iter().collect();
+ let arr2: DictionaryArray<UInt16Type> = vec!["c", "c",
"d"].into_iter().collect();
+
let schema = Arc::new(Schema::new(vec![Field::new_dictionary(
"dict",
DataType::UInt16,
DataType::Utf8,
false,
)]));
- let batch = RecordBatch::try_new(schema, vec![Arc::new(arr)]).unwrap();
- let encoder =
-
FlightDataEncoderBuilder::default().build(futures::stream::once(async {
Ok(batch) }));
+ let batch1 = RecordBatch::try_new(schema.clone(),
vec![Arc::new(arr1)]).unwrap();
+ let batch2 = RecordBatch::try_new(schema,
vec![Arc::new(arr2)]).unwrap();
+
+ let stream = futures::stream::iter(vec![Ok(batch1), Ok(batch2)]);
+
+ let encoder = FlightDataEncoderBuilder::default().build(stream);
let mut decoder = FlightDataDecoder::new(encoder);
let expected_schema = Schema::new(vec![Field::new("dict",
DataType::Utf8, false)]);
let expected_schema = Arc::new(expected_schema);
+ let mut expected_arrays = vec![
+ StringArray::from(vec!["a", "a", "b"]),
+ StringArray::from(vec!["c", "c", "d"]),
+ ]
+ .into_iter();
while let Some(decoded) = decoder.next().await {
let decoded = decoded.unwrap();
match decoded.payload {
@@ -612,7 +699,7 @@ mod tests {
DecodedPayload::Schema(s) => assert_eq!(s, expected_schema),
DecodedPayload::RecordBatch(b) => {
assert_eq!(b.schema(), expected_schema);
- let expected_array = StringArray::from(vec!["a", "a",
"b"]);
+ let expected_array = expected_arrays.next().unwrap();
let actual_array = b.column_by_name("dict").unwrap();
let actual_array =
downcast_array::<StringArray>(actual_array);
@@ -622,6 +709,311 @@ mod tests {
}
}
+ #[tokio::test]
+ async fn test_dictionary_list_hydration() {
+ let mut builder =
builder::ListBuilder::new(StringDictionaryBuilder::<UInt16Type>::new());
+
+ builder.append_value(vec![Some("a"), None, Some("b")]);
+
+ let arr1 = builder.finish();
+
+ builder.append_value(vec![Some("c"), None, Some("d")]);
+
+ let arr2 = builder.finish();
+
+ let schema = Arc::new(Schema::new(vec![Field::new_list(
+ "dict_list",
+ Field::new_dictionary("item", DataType::UInt16, DataType::Utf8,
true),
+ true,
+ )]));
+
+ let batch1 = RecordBatch::try_new(schema.clone(),
vec![Arc::new(arr1)]).unwrap();
+ let batch2 = RecordBatch::try_new(schema.clone(),
vec![Arc::new(arr2)]).unwrap();
+
+ let stream = futures::stream::iter(vec![Ok(batch1), Ok(batch2)]);
+
+ let encoder = FlightDataEncoderBuilder::default().build(stream);
+
+ let mut decoder = FlightDataDecoder::new(encoder);
+ let expected_schema = Schema::new(vec![Field::new_list(
+ "dict_list",
+ Field::new("item", DataType::Utf8, true),
+ true,
+ )]);
+
+ let expected_schema = Arc::new(expected_schema);
+
+ let mut expected_arrays = vec![
+ StringArray::from_iter(vec![Some("a"), None, Some("b")]),
+ StringArray::from_iter(vec![Some("c"), None, Some("d")]),
+ ]
+ .into_iter();
+
+ while let Some(decoded) = decoder.next().await {
+ let decoded = decoded.unwrap();
+ match decoded.payload {
+ DecodedPayload::None => {}
+ DecodedPayload::Schema(s) => assert_eq!(s, expected_schema),
+ DecodedPayload::RecordBatch(b) => {
+ assert_eq!(b.schema(), expected_schema);
+ let expected_array = expected_arrays.next().unwrap();
+ let list_array =
+
downcast_array::<ListArray>(b.column_by_name("dict_list").unwrap());
+ let elem_array =
downcast_array::<StringArray>(list_array.value(0).as_ref());
+
+ assert_eq!(elem_array, expected_array);
+ }
+ }
+ }
+ }
+
+ #[tokio::test]
+ async fn test_dictionary_struct_hydration() {
+ let struct_fields = vec![Field::new_list(
+ "dict_list",
+ Field::new_dictionary("item", DataType::UInt16, DataType::Utf8,
true),
+ true,
+ )];
+
+ let mut builder =
builder::ListBuilder::new(StringDictionaryBuilder::<UInt16Type>::new());
+
+ builder.append_value(vec![Some("a"), None, Some("b")]);
+
+ let arr1 = Arc::new(builder.finish());
+ let arr1 = StructArray::new(struct_fields.clone().into(), vec![arr1],
None);
+
+ builder.append_value(vec![Some("c"), None, Some("d")]);
+
+ let arr2 = Arc::new(builder.finish());
+ let arr2 = StructArray::new(struct_fields.clone().into(), vec![arr2],
None);
+
+ let schema = Arc::new(Schema::new(vec![Field::new_struct(
+ "struct",
+ struct_fields.clone(),
+ true,
+ )]));
+
+ let batch1 = RecordBatch::try_new(schema.clone(),
vec![Arc::new(arr1)]).unwrap();
+ let batch2 = RecordBatch::try_new(schema.clone(),
vec![Arc::new(arr2)]).unwrap();
+
+ let stream = futures::stream::iter(vec![Ok(batch1), Ok(batch2)]);
+
+ let encoder = FlightDataEncoderBuilder::default().build(stream);
+
+ let mut decoder = FlightDataDecoder::new(encoder);
+ let expected_schema = Schema::new(vec![Field::new_struct(
+ "struct",
+ vec![Field::new_list(
+ "dict_list",
+ Field::new("item", DataType::Utf8, true),
+ true,
+ )],
+ true,
+ )]);
+
+ let expected_schema = Arc::new(expected_schema);
+
+ let mut expected_arrays = vec![
+ StringArray::from_iter(vec![Some("a"), None, Some("b")]),
+ StringArray::from_iter(vec![Some("c"), None, Some("d")]),
+ ]
+ .into_iter();
+
+ while let Some(decoded) = decoder.next().await {
+ let decoded = decoded.unwrap();
+ match decoded.payload {
+ DecodedPayload::None => {}
+ DecodedPayload::Schema(s) => assert_eq!(s, expected_schema),
+ DecodedPayload::RecordBatch(b) => {
+ assert_eq!(b.schema(), expected_schema);
+ let expected_array = expected_arrays.next().unwrap();
+ let struct_array =
+
downcast_array::<StructArray>(b.column_by_name("struct").unwrap());
+ let list_array =
downcast_array::<ListArray>(struct_array.column(0));
+
+ let elem_array =
downcast_array::<StringArray>(list_array.value(0).as_ref());
+
+ assert_eq!(elem_array, expected_array);
+ }
+ }
+ }
+ }
+
+ #[tokio::test]
+ async fn test_dictionary_union_hydration() {
+ let struct_fields = vec![Field::new_list(
+ "dict_list",
+ Field::new_dictionary("item", DataType::UInt16, DataType::Utf8,
true),
+ true,
+ )];
+
+ let type_ids = vec![0, 1, 2];
+ let union_fields = vec![
+ Field::new_list(
+ "dict_list",
+ Field::new_dictionary("item", DataType::UInt16,
DataType::Utf8, true),
+ true,
+ ),
+ Field::new_struct("struct", struct_fields.clone(), true),
+ Field::new("string", DataType::Utf8, true),
+ ];
+
+ let struct_fields = vec![Field::new_list(
+ "dict_list",
+ Field::new_dictionary("item", DataType::UInt16, DataType::Utf8,
true),
+ true,
+ )];
+
+ let mut builder =
builder::ListBuilder::new(StringDictionaryBuilder::<UInt16Type>::new());
+
+ builder.append_value(vec![Some("a"), None, Some("b")]);
+
+ let arr1 = builder.finish();
+
+ let type_id_buffer = Buffer::from_slice_ref([0_i8]);
+ let arr1 = UnionArray::try_new(
+ &type_ids,
+ type_id_buffer,
+ None,
+ vec![
+ (union_fields[0].clone(), Arc::new(arr1)),
+ (
+ union_fields[1].clone(),
+ new_null_array(union_fields[1].data_type(), 1),
+ ),
+ (
+ union_fields[2].clone(),
+ new_null_array(union_fields[2].data_type(), 1),
+ ),
+ ],
+ )
+ .unwrap();
+
+ builder.append_value(vec![Some("c"), None, Some("d")]);
+
+ let arr2 = Arc::new(builder.finish());
+ let arr2 = StructArray::new(struct_fields.clone().into(), vec![arr2],
None);
+
+ let type_id_buffer = Buffer::from_slice_ref([1_i8]);
+ let arr2 = UnionArray::try_new(
+ &type_ids,
+ type_id_buffer,
+ None,
+ vec![
+ (
+ union_fields[0].clone(),
+ new_null_array(union_fields[0].data_type(), 1),
+ ),
+ (union_fields[1].clone(), Arc::new(arr2)),
+ (
+ union_fields[2].clone(),
+ new_null_array(union_fields[2].data_type(), 1),
+ ),
+ ],
+ )
+ .unwrap();
+
+ let type_id_buffer = Buffer::from_slice_ref([2_i8]);
+ let arr3 = UnionArray::try_new(
+ &type_ids,
+ type_id_buffer,
+ None,
+ vec![
+ (
+ union_fields[0].clone(),
+ new_null_array(union_fields[0].data_type(), 1),
+ ),
+ (
+ union_fields[1].clone(),
+ new_null_array(union_fields[1].data_type(), 1),
+ ),
+ (
+ union_fields[2].clone(),
+ Arc::new(StringArray::from(vec!["e"])),
+ ),
+ ],
+ )
+ .unwrap();
+
+ let schema = Arc::new(Schema::new(vec![Field::new_union(
+ "union",
+ type_ids.clone(),
+ union_fields.clone(),
+ UnionMode::Sparse,
+ )]));
+
+ let batch1 = RecordBatch::try_new(schema.clone(),
vec![Arc::new(arr1)]).unwrap();
+ let batch2 = RecordBatch::try_new(schema.clone(),
vec![Arc::new(arr2)]).unwrap();
+ let batch3 = RecordBatch::try_new(schema.clone(),
vec![Arc::new(arr3)]).unwrap();
+
+ let stream = futures::stream::iter(vec![Ok(batch1), Ok(batch2),
Ok(batch3)]);
+
+ let encoder = FlightDataEncoderBuilder::default().build(stream);
+
+ let mut decoder = FlightDataDecoder::new(encoder);
+
+ let hydrated_struct_fields = vec![Field::new_list(
+ "dict_list",
+ Field::new("item", DataType::Utf8, true),
+ true,
+ )];
+
+ let hydrated_union_fields = vec![
+ Field::new_list("dict_list", Field::new("item", DataType::Utf8,
true), true),
+ Field::new_struct("struct", hydrated_struct_fields.clone(), true),
+ Field::new("string", DataType::Utf8, true),
+ ];
+
+ let expected_schema = Schema::new(vec![Field::new_union(
+ "union",
+ type_ids.clone(),
+ hydrated_union_fields,
+ UnionMode::Sparse,
+ )]);
+
+ let expected_schema = Arc::new(expected_schema);
+
+ let mut expected_arrays = vec![
+ StringArray::from_iter(vec![Some("a"), None, Some("b")]),
+ StringArray::from_iter(vec![Some("c"), None, Some("d")]),
+ StringArray::from(vec!["e"]),
+ ]
+ .into_iter();
+
+ let mut batch = 0;
+ while let Some(decoded) = decoder.next().await {
+ let decoded = decoded.unwrap();
+ match decoded.payload {
+ DecodedPayload::None => {}
+ DecodedPayload::Schema(s) => assert_eq!(s, expected_schema),
+ DecodedPayload::RecordBatch(b) => {
+ assert_eq!(b.schema(), expected_schema);
+ let expected_array = expected_arrays.next().unwrap();
+ let union_arr =
+
downcast_array::<UnionArray>(b.column_by_name("union").unwrap());
+
+ let elem_array = match batch {
+ 0 => {
+ let list_array =
downcast_array::<ListArray>(union_arr.child(0));
+
downcast_array::<StringArray>(list_array.value(0).as_ref())
+ }
+ 1 => {
+ let struct_array =
downcast_array::<StructArray>(union_arr.child(1));
+ let list_array =
downcast_array::<ListArray>(struct_array.column(0));
+
+
downcast_array::<StringArray>(list_array.value(0).as_ref())
+ }
+ _ => downcast_array::<StringArray>(union_arr.child(2)),
+ };
+
+ batch += 1;
+
+ assert_eq!(elem_array, expected_array);
+ }
+ }
+ }
+ }
+
#[tokio::test]
async fn test_send_dictionaries() {
let schema = Arc::new(Schema::new(vec![Field::new_dictionary(
@@ -683,7 +1075,7 @@ mod tests {
)
.expect("cannot create record batch");
- prepare_batch_for_flight(&batch, batch.schema(), false).expect("failed
to optimize");
+ hydrate_dictionaries(&batch, batch.schema()).expect("failed to
optimize");
}
pub fn make_flight_data(