This is an automated email from the ASF dual-hosted git repository.

etseidl pushed a commit to branch gh5854_thrift_remodel
in repository https://gitbox.apache.org/repos/asf/arrow-rs.git


The following commit(s) were added to refs/heads/gh5854_thrift_remodel by this push:
     new aa26c0cfb1 [thrift-remodel] Use new writer to write Parquet file metadata (#8445)
aa26c0cfb1 is described below

commit aa26c0cfb1493ce112cf65144783bfef42bf8c94
Author: Ed Seidl <[email protected]>
AuthorDate: Fri Sep 26 09:23:33 2025 -0700

    [thrift-remodel] Use new writer to write Parquet file metadata (#8445)
    
    # Which issue does this PR close?
    
    **Note: this targets a feature branch, not main**
    
    - Part of #5854.
    
    # Rationale for this change
    
    This PR closes the loop, and now Parquet metadata is completely
    handled by the new code.
    
    # What changes are included in this PR?
    
    Changes the metadata builders to use the new structs rather than those
    from `format`. As a consequence, the `close` methods no longer return a
    `format::FileMetaData` but instead return a `ParquetMetaData`.
    
    # Are these changes tested?
    
    Covered by existing tests, but many tests were modified to deal with the
    switch to `ParquetMetaData` mentioned above.
    
    # Are there any user-facing changes?
    
    Yes. The writer `finish`/`close` methods now return `ParquetMetaData`
    instead of `format::FileMetaData` (see the sketch below).
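
    A minimal sketch (not part of this patch) of the user-facing change,
    assuming the `arrow` feature; it mirrors the accessors exercised by the
    tests in this diff:

    ```rust
    use std::sync::Arc;

    use arrow_array::{ArrayRef, Int32Array, RecordBatch};
    use parquet::arrow::ArrowWriter;

    let col = Arc::new(Int32Array::from(vec![1, 2, 3])) as ArrayRef;
    let batch = RecordBatch::try_from_iter(vec![("a", col)]).unwrap();

    let mut buf = Vec::new();
    let mut writer = ArrowWriter::try_new(&mut buf, batch.schema(), None).unwrap();
    writer.write(&batch).unwrap();

    // `close` now returns `ParquetMetaData`, so callers use its accessors
    // rather than reading raw thrift fields
    let metadata = writer.close().unwrap();
    assert_eq!(metadata.file_metadata().num_rows(), 3);
    assert_eq!(metadata.num_row_groups(), 1);
    for rg in metadata.row_groups() {
        for col in rg.columns() {
            assert!(col.data_page_offset() >= 0);
        }
    }
    ```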
---
 parquet/benches/metadata.rs                  |  46 ++--
 parquet/src/arrow/arrow_reader/mod.rs        |   3 +-
 parquet/src/arrow/arrow_writer/mod.rs        |  87 ++++---
 parquet/src/arrow/async_writer/mod.rs        |   6 +-
 parquet/src/column/writer/mod.rs             |  13 +-
 parquet/src/encryption/encrypt.rs            |  35 +--
 parquet/src/file/column_crypto_metadata.rs   |  16 ++
 parquet/src/file/metadata/memory.rs          |   7 +
 parquet/src/file/metadata/mod.rs             |  10 +-
 parquet/src/file/metadata/thrift_gen.rs      | 373 +++++++++++++++++++++++++--
 parquet/src/file/metadata/writer.rs          | 328 +++++++++++------------
 parquet/src/file/serialized_reader.rs        |   4 +-
 parquet/src/file/writer.rs                   | 116 ++++-----
 parquet/src/schema/types.rs                  |  18 ++
 parquet/tests/encryption/encryption.rs       |  20 +-
 parquet/tests/encryption/encryption_async.rs |  10 +-
 16 files changed, 711 insertions(+), 381 deletions(-)

diff --git a/parquet/benches/metadata.rs b/parquet/benches/metadata.rs
index ced0175da8..d05f1e09cb 100644
--- a/parquet/benches/metadata.rs
+++ b/parquet/benches/metadata.rs
@@ -15,6 +15,8 @@
 // specific language governing permissions and limitations
 // under the License.
 
+#[cfg(feature = "arrow")]
+use parquet::file::metadata::ParquetMetaData;
 use parquet::file::metadata::ParquetMetaDataReader;
 use rand::Rng;
 use thrift::protocol::TCompactOutputProtocol;
@@ -164,7 +166,7 @@ fn get_footer_bytes(data: Bytes) -> Bytes {
 }
 
 #[cfg(feature = "arrow")]
-fn rewrite_file(bytes: Bytes) -> (Bytes, FileMetaData) {
+fn rewrite_file(bytes: Bytes) -> (Bytes, ParquetMetaData) {
     use arrow::array::RecordBatchReader;
     use parquet::arrow::{arrow_reader::ParquetRecordBatchReaderBuilder, ArrowWriter};
     use parquet::file::properties::{EnabledStatistics, WriterProperties};
@@ -217,6 +219,7 @@ fn criterion_benchmark(c: &mut Criterion) {
         })
     });
 
+    // FIXME(ets): remove benches of private APIs
     c.bench_function("decode thrift file metadata", |b| {
         b.iter(|| {
             parquet::thrift::bench_file_metadata(&meta_data);
@@ -237,45 +240,42 @@ fn criterion_benchmark(c: &mut Criterion) {
     });
 
     // rewrite file with page statistics. then read page headers.
+    // FIXME(ets): remove the page header benches when remodel is complete
     #[cfg(feature = "arrow")]
     let (file_bytes, metadata) = rewrite_file(data.clone());
     #[cfg(feature = "arrow")]
     c.bench_function("page headers", |b| {
         b.iter(|| {
-            metadata.row_groups.iter().for_each(|rg| {
-                rg.columns.iter().for_each(|col| {
-                    if let Some(col_meta) = &col.meta_data {
-                        if let Some(dict_offset) = col_meta.dictionary_page_offset {
-                            parquet::thrift::bench_page_header(
-                                &file_bytes.slice(dict_offset as usize..),
-                            );
-                        }
+            for rg in metadata.row_groups() {
+                for col in rg.columns() {
+                    if let Some(dict_offset) = col.dictionary_page_offset() {
                         parquet::thrift::bench_page_header(
-                            &file_bytes.slice(col_meta.data_page_offset as usize..),
+                            &file_bytes.slice(dict_offset as usize..),
                         );
                     }
-                });
-            });
+                    parquet::thrift::bench_page_header(
+                        &file_bytes.slice(col.data_page_offset() as usize..),
+                    );
+                }
+            }
         })
     });
 
     #[cfg(feature = "arrow")]
     c.bench_function("page headers (no stats)", |b| {
         b.iter(|| {
-            metadata.row_groups.iter().for_each(|rg| {
-                rg.columns.iter().for_each(|col| {
-                    if let Some(col_meta) = &col.meta_data {
-                        if let Some(dict_offset) = col_meta.dictionary_page_offset {
-                            parquet::thrift::bench_page_header_no_stats(
-                                &file_bytes.slice(dict_offset as usize..),
-                            );
-                        }
+            for rg in metadata.row_groups() {
+                for col in rg.columns() {
+                    if let Some(dict_offset) = col.dictionary_page_offset() {
                         parquet::thrift::bench_page_header_no_stats(
-                            &file_bytes.slice(col_meta.data_page_offset as usize..),
+                            &file_bytes.slice(dict_offset as usize..),
                         );
                     }
-                });
-            });
+                    parquet::thrift::bench_page_header_no_stats(
+                        &file_bytes.slice(col.data_page_offset() as usize..),
+                    );
+                }
+            }
         })
     });
 }
diff --git a/parquet/src/arrow/arrow_reader/mod.rs b/parquet/src/arrow/arrow_reader/mod.rs
index 00beaa364f..8d5b3b55c1 100644
--- a/parquet/src/arrow/arrow_reader/mod.rs
+++ b/parquet/src/arrow/arrow_reader/mod.rs
@@ -1185,6 +1185,7 @@ mod tests {
         FloatType, Int32Type, Int64Type, Int96, Int96Type,
     };
     use crate::errors::Result;
+    use crate::file::metadata::ParquetMetaData;
     use crate::file::properties::{EnabledStatistics, WriterProperties, WriterVersion};
     use crate::file::writer::SerializedFileWriter;
     use crate::schema::parser::parse_message_type;
@@ -2913,7 +2914,7 @@ mod tests {
         schema: TypePtr,
         field: Option<Field>,
         opts: &TestOptions,
-    ) -> Result<crate::format::FileMetaData> {
+    ) -> Result<ParquetMetaData> {
         let mut writer_props = opts.writer_props();
         if let Some(field) = field {
             let arrow_schema = Schema::new(vec![field]);
diff --git a/parquet/src/arrow/arrow_writer/mod.rs b/parquet/src/arrow/arrow_writer/mod.rs
index 4fae03affa..6b4dc87abb 100644
--- a/parquet/src/arrow/arrow_writer/mod.rs
+++ b/parquet/src/arrow/arrow_writer/mod.rs
@@ -43,7 +43,7 @@ use crate::data_type::{ByteArray, FixedLenByteArray};
 #[cfg(feature = "encryption")]
 use crate::encryption::encrypt::FileEncryptor;
 use crate::errors::{ParquetError, Result};
-use crate::file::metadata::{KeyValue, RowGroupMetaData};
+use crate::file::metadata::{KeyValue, ParquetMetaData, RowGroupMetaData};
 use crate::file::properties::{WriterProperties, WriterPropertiesPtr};
 use crate::file::reader::{ChunkReader, Length};
 use crate::file::writer::{SerializedFileWriter, SerializedRowGroupWriter};
@@ -397,13 +397,13 @@ impl<W: Write + Send> ArrowWriter<W> {
     /// Unlike [`Self::close`] this does not consume self
     ///
     /// Attempting to write after calling finish will result in an error
-    pub fn finish(&mut self) -> Result<crate::format::FileMetaData> {
+    pub fn finish(&mut self) -> Result<ParquetMetaData> {
         self.flush()?;
         self.writer.finish()
     }
 
     /// Close and finalize the underlying Parquet writer
-    pub fn close(mut self) -> Result<crate::format::FileMetaData> {
+    pub fn close(mut self) -> Result<ParquetMetaData> {
         self.finish()
     }
 
@@ -754,7 +754,7 @@ impl ArrowColumnChunk {
 /// row_group_writer.close().unwrap();
 ///
 /// let metadata = writer.close().unwrap();
-/// assert_eq!(metadata.num_rows, 3);
+/// assert_eq!(metadata.file_metadata().num_rows(), 3);
 /// ```
 pub struct ArrowColumnWriter {
     writer: ArrowColumnWriterImpl,
@@ -1510,7 +1510,6 @@ mod tests {
     use crate::arrow::ARROW_SCHEMA_META_KEY;
     use crate::column::page::{Page, PageReader};
     use crate::file::metadata::thrift_gen::PageHeader;
-    use crate::file::page_encoding_stats::PageEncodingStats;
     use crate::file::page_index::column_index::ColumnIndexMetaData;
     use crate::file::reader::SerializedPageReader;
     use crate::parquet_thrift::{ReadThrift, ThriftSliceInputProtocol};
@@ -2579,12 +2578,12 @@ mod tests {
             ArrowWriter::try_new(&mut out, batch.schema(), None).expect("Unable to write file");
         writer.write(&batch).unwrap();
         let file_meta_data = writer.close().unwrap();
-        for row_group in file_meta_data.row_groups {
-            for column in row_group.columns {
-                assert!(column.offset_index_offset.is_some());
-                assert!(column.offset_index_length.is_some());
-                assert!(column.column_index_offset.is_none());
-                assert!(column.column_index_length.is_none());
+        for row_group in file_meta_data.row_groups() {
+            for column in row_group.columns() {
+                assert!(column.offset_index_offset().is_some());
+                assert!(column.offset_index_length().is_some());
+                assert!(column.column_index_offset().is_none());
+                assert!(column.column_index_length().is_none());
             }
         }
     }
@@ -3033,14 +3032,18 @@ mod tests {
         writer.write(&batch).unwrap();
         let file_metadata = writer.close().unwrap();
 
+        let schema = file_metadata.file_metadata().schema();
         // Coerced name of "item" should be "element"
-        assert_eq!(file_metadata.schema[3].name, "element");
+        let list_field = &schema.get_fields()[0].get_fields()[0];
+        assert_eq!(list_field.get_fields()[0].name(), "element");
+
+        let map_field = &schema.get_fields()[1].get_fields()[0];
         // Coerced name of "entries" should be "key_value"
-        assert_eq!(file_metadata.schema[5].name, "key_value");
+        assert_eq!(map_field.name(), "key_value");
         // Coerced name of "keys" should be "key"
-        assert_eq!(file_metadata.schema[6].name, "key");
+        assert_eq!(map_field.get_fields()[0].name(), "key");
         // Coerced name of "values" should be "value"
-        assert_eq!(file_metadata.schema[7].name, "value");
+        assert_eq!(map_field.get_fields()[1].name(), "value");
 
         // Double check schema after reading from the file
         let reader = SerializedFileReader::new(file).unwrap();
@@ -3984,15 +3987,15 @@ mod tests {
         writer.write(&batch).unwrap();
 
         let metadata = writer.close().unwrap();
-        assert_eq!(metadata.row_groups.len(), 1);
-        let row_group = &metadata.row_groups[0];
-        assert_eq!(row_group.columns.len(), 2);
+        assert_eq!(metadata.num_row_groups(), 1);
+        let row_group = metadata.row_group(0);
+        assert_eq!(row_group.num_columns(), 2);
         // Column "a" has both offset and column index, as requested
-        assert!(row_group.columns[0].offset_index_offset.is_some());
-        assert!(row_group.columns[0].column_index_offset.is_some());
+        assert!(row_group.column(0).offset_index_offset().is_some());
+        assert!(row_group.column(0).column_index_offset().is_some());
         // Column "b" should only have offset index
-        assert!(row_group.columns[1].offset_index_offset.is_some());
-        assert!(row_group.columns[1].column_index_offset.is_none());
+        assert!(row_group.column(1).offset_index_offset().is_some());
+        assert!(row_group.column(1).column_index_offset().is_none());
 
         let options = ReadOptionsBuilder::new().with_page_index().build();
         let reader = SerializedFileReader::new_with_options(Bytes::from(buf), options).unwrap();
@@ -4059,15 +4062,15 @@ mod tests {
         writer.write(&batch).unwrap();
 
         let metadata = writer.close().unwrap();
-        assert_eq!(metadata.row_groups.len(), 1);
-        let row_group = &metadata.row_groups[0];
-        assert_eq!(row_group.columns.len(), 2);
+        assert_eq!(metadata.num_row_groups(), 1);
+        let row_group = metadata.row_group(0);
+        assert_eq!(row_group.num_columns(), 2);
         // Column "a" should only have offset index
-        assert!(row_group.columns[0].offset_index_offset.is_some());
-        assert!(row_group.columns[0].column_index_offset.is_none());
+        assert!(row_group.column(0).offset_index_offset().is_some());
+        assert!(row_group.column(0).column_index_offset().is_none());
         // Column "b" should only have offset index
-        assert!(row_group.columns[1].offset_index_offset.is_some());
-        assert!(row_group.columns[1].column_index_offset.is_none());
+        assert!(row_group.column(1).offset_index_offset().is_some());
+        assert!(row_group.column(1).column_index_offset().is_none());
 
         let options = ReadOptionsBuilder::new().with_page_index().build();
         let reader = SerializedFileReader::new_with_options(Bytes::from(buf), options).unwrap();
@@ -4331,14 +4334,18 @@ mod tests {
         writer.write(&batch).unwrap();
         let file_metadata = writer.close().unwrap();
 
-        assert_eq!(file_metadata.row_groups.len(), 1);
-        assert_eq!(file_metadata.row_groups[0].columns.len(), 1);
-        let chunk_meta = file_metadata.row_groups[0].columns[0]
-            .meta_data
-            .as_ref()
-            .expect("column metadata missing");
-        assert!(chunk_meta.encoding_stats.is_some());
-        let chunk_page_stats = chunk_meta.encoding_stats.as_ref().unwrap();
+        assert_eq!(file_metadata.num_row_groups(), 1);
+        assert_eq!(file_metadata.row_group(0).num_columns(), 1);
+        assert!(file_metadata
+            .row_group(0)
+            .column(0)
+            .page_encoding_stats()
+            .is_some());
+        let chunk_page_stats = file_metadata
+            .row_group(0)
+            .column(0)
+            .page_encoding_stats()
+            .unwrap();
 
         // check that the read metadata is also correct
         let options = ReadOptionsBuilder::new().with_page_index().build();
@@ -4349,11 +4356,7 @@ mod tests {
         let column = rowgroup.metadata().column(0);
         assert!(column.page_encoding_stats().is_some());
         let file_page_stats = column.page_encoding_stats().unwrap();
-        let chunk_stats: Vec<PageEncodingStats> = chunk_page_stats
-            .iter()
-            .map(|x| crate::file::page_encoding_stats::try_from_thrift(x).unwrap())
-            .collect();
-        assert_eq!(&chunk_stats, file_page_stats);
+        assert_eq!(chunk_page_stats, file_page_stats);
     }
 
     #[test]
diff --git a/parquet/src/arrow/async_writer/mod.rs b/parquet/src/arrow/async_writer/mod.rs
index e61b8f47c3..232333a1b4 100644
--- a/parquet/src/arrow/async_writer/mod.rs
+++ b/parquet/src/arrow/async_writer/mod.rs
@@ -65,7 +65,7 @@ use crate::{
     arrow::ArrowWriter,
     errors::{ParquetError, Result},
     file::{
-        metadata::{KeyValue, RowGroupMetaData},
+        metadata::{KeyValue, ParquetMetaData, RowGroupMetaData},
         properties::WriterProperties,
     },
 };
@@ -247,7 +247,7 @@ impl<W: AsyncFileWriter> AsyncArrowWriter<W> {
     /// Unlike [`Self::close`] this does not consume self
     ///
     /// Attempting to write after calling finish will result in an error
-    pub async fn finish(&mut self) -> Result<crate::format::FileMetaData> {
+    pub async fn finish(&mut self) -> Result<ParquetMetaData> {
         let metadata = self.sync_writer.finish()?;
 
         // Force to flush the remaining data.
@@ -260,7 +260,7 @@ impl<W: AsyncFileWriter> AsyncArrowWriter<W> {
     /// Close and finalize the writer.
     ///
     /// All the data in the inner buffer will be force flushed.
-    pub async fn close(mut self) -> Result<crate::format::FileMetaData> {
+    pub async fn close(mut self) -> Result<ParquetMetaData> {
         self.finish().await
     }
 
diff --git a/parquet/src/column/writer/mod.rs b/parquet/src/column/writer/mod.rs
index 3f516462f2..ee400f200e 100644
--- a/parquet/src/column/writer/mod.rs
+++ b/parquet/src/column/writer/mod.rs
@@ -3602,19 +3602,12 @@ mod tests {
         col_writer.close().unwrap();
         row_group_writer.close().unwrap();
         let file_metadata = writer.close().unwrap();
-        assert!(file_metadata.row_groups[0].columns[0].meta_data.is_some());
-        let stats = file_metadata.row_groups[0].columns[0]
-            .meta_data
-            .as_ref()
-            .unwrap()
-            .statistics
-            .as_ref()
-            .unwrap();
-        assert!(!stats.is_max_value_exact.unwrap());
+        let stats = file_metadata.row_group(0).column(0).statistics().unwrap();
+        assert!(!stats.max_is_exact());
         // Truncation of invalid UTF-8 should fall back to binary truncation, so last byte should
         // be incremented by 1.
         assert_eq!(
-            stats.max_value,
+            stats.max_bytes_opt().map(|v| v.to_vec()),
             Some([128, 128, 128, 128, 128, 128, 128, 129].to_vec())
         );
     }
diff --git a/parquet/src/encryption/encrypt.rs b/parquet/src/encryption/encrypt.rs
index 9789302169..1a22abff56 100644
--- a/parquet/src/encryption/encrypt.rs
+++ b/parquet/src/encryption/encrypt.rs
@@ -24,11 +24,9 @@ use crate::errors::{ParquetError, Result};
 use crate::file::column_crypto_metadata::{ColumnCryptoMetaData, EncryptionWithColumnKey};
 use crate::parquet_thrift::{ThriftCompactOutputProtocol, WriteThrift};
 use crate::schema::types::{ColumnDescPtr, SchemaDescriptor};
-use crate::thrift::TSerializable;
 use ring::rand::{SecureRandom, SystemRandom};
 use std::collections::{HashMap, HashSet};
 use std::io::Write;
-use thrift::protocol::TCompactOutputProtocol;
 
 #[derive(Debug, Clone, PartialEq)]
 struct EncryptionKey {
@@ -365,18 +363,6 @@ impl FileEncryptor {
     }
 }
 
-/// Write an encrypted Thrift serializable object
-pub(crate) fn encrypt_object<T: TSerializable, W: Write>(
-    object: &T,
-    encryptor: &mut Box<dyn BlockEncryptor>,
-    sink: &mut W,
-    module_aad: &[u8],
-) -> Result<()> {
-    let encrypted_buffer = encrypt_object_to_vec(object, encryptor, module_aad)?;
-    sink.write_all(&encrypted_buffer)?;
-    Ok(())
-}
-
 /// Write an encrypted Thrift serializable object
 pub(crate) fn encrypt_thrift_object<T: WriteThrift, W: Write>(
     object: &T,
@@ -389,7 +375,7 @@ pub(crate) fn encrypt_thrift_object<T: WriteThrift, W: Write>(
     Ok(())
 }
 
-pub(crate) fn write_signed_plaintext_object<T: TSerializable, W: Write>(
+pub(crate) fn write_signed_plaintext_thrift_object<T: WriteThrift, W: Write>(
     object: &T,
     encryptor: &mut Box<dyn BlockEncryptor>,
     sink: &mut W,
@@ -397,8 +383,8 @@ pub(crate) fn write_signed_plaintext_object<T: TSerializable, W: Write>(
 ) -> Result<()> {
     let mut buffer: Vec<u8> = vec![];
     {
-        let mut protocol = TCompactOutputProtocol::new(&mut buffer);
-        object.write_to_out_protocol(&mut protocol)?;
+        let mut protocol = ThriftCompactOutputProtocol::new(&mut buffer);
+        object.write_thrift(&mut protocol)?;
     }
     sink.write_all(&buffer)?;
     buffer = encryptor.encrypt(buffer.as_ref(), module_aad)?;
@@ -412,21 +398,6 @@ pub(crate) fn write_signed_plaintext_object<T: TSerializable, W: Write>(
     Ok(())
 }
 
-/// Encrypt a Thrift serializable object to a byte vector
-pub(crate) fn encrypt_object_to_vec<T: TSerializable>(
-    object: &T,
-    encryptor: &mut Box<dyn BlockEncryptor>,
-    module_aad: &[u8],
-) -> Result<Vec<u8>> {
-    let mut buffer: Vec<u8> = vec![];
-    {
-        let mut unencrypted_protocol = TCompactOutputProtocol::new(&mut buffer);
-        object.write_to_out_protocol(&mut unencrypted_protocol)?;
-    }
-
-    encryptor.encrypt(buffer.as_ref(), module_aad)
-}
-
 /// Encrypt a Thrift serializable object to a byte vector
 pub(crate) fn encrypt_thrift_object_to_vec<T: WriteThrift>(
     object: &T,
diff --git a/parquet/src/file/column_crypto_metadata.rs b/parquet/src/file/column_crypto_metadata.rs
index 6a538bd42b..429e7946dd 100644
--- a/parquet/src/file/column_crypto_metadata.rs
+++ b/parquet/src/file/column_crypto_metadata.rs
@@ -20,6 +20,7 @@
 use std::io::Write;
 
 use crate::errors::{ParquetError, Result};
+use crate::file::metadata::HeapSize;
 use crate::format::{
     ColumnCryptoMetaData as TColumnCryptoMetaData,
     EncryptionWithColumnKey as TEncryptionWithColumnKey,
@@ -45,6 +46,12 @@ pub struct EncryptionWithColumnKey {
 }
 );
 
+impl HeapSize for EncryptionWithColumnKey {
+    fn heap_size(&self) -> usize {
+        self.path_in_schema.heap_size() + self.key_metadata.heap_size()
+    }
+}
+
 thrift_union!(
 /// ColumnCryptoMetadata for a column chunk
 union ColumnCryptoMetaData {
@@ -53,6 +60,15 @@ union ColumnCryptoMetaData {
 }
 );
 
+impl HeapSize for ColumnCryptoMetaData {
+    fn heap_size(&self) -> usize {
+        match self {
+            Self::ENCRYPTION_WITH_FOOTER_KEY => 0,
+            Self::ENCRYPTION_WITH_COLUMN_KEY(path) => path.heap_size(),
+        }
+    }
+}
+
 /// Converts Thrift definition into `ColumnCryptoMetadata`.
 pub fn try_from_thrift(
     thrift_column_crypto_metadata: &TColumnCryptoMetaData,
diff --git a/parquet/src/file/metadata/memory.rs b/parquet/src/file/metadata/memory.rs
index 19122a1b55..bfe6b0255c 100644
--- a/parquet/src/file/metadata/memory.rs
+++ b/parquet/src/file/metadata/memory.rs
@@ -94,6 +94,12 @@ impl HeapSize for RowGroupMetaData {
 
 impl HeapSize for ColumnChunkMetaData {
     fn heap_size(&self) -> usize {
+        #[cfg(feature = "encryption")]
+        let encryption_heap_size =
+            self.column_crypto_metadata.heap_size() + self.encrypted_column_metadata.heap_size();
+        #[cfg(not(feature = "encryption"))]
+        let encryption_heap_size = 0;
+
         // don't count column_descr here because it is already counted in
         // FileMetaData
         self.encodings.heap_size()
@@ -104,6 +110,7 @@ impl HeapSize for ColumnChunkMetaData {
             + self.unencoded_byte_array_data_bytes.heap_size()
             + self.repetition_level_histogram.heap_size()
             + self.definition_level_histogram.heap_size()
+            + encryption_heap_size
     }
 }
 
diff --git a/parquet/src/file/metadata/mod.rs b/parquet/src/file/metadata/mod.rs
index 94e289ae81..22c2f8fb44 100644
--- a/parquet/src/file/metadata/mod.rs
+++ b/parquet/src/file/metadata/mod.rs
@@ -848,6 +848,8 @@ pub struct ColumnChunkMetaData {
     definition_level_histogram: Option<LevelHistogram>,
     #[cfg(feature = "encryption")]
     column_crypto_metadata: Option<ColumnCryptoMetaData>,
+    #[cfg(feature = "encryption")]
+    encrypted_column_metadata: Option<Vec<u8>>,
 }
 
 /// Histograms for repetition and definition levels.
@@ -1232,6 +1234,8 @@ impl ColumnChunkMetaData {
             definition_level_histogram,
             #[cfg(feature = "encryption")]
             column_crypto_metadata,
+            #[cfg(feature = "encryption")]
+            encrypted_column_metadata: None,
         };
         Ok(result)
     }
@@ -1370,6 +1374,8 @@ impl ColumnChunkMetaDataBuilder {
             definition_level_histogram: None,
             #[cfg(feature = "encryption")]
             column_crypto_metadata: None,
+            #[cfg(feature = "encryption")]
+            encrypted_column_metadata: None,
         })
     }
 
@@ -2067,7 +2073,7 @@ mod tests {
         #[cfg(not(feature = "encryption"))]
         let base_expected_size = 2280;
         #[cfg(feature = "encryption")]
-        let base_expected_size = 2616;
+        let base_expected_size = 2712;
 
         assert_eq!(parquet_meta.memory_size(), base_expected_size);
 
@@ -2107,7 +2113,7 @@ mod tests {
         #[cfg(not(feature = "encryption"))]
         let bigger_expected_size = 2704;
         #[cfg(feature = "encryption")]
-        let bigger_expected_size = 3040;
+        let bigger_expected_size = 3136;
 
         // more set fields means more memory usage
         assert!(bigger_expected_size > base_expected_size);
diff --git a/parquet/src/file/metadata/thrift_gen.rs b/parquet/src/file/metadata/thrift_gen.rs
index 7515a70a63..5665ad2ce9 100644
--- a/parquet/src/file/metadata/thrift_gen.rs
+++ b/parquet/src/file/metadata/thrift_gen.rs
@@ -38,7 +38,9 @@ use crate::{
         read_thrift_vec, ElementType, FieldType, ReadThrift, ThriftCompactInputProtocol,
         ThriftCompactOutputProtocol, WriteThrift, WriteThriftField,
     },
-    schema::types::{parquet_schema_from_array, ColumnDescriptor, SchemaDescriptor},
+    schema::types::{
+        num_nodes, parquet_schema_from_array, ColumnDescriptor, SchemaDescriptor, TypePtr,
+    },
     thrift_struct, thrift_union,
     util::bit_util::FromBytes,
 };
@@ -68,12 +70,12 @@ pub(crate) struct SchemaElement<'a> {
 );
 
 thrift_struct!(
-pub(crate) struct AesGcmV1<'a> {
+pub(crate) struct AesGcmV1 {
   /// AAD prefix
-  1: optional binary<'a> aad_prefix
+  1: optional binary aad_prefix
 
   /// Unique file identifier part of AAD suffix
-  2: optional binary<'a> aad_file_unique
+  2: optional binary aad_file_unique
 
   /// In files encrypted with AAD prefix without storing it,
   /// readers must supply the prefix
@@ -82,12 +84,12 @@ pub(crate) struct AesGcmV1<'a> {
 );
 
 thrift_struct!(
-pub(crate) struct AesGcmCtrV1<'a> {
+pub(crate) struct AesGcmCtrV1 {
   /// AAD prefix
-  1: optional binary<'a> aad_prefix
+  1: optional binary aad_prefix
 
   /// Unique file identifier part of AAD suffix
-  2: optional binary<'a> aad_file_unique
+  2: optional binary aad_file_unique
 
   /// In files encrypted with AAD prefix without storing it,
   /// readers must supply the prefix
@@ -96,24 +98,24 @@ pub(crate) struct AesGcmCtrV1<'a> {
 );
 
 thrift_union!(
-union EncryptionAlgorithm<'a> {
-  1: (AesGcmV1<'a>) AES_GCM_V1
-  2: (AesGcmCtrV1<'a>) AES_GCM_CTR_V1
+union EncryptionAlgorithm {
+  1: (AesGcmV1) AES_GCM_V1
+  2: (AesGcmCtrV1) AES_GCM_CTR_V1
 }
 );
 
 #[cfg(feature = "encryption")]
 thrift_struct!(
 /// Crypto metadata for files with encrypted footer
-pub(crate) struct FileCryptoMetaData<'a> {
+pub(crate) struct FileCryptoMetaData {
   /// Encryption algorithm. This field is only used for files
   /// with encrypted footer. Files with plaintext footer store algorithm id
   /// inside footer (FileMetaData structure).
-  1: required EncryptionAlgorithm<'a> encryption_algorithm
+  1: required EncryptionAlgorithm encryption_algorithm
 
   /** Retrieval metadata of key used for encryption of footer,
    *  and (possibly) columns **/
-  2: optional binary<'a> key_metadata
+  2: optional binary key_metadata
 }
 );
 
@@ -135,8 +137,8 @@ struct FileMetaData<'a> {
   5: optional list<KeyValue> key_value_metadata
   6: optional string created_by
   7: optional list<ColumnOrder> column_orders;
-  8: optional EncryptionAlgorithm<'a> encryption_algorithm
-  9: optional binary<'a> footer_signing_key_metadata
+  8: optional EncryptionAlgorithm encryption_algorithm
+  9: optional binary footer_signing_key_metadata
 }
 );
 
@@ -337,8 +339,6 @@ fn convert_column(
     let repetition_level_histogram = repetition_level_histogram.map(LevelHistogram::from);
     let definition_level_histogram = definition_level_histogram.map(LevelHistogram::from);
 
-    // FIXME: need column crypto
-
     let result = ColumnChunkMetaData {
         column_descr,
         encodings,
@@ -364,6 +364,8 @@ fn convert_column(
         definition_level_histogram,
         #[cfg(feature = "encryption")]
         column_crypto_metadata: column.crypto_metadata,
+        #[cfg(feature = "encryption")]
+        encrypted_column_metadata: None,
     };
     Ok(result)
 }
@@ -632,7 +634,7 @@ pub(crate) fn parquet_metadata_with_encryption(
             }
             let decryptor = get_file_decryptor(
                 t_file_crypto_metadata.encryption_algorithm,
-                t_file_crypto_metadata.key_metadata,
+                t_file_crypto_metadata.key_metadata.as_ref(),
                 file_decryption_properties,
             )?;
             let footer_decryptor = decryptor.get_footer_decryptor();
@@ -672,7 +674,7 @@ pub(crate) fn parquet_metadata_with_encryption(
         // File has a plaintext footer but encryption algorithm is set
         let file_decryptor_value = get_file_decryptor(
             algo,
-            file_meta.footer_signing_key_metadata,
+            file_meta.footer_signing_key_metadata.as_ref(),
             file_decryption_properties,
         )?;
         if file_decryption_properties.check_plaintext_footer_integrity() && !encrypted_footer {
@@ -733,7 +735,7 @@ pub(crate) fn parquet_metadata_with_encryption(
 #[cfg(feature = "encryption")]
 pub(super) fn get_file_decryptor(
     encryption_algorithm: EncryptionAlgorithm,
-    footer_key_metadata: Option<&[u8]>,
+    footer_key_metadata: Option<&Vec<u8>>,
     file_decryption_properties: &FileDecryptionProperties,
 ) -> Result<FileDecryptor> {
     match encryption_algorithm {
@@ -750,7 +752,7 @@ pub(super) fn get_file_decryptor(
 
             FileDecryptor::new(
                 file_decryption_properties,
-                footer_key_metadata,
+                footer_key_metadata.map(|v| v.as_slice()),
                 aad_file_unique,
                 aad_prefix,
             )
@@ -1158,6 +1160,335 @@ impl PageHeader {
     }
 }
 
+/////////////////////////////////////////////////
+// helper functions for writing file meta data
+
+// serialize the bits of the column chunk needed for a thrift ColumnMetaData
+// struct ColumnMetaData {
+//   1: required Type type
+//   2: required list<Encoding> encodings
+//   3: required list<string> path_in_schema
+//   4: required CompressionCodec codec
+//   5: required i64 num_values
+//   6: required i64 total_uncompressed_size
+//   7: required i64 total_compressed_size
+//   8: optional list<KeyValue> key_value_metadata
+//   9: required i64 data_page_offset
+//   10: optional i64 index_page_offset
+//   11: optional i64 dictionary_page_offset
+//   12: optional Statistics statistics;
+//   13: optional list<PageEncodingStats> encoding_stats;
+//   14: optional i64 bloom_filter_offset;
+//   15: optional i32 bloom_filter_length;
+//   16: optional SizeStatistics size_statistics;
+//   17: optional GeospatialStatistics geospatial_statistics;
+// }
+pub(crate) fn serialize_column_meta_data<W: Write>(
+    column_chunk: &ColumnChunkMetaData,
+    w: &mut ThriftCompactOutputProtocol<W>,
+) -> Result<()> {
+    use crate::file::statistics::page_stats_to_thrift;
+
+    column_chunk.column_type().write_thrift_field(w, 1, 0)?;
+    column_chunk.encodings.write_thrift_field(w, 2, 1)?;
+    let path = column_chunk.column_descr.path().parts();
+    let path: Vec<&str> = path.iter().map(|v| v.as_str()).collect();
+    path.write_thrift_field(w, 3, 2)?;
+    column_chunk.compression.write_thrift_field(w, 4, 3)?;
+    column_chunk.num_values.write_thrift_field(w, 5, 4)?;
+    column_chunk
+        .total_uncompressed_size
+        .write_thrift_field(w, 6, 5)?;
+    column_chunk
+        .total_compressed_size
+        .write_thrift_field(w, 7, 6)?;
+    // no key_value_metadata here
+    let mut last_field_id = column_chunk.data_page_offset.write_thrift_field(w, 9, 7)?;
+    if let Some(index_page_offset) = column_chunk.index_page_offset {
+        last_field_id = index_page_offset.write_thrift_field(w, 10, last_field_id)?;
+    }
+    if let Some(dictionary_page_offset) = column_chunk.dictionary_page_offset {
+        last_field_id = dictionary_page_offset.write_thrift_field(w, 11, last_field_id)?;
+    }
+    // PageStatistics is the same as thrift Statistics, but writable
+    let stats = page_stats_to_thrift(column_chunk.statistics());
+    if let Some(stats) = stats {
+        last_field_id = stats.write_thrift_field(w, 12, last_field_id)?;
+    }
+    if let Some(page_encoding_stats) = column_chunk.page_encoding_stats() {
+        last_field_id = page_encoding_stats.write_thrift_field(w, 13, last_field_id)?;
+    }
+    if let Some(bloom_filter_offset) = column_chunk.bloom_filter_offset {
+        last_field_id = bloom_filter_offset.write_thrift_field(w, 14, last_field_id)?;
+    }
+    if let Some(bloom_filter_length) = column_chunk.bloom_filter_length {
+        last_field_id = bloom_filter_length.write_thrift_field(w, 15, last_field_id)?;
+    }
+
+    // SizeStatistics
+    let size_stats = if column_chunk.unencoded_byte_array_data_bytes.is_some()
+        || column_chunk.repetition_level_histogram.is_some()
+        || column_chunk.definition_level_histogram.is_some()
+    {
+        let repetition_level_histogram = column_chunk
+            .repetition_level_histogram()
+            .map(|hist| hist.clone().into_inner());
+
+        let definition_level_histogram = column_chunk
+            .definition_level_histogram()
+            .map(|hist| hist.clone().into_inner());
+
+        Some(SizeStatistics {
+            unencoded_byte_array_data_bytes: column_chunk.unencoded_byte_array_data_bytes,
+            repetition_level_histogram,
+            definition_level_histogram,
+        })
+    } else {
+        None
+    };
+    if let Some(size_stats) = size_stats {
+        size_stats.write_thrift_field(w, 16, last_field_id)?;
+    }
+
+    // TODO: field 17 geo spatial stats here
+    w.write_struct_end()
+}
+
+// temp struct used for writing
+pub(crate) struct FileMeta<'a> {
+    pub(crate) file_metadata: &'a crate::file::metadata::FileMetaData,
+    pub(crate) row_groups: &'a Vec<RowGroupMetaData>,
+    pub(crate) encryption_algorithm: Option<EncryptionAlgorithm>,
+    pub(crate) footer_signing_key_metadata: Option<Vec<u8>>,
+}
+
+impl<'a> WriteThrift for FileMeta<'a> {
+    const ELEMENT_TYPE: ElementType = ElementType::Struct;
+
+    fn write_thrift<W: Write>(&self, writer: &mut ThriftCompactOutputProtocol<W>) -> Result<()> {
+        self.file_metadata
+            .version
+            .write_thrift_field(writer, 1, 0)?;
+
+        // field 2 is schema. do depth-first traversal of tree, converting to SchemaElement and
+        // writing along the way.
+        let root = self.file_metadata.schema_descr().root_schema_ptr();
+        let schema_len = num_nodes(&root);
+        writer.write_field_begin(FieldType::List, 2, 1)?;
+        writer.write_list_begin(ElementType::Struct, schema_len)?;
+        // recursively write Type nodes as SchemaElements
+        write_schema(&root, writer)?;
+
+        self.file_metadata
+            .num_rows
+            .write_thrift_field(writer, 3, 2)?;
+
+        // this will call RowGroupMetaData::write_thrift
+        let mut last_field_id = self.row_groups.write_thrift_field(writer, 4, 3)?;
+
+        if let Some(kv_metadata) = self.file_metadata.key_value_metadata() {
+            last_field_id = kv_metadata.write_thrift_field(writer, 5, last_field_id)?;
+        }
+        if let Some(created_by) = self.file_metadata.created_by() {
+            last_field_id = created_by.write_thrift_field(writer, 6, last_field_id)?;
+        }
+        if let Some(column_orders) = self.file_metadata.column_orders() {
+            last_field_id = column_orders.write_thrift_field(writer, 7, last_field_id)?;
+        }
+        if let Some(algo) = self.encryption_algorithm.as_ref() {
+            last_field_id = algo.write_thrift_field(writer, 8, last_field_id)?;
+        }
+        if let Some(key) = self.footer_signing_key_metadata.as_ref() {
+            key.as_slice()
+                .write_thrift_field(writer, 9, last_field_id)?;
+        }
+
+        writer.write_struct_end()
+    }
+}
+
+fn write_schema<W: Write>(
+    node: &TypePtr,
+    writer: &mut ThriftCompactOutputProtocol<W>,
+) -> Result<()> {
+    match node.as_ref() {
+        crate::schema::types::Type::PrimitiveType {
+            basic_info,
+            physical_type,
+            type_length,
+            scale,
+            precision,
+        } => {
+            let element = SchemaElement {
+                type_: Some(*physical_type),
+                type_length: if *type_length >= 0 {
+                    Some(*type_length)
+                } else {
+                    None
+                },
+                repetition_type: Some(basic_info.repetition()),
+                name: basic_info.name(),
+                num_children: None,
+                converted_type: match basic_info.converted_type() {
+                    ConvertedType::NONE => None,
+                    other => Some(other),
+                },
+                scale: if *scale >= 0 { Some(*scale) } else { None },
+                precision: if *precision >= 0 {
+                    Some(*precision)
+                } else {
+                    None
+                },
+                field_id: if basic_info.has_id() {
+                    Some(basic_info.id())
+                } else {
+                    None
+                },
+                logical_type: basic_info.logical_type(),
+            };
+            element.write_thrift(writer)
+        }
+        crate::schema::types::Type::GroupType { basic_info, fields } => {
+            let repetition = if basic_info.has_repetition() {
+                Some(basic_info.repetition())
+            } else {
+                None
+            };
+
+            let element = SchemaElement {
+                type_: None,
+                type_length: None,
+                repetition_type: repetition,
+                name: basic_info.name(),
+                num_children: Some(fields.len().try_into()?),
+                converted_type: match basic_info.converted_type() {
+                    ConvertedType::NONE => None,
+                    other => Some(other),
+                },
+                scale: None,
+                precision: None,
+                field_id: if basic_info.has_id() {
+                    Some(basic_info.id())
+                } else {
+                    None
+                },
+                logical_type: basic_info.logical_type(),
+            };
+
+            element.write_thrift(writer)?;
+
+            // Add child elements for a group
+            for field in fields {
+                write_schema(field, writer)?;
+            }
+            Ok(())
+        }
+    }
+}
+
+// struct RowGroup {
+//   1: required list<ColumnChunk> columns
+//   2: required i64 total_byte_size
+//   3: required i64 num_rows
+//   4: optional list<SortingColumn> sorting_columns
+//   5: optional i64 file_offset
+//   6: optional i64 total_compressed_size
+//   7: optional i16 ordinal
+// }
+impl WriteThrift for RowGroupMetaData {
+    const ELEMENT_TYPE: ElementType = ElementType::Struct;
+
+    fn write_thrift<W: Write>(&self, writer: &mut ThriftCompactOutputProtocol<W>) -> Result<()> {
+        // this will call ColumnChunkMetaData::write_thrift
+        self.columns.write_thrift_field(writer, 1, 0)?;
+        self.total_byte_size.write_thrift_field(writer, 2, 1)?;
+        let mut last_field_id = self.num_rows.write_thrift_field(writer, 3, 2)?;
+        if let Some(sorting_columns) = self.sorting_columns() {
+            last_field_id = sorting_columns.write_thrift_field(writer, 4, last_field_id)?;
+        }
+        if let Some(file_offset) = self.file_offset() {
+            last_field_id = file_offset.write_thrift_field(writer, 5, last_field_id)?;
+        }
+        // this is optional, but we'll always write it
+        last_field_id = self
+            .compressed_size()
+            .write_thrift_field(writer, 6, last_field_id)?;
+        if let Some(ordinal) = self.ordinal() {
+            ordinal.write_thrift_field(writer, 7, last_field_id)?;
+        }
+        writer.write_struct_end()
+    }
+}
+
+// struct ColumnChunk {
+//   1: optional string file_path
+//   2: required i64 file_offset = 0
+//   3: optional ColumnMetaData meta_data
+//   4: optional i64 offset_index_offset
+//   5: optional i32 offset_index_length
+//   6: optional i64 column_index_offset
+//   7: optional i32 column_index_length
+//   8: optional ColumnCryptoMetaData crypto_metadata
+//   9: optional binary encrypted_column_metadata
+// }
+impl WriteThrift for ColumnChunkMetaData {
+    const ELEMENT_TYPE: ElementType = ElementType::Struct;
+
+    #[allow(unused_assignments)]
+    fn write_thrift<W: Write>(&self, writer: &mut ThriftCompactOutputProtocol<W>) -> Result<()> {
+        let mut last_field_id = 0i16;
+        if let Some(file_path) = self.file_path() {
+            last_field_id = file_path.write_thrift_field(writer, 1, last_field_id)?;
+        }
+        last_field_id = self
+            .file_offset()
+            .write_thrift_field(writer, 2, last_field_id)?;
+
+        #[cfg(feature = "encryption")]
+        {
+            // only write the ColumnMetaData if we haven't already encrypted it
+            if self.encrypted_column_metadata.is_none() {
+                writer.write_field_begin(FieldType::Struct, 3, last_field_id)?;
+                serialize_column_meta_data(self, writer)?;
+                last_field_id = 3;
+            }
+        }
+        #[cfg(not(feature = "encryption"))]
+        {
+            // always write the ColumnMetaData
+            writer.write_field_begin(FieldType::Struct, 3, last_field_id)?;
+            serialize_column_meta_data(self, writer)?;
+            last_field_id = 3;
+        }
+
+        if let Some(offset_idx_off) = self.offset_index_offset() {
+            last_field_id = offset_idx_off.write_thrift_field(writer, 4, last_field_id)?;
+        }
+        if let Some(offset_idx_len) = self.offset_index_length() {
+            last_field_id = offset_idx_len.write_thrift_field(writer, 5, last_field_id)?;
+        }
+        if let Some(column_idx_off) = self.column_index_offset() {
+            last_field_id = column_idx_off.write_thrift_field(writer, 6, last_field_id)?;
+        }
+        if let Some(column_idx_len) = self.column_index_length() {
+            last_field_id = column_idx_len.write_thrift_field(writer, 7, last_field_id)?;
+        }
+        #[cfg(feature = "encryption")]
+        {
+            if let Some(crypto_metadata) = self.crypto_metadata() {
+                last_field_id = crypto_metadata.write_thrift_field(writer, 8, last_field_id)?;
+            }
+            if let Some(encrypted_meta) = self.encrypted_column_metadata.as_ref() {
+                encrypted_meta
+                    .as_slice()
+                    .write_thrift_field(writer, 9, last_field_id)?;
+            }
+        }
+
+        writer.write_struct_end()
+    }
+}
+
 #[cfg(test)]
 mod tests {
     use crate::file::metadata::thrift_gen::BoundingBox;
diff --git a/parquet/src/file/metadata/writer.rs b/parquet/src/file/metadata/writer.rs
index a09a703ade..6396e454fb 100644
--- a/parquet/src/file/metadata/writer.rs
+++ b/parquet/src/file/metadata/writer.rs
@@ -15,22 +15,26 @@
 // specific language governing permissions and limitations
 // under the License.
 
+use crate::file::metadata::thrift_gen::{EncryptionAlgorithm, FileMeta};
+use crate::file::metadata::{
+    ColumnChunkMetaData, ParquetColumnIndex, ParquetOffsetIndex, RowGroupMetaData,
+};
+use crate::schema::types::{SchemaDescPtr, SchemaDescriptor};
+use crate::{
+    basic::ColumnOrder,
+    file::metadata::{FileMetaData, ParquetMetaDataBuilder},
+};
 #[cfg(feature = "encryption")]
-use crate::encryption::{
-    encrypt::{
-        encrypt_object, encrypt_object_to_vec, write_signed_plaintext_object, FileEncryptor,
+use crate::{
+    encryption::{
+        encrypt::{encrypt_thrift_object, write_signed_plaintext_thrift_object, FileEncryptor},
+        modules::{create_footer_aad, create_module_aad, ModuleType},
     },
-    modules::{create_footer_aad, create_module_aad, ModuleType},
+    file::column_crypto_metadata::ColumnCryptoMetaData,
+    file::metadata::thrift_gen::{AesGcmV1, FileCryptoMetaData},
 };
-#[cfg(feature = "encryption")]
-use crate::errors::ParquetError;
-use crate::format::EncryptionAlgorithm;
-#[cfg(feature = "encryption")]
-use crate::format::{AesGcmV1, ColumnCryptoMetaData};
-use crate::schema::types;
-use crate::schema::types::{SchemaDescPtr, SchemaDescriptor, TypePtr};
-use crate::thrift::TSerializable;
 use crate::{errors::Result, file::page_index::column_index::ColumnIndexMetaData};
+
 use crate::{
     file::writer::{get_file_magic, TrackedWrite},
     parquet_thrift::WriteThrift,
@@ -44,18 +48,16 @@ use crate::{
 };
 use std::io::Write;
 use std::sync::Arc;
-use thrift::protocol::TCompactOutputProtocol;
 
 /// Writes `crate::file::metadata` structures to a thrift encoded byte stream
 ///
 /// See [`ParquetMetaDataWriter`] for background and example.
 pub(crate) struct ThriftMetadataWriter<'a, W: Write> {
     buf: &'a mut TrackedWrite<W>,
-    schema: &'a TypePtr,
     schema_descr: &'a SchemaDescPtr,
-    row_groups: Vec<crate::format::RowGroup>,
-    column_indexes: Option<&'a [Vec<Option<ColumnIndexMetaData>>]>,
-    offset_indexes: Option<&'a [Vec<Option<OffsetIndexMetaData>>]>,
+    row_groups: Vec<RowGroupMetaData>,
+    column_indexes: Option<Vec<Vec<Option<ColumnIndexMetaData>>>>,
+    offset_indexes: Option<Vec<Vec<Option<OffsetIndexMetaData>>>>,
     key_value_metadata: Option<Vec<KeyValue>>,
     created_by: Option<String>,
     object_writer: MetadataObjectWriter,
@@ -130,14 +132,17 @@ impl<'a, W: Write> ThriftMetadataWriter<'a, W> {
     }
 
     /// Assembles and writes the final metadata to self.buf
-    pub fn finish(mut self) -> Result<crate::format::FileMetaData> {
+    pub fn finish(mut self) -> Result<ParquetMetaData> {
         let num_rows = self.row_groups.iter().map(|x| x.num_rows).sum();
 
+        let column_indexes = std::mem::take(&mut self.column_indexes);
+        let offset_indexes = std::mem::take(&mut self.offset_indexes);
+
         // Write column indexes and offset indexes
-        if let Some(column_indexes) = self.column_indexes {
+        if let Some(column_indexes) = column_indexes.as_ref() {
             self.write_column_indexes(column_indexes)?;
         }
-        if let Some(offset_indexes) = self.offset_indexes {
+        if let Some(offset_indexes) = offset_indexes.as_ref() {
             self.write_offset_indexes(offset_indexes)?;
         }
 
@@ -146,32 +151,44 @@ impl<'a, W: Write> ThriftMetadataWriter<'a, W> {
         // for all leaf nodes.
         // Even if the column has an undefined sort order, such as INTERVAL, this
         // is still technically the defined TYPEORDER so it should still be set.
-        let column_orders = (0..self.schema_descr.num_columns())
-            .map(|_| crate::format::ColumnOrder::TYPEORDER(crate::format::TypeDefinedOrder {}))
+        let column_orders = self
+            .schema_descr
+            .columns()
+            .iter()
+            .map(|col| {
+                let sort_order = ColumnOrder::get_sort_order(
+                    col.logical_type(),
+                    col.converted_type(),
+                    col.physical_type(),
+                );
+                ColumnOrder::TYPE_DEFINED_ORDER(sort_order)
+            })
             .collect();
+
         // This field is optional, perhaps in cases where no min/max fields are set
         // in any Statistics or ColumnIndex object in the whole file.
         // But for simplicity we always set this field.
         let column_orders = Some(column_orders);
+
         let (row_groups, unencrypted_row_groups) = self
             .object_writer
             .apply_row_group_encryption(self.row_groups)?;
 
         let (encryption_algorithm, footer_signing_key_metadata) =
             self.object_writer.get_plaintext_footer_crypto_metadata();
-        let key_value_metadata = self.key_value_metadata.map(|vkv| {
-            vkv.into_iter()
-                .map(|kv| crate::format::KeyValue::new(kv.key, kv.value))
-                .collect::<Vec<crate::format::KeyValue>>()
-        });
-        let mut file_metadata = crate::format::FileMetaData {
+
+        let file_metadata = FileMetaData::new(
+            self.writer_version,
             num_rows,
-            row_groups,
-            key_value_metadata,
-            version: self.writer_version,
-            schema: types::to_thrift(self.schema.as_ref())?,
-            created_by: self.created_by.clone(),
+            self.created_by,
+            self.key_value_metadata,
+            self.schema_descr.clone(),
             column_orders,
+        );
+
+        let file_meta = FileMeta {
+            file_metadata: &file_metadata,
+            row_groups: &row_groups,
             encryption_algorithm,
             footer_signing_key_metadata,
         };
@@ -179,7 +196,7 @@ impl<'a, W: Write> ThriftMetadataWriter<'a, W> {
         // Write file metadata
         let start_pos = self.buf.bytes_written();
         self.object_writer
-            .write_file_metadata(&file_metadata, &mut self.buf)?;
+            .write_file_metadata(&file_meta, &mut self.buf)?;
         let end_pos = self.buf.bytes_written();
 
         // Write footer
@@ -188,28 +205,49 @@ impl<'a, W: Write> ThriftMetadataWriter<'a, W> {
         self.buf.write_all(&metadata_len.to_le_bytes())?;
         self.buf.write_all(self.object_writer.get_file_magic())?;
 
-        if let Some(row_groups) = unencrypted_row_groups {
-            // If row group metadata was encrypted, we replace the encrypted row groups with
-            // unencrypted metadata before it is returned to users. This allows the metadata
-            // to be usable for retrieving the row group statistics for example, without users
-            // needing to decrypt the metadata.
-            file_metadata.row_groups = row_groups;
-        }
+        // If row group metadata was encrypted, we replace the encrypted row groups with
+        // unencrypted metadata before it is returned to users. This allows the metadata
+        // to be usable for retrieving the row group statistics for example, without users
+        // needing to decrypt the metadata.
+        let mut builder = ParquetMetaDataBuilder::new(file_metadata);
+
+        builder = match unencrypted_row_groups {
+            Some(rg) => builder.set_row_groups(rg),
+            None => builder.set_row_groups(row_groups),
+        };
+
+        let column_indexes: Option<ParquetColumnIndex> = column_indexes.map(|ovvi| {
+            ovvi.into_iter()
+                .map(|vi| {
+                    vi.into_iter()
+                        .map(|oi| oi.unwrap_or(ColumnIndexMetaData::NONE))
+                        .collect()
+                })
+                .collect()
+        });
+
+        // FIXME(ets): this will panic if there's a missing index.
+        let offset_indexes: Option<ParquetOffsetIndex> = offset_indexes.map(|ovvi| {
+            ovvi.into_iter()
+                .map(|vi| vi.into_iter().map(|oi| oi.unwrap()).collect())
+                .collect()
+        });
 
-        Ok(file_metadata)
+        builder = builder.set_column_index(column_indexes);
+        builder = builder.set_offset_index(offset_indexes);
+
+        Ok(builder.build())
     }
 
     pub fn new(
         buf: &'a mut TrackedWrite<W>,
-        schema: &'a TypePtr,
         schema_descr: &'a SchemaDescPtr,
-        row_groups: Vec<crate::format::RowGroup>,
+        row_groups: Vec<RowGroupMetaData>,
         created_by: Option<String>,
         writer_version: i32,
     ) -> Self {
         Self {
             buf,
-            schema,
             schema_descr,
             row_groups,
             column_indexes: None,
@@ -223,7 +261,7 @@ impl<'a, W: Write> ThriftMetadataWriter<'a, W> {
 
     pub fn with_column_indexes(
         mut self,
-        column_indexes: &'a [Vec<Option<ColumnIndexMetaData>>],
+        column_indexes: Vec<Vec<Option<ColumnIndexMetaData>>>,
     ) -> Self {
         self.column_indexes = Some(column_indexes);
         self
@@ -231,7 +269,7 @@ impl<'a, W: Write> ThriftMetadataWriter<'a, W> {
 
     pub fn with_offset_indexes(
         mut self,
-        offset_indexes: &'a [Vec<Option<OffsetIndexMetaData>>],
+        offset_indexes: Vec<Vec<Option<OffsetIndexMetaData>>>,
     ) -> Self {
         self.offset_indexes = Some(offset_indexes);
         self
@@ -361,12 +399,7 @@ impl<'a, W: Write> ParquetMetaDataWriter<'a, W> {
         let schema_descr = Arc::new(SchemaDescriptor::new(schema.clone()));
         let created_by = file_metadata.created_by().map(str::to_string);
 
-        let row_groups = self
-            .metadata
-            .row_groups()
-            .iter()
-            .map(|rg| rg.to_thrift())
-            .collect::<Vec<_>>();
+        let row_groups = self.metadata.row_groups.clone();
 
         let key_value_metadata = file_metadata.key_value_metadata().cloned();
 
@@ -375,14 +408,20 @@ impl<'a, W: Write> ParquetMetaDataWriter<'a, W> {
 
         let mut encoder = ThriftMetadataWriter::new(
             &mut self.buf,
-            &schema,
             &schema_descr,
             row_groups,
             created_by,
             file_metadata.version(),
         );
-        encoder = encoder.with_column_indexes(&column_indexes);
-        encoder = encoder.with_offset_indexes(&offset_indexes);
+
+        if let Some(column_indexes) = column_indexes {
+            encoder = encoder.with_column_indexes(column_indexes);
+        }
+
+        if let Some(offset_indexes) = offset_indexes {
+            encoder = encoder.with_offset_indexes(offset_indexes);
+        }
+
         if let Some(key_value_metadata) = key_value_metadata {
             encoder = encoder.with_key_value_metadata(key_value_metadata);
         }
@@ -391,46 +430,38 @@ impl<'a, W: Write> ParquetMetaDataWriter<'a, W> {
         Ok(())
     }
 
-    fn convert_column_indexes(&self) -> Vec<Vec<Option<ColumnIndexMetaData>>> {
-        if let Some(row_group_column_indexes) = self.metadata.column_index() {
-            (0..self.metadata.row_groups().len())
-                .map(|rg_idx| {
-                    let column_indexes = &row_group_column_indexes[rg_idx];
-                    column_indexes
-                        .iter()
-                        .map(|column_index| Some(column_index.clone()))
-                        .collect()
-                })
-                .collect()
-        } else {
-            // make a None for each row group, for each column
-            self.metadata
-                .row_groups()
-                .iter()
-                .map(|rg| std::iter::repeat_n(None, rg.columns().len()).collect())
-                .collect()
-        }
+    fn convert_column_indexes(&self) -> Option<Vec<Vec<Option<ColumnIndexMetaData>>>> {
+        // FIXME(ets): we're converting from ParquetColumnIndex to vec<vec<option>>,
+        // but then converting back to ParquetColumnIndex in the end. need to unify this.
+        self.metadata
+            .column_index()
+            .map(|row_group_column_indexes| {
+                (0..self.metadata.row_groups().len())
+                    .map(|rg_idx| {
+                        let column_indexes = &row_group_column_indexes[rg_idx];
+                        column_indexes
+                            .iter()
+                            .map(|column_index| Some(column_index.clone()))
+                            .collect()
+                    })
+                    .collect()
+            })
     }
 
-    fn convert_offset_index(&self) -> Vec<Vec<Option<OffsetIndexMetaData>>> {
-        if let Some(row_group_offset_indexes) = self.metadata.offset_index() {
-            (0..self.metadata.row_groups().len())
-                .map(|rg_idx| {
-                    let offset_indexes = &row_group_offset_indexes[rg_idx];
-                    offset_indexes
-                        .iter()
-                        .map(|offset_index| Some(offset_index.clone()))
-                        .collect()
-                })
-                .collect()
-        } else {
-            // make a None for each row group, for each column
-            self.metadata
-                .row_groups()
-                .iter()
-                .map(|rg| std::iter::repeat_n(None, rg.columns().len()).collect())
-                .collect()
-        }
+    fn convert_offset_index(&self) -> Option<Vec<Vec<Option<OffsetIndexMetaData>>>> {
+        self.metadata
+            .offset_index()
+            .map(|row_group_offset_indexes| {
+                (0..self.metadata.row_groups().len())
+                    .map(|rg_idx| {
+                        let offset_indexes = &row_group_offset_indexes[rg_idx];
+                        offset_indexes
+                            .iter()
+                            .map(|offset_index| Some(offset_index.clone()))
+                            .collect()
+                    })
+                    .collect()
+            })
     }
 }
 
@@ -441,13 +472,6 @@ struct MetadataObjectWriter {
 }
 
 impl MetadataObjectWriter {
-    #[inline]
-    fn write_object(object: &impl TSerializable, sink: impl Write) -> Result<()> {
-        let mut protocol = TCompactOutputProtocol::new(sink);
-        object.write_to_out_protocol(&mut protocol)?;
-        Ok(())
-    }
-
     #[inline]
     fn write_thrift_object(object: &impl WriteThrift, sink: impl Write) -> Result<()> {
         let mut protocol = ThriftCompactOutputProtocol::new(sink);
@@ -460,19 +484,15 @@ impl MetadataObjectWriter {
 #[cfg(not(feature = "encryption"))]
 impl MetadataObjectWriter {
     /// Write [`FileMetaData`] in Thrift format
-    fn write_file_metadata(
-        &self,
-        file_metadata: &crate::format::FileMetaData,
-        sink: impl Write,
-    ) -> Result<()> {
-        Self::write_object(file_metadata, sink)
+    fn write_file_metadata(&self, file_metadata: &FileMeta, sink: impl Write) -> Result<()> {
+        Self::write_thrift_object(file_metadata, sink)
     }
 
     /// Write a column [`OffsetIndex`] in Thrift format
     fn write_offset_index(
         &self,
         offset_index: &OffsetIndexMetaData,
-        _column_chunk: &crate::format::ColumnChunk,
+        _column_chunk: &ColumnChunkMetaData,
         _row_group_idx: usize,
         _column_idx: usize,
         sink: impl Write,
@@ -484,7 +504,7 @@ impl MetadataObjectWriter {
     fn write_column_index(
         &self,
         column_index: &ColumnIndexMetaData,
-        _column_chunk: &crate::format::ColumnChunk,
+        _column_chunk: &ColumnChunkMetaData,
         _row_group_idx: usize,
         _column_idx: usize,
         sink: impl Write,
@@ -495,11 +515,8 @@ impl MetadataObjectWriter {
     /// No-op implementation of row-group metadata encryption
     fn apply_row_group_encryption(
         &self,
-        row_groups: Vec<crate::format::RowGroup>,
-    ) -> Result<(
-        Vec<crate::format::RowGroup>,
-        Option<Vec<crate::format::RowGroup>>,
-    )> {
+        row_groups: Vec<RowGroupMetaData>,
+    ) -> Result<(Vec<RowGroupMetaData>, Option<Vec<RowGroupMetaData>>)> {
         Ok((row_groups, None))
     }
 
@@ -527,29 +544,25 @@ impl MetadataObjectWriter {
     /// Write [`FileMetaData`] in Thrift format, possibly encrypting it if required
     ///
     /// [`FileMetaData`]: crate::format::FileMetaData
-    fn write_file_metadata(
-        &self,
-        file_metadata: &crate::format::FileMetaData,
-        mut sink: impl Write,
-    ) -> Result<()> {
+    fn write_file_metadata(&self, file_metadata: &FileMeta, mut sink: impl Write) -> Result<()> {
         match self.file_encryptor.as_ref() {
             Some(file_encryptor) if file_encryptor.properties().encrypt_footer() => {
                 // First write FileCryptoMetadata
                 let crypto_metadata = Self::file_crypto_metadata(file_encryptor)?;
-                let mut protocol = TCompactOutputProtocol::new(&mut sink);
-                crypto_metadata.write_to_out_protocol(&mut protocol)?;
+                let mut protocol = ThriftCompactOutputProtocol::new(&mut sink);
+                crypto_metadata.write_thrift(&mut protocol)?;
 
                 // Then write encrypted footer
                 let aad = create_footer_aad(file_encryptor.file_aad())?;
                 let mut encryptor = file_encryptor.get_footer_encryptor()?;
-                encrypt_object(file_metadata, &mut encryptor, &mut sink, &aad)
+                encrypt_thrift_object(file_metadata, &mut encryptor, &mut sink, &aad)
             }
             Some(file_encryptor) if file_metadata.encryption_algorithm.is_some() => {
                 let aad = create_footer_aad(file_encryptor.file_aad())?;
                 let mut encryptor = file_encryptor.get_footer_encryptor()?;
-                write_signed_plaintext_object(file_metadata, &mut encryptor, &mut sink, &aad)
+                write_signed_plaintext_thrift_object(file_metadata, &mut encryptor, &mut sink, &aad)
             }
-            _ => Self::write_object(file_metadata, &mut sink),
+            _ => Self::write_thrift_object(file_metadata, &mut sink),
         }
     }
 
@@ -559,7 +572,7 @@ impl MetadataObjectWriter {
     fn write_offset_index(
         &self,
         offset_index: &OffsetIndexMetaData,
-        column_chunk: &crate::format::ColumnChunk,
+        column_chunk: &ColumnChunkMetaData,
         row_group_idx: usize,
         column_idx: usize,
         sink: impl Write,
@@ -584,7 +597,7 @@ impl MetadataObjectWriter {
     fn write_column_index(
         &self,
         column_index: &ColumnIndexMetaData,
-        column_chunk: &crate::format::ColumnChunk,
+        column_chunk: &ColumnChunkMetaData,
         row_group_idx: usize,
         column_idx: usize,
         sink: impl Write,
@@ -608,11 +621,8 @@ impl MetadataObjectWriter {
     /// and possibly unencrypted metadata to be returned to clients if data was encrypted.
     fn apply_row_group_encryption(
         &self,
-        row_groups: Vec<crate::format::RowGroup>,
-    ) -> Result<(
-        Vec<crate::format::RowGroup>,
-        Option<Vec<crate::format::RowGroup>>,
-    )> {
+        row_groups: Vec<RowGroupMetaData>,
+    ) -> Result<(Vec<RowGroupMetaData>, Option<Vec<RowGroupMetaData>>)> {
         match &self.file_encryptor {
             Some(file_encryptor) => {
                 let unencrypted_row_groups = row_groups.clone();
@@ -636,21 +646,12 @@ impl MetadataObjectWriter {
         object: &impl WriteThrift,
         mut sink: impl Write,
         file_encryptor: &FileEncryptor,
-        column_metadata: &crate::format::ColumnChunk,
+        column_metadata: &ColumnChunkMetaData,
         module_type: ModuleType,
         row_group_index: usize,
         column_index: usize,
     ) -> Result<()> {
-        let column_path_vec = &column_metadata
-            .meta_data
-            .as_ref()
-            .ok_or_else(|| {
-                general_err!(
-                    "Column metadata not set for column {} when encrypting 
object",
-                    column_index
-                )
-            })?
-            .path_in_schema;
+        let column_path_vec = column_metadata.column_path().as_ref();
 
         let joined_column_path;
         let column_path = if column_path_vec.len() == 1 {
@@ -699,36 +700,34 @@ impl MetadataObjectWriter {
             .aad_prefix()
             .map(|_| !file_encryptor.properties().store_aad_prefix());
         let aad_prefix = if file_encryptor.properties().store_aad_prefix() {
-            file_encryptor.properties().aad_prefix().cloned()
+            file_encryptor.properties().aad_prefix()
         } else {
             None
         };
-        EncryptionAlgorithm::AESGCMV1(AesGcmV1 {
-            aad_prefix,
+        EncryptionAlgorithm::AES_GCM_V1(AesGcmV1 {
+            aad_prefix: aad_prefix.cloned(),
             aad_file_unique: Some(file_encryptor.aad_file_unique().clone()),
             supply_aad_prefix,
         })
     }
 
-    fn file_crypto_metadata(
-        file_encryptor: &FileEncryptor,
-    ) -> Result<crate::format::FileCryptoMetaData> {
+    fn file_crypto_metadata(file_encryptor: &FileEncryptor) -> Result<FileCryptoMetaData> {
         let properties = file_encryptor.properties();
-        Ok(crate::format::FileCryptoMetaData {
+        Ok(FileCryptoMetaData {
             encryption_algorithm: Self::encryption_algorithm_from_encryptor(file_encryptor),
             key_metadata: properties.footer_key_metadata().cloned(),
         })
     }
 
     fn encrypt_row_groups(
-        row_groups: Vec<crate::format::RowGroup>,
+        row_groups: Vec<RowGroupMetaData>,
         file_encryptor: &Arc<FileEncryptor>,
-    ) -> Result<Vec<crate::format::RowGroup>> {
+    ) -> Result<Vec<RowGroupMetaData>> {
         row_groups
             .into_iter()
             .enumerate()
             .map(|(rg_idx, mut rg)| {
-                let cols: Result<Vec<crate::format::ColumnChunk>> = rg
+                let cols: Result<Vec<ColumnChunkMetaData>> = rg
                     .columns
                     .into_iter()
                     .enumerate()
@@ -744,26 +743,24 @@ impl MetadataObjectWriter {
 
     /// Apply column encryption to column chunk metadata
     fn encrypt_column_chunk(
-        mut column_chunk: crate::format::ColumnChunk,
+        mut column_chunk: ColumnChunkMetaData,
         file_encryptor: &Arc<FileEncryptor>,
         row_group_index: usize,
         column_index: usize,
-    ) -> Result<crate::format::ColumnChunk> {
+    ) -> Result<ColumnChunkMetaData> {
         // Column crypto metadata should have already been set when the column was created.
         // Here we apply the encryption by encrypting the column metadata if required.
-        match column_chunk.crypto_metadata.as_ref() {
+        match column_chunk.column_crypto_metadata.as_ref() {
             None => {}
-            Some(ColumnCryptoMetaData::ENCRYPTIONWITHFOOTERKEY(_)) => {
+            Some(ColumnCryptoMetaData::ENCRYPTION_WITH_FOOTER_KEY) => {
                 // When uniform encryption is used the footer is already encrypted,
                 // so the column chunk does not need additional encryption.
             }
-            Some(ColumnCryptoMetaData::ENCRYPTIONWITHCOLUMNKEY(col_key)) => {
+            Some(ColumnCryptoMetaData::ENCRYPTION_WITH_COLUMN_KEY(col_key)) => {
+                use crate::file::metadata::thrift_gen::serialize_column_meta_data;
+
                 let column_path = col_key.path_in_schema.join(".");
                 let mut column_encryptor = file_encryptor.get_column_encryptor(&column_path)?;
-                let meta_data = column_chunk
-                    .meta_data
-                    .take()
-                    .ok_or_else(|| general_err!("Column metadata not set for 
encryption"))?;
                 let aad = create_module_aad(
                     file_encryptor.file_aad(),
                     ModuleType::ColumnMetaData,
@@ -771,10 +768,15 @@ impl MetadataObjectWriter {
                     column_index,
                     None,
                 )?;
-                let ciphertext = encrypt_object_to_vec(&meta_data, &mut column_encryptor, &aad)?;
+                // create temp ColumnMetaData that we can encrypt
+                let mut buffer: Vec<u8> = vec![];
+                {
+                    let mut prot = ThriftCompactOutputProtocol::new(&mut buffer);
+                    serialize_column_meta_data(&column_chunk, &mut prot)?;
+                }
+                let ciphertext = column_encryptor.encrypt(&buffer, &aad)?;
 
                 column_chunk.encrypted_column_metadata = Some(ciphertext);
-                debug_assert!(column_chunk.meta_data.is_none());
             }
         }
 
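For context on the writer touched above: `ParquetMetaDataWriter` remains a
standalone way to serialize an in-memory footer. A minimal sketch, assuming the
public entry points on this branch still match those on main:

    use parquet::file::metadata::{ParquetMetaData, ParquetMetaDataWriter};

    // Serialize a ParquetMetaData footer into a byte buffer; Vec<u8>
    // implements std::io::Write, so it can stand in for a file here.
    fn metadata_to_bytes(metadata: &ParquetMetaData) -> parquet::errors::Result<Vec<u8>> {
        let mut buf = Vec::new();
        ParquetMetaDataWriter::new(&mut buf, metadata).finish()?;
        Ok(buf)
    }
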
diff --git a/parquet/src/file/serialized_reader.rs b/parquet/src/file/serialized_reader.rs
index 1442f0f67c..b0d64ea760 100644
--- a/parquet/src/file/serialized_reader.rs
+++ b/parquet/src/file/serialized_reader.rs
@@ -2546,8 +2546,8 @@ mod tests {
         }
         let file_metadata = file_writer.close().unwrap();
 
-        assert_eq!(file_metadata.num_rows, 25);
-        assert_eq!(file_metadata.row_groups.len(), 5);
+        assert_eq!(file_metadata.file_metadata().num_rows(), 25);
+        assert_eq!(file_metadata.num_row_groups(), 5);
 
         // read only the 3rd row group
         let read_options = ReadOptionsBuilder::new()
diff --git a/parquet/src/file/writer.rs b/parquet/src/file/writer.rs
index d6f742c137..1ce7ad2912 100644
--- a/parquet/src/file/writer.rs
+++ b/parquet/src/file/writer.rs
@@ -155,7 +155,6 @@ pub type OnCloseRowGroup<'a, W> = Box<
 /// - After all row groups have been written, close the file writer using `close` method.
 pub struct SerializedFileWriter<W: Write> {
     buf: TrackedWrite<W>,
-    schema: TypePtr,
     descr: SchemaDescPtr,
     props: WriterPropertiesPtr,
     row_groups: Vec<RowGroupMetaData>,
@@ -195,7 +194,6 @@ impl<W: Write + Send> SerializedFileWriter<W> {
         Self::start_file(&properties, &mut buf)?;
         Ok(Self {
             buf,
-            schema,
             descr: Arc::new(schema_descriptor),
             props: properties,
             row_groups: vec![],
@@ -298,7 +296,7 @@ impl<W: Write + Send> SerializedFileWriter<W> {
     /// Unlike [`Self::close`] this does not consume self
     ///
     /// Attempting to write after calling finish will result in an error
-    pub fn finish(&mut self) -> Result<crate::format::FileMetaData> {
+    pub fn finish(&mut self) -> Result<ParquetMetaData> {
         self.assert_previous_writer_closed()?;
         let metadata = self.write_metadata()?;
         self.buf.flush()?;
@@ -306,7 +304,7 @@ impl<W: Write + Send> SerializedFileWriter<W> {
     }
 
     /// Closes and finalises file writer, returning the file metadata.
-    pub fn close(mut self) -> Result<crate::format::FileMetaData> {
+    pub fn close(mut self) -> Result<ParquetMetaData> {
         self.finish()
     }
 
@@ -326,8 +324,9 @@ impl<W: Write + Send> SerializedFileWriter<W> {
         Ok(())
     }
 
-    /// Assembles and writes metadata at the end of the file.
-    fn write_metadata(&mut self) -> Result<crate::format::FileMetaData> {
+    /// Assembles and writes metadata at the end of the file. This will take ownership
+    /// of `row_groups` and the page index structures.
+    fn write_metadata(&mut self) -> Result<ParquetMetaData> {
         self.finished = true;
 
         // write out any remaining bloom filters after all row groups
@@ -341,15 +340,13 @@ impl<W: Write + Send> SerializedFileWriter<W> {
             None => Some(self.kv_metadatas.clone()),
         };
 
-        let row_groups = self
-            .row_groups
-            .iter()
-            .map(|v| v.to_thrift())
-            .collect::<Vec<_>>();
+        // take ownership of metadata
+        let row_groups = std::mem::take(&mut self.row_groups);
+        let column_indexes = std::mem::take(&mut self.column_indexes);
+        let offset_indexes = std::mem::take(&mut self.offset_indexes);
 
         let mut encoder = ThriftMetadataWriter::new(
             &mut self.buf,
-            &self.schema,
             &self.descr,
             row_groups,
             Some(self.props.created_by().to_string()),
@@ -365,8 +362,10 @@ impl<W: Write + Send> SerializedFileWriter<W> {
             encoder = encoder.with_key_value_metadata(key_value_metadata)
         }
 
-        encoder = encoder.with_column_indexes(&self.column_indexes);
-        encoder = encoder.with_offset_indexes(&self.offset_indexes);
+        encoder = encoder.with_column_indexes(column_indexes);
+        if !self.props.offset_index_disabled() {
+            encoder = encoder.with_offset_indexes(offset_indexes);
+        }
         encoder.finish()
     }
 
@@ -1629,7 +1628,7 @@ mod tests {
         file: W,
         data: Vec<Vec<i32>>,
         compression: Compression,
-    ) -> crate::format::FileMetaData
+    ) -> ParquetMetaData
     where
         W: Write + Send,
         R: ChunkReader + From<W> + 'static,
@@ -1644,7 +1643,7 @@ mod tests {
         data: Vec<Vec<D::T>>,
         value: F,
         compression: Compression,
-    ) -> crate::format::FileMetaData
+    ) -> ParquetMetaData
     where
         W: Write + Send,
         R: ChunkReader + From<W> + 'static,
@@ -1715,7 +1714,7 @@ mod tests {
 
     /// File write-read roundtrip.
     /// `data` consists of arrays of values for each row group.
-    fn test_file_roundtrip(file: File, data: Vec<Vec<i32>>) -> crate::format::FileMetaData {
+    fn test_file_roundtrip(file: File, data: Vec<Vec<i32>>) -> ParquetMetaData {
         test_roundtrip_i32::<File, File>(file, data, Compression::UNCOMPRESSED)
     }
 
@@ -1790,13 +1789,12 @@ mod tests {
     fn test_column_offset_index_file() {
         let file = tempfile::tempfile().unwrap();
         let file_metadata = test_file_roundtrip(file, vec![vec![1, 2, 3, 4, 5]]);
-        file_metadata.row_groups.iter().for_each(|row_group| {
-            row_group.columns.iter().for_each(|column_chunk| {
-                assert_ne!(None, column_chunk.column_index_offset);
-                assert_ne!(None, column_chunk.column_index_length);
-
-                assert_ne!(None, column_chunk.offset_index_offset);
-                assert_ne!(None, column_chunk.offset_index_length);
+        file_metadata.row_groups().iter().for_each(|row_group| {
+            row_group.columns().iter().for_each(|column_chunk| {
+                assert!(column_chunk.column_index_offset().is_some());
+                assert!(column_chunk.column_index_length().is_some());
+                assert!(column_chunk.offset_index_offset().is_some());
+                assert!(column_chunk.offset_index_length().is_some());
             })
         });
     }
@@ -2037,15 +2035,15 @@ mod tests {
         row_group_writer.close().unwrap();
 
         let metadata = file_writer.finish().unwrap();
-        assert_eq!(metadata.row_groups.len(), 1);
-        let row_group = &metadata.row_groups[0];
-        assert_eq!(row_group.columns.len(), 2);
+        assert_eq!(metadata.num_row_groups(), 1);
+        let row_group = metadata.row_group(0);
+        assert_eq!(row_group.num_columns(), 2);
         // Column "a" has both offset and column index, as requested
-        assert!(row_group.columns[0].offset_index_offset.is_some());
-        assert!(row_group.columns[0].column_index_offset.is_some());
+        assert!(row_group.column(0).offset_index_offset().is_some());
+        assert!(row_group.column(0).column_index_offset().is_some());
         // Column "b" should only have offset index
-        assert!(row_group.columns[1].offset_index_offset.is_some());
-        assert!(row_group.columns[1].column_index_offset.is_none());
+        assert!(row_group.column(1).offset_index_offset().is_some());
+        assert!(row_group.column(1).column_index_offset().is_none());
 
         let err = file_writer.next_row_group().err().unwrap().to_string();
         assert_eq!(err, "Parquet error: SerializedFileWriter already 
finished");
@@ -2099,9 +2097,8 @@ mod tests {
         row_group_writer.close().unwrap();
         let file_metadata = writer.close().unwrap();
 
-        assert_eq!(file_metadata.row_groups.len(), 1);
-        assert_eq!(file_metadata.row_groups[0].columns.len(), 1);
-        assert!(file_metadata.row_groups[0].columns[0].meta_data.is_some());
+        assert_eq!(file_metadata.num_row_groups(), 1);
+        assert_eq!(file_metadata.row_group(0).num_columns(), 1);
 
         let check_def_hist = |def_hist: &[i64]| {
             assert_eq!(def_hist.len(), 2);
@@ -2109,29 +2106,26 @@ mod tests {
             assert_eq!(def_hist[1], 7);
         };
 
-        assert!(file_metadata.row_groups[0].columns[0].meta_data.is_some());
-        let meta_data = file_metadata.row_groups[0].columns[0]
-            .meta_data
-            .as_ref()
-            .unwrap();
-        assert!(meta_data.size_statistics.is_some());
-        let size_stats = meta_data.size_statistics.as_ref().unwrap();
+        let meta_data = file_metadata.row_group(0).column(0);
 
-        assert!(size_stats.repetition_level_histogram.is_none());
-        assert!(size_stats.definition_level_histogram.is_some());
-        assert!(size_stats.unencoded_byte_array_data_bytes.is_some());
+        assert!(meta_data.repetition_level_histogram().is_none());
+        assert!(meta_data.definition_level_histogram().is_some());
+        assert!(meta_data.unencoded_byte_array_data_bytes().is_some());
         assert_eq!(
             unenc_size,
-            size_stats.unencoded_byte_array_data_bytes.unwrap()
+            meta_data.unencoded_byte_array_data_bytes().unwrap()
         );
-        check_def_hist(size_stats.definition_level_histogram.as_ref().unwrap());
+        check_def_hist(meta_data.definition_level_histogram().unwrap().values());
 
         // check that the read metadata is also correct
         let options = ReadOptionsBuilder::new().with_page_index().build();
         let reader = SerializedFileReader::new_with_options(file, options).unwrap();
 
         let rfile_metadata = reader.metadata().file_metadata();
-        assert_eq!(rfile_metadata.num_rows(), file_metadata.num_rows);
+        assert_eq!(
+            rfile_metadata.num_rows(),
+            file_metadata.file_metadata().num_rows()
+        );
         assert_eq!(reader.num_row_groups(), 1);
         let rowgroup = reader.get_row_group(0).unwrap();
         assert_eq!(rowgroup.num_columns(), 1);
@@ -2251,9 +2245,8 @@ mod tests {
         row_group_writer.close().unwrap();
         let file_metadata = writer.close().unwrap();
 
-        assert_eq!(file_metadata.row_groups.len(), 1);
-        assert_eq!(file_metadata.row_groups[0].columns.len(), 1);
-        assert!(file_metadata.row_groups[0].columns[0].meta_data.is_some());
+        assert_eq!(file_metadata.num_row_groups(), 1);
+        assert_eq!(file_metadata.row_group(0).num_columns(), 1);
 
         let check_def_hist = |def_hist: &[i64]| {
             assert_eq!(def_hist.len(), 4);
@@ -2271,25 +2264,22 @@ mod tests {
 
         // check that histograms are set properly in the write and read metadata
         // also check that unencoded_byte_array_data_bytes is not set
-        assert!(file_metadata.row_groups[0].columns[0].meta_data.is_some());
-        let meta_data = file_metadata.row_groups[0].columns[0]
-            .meta_data
-            .as_ref()
-            .unwrap();
-        assert!(meta_data.size_statistics.is_some());
-        let size_stats = meta_data.size_statistics.as_ref().unwrap();
-        assert!(size_stats.repetition_level_histogram.is_some());
-        assert!(size_stats.definition_level_histogram.is_some());
-        assert!(size_stats.unencoded_byte_array_data_bytes.is_none());
-        check_def_hist(size_stats.definition_level_histogram.as_ref().unwrap());
-        check_rep_hist(size_stats.repetition_level_histogram.as_ref().unwrap());
+        let meta_data = file_metadata.row_group(0).column(0);
+        assert!(meta_data.repetition_level_histogram().is_some());
+        assert!(meta_data.definition_level_histogram().is_some());
+        assert!(meta_data.unencoded_byte_array_data_bytes().is_none());
+        check_def_hist(meta_data.definition_level_histogram().unwrap().values());
+        check_rep_hist(meta_data.repetition_level_histogram().unwrap().values());
 
         // check that the read metadata is also correct
         let options = ReadOptionsBuilder::new().with_page_index().build();
         let reader = SerializedFileReader::new_with_options(file, options).unwrap();
 
         let rfile_metadata = reader.metadata().file_metadata();
-        assert_eq!(rfile_metadata.num_rows(), file_metadata.num_rows);
+        assert_eq!(
+            rfile_metadata.num_rows(),
+            file_metadata.file_metadata().num_rows()
+        );
         assert_eq!(reader.num_row_groups(), 1);
         let rowgroup = reader.get_row_group(0).unwrap();
         assert_eq!(rowgroup.num_columns(), 1);
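
The user-facing effect of the writer changes above: `SerializedFileWriter::finish`
and `close` now return `ParquetMetaData`, so callers read values through accessor
methods rather than raw thrift fields. A minimal sketch, using only the accessors
exercised by the updated tests:

    use parquet::file::metadata::ParquetMetaData;

    // Summarize the metadata returned by SerializedFileWriter::close().
    fn summarize(metadata: &ParquetMetaData) {
        println!(
            "{} rows in {} row groups",
            metadata.file_metadata().num_rows(),
            metadata.num_row_groups()
        );
        for rg in metadata.row_groups() {
            for col in rg.columns() {
                // Page index locations are exposed via accessors when present.
                let _ = (col.column_index_offset(), col.offset_index_offset());
            }
        }
    }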
diff --git a/parquet/src/schema/types.rs b/parquet/src/schema/types.rs
index 1406295c3a..dca0f84179 100644
--- a/parquet/src/schema/types.rs
+++ b/parquet/src/schema/types.rs
@@ -1114,6 +1114,24 @@ impl SchemaDescriptor {
     }
 }
 
+// walk tree and count nodes
+pub(crate) fn num_nodes(tp: &TypePtr) -> usize {
+    let mut n_nodes = 1usize; // count root
+    for f in tp.get_fields().iter() {
+        count_nodes(f, &mut n_nodes);
+    }
+    n_nodes
+}
+
+pub(crate) fn count_nodes(tp: &TypePtr, n_nodes: &mut usize) {
+    *n_nodes += 1;
+    if let Type::GroupType { ref fields, .. } = tp.as_ref() {
+        for f in fields {
+            count_nodes(f, n_nodes);
+        }
+    }
+}
+
 // do a quick walk of the tree to get proper sizing for SchemaDescriptor arrays
 fn num_leaves(tp: &TypePtr) -> usize {
     let mut n_leaves = 0usize;
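
Since the new `num_nodes` and `count_nodes` helpers are `pub(crate)`, the sketch
below is a self-contained mirror of their logic that can be run against a parsed
schema; `count_schema_nodes` is illustrative and not part of the crate API:

    use parquet::schema::parser::parse_message_type;
    use parquet::schema::types::Type;

    // Count this node, then recurse into group fields (same totals as
    // the crate-private num_nodes, which counts the root plus all fields).
    fn count_schema_nodes(tp: &Type) -> usize {
        let mut n = 1;
        if let Type::GroupType { fields, .. } = tp {
            for f in fields {
                n += count_schema_nodes(f);
            }
        }
        n
    }

    fn main() {
        let schema = parse_message_type(
            "message test { required int32 a; optional group b { optional binary c (UTF8); } }",
        )
        .unwrap();
        assert_eq!(count_schema_nodes(&schema), 4); // root + a + b + c
    }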
diff --git a/parquet/tests/encryption/encryption.rs b/parquet/tests/encryption/encryption.rs
index 96dd8654cd..0261c22c2c 100644
--- a/parquet/tests/encryption/encryption.rs
+++ b/parquet/tests/encryption/encryption.rs
@@ -982,23 +982,17 @@ pub fn test_retrieve_row_group_statistics_after_encrypted_write() {
     }
     let file_metadata = writer.close().unwrap();
 
-    assert_eq!(file_metadata.row_groups.len(), 1);
-    let row_group = &file_metadata.row_groups[0];
-    assert_eq!(row_group.columns.len(), 1);
-    let column = &row_group.columns[0];
-    let column_stats = column
-        .meta_data
-        .as_ref()
-        .unwrap()
-        .statistics
-        .as_ref()
-        .unwrap();
+    assert_eq!(file_metadata.num_row_groups(), 1);
+    let row_group = file_metadata.row_group(0);
+    assert_eq!(row_group.num_columns(), 1);
+    let column = row_group.column(0);
+    let column_stats = column.statistics().unwrap();
     assert_eq!(
-        column_stats.min_value.as_deref(),
+        column_stats.min_bytes_opt(),
         Some(3i32.to_le_bytes().as_slice())
     );
     assert_eq!(
-        column_stats.max_value.as_deref(),
+        column_stats.max_bytes_opt(),
         Some(19i32.to_le_bytes().as_slice())
     );
 }
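
As a quick reference for the accessor-based assertions above, here is a hedged
sketch of decoding INT32 min/max statistics through the new chain; it assumes
plain little-endian min/max bytes, as in the test:

    use parquet::file::metadata::ParquetMetaData;

    // Pull min/max for the first column of the first row group, decoding
    // the raw statistics bytes as little-endian i32 values.
    fn first_column_min_max(metadata: &ParquetMetaData) -> Option<(i32, i32)> {
        let column = metadata.row_group(0).column(0);
        let stats = column.statistics()?;
        let min = i32::from_le_bytes(stats.min_bytes_opt()?.try_into().ok()?);
        let max = i32::from_le_bytes(stats.max_bytes_opt()?.try_into().ok()?);
        Some((min, max))
    }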
diff --git a/parquet/tests/encryption/encryption_async.rs b/parquet/tests/encryption/encryption_async.rs
index 9c1e0c00a3..6999b1a931 100644
--- a/parquet/tests/encryption/encryption_async.rs
+++ b/parquet/tests/encryption/encryption_async.rs
@@ -34,9 +34,9 @@ use parquet::arrow::{ArrowWriter, AsyncArrowWriter};
 use parquet::encryption::decrypt::FileDecryptionProperties;
 use parquet::encryption::encrypt::FileEncryptionProperties;
 use parquet::errors::ParquetError;
+use parquet::file::metadata::ParquetMetaData;
 use parquet::file::properties::{WriterProperties, WriterPropertiesBuilder};
 use parquet::file::writer::SerializedFileWriter;
-use parquet::format::FileMetaData;
 use std::io::Write;
 use std::sync::Arc;
 use tokio::fs::File;
@@ -647,7 +647,7 @@ fn spawn_column_parallel_row_group_writer(
 async fn concatenate_parallel_row_groups<W: Write + Send>(
     mut parquet_writer: SerializedFileWriter<W>,
     mut serialize_rx: Receiver<JoinHandle<RBStreamSerializeResult>>,
-) -> Result<FileMetaData, ParquetError> {
+) -> Result<ParquetMetaData, ParquetError> {
     while let Some(task) = serialize_rx.recv().await {
         let result = task.await;
         let mut rg_out = parquet_writer.next_row_group()?;
@@ -818,8 +818,7 @@ async fn test_multi_threaded_encrypted_writing() {
     let metadata = serialized_file_writer.close().unwrap();
 
     // Close the file writer which writes the footer
-    assert_eq!(metadata.num_rows, 50);
-    assert_eq!(metadata.schema, metadata.schema);
+    assert_eq!(metadata.file_metadata().num_rows(), 50);
 
     // Check that the file was written correctly
     let (read_record_batches, read_metadata) =
@@ -909,8 +908,7 @@ async fn test_multi_threaded_encrypted_writing_deprecated() {
 
     // Close the file writer which writes the footer
     let metadata = writer.finish().unwrap();
-    assert_eq!(metadata.num_rows, 100);
-    assert_eq!(metadata.schema, metadata.schema);
+    assert_eq!(metadata.file_metadata().num_rows(), 100);
 
     // Check that the file was written correctly
     let (read_record_batches, read_metadata) =
