This is an automated email from the ASF dual-hosted git repository. etseidl pushed a commit to branch gh5854_thrift_remodel in repository https://gitbox.apache.org/repos/asf/arrow-rs.git
The following commit(s) were added to refs/heads/gh5854_thrift_remodel by this push: new c327d7f44a [thrift-remodel] Rework thrift reader API (#8341) c327d7f44a is described below commit c327d7f44ab53ca879ee30783abcdd1da5d072bd Author: Ed Seidl <etse...@users.noreply.github.com> AuthorDate: Wed Sep 17 12:12:31 2025 -0700 [thrift-remodel] Rework thrift reader API (#8341) # Which issue does this PR close? **Note: this targets a feature branch, not main** - Part of #5854. # Rationale for this change As I started on decoding thrift page headers, I found that the way I had been going was no longer going to work. This PR begins the process of abstracting the thrift reader to allow for other implementations. # What changes are included in this PR? In addition to reworking the reader itself, this PR moves away from the previous `TryFrom` approach and instead adds a `ReadThrift` trait. # Are these changes tested? Should be covered by existing tests # Are there any user-facing changes? Yes --- parquet/src/basic.rs | 52 ++-- parquet/src/file/column_crypto_metadata.rs | 4 +- parquet/src/file/metadata/mod.rs | 4 +- parquet/src/file/metadata/reader.rs | 6 +- parquet/src/file/metadata/thrift_gen.rs | 24 +- parquet/src/file/page_encoding_stats.rs | 6 +- parquet/src/file/page_index/index_reader.rs | 14 +- parquet/src/file/page_index/offset_index.rs | 12 +- parquet/src/parquet_macros.rs | 91 +++--- parquet/src/parquet_thrift.rs | 414 ++++++++++++++-------------- 10 files changed, 311 insertions(+), 316 deletions(-) diff --git a/parquet/src/basic.rs b/parquet/src/basic.rs index 5fffb56cdf..44fe66aff7 100644 --- a/parquet/src/basic.rs +++ b/parquet/src/basic.rs @@ -26,8 +26,8 @@ use std::{fmt, str}; pub use crate::compression::{BrotliLevel, GzipLevel, ZstdLevel}; use crate::parquet_thrift::{ - ElementType, FieldType, ThriftCompactInputProtocol, ThriftCompactOutputProtocol, WriteThrift, - WriteThriftField, + ElementType, FieldType, ReadThrift, ThriftCompactInputProtocol, 
ThriftCompactOutputProtocol, + WriteThrift, WriteThriftField, }; use crate::{thrift_enum, thrift_struct, thrift_union_all_empty}; @@ -165,9 +165,8 @@ pub enum ConvertedType { INTERVAL, } -impl<'a> TryFrom<&mut ThriftCompactInputProtocol<'a>> for ConvertedType { - type Error = ParquetError; - fn try_from(prot: &mut ThriftCompactInputProtocol<'a>) -> Result<Self> { +impl<'a, R: ThriftCompactInputProtocol<'a>> ReadThrift<'a, R> for ConvertedType { + fn read_thrift(prot: &mut R) -> Result<Self> { let val = prot.read_i32()?; Ok(match val { 0 => Self::UTF8, @@ -361,12 +360,9 @@ pub enum LogicalType { }, } -impl<'a> TryFrom<&mut ThriftCompactInputProtocol<'a>> for LogicalType { - type Error = ParquetError; - fn try_from(prot: &mut ThriftCompactInputProtocol<'a>) -> Result<Self> { - prot.read_struct_begin()?; - - let field_ident = prot.read_field_begin()?; +impl<'a, R: ThriftCompactInputProtocol<'a>> ReadThrift<'a, R> for LogicalType { + fn read_thrift(prot: &mut R) -> Result<Self> { + let field_ident = prot.read_field_begin(0)?; if field_ident.field_type == FieldType::Stop { return Err(general_err!("received empty union from remote LogicalType")); } @@ -388,7 +384,7 @@ impl<'a> TryFrom<&mut ThriftCompactInputProtocol<'a>> for LogicalType { Self::Enum } 5 => { - let val = DecimalType::try_from(&mut *prot)?; + let val = DecimalType::read_thrift(&mut *prot)?; Self::Decimal { scale: val.scale, precision: val.precision, @@ -399,21 +395,21 @@ impl<'a> TryFrom<&mut ThriftCompactInputProtocol<'a>> for LogicalType { Self::Date } 7 => { - let val = TimeType::try_from(&mut *prot)?; + let val = TimeType::read_thrift(&mut *prot)?; Self::Time { is_adjusted_to_u_t_c: val.is_adjusted_to_u_t_c, unit: val.unit, } } 8 => { - let val = TimestampType::try_from(&mut *prot)?; + let val = TimestampType::read_thrift(&mut *prot)?; Self::Timestamp { is_adjusted_to_u_t_c: val.is_adjusted_to_u_t_c, unit: val.unit, } } 10 => { - let val = IntType::try_from(&mut *prot)?; + let val = 
IntType::read_thrift(&mut *prot)?; Self::Integer { is_signed: val.is_signed, bit_width: val.bit_width, @@ -440,19 +436,19 @@ impl<'a> TryFrom<&mut ThriftCompactInputProtocol<'a>> for LogicalType { Self::Float16 } 16 => { - let val = VariantType::try_from(&mut *prot)?; + let val = VariantType::read_thrift(&mut *prot)?; Self::Variant { specification_version: val.specification_version, } } 17 => { - let val = GeometryType::try_from(&mut *prot)?; + let val = GeometryType::read_thrift(&mut *prot)?; Self::Geometry { crs: val.crs.map(|s| s.to_owned()), } } 18 => { - let val = GeographyType::try_from(&mut *prot)?; + let val = GeographyType::read_thrift(&mut *prot)?; Self::Geography { crs: val.crs.map(|s| s.to_owned()), algorithm: val.algorithm, @@ -465,13 +461,12 @@ impl<'a> TryFrom<&mut ThriftCompactInputProtocol<'a>> for LogicalType { } } }; - let field_ident = prot.read_field_begin()?; + let field_ident = prot.read_field_begin(field_ident.id)?; if field_ident.field_type != FieldType::Stop { return Err(general_err!( "Received multiple fields for union from remote LogicalType" )); } - prot.read_struct_end()?; Ok(ret) } } @@ -756,9 +751,8 @@ pub enum Compression { LZ4_RAW, } -impl<'a> TryFrom<&mut ThriftCompactInputProtocol<'a>> for Compression { - type Error = ParquetError; - fn try_from(prot: &mut ThriftCompactInputProtocol<'a>) -> Result<Self> { +impl<'a, R: ThriftCompactInputProtocol<'a>> ReadThrift<'a, R> for Compression { + fn read_thrift(prot: &mut R) -> Result<Self> { let val = prot.read_i32()?; Ok(match val { 0 => Self::UNCOMPRESSED, @@ -1124,12 +1118,9 @@ impl ColumnOrder { } } -impl<'a> TryFrom<&mut ThriftCompactInputProtocol<'a>> for ColumnOrder { - type Error = ParquetError; - - fn try_from(prot: &mut ThriftCompactInputProtocol<'a>) -> Result<Self> { - prot.read_struct_begin()?; - let field_ident = prot.read_field_begin()?; +impl<'a, R: ThriftCompactInputProtocol<'a>> ReadThrift<'a, R> for ColumnOrder { + fn read_thrift(prot: &mut R) -> Result<Self> { + let 
field_ident = prot.read_field_begin(0)?; if field_ident.field_type == FieldType::Stop { return Err(general_err!("Received empty union from remote ColumnOrder")); } @@ -1144,13 +1135,12 @@ impl<'a> TryFrom<&mut ThriftCompactInputProtocol<'a>> for ColumnOrder { Self::UNKNOWN } }; - let field_ident = prot.read_field_begin()?; + let field_ident = prot.read_field_begin(field_ident.id)?; if field_ident.field_type != FieldType::Stop { return Err(general_err!( "Received multiple fields for union from remote ColumnOrder" )); } - prot.read_struct_end()?; Ok(ret) } } diff --git a/parquet/src/file/column_crypto_metadata.rs b/parquet/src/file/column_crypto_metadata.rs index 5bba073579..6a538bd42b 100644 --- a/parquet/src/file/column_crypto_metadata.rs +++ b/parquet/src/file/column_crypto_metadata.rs @@ -26,8 +26,8 @@ use crate::format::{ EncryptionWithFooterKey as TEncryptionWithFooterKey, }; use crate::parquet_thrift::{ - ElementType, FieldType, ThriftCompactInputProtocol, ThriftCompactOutputProtocol, WriteThrift, - WriteThriftField, + read_thrift_vec, ElementType, FieldType, ReadThrift, ThriftCompactInputProtocol, + ThriftCompactOutputProtocol, WriteThrift, WriteThriftField, }; use crate::{thrift_struct, thrift_union}; diff --git a/parquet/src/file/metadata/mod.rs b/parquet/src/file/metadata/mod.rs index d23d46a33b..95e9a48b46 100644 --- a/parquet/src/file/metadata/mod.rs +++ b/parquet/src/file/metadata/mod.rs @@ -121,8 +121,8 @@ use crate::{ use crate::{ basic::{ColumnOrder, Compression, Encoding, Type}, parquet_thrift::{ - ElementType, FieldType, ThriftCompactInputProtocol, ThriftCompactOutputProtocol, - WriteThrift, WriteThriftField, + ElementType, FieldType, ReadThrift, ThriftCompactInputProtocol, + ThriftCompactOutputProtocol, WriteThrift, WriteThriftField, }, }; use crate::{ diff --git a/parquet/src/file/metadata/reader.rs b/parquet/src/file/metadata/reader.rs index 46022b459d..73c6a8ee40 100644 --- a/parquet/src/file/metadata/reader.rs +++ 
b/parquet/src/file/metadata/reader.rs @@ -19,7 +19,7 @@ use std::{io::Read, ops::Range}; #[cfg(feature = "encryption")] use crate::encryption::decrypt::{CryptoContext, FileDecryptionProperties}; -use crate::parquet_thrift::ThriftCompactInputProtocol; +use crate::parquet_thrift::{ReadThrift, ThriftSliceInputProtocol}; use bytes::Bytes; use crate::errors::{ParquetError, Result}; @@ -962,8 +962,8 @@ impl ParquetMetaDataReader { /// /// [Parquet Spec]: https://github.com/apache/parquet-format#metadata pub fn decode_metadata(buf: &[u8]) -> Result<ParquetMetaData> { - let mut prot = ThriftCompactInputProtocol::new(buf); - ParquetMetaData::try_from(&mut prot) + let mut prot = ThriftSliceInputProtocol::new(buf); + ParquetMetaData::read_thrift(&mut prot) } } diff --git a/parquet/src/file/metadata/thrift_gen.rs b/parquet/src/file/metadata/thrift_gen.rs index 06229fb181..b656bacc8c 100644 --- a/parquet/src/file/metadata/thrift_gen.rs +++ b/parquet/src/file/metadata/thrift_gen.rs @@ -35,8 +35,8 @@ use crate::{ statistics::ValueStatistics, }, parquet_thrift::{ - ElementType, FieldType, ThriftCompactInputProtocol, ThriftCompactOutputProtocol, - WriteThrift, WriteThriftField, + read_thrift_vec, ElementType, FieldType, ReadThrift, ThriftCompactInputProtocol, + ThriftCompactOutputProtocol, WriteThrift, WriteThriftField, }, schema::types::{parquet_schema_from_array, ColumnDescriptor, SchemaDescriptor}, thrift_struct, thrift_union, @@ -46,6 +46,7 @@ use crate::{ use crate::{ encryption::decrypt::{FileDecryptionProperties, FileDecryptor}, file::column_crypto_metadata::ColumnCryptoMetaData, + parquet_thrift::ThriftSliceInputProtocol, schema::types::SchemaDescPtr, }; @@ -669,8 +670,8 @@ fn row_group_from_encrypted_thrift( ) })?; - let mut prot = ThriftCompactInputProtocol::new(decrypted_cc_buf.as_slice()); - let col_meta = ColumnMetaData::try_from(&mut prot)?; + let mut prot = ThriftSliceInputProtocol::new(decrypted_cc_buf.as_slice()); + let col_meta = ColumnMetaData::read_thrift(&mut 
prot)?; c.meta_data = Some(col_meta); columns.push(convert_column(c, d.clone())?); } else { @@ -699,14 +700,14 @@ pub(crate) fn parquet_metadata_with_encryption( encrypted_footer: bool, buf: &[u8], ) -> Result<ParquetMetaData> { - let mut prot = ThriftCompactInputProtocol::new(buf); + let mut prot = ThriftSliceInputProtocol::new(buf); let mut file_decryptor = None; let decrypted_fmd_buf; if encrypted_footer { if let Some(file_decryption_properties) = file_decryption_properties { let t_file_crypto_metadata: FileCryptoMetaData = - FileCryptoMetaData::try_from(&mut prot) + FileCryptoMetaData::read_thrift(&mut prot) .map_err(|e| general_err!("Could not parse crypto metadata: {}", e))?; let supply_aad_prefix = match &t_file_crypto_metadata.encryption_algorithm { EncryptionAlgorithm::AES_GCM_V1(algo) => algo.supply_aad_prefix, @@ -734,7 +735,7 @@ pub(crate) fn parquet_metadata_with_encryption( "Provided footer key and AAD were unable to decrypt parquet footer" ) })?; - prot = ThriftCompactInputProtocol::new(decrypted_fmd_buf.as_ref()); + prot = ThriftSliceInputProtocol::new(decrypted_fmd_buf.as_ref()); file_decryptor = Some(decryptor); } else { @@ -744,7 +745,7 @@ pub(crate) fn parquet_metadata_with_encryption( } } - let file_meta = super::thrift_gen::FileMetaData::try_from(&mut prot) + let file_meta = super::thrift_gen::FileMetaData::read_thrift(&mut prot) .map_err(|e| general_err!("Could not parse metadata: {}", e))?; let version = file_meta.version; @@ -852,10 +853,9 @@ pub(super) fn get_file_decryptor( /// Create ParquetMetaData from thrift input. Note that this only decodes the file metadata in /// the Parquet footer. Page indexes will need to be added later. 
-impl<'a> TryFrom<&mut ThriftCompactInputProtocol<'a>> for ParquetMetaData { - type Error = ParquetError; - fn try_from(prot: &mut ThriftCompactInputProtocol<'a>) -> Result<Self> { - let file_meta = super::thrift_gen::FileMetaData::try_from(prot)?; +impl<'a, R: ThriftCompactInputProtocol<'a>> ReadThrift<'a, R> for ParquetMetaData { + fn read_thrift(prot: &mut R) -> Result<Self> { + let file_meta = super::thrift_gen::FileMetaData::read_thrift(prot)?; let version = file_meta.version; let num_rows = file_meta.num_rows; diff --git a/parquet/src/file/page_encoding_stats.rs b/parquet/src/file/page_encoding_stats.rs index 2d433dc9b3..934e177de0 100644 --- a/parquet/src/file/page_encoding_stats.rs +++ b/parquet/src/file/page_encoding_stats.rs @@ -20,10 +20,10 @@ use std::io::Write; use crate::basic::{Encoding, PageType}; -use crate::errors::{ParquetError, Result}; +use crate::errors::Result; use crate::parquet_thrift::{ - ElementType, FieldType, ThriftCompactInputProtocol, ThriftCompactOutputProtocol, WriteThrift, - WriteThriftField, + ElementType, FieldType, ReadThrift, ThriftCompactInputProtocol, ThriftCompactOutputProtocol, + WriteThrift, WriteThriftField, }; use crate::thrift_struct; diff --git a/parquet/src/file/page_index/index_reader.rs b/parquet/src/file/page_index/index_reader.rs index e9cf119224..3db597954e 100644 --- a/parquet/src/file/page_index/index_reader.rs +++ b/parquet/src/file/page_index/index_reader.rs @@ -27,8 +27,8 @@ use crate::file::page_index::column_index::{ use crate::file::page_index::offset_index::OffsetIndexMetaData; use crate::file::reader::ChunkReader; use crate::parquet_thrift::{ - ElementType, FieldType, ThriftCompactInputProtocol, ThriftCompactOutputProtocol, WriteThrift, - WriteThriftField, + read_thrift_vec, ElementType, FieldType, ReadThrift, ThriftCompactInputProtocol, + ThriftCompactOutputProtocol, ThriftSliceInputProtocol, WriteThrift, WriteThriftField, }; use crate::thrift_struct; use std::io::Write; @@ -136,15 +136,15 @@ pub fn 
read_offset_indexes<R: ChunkReader>( } pub(crate) fn decode_offset_index(data: &[u8]) -> Result<OffsetIndexMetaData, ParquetError> { - let mut prot = ThriftCompactInputProtocol::new(data); + let mut prot = ThriftSliceInputProtocol::new(data); // Try to read fast-path first. If that fails, fall back to slower but more robust // decoder. match OffsetIndexMetaData::try_from_fast(&mut prot) { Ok(offset_index) => Ok(offset_index), Err(_) => { - prot = ThriftCompactInputProtocol::new(data); - OffsetIndexMetaData::try_from(&mut prot) + prot = ThriftSliceInputProtocol::new(data); + OffsetIndexMetaData::read_thrift(&mut prot) } } } @@ -166,8 +166,8 @@ pub(crate) fn decode_column_index( data: &[u8], column_type: Type, ) -> Result<ColumnIndexMetaData, ParquetError> { - let mut prot = ThriftCompactInputProtocol::new(data); - let index = ThriftColumnIndex::try_from(&mut prot)?; + let mut prot = ThriftSliceInputProtocol::new(data); + let index = ThriftColumnIndex::read_thrift(&mut prot)?; let index = match column_type { Type::BOOLEAN => { diff --git a/parquet/src/file/page_index/offset_index.rs b/parquet/src/file/page_index/offset_index.rs index ac2620af09..2153b8ed30 100644 --- a/parquet/src/file/page_index/offset_index.rs +++ b/parquet/src/file/page_index/offset_index.rs @@ -22,8 +22,8 @@ use std::io::Write; use crate::parquet_thrift::{ - ElementType, FieldType, ThriftCompactInputProtocol, ThriftCompactOutputProtocol, WriteThrift, - WriteThriftField, + read_thrift_vec, ElementType, FieldType, ReadThrift, ThriftCompactInputProtocol, + ThriftCompactOutputProtocol, WriteThrift, WriteThriftField, }; use crate::{ errors::{ParquetError, Result}, @@ -113,7 +113,9 @@ impl OffsetIndexMetaData { // Fast-path read of offset index. This works because we expect all field deltas to be 1, // and there's no nesting beyond PageLocation, so no need to save the last field id. Like // read_page_locations(), this will fail if absolute field id's are used. 
- pub(super) fn try_from_fast<'a>(prot: &mut ThriftCompactInputProtocol<'a>) -> Result<Self> { + pub(super) fn try_from_fast<'a, R: ThriftCompactInputProtocol<'a>>( + prot: &mut R, + ) -> Result<Self> { // Offset index is a struct with 2 fields. First field is an array of PageLocations, // the second an optional array of i64. @@ -140,7 +142,7 @@ impl OffsetIndexMetaData { "encountered unknown field while reading OffsetIndex" )); } - let vec = Vec::<i64>::try_from(&mut *prot)?; + let vec = read_thrift_vec::<i64, R>(&mut *prot)?; unencoded_byte_array_data_bytes = Some(vec); // this one should be Stop @@ -164,7 +166,7 @@ impl OffsetIndexMetaData { // Note: this will fail if the fields are either out of order, or if a suboptimal // encoder doesn't use field deltas. -fn read_page_location<'a>(prot: &mut ThriftCompactInputProtocol<'a>) -> Result<PageLocation> { +fn read_page_location<'a, R: ThriftCompactInputProtocol<'a>>(prot: &mut R) -> Result<PageLocation> { // there are 3 fields, all mandatory, so all field deltas should be 1 let (field_type, delta) = prot.read_field_header()?; if delta != 1 || field_type != FieldType::I64 as u8 { diff --git a/parquet/src/parquet_macros.rs b/parquet/src/parquet_macros.rs index 939f3cb339..889e5fafef 100644 --- a/parquet/src/parquet_macros.rs +++ b/parquet/src/parquet_macros.rs @@ -37,10 +37,9 @@ macro_rules! thrift_enum { $($(#[cfg_attr(not(doctest), $($field_attrs)*)])* $field_name = $field_value,)* } - impl<'a> TryFrom<&mut ThriftCompactInputProtocol<'a>> for $identifier { - type Error = ParquetError; + impl<'a, R: ThriftCompactInputProtocol<'a>> ReadThrift<'a, R> for $identifier { #[allow(deprecated)] - fn try_from(prot: &mut ThriftCompactInputProtocol<'a>) -> Result<Self> { + fn read_thrift(prot: &mut R) -> Result<Self> { let val = prot.read_i32()?; match val { $($field_value => Ok(Self::$field_name),)* @@ -109,12 +108,9 @@ macro_rules! 
thrift_union_all_empty { $($(#[cfg_attr(not(doctest), $($field_attrs)*)])* $field_name),* } - impl<'a> TryFrom<&mut ThriftCompactInputProtocol<'a>> for $identifier { - type Error = ParquetError; - - fn try_from(prot: &mut ThriftCompactInputProtocol<'a>) -> Result<Self> { - prot.read_struct_begin()?; - let field_ident = prot.read_field_begin()?; + impl<'a, R: ThriftCompactInputProtocol<'a>> ReadThrift<'a, R> for $identifier { + fn read_thrift(prot: &mut R) -> Result<Self> { + let field_ident = prot.read_field_begin(0)?; if field_ident.field_type == FieldType::Stop { return Err(general_err!("Received empty union from remote {}", stringify!($identifier))); } @@ -128,13 +124,12 @@ macro_rules! thrift_union_all_empty { return Err(general_err!("Unexpected {} {}", stringify!($identifier), field_ident.id)); } }; - let field_ident = prot.read_field_begin()?; + let field_ident = prot.read_field_begin(field_ident.id)?; if field_ident.field_type != FieldType::Stop { return Err(general_err!( "Received multiple fields for union from remote {}", stringify!($identifier) )); } - prot.read_struct_end()?; Ok(ret) } } @@ -195,12 +190,9 @@ macro_rules! thrift_union { $($(#[cfg_attr(not(doctest), $($field_attrs)*)])* $field_name $( ( $crate::__thrift_union_type!{$field_type $($field_lt)? $($element_type)?} ) )?),* } - impl<'a> TryFrom<&mut ThriftCompactInputProtocol<'a>> for $identifier $(<$lt>)? { - type Error = ParquetError; - - fn try_from(prot: &mut ThriftCompactInputProtocol<'a>) -> Result<Self> { - prot.read_struct_begin()?; - let field_ident = prot.read_field_begin()?; + impl<'a, R: ThriftCompactInputProtocol<'a>> ReadThrift<'a, R> for $identifier $(<$lt>)? { + fn read_thrift(prot: &mut R) -> Result<Self> { + let field_ident = prot.read_field_begin(0)?; if field_ident.field_type == FieldType::Stop { return Err(general_err!("Received empty union from remote {}", stringify!($identifier))); } @@ -213,13 +205,12 @@ macro_rules! 
thrift_union { return Err(general_err!("Unexpected {} {}", stringify!($identifier), field_ident.id)); } }; - let field_ident = prot.read_field_begin()?; + let field_ident = prot.read_field_begin(field_ident.id)?; if field_ident.field_type != FieldType::Stop { return Err(general_err!( concat!("Received multiple fields for union from remote {}", stringify!($identifier)) )); } - prot.read_struct_end()?; Ok(ret) } } @@ -283,27 +274,26 @@ macro_rules! thrift_struct { $($(#[cfg_attr(not(doctest), $($field_attrs)*)])* $vis $field_name: $crate::__thrift_required_or_optional!($required_or_optional $crate::__thrift_field_type!($field_type $($field_lt)? $($element_type)?))),* } - impl<'a> TryFrom<&mut ThriftCompactInputProtocol<'a>> for $identifier $(<$lt>)? { - type Error = ParquetError; - fn try_from(prot: &mut ThriftCompactInputProtocol<'a>) -> Result<Self> { + impl<'a, R: ThriftCompactInputProtocol<'a>> ReadThrift<'a, R> for $identifier $(<$lt>)? { + fn read_thrift(prot: &mut R) -> Result<Self> { $(let mut $field_name: Option<$crate::__thrift_field_type!($field_type $($field_lt)? $($element_type)?)> = None;)* - prot.read_struct_begin()?; + let mut last_field_id = 0i16; loop { - let field_ident = prot.read_field_begin()?; + let field_ident = prot.read_field_begin(last_field_id)?; if field_ident.field_type == FieldType::Stop { break; } match field_ident.id { $($field_id => { - let val = $crate::__thrift_read_field!(prot, $field_type $($field_lt)? $($element_type)?); + let val = $crate::__thrift_read_field!(prot, field_ident, $field_type $($field_lt)? $($element_type)?); $field_name = Some(val); })* _ => { prot.skip(field_ident.field_type)?; } }; + last_field_id = field_ident.id; } - prot.read_struct_end()?; $($crate::__thrift_result_required_or_optional!($required_or_optional $field_name);)* Ok(Self { $($field_name),* @@ -417,39 +407,42 @@ macro_rules! __thrift_result_required_or_optional { #[doc(hidden)] #[macro_export] macro_rules! 
__thrift_read_field { - ($prot:tt, list $lt:lifetime binary) => { - Vec::<&'a [u8]>::try_from(&mut *$prot)? + ($prot:tt, $field_ident:tt, list $lt:lifetime binary) => { + read_thrift_vec::<&'a [u8], R>(&mut *$prot)? }; - ($prot:tt, list $lt:lifetime $element_type:ident) => { - Vec::<$element_type>::try_from(&mut *$prot)? + ($prot:tt, $field_ident:tt, list $lt:lifetime $element_type:ident) => { + read_thrift_vec::<$element_type, R>(&mut *$prot)? }; - ($prot:tt, list string) => { - Vec::<String>::try_from(&mut *$prot)? + ($prot:tt, $field_ident:tt, list string) => { + read_thrift_vec::<String, R>(&mut *$prot)? }; - ($prot:tt, list $element_type:ident) => { - Vec::<$element_type>::try_from(&mut *$prot)? + ($prot:tt, $field_ident:tt, list $element_type:ident) => { + read_thrift_vec::<$element_type, R>(&mut *$prot)? }; - ($prot:tt, string $lt:lifetime) => { - <&$lt str>::try_from(&mut *$prot)? + ($prot:tt, $field_ident:tt, string $lt:lifetime) => { + <&$lt str>::read_thrift(&mut *$prot)? }; - ($prot:tt, binary $lt:lifetime) => { - <&$lt [u8]>::try_from(&mut *$prot)? + ($prot:tt, $field_ident:tt, binary $lt:lifetime) => { + <&$lt [u8]>::read_thrift(&mut *$prot)? }; - ($prot:tt, $field_type:ident $lt:lifetime) => { - $field_type::try_from(&mut *$prot)? + ($prot:tt, $field_ident:tt, $field_type:ident $lt:lifetime) => { + $field_type::read_thrift(&mut *$prot)? }; - ($prot:tt, string) => { - String::try_from(&mut *$prot)? + ($prot:tt, $field_ident:tt, string) => { + String::read_thrift(&mut *$prot)? }; - ($prot:tt, binary) => { + ($prot:tt, $field_ident:tt, binary) => { // this one needs to not conflict with `list<i8>` $prot.read_bytes()?.to_vec() }; - ($prot:tt, double) => { - $crate::parquet_thrift::OrderedF64::try_from(&mut *$prot)? + ($prot:tt, $field_ident:tt, double) => { + $crate::parquet_thrift::OrderedF64::read_thrift(&mut *$prot)? 
+ }; + ($prot:tt, $field_ident:tt, bool) => { + $field_ident.bool_val.unwrap() }; - ($prot:tt, $field_type:ident) => { - $field_type::try_from(&mut *$prot)? + ($prot:tt, $field_ident:tt, $field_type:ident) => { + $field_type::read_thrift(&mut *$prot)? }; } @@ -482,10 +475,10 @@ macro_rules! __thrift_union_type { #[macro_export] macro_rules! __thrift_read_variant { ($prot:tt, $field_name:ident $field_type:ident) => { - Self::$field_name($field_type::try_from(&mut *$prot)?) + Self::$field_name($field_type::read_thrift(&mut *$prot)?) }; ($prot:tt, $field_name:ident list $field_type:ident) => { - Self::$field_name(Vec::<$field_type>::try_from(&mut *$prot)?) + Self::$field_name(Vec::<$field_type>::read_thrift(&mut *$prot)?) }; ($prot:tt, $field_name:ident) => {{ $prot.skip_empty_struct()?; diff --git a/parquet/src/parquet_thrift.rs b/parquet/src/parquet_thrift.rs index 9b83c0a01b..17847d0b71 100644 --- a/parquet/src/parquet_thrift.rs +++ b/parquet/src/parquet_thrift.rs @@ -24,10 +24,9 @@ use std::{cmp::Ordering, io::Write}; use crate::errors::{ParquetError, Result}; -// Couldn't implement thrift structs with f64 do to lack of Eq -// for f64. This is a hacky workaround for now...there are other -// wrappers out there that should probably be used instead. -// thrift seems to re-export an impl from ordered-float +/// Wrapper for thrift `double` fields. This is used to provide +/// an implementation of `Eq` for floats. This implementation +/// uses IEEE 754 total order. #[derive(Debug, Clone, Copy, PartialEq)] pub struct OrderedF64(f64); @@ -156,53 +155,52 @@ impl TryFrom<u8> for ElementType { } } +/// Struct used to describe a [thrift struct] field during decoding. +/// +/// [thrift struct]: https://github.com/apache/thrift/blob/master/doc/specs/thrift-compact-protocol.md#struct-encoding pub(crate) struct FieldIdentifier { + /// The type for the field. pub(crate) field_type: FieldType, + /// The field's `id`. May be computed from delta or directly decoded. 
pub(crate) id: i16, + /// Stores the value for booleans. + /// + /// Boolean fields store no data, instead the field type is either boolean true, or + /// boolean false. + pub(crate) bool_val: Option<bool>, } +/// Struct used to describe a [thrift list]. +/// +/// [thrift list]: https://github.com/apache/thrift/blob/master/doc/specs/thrift-compact-protocol.md#list-and-set #[derive(Clone, Debug, Eq, PartialEq)] pub(crate) struct ListIdentifier { + /// The type for each element in the list. pub(crate) element_type: ElementType, + /// Number of elements contained in the list. pub(crate) size: i32, } -/// A more performant implementation of [`TCompactInputProtocol`] that reads a slice +/// Low-level object used to deserialize structs encoded with the Thrift [compact] protocol. /// -/// [`TCompactInputProtocol`]: thrift::protocol::TCompactInputProtocol -pub(crate) struct ThriftCompactInputProtocol<'a> { - buf: &'a [u8], - // Identifier of the last field deserialized for a struct. - last_read_field_id: i16, - // Stack of the last read field ids (a new entry is added each time a nested struct is read). - read_field_id_stack: Vec<i16>, - // Boolean value for a field. - // Saved because boolean fields and their value are encoded in a single byte, - // and reading the field only occurs after the field id is read. - pending_read_bool_value: Option<bool>, -} - -impl<'b, 'a: 'b> ThriftCompactInputProtocol<'a> { - pub fn new(buf: &'a [u8]) -> Self { - Self { - buf, - last_read_field_id: 0, - read_field_id_stack: Vec::with_capacity(16), - pending_read_bool_value: None, - } - } +/// Implementation of this trait must provide the low-level functions `read_byte`, `read_bytes`, +/// `skip_bytes`, and `read_double`. These primitives are used by the default functions provided +/// here to perform deserialization. 
+/// +/// [compact]: https://github.com/apache/thrift/blob/master/doc/specs/thrift-compact-protocol.md +pub(crate) trait ThriftCompactInputProtocol<'a> { + /// Read a single byte from the input. + fn read_byte(&mut self) -> Result<u8>; - pub fn reset_buffer(&mut self, buf: &'a [u8]) { - self.buf = buf; - self.last_read_field_id = 0; - self.read_field_id_stack.clear(); - self.pending_read_bool_value = None; - } + /// Read a Thrift encoded [binary] from the input. + /// + /// [binary]: https://github.com/apache/thrift/blob/master/doc/specs/thrift-compact-protocol.md#binary-encoding + fn read_bytes(&mut self) -> Result<&'a [u8]>; - pub fn as_slice(&self) -> &'a [u8] { - self.buf - } + /// Skip the next `n` bytes of input. + fn skip_bytes(&mut self, n: usize) -> Result<()>; + /// Read a ULEB128 encoded unsigned varint from the input. fn read_vlq(&mut self) -> Result<u64> { let mut in_progress = 0; let mut shift = 0; @@ -216,12 +214,14 @@ impl<'b, 'a: 'b> ThriftCompactInputProtocol<'a> { } } + /// Read a zig-zag encoded signed varint from the input. fn read_zig_zag(&mut self) -> Result<i64> { let val = self.read_vlq()?; Ok((val >> 1) as i64 ^ -((val & 1) as i64)) } - fn read_list_set_begin(&mut self) -> Result<(ElementType, i32)> { + /// Read the [`ListIdentifier`] for a Thrift encoded list. + fn read_list_begin(&mut self) -> Result<ListIdentifier> { let header = self.read_byte()?; let element_type = ElementType::try_from(header & 0x0f)?; @@ -233,162 +233,118 @@ impl<'b, 'a: 'b> ThriftCompactInputProtocol<'a> { self.read_vlq()? 
as _ }; - Ok((element_type, element_count)) - } - - pub(crate) fn read_struct_begin(&mut self) -> Result<()> { - self.read_field_id_stack.push(self.last_read_field_id); - self.last_read_field_id = 0; - Ok(()) - } - - pub(crate) fn read_struct_end(&mut self) -> Result<()> { - self.last_read_field_id = self - .read_field_id_stack - .pop() - .expect("should have previous field ids"); - Ok(()) - } - - // This is a specialized version of read_field_begin, solely for use in parsing - // PageLocation structs in the offset index. This function assumes that the delta - // field will always be less than 0xf, fields will be in order, and no boolean fields - // will be read. This also skips validation of the field type. - // - // Returns a tuple of (field_type, field_delta) - pub(crate) fn read_field_header(&mut self) -> Result<(u8, u8)> { - let field_type = self.read_byte()?; - let field_delta = (field_type & 0xf0) >> 4; - let field_type = field_type & 0xf; - Ok((field_type, field_delta)) + Ok(ListIdentifier { + element_type, + size: element_count, + }) } - pub(crate) fn read_field_begin(&mut self) -> Result<FieldIdentifier> { + /// Read the [`FieldIdentifier`] for a field in a Thrift encoded struct. 
+ fn read_field_begin(&mut self, last_field_id: i16) -> Result<FieldIdentifier> { // we can read at least one byte, which is: // - the type // - the field delta and the type let field_type = self.read_byte()?; let field_delta = (field_type & 0xf0) >> 4; let field_type = FieldType::try_from(field_type & 0xf)?; + let mut bool_val: Option<bool> = None; match field_type { FieldType::Stop => Ok(FieldIdentifier { field_type: FieldType::Stop, id: 0, + bool_val, }), _ => { // special handling for bools if field_type == FieldType::BooleanFalse { - self.pending_read_bool_value = Some(false); + bool_val = Some(false); } else if field_type == FieldType::BooleanTrue { - self.pending_read_bool_value = Some(true); + bool_val = Some(true); } - if field_delta != 0 { - self.last_read_field_id = self - .last_read_field_id - .checked_add(field_delta as i16) - .map_or_else( - || { - Err(general_err!(format!( - "cannot add {} to {}", - field_delta, self.last_read_field_id - ))) - }, - Ok, - )?; + let field_id = if field_delta != 0 { + last_field_id.checked_add(field_delta as i16).map_or_else( + || { + Err(general_err!(format!( + "cannot add {} to {}", + field_delta, last_field_id + ))) + }, + Ok, + )? } else { - self.last_read_field_id = self.read_i16()?; + self.read_i16()? }; Ok(FieldIdentifier { field_type, - id: self.last_read_field_id, + id: field_id, + bool_val, }) } } } - pub(crate) fn read_bool(&mut self) -> Result<bool> { - match self.pending_read_bool_value.take() { - Some(b) => Ok(b), - None => { - let b = self.read_byte()?; - // Previous versions of the thrift specification said to use 0 and 1 inside collections, - // but that differed from existing implementations. - // The specification was updated in https://github.com/apache/thrift/commit/2c29c5665bc442e703480bb0ee60fe925ffe02e8. - // At least the go implementation seems to have followed the previously documented values. 
- match b { - 0x01 => Ok(true), - 0x00 | 0x02 => Ok(false), - unkn => Err(general_err!(format!("cannot convert {unkn} into bool"))), - } - } - } + /// This is a specialized version of [`Self::read_field_begin`], solely for use in parsing + /// simple structs. This function assumes that the delta field will always be less than 0xf, + /// fields will be in order, and no boolean fields will be read. + /// This also skips validation of the field type. + /// + /// Returns a tuple of `(field_type, field_delta)`. + fn read_field_header(&mut self) -> Result<(u8, u8)> { + let field_type = self.read_byte()?; + let field_delta = (field_type & 0xf0) >> 4; + let field_type = field_type & 0xf; + Ok((field_type, field_delta)) } - pub(crate) fn read_bytes(&mut self) -> Result<&'b [u8]> { - let len = self.read_vlq()? as usize; - let ret = self.buf.get(..len).ok_or_else(eof_error)?; - self.buf = &self.buf[len..]; - Ok(ret) + /// Read a boolean list element. This should not be used for struct fields. For the latter, + /// use the [`FieldIdentifier::bool_val`] field. + fn read_bool(&mut self) -> Result<bool> { + let b = self.read_byte()?; + // Previous versions of the thrift specification said to use 0 and 1 inside collections, + // but that differed from existing implementations. + // The specification was updated in https://github.com/apache/thrift/commit/2c29c5665bc442e703480bb0ee60fe925ffe02e8. + // At least the go implementation seems to have followed the previously documented values. + match b { + 0x01 => Ok(true), + 0x00 | 0x02 => Ok(false), + unkn => Err(general_err!(format!("cannot convert {unkn} into bool"))), + } } - pub(crate) fn read_string(&mut self) -> Result<&'b str> { + /// Read a Thrift [binary] as a UTF-8 encoded string. + /// + /// [binary]: https://github.com/apache/thrift/blob/master/doc/specs/thrift-compact-protocol.md#binary-encoding + fn read_string(&mut self) -> Result<&'a str> { let slice = self.read_bytes()?; Ok(std::str::from_utf8(slice)?) 
} - pub(crate) fn read_i8(&mut self) -> Result<i8> { + /// Read an `i8`. + fn read_i8(&mut self) -> Result<i8> { Ok(self.read_byte()? as _) } - pub(crate) fn read_i16(&mut self) -> Result<i16> { + /// Read an `i16`. + fn read_i16(&mut self) -> Result<i16> { Ok(self.read_zig_zag()? as _) } - pub(crate) fn read_i32(&mut self) -> Result<i32> { + /// Read an `i32`. + fn read_i32(&mut self) -> Result<i32> { Ok(self.read_zig_zag()? as _) } - pub(crate) fn read_i64(&mut self) -> Result<i64> { + /// Read an `i64`. + fn read_i64(&mut self) -> Result<i64> { self.read_zig_zag() } - pub(crate) fn read_double(&mut self) -> Result<f64> { - let slice = self.buf.get(..8).ok_or_else(eof_error)?; - self.buf = &self.buf[8..]; - match slice.try_into() { - Ok(slice) => Ok(f64::from_le_bytes(slice)), - Err(_) => Err(general_err!("Unexpected error converting slice")), - } - } - - pub(crate) fn read_list_begin(&mut self) -> Result<ListIdentifier> { - let (element_type, element_count) = self.read_list_set_begin()?; - Ok(ListIdentifier { - element_type, - size: element_count, - }) - } - - pub(crate) fn read_list_end(&mut self) -> Result<()> { - Ok(()) - } - - #[inline] - fn read_byte(&mut self) -> Result<u8> { - let ret = *self.buf.first().ok_or_else(eof_error)?; - self.buf = &self.buf[1..]; - Ok(ret) - } - - #[inline] - fn skip_bytes(&mut self, n: usize) -> Result<()> { - self.buf.get(..n).ok_or_else(eof_error)?; - self.buf = &self.buf[n..]; - Ok(()) - } + /// Read a Thrift `double` as `f64`. + fn read_double(&mut self) -> Result<f64>; + /// Skip a ULEB128 encoded varint. fn skip_vlq(&mut self) -> Result<()> { loop { let byte = self.read_byte()?; @@ -398,21 +354,25 @@ impl<'b, 'a: 'b> ThriftCompactInputProtocol<'a> { } } + /// Skip a thrift [binary]. + /// + /// [binary]: https://github.com/apache/thrift/blob/master/doc/specs/thrift-compact-protocol.md#binary-encoding fn skip_binary(&mut self) -> Result<()> { let len = self.read_vlq()? 
as usize; self.skip_bytes(len) } /// Skip a field with type `field_type` recursively until the default - /// maximum skip depth is reached. - pub(crate) fn skip(&mut self, field_type: FieldType) -> Result<()> { - // TODO: magic number - self.skip_till_depth(field_type, 64) + /// maximum skip depth (currently 64) is reached. + fn skip(&mut self, field_type: FieldType) -> Result<()> { + const DEFAULT_SKIP_DEPTH: i8 = 64; + self.skip_till_depth(field_type, DEFAULT_SKIP_DEPTH) } /// Empty structs in unions consist of a single byte of 0 for the field stop record. - /// This skips that byte without pushing to the field id stack. - pub(crate) fn skip_empty_struct(&mut self) -> Result<()> { + /// This skips that byte without incurring the cost of processing the [`FieldIdentifier`]. + /// Will return an error if the struct is not actually empty. + fn skip_empty_struct(&mut self) -> Result<()> { let b = self.read_byte()?; if b != 0 { Err(general_err!("Empty struct has fields")) @@ -428,7 +388,8 @@ impl<'b, 'a: 'b> ThriftCompactInputProtocol<'a> { } match field_type { - FieldType::BooleanFalse | FieldType::BooleanTrue => self.read_bool().map(|_| ()), + // boolean field has no data + FieldType::BooleanFalse | FieldType::BooleanTrue => Ok(()), FieldType::Byte => self.read_i8().map(|_| ()), FieldType::I16 => self.skip_vlq().map(|_| ()), FieldType::I32 => self.skip_vlq().map(|_| ()), @@ -436,15 +397,16 @@ impl<'b, 'a: 'b> ThriftCompactInputProtocol<'a> { FieldType::Double => self.skip_bytes(8).map(|_| ()), FieldType::Binary => self.skip_binary().map(|_| ()), FieldType::Struct => { - self.read_struct_begin()?; + let mut last_field_id = 0i16; loop { - let field_ident = self.read_field_begin()?; + let field_ident = self.read_field_begin(last_field_id)?; if field_ident.field_type == FieldType::Stop { break; } self.skip_till_depth(field_ident.field_type, depth - 1)?; + last_field_id = field_ident.id; } - self.read_struct_end() + Ok(()) } FieldType::List => { let list_ident =
self.read_list_begin()?; @@ -452,7 +414,7 @@ impl<'b, 'a: 'b> ThriftCompactInputProtocol<'a> { let element_type = FieldType::try_from(list_ident.element_type)?; self.skip_till_depth(element_type, depth - 1)?; } - self.read_list_end() + Ok(()) } // no list or map types in parquet format u => Err(general_err!(format!("cannot skip field type {:?}", &u))), @@ -460,90 +422,142 @@ impl<'b, 'a: 'b> ThriftCompactInputProtocol<'a> { } } +/// A high performance Thrift reader that reads from a slice of bytes. +pub(crate) struct ThriftSliceInputProtocol<'a> { + buf: &'a [u8], +} + +impl<'a> ThriftSliceInputProtocol<'a> { + /// Create a new `ThriftSliceInputProtocol` using the bytes in `buf`. + pub fn new(buf: &'a [u8]) -> Self { + Self { buf } + } + + /// Re-initialize this reader with a new slice. + pub fn reset_buffer(&mut self, buf: &'a [u8]) { + self.buf = buf; + } + + /// Return the current buffer as a slice. + pub fn as_slice(&self) -> &'a [u8] { + self.buf + } +} + +impl<'b, 'a: 'b> ThriftCompactInputProtocol<'b> for ThriftSliceInputProtocol<'a> { + #[inline] + fn read_byte(&mut self) -> Result<u8> { + let ret = *self.buf.first().ok_or_else(eof_error)?; + self.buf = &self.buf[1..]; + Ok(ret) + } + + fn read_bytes(&mut self) -> Result<&'b [u8]> { + let len = self.read_vlq()? 
as usize; + let ret = self.buf.get(..len).ok_or_else(eof_error)?; + self.buf = &self.buf[len..]; + Ok(ret) + } + + #[inline] + fn skip_bytes(&mut self, n: usize) -> Result<()> { + self.buf.get(..n).ok_or_else(eof_error)?; + self.buf = &self.buf[n..]; + Ok(()) + } + + fn read_double(&mut self) -> Result<f64> { + let slice = self.buf.get(..8).ok_or_else(eof_error)?; + self.buf = &self.buf[8..]; + match slice.try_into() { + Ok(slice) => Ok(f64::from_le_bytes(slice)), + Err(_) => Err(general_err!("Unexpected error converting slice")), + } + } +} + fn eof_error() -> ParquetError { eof_err!("Unexpected EOF") } -impl<'a> TryFrom<&mut ThriftCompactInputProtocol<'a>> for bool { - type Error = ParquetError; - fn try_from(prot: &mut ThriftCompactInputProtocol<'a>) -> Result<Self> { +/// Trait implemented for objects that can be deserialized from a Thrift input stream. +/// Implementations are provided for Thrift primitive types. +pub(crate) trait ReadThrift<'a, R: ThriftCompactInputProtocol<'a>> { + /// Read an object of type `Self` from the input protocol object. 
+ fn read_thrift(prot: &mut R) -> Result<Self> + where + Self: Sized; +} + +impl<'a, R: ThriftCompactInputProtocol<'a>> ReadThrift<'a, R> for bool { + fn read_thrift(prot: &mut R) -> Result<Self> { prot.read_bool() } } -impl<'a> TryFrom<&mut ThriftCompactInputProtocol<'a>> for i8 { - type Error = ParquetError; - fn try_from(prot: &mut ThriftCompactInputProtocol<'a>) -> Result<Self> { +impl<'a, R: ThriftCompactInputProtocol<'a>> ReadThrift<'a, R> for i8 { + fn read_thrift(prot: &mut R) -> Result<Self> { prot.read_i8() } } -impl<'a> TryFrom<&mut ThriftCompactInputProtocol<'a>> for i16 { - type Error = ParquetError; - fn try_from(prot: &mut ThriftCompactInputProtocol<'a>) -> Result<Self> { +impl<'a, R: ThriftCompactInputProtocol<'a>> ReadThrift<'a, R> for i16 { + fn read_thrift(prot: &mut R) -> Result<Self> { prot.read_i16() } } -impl<'a> TryFrom<&mut ThriftCompactInputProtocol<'a>> for i32 { - type Error = ParquetError; - fn try_from(prot: &mut ThriftCompactInputProtocol<'a>) -> Result<Self> { +impl<'a, R: ThriftCompactInputProtocol<'a>> ReadThrift<'a, R> for i32 { + fn read_thrift(prot: &mut R) -> Result<Self> { prot.read_i32() } } -impl<'a> TryFrom<&mut ThriftCompactInputProtocol<'a>> for i64 { - type Error = ParquetError; - fn try_from(prot: &mut ThriftCompactInputProtocol<'a>) -> Result<Self> { +impl<'a, R: ThriftCompactInputProtocol<'a>> ReadThrift<'a, R> for i64 { + fn read_thrift(prot: &mut R) -> Result<Self> { prot.read_i64() } } -impl<'a> TryFrom<&mut ThriftCompactInputProtocol<'a>> for OrderedF64 { - type Error = ParquetError; - fn try_from(prot: &mut ThriftCompactInputProtocol<'a>) -> Result<Self> { +impl<'a, R: ThriftCompactInputProtocol<'a>> ReadThrift<'a, R> for OrderedF64 { + fn read_thrift(prot: &mut R) -> Result<Self> { Ok(OrderedF64(prot.read_double()?)) } } -impl<'a> TryFrom<&mut ThriftCompactInputProtocol<'a>> for &'a str { - type Error = ParquetError; - fn try_from(prot: &mut ThriftCompactInputProtocol<'a>) -> Result<Self> { +impl<'a, R: 
ThriftCompactInputProtocol<'a>> ReadThrift<'a, R> for &'a str { + fn read_thrift(prot: &mut R) -> Result<Self> { prot.read_string() } } -impl<'a> TryFrom<&mut ThriftCompactInputProtocol<'a>> for String { - type Error = ParquetError; - fn try_from(prot: &mut ThriftCompactInputProtocol<'a>) -> Result<Self> { +impl<'a, R: ThriftCompactInputProtocol<'a>> ReadThrift<'a, R> for String { + fn read_thrift(prot: &mut R) -> Result<Self> { Ok(prot.read_string()?.to_owned()) } } -impl<'a> TryFrom<&mut ThriftCompactInputProtocol<'a>> for &'a [u8] { - type Error = ParquetError; - fn try_from(prot: &mut ThriftCompactInputProtocol<'a>) -> Result<Self> { +impl<'a, R: ThriftCompactInputProtocol<'a>> ReadThrift<'a, R> for &'a [u8] { + fn read_thrift(prot: &mut R) -> Result<Self> { prot.read_bytes() } } -impl<'a, T> TryFrom<&mut ThriftCompactInputProtocol<'a>> for Vec<T> +/// Read a Thrift encoded [list] from the input protocol object. +/// +/// [list]: https://github.com/apache/thrift/blob/master/doc/specs/thrift-compact-protocol.md#list-and-set +pub(crate) fn read_thrift_vec<'a, T, R>(prot: &mut R) -> Result<Vec<T>> where - T: for<'b> TryFrom<&'b mut ThriftCompactInputProtocol<'a>>, - ParquetError: for<'b> From<<T as TryFrom<&'b mut ThriftCompactInputProtocol<'a>>>::Error>, + R: ThriftCompactInputProtocol<'a>, + T: ReadThrift<'a, R>, { - type Error = ParquetError; - - fn try_from(prot: &mut ThriftCompactInputProtocol<'a>) -> Result<Self, Self::Error> { - let list_ident = prot.read_list_begin()?; - let mut res = Vec::with_capacity(list_ident.size as usize); - for _ in 0..list_ident.size { - let val = T::try_from(prot)?; - res.push(val); - } - - Ok(res) + let list_ident = prot.read_list_begin()?; + let mut res = Vec::with_capacity(list_ident.size as usize); + for _ in 0..list_ident.size { + let val = T::read_thrift(prot)?; + res.push(val); } + Ok(res) } ///////////////////////// @@ -983,11 +997,7 @@ pub(crate) mod tests { pub(crate) fn test_roundtrip<T>(val: T) where - T: for<'a> 
TryFrom<&'a mut ThriftCompactInputProtocol<'a>> - + WriteThrift - + PartialEq - + Debug, - for<'a> <T as TryFrom<&'a mut ThriftCompactInputProtocol<'a>>>::Error: Debug, + T: for<'a> ReadThrift<'a, ThriftSliceInputProtocol<'a>> + WriteThrift + PartialEq + Debug, { let buf = Vec::<u8>::new(); let mut writer = ThriftCompactOutputProtocol::new(buf); @@ -995,8 +1005,8 @@ pub(crate) mod tests { //println!("serialized: {:x?}", writer.inner()); - let mut prot = ThriftCompactInputProtocol::new(writer.inner()); - let read_val = T::try_from(&mut prot).unwrap(); + let mut prot = ThriftSliceInputProtocol::new(writer.inner()); + let read_val = T::read_thrift(&mut prot).unwrap(); assert_eq!(val, read_val); }