This is an automated email from the ASF dual-hosted git repository. alamb pushed a commit to branch cherry_pick_0e7c4c5f in repository https://gitbox.apache.org/repos/asf/arrow-rs.git
commit 8e51497e7f7a0638dbe7cca15846459f6634275b Author: Xavier Lange <[email protected]> AuthorDate: Sat Aug 28 07:17:39 2021 -0400 Parquet Derive: make chrono time emit converted type (#712) * NaiveDateTime emits converted type --- parquet_derive/src/parquet_field.rs | 79 ++++++++++++++++++++++++++++++------- parquet_derive_test/src/lib.rs | 6 ++- 2 files changed, 69 insertions(+), 16 deletions(-) diff --git a/parquet_derive/src/parquet_field.rs b/parquet_derive/src/parquet_field.rs index 6f2fa0c..36730c7 100644 --- a/parquet_derive/src/parquet_field.rs +++ b/parquet_derive/src/parquet_field.rs @@ -207,14 +207,25 @@ impl Field { }; let logical_type = self.ty.logical_type(); let repetition = self.ty.repetition(); - quote! { - fields.push(ParquetType::primitive_type_builder(#field_name, #physical_type) - .with_logical_type(#logical_type) - .with_repetition(#repetition) - .build() - .unwrap() - .into() - ); + let converted_type = self.ty.converted_type(); + + if let Some(converted_type) = converted_type { + quote! { + fields.push(ParquetType::primitive_type_builder(#field_name, #physical_type) + .with_logical_type(#logical_type) + .with_repetition(#repetition) + .with_converted_type(#converted_type) + .build().unwrap().into() + ) + } + } else { + quote! { + fields.push(ParquetType::primitive_type_builder(#field_name, #physical_type) + .with_logical_type(#logical_type) + .with_repetition(#repetition) + .build().unwrap().into() + ) + } } } @@ -534,6 +545,7 @@ impl Type { })) } } "NaiveDate" => quote! { Some(LogicalType::DATE(Default::default())) }, + "NaiveDateTime" => quote! { None }, "f32" | "f64" => quote! { None }, "String" | "str" => quote! { Some(LogicalType::STRING(Default::default())) }, "Uuid" => quote! { Some(LogicalType::UUID(Default::default())) }, @@ -541,6 +553,15 @@ impl Type { } } + fn converted_type(&self) -> Option<proc_macro2::TokenStream> { + let last_part = self.last_part(); + + match last_part.trim() { + "NaiveDateTime" => Some(quote! { ConvertedType::TIMESTAMP_MILLIS }), + _ => None, + } + } + fn repetition(&self) -> proc_macro2::TokenStream { match &self { Type::Option(_) => quote! { Repetition::OPTIONAL }, @@ -944,7 +965,6 @@ mod test { } #[test] - #[cfg(feature = "chrono")] fn test_chrono_timestamp_millis() { let snippet: proc_macro2::TokenStream = quote! { struct ATimestampStruct { @@ -971,7 +991,11 @@ mod test { { let definition_levels : Vec<i16> = self.iter().map(|rec| if rec.maybe_happened.is_some() { 1 } else { 0 }).collect(); let vals : Vec<_> = records.iter().filter_map(|rec| { - rec.maybe_happened.map(|inner| { inner.timestamp_millis() }) + if let Some(inner) = rec.maybe_happened { + Some(inner.timestamp_millis()) + } else { + None + } }).collect(); if let parquet::column::writer::ColumnWriter::Int64ColumnWriter(ref mut typed) = column_writer { @@ -984,7 +1008,6 @@ mod test { } #[test] - #[cfg(feature = "chrono")] fn test_chrono_date() { let snippet: proc_macro2::TokenStream = quote! { struct ATimestampStruct { @@ -1011,7 +1034,11 @@ mod test { { let definition_levels : Vec<i16> = self.iter().map(|rec| if rec.maybe_happened.is_some() { 1 } else { 0 }).collect(); let vals : Vec<_> = records.iter().filter_map(|rec| { - rec.maybe_happened.map(|inner| { inner.signed_duration_since(chrono::NaiveDate::from_ymd(1970, 1, 1)).num_days() as i32 }) + if let Some(inner) = rec.maybe_happened { + Some(inner.signed_duration_since(chrono::NaiveDate::from_ymd(1970, 1, 1)).num_days() as i32) + } else { + None + } }).collect(); if let parquet::column::writer::ColumnWriter::Int32ColumnWriter(ref mut typed) = column_writer { @@ -1024,10 +1051,9 @@ mod test { } #[test] - #[cfg(feature = "uuid")] fn test_uuid() { let snippet: proc_macro2::TokenStream = quote! { - struct ATimestampStruct { + struct AUuidStruct { unique_id: uuid::Uuid, maybe_unique_id: Option<&uuid::Uuid>, } @@ -1051,7 +1077,11 @@ mod test { { let definition_levels : Vec<i16> = self.iter().map(|rec| if rec.maybe_unique_id.is_some() { 1 } else { 0 }).collect(); let vals : Vec<_> = records.iter().filter_map(|rec| { - rec.maybe_unique_id.map(|ref inner| { (&inner.to_string()[..]).into() }) + if let Some(ref inner) = rec.maybe_unique_id { + Some((&inner.to_string()[..]).into()) + } else { + None + } }).collect(); if let parquet::column::writer::ColumnWriter::ByteArrayColumnWriter(ref mut typed) = column_writer { @@ -1062,4 +1092,23 @@ mod test { } }).to_string()); } + + #[test] + fn test_converted_type() { + let snippet: proc_macro2::TokenStream = quote! { + struct ATimeStruct { + time: chrono::NaiveDateTime, + } + }; + + let fields = extract_fields(snippet); + + let time = Field::from(&fields[0]); + + let converted_type = time.ty.converted_type(); + assert_eq!( + converted_type.unwrap().to_string(), + quote! { ConvertedType::TIMESTAMP_MILLIS }.to_string() + ); + } } diff --git a/parquet_derive_test/src/lib.rs b/parquet_derive_test/src/lib.rs index bc8e914..2b7c060 100644 --- a/parquet_derive_test/src/lib.rs +++ b/parquet_derive_test/src/lib.rs @@ -46,6 +46,7 @@ struct ACompleteRecord<'a> { pub maybe_double: Option<f64>, pub borrowed_maybe_a_string: &'a Option<String>, pub borrowed_maybe_a_str: &'a Option<&'a str>, + pub now: chrono::NaiveDateTime, } #[cfg(test)] @@ -88,8 +89,11 @@ mod tests { OPTIONAL DOUBLE maybe_double; OPTIONAL BINARY borrowed_maybe_a_string (STRING); OPTIONAL BINARY borrowed_maybe_a_str (STRING); + REQUIRED INT64 now (TIMESTAMP_MILLIS); }"; + let schema = Arc::new(parse_message_type(schema_str).unwrap()); + let a_str = "hello mother".to_owned(); let a_borrowed_string = "cool news".to_owned(); let maybe_a_string = Some("it's true, I'm a string".to_owned()); @@ -116,9 +120,9 @@ mod tests { maybe_double: Some(std::f64::MAX), borrowed_maybe_a_string: &maybe_a_string, borrowed_maybe_a_str: &maybe_a_str, + now: chrono::Utc::now().naive_local(), }]; - let schema = Arc::new(parse_message_type(schema_str).unwrap()); let generated_schema = drs.as_slice().schema().unwrap(); assert_eq!(&schema, &generated_schema);
