etseidl commented on code in PR #8574:
URL: https://github.com/apache/arrow-rs/pull/8574#discussion_r2420208466
##########
parquet/src/file/metadata/thrift_gen.rs:
##########
@@ -837,62 +697,575 @@ fn get_file_decryptor(
}
}
-/// Create ParquetMetaData from thrift input. Note that this only decodes the file metadata in
-/// the Parquet footer. Page indexes will need to be added later.
-impl<'a, R: ThriftCompactInputProtocol<'a>> ReadThrift<'a, R> for ParquetMetaData {
- fn read_thrift(prot: &mut R) -> Result<Self> {
- let file_meta = FileMetaData::read_thrift(prot)?;
-
- let version = file_meta.version;
- let num_rows = file_meta.num_rows;
- let row_groups = file_meta.row_groups;
- let created_by = file_meta.created_by.map(|c| c.to_owned());
- let key_value_metadata = file_meta.key_value_metadata;
-
- let val = parquet_schema_from_array(file_meta.schema)?;
- let schema_descr = Arc::new(SchemaDescriptor::new(val));
-
- // need schema_descr to get final RowGroupMetaData
- let row_groups = convert_row_groups(row_groups, schema_descr.clone())?;
-
- // need to map read column orders to actual values based on the schema
- if file_meta
- .column_orders
- .as_ref()
- .is_some_and(|cos| cos.len() != schema_descr.num_columns())
- {
- return Err(general_err!("Column order length mismatch"));
+// using ThriftSliceInputProtocol rather than the ThriftCompactInputProtocol trait because
+// these are all internal and operate on slices.
+fn read_column_chunk<'a>(
+ prot: &mut ThriftSliceInputProtocol<'a>,
+ column_descr: &Arc<ColumnDescriptor>,
+) -> Result<ColumnChunkMetaData> {
+ // ColumnChunk fields
+ let mut file_path: Option<&str> = None;
+ let mut file_offset: Option<i64> = None;
+ let mut offset_index_offset: Option<i64> = None;
+ let mut offset_index_length: Option<i32> = None;
+ let mut column_index_offset: Option<i64> = None;
+ let mut column_index_length: Option<i32> = None;
+ #[cfg(feature = "encryption")]
+ let mut column_crypto_metadata: Option<Box<ColumnCryptoMetaData>> = None;
+ #[cfg(feature = "encryption")]
+ let mut encrypted_column_metadata: Option<&[u8]> = None;
+
+ // ColumnMetaData
+ let mut encodings: Option<Vec<Encoding>> = None;
+ let mut codec: Option<Compression> = None;
+ let mut num_values: Option<i64> = None;
+ let mut total_uncompressed_size: Option<i64> = None;
+ let mut total_compressed_size: Option<i64> = None;
+ let mut data_page_offset: Option<i64> = None;
+ let mut index_page_offset: Option<i64> = None;
+ let mut dictionary_page_offset: Option<i64> = None;
+ let mut statistics: Option<Statistics> = None;
+ let mut encoding_stats: Option<Vec<PageEncodingStats>> = None;
+ let mut bloom_filter_offset: Option<i64> = None;
+ let mut bloom_filter_length: Option<i32> = None;
+ let mut size_statistics: Option<SizeStatistics> = None;
+ let mut geospatial_statistics: Option<GeospatialStatistics> = None;
+
+ // struct ColumnChunk {
+ // 1: optional string file_path
+ // 2: required i64 file_offset = 0
+ // 3: optional ColumnMetaData meta_data
+ // 4: optional i64 offset_index_offset
+ // 5: optional i32 offset_index_length
+ // 6: optional i64 column_index_offset
+ // 7: optional i32 column_index_length
+ // 8: optional ColumnCryptoMetaData crypto_metadata
+ // 9: optional binary encrypted_column_metadata
+ // }
+ let mut last_field_id = 0i16;
+ loop {
+ let field_ident = prot.read_field_begin(last_field_id)?;
+ if field_ident.field_type == FieldType::Stop {
+ break;
}
-
- let column_orders = file_meta.column_orders.map(|cos| {
- let mut res = Vec::with_capacity(cos.len());
- for (i, column) in schema_descr.columns().iter().enumerate() {
- match cos[i] {
- ColumnOrder::TYPE_DEFINED_ORDER(_) => {
- let sort_order = ColumnOrder::get_sort_order(
- column.logical_type(),
- column.converted_type(),
- column.physical_type(),
- );
- res.push(ColumnOrder::TYPE_DEFINED_ORDER(sort_order));
+ match field_ident.id {
+ 1 => {
+ file_path = Some(<&str>::read_thrift(&mut *prot)?);
+ }
+ 2 => {
+ file_offset = Some(i64::read_thrift(&mut *prot)?);
+ }
+ 3 => {
+                // `ColumnMetaData`. Read inline for performance's sake.
+ // struct ColumnMetaData {
+ // 1: required Type type
+ // 2: required list<Encoding> encodings
+ // 3: required list<string> path_in_schema
+ // 4: required CompressionCodec codec
+ // 5: required i64 num_values
+ // 6: required i64 total_uncompressed_size
+ // 7: required i64 total_compressed_size
+ // 8: optional list<KeyValue> key_value_metadata
+ // 9: required i64 data_page_offset
+ // 10: optional i64 index_page_offset
+ // 11: optional i64 dictionary_page_offset
+ // 12: optional Statistics statistics;
+ // 13: optional list<PageEncodingStats> encoding_stats;
+ // 14: optional i64 bloom_filter_offset;
+ // 15: optional i32 bloom_filter_length;
+ // 16: optional SizeStatistics size_statistics;
+ // 17: optional GeospatialStatistics geospatial_statistics;
+ // }
+ let mut last_field_id = 0i16;
+ loop {
+ let field_ident = prot.read_field_begin(last_field_id)?;
+ if field_ident.field_type == FieldType::Stop {
+ break;
}
- _ => res.push(cos[i]),
+ match field_ident.id {
+                        // 1: type is never used, we can use the column descriptor
+ 2 => {
+ let val =
+                                read_thrift_vec::<Encoding, ThriftSliceInputProtocol>(&mut *prot)?;
+ encodings = Some(val);
+ }
+ // 3: path_in_schema is redundant
+ 4 => {
+                            codec = Some(Compression::read_thrift(&mut *prot)?);
+ }
+ 5 => {
+ num_values = Some(i64::read_thrift(&mut *prot)?);
+ }
+ 6 => {
+                            total_uncompressed_size = Some(i64::read_thrift(&mut *prot)?);
+ }
+ 7 => {
+                            total_compressed_size = Some(i64::read_thrift(&mut *prot)?);
+ }
+ // 8: we don't expose this key value
+ 9 => {
+                            data_page_offset = Some(i64::read_thrift(&mut *prot)?);
+ }
+ 10 => {
+                            index_page_offset = Some(i64::read_thrift(&mut *prot)?);
+ }
+ 11 => {
+                            dictionary_page_offset = Some(i64::read_thrift(&mut *prot)?);
+ }
+ 12 => {
+                            statistics = Some(Statistics::read_thrift(&mut *prot)?);
+ }
+ 13 => {
+                            let val = read_thrift_vec::<PageEncodingStats, ThriftSliceInputProtocol>(
+ &mut *prot,
+ )?;
+ encoding_stats = Some(val);
+ }
+ 14 => {
+                            bloom_filter_offset = Some(i64::read_thrift(&mut *prot)?);
+ }
+ 15 => {
+                            bloom_filter_length = Some(i32::read_thrift(&mut *prot)?);
+ }
+ 16 => {
+ let val = SizeStatistics::read_thrift(&mut *prot)?;
+ size_statistics = Some(val);
+ }
+ 17 => {
+                            let val = GeospatialStatistics::read_thrift(&mut *prot)?;
+ geospatial_statistics = Some(val);
+ }
+ _ => {
+ prot.skip(field_ident.field_type)?;
+ }
+ };
+ last_field_id = field_ident.id;
}
}
- res
- });
+ 4 => {
+ offset_index_offset = Some(i64::read_thrift(&mut *prot)?);
+ }
+ 5 => {
+ offset_index_length = Some(i32::read_thrift(&mut *prot)?);
+ }
+ 6 => {
+ column_index_offset = Some(i64::read_thrift(&mut *prot)?);
+ }
+ 7 => {
+ column_index_length = Some(i32::read_thrift(&mut *prot)?);
+ }
+ #[cfg(feature = "encryption")]
+ 8 => {
+ let val = ColumnCryptoMetaData::read_thrift(&mut *prot)?;
+ column_crypto_metadata = Some(Box::new(val));
+ }
+ #[cfg(feature = "encryption")]
+ 9 => {
+                encrypted_column_metadata = Some(<&[u8]>::read_thrift(&mut *prot)?);
+ }
+ _ => {
+ prot.skip(field_ident.field_type)?;
+ }
+ };
+ last_field_id = field_ident.id;
+ }
- let fmd = crate::file::metadata::FileMetaData::new(
- version,
- num_rows,
- created_by,
- key_value_metadata,
- schema_descr,
- column_orders,
- );
+ // the only required field from ColumnChunk
+ let Some(file_offset) = file_offset else {
+ return Err(general_err!("Required field file_offset is missing"));
+ };
+
+ // transform optional fields
+ let file_path = file_path.map(|f| f.to_owned());
+    let (unencoded_byte_array_data_bytes, repetition_level_histogram, definition_level_histogram) =
+ if let Some(size_stats) = size_statistics {
+ (
+ size_stats.unencoded_byte_array_data_bytes,
+ size_stats.repetition_level_histogram,
+ size_stats.definition_level_histogram,
+ )
+ } else {
+ (None, None, None)
+ };
+
+    let repetition_level_histogram = repetition_level_histogram.map(LevelHistogram::from);
+    let definition_level_histogram = definition_level_histogram.map(LevelHistogram::from);
+
+ let statistics = convert_stats(column_descr.physical_type(), statistics)?;
+ let geo_statistics = convert_geo_stats(geospatial_statistics);
+ let column_descr = column_descr.clone();
+
+    // if encrypted, set the encrypted column metadata and return. we'll decrypt after finishing
+    // the footer and populate the rest.
+ #[cfg(feature = "encryption")]
+ if encrypted_column_metadata.is_some() {
+ use crate::file::metadata::ColumnChunkMetaDataBuilder;
+
+        let encrypted_column_metadata = encrypted_column_metadata.map(|s| s.to_vec());
+
+ // use builder to get uninitialized ColumnChunkMetaData
+ let mut col = ColumnChunkMetaDataBuilder::new(column_descr).build()?;
+
+ // set ColumnChunk fields
+ col.file_path = file_path;
+ col.file_offset = file_offset;
+ col.offset_index_offset = offset_index_offset;
+ col.offset_index_length = offset_index_length;
+ col.column_index_offset = column_index_offset;
+ col.column_index_length = column_index_length;
+ col.column_crypto_metadata = column_crypto_metadata;
+ col.encrypted_column_metadata = encrypted_column_metadata;
+
+ // check for ColumnMetaData fields that might be present
+ // first required fields
+ if let Some(encodings) = encodings {
+ col.encodings = encodings;
+ }
+ if let Some(codec) = codec {
+ col.compression = codec;
+ }
+ if let Some(num_values) = num_values {
+ col.num_values = num_values;
+ }
+ if let Some(total_uncompressed_size) = total_uncompressed_size {
+ col.total_uncompressed_size = total_uncompressed_size;
+ }
+ if let Some(total_compressed_size) = total_compressed_size {
+ col.total_compressed_size = total_compressed_size;
+ }
+ if let Some(data_page_offset) = data_page_offset {
+ col.data_page_offset = data_page_offset;
+ }
+
+ // then optional
+ col.index_page_offset = index_page_offset;
+ col.dictionary_page_offset = dictionary_page_offset;
+ col.bloom_filter_offset = bloom_filter_offset;
+ col.bloom_filter_length = bloom_filter_length;
+ col.unencoded_byte_array_data_bytes = unencoded_byte_array_data_bytes;
+ col.repetition_level_histogram = repetition_level_histogram;
+ col.definition_level_histogram = definition_level_histogram;
+ col.encoding_stats = encoding_stats;
+ col.statistics = statistics;
+ col.geo_statistics = geo_statistics;
+
+ return Ok(col);
+ }
+
+ // not encrypted, so meta_data better exist
+ let Some(encodings) = encodings else {
+ return Err(ParquetError::General(
+ "Required field encodings is missing".to_owned(),
+ ));
+ };
+ let Some(codec) = codec else {
+ return Err(ParquetError::General(
+ "Required field codec is missing".to_owned(),
+ ));
+ };
+ let Some(num_values) = num_values else {
+ return Err(ParquetError::General(
+ "Required field num_values is missing".to_owned(),
+ ));
+ };
+ let Some(total_uncompressed_size) = total_uncompressed_size else {
+ return Err(ParquetError::General(
+ "Required field total_uncompressed_size is missing".to_owned(),
+ ));
+ };
+ let Some(total_compressed_size) = total_compressed_size else {
+ return Err(ParquetError::General(
+ "Required field total_compressed_size is missing".to_owned(),
+ ));
+ };
+ let Some(data_page_offset) = data_page_offset else {
+ return Err(ParquetError::General(
+ "Required field data_page_offset is missing".to_owned(),
+ ));
+ };
+
+ let compression = codec;
+
+    // NOTE: I tried using the builder for this, but it added 20% to the execution time
Review Comment:
Spoiler: I eventually take another page from @jhorstmann's book and use the builder to create a default-initialized `ColumnChunkMetaData` whose fields I then fill in directly. It reduces the code bloat and allows me to again split out the `ColumnMetaData` parsing without any drop in performance.
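For illustration, here is a minimal sketch of that pattern using hypothetical stand-in types (`ChunkMeta` and `ChunkMetaBuilder` are placeholders, not the actual `parquet` crate API): the builder is invoked once to produce a default-initialized value, and the decoder then assigns fields directly while parsing, keeping per-field builder calls out of the hot loop.

```rust
// Hypothetical stand-ins for ColumnChunkMetaData and its builder.
#[derive(Default, Debug)]
struct ChunkMeta {
    num_values: i64,
    data_page_offset: i64,
    dictionary_page_offset: Option<i64>,
}

struct ChunkMetaBuilder(ChunkMeta);

impl ChunkMetaBuilder {
    fn new() -> Self {
        Self(ChunkMeta::default())
    }

    // build() hands back the default-initialized value
    fn build(self) -> ChunkMeta {
        self.0
    }
}

fn decode_chunk() -> ChunkMeta {
    // obtain a default-initialized struct via the builder once...
    let mut meta = ChunkMetaBuilder::new().build();
    // ...then fill in fields directly as they are decoded, rather than
    // routing every assignment through a builder method (values below
    // are dummies standing in for decoded thrift fields)
    meta.num_values = 42;
    meta.data_page_offset = 4;
    meta.dictionary_page_offset = Some(0);
    meta
}

fn main() {
    println!("{:?}", decode_chunk());
}
```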