This is an automated email from the ASF dual-hosted git repository.

etseidl pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/arrow-rs.git


The following commit(s) were added to refs/heads/main by this push:
     new b51a0003ad [thrift-remodel] Remove conversion functions for row group and column metadata (#8574)
b51a0003ad is described below

commit b51a0003ad13bc91245e350a9e8d9beaa3c8b35f
Author: Ed Seidl <[email protected]>
AuthorDate: Fri Oct 10 10:24:35 2025 -0700

    [thrift-remodel] Remove conversion functions for row group and column metadata (#8574)
    
    # Which issue does this PR close?
    
    - Part of #5853
    - Closes #8517.
    
    # Rationale for this change
    
    A good bit (around 15%) of Parquet metadata parsing time is spent first
    decoding to thrift structs (`FileMetaData`, `RowGroup`, etc.) and then
    converting them to the metadata structs used by this crate
    (`ParquetMetaData`, `RowGroupMetaData`, etc.). This PR removes some of
    the intermediate structures and parses straight to the crate structs.
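    
    As a rough sketch of the idea (the `Field` and `RowGroupLite` types
    below are made-up stand-ins, not this crate's actual thrift protocol
    machinery), a hand-written field loop can fill the final struct
    directly as fields stream past, with no intermediate thrift struct:
    
    ```rust
    #[allow(dead_code)]
    struct RowGroupLite {
        total_byte_size: i64,
        num_rows: i64,
    }
    
    enum Field {
        I64 { id: i16, val: i64 },
        Stop,
    }
    
    fn parse_row_group(
        fields: impl IntoIterator<Item = Field>,
    ) -> Option<RowGroupLite> {
        let (mut total_byte_size, mut num_rows) = (None, None);
        for f in fields {
            match f {
                // values land directly in the final struct's slots
                Field::I64 { id: 2, val } => total_byte_size = Some(val),
                Field::I64 { id: 3, val } => num_rows = Some(val),
                Field::I64 { .. } => {} // unknown field: skip
                Field::Stop => break,
            }
        }
        Some(RowGroupLite {
            total_byte_size: total_byte_size?,
            num_rows: num_rows?,
        })
    }
    ```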
    
    # What changes are included in this PR?
    
    Some thrift-generated structures are removed, and the code necessary to
    decode them has been hand-written. This will enable future optimizations,
    such as selectively decoding parts of the metadata.
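    
    For instance, a hand-written loop can decline to materialize a field
    the caller did not ask for, where a generated decoder always builds it.
    A sketch of that idea (the `ColumnLite` and `ColField` types are
    illustrative only, not this crate's API):
    
    ```rust
    #[derive(Default)]
    #[allow(dead_code)]
    struct ColumnLite {
        num_values: Option<i64>,
        statistics: Option<Vec<u8>>, // stand-in for a real statistics type
    }
    
    enum ColField {
        NumValues(i64),
        Statistics(Vec<u8>),
        Stop,
    }
    
    fn parse_column(
        fields: impl IntoIterator<Item = ColField>,
        want_stats: bool,
    ) -> ColumnLite {
        let mut col = ColumnLite::default();
        for f in fields {
            match f {
                ColField::NumValues(v) => col.num_values = Some(v),
                // decode statistics only when the caller asked for them
                ColField::Statistics(s) if want_stats => {
                    col.statistics = Some(s)
                }
                ColField::Statistics(_) => {} // skipped, never stored
                ColField::Stop => break,
            }
        }
        col
    }
    ```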
    
    In addition to the above, this PR cleans up some of the memory-size
    computation and boxes some of the objects used for decryption.
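    
    A small runnable illustration of why the boxing helps: an
    `Option<Box<_>>` costs one niche-optimized pointer inline (8 bytes on
    a typical 64-bit target), while an unboxed `Option<_>` embeds the
    whole struct in every instance. The `CryptoState` struct below is a
    made-up stand-in, not this crate's `FileDecryptor`:
    
    ```rust
    #[allow(dead_code)]
    struct CryptoState {
        footer_key: [u8; 32],
        aad_prefix: [u8; 64],
    }
    
    fn main() {
        let inline = std::mem::size_of::<Option<CryptoState>>();
        let boxed = std::mem::size_of::<Option<Box<CryptoState>>>();
        // one pointer vs. the full ~97-byte payload in this sketch
        assert!(boxed < inline);
        println!("inline: {inline} bytes, boxed: {boxed} bytes");
    }
    ```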
    
    # Are these changes tested?
    
    Should be covered by existing tests.
    
    # Are there any user-facing changes?
    
    No, only private APIs are changed.
---
 parquet/src/file/metadata/memory.rs     |   15 +
 parquet/src/file/metadata/mod.rs        |   58 +-
 parquet/src/file/metadata/parser.rs     |   19 +-
 parquet/src/file/metadata/thrift_gen.rs | 1058 +++++++++++++++++++++----------
 parquet/src/file/metadata/writer.rs     |   29 +-
 parquet/src/file/serialized_reader.rs   |    6 -
 parquet/src/geospatial/bounding_box.rs  |    8 +
 parquet/src/geospatial/statistics.rs    |    8 +-
 parquet/tests/arrow_reader/bad_data.rs  |    6 -
 9 files changed, 827 insertions(+), 380 deletions(-)

diff --git a/parquet/src/file/metadata/memory.rs b/parquet/src/file/metadata/memory.rs
index 208e62537b..98ce5736ae 100644
--- a/parquet/src/file/metadata/memory.rs
+++ b/parquet/src/file/metadata/memory.rs
@@ -56,6 +56,12 @@ impl<T: HeapSize> HeapSize for Arc<T> {
     }
 }
 
+impl<T: HeapSize> HeapSize for Box<T> {
+    fn heap_size(&self) -> usize {
+        std::mem::size_of::<T>() + self.as_ref().heap_size()
+    }
+}
+
 impl<T: HeapSize> HeapSize for Option<T> {
     fn heap_size(&self) -> usize {
         self.as_ref().map(|inner| inner.heap_size()).unwrap_or(0)
@@ -70,10 +76,17 @@ impl HeapSize for String {
 
 impl HeapSize for FileMetaData {
     fn heap_size(&self) -> usize {
+        #[cfg(feature = "encryption")]
+        let encryption_heap_size =
+            self.encryption_algorithm.heap_size() + self.footer_signing_key_metadata.heap_size();
+        #[cfg(not(feature = "encryption"))]
+        let encryption_heap_size = 0;
+
         self.created_by.heap_size()
             + self.key_value_metadata.heap_size()
             + self.schema_descr.heap_size()
             + self.column_orders.heap_size()
+            + encryption_heap_size
     }
 }
 
@@ -109,6 +122,7 @@ impl HeapSize for ColumnChunkMetaData {
             + self.unencoded_byte_array_data_bytes.heap_size()
             + self.repetition_level_histogram.heap_size()
             + self.definition_level_histogram.heap_size()
+            + self.geo_statistics.heap_size()
             + encryption_heap_size
     }
 }
@@ -141,6 +155,7 @@ impl HeapSize for PageType {
         0 // no heap allocations
     }
 }
+
 impl HeapSize for Statistics {
     fn heap_size(&self) -> usize {
         match self {
diff --git a/parquet/src/file/metadata/mod.rs b/parquet/src/file/metadata/mod.rs
index b6da97a037..87585f2d71 100644
--- a/parquet/src/file/metadata/mod.rs
+++ b/parquet/src/file/metadata/mod.rs
@@ -101,6 +101,8 @@ use crate::encryption::decrypt::FileDecryptor;
 #[cfg(feature = "encryption")]
 use crate::file::column_crypto_metadata::ColumnCryptoMetaData;
 pub(crate) use crate::file::metadata::memory::HeapSize;
+#[cfg(feature = "encryption")]
+use crate::file::metadata::thrift_gen::EncryptionAlgorithm;
 use crate::file::page_index::column_index::{ByteArrayColumnIndex, PrimitiveColumnIndex};
 use crate::file::page_index::{column_index::ColumnIndexMetaData, offset_index::PageLocation};
 use crate::file::statistics::Statistics;
@@ -194,7 +196,7 @@ pub struct ParquetMetaData {
     offset_index: Option<ParquetOffsetIndex>,
     /// Optional file decryptor
     #[cfg(feature = "encryption")]
-    file_decryptor: Option<FileDecryptor>,
+    file_decryptor: Option<Box<FileDecryptor>>,
 }
 
 impl ParquetMetaData {
@@ -215,7 +217,7 @@ impl ParquetMetaData {
     /// encrypted data.
     #[cfg(feature = "encryption")]
     pub(crate) fn with_file_decryptor(&mut self, file_decryptor: Option<FileDecryptor>) {
-        self.file_decryptor = file_decryptor;
+        self.file_decryptor = file_decryptor.map(Box::new);
     }
 
     /// Convert this ParquetMetaData into a [`ParquetMetaDataBuilder`]
@@ -231,7 +233,7 @@ impl ParquetMetaData {
     /// Returns file decryptor as reference.
     #[cfg(feature = "encryption")]
     pub(crate) fn file_decryptor(&self) -> Option<&FileDecryptor> {
-        self.file_decryptor.as_ref()
+        self.file_decryptor.as_deref()
     }
 
     /// Returns number of row groups in this file.
@@ -411,6 +413,13 @@ impl ParquetMetaDataBuilder {
         self.0.offset_index.as_ref()
     }
 
+    /// Sets the file decryptor needed to decrypt this metadata.
+    #[cfg(feature = "encryption")]
+    pub(crate) fn set_file_decryptor(mut self, file_decryptor: Option<FileDecryptor>) -> Self {
+        self.0.with_file_decryptor(file_decryptor);
+        self
+    }
+
     /// Creates a new ParquetMetaData from the builder
     pub fn build(self) -> ParquetMetaData {
         let Self(metadata) = self;
@@ -468,6 +477,10 @@ pub struct FileMetaData {
     key_value_metadata: Option<Vec<KeyValue>>,
     schema_descr: SchemaDescPtr,
     column_orders: Option<Vec<ColumnOrder>>,
+    #[cfg(feature = "encryption")]
+    encryption_algorithm: Option<Box<EncryptionAlgorithm>>,
+    #[cfg(feature = "encryption")]
+    footer_signing_key_metadata: Option<Vec<u8>>,
 }
 
 impl FileMetaData {
@@ -487,9 +500,31 @@ impl FileMetaData {
             key_value_metadata,
             schema_descr,
             column_orders,
+            #[cfg(feature = "encryption")]
+            encryption_algorithm: None,
+            #[cfg(feature = "encryption")]
+            footer_signing_key_metadata: None,
         }
     }
 
+    #[cfg(feature = "encryption")]
+    pub(crate) fn with_encryption_algorithm(
+        mut self,
+        encryption_algorithm: Option<EncryptionAlgorithm>,
+    ) -> Self {
+        self.encryption_algorithm = encryption_algorithm.map(Box::new);
+        self
+    }
+
+    #[cfg(feature = "encryption")]
+    pub(crate) fn with_footer_signing_key_metadata(
+        mut self,
+        footer_signing_key_metadata: Option<Vec<u8>>,
+    ) -> Self {
+        self.footer_signing_key_metadata = footer_signing_key_metadata;
+        self
+    }
+
     /// Returns version of this file.
     pub fn version(&self) -> i32 {
         self.version
@@ -776,7 +811,7 @@ pub struct ColumnChunkMetaData {
     repetition_level_histogram: Option<LevelHistogram>,
     definition_level_histogram: Option<LevelHistogram>,
     #[cfg(feature = "encryption")]
-    column_crypto_metadata: Option<ColumnCryptoMetaData>,
+    column_crypto_metadata: Option<Box<ColumnCryptoMetaData>>,
     #[cfg(feature = "encryption")]
     encrypted_column_metadata: Option<Vec<u8>>,
 }
@@ -1077,7 +1112,7 @@ impl ColumnChunkMetaData {
     /// Returns the encryption metadata for this column chunk.
     #[cfg(feature = "encryption")]
     pub fn crypto_metadata(&self) -> Option<&ColumnCryptoMetaData> {
-        self.column_crypto_metadata.as_ref()
+        self.column_crypto_metadata.as_deref()
     }
 
     /// Converts this [`ColumnChunkMetaData`] into a [`ColumnChunkMetaDataBuilder`]
@@ -1283,7 +1318,14 @@ impl ColumnChunkMetaDataBuilder {
     #[cfg(feature = "encryption")]
     /// Set the encryption metadata for an encrypted column
     pub fn set_column_crypto_metadata(mut self, value: Option<ColumnCryptoMetaData>) -> Self {
-        self.0.column_crypto_metadata = value;
+        self.0.column_crypto_metadata = value.map(Box::new);
+        self
+    }
+
+    #[cfg(feature = "encryption")]
+    /// Set the encrypted column metadata for an encrypted column
+    pub fn set_encrypted_column_metadata(mut self, value: Option<Vec<u8>>) -> Self {
+        self.0.encrypted_column_metadata = value;
         self
     }
 
@@ -1818,7 +1860,7 @@ mod tests {
         #[cfg(not(feature = "encryption"))]
         let base_expected_size = 2312;
         #[cfg(feature = "encryption")]
-        let base_expected_size = 2744;
+        let base_expected_size = 2480;
 
         assert_eq!(parquet_meta.memory_size(), base_expected_size);
 
@@ -1849,7 +1891,7 @@ mod tests {
         #[cfg(not(feature = "encryption"))]
         let bigger_expected_size = 2738;
         #[cfg(feature = "encryption")]
-        let bigger_expected_size = 3170;
+        let bigger_expected_size = 2906;
 
         // more set fields means more memory usage
         assert!(bigger_expected_size > base_expected_size);
diff --git a/parquet/src/file/metadata/parser.rs b/parquet/src/file/metadata/parser.rs
index cbe005d8f9..223ea3484c 100644
--- a/parquet/src/file/metadata/parser.rs
+++ b/parquet/src/file/metadata/parser.rs
@@ -21,12 +21,12 @@
 //! into the corresponding Rust structures
 
 use crate::errors::ParquetError;
+use crate::file::metadata::thrift_gen::parquet_metadata_from_bytes;
 use crate::file::metadata::{ColumnChunkMetaData, PageIndexPolicy, ParquetMetaData};
 
 use crate::file::page_index::column_index::ColumnIndexMetaData;
 use crate::file::page_index::index_reader::{decode_column_index, decode_offset_index};
 use crate::file::page_index::offset_index::OffsetIndexMetaData;
-use crate::parquet_thrift::{ReadThrift, ThriftSliceInputProtocol};
 use bytes::Bytes;
 
 /// Helper struct for metadata parsing
@@ -71,11 +71,15 @@ mod inner {
             buf: &[u8],
             encrypted_footer: bool,
         ) -> Result<ParquetMetaData> {
-            crate::file::metadata::thrift_gen::parquet_metadata_with_encryption(
-                self.file_decryption_properties.as_deref(),
-                encrypted_footer,
-                buf,
-            )
+            if encrypted_footer || self.file_decryption_properties.is_some() {
+                crate::file::metadata::thrift_gen::parquet_metadata_with_encryption(
+                    self.file_decryption_properties.as_deref(),
+                    encrypted_footer,
+                    buf,
+                )
+            } else {
+                decode_metadata(buf)
+            }
         }
     }
 
@@ -195,8 +199,7 @@ mod inner {
 ///
 /// [Parquet Spec]: https://github.com/apache/parquet-format#metadata
 pub(crate) fn decode_metadata(buf: &[u8]) -> crate::errors::Result<ParquetMetaData> {
-    let mut prot = ThriftSliceInputProtocol::new(buf);
-    ParquetMetaData::read_thrift(&mut prot)
+    parquet_metadata_from_bytes(buf)
 }
 
 /// Parses column index from the provided bytes and adds it to the metadata.
diff --git a/parquet/src/file/metadata/thrift_gen.rs b/parquet/src/file/metadata/thrift_gen.rs
index 336b91428a..a107acfad8 100644
--- a/parquet/src/file/metadata/thrift_gen.rs
+++ b/parquet/src/file/metadata/thrift_gen.rs
@@ -35,20 +35,20 @@ use crate::{
     },
     parquet_thrift::{
         ElementType, FieldType, ReadThrift, ThriftCompactInputProtocol,
-        ThriftCompactOutputProtocol, WriteThrift, WriteThriftField, read_thrift_vec,
+        ThriftCompactOutputProtocol, ThriftSliceInputProtocol, WriteThrift, WriteThriftField,
+        read_thrift_vec,
     },
     schema::types::{
         ColumnDescriptor, SchemaDescriptor, TypePtr, num_nodes, parquet_schema_from_array,
     },
-    thrift_struct, thrift_union,
+    thrift_struct,
     util::bit_util::FromBytes,
 };
 #[cfg(feature = "encryption")]
 use crate::{
     encryption::decrypt::{FileDecryptionProperties, FileDecryptor},
-    file::column_crypto_metadata::ColumnCryptoMetaData,
-    parquet_thrift::ThriftSliceInputProtocol,
-    schema::types::SchemaDescPtr,
+    file::{column_crypto_metadata::ColumnCryptoMetaData, metadata::HeapSize},
+    thrift_union,
 };
 
 // this needs to be visible to the schema conversion code
@@ -93,6 +93,21 @@ pub(crate) struct SchemaElement<'a> {
 }
 );
 
+thrift_struct!(
+pub(crate) struct Statistics<'a> {
+   1: optional binary<'a> max;
+   2: optional binary<'a> min;
+   3: optional i64 null_count;
+   4: optional i64 distinct_count;
+   5: optional binary<'a> max_value;
+   6: optional binary<'a> min_value;
+   7: optional bool is_max_value_exact;
+   8: optional bool is_min_value_exact;
+}
+);
+
+// TODO(ets): move a lot of the encryption stuff to its own module
+#[cfg(feature = "encryption")]
 thrift_struct!(
 pub(crate) struct AesGcmV1 {
   /// AAD prefix
@@ -107,6 +122,16 @@ pub(crate) struct AesGcmV1 {
 }
 );
 
+#[cfg(feature = "encryption")]
+impl HeapSize for AesGcmV1 {
+    fn heap_size(&self) -> usize {
+        self.aad_prefix.heap_size()
+            + self.aad_file_unique.heap_size()
+            + self.supply_aad_prefix.heap_size()
+    }
+}
+
+#[cfg(feature = "encryption")]
 thrift_struct!(
 pub(crate) struct AesGcmCtrV1 {
   /// AAD prefix
@@ -121,6 +146,16 @@ pub(crate) struct AesGcmCtrV1 {
 }
 );
 
+#[cfg(feature = "encryption")]
+impl HeapSize for AesGcmCtrV1 {
+    fn heap_size(&self) -> usize {
+        self.aad_prefix.heap_size()
+            + self.aad_file_unique.heap_size()
+            + self.supply_aad_prefix.heap_size()
+    }
+}
+
+#[cfg(feature = "encryption")]
 thrift_union!(
 union EncryptionAlgorithm {
   1: (AesGcmV1) AES_GCM_V1
@@ -128,6 +163,16 @@ union EncryptionAlgorithm {
 }
 );
 
+#[cfg(feature = "encryption")]
+impl HeapSize for EncryptionAlgorithm {
+    fn heap_size(&self) -> usize {
+        match self {
+            Self::AES_GCM_V1(gcm) => gcm.heap_size(),
+            Self::AES_GCM_CTR_V1(gcm_ctr) => gcm_ctr.heap_size(),
+        }
+    }
+}
+
 #[cfg(feature = "encryption")]
 thrift_struct!(
 /// Crypto metadata for files with encrypted footer
@@ -144,61 +189,9 @@ pub(crate) struct FileCryptoMetaData<'a> {
 );
 
 // the following are only used internally so are private
-thrift_struct!(
-struct FileMetaData<'a> {
-  1: required i32 version
-  2: required list<'a><SchemaElement> schema;
-  3: required i64 num_rows
-  4: required list<'a><RowGroup> row_groups
-  5: optional list<KeyValue> key_value_metadata
-  6: optional string<'a> created_by
-  7: optional list<ColumnOrder> column_orders;
-  8: optional EncryptionAlgorithm encryption_algorithm
-  9: optional binary<'a> footer_signing_key_metadata
-}
-);
-
-thrift_struct!(
-struct RowGroup<'a> {
-  1: required list<'a><ColumnChunk> columns
-  2: required i64 total_byte_size
-  3: required i64 num_rows
-  4: optional list<SortingColumn> sorting_columns
-  5: optional i64 file_offset
-  // we don't expose total_compressed_size so skip
-  //6: optional i64 total_compressed_size
-  7: optional i16 ordinal
-}
-);
-
 #[cfg(feature = "encryption")]
-thrift_struct!(
-struct ColumnChunk<'a> {
-  1: optional string<'a> file_path
-  2: required i64 file_offset = 0
-  3: optional ColumnMetaData<'a> meta_data
-  4: optional i64 offset_index_offset
-  5: optional i32 offset_index_length
-  6: optional i64 column_index_offset
-  7: optional i32 column_index_length
-  8: optional ColumnCryptoMetaData crypto_metadata
-  9: optional binary<'a> encrypted_column_metadata
-}
-);
-#[cfg(not(feature = "encryption"))]
-thrift_struct!(
-struct ColumnChunk<'a> {
-  1: optional string<'a> file_path
-  2: required i64 file_offset = 0
-  3: optional ColumnMetaData<'a> meta_data
-  4: optional i64 offset_index_offset
-  5: optional i32 offset_index_length
-  6: optional i64 column_index_offset
-  7: optional i32 column_index_length
-}
-);
-
 type CompressionCodec = Compression;
+#[cfg(feature = "encryption")]
 thrift_struct!(
 struct ColumnMetaData<'a> {
   1: required Type r#type
@@ -251,150 +244,6 @@ struct SizeStatistics {
 }
 );
 
-thrift_struct!(
-pub(crate) struct Statistics<'a> {
-   1: optional binary<'a> max;
-   2: optional binary<'a> min;
-   3: optional i64 null_count;
-   4: optional i64 distinct_count;
-   5: optional binary<'a> max_value;
-   6: optional binary<'a> min_value;
-   7: optional bool is_max_value_exact;
-   8: optional bool is_min_value_exact;
-}
-);
-
-// convert collection of thrift RowGroups into RowGroupMetaData
-fn convert_row_groups(
-    mut row_groups: Vec<RowGroup>,
-    schema_descr: Arc<SchemaDescriptor>,
-) -> Result<Vec<RowGroupMetaData>> {
-    let mut res: Vec<RowGroupMetaData> = Vec::with_capacity(row_groups.len());
-    for rg in row_groups.drain(0..) {
-        res.push(convert_row_group(rg, schema_descr.clone())?);
-    }
-
-    Ok(res)
-}
-
-fn convert_row_group(
-    row_group: RowGroup,
-    schema_descr: Arc<SchemaDescriptor>,
-) -> Result<RowGroupMetaData> {
-    if schema_descr.num_columns() != row_group.columns.len() {
-        return Err(general_err!(
-            "Column count mismatch. Schema has {} columns while Row Group has 
{}",
-            schema_descr.num_columns(),
-            row_group.columns.len()
-        ));
-    }
-
-    let num_rows = row_group.num_rows;
-    let sorting_columns = row_group.sorting_columns;
-    let total_byte_size = row_group.total_byte_size;
-    let file_offset = row_group.file_offset;
-    let ordinal = row_group.ordinal;
-
-    let columns = convert_columns(row_group.columns, schema_descr.clone())?;
-
-    Ok(RowGroupMetaData {
-        columns,
-        num_rows,
-        sorting_columns,
-        total_byte_size,
-        schema_descr,
-        file_offset,
-        ordinal,
-    })
-}
-
-fn convert_columns(
-    mut columns: Vec<ColumnChunk>,
-    schema_descr: Arc<SchemaDescriptor>,
-) -> Result<Vec<ColumnChunkMetaData>> {
-    let mut res: Vec<ColumnChunkMetaData> = Vec::with_capacity(columns.len());
-    for (c, d) in columns.drain(0..).zip(schema_descr.columns()) {
-        res.push(convert_column(c, d.clone())?);
-    }
-
-    Ok(res)
-}
-
-fn convert_column(
-    column: ColumnChunk,
-    column_descr: Arc<ColumnDescriptor>,
-) -> Result<ColumnChunkMetaData> {
-    if column.meta_data.is_none() {
-        return Err(general_err!("Expected to have column metadata"));
-    }
-    let col_metadata = column.meta_data.unwrap();
-    let column_type = col_metadata.r#type;
-    let encodings = col_metadata.encodings;
-    let compression = col_metadata.codec;
-    let file_path = column.file_path.map(|v| v.to_owned());
-    let file_offset = column.file_offset;
-    let num_values = col_metadata.num_values;
-    let total_compressed_size = col_metadata.total_compressed_size;
-    let total_uncompressed_size = col_metadata.total_uncompressed_size;
-    let data_page_offset = col_metadata.data_page_offset;
-    let index_page_offset = col_metadata.index_page_offset;
-    let dictionary_page_offset = col_metadata.dictionary_page_offset;
-    let statistics = convert_stats(column_type, col_metadata.statistics)?;
-    let encoding_stats = col_metadata.encoding_stats;
-    let bloom_filter_offset = col_metadata.bloom_filter_offset;
-    let bloom_filter_length = col_metadata.bloom_filter_length;
-    let offset_index_offset = column.offset_index_offset;
-    let offset_index_length = column.offset_index_length;
-    let column_index_offset = column.column_index_offset;
-    let column_index_length = column.column_index_length;
-    let (unencoded_byte_array_data_bytes, repetition_level_histogram, definition_level_histogram) =
-        if let Some(size_stats) = col_metadata.size_statistics {
-            (
-                size_stats.unencoded_byte_array_data_bytes,
-                size_stats.repetition_level_histogram,
-                size_stats.definition_level_histogram,
-            )
-        } else {
-            (None, None, None)
-        };
-
-    let geo_statistics = convert_geo_stats(col_metadata.geospatial_statistics);
-
-    let repetition_level_histogram = repetition_level_histogram.map(LevelHistogram::from);
-    let definition_level_histogram = definition_level_histogram.map(LevelHistogram::from);
-
-    let result = ColumnChunkMetaData {
-        column_descr,
-        encodings,
-        file_path,
-        file_offset,
-        num_values,
-        compression,
-        total_compressed_size,
-        total_uncompressed_size,
-        data_page_offset,
-        index_page_offset,
-        dictionary_page_offset,
-        statistics,
-        geo_statistics,
-        encoding_stats,
-        bloom_filter_offset,
-        bloom_filter_length,
-        offset_index_offset,
-        offset_index_length,
-        column_index_offset,
-        column_index_length,
-        unencoded_byte_array_data_bytes,
-        repetition_level_histogram,
-        definition_level_histogram,
-        #[cfg(feature = "encryption")]
-        column_crypto_metadata: column.crypto_metadata,
-        #[cfg(feature = "encryption")]
-        encrypted_column_metadata: None,
-    };
-    Ok(result)
-}
-
 fn convert_geo_stats(
     stats: Option<GeospatialStatistics>,
 ) -> Option<Box<crate::geospatial::statistics::GeospatialStatistics>> {
@@ -435,6 +284,7 @@ fn convert_bounding_box(
     })
 }
 
+/// Create a [`crate::file::statistics::Statistics`] from a thrift [`Statistics`] object.
 pub(crate) fn convert_stats(
     physical_type: Type,
     thrift_stats: Option<Statistics>,
@@ -586,10 +436,11 @@ pub(crate) fn convert_stats(
 
 #[cfg(feature = "encryption")]
 fn row_group_from_encrypted_thrift(
-    mut rg: RowGroup,
-    schema_descr: SchemaDescPtr,
+    mut rg: RowGroupMetaData,
     decryptor: Option<&FileDecryptor>,
 ) -> Result<RowGroupMetaData> {
+    let schema_descr = rg.schema_descr;
+
     if schema_descr.num_columns() != rg.columns.len() {
         return Err(general_err!(
             "Column count mismatch. Schema has {} columns while Row Group has 
{}",
@@ -609,7 +460,7 @@ fn row_group_from_encrypted_thrift(
     {
         // Read encrypted metadata if it's present and we have a decryptor.
         if let (true, Some(decryptor)) = (c.encrypted_column_metadata.is_some(), decryptor) {
-            let column_decryptor = match c.crypto_metadata.as_ref() {
+            let column_decryptor = match c.crypto_metadata() {
                 None => {
                     return Err(general_err!(
                         "No crypto_metadata is set for column '{}', which has 
encrypted metadata",
@@ -636,23 +487,59 @@ fn row_group_from_encrypted_thrift(
                 None,
             )?;
 
-            let buf = c.encrypted_column_metadata.unwrap();
-            let decrypted_cc_buf =
-                column_decryptor
-                    .decrypt(buf, column_aad.as_ref())
-                    .map_err(|_| {
-                        general_err!(
-                            "Unable to decrypt column '{}', perhaps the column 
key is wrong?",
-                            d.path().string()
-                        )
-                    })?;
+            // Take the encrypted column metadata as it is no longer needed.
+            let encrypted_column_metadata = c.encrypted_column_metadata.take();
+            let buf = encrypted_column_metadata.unwrap();
+            let decrypted_cc_buf = column_decryptor
+                .decrypt(&buf, column_aad.as_ref())
+                .map_err(|_| {
+                    general_err!(
+                        "Unable to decrypt column '{}', perhaps the column key 
is wrong?",
+                        d.path().string()
+                    )
+                })?;
 
+            // parse decrypted buffer and then replace fields in 'c'
             let mut prot = ThriftSliceInputProtocol::new(decrypted_cc_buf.as_slice());
             let col_meta = ColumnMetaData::read_thrift(&mut prot)?;
-            c.meta_data = Some(col_meta);
-            columns.push(convert_column(c, d.clone())?);
+
+            let (
+                unencoded_byte_array_data_bytes,
+                repetition_level_histogram,
+                definition_level_histogram,
+            ) = if let Some(size_stats) = col_meta.size_statistics {
+                (
+                    size_stats.unencoded_byte_array_data_bytes,
+                    size_stats.repetition_level_histogram,
+                    size_stats.definition_level_histogram,
+                )
+            } else {
+                (None, None, None)
+            };
+
+            let repetition_level_histogram = repetition_level_histogram.map(LevelHistogram::from);
+            let definition_level_histogram = definition_level_histogram.map(LevelHistogram::from);
+
+            c.encodings = col_meta.encodings;
+            c.compression = col_meta.codec;
+            c.num_values = col_meta.num_values;
+            c.total_uncompressed_size = col_meta.total_uncompressed_size;
+            c.total_compressed_size = col_meta.total_compressed_size;
+            c.data_page_offset = col_meta.data_page_offset;
+            c.index_page_offset = col_meta.index_page_offset;
+            c.dictionary_page_offset = col_meta.dictionary_page_offset;
+            c.statistics = convert_stats(col_meta.r#type, col_meta.statistics)?;
+            c.encoding_stats = col_meta.encoding_stats;
+            c.bloom_filter_offset = col_meta.bloom_filter_offset;
+            c.bloom_filter_length = col_meta.bloom_filter_length;
+            c.unencoded_byte_array_data_bytes = unencoded_byte_array_data_bytes;
+            c.repetition_level_histogram = repetition_level_histogram;
+            c.definition_level_histogram = definition_level_histogram;
+            c.geo_statistics = convert_geo_stats(col_meta.geospatial_statistics);
+
+            columns.push(c);
         } else {
-            columns.push(convert_column(c, d.clone())?);
+            columns.push(c);
         }
     }
 
@@ -686,11 +573,14 @@ pub(crate) fn parquet_metadata_with_encryption(
     encrypted_footer: bool,
     buf: &[u8],
 ) -> Result<ParquetMetaData> {
-    let mut prot = ThriftSliceInputProtocol::new(buf);
+    use crate::file::metadata::ParquetMetaDataBuilder;
+
+    let mut buf = buf;
     let mut file_decryptor = None;
     let decrypted_fmd_buf;
 
     if encrypted_footer {
+        let mut prot = ThriftSliceInputProtocol::new(buf);
         if let Some(file_decryption_properties) = file_decryption_properties {
             let t_file_crypto_metadata: FileCryptoMetaData =
                 FileCryptoMetaData::read_thrift(&mut prot)
@@ -721,8 +611,8 @@ pub(crate) fn parquet_metadata_with_encryption(
                         "Provided footer key and AAD were unable to decrypt 
parquet footer"
                     )
                 })?;
-            prot = ThriftSliceInputProtocol::new(decrypted_fmd_buf.as_ref());
 
+            buf = &decrypted_fmd_buf;
             file_decryptor = Some(decryptor);
         } else {
             return Err(general_err!(
@@ -731,24 +621,29 @@ pub(crate) fn parquet_metadata_with_encryption(
         }
     }
 
-    let file_meta = FileMetaData::read_thrift(&mut prot)
+    let parquet_meta = parquet_metadata_from_bytes(buf)
         .map_err(|e| general_err!("Could not parse metadata: {}", e))?;
 
-    let version = file_meta.version;
-    let num_rows = file_meta.num_rows;
-    let created_by = file_meta.created_by.map(|c| c.to_owned());
-    let key_value_metadata = file_meta.key_value_metadata;
+    let ParquetMetaData {
+        mut file_metadata,
+        row_groups,
+        column_index: _,
+        offset_index: _,
+        file_decryptor: _,
+    } = parquet_meta;
+
+    // Take the encryption algorithm and footer signing key metadata as they are no longer
+    // needed after this.
+    if let (Some(algo), Some(file_decryption_properties)) = (
+        file_metadata.encryption_algorithm.take(),
+        file_decryption_properties,
+    ) {
+        let footer_signing_key_metadata = file_metadata.footer_signing_key_metadata.take();
 
-    let val = parquet_schema_from_array(file_meta.schema)?;
-    let schema_descr = Arc::new(SchemaDescriptor::new(val));
-
-    if let (Some(algo), Some(file_decryption_properties)) =
-        (file_meta.encryption_algorithm, file_decryption_properties)
-    {
         // File has a plaintext footer but encryption algorithm is set
         let file_decryptor_value = get_file_decryptor(
-            algo,
-            file_meta.footer_signing_key_metadata,
+            *algo,
+            footer_signing_key_metadata.as_deref(),
             file_decryption_properties,
         )?;
         if file_decryption_properties.check_plaintext_footer_integrity() && !encrypted_footer {
@@ -758,50 +653,15 @@ pub(crate) fn parquet_metadata_with_encryption(
     }
 
     // decrypt column chunk info
-    let mut row_groups = Vec::with_capacity(file_meta.row_groups.len());
-    for rg in file_meta.row_groups {
-        let r = row_group_from_encrypted_thrift(rg, schema_descr.clone(), file_decryptor.as_ref())?;
-        row_groups.push(r);
-    }
+    let row_groups = row_groups
+        .into_iter()
+        .map(|rg| row_group_from_encrypted_thrift(rg, file_decryptor.as_ref()))
+        .collect::<Result<Vec<_>>>()?;
 
-    // need to map read column orders to actual values based on the schema
-    if file_meta
-        .column_orders
-        .as_ref()
-        .is_some_and(|cos| cos.len() != schema_descr.num_columns())
-    {
-        return Err(general_err!("Column order length mismatch"));
-    }
-
-    let column_orders = file_meta.column_orders.map(|cos| {
-        let mut res = Vec::with_capacity(cos.len());
-        for (i, column) in schema_descr.columns().iter().enumerate() {
-            match cos[i] {
-                ColumnOrder::TYPE_DEFINED_ORDER(_) => {
-                    let sort_order = ColumnOrder::get_sort_order(
-                        column.logical_type(),
-                        column.converted_type(),
-                        column.physical_type(),
-                    );
-                    res.push(ColumnOrder::TYPE_DEFINED_ORDER(sort_order));
-                }
-                _ => res.push(cos[i]),
-            }
-        }
-        res
-    });
-
-    let fmd = crate::file::metadata::FileMetaData::new(
-        version,
-        num_rows,
-        created_by,
-        key_value_metadata,
-        schema_descr,
-        column_orders,
-    );
-    let mut metadata = ParquetMetaData::new(fmd, row_groups);
-
-    metadata.with_file_decryptor(file_decryptor);
+    let metadata = ParquetMetaDataBuilder::new(file_metadata)
+        .set_row_groups(row_groups)
+        .set_file_decryptor(file_decryptor)
+        .build();
 
     Ok(metadata)
 }
@@ -837,62 +697,575 @@ fn get_file_decryptor(
     }
 }
 
-/// Create ParquetMetaData from thrift input. Note that this only decodes the file metadata in
-/// the Parquet footer. Page indexes will need to be added later.
-impl<'a, R: ThriftCompactInputProtocol<'a>> ReadThrift<'a, R> for ParquetMetaData {
-    fn read_thrift(prot: &mut R) -> Result<Self> {
-        let file_meta = FileMetaData::read_thrift(prot)?;
-
-        let version = file_meta.version;
-        let num_rows = file_meta.num_rows;
-        let row_groups = file_meta.row_groups;
-        let created_by = file_meta.created_by.map(|c| c.to_owned());
-        let key_value_metadata = file_meta.key_value_metadata;
-
-        let val = parquet_schema_from_array(file_meta.schema)?;
-        let schema_descr = Arc::new(SchemaDescriptor::new(val));
-
-        // need schema_descr to get final RowGroupMetaData
-        let row_groups = convert_row_groups(row_groups, schema_descr.clone())?;
-
-        // need to map read column orders to actual values based on the schema
-        if file_meta
-            .column_orders
-            .as_ref()
-            .is_some_and(|cos| cos.len() != schema_descr.num_columns())
-        {
-            return Err(general_err!("Column order length mismatch"));
+// using ThriftSliceInputProtocol rather than the ThriftCompactInputProtocol trait because
+// these are all internal and operate on slices.
+fn read_column_chunk<'a>(
+    prot: &mut ThriftSliceInputProtocol<'a>,
+    column_descr: &Arc<ColumnDescriptor>,
+) -> Result<ColumnChunkMetaData> {
+    // ColumnChunk fields
+    let mut file_path: Option<&str> = None;
+    let mut file_offset: Option<i64> = None;
+    let mut offset_index_offset: Option<i64> = None;
+    let mut offset_index_length: Option<i32> = None;
+    let mut column_index_offset: Option<i64> = None;
+    let mut column_index_length: Option<i32> = None;
+    #[cfg(feature = "encryption")]
+    let mut column_crypto_metadata: Option<Box<ColumnCryptoMetaData>> = None;
+    #[cfg(feature = "encryption")]
+    let mut encrypted_column_metadata: Option<&[u8]> = None;
+
+    // ColumnMetaData
+    let mut encodings: Option<Vec<Encoding>> = None;
+    let mut codec: Option<Compression> = None;
+    let mut num_values: Option<i64> = None;
+    let mut total_uncompressed_size: Option<i64> = None;
+    let mut total_compressed_size: Option<i64> = None;
+    let mut data_page_offset: Option<i64> = None;
+    let mut index_page_offset: Option<i64> = None;
+    let mut dictionary_page_offset: Option<i64> = None;
+    let mut statistics: Option<Statistics> = None;
+    let mut encoding_stats: Option<Vec<PageEncodingStats>> = None;
+    let mut bloom_filter_offset: Option<i64> = None;
+    let mut bloom_filter_length: Option<i32> = None;
+    let mut size_statistics: Option<SizeStatistics> = None;
+    let mut geospatial_statistics: Option<GeospatialStatistics> = None;
+
+    // struct ColumnChunk {
+    //   1: optional string file_path
+    //   2: required i64 file_offset = 0
+    //   3: optional ColumnMetaData meta_data
+    //   4: optional i64 offset_index_offset
+    //   5: optional i32 offset_index_length
+    //   6: optional i64 column_index_offset
+    //   7: optional i32 column_index_length
+    //   8: optional ColumnCryptoMetaData crypto_metadata
+    //   9: optional binary encrypted_column_metadata
+    // }
+    let mut last_field_id = 0i16;
+    loop {
+        let field_ident = prot.read_field_begin(last_field_id)?;
+        if field_ident.field_type == FieldType::Stop {
+            break;
         }
-
-        let column_orders = file_meta.column_orders.map(|cos| {
-            let mut res = Vec::with_capacity(cos.len());
-            for (i, column) in schema_descr.columns().iter().enumerate() {
-                match cos[i] {
-                    ColumnOrder::TYPE_DEFINED_ORDER(_) => {
-                        let sort_order = ColumnOrder::get_sort_order(
-                            column.logical_type(),
-                            column.converted_type(),
-                            column.physical_type(),
-                        );
-                        res.push(ColumnOrder::TYPE_DEFINED_ORDER(sort_order));
+        match field_ident.id {
+            1 => {
+                file_path = Some(<&str>::read_thrift(&mut *prot)?);
+            }
+            2 => {
+                file_offset = Some(i64::read_thrift(&mut *prot)?);
+            }
+            3 => {
+                // `ColumnMetaData`. Read inline for performance's sake.
+                // struct ColumnMetaData {
+                //   1: required Type type
+                //   2: required list<Encoding> encodings
+                //   3: required list<string> path_in_schema
+                //   4: required CompressionCodec codec
+                //   5: required i64 num_values
+                //   6: required i64 total_uncompressed_size
+                //   7: required i64 total_compressed_size
+                //   8: optional list<KeyValue> key_value_metadata
+                //   9: required i64 data_page_offset
+                //   10: optional i64 index_page_offset
+                //   11: optional i64 dictionary_page_offset
+                //   12: optional Statistics statistics;
+                //   13: optional list<PageEncodingStats> encoding_stats;
+                //   14: optional i64 bloom_filter_offset;
+                //   15: optional i32 bloom_filter_length;
+                //   16: optional SizeStatistics size_statistics;
+                //   17: optional GeospatialStatistics geospatial_statistics;
+                // }
+                let mut last_field_id = 0i16;
+                loop {
+                    let field_ident = prot.read_field_begin(last_field_id)?;
+                    if field_ident.field_type == FieldType::Stop {
+                        break;
                     }
-                    _ => res.push(cos[i]),
+                    match field_ident.id {
+                        // 1: type is never used, we can use the column descriptor
+                        2 => {
+                            let val =
+                                read_thrift_vec::<Encoding, ThriftSliceInputProtocol>(&mut *prot)?;
+                            encodings = Some(val);
+                        }
+                        // 3: path_in_schema is redundant
+                        4 => {
+                            codec = Some(Compression::read_thrift(&mut *prot)?);
+                        }
+                        5 => {
+                            num_values = Some(i64::read_thrift(&mut *prot)?);
+                        }
+                        6 => {
+                            total_uncompressed_size = Some(i64::read_thrift(&mut *prot)?);
+                        }
+                        7 => {
+                            total_compressed_size = Some(i64::read_thrift(&mut *prot)?);
+                        }
+                        // 8: we don't expose this key value
+                        9 => {
+                            data_page_offset = Some(i64::read_thrift(&mut *prot)?);
+                        }
+                        10 => {
+                            index_page_offset = Some(i64::read_thrift(&mut *prot)?);
+                        }
+                        11 => {
+                            dictionary_page_offset = Some(i64::read_thrift(&mut *prot)?);
+                        }
+                        12 => {
+                            statistics = Some(Statistics::read_thrift(&mut *prot)?);
+                        }
+                        13 => {
+                            let val = read_thrift_vec::<PageEncodingStats, ThriftSliceInputProtocol>(
+                                &mut *prot,
+                            )?;
+                            encoding_stats = Some(val);
+                        }
+                        14 => {
+                            bloom_filter_offset = Some(i64::read_thrift(&mut *prot)?);
+                        }
+                        15 => {
+                            bloom_filter_length = Some(i32::read_thrift(&mut *prot)?);
+                        }
+                        16 => {
+                            let val = SizeStatistics::read_thrift(&mut *prot)?;
+                            size_statistics = Some(val);
+                        }
+                        17 => {
+                            let val = GeospatialStatistics::read_thrift(&mut *prot)?;
+                            geospatial_statistics = Some(val);
+                        }
+                        _ => {
+                            prot.skip(field_ident.field_type)?;
+                        }
+                    };
+                    last_field_id = field_ident.id;
                 }
             }
-            res
-        });
+            4 => {
+                offset_index_offset = Some(i64::read_thrift(&mut *prot)?);
+            }
+            5 => {
+                offset_index_length = Some(i32::read_thrift(&mut *prot)?);
+            }
+            6 => {
+                column_index_offset = Some(i64::read_thrift(&mut *prot)?);
+            }
+            7 => {
+                column_index_length = Some(i32::read_thrift(&mut *prot)?);
+            }
+            #[cfg(feature = "encryption")]
+            8 => {
+                let val = ColumnCryptoMetaData::read_thrift(&mut *prot)?;
+                column_crypto_metadata = Some(Box::new(val));
+            }
+            #[cfg(feature = "encryption")]
+            9 => {
+                encrypted_column_metadata = Some(<&[u8]>::read_thrift(&mut *prot)?);
+            }
+            _ => {
+                prot.skip(field_ident.field_type)?;
+            }
+        };
+        last_field_id = field_ident.id;
+    }
 
-        let fmd = crate::file::metadata::FileMetaData::new(
-            version,
-            num_rows,
-            created_by,
-            key_value_metadata,
-            schema_descr,
-            column_orders,
-        );
+    // the only required field from ColumnChunk
+    let Some(file_offset) = file_offset else {
+        return Err(general_err!("Required field file_offset is missing"));
+    };
+
+    // transform optional fields
+    let file_path = file_path.map(|f| f.to_owned());
+    let (unencoded_byte_array_data_bytes, repetition_level_histogram, definition_level_histogram) =
+        if let Some(size_stats) = size_statistics {
+            (
+                size_stats.unencoded_byte_array_data_bytes,
+                size_stats.repetition_level_histogram,
+                size_stats.definition_level_histogram,
+            )
+        } else {
+            (None, None, None)
+        };
+
+    let repetition_level_histogram = repetition_level_histogram.map(LevelHistogram::from);
+    let definition_level_histogram = definition_level_histogram.map(LevelHistogram::from);
+
+    let statistics = convert_stats(column_descr.physical_type(), statistics)?;
+    let geo_statistics = convert_geo_stats(geospatial_statistics);
+    let column_descr = column_descr.clone();
+
+    // if encrypted, set the encrypted column metadata and return. we'll decrypt after finishing
+    // the footer and populate the rest.
+    #[cfg(feature = "encryption")]
+    if encrypted_column_metadata.is_some() {
+        use crate::file::metadata::ColumnChunkMetaDataBuilder;
+
+        let encrypted_column_metadata = encrypted_column_metadata.map(|s| s.to_vec());
+
+        // use builder to get uninitialized ColumnChunkMetaData
+        let mut col = ColumnChunkMetaDataBuilder::new(column_descr).build()?;
+
+        // set ColumnChunk fields
+        col.file_path = file_path;
+        col.file_offset = file_offset;
+        col.offset_index_offset = offset_index_offset;
+        col.offset_index_length = offset_index_length;
+        col.column_index_offset = column_index_offset;
+        col.column_index_length = column_index_length;
+        col.column_crypto_metadata = column_crypto_metadata;
+        col.encrypted_column_metadata = encrypted_column_metadata;
+
+        // check for ColumnMetaData fields that might be present
+        // first required fields
+        if let Some(encodings) = encodings {
+            col.encodings = encodings;
+        }
+        if let Some(codec) = codec {
+            col.compression = codec;
+        }
+        if let Some(num_values) = num_values {
+            col.num_values = num_values;
+        }
+        if let Some(total_uncompressed_size) = total_uncompressed_size {
+            col.total_uncompressed_size = total_uncompressed_size;
+        }
+        if let Some(total_compressed_size) = total_compressed_size {
+            col.total_compressed_size = total_compressed_size;
+        }
+        if let Some(data_page_offset) = data_page_offset {
+            col.data_page_offset = data_page_offset;
+        }
+
+        // then optional
+        col.index_page_offset = index_page_offset;
+        col.dictionary_page_offset = dictionary_page_offset;
+        col.bloom_filter_offset = bloom_filter_offset;
+        col.bloom_filter_length = bloom_filter_length;
+        col.unencoded_byte_array_data_bytes = unencoded_byte_array_data_bytes;
+        col.repetition_level_histogram = repetition_level_histogram;
+        col.definition_level_histogram = definition_level_histogram;
+        col.encoding_stats = encoding_stats;
+        col.statistics = statistics;
+        col.geo_statistics = geo_statistics;
+
+        return Ok(col);
+    }
+
+    // not encrypted, so meta_data better exist
+    let Some(encodings) = encodings else {
+        return Err(ParquetError::General(
+            "Required field encodings is missing".to_owned(),
+        ));
+    };
+    let Some(codec) = codec else {
+        return Err(ParquetError::General(
+            "Required field codec is missing".to_owned(),
+        ));
+    };
+    let Some(num_values) = num_values else {
+        return Err(ParquetError::General(
+            "Required field num_values is missing".to_owned(),
+        ));
+    };
+    let Some(total_uncompressed_size) = total_uncompressed_size else {
+        return Err(ParquetError::General(
+            "Required field total_uncompressed_size is missing".to_owned(),
+        ));
+    };
+    let Some(total_compressed_size) = total_compressed_size else {
+        return Err(ParquetError::General(
+            "Required field total_compressed_size is missing".to_owned(),
+        ));
+    };
+    let Some(data_page_offset) = data_page_offset else {
+        return Err(ParquetError::General(
+            "Required field data_page_offset is missing".to_owned(),
+        ));
+    };
+
+    let compression = codec;
+
+    // NOTE: I tried using the builder for this, but it added 20% to the execution time
+    let result = ColumnChunkMetaData {
+        column_descr,
+        encodings,
+        file_path,
+        file_offset,
+        num_values,
+        compression,
+        total_compressed_size,
+        total_uncompressed_size,
+        data_page_offset,
+        index_page_offset,
+        dictionary_page_offset,
+        statistics,
+        geo_statistics,
+        encoding_stats,
+        bloom_filter_offset,
+        bloom_filter_length,
+        offset_index_offset,
+        offset_index_length,
+        column_index_offset,
+        column_index_length,
+        unencoded_byte_array_data_bytes,
+        repetition_level_histogram,
+        definition_level_histogram,
+        #[cfg(feature = "encryption")]
+        column_crypto_metadata,
+        #[cfg(feature = "encryption")]
+        encrypted_column_metadata: None, // tested is_some above
+    };
+    Ok(result)
+}
+
+fn read_row_group(
+    prot: &mut ThriftSliceInputProtocol,
+    schema_descr: &Arc<SchemaDescriptor>,
+) -> Result<RowGroupMetaData> {
+    let mut columns: Option<Vec<ColumnChunkMetaData>> = None;
+    let mut total_byte_size: Option<i64> = None;
+    let mut num_rows: Option<i64> = None;
+    let mut sorting_columns: Option<Vec<SortingColumn>> = None;
+    let mut file_offset: Option<i64> = None;
+    let mut ordinal: Option<i16> = None;
+
+    // struct RowGroup {
+    //   1: required list<ColumnChunk> columns
+    //   2: required i64 total_byte_size
+    //   3: required i64 num_rows
+    //   4: optional list<SortingColumn> sorting_columns
+    //   5: optional i64 file_offset
+    //   6: optional i64 total_compressed_size
+    //   7: optional i16 ordinal
+    // }
+    let mut last_field_id = 0i16;
+    loop {
+        let field_ident = prot.read_field_begin(last_field_id)?;
+        if field_ident.field_type == FieldType::Stop {
+            break;
+        }
+        match field_ident.id {
+            1 => {
+                let list_ident = prot.read_list_begin()?;
+                if schema_descr.num_columns() != list_ident.size as usize {
+                    return Err(general_err!(
+                        "Column count mismatch. Schema has {} columns while 
Row Group has {}",
+                        schema_descr.num_columns(),
+                        list_ident.size
+                    ));
+                }
+                let mut cols = Vec::with_capacity(list_ident.size as usize);
+                for i in 0..list_ident.size as usize {
+                    let col = read_column_chunk(prot, &schema_descr.columns()[i])?;
+                    cols.push(col);
+                }
+                columns = Some(cols);
+            }
+            2 => {
+                total_byte_size = Some(i64::read_thrift(&mut *prot)?);
+            }
+            3 => {
+                num_rows = Some(i64::read_thrift(&mut *prot)?);
+            }
+            4 => {
+                let val = read_thrift_vec::<SortingColumn, ThriftSliceInputProtocol>(&mut *prot)?;
+                sorting_columns = Some(val);
+            }
+            5 => {
+                file_offset = Some(i64::read_thrift(&mut *prot)?);
+            }
+            // 6: we don't expose total_compressed_size
+            7 => {
+                ordinal = Some(i16::read_thrift(&mut *prot)?);
+            }
+            _ => {
+                prot.skip(field_ident.field_type)?;
+            }
+        };
+        last_field_id = field_ident.id;
+    }
+    let Some(columns) = columns else {
+        return Err(ParquetError::General(
+            "Required field columns is missing".to_owned(),
+        ));
+    };
+    let Some(total_byte_size) = total_byte_size else {
+        return Err(ParquetError::General(
+            "Required field total_byte_size is missing".to_owned(),
+        ));
+    };
+    let Some(num_rows) = num_rows else {
+        return Err(ParquetError::General(
+            "Required field num_rows is missing".to_owned(),
+        ));
+    };
+
+    Ok(RowGroupMetaData {
+        columns,
+        num_rows,
+        sorting_columns,
+        total_byte_size,
+        schema_descr: schema_descr.clone(),
+        file_offset,
+        ordinal,
+    })
+}
 
-        Ok(ParquetMetaData::new(fmd, row_groups))
+/// Create [`ParquetMetaData`] from thrift input. Note that this only decodes the file metadata in
+/// the Parquet footer. Page indexes will need to be added later.
+pub(crate) fn parquet_metadata_from_bytes(buf: &[u8]) -> Result<ParquetMetaData> {
+    let mut prot = ThriftSliceInputProtocol::new(buf);
+
+    // begin reading the file metadata
+    let mut version: Option<i32> = None;
+    let mut num_rows: Option<i64> = None;
+    let mut row_groups: Option<Vec<RowGroupMetaData>> = None;
+    let mut key_value_metadata: Option<Vec<KeyValue>> = None;
+    let mut created_by: Option<&str> = None;
+    let mut column_orders: Option<Vec<ColumnOrder>> = None;
+    #[cfg(feature = "encryption")]
+    let mut encryption_algorithm: Option<EncryptionAlgorithm> = None;
+    #[cfg(feature = "encryption")]
+    let mut footer_signing_key_metadata: Option<&[u8]> = None;
+
+    // this will need to be set before parsing row groups
+    let mut schema_descr: Option<Arc<SchemaDescriptor>> = None;
+
+    // struct FileMetaData {
+    //   1: required i32 version
+    //   2: required list<SchemaElement> schema;
+    //   3: required i64 num_rows
+    //   4: required list<RowGroup> row_groups
+    //   5: optional list<KeyValue> key_value_metadata
+    //   6: optional string created_by
+    //   7: optional list<ColumnOrder> column_orders;
+    //   8: optional EncryptionAlgorithm encryption_algorithm
+    //   9: optional binary footer_signing_key_metadata
+    // }
+    let mut last_field_id = 0i16;
+    loop {
+        let field_ident = prot.read_field_begin(last_field_id)?;
+        if field_ident.field_type == FieldType::Stop {
+            break;
+        }
+        match field_ident.id {
+            1 => {
+                version = Some(i32::read_thrift(&mut prot)?);
+            }
+            2 => {
+                // read schema and convert to SchemaDescriptor for use when reading row groups
+                let val = read_thrift_vec::<SchemaElement, ThriftSliceInputProtocol>(&mut prot)?;
+                let val = parquet_schema_from_array(val)?;
+                schema_descr = Some(Arc::new(SchemaDescriptor::new(val)));
+            }
+            3 => {
+                num_rows = Some(i64::read_thrift(&mut prot)?);
+            }
+            4 => {
+                if schema_descr.is_none() {
+                    return Err(general_err!("Required field schema is 
missing"));
+                }
+                let schema_descr = schema_descr.as_ref().unwrap();
+                let list_ident = prot.read_list_begin()?;
+                let mut rg_vec = Vec::with_capacity(list_ident.size as usize);
+                for _ in 0..list_ident.size {
+                    rg_vec.push(read_row_group(&mut prot, schema_descr)?);
+                }
+                row_groups = Some(rg_vec);
+            }
+            5 => {
+                let val = read_thrift_vec::<KeyValue, ThriftSliceInputProtocol>(&mut prot)?;
+                key_value_metadata = Some(val);
+            }
+            6 => {
+                created_by = Some(<&str>::read_thrift(&mut prot)?);
+            }
+            7 => {
+                let val = read_thrift_vec::<ColumnOrder, ThriftSliceInputProtocol>(&mut prot)?;
+                column_orders = Some(val);
+            }
+            #[cfg(feature = "encryption")]
+            8 => {
+                let val = EncryptionAlgorithm::read_thrift(&mut prot)?;
+                encryption_algorithm = Some(val);
+            }
+            #[cfg(feature = "encryption")]
+            9 => {
+                footer_signing_key_metadata = Some(<&[u8]>::read_thrift(&mut prot)?);
+            }
+            _ => {
+                prot.skip(field_ident.field_type)?;
+            }
+        };
+        last_field_id = field_ident.id;
     }
+    let Some(version) = version else {
+        return Err(ParquetError::General(
+            "Required field version is missing".to_owned(),
+        ));
+    };
+    let Some(num_rows) = num_rows else {
+        return Err(ParquetError::General(
+            "Required field num_rows is missing".to_owned(),
+        ));
+    };
+    let Some(row_groups) = row_groups else {
+        return Err(ParquetError::General(
+            "Required field row_groups is missing".to_owned(),
+        ));
+    };
+
+    let created_by = created_by.map(|c| c.to_owned());
+
+    // we've tested for `None` by now so this is safe
+    let schema_descr = schema_descr.unwrap();
+
+    // need to map read column orders to actual values based on the schema
+    if column_orders
+        .as_ref()
+        .is_some_and(|cos| cos.len() != schema_descr.num_columns())
+    {
+        return Err(general_err!("Column order length mismatch"));
+    }
+    // replace default type defined column orders with ones having the correct sort order
+    // TODO(ets): this could instead be done above when decoding
+    let column_orders = column_orders.map(|mut cos| {
+        for (i, column) in schema_descr.columns().iter().enumerate() {
+            if let ColumnOrder::TYPE_DEFINED_ORDER(_) = cos[i] {
+                let sort_order = ColumnOrder::get_sort_order(
+                    column.logical_type(),
+                    column.converted_type(),
+                    column.physical_type(),
+                );
+                cos[i] = ColumnOrder::TYPE_DEFINED_ORDER(sort_order);
+            }
+        }
+        cos
+    });
+
+    #[cfg(not(feature = "encryption"))]
+    let fmd = crate::file::metadata::FileMetaData::new(
+        version,
+        num_rows,
+        created_by,
+        key_value_metadata,
+        schema_descr,
+        column_orders,
+    );
+    #[cfg(feature = "encryption")]
+    let fmd = crate::file::metadata::FileMetaData::new(
+        version,
+        num_rows,
+        created_by,
+        key_value_metadata,
+        schema_descr,
+        column_orders,
+    )
+    .with_encryption_algorithm(encryption_algorithm)
+    .with_footer_signing_key_metadata(footer_signing_key_metadata.map(|v| v.to_vec()));
+
+    Ok(ParquetMetaData::new(fmd, row_groups))
 }
 
 thrift_struct!(
@@ -1335,13 +1708,24 @@ pub(crate) fn serialize_column_meta_data<W: Write>(
 pub(crate) struct FileMeta<'a> {
     pub(crate) file_metadata: &'a crate::file::metadata::FileMetaData,
     pub(crate) row_groups: &'a Vec<RowGroupMetaData>,
-    pub(crate) encryption_algorithm: Option<EncryptionAlgorithm>,
-    pub(crate) footer_signing_key_metadata: Option<Vec<u8>>,
 }
 
+// struct FileMetaData {
+//   1: required i32 version
+//   2: required list<SchemaElement> schema;
+//   3: required i64 num_rows
+//   4: required list<RowGroup> row_groups
+//   5: optional list<KeyValue> key_value_metadata
+//   6: optional string created_by
+//   7: optional list<ColumnOrder> column_orders;
+//   8: optional EncryptionAlgorithm encryption_algorithm
+//   9: optional binary footer_signing_key_metadata
+// }
 impl<'a> WriteThrift for FileMeta<'a> {
     const ELEMENT_TYPE: ElementType = ElementType::Struct;
 
+    // needed for last_field_id w/o encryption
+    #[allow(unused_assignments)]
     fn write_thrift<W: Write>(&self, writer: &mut ThriftCompactOutputProtocol<W>) -> Result<()> {
         self.file_metadata
             .version
@@ -1372,10 +1756,12 @@ impl<'a> WriteThrift for FileMeta<'a> {
         if let Some(column_orders) = self.file_metadata.column_orders() {
             last_field_id = column_orders.write_thrift_field(writer, 7, last_field_id)?;
         }
-        if let Some(algo) = self.encryption_algorithm.as_ref() {
+        #[cfg(feature = "encryption")]
+        if let Some(algo) = self.file_metadata.encryption_algorithm.as_ref() {
             last_field_id = algo.write_thrift_field(writer, 8, last_field_id)?;
         }
-        if let Some(key) = self.footer_signing_key_metadata.as_ref() {
+        #[cfg(feature = "encryption")]
+        if let Some(key) = self.file_metadata.footer_signing_key_metadata.as_ref() {
             key.as_slice()
                 .write_thrift_field(writer, 9, last_field_id)?;
         }
@@ -1661,15 +2047,11 @@ impl WriteThriftField for crate::geospatial::bounding_box::BoundingBox {
 #[cfg(test)]
 pub(crate) mod tests {
     use crate::errors::Result;
-    use crate::file::metadata::thrift_gen::{
-        BoundingBox, ColumnChunk, RowGroup, SchemaElement, convert_column, convert_row_group,
-        write_schema,
-    };
+    use crate::file::metadata::thrift_gen::{BoundingBox, SchemaElement, write_schema};
     use crate::file::metadata::{ColumnChunkMetaData, RowGroupMetaData};
     use crate::parquet_thrift::tests::test_roundtrip;
     use crate::parquet_thrift::{
-        ElementType, ReadThrift, ThriftCompactOutputProtocol, ThriftSliceInputProtocol,
-        read_thrift_vec,
+        ElementType, ThriftCompactOutputProtocol, ThriftSliceInputProtocol, read_thrift_vec,
     };
     use crate::schema::types::{
         ColumnDescriptor, SchemaDescriptor, TypePtr, num_nodes, parquet_schema_from_array,
@@ -1682,8 +2064,7 @@ pub(crate) mod tests {
         schema_descr: Arc<SchemaDescriptor>,
     ) -> Result<RowGroupMetaData> {
         let mut reader = ThriftSliceInputProtocol::new(buf);
-        let rg = RowGroup::read_thrift(&mut reader)?;
-        convert_row_group(rg, schema_descr)
+        crate::file::metadata::thrift_gen::read_row_group(&mut reader, &schema_descr)
     }
 
     pub(crate) fn read_column_chunk(
@@ -1691,8 +2072,7 @@ pub(crate) mod tests {
         column_descr: Arc<ColumnDescriptor>,
     ) -> Result<ColumnChunkMetaData> {
         let mut reader = ThriftSliceInputProtocol::new(buf);
-        let cc = ColumnChunk::read_thrift(&mut reader)?;
-        convert_column(cc, column_descr)
+        crate::file::metadata::thrift_gen::read_column_chunk(&mut reader, &column_descr)
     }
 
     pub(crate) fn roundtrip_schema(schema: TypePtr) -> Result<TypePtr> {
diff --git a/parquet/src/file/metadata/writer.rs b/parquet/src/file/metadata/writer.rs
index 8bda32481b..35c69935a8 100644
--- a/parquet/src/file/metadata/writer.rs
+++ b/parquet/src/file/metadata/writer.rs
@@ -15,7 +15,7 @@
 // specific language governing permissions and limitations
 // under the License.
 
-use crate::file::metadata::thrift_gen::{EncryptionAlgorithm, FileMeta};
+use crate::file::metadata::thrift_gen::FileMeta;
 use crate::file::metadata::{
     ColumnChunkMetaData, ParquetColumnIndex, ParquetOffsetIndex, RowGroupMetaData,
 };
@@ -31,7 +31,7 @@ use crate::{
         modules::{ModuleType, create_footer_aad, create_module_aad},
     },
     file::column_crypto_metadata::ColumnCryptoMetaData,
-    file::metadata::thrift_gen::{AesGcmV1, FileCryptoMetaData},
+    file::metadata::thrift_gen::{AesGcmV1, EncryptionAlgorithm, FileCryptoMetaData},
 };
 use crate::{errors::Result, file::page_index::column_index::ColumnIndexMetaData};
 
@@ -174,9 +174,22 @@ impl<'a, W: Write> ThriftMetadataWriter<'a, W> {
             .object_writer
             .apply_row_group_encryption(self.row_groups)?;
 
+        #[cfg(feature = "encryption")]
         let (encryption_algorithm, footer_signing_key_metadata) =
             self.object_writer.get_plaintext_footer_crypto_metadata();
+        #[cfg(feature = "encryption")]
+        let file_metadata = FileMetaData::new(
+            self.writer_version,
+            num_rows,
+            self.created_by,
+            self.key_value_metadata,
+            self.schema_descr.clone(),
+            column_orders,
+        )
+        .with_encryption_algorithm(encryption_algorithm)
+        .with_footer_signing_key_metadata(footer_signing_key_metadata);
 
+        #[cfg(not(feature = "encryption"))]
         let file_metadata = FileMetaData::new(
             self.writer_version,
             num_rows,
@@ -189,8 +202,6 @@ impl<'a, W: Write> ThriftMetadataWriter<'a, W> {
         let file_meta = FileMeta {
             file_metadata: &file_metadata,
             row_groups: &row_groups,
-            encryption_algorithm,
-            footer_signing_key_metadata,
         };
 
         // Write file metadata
@@ -524,12 +535,6 @@ impl MetadataObjectWriter {
     pub fn get_file_magic(&self) -> &[u8; 4] {
         get_file_magic()
     }
-
-    fn get_plaintext_footer_crypto_metadata(
-        &self,
-    ) -> (Option<EncryptionAlgorithm>, Option<Vec<u8>>) {
-        (None, None)
-    }
 }
 
 /// Implementations of [`MetadataObjectWriter`] methods that rely on encryption being enabled
@@ -557,7 +562,7 @@ impl MetadataObjectWriter {
                 let mut encryptor = file_encryptor.get_footer_encryptor()?;
                 encrypt_thrift_object(file_metadata, &mut encryptor, &mut sink, &aad)
             }
-            Some(file_encryptor) if file_metadata.encryption_algorithm.is_some() => {
+            Some(file_encryptor) if file_metadata.file_metadata.encryption_algorithm.is_some() => {
                 let aad = create_footer_aad(file_encryptor.file_aad())?;
                 let mut encryptor = file_encryptor.get_footer_encryptor()?;
                 write_signed_plaintext_thrift_object(file_metadata, &mut encryptor, &mut sink, &aad)
@@ -750,7 +755,7 @@ impl MetadataObjectWriter {
     ) -> Result<ColumnChunkMetaData> {
         // Column crypto metadata should have already been set when the column was created.
         // Here we apply the encryption by encrypting the column metadata if required.
-        match column_chunk.column_crypto_metadata.as_ref() {
+        match column_chunk.column_crypto_metadata.as_deref() {
             None => {}
             Some(ColumnCryptoMetaData::ENCRYPTION_WITH_FOOTER_KEY) => {
                 // When uniform encryption is used the footer is already encrypted,
diff --git a/parquet/src/file/serialized_reader.rs b/parquet/src/file/serialized_reader.rs
index 4d947dc5a2..748a2df32f 100644
--- a/parquet/src/file/serialized_reader.rs
+++ b/parquet/src/file/serialized_reader.rs
@@ -1894,12 +1894,6 @@ mod tests {
             80, 65, 82, 49,
         ];
         let ret = SerializedFileReader::new(Bytes::copy_from_slice(&data));
-        #[cfg(feature = "encryption")]
-        assert_eq!(
-            ret.err().unwrap().to_string(),
-            "Parquet error: Could not parse metadata: Parquet error: Received 
empty union from remote ColumnOrder"
-        );
-        #[cfg(not(feature = "encryption"))]
         assert_eq!(
             ret.err().unwrap().to_string(),
             "Parquet error: Received empty union from remote ColumnOrder"
diff --git a/parquet/src/geospatial/bounding_box.rs b/parquet/src/geospatial/bounding_box.rs
index ce23696afc..59a4dfb501 100644
--- a/parquet/src/geospatial/bounding_box.rs
+++ b/parquet/src/geospatial/bounding_box.rs
@@ -22,6 +22,8 @@
 //!
 //!
 
+use crate::file::metadata::HeapSize;
+
 /// A geospatial instance has at least two coordinate dimensions: X and Y for 2D coordinates of each point.
 /// X represents longitude/easting and Y represents latitude/northing. A geospatial instance can optionally
 /// have Z and/or M values associated with each point.
@@ -170,6 +172,12 @@ impl BoundingBox {
     }
 }
 
+impl HeapSize for BoundingBox {
+    fn heap_size(&self) -> usize {
+        0 // no heap allocations
+    }
+}
+
 #[cfg(test)]
 mod tests {
     use super::*;
diff --git a/parquet/src/geospatial/statistics.rs b/parquet/src/geospatial/statistics.rs
index 245f012c50..edabfea52d 100644
--- a/parquet/src/geospatial/statistics.rs
+++ b/parquet/src/geospatial/statistics.rs
@@ -20,7 +20,7 @@
 //! This module provides functionality for working with geospatial statistics in Parquet files.
 //! It includes support for bounding boxes and geospatial statistics in column chunk metadata.
 
-use crate::geospatial::bounding_box::BoundingBox;
+use crate::{file::metadata::HeapSize, geospatial::bounding_box::BoundingBox};
 
 // ----------------------------------------------------------------------
 // Geospatial Statistics
@@ -66,3 +66,9 @@ impl GeospatialStatistics {
         self.bbox.as_ref()
     }
 }
+
+impl HeapSize for GeospatialStatistics {
+    fn heap_size(&self) -> usize {
+        self.bbox.heap_size() + self.geospatial_types.heap_size()
+    }
+}
diff --git a/parquet/tests/arrow_reader/bad_data.rs b/parquet/tests/arrow_reader/bad_data.rs
index be401030e7..235f818124 100644
--- a/parquet/tests/arrow_reader/bad_data.rs
+++ b/parquet/tests/arrow_reader/bad_data.rs
@@ -80,12 +80,6 @@ fn test_invalid_files() {
 #[test]
 fn test_parquet_1481() {
     let err = read_file("PARQUET-1481.parquet").unwrap_err();
-    #[cfg(feature = "encryption")]
-    assert_eq!(
-        err.to_string(),
-        "Parquet error: Could not parse metadata: Parquet error: Unexpected 
Type -7"
-    );
-    #[cfg(not(feature = "encryption"))]
     assert_eq!(err.to_string(), "Parquet error: Unexpected Type -7");
 }
 
