This is an automated email from the ASF dual-hosted git repository. nevime pushed a commit to branch rust-parquet-arrow-writer in repository https://gitbox.apache.org/repos/asf/arrow.git
commit d748efa7463f7e2911b8191166befcc19929a0f7 Author: Carol (Nichols || Goulding) <carol.nich...@gmail.com> AuthorDate: Sat Oct 3 02:34:38 2020 +0200 ARROW-8426: [Rust] [Parquet] Add support for writing dictionary types In this commit, I: - Extracted a `build_field` function for some code shared between `schema_to_fb` and `schema_to_fb_offset` that needed to change - Uncommented the dictionary field from the Arrow schema roundtrip test and added a dictionary field to the IPC roundtrip test - If a field is a dictionary field, called `add_dictionary` with the dictionary field information on the flatbuffer field, building the dictionary as [the C++ code does][cpp-dictionary] and described it with the same comment - When getting the field type for a dictionary field, used the `value_type` as [the C++ code does][cpp-value-type] and described it with the same comment The tests pass because the Parquet -> Arrow conversion for dictionaries is [already supported][parquet-to-arrow]. [cpp-dictionary]: https://github.com/apache/arrow/blob/477c1021ac013f22389baf9154fb9ad0cf814bec/cpp/src/arrow/ipc/metadata_internal.cc#L426-L440 [cpp-value-type]: https://github.com/apache/arrow/blob/477c1021ac013f22389baf9154fb9ad0cf814bec/cpp/src/arrow/ipc/metadata_internal.cc#L662-L667 [parquet-to-arrow]: https://github.com/apache/arrow/blob/477c1021ac013f22389baf9154fb9ad0cf814bec/rust/arrow/src/ipc/convert.rs#L120-L127 Closes #8291 from carols10cents/rust-parquet-arrow-writer Authored-by: Carol (Nichols || Goulding) <carol.nich...@gmail.com> Signed-off-by: Neville Dipale <nevilled...@gmail.com> --- rust/arrow/src/datatypes.rs | 4 +- rust/arrow/src/ipc/convert.rs | 105 ++++++++++++++++++++++++++++++--------- rust/parquet/src/arrow/schema.rs | 20 ++++---- 3 files changed, 93 insertions(+), 36 deletions(-) diff --git a/rust/arrow/src/datatypes.rs b/rust/arrow/src/datatypes.rs index 0d05f82..c647af6 100644 --- a/rust/arrow/src/datatypes.rs +++ b/rust/arrow/src/datatypes.rs @@ -189,8 +189,8 @@ pub struct 
Field { name: String, data_type: DataType, nullable: bool, - dict_id: i64, - dict_is_ordered: bool, + pub(crate) dict_id: i64, + pub(crate) dict_is_ordered: bool, } pub trait ArrowNativeType: diff --git a/rust/arrow/src/ipc/convert.rs b/rust/arrow/src/ipc/convert.rs index 7a5795d..8f429bf 100644 --- a/rust/arrow/src/ipc/convert.rs +++ b/rust/arrow/src/ipc/convert.rs @@ -34,18 +34,8 @@ pub fn schema_to_fb(schema: &Schema) -> FlatBufferBuilder { let mut fields = vec![]; for field in schema.fields() { - let fb_field_name = fbb.create_string(field.name().as_str()); - let field_type = get_fb_field_type(field.data_type(), &mut fbb); - let mut field_builder = ipc::FieldBuilder::new(&mut fbb); - field_builder.add_name(fb_field_name); - field_builder.add_type_type(field_type.type_type); - field_builder.add_nullable(field.is_nullable()); - match field_type.children { - None => {} - Some(children) => field_builder.add_children(children), - }; - field_builder.add_type_(field_type.type_); - fields.push(field_builder.finish()); + let fb_field = build_field(&mut fbb, field); + fields.push(fb_field); } let mut custom_metadata = vec![]; @@ -80,18 +70,8 @@ pub fn schema_to_fb_offset<'a: 'b, 'b>( ) -> WIPOffset<ipc::Schema<'b>> { let mut fields = vec![]; for field in schema.fields() { - let fb_field_name = fbb.create_string(field.name().as_str()); - let field_type = get_fb_field_type(field.data_type(), fbb); - let mut field_builder = ipc::FieldBuilder::new(fbb); - field_builder.add_name(fb_field_name); - field_builder.add_type_type(field_type.type_type); - field_builder.add_nullable(field.is_nullable()); - match field_type.children { - None => {} - Some(children) => field_builder.add_children(children), - }; - field_builder.add_type_(field_type.type_); - fields.push(field_builder.finish()); + let fb_field = build_field(fbb, field); + fields.push(fb_field); } let mut custom_metadata = vec![]; @@ -333,6 +313,38 @@ pub(crate) struct FBFieldType<'b> { pub(crate) children: 
Option<WIPOffset<Vector<'b, ForwardsUOffset<ipc::Field<'b>>>>>, } +/// Create an IPC Field from an Arrow Field +pub(crate) fn build_field<'a: 'b, 'b>( + fbb: &mut FlatBufferBuilder<'a>, + field: &Field, +) -> WIPOffset<ipc::Field<'b>> { + let fb_field_name = fbb.create_string(field.name().as_str()); + let field_type = get_fb_field_type(field.data_type(), fbb); + + let fb_dictionary = if let Dictionary(index_type, _) = field.data_type() { + Some(get_fb_dictionary( + index_type, + field.dict_id, + field.dict_is_ordered, + fbb, + )) + } else { + None + }; + + let mut field_builder = ipc::FieldBuilder::new(fbb); + field_builder.add_name(fb_field_name); + fb_dictionary.map(|dictionary| field_builder.add_dictionary(dictionary)); + field_builder.add_type_type(field_type.type_type); + field_builder.add_nullable(field.is_nullable()); + match field_type.children { + None => {} + Some(children) => field_builder.add_children(children), + }; + field_builder.add_type_(field_type.type_); + field_builder.finish() +} + /// Get the IPC type of a data type pub(crate) fn get_fb_field_type<'a: 'b, 'b>( data_type: &DataType, @@ -609,10 +621,45 @@ pub(crate) fn get_fb_field_type<'a: 'b, 'b>( children: Some(fbb.create_vector(&children[..])), } } + Dictionary(_, value_type) => { + // In this library, the dictionary "type" is a logical construct. 
Here we + // pass through to the value type, as we've already captured the index + // type in the DictionaryEncoding metadata in the parent field + get_fb_field_type(value_type, fbb) + } t => unimplemented!("Type {:?} not supported", t), } } +/// Create an IPC dictionary encoding +pub(crate) fn get_fb_dictionary<'a: 'b, 'b>( + index_type: &DataType, + dict_id: i64, + dict_is_ordered: bool, + fbb: &mut FlatBufferBuilder<'a>, +) -> WIPOffset<ipc::DictionaryEncoding<'b>> { + // We assume that the dictionary index type (as an integer) has already been + // validated elsewhere, and can safely assume we are dealing with signed + // integers + let mut index_builder = ipc::IntBuilder::new(fbb); + index_builder.add_is_signed(true); + match *index_type { + Int8 => index_builder.add_bitWidth(8), + Int16 => index_builder.add_bitWidth(16), + Int32 => index_builder.add_bitWidth(32), + Int64 => index_builder.add_bitWidth(64), + _ => {} + } + let index_builder = index_builder.finish(); + + let mut builder = ipc::DictionaryEncodingBuilder::new(fbb); + builder.add_id(dict_id); + builder.add_indexType(index_builder); + builder.add_isOrdered(dict_is_ordered); + + builder.finish() +} + #[cfg(test)] mod tests { use super::*; @@ -714,6 +761,16 @@ mod tests { false, ), Field::new("struct<>", DataType::Struct(vec![]), true), + Field::new_dict( + "dictionary<int32, utf8>", + DataType::Dictionary( + Box::new(DataType::Int32), + Box::new(DataType::Utf8), + ), + true, + 123, + true, + ), ], md, ); diff --git a/rust/parquet/src/arrow/schema.rs b/rust/parquet/src/arrow/schema.rs index d5a0ff9..4a92a46 100644 --- a/rust/parquet/src/arrow/schema.rs +++ b/rust/parquet/src/arrow/schema.rs @@ -1396,16 +1396,16 @@ mod tests { // Field::new("c28", DataType::Duration(TimeUnit::Millisecond), false), // Field::new("c29", DataType::Duration(TimeUnit::Microsecond), false), // Field::new("c30", DataType::Duration(TimeUnit::Nanosecond), false), - // Field::new_dict( - // "c31", - // DataType::Dictionary( - // 
Box::new(DataType::Int32), - // Box::new(DataType::Utf8), - // ), - // true, - // 123, - // true, - // ), + Field::new_dict( + "c31", + DataType::Dictionary( + Box::new(DataType::Int32), + Box::new(DataType::Utf8), + ), + true, + 123, + true, + ), Field::new("c32", DataType::LargeBinary, true), Field::new("c33", DataType::LargeUtf8, true), Field::new(