alamb commented on code in PR #8574:
URL: https://github.com/apache/arrow-rs/pull/8574#discussion_r2420100476


##########
parquet/src/file/metadata/thrift_gen.rs:
##########
@@ -837,62 +697,575 @@ fn get_file_decryptor(
     }
 }
 
-/// Create ParquetMetaData from thrift input. Note that this only decodes the 
file metadata in
-/// the Parquet footer. Page indexes will need to be added later.
-impl<'a, R: ThriftCompactInputProtocol<'a>> ReadThrift<'a, R> for 
ParquetMetaData {
-    fn read_thrift(prot: &mut R) -> Result<Self> {
-        let file_meta = FileMetaData::read_thrift(prot)?;
-
-        let version = file_meta.version;
-        let num_rows = file_meta.num_rows;
-        let row_groups = file_meta.row_groups;
-        let created_by = file_meta.created_by.map(|c| c.to_owned());
-        let key_value_metadata = file_meta.key_value_metadata;
-
-        let val = parquet_schema_from_array(file_meta.schema)?;
-        let schema_descr = Arc::new(SchemaDescriptor::new(val));
-
-        // need schema_descr to get final RowGroupMetaData
-        let row_groups = convert_row_groups(row_groups, schema_descr.clone())?;
-
-        // need to map read column orders to actual values based on the schema
-        if file_meta
-            .column_orders
-            .as_ref()
-            .is_some_and(|cos| cos.len() != schema_descr.num_columns())
-        {
-            return Err(general_err!("Column order length mismatch"));
+// using ThriftSliceInputProtocol rather than ThriftCompactInputProtocl trait 
because
+// these are all internal and operate on slices.
+fn read_column_chunk<'a>(
+    prot: &mut ThriftSliceInputProtocol<'a>,
+    column_descr: &Arc<ColumnDescriptor>,
+) -> Result<ColumnChunkMetaData> {
+    // ColumnChunk fields
+    let mut file_path: Option<&str> = None;
+    let mut file_offset: Option<i64> = None;
+    let mut offset_index_offset: Option<i64> = None;
+    let mut offset_index_length: Option<i32> = None;
+    let mut column_index_offset: Option<i64> = None;
+    let mut column_index_length: Option<i32> = None;
+    #[cfg(feature = "encryption")]
+    let mut column_crypto_metadata: Option<Box<ColumnCryptoMetaData>> = None;
+    #[cfg(feature = "encryption")]
+    let mut encrypted_column_metadata: Option<&[u8]> = None;
+
+    // ColumnMetaData
+    let mut encodings: Option<Vec<Encoding>> = None;
+    let mut codec: Option<Compression> = None;
+    let mut num_values: Option<i64> = None;
+    let mut total_uncompressed_size: Option<i64> = None;
+    let mut total_compressed_size: Option<i64> = None;
+    let mut data_page_offset: Option<i64> = None;
+    let mut index_page_offset: Option<i64> = None;
+    let mut dictionary_page_offset: Option<i64> = None;
+    let mut statistics: Option<Statistics> = None;
+    let mut encoding_stats: Option<Vec<PageEncodingStats>> = None;
+    let mut bloom_filter_offset: Option<i64> = None;
+    let mut bloom_filter_length: Option<i32> = None;
+    let mut size_statistics: Option<SizeStatistics> = None;
+    let mut geospatial_statistics: Option<GeospatialStatistics> = None;
+
+    // struct ColumnChunk {
+    //   1: optional string file_path
+    //   2: required i64 file_offset = 0
+    //   3: optional ColumnMetaData meta_data
+    //   4: optional i64 offset_index_offset
+    //   5: optional i32 offset_index_length
+    //   6: optional i64 column_index_offset
+    //   7: optional i32 column_index_length
+    //   8: optional ColumnCryptoMetaData crypto_metadata
+    //   9: optional binary encrypted_column_metadata
+    // }
+    let mut last_field_id = 0i16;
+    loop {
+        let field_ident = prot.read_field_begin(last_field_id)?;
+        if field_ident.field_type == FieldType::Stop {
+            break;
         }
-
-        let column_orders = file_meta.column_orders.map(|cos| {
-            let mut res = Vec::with_capacity(cos.len());
-            for (i, column) in schema_descr.columns().iter().enumerate() {
-                match cos[i] {
-                    ColumnOrder::TYPE_DEFINED_ORDER(_) => {
-                        let sort_order = ColumnOrder::get_sort_order(
-                            column.logical_type(),
-                            column.converted_type(),
-                            column.physical_type(),
-                        );
-                        res.push(ColumnOrder::TYPE_DEFINED_ORDER(sort_order));
+        match field_ident.id {
+            1 => {
+                file_path = Some(<&str>::read_thrift(&mut *prot)?);
+            }
+            2 => {
+                file_offset = Some(i64::read_thrift(&mut *prot)?);
+            }
+            3 => {
+                // `ColumnMetaData`. Read inline for performance sake.
+                // struct ColumnMetaData {
+                //   1: required Type type
+                //   2: required list<Encoding> encodings
+                //   3: required list<string> path_in_schema
+                //   4: required CompressionCodec codec
+                //   5: required i64 num_values
+                //   6: required i64 total_uncompressed_size
+                //   7: required i64 total_compressed_size
+                //   8: optional list<KeyValue> key_value_metadata
+                //   9: required i64 data_page_offset
+                //   10: optional i64 index_page_offset
+                //   11: optional i64 dictionary_page_offset
+                //   12: optional Statistics statistics;
+                //   13: optional list<PageEncodingStats> encoding_stats;
+                //   14: optional i64 bloom_filter_offset;
+                //   15: optional i32 bloom_filter_length;
+                //   16: optional SizeStatistics size_statistics;
+                //   17: optional GeospatialStatistics geospatial_statistics;
+                // }
+                let mut last_field_id = 0i16;
+                loop {
+                    let field_ident = prot.read_field_begin(last_field_id)?;
+                    if field_ident.field_type == FieldType::Stop {
+                        break;
                     }
-                    _ => res.push(cos[i]),
+                    match field_ident.id {
+                        // 1: type is never used, we can use the column 
descriptor
+                        2 => {
+                            let val =
+                                read_thrift_vec::<Encoding, 
ThriftSliceInputProtocol>(&mut *prot)?;
+                            encodings = Some(val);
+                        }
+                        // 3: path_in_schema is redundant
+                        4 => {
+                            codec = Some(Compression::read_thrift(&mut 
*prot)?);
+                        }
+                        5 => {
+                            num_values = Some(i64::read_thrift(&mut *prot)?);
+                        }
+                        6 => {
+                            total_uncompressed_size = 
Some(i64::read_thrift(&mut *prot)?);
+                        }
+                        7 => {
+                            total_compressed_size = Some(i64::read_thrift(&mut 
*prot)?);
+                        }
+                        // 8: we don't expose this key value
+                        9 => {
+                            data_page_offset = Some(i64::read_thrift(&mut 
*prot)?);
+                        }
+                        10 => {
+                            index_page_offset = Some(i64::read_thrift(&mut 
*prot)?);
+                        }
+                        11 => {
+                            dictionary_page_offset = 
Some(i64::read_thrift(&mut *prot)?);
+                        }
+                        12 => {
+                            statistics = Some(Statistics::read_thrift(&mut 
*prot)?);
+                        }
+                        13 => {
+                            let val = read_thrift_vec::<PageEncodingStats, 
ThriftSliceInputProtocol>(
+                                &mut *prot,
+                            )?;
+                            encoding_stats = Some(val);
+                        }
+                        14 => {
+                            bloom_filter_offset = Some(i64::read_thrift(&mut 
*prot)?);
+                        }
+                        15 => {
+                            bloom_filter_length = Some(i32::read_thrift(&mut 
*prot)?);
+                        }
+                        16 => {
+                            let val = SizeStatistics::read_thrift(&mut *prot)?;
+                            size_statistics = Some(val);
+                        }
+                        17 => {
+                            let val = GeospatialStatistics::read_thrift(&mut 
*prot)?;
+                            geospatial_statistics = Some(val);
+                        }
+                        _ => {
+                            prot.skip(field_ident.field_type)?;
+                        }
+                    };
+                    last_field_id = field_ident.id;
                 }
             }
-            res
-        });
+            4 => {
+                offset_index_offset = Some(i64::read_thrift(&mut *prot)?);
+            }
+            5 => {
+                offset_index_length = Some(i32::read_thrift(&mut *prot)?);
+            }
+            6 => {
+                column_index_offset = Some(i64::read_thrift(&mut *prot)?);
+            }
+            7 => {
+                column_index_length = Some(i32::read_thrift(&mut *prot)?);
+            }
+            #[cfg(feature = "encryption")]
+            8 => {
+                let val = ColumnCryptoMetaData::read_thrift(&mut *prot)?;
+                column_crypto_metadata = Some(Box::new(val));
+            }
+            #[cfg(feature = "encryption")]
+            9 => {
+                encrypted_column_metadata = Some(<&[u8]>::read_thrift(&mut 
*prot)?);
+            }
+            _ => {
+                prot.skip(field_ident.field_type)?;
+            }
+        };
+        last_field_id = field_ident.id;
+    }
 
-        let fmd = crate::file::metadata::FileMetaData::new(
-            version,
-            num_rows,
-            created_by,
-            key_value_metadata,
-            schema_descr,
-            column_orders,
-        );
+    // the only required field from ColumnChunk
+    let Some(file_offset) = file_offset else {
+        return Err(general_err!("Required field file_offset is missing"));
+    };
+
+    // transform optional fields
+    let file_path = file_path.map(|f| f.to_owned());
+    let (unencoded_byte_array_data_bytes, repetition_level_histogram, 
definition_level_histogram) =
+        if let Some(size_stats) = size_statistics {
+            (
+                size_stats.unencoded_byte_array_data_bytes,
+                size_stats.repetition_level_histogram,
+                size_stats.definition_level_histogram,
+            )
+        } else {
+            (None, None, None)
+        };
+
+    let repetition_level_histogram = 
repetition_level_histogram.map(LevelHistogram::from);
+    let definition_level_histogram = 
definition_level_histogram.map(LevelHistogram::from);
+
+    let statistics = convert_stats(column_descr.physical_type(), statistics)?;
+    let geo_statistics = convert_geo_stats(geospatial_statistics);
+    let column_descr = column_descr.clone();
+
+    // if encrypted, set the encrypted column metadata and return. we'll 
decrypt after finishing
+    // the footer and populate the rest.
+    #[cfg(feature = "encryption")]
+    if encrypted_column_metadata.is_some() {
+        use crate::file::metadata::ColumnChunkMetaDataBuilder;
+
+        let encrypted_column_metadata = encrypted_column_metadata.map(|s| 
s.to_vec());
+
+        // use builder to get uninitialized ColumnChunkMetaData
+        let mut col = ColumnChunkMetaDataBuilder::new(column_descr).build()?;
+
+        // set ColumnChunk fields
+        col.file_path = file_path;
+        col.file_offset = file_offset;
+        col.offset_index_offset = offset_index_offset;
+        col.offset_index_length = offset_index_length;
+        col.column_index_offset = column_index_offset;
+        col.column_index_length = column_index_length;
+        col.column_crypto_metadata = column_crypto_metadata;
+        col.encrypted_column_metadata = encrypted_column_metadata;
+
+        // check for ColumnMetaData fields that might be present
+        // first required fields
+        if let Some(encodings) = encodings {
+            col.encodings = encodings;
+        }
+        if let Some(codec) = codec {
+            col.compression = codec;
+        }
+        if let Some(num_values) = num_values {
+            col.num_values = num_values;
+        }
+        if let Some(total_uncompressed_size) = total_uncompressed_size {
+            col.total_uncompressed_size = total_uncompressed_size;
+        }
+        if let Some(total_compressed_size) = total_compressed_size {
+            col.total_compressed_size = total_compressed_size;
+        }
+        if let Some(data_page_offset) = data_page_offset {
+            col.data_page_offset = data_page_offset;
+        }
+
+        // then optional
+        col.index_page_offset = index_page_offset;
+        col.dictionary_page_offset = dictionary_page_offset;
+        col.bloom_filter_offset = bloom_filter_offset;
+        col.bloom_filter_length = bloom_filter_length;
+        col.unencoded_byte_array_data_bytes = unencoded_byte_array_data_bytes;
+        col.repetition_level_histogram = repetition_level_histogram;
+        col.definition_level_histogram = definition_level_histogram;
+        col.encoding_stats = encoding_stats;
+        col.statistics = statistics;
+        col.geo_statistics = geo_statistics;
+
+        return Ok(col);
+    }
+
+    // not encrypted, so meta_data better exist
+    let Some(encodings) = encodings else {
+        return Err(ParquetError::General(
+            "Required field encodings is missing".to_owned(),
+        ));
+    };
+    let Some(codec) = codec else {
+        return Err(ParquetError::General(
+            "Required field codec is missing".to_owned(),
+        ));
+    };
+    let Some(num_values) = num_values else {
+        return Err(ParquetError::General(
+            "Required field num_values is missing".to_owned(),
+        ));
+    };
+    let Some(total_uncompressed_size) = total_uncompressed_size else {
+        return Err(ParquetError::General(
+            "Required field total_uncompressed_size is missing".to_owned(),
+        ));
+    };
+    let Some(total_compressed_size) = total_compressed_size else {
+        return Err(ParquetError::General(
+            "Required field total_compressed_size is missing".to_owned(),
+        ));
+    };
+    let Some(data_page_offset) = data_page_offset else {
+        return Err(ParquetError::General(
+            "Required field data_page_offset is missing".to_owned(),
+        ));
+    };
+
+    let compression = codec;
+
+    // NOTE: I tried using the builder for this, but it added 20% to the 
execution time
+    let result = ColumnChunkMetaData {
+        column_descr,
+        encodings,
+        file_path,
+        file_offset,
+        num_values,
+        compression,
+        total_compressed_size,
+        total_uncompressed_size,
+        data_page_offset,
+        index_page_offset,
+        dictionary_page_offset,
+        statistics,
+        geo_statistics,
+        encoding_stats,
+        bloom_filter_offset,
+        bloom_filter_length,
+        offset_index_offset,
+        offset_index_length,
+        column_index_offset,
+        column_index_length,
+        unencoded_byte_array_data_bytes,
+        repetition_level_histogram,
+        definition_level_histogram,
+        #[cfg(feature = "encryption")]
+        column_crypto_metadata,
+        #[cfg(feature = "encryption")]
+        encrypted_column_metadata: None, // tested is_some above
+    };
+    Ok(result)
+}
+
+fn read_row_group(
+    prot: &mut ThriftSliceInputProtocol,
+    schema_descr: &Arc<SchemaDescriptor>,
+) -> Result<RowGroupMetaData> {
+    let mut columns: Option<Vec<ColumnChunkMetaData>> = None;
+    let mut total_byte_size: Option<i64> = None;
+    let mut num_rows: Option<i64> = None;
+    let mut sorting_columns: Option<Vec<SortingColumn>> = None;
+    let mut file_offset: Option<i64> = None;
+    let mut ordinal: Option<i16> = None;
+
+    // struct RowGroup {
+    //   1: required list<ColumnChunk> columns
+    //   2: required i64 total_byte_size
+    //   3: required i64 num_rows
+    //   4: optional list<SortingColumn> sorting_columns
+    //   5: optional i64 file_offset
+    //   6: optional i64 total_compressed_size
+    //   7: optional i16 ordinal
+    // }
+    let mut last_field_id = 0i16;
+    loop {
+        let field_ident = prot.read_field_begin(last_field_id)?;
+        if field_ident.field_type == FieldType::Stop {
+            break;
+        }
+        match field_ident.id {
+            1 => {
+                let list_ident = prot.read_list_begin()?;
+                if schema_descr.num_columns() != list_ident.size as usize {
+                    return Err(general_err!(
+                        "Column count mismatch. Schema has {} columns while 
Row Group has {}",
+                        schema_descr.num_columns(),
+                        list_ident.size
+                    ));
+                }
+                let mut cols = Vec::with_capacity(list_ident.size as usize);
+                for i in 0..list_ident.size as usize {
+                    let col = read_column_chunk(prot, 
&schema_descr.columns()[i])?;
+                    cols.push(col);
+                }
+                columns = Some(cols);
+            }
+            2 => {
+                total_byte_size = Some(i64::read_thrift(&mut *prot)?);
+            }
+            3 => {
+                num_rows = Some(i64::read_thrift(&mut *prot)?);
+            }
+            4 => {
+                let val = read_thrift_vec::<SortingColumn, 
ThriftSliceInputProtocol>(&mut *prot)?;
+                sorting_columns = Some(val);
+            }
+            5 => {
+                file_offset = Some(i64::read_thrift(&mut *prot)?);
+            }
+            // 6: we don't expose total_compressed_size
+            7 => {
+                ordinal = Some(i16::read_thrift(&mut *prot)?);
+            }
+            _ => {
+                prot.skip(field_ident.field_type)?;
+            }
+        };
+        last_field_id = field_ident.id;
+    }
+    let Some(columns) = columns else {
+        return Err(ParquetError::General(
+            "Required field columns is missing".to_owned(),
+        ));
+    };
+    let Some(total_byte_size) = total_byte_size else {
+        return Err(ParquetError::General(
+            "Required field total_byte_size is missing".to_owned(),
+        ));
+    };
+    let Some(num_rows) = num_rows else {
+        return Err(ParquetError::General(
+            "Required field num_rows is missing".to_owned(),
+        ));
+    };
+
+    Ok(RowGroupMetaData {
+        columns,
+        num_rows,
+        sorting_columns,
+        total_byte_size,
+        schema_descr: schema_descr.clone(),
+        file_offset,
+        ordinal,
+    })
+}
 
-        Ok(ParquetMetaData::new(fmd, row_groups))
+/// Create [`ParquetMetaData`] from thrift input. Note that this only decodes 
the file metadata in

Review Comment:
   this is really neat. With this code, I can easily see a bunch more potential 
optimizations (e.g. for example use a single Vec of `ColumnChunkMetadata` and 
have each RowGroup store the starting offset into that vec rather than have 
Vecs of Vecs of Vec. 
   
   (not for this PR obviously, but for a future one)



##########
parquet/src/file/metadata/thrift_gen.rs:
##########
@@ -837,62 +697,575 @@ fn get_file_decryptor(
     }
 }
 
-/// Create ParquetMetaData from thrift input. Note that this only decodes the 
file metadata in
-/// the Parquet footer. Page indexes will need to be added later.
-impl<'a, R: ThriftCompactInputProtocol<'a>> ReadThrift<'a, R> for 
ParquetMetaData {
-    fn read_thrift(prot: &mut R) -> Result<Self> {
-        let file_meta = FileMetaData::read_thrift(prot)?;
-
-        let version = file_meta.version;
-        let num_rows = file_meta.num_rows;
-        let row_groups = file_meta.row_groups;
-        let created_by = file_meta.created_by.map(|c| c.to_owned());
-        let key_value_metadata = file_meta.key_value_metadata;
-
-        let val = parquet_schema_from_array(file_meta.schema)?;
-        let schema_descr = Arc::new(SchemaDescriptor::new(val));
-
-        // need schema_descr to get final RowGroupMetaData
-        let row_groups = convert_row_groups(row_groups, schema_descr.clone())?;
-
-        // need to map read column orders to actual values based on the schema
-        if file_meta
-            .column_orders
-            .as_ref()
-            .is_some_and(|cos| cos.len() != schema_descr.num_columns())
-        {
-            return Err(general_err!("Column order length mismatch"));
+// using ThriftSliceInputProtocol rather than ThriftCompactInputProtocl trait 
because
+// these are all internal and operate on slices.
+fn read_column_chunk<'a>(
+    prot: &mut ThriftSliceInputProtocol<'a>,
+    column_descr: &Arc<ColumnDescriptor>,
+) -> Result<ColumnChunkMetaData> {
+    // ColumnChunk fields
+    let mut file_path: Option<&str> = None;
+    let mut file_offset: Option<i64> = None;
+    let mut offset_index_offset: Option<i64> = None;
+    let mut offset_index_length: Option<i32> = None;
+    let mut column_index_offset: Option<i64> = None;
+    let mut column_index_length: Option<i32> = None;
+    #[cfg(feature = "encryption")]
+    let mut column_crypto_metadata: Option<Box<ColumnCryptoMetaData>> = None;
+    #[cfg(feature = "encryption")]
+    let mut encrypted_column_metadata: Option<&[u8]> = None;
+
+    // ColumnMetaData
+    let mut encodings: Option<Vec<Encoding>> = None;
+    let mut codec: Option<Compression> = None;
+    let mut num_values: Option<i64> = None;
+    let mut total_uncompressed_size: Option<i64> = None;
+    let mut total_compressed_size: Option<i64> = None;
+    let mut data_page_offset: Option<i64> = None;
+    let mut index_page_offset: Option<i64> = None;
+    let mut dictionary_page_offset: Option<i64> = None;
+    let mut statistics: Option<Statistics> = None;
+    let mut encoding_stats: Option<Vec<PageEncodingStats>> = None;
+    let mut bloom_filter_offset: Option<i64> = None;
+    let mut bloom_filter_length: Option<i32> = None;
+    let mut size_statistics: Option<SizeStatistics> = None;
+    let mut geospatial_statistics: Option<GeospatialStatistics> = None;
+
+    // struct ColumnChunk {
+    //   1: optional string file_path
+    //   2: required i64 file_offset = 0
+    //   3: optional ColumnMetaData meta_data
+    //   4: optional i64 offset_index_offset
+    //   5: optional i32 offset_index_length
+    //   6: optional i64 column_index_offset
+    //   7: optional i32 column_index_length
+    //   8: optional ColumnCryptoMetaData crypto_metadata
+    //   9: optional binary encrypted_column_metadata
+    // }
+    let mut last_field_id = 0i16;
+    loop {
+        let field_ident = prot.read_field_begin(last_field_id)?;
+        if field_ident.field_type == FieldType::Stop {
+            break;
         }
-
-        let column_orders = file_meta.column_orders.map(|cos| {
-            let mut res = Vec::with_capacity(cos.len());
-            for (i, column) in schema_descr.columns().iter().enumerate() {
-                match cos[i] {
-                    ColumnOrder::TYPE_DEFINED_ORDER(_) => {
-                        let sort_order = ColumnOrder::get_sort_order(
-                            column.logical_type(),
-                            column.converted_type(),
-                            column.physical_type(),
-                        );
-                        res.push(ColumnOrder::TYPE_DEFINED_ORDER(sort_order));
+        match field_ident.id {
+            1 => {
+                file_path = Some(<&str>::read_thrift(&mut *prot)?);
+            }
+            2 => {
+                file_offset = Some(i64::read_thrift(&mut *prot)?);
+            }
+            3 => {
+                // `ColumnMetaData`. Read inline for performance sake.
+                // struct ColumnMetaData {
+                //   1: required Type type
+                //   2: required list<Encoding> encodings
+                //   3: required list<string> path_in_schema
+                //   4: required CompressionCodec codec
+                //   5: required i64 num_values
+                //   6: required i64 total_uncompressed_size
+                //   7: required i64 total_compressed_size
+                //   8: optional list<KeyValue> key_value_metadata
+                //   9: required i64 data_page_offset
+                //   10: optional i64 index_page_offset
+                //   11: optional i64 dictionary_page_offset
+                //   12: optional Statistics statistics;
+                //   13: optional list<PageEncodingStats> encoding_stats;
+                //   14: optional i64 bloom_filter_offset;
+                //   15: optional i32 bloom_filter_length;
+                //   16: optional SizeStatistics size_statistics;
+                //   17: optional GeospatialStatistics geospatial_statistics;
+                // }
+                let mut last_field_id = 0i16;
+                loop {
+                    let field_ident = prot.read_field_begin(last_field_id)?;
+                    if field_ident.field_type == FieldType::Stop {
+                        break;
                     }
-                    _ => res.push(cos[i]),
+                    match field_ident.id {
+                        // 1: type is never used, we can use the column 
descriptor
+                        2 => {
+                            let val =
+                                read_thrift_vec::<Encoding, 
ThriftSliceInputProtocol>(&mut *prot)?;
+                            encodings = Some(val);
+                        }
+                        // 3: path_in_schema is redundant
+                        4 => {
+                            codec = Some(Compression::read_thrift(&mut 
*prot)?);
+                        }
+                        5 => {
+                            num_values = Some(i64::read_thrift(&mut *prot)?);
+                        }
+                        6 => {
+                            total_uncompressed_size = 
Some(i64::read_thrift(&mut *prot)?);
+                        }
+                        7 => {
+                            total_compressed_size = Some(i64::read_thrift(&mut 
*prot)?);
+                        }
+                        // 8: we don't expose this key value
+                        9 => {
+                            data_page_offset = Some(i64::read_thrift(&mut 
*prot)?);
+                        }
+                        10 => {
+                            index_page_offset = Some(i64::read_thrift(&mut 
*prot)?);
+                        }
+                        11 => {
+                            dictionary_page_offset = 
Some(i64::read_thrift(&mut *prot)?);
+                        }
+                        12 => {
+                            statistics = Some(Statistics::read_thrift(&mut 
*prot)?);
+                        }
+                        13 => {
+                            let val = read_thrift_vec::<PageEncodingStats, 
ThriftSliceInputProtocol>(
+                                &mut *prot,
+                            )?;
+                            encoding_stats = Some(val);
+                        }
+                        14 => {
+                            bloom_filter_offset = Some(i64::read_thrift(&mut 
*prot)?);
+                        }
+                        15 => {
+                            bloom_filter_length = Some(i32::read_thrift(&mut 
*prot)?);
+                        }
+                        16 => {
+                            let val = SizeStatistics::read_thrift(&mut *prot)?;
+                            size_statistics = Some(val);
+                        }
+                        17 => {
+                            let val = GeospatialStatistics::read_thrift(&mut 
*prot)?;
+                            geospatial_statistics = Some(val);
+                        }
+                        _ => {
+                            prot.skip(field_ident.field_type)?;
+                        }
+                    };
+                    last_field_id = field_ident.id;
                 }
             }
-            res
-        });
+            4 => {
+                offset_index_offset = Some(i64::read_thrift(&mut *prot)?);
+            }
+            5 => {
+                offset_index_length = Some(i32::read_thrift(&mut *prot)?);
+            }
+            6 => {
+                column_index_offset = Some(i64::read_thrift(&mut *prot)?);
+            }
+            7 => {
+                column_index_length = Some(i32::read_thrift(&mut *prot)?);
+            }
+            #[cfg(feature = "encryption")]
+            8 => {
+                let val = ColumnCryptoMetaData::read_thrift(&mut *prot)?;
+                column_crypto_metadata = Some(Box::new(val));
+            }
+            #[cfg(feature = "encryption")]
+            9 => {
+                encrypted_column_metadata = Some(<&[u8]>::read_thrift(&mut 
*prot)?);
+            }
+            _ => {
+                prot.skip(field_ident.field_type)?;
+            }
+        };
+        last_field_id = field_ident.id;
+    }
 
-        let fmd = crate::file::metadata::FileMetaData::new(
-            version,
-            num_rows,
-            created_by,
-            key_value_metadata,
-            schema_descr,
-            column_orders,
-        );
+    // the only required field from ColumnChunk
+    let Some(file_offset) = file_offset else {
+        return Err(general_err!("Required field file_offset is missing"));
+    };
+
+    // transform optional fields
+    let file_path = file_path.map(|f| f.to_owned());
+    let (unencoded_byte_array_data_bytes, repetition_level_histogram, 
definition_level_histogram) =
+        if let Some(size_stats) = size_statistics {
+            (
+                size_stats.unencoded_byte_array_data_bytes,
+                size_stats.repetition_level_histogram,
+                size_stats.definition_level_histogram,
+            )
+        } else {
+            (None, None, None)
+        };
+
+    let repetition_level_histogram = 
repetition_level_histogram.map(LevelHistogram::from);
+    let definition_level_histogram = 
definition_level_histogram.map(LevelHistogram::from);
+
+    let statistics = convert_stats(column_descr.physical_type(), statistics)?;
+    let geo_statistics = convert_geo_stats(geospatial_statistics);
+    let column_descr = column_descr.clone();
+
+    // if encrypted, set the encrypted column metadata and return. we'll 
decrypt after finishing
+    // the footer and populate the rest.
+    #[cfg(feature = "encryption")]
+    if encrypted_column_metadata.is_some() {
+        use crate::file::metadata::ColumnChunkMetaDataBuilder;
+
+        let encrypted_column_metadata = encrypted_column_metadata.map(|s| 
s.to_vec());
+
+        // use builder to get uninitialized ColumnChunkMetaData
+        let mut col = ColumnChunkMetaDataBuilder::new(column_descr).build()?;
+
+        // set ColumnChunk fields
+        col.file_path = file_path;
+        col.file_offset = file_offset;
+        col.offset_index_offset = offset_index_offset;
+        col.offset_index_length = offset_index_length;
+        col.column_index_offset = column_index_offset;
+        col.column_index_length = column_index_length;
+        col.column_crypto_metadata = column_crypto_metadata;
+        col.encrypted_column_metadata = encrypted_column_metadata;
+
+        // check for ColumnMetaData fields that might be present
+        // first required fields
+        if let Some(encodings) = encodings {
+            col.encodings = encodings;
+        }
+        if let Some(codec) = codec {
+            col.compression = codec;
+        }
+        if let Some(num_values) = num_values {
+            col.num_values = num_values;
+        }
+        if let Some(total_uncompressed_size) = total_uncompressed_size {
+            col.total_uncompressed_size = total_uncompressed_size;
+        }
+        if let Some(total_compressed_size) = total_compressed_size {
+            col.total_compressed_size = total_compressed_size;
+        }
+        if let Some(data_page_offset) = data_page_offset {
+            col.data_page_offset = data_page_offset;
+        }
+
+        // then optional
+        col.index_page_offset = index_page_offset;
+        col.dictionary_page_offset = dictionary_page_offset;
+        col.bloom_filter_offset = bloom_filter_offset;
+        col.bloom_filter_length = bloom_filter_length;
+        col.unencoded_byte_array_data_bytes = unencoded_byte_array_data_bytes;
+        col.repetition_level_histogram = repetition_level_histogram;
+        col.definition_level_histogram = definition_level_histogram;
+        col.encoding_stats = encoding_stats;
+        col.statistics = statistics;
+        col.geo_statistics = geo_statistics;
+
+        return Ok(col);
+    }
+
+    // not encrypted, so meta_data better exist
+    let Some(encodings) = encodings else {
+        return Err(ParquetError::General(
+            "Required field encodings is missing".to_owned(),
+        ));
+    };
+    let Some(codec) = codec else {
+        return Err(ParquetError::General(
+            "Required field codec is missing".to_owned(),
+        ));
+    };
+    let Some(num_values) = num_values else {
+        return Err(ParquetError::General(
+            "Required field num_values is missing".to_owned(),
+        ));
+    };
+    let Some(total_uncompressed_size) = total_uncompressed_size else {
+        return Err(ParquetError::General(
+            "Required field total_uncompressed_size is missing".to_owned(),
+        ));
+    };
+    let Some(total_compressed_size) = total_compressed_size else {
+        return Err(ParquetError::General(
+            "Required field total_compressed_size is missing".to_owned(),
+        ));
+    };
+    let Some(data_page_offset) = data_page_offset else {
+        return Err(ParquetError::General(
+            "Required field data_page_offset is missing".to_owned(),
+        ));
+    };
+
+    let compression = codec;
+
+    // NOTE: I tried using the builder for this, but it added 20% to the 
execution time
+    let result = ColumnChunkMetaData {
+        column_descr,
+        encodings,
+        file_path,
+        file_offset,
+        num_values,
+        compression,
+        total_compressed_size,
+        total_uncompressed_size,
+        data_page_offset,
+        index_page_offset,
+        dictionary_page_offset,
+        statistics,
+        geo_statistics,
+        encoding_stats,
+        bloom_filter_offset,
+        bloom_filter_length,
+        offset_index_offset,
+        offset_index_length,
+        column_index_offset,
+        column_index_length,
+        unencoded_byte_array_data_bytes,
+        repetition_level_histogram,
+        definition_level_histogram,
+        #[cfg(feature = "encryption")]
+        column_crypto_metadata,
+        #[cfg(feature = "encryption")]
+        encrypted_column_metadata: None, // tested is_some above
+    };
+    Ok(result)
+}
+
+fn read_row_group(
+    prot: &mut ThriftSliceInputProtocol,
+    schema_descr: &Arc<SchemaDescriptor>,
+) -> Result<RowGroupMetaData> {
+    let mut columns: Option<Vec<ColumnChunkMetaData>> = None;
+    let mut total_byte_size: Option<i64> = None;
+    let mut num_rows: Option<i64> = None;
+    let mut sorting_columns: Option<Vec<SortingColumn>> = None;
+    let mut file_offset: Option<i64> = None;
+    let mut ordinal: Option<i16> = None;
+
+    // struct RowGroup {
+    //   1: required list<ColumnChunk> columns
+    //   2: required i64 total_byte_size
+    //   3: required i64 num_rows
+    //   4: optional list<SortingColumn> sorting_columns
+    //   5: optional i64 file_offset
+    //   6: optional i64 total_compressed_size
+    //   7: optional i16 ordinal
+    // }
+    let mut last_field_id = 0i16;
+    loop {
+        let field_ident = prot.read_field_begin(last_field_id)?;
+        if field_ident.field_type == FieldType::Stop {
+            break;
+        }
+        match field_ident.id {
+            1 => {
+                let list_ident = prot.read_list_begin()?;
+                if schema_descr.num_columns() != list_ident.size as usize {
+                    return Err(general_err!(
+                        "Column count mismatch. Schema has {} columns while 
Row Group has {}",
+                        schema_descr.num_columns(),
+                        list_ident.size
+                    ));
+                }
+                let mut cols = Vec::with_capacity(list_ident.size as usize);
+                for i in 0..list_ident.size as usize {
+                    let col = read_column_chunk(prot, 
&schema_descr.columns()[i])?;
+                    cols.push(col);
+                }

Review Comment:
   I am just curious -- did you try a more functional form (I never know quite 
what the rust compiler knows how to optimize). 
   
   Sometimes using this type of loop lets the compiler avoid the bounds checks, 
which may or may not help in this case
   
   ```suggestion
                   let cols = schema_descr
                       .columns()
                       .iter()
                       .enumerate()
                       .map(|(i, c)| read_column_chunk(prot, c))
                       .collect::<Result<Vec<_>>>()?;
   ```
   
   (I don't think we should do this as part of this PR, but it might be 
interesting to try in the future)
   
   



##########
parquet/src/file/metadata/thrift_gen.rs:
##########
@@ -837,62 +697,575 @@ fn get_file_decryptor(
     }
 }
 
-/// Create ParquetMetaData from thrift input. Note that this only decodes the 
file metadata in
-/// the Parquet footer. Page indexes will need to be added later.
-impl<'a, R: ThriftCompactInputProtocol<'a>> ReadThrift<'a, R> for 
ParquetMetaData {
-    fn read_thrift(prot: &mut R) -> Result<Self> {
-        let file_meta = FileMetaData::read_thrift(prot)?;
-
-        let version = file_meta.version;
-        let num_rows = file_meta.num_rows;
-        let row_groups = file_meta.row_groups;
-        let created_by = file_meta.created_by.map(|c| c.to_owned());
-        let key_value_metadata = file_meta.key_value_metadata;
-
-        let val = parquet_schema_from_array(file_meta.schema)?;
-        let schema_descr = Arc::new(SchemaDescriptor::new(val));
-
-        // need schema_descr to get final RowGroupMetaData
-        let row_groups = convert_row_groups(row_groups, schema_descr.clone())?;
-
-        // need to map read column orders to actual values based on the schema
-        if file_meta
-            .column_orders
-            .as_ref()
-            .is_some_and(|cos| cos.len() != schema_descr.num_columns())
-        {
-            return Err(general_err!("Column order length mismatch"));
+// using ThriftSliceInputProtocol rather than ThriftCompactInputProtocl trait 
because
+// these are all internal and operate on slices.
+fn read_column_chunk<'a>(
+    prot: &mut ThriftSliceInputProtocol<'a>,
+    column_descr: &Arc<ColumnDescriptor>,
+) -> Result<ColumnChunkMetaData> {
+    // ColumnChunk fields
+    let mut file_path: Option<&str> = None;
+    let mut file_offset: Option<i64> = None;
+    let mut offset_index_offset: Option<i64> = None;
+    let mut offset_index_length: Option<i32> = None;
+    let mut column_index_offset: Option<i64> = None;
+    let mut column_index_length: Option<i32> = None;
+    #[cfg(feature = "encryption")]
+    let mut column_crypto_metadata: Option<Box<ColumnCryptoMetaData>> = None;
+    #[cfg(feature = "encryption")]
+    let mut encrypted_column_metadata: Option<&[u8]> = None;
+
+    // ColumnMetaData
+    let mut encodings: Option<Vec<Encoding>> = None;
+    let mut codec: Option<Compression> = None;
+    let mut num_values: Option<i64> = None;
+    let mut total_uncompressed_size: Option<i64> = None;
+    let mut total_compressed_size: Option<i64> = None;
+    let mut data_page_offset: Option<i64> = None;
+    let mut index_page_offset: Option<i64> = None;
+    let mut dictionary_page_offset: Option<i64> = None;
+    let mut statistics: Option<Statistics> = None;
+    let mut encoding_stats: Option<Vec<PageEncodingStats>> = None;
+    let mut bloom_filter_offset: Option<i64> = None;
+    let mut bloom_filter_length: Option<i32> = None;
+    let mut size_statistics: Option<SizeStatistics> = None;
+    let mut geospatial_statistics: Option<GeospatialStatistics> = None;
+
+    // struct ColumnChunk {
+    //   1: optional string file_path
+    //   2: required i64 file_offset = 0
+    //   3: optional ColumnMetaData meta_data
+    //   4: optional i64 offset_index_offset
+    //   5: optional i32 offset_index_length
+    //   6: optional i64 column_index_offset
+    //   7: optional i32 column_index_length
+    //   8: optional ColumnCryptoMetaData crypto_metadata
+    //   9: optional binary encrypted_column_metadata
+    // }
+    let mut last_field_id = 0i16;
+    loop {
+        let field_ident = prot.read_field_begin(last_field_id)?;
+        if field_ident.field_type == FieldType::Stop {
+            break;
         }
-
-        let column_orders = file_meta.column_orders.map(|cos| {
-            let mut res = Vec::with_capacity(cos.len());
-            for (i, column) in schema_descr.columns().iter().enumerate() {
-                match cos[i] {
-                    ColumnOrder::TYPE_DEFINED_ORDER(_) => {
-                        let sort_order = ColumnOrder::get_sort_order(
-                            column.logical_type(),
-                            column.converted_type(),
-                            column.physical_type(),
-                        );
-                        res.push(ColumnOrder::TYPE_DEFINED_ORDER(sort_order));
+        match field_ident.id {
+            1 => {
+                file_path = Some(<&str>::read_thrift(&mut *prot)?);
+            }
+            2 => {
+                file_offset = Some(i64::read_thrift(&mut *prot)?);
+            }
+            3 => {
+                // `ColumnMetaData`. Read inline for performance sake.

Review Comment:
   what do you mean "for performance sake" -- does that means the rust compiler 
isn't inlining this when using the macro? Maybe we should sprinkle some 
`#[inline]` 🤔 



##########
parquet/src/file/metadata/thrift_gen.rs:
##########
@@ -435,6 +284,7 @@ fn convert_bounding_box(
     })
 }
 
+/// Create a [`crate::file::statistics::Statistics`] from a thrift 
[`Statistics`] object.
 pub(crate) fn convert_stats(

Review Comment:
   This is probably a good target to consider reworking eventually as well it 
it shows up in traces



##########
parquet/src/file/metadata/thrift_gen.rs:
##########
@@ -837,62 +697,575 @@ fn get_file_decryptor(
     }
 }
 
-/// Create ParquetMetaData from thrift input. Note that this only decodes the 
file metadata in
-/// the Parquet footer. Page indexes will need to be added later.
-impl<'a, R: ThriftCompactInputProtocol<'a>> ReadThrift<'a, R> for 
ParquetMetaData {
-    fn read_thrift(prot: &mut R) -> Result<Self> {
-        let file_meta = FileMetaData::read_thrift(prot)?;
-
-        let version = file_meta.version;
-        let num_rows = file_meta.num_rows;
-        let row_groups = file_meta.row_groups;
-        let created_by = file_meta.created_by.map(|c| c.to_owned());
-        let key_value_metadata = file_meta.key_value_metadata;
-
-        let val = parquet_schema_from_array(file_meta.schema)?;
-        let schema_descr = Arc::new(SchemaDescriptor::new(val));
-
-        // need schema_descr to get final RowGroupMetaData
-        let row_groups = convert_row_groups(row_groups, schema_descr.clone())?;
-
-        // need to map read column orders to actual values based on the schema
-        if file_meta
-            .column_orders
-            .as_ref()
-            .is_some_and(|cos| cos.len() != schema_descr.num_columns())
-        {
-            return Err(general_err!("Column order length mismatch"));
+// using ThriftSliceInputProtocol rather than ThriftCompactInputProtocl trait 
because
+// these are all internal and operate on slices.
+fn read_column_chunk<'a>(
+    prot: &mut ThriftSliceInputProtocol<'a>,
+    column_descr: &Arc<ColumnDescriptor>,
+) -> Result<ColumnChunkMetaData> {
+    // ColumnChunk fields
+    let mut file_path: Option<&str> = None;
+    let mut file_offset: Option<i64> = None;
+    let mut offset_index_offset: Option<i64> = None;
+    let mut offset_index_length: Option<i32> = None;
+    let mut column_index_offset: Option<i64> = None;
+    let mut column_index_length: Option<i32> = None;
+    #[cfg(feature = "encryption")]
+    let mut column_crypto_metadata: Option<Box<ColumnCryptoMetaData>> = None;
+    #[cfg(feature = "encryption")]
+    let mut encrypted_column_metadata: Option<&[u8]> = None;
+
+    // ColumnMetaData
+    let mut encodings: Option<Vec<Encoding>> = None;
+    let mut codec: Option<Compression> = None;
+    let mut num_values: Option<i64> = None;
+    let mut total_uncompressed_size: Option<i64> = None;
+    let mut total_compressed_size: Option<i64> = None;
+    let mut data_page_offset: Option<i64> = None;
+    let mut index_page_offset: Option<i64> = None;
+    let mut dictionary_page_offset: Option<i64> = None;
+    let mut statistics: Option<Statistics> = None;
+    let mut encoding_stats: Option<Vec<PageEncodingStats>> = None;
+    let mut bloom_filter_offset: Option<i64> = None;
+    let mut bloom_filter_length: Option<i32> = None;
+    let mut size_statistics: Option<SizeStatistics> = None;
+    let mut geospatial_statistics: Option<GeospatialStatistics> = None;
+
+    // struct ColumnChunk {
+    //   1: optional string file_path
+    //   2: required i64 file_offset = 0
+    //   3: optional ColumnMetaData meta_data
+    //   4: optional i64 offset_index_offset
+    //   5: optional i32 offset_index_length
+    //   6: optional i64 column_index_offset
+    //   7: optional i32 column_index_length
+    //   8: optional ColumnCryptoMetaData crypto_metadata
+    //   9: optional binary encrypted_column_metadata
+    // }
+    let mut last_field_id = 0i16;
+    loop {
+        let field_ident = prot.read_field_begin(last_field_id)?;
+        if field_ident.field_type == FieldType::Stop {
+            break;
         }
-
-        let column_orders = file_meta.column_orders.map(|cos| {
-            let mut res = Vec::with_capacity(cos.len());
-            for (i, column) in schema_descr.columns().iter().enumerate() {
-                match cos[i] {
-                    ColumnOrder::TYPE_DEFINED_ORDER(_) => {
-                        let sort_order = ColumnOrder::get_sort_order(
-                            column.logical_type(),
-                            column.converted_type(),
-                            column.physical_type(),
-                        );
-                        res.push(ColumnOrder::TYPE_DEFINED_ORDER(sort_order));
+        match field_ident.id {
+            1 => {
+                file_path = Some(<&str>::read_thrift(&mut *prot)?);
+            }
+            2 => {
+                file_offset = Some(i64::read_thrift(&mut *prot)?);
+            }
+            3 => {
+                // `ColumnMetaData`. Read inline for performance sake.
+                // struct ColumnMetaData {
+                //   1: required Type type
+                //   2: required list<Encoding> encodings
+                //   3: required list<string> path_in_schema
+                //   4: required CompressionCodec codec
+                //   5: required i64 num_values
+                //   6: required i64 total_uncompressed_size
+                //   7: required i64 total_compressed_size
+                //   8: optional list<KeyValue> key_value_metadata
+                //   9: required i64 data_page_offset
+                //   10: optional i64 index_page_offset
+                //   11: optional i64 dictionary_page_offset
+                //   12: optional Statistics statistics;
+                //   13: optional list<PageEncodingStats> encoding_stats;
+                //   14: optional i64 bloom_filter_offset;
+                //   15: optional i32 bloom_filter_length;
+                //   16: optional SizeStatistics size_statistics;
+                //   17: optional GeospatialStatistics geospatial_statistics;
+                // }
+                let mut last_field_id = 0i16;
+                loop {
+                    let field_ident = prot.read_field_begin(last_field_id)?;
+                    if field_ident.field_type == FieldType::Stop {
+                        break;
                     }
-                    _ => res.push(cos[i]),
+                    match field_ident.id {
+                        // 1: type is never used, we can use the column 
descriptor
+                        2 => {
+                            let val =
+                                read_thrift_vec::<Encoding, 
ThriftSliceInputProtocol>(&mut *prot)?;
+                            encodings = Some(val);
+                        }
+                        // 3: path_in_schema is redundant
+                        4 => {
+                            codec = Some(Compression::read_thrift(&mut 
*prot)?);
+                        }
+                        5 => {
+                            num_values = Some(i64::read_thrift(&mut *prot)?);
+                        }
+                        6 => {
+                            total_uncompressed_size = 
Some(i64::read_thrift(&mut *prot)?);
+                        }
+                        7 => {
+                            total_compressed_size = Some(i64::read_thrift(&mut 
*prot)?);
+                        }
+                        // 8: we don't expose this key value
+                        9 => {
+                            data_page_offset = Some(i64::read_thrift(&mut 
*prot)?);
+                        }
+                        10 => {
+                            index_page_offset = Some(i64::read_thrift(&mut 
*prot)?);
+                        }
+                        11 => {
+                            dictionary_page_offset = 
Some(i64::read_thrift(&mut *prot)?);
+                        }
+                        12 => {
+                            statistics = Some(Statistics::read_thrift(&mut 
*prot)?);
+                        }
+                        13 => {
+                            let val = read_thrift_vec::<PageEncodingStats, 
ThriftSliceInputProtocol>(
+                                &mut *prot,
+                            )?;
+                            encoding_stats = Some(val);
+                        }
+                        14 => {
+                            bloom_filter_offset = Some(i64::read_thrift(&mut 
*prot)?);
+                        }
+                        15 => {
+                            bloom_filter_length = Some(i32::read_thrift(&mut 
*prot)?);
+                        }
+                        16 => {
+                            let val = SizeStatistics::read_thrift(&mut *prot)?;
+                            size_statistics = Some(val);
+                        }
+                        17 => {
+                            let val = GeospatialStatistics::read_thrift(&mut 
*prot)?;
+                            geospatial_statistics = Some(val);
+                        }
+                        _ => {
+                            prot.skip(field_ident.field_type)?;
+                        }
+                    };
+                    last_field_id = field_ident.id;
                 }
             }
-            res
-        });
+            4 => {
+                offset_index_offset = Some(i64::read_thrift(&mut *prot)?);
+            }
+            5 => {
+                offset_index_length = Some(i32::read_thrift(&mut *prot)?);
+            }
+            6 => {
+                column_index_offset = Some(i64::read_thrift(&mut *prot)?);
+            }
+            7 => {
+                column_index_length = Some(i32::read_thrift(&mut *prot)?);
+            }
+            #[cfg(feature = "encryption")]
+            8 => {
+                let val = ColumnCryptoMetaData::read_thrift(&mut *prot)?;
+                column_crypto_metadata = Some(Box::new(val));
+            }
+            #[cfg(feature = "encryption")]
+            9 => {
+                encrypted_column_metadata = Some(<&[u8]>::read_thrift(&mut 
*prot)?);
+            }
+            _ => {
+                prot.skip(field_ident.field_type)?;
+            }
+        };
+        last_field_id = field_ident.id;
+    }
 
-        let fmd = crate::file::metadata::FileMetaData::new(
-            version,
-            num_rows,
-            created_by,
-            key_value_metadata,
-            schema_descr,
-            column_orders,
-        );
+    // the only required field from ColumnChunk
+    let Some(file_offset) = file_offset else {
+        return Err(general_err!("Required field file_offset is missing"));
+    };
+
+    // transform optional fields
+    let file_path = file_path.map(|f| f.to_owned());
+    let (unencoded_byte_array_data_bytes, repetition_level_histogram, 
definition_level_histogram) =
+        if let Some(size_stats) = size_statistics {
+            (
+                size_stats.unencoded_byte_array_data_bytes,
+                size_stats.repetition_level_histogram,
+                size_stats.definition_level_histogram,
+            )
+        } else {
+            (None, None, None)
+        };
+
+    let repetition_level_histogram = 
repetition_level_histogram.map(LevelHistogram::from);
+    let definition_level_histogram = 
definition_level_histogram.map(LevelHistogram::from);
+
+    let statistics = convert_stats(column_descr.physical_type(), statistics)?;
+    let geo_statistics = convert_geo_stats(geospatial_statistics);
+    let column_descr = column_descr.clone();
+
+    // if encrypted, set the encrypted column metadata and return. we'll 
decrypt after finishing
+    // the footer and populate the rest.
+    #[cfg(feature = "encryption")]
+    if encrypted_column_metadata.is_some() {
+        use crate::file::metadata::ColumnChunkMetaDataBuilder;
+
+        let encrypted_column_metadata = encrypted_column_metadata.map(|s| 
s.to_vec());
+
+        // use builder to get uninitialized ColumnChunkMetaData
+        let mut col = ColumnChunkMetaDataBuilder::new(column_descr).build()?;
+
+        // set ColumnChunk fields
+        col.file_path = file_path;
+        col.file_offset = file_offset;
+        col.offset_index_offset = offset_index_offset;
+        col.offset_index_length = offset_index_length;
+        col.column_index_offset = column_index_offset;
+        col.column_index_length = column_index_length;
+        col.column_crypto_metadata = column_crypto_metadata;
+        col.encrypted_column_metadata = encrypted_column_metadata;
+
+        // check for ColumnMetaData fields that might be present
+        // first required fields
+        if let Some(encodings) = encodings {
+            col.encodings = encodings;
+        }
+        if let Some(codec) = codec {
+            col.compression = codec;
+        }
+        if let Some(num_values) = num_values {
+            col.num_values = num_values;
+        }
+        if let Some(total_uncompressed_size) = total_uncompressed_size {
+            col.total_uncompressed_size = total_uncompressed_size;
+        }
+        if let Some(total_compressed_size) = total_compressed_size {
+            col.total_compressed_size = total_compressed_size;
+        }
+        if let Some(data_page_offset) = data_page_offset {
+            col.data_page_offset = data_page_offset;
+        }
+
+        // then optional
+        col.index_page_offset = index_page_offset;
+        col.dictionary_page_offset = dictionary_page_offset;
+        col.bloom_filter_offset = bloom_filter_offset;
+        col.bloom_filter_length = bloom_filter_length;
+        col.unencoded_byte_array_data_bytes = unencoded_byte_array_data_bytes;
+        col.repetition_level_histogram = repetition_level_histogram;
+        col.definition_level_histogram = definition_level_histogram;
+        col.encoding_stats = encoding_stats;
+        col.statistics = statistics;
+        col.geo_statistics = geo_statistics;
+
+        return Ok(col);
+    }
+
+    // not encrypted, so meta_data better exist
+    let Some(encodings) = encodings else {
+        return Err(ParquetError::General(
+            "Required field encodings is missing".to_owned(),
+        ));
+    };
+    let Some(codec) = codec else {
+        return Err(ParquetError::General(
+            "Required field codec is missing".to_owned(),
+        ));
+    };
+    let Some(num_values) = num_values else {
+        return Err(ParquetError::General(
+            "Required field num_values is missing".to_owned(),
+        ));
+    };
+    let Some(total_uncompressed_size) = total_uncompressed_size else {
+        return Err(ParquetError::General(
+            "Required field total_uncompressed_size is missing".to_owned(),
+        ));
+    };
+    let Some(total_compressed_size) = total_compressed_size else {
+        return Err(ParquetError::General(
+            "Required field total_compressed_size is missing".to_owned(),
+        ));
+    };
+    let Some(data_page_offset) = data_page_offset else {
+        return Err(ParquetError::General(
+            "Required field data_page_offset is missing".to_owned(),
+        ));
+    };
+
+    let compression = codec;
+
+    // NOTE: I tried using the builder for this, but it added 20% to the 
execution time

Review Comment:
   that is interesting and unexpected. Sounds like a good idea to keep it like 
this



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]

Reply via email to