This is an automated email from the ASF dual-hosted git repository.

etseidl pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/arrow-rs.git


The following commit(s) were added to refs/heads/main by this push:
     new 891d31d280 [thrift-remodel] Refactor Thrift encryption and store 
encodings as bitmask (#8587)
891d31d280 is described below

commit 891d31d280a4403a25c3158968584e9a678ba5b7
Author: Ed Seidl <[email protected]>
AuthorDate: Mon Oct 13 12:36:04 2025 -0700

    [thrift-remodel] Refactor Thrift encryption and store encodings as bitmask 
(#8587)
    
    # Which issue does this PR close?
    
    - Part of #5853.
    - Part of #8518.
    - Closes #8588.
    
    # Rationale for this change
    
    Reduce code clutter (too many `#[cfg(feature = "encryption")]`s).
    
    Also reduces the number of vector allocations using a strategy proposed
    by @jhorstmann
    (https://github.com/apache/arrow-rs/issues/8518#issuecomment-3355199067).
    
    # What changes are included in this PR?
    
    Moves most of the Thrift decoding that involves encryption into a new
    module (`parquet/src/file/metadata/encryption.rs`).
    
    Replaces the `encodings` vector in `ColumnChunkMetaData` with a bitmask.
    
    # Are these changes tested?
    
    Covered by existing and new tests.
    
    # Are there any user-facing changes?
    Yes. This changes the return type of `ColumnChunkMetaData::encodings`
    from `&Vec<Encoding>` to `impl Iterator<Item = Encoding>` (the encodings
    are now produced on demand from a bitmask rather than stored as a vector).
---
 parquet/src/basic.rs                    | 182 +++++++++++
 parquet/src/column/writer/mod.rs        |  19 +-
 parquet/src/file/metadata/encryption.rs | 544 ++++++++++++++++++++++++++++++++
 parquet/src/file/metadata/mod.rs        |  38 ++-
 parquet/src/file/metadata/parser.rs     |   2 +-
 parquet/src/file/metadata/thrift_gen.rs | 404 +-----------------------
 parquet/src/file/metadata/writer.rs     |   2 +-
 parquet/src/file/writer.rs              |   3 +-
 parquet/src/schema/printer.rs           |   6 +-
 9 files changed, 784 insertions(+), 416 deletions(-)

diff --git a/parquet/src/basic.rs b/parquet/src/basic.rs
index dc6402e886..0220b1b55d 100644
--- a/parquet/src/basic.rs
+++ b/parquet/src/basic.rs
@@ -25,6 +25,7 @@ use std::str::FromStr;
 use std::{fmt, str};
 
 pub use crate::compression::{BrotliLevel, GzipLevel, ZstdLevel};
+use crate::file::metadata::HeapSize;
 use crate::parquet_thrift::{
     ElementType, FieldType, ReadThrift, ThriftCompactInputProtocol, 
ThriftCompactOutputProtocol,
     WriteThrift, WriteThriftField,
@@ -724,6 +725,132 @@ impl FromStr for Encoding {
     }
 }
 
+/// A bitmask representing the [`Encoding`]s employed while encoding a Parquet 
column chunk.
+///
+/// The Parquet [`ColumnMetaData`] struct contains an array that indicates 
what encodings were
+/// used when writing that column chunk. For memory and performance reasons, 
this crate reduces
+/// that array to a bitmask, where each bit position represents a different 
[`Encoding`]. This
+/// struct contains that bitmask, and provides methods to interact with the 
data.
+///
+/// # Example
+/// ```no_run
+/// # use parquet::file::metadata::ParquetMetaDataReader;
+/// # use parquet::basic::Encoding;
+/// # fn open_parquet_file(path: &str) -> std::fs::File { unimplemented!(); }
+/// // read parquet metadata from a file
+/// let file = open_parquet_file("some_path.parquet");
+/// let mut reader = ParquetMetaDataReader::new();
+/// reader.try_parse(&file).unwrap();
+/// let metadata = reader.finish().unwrap();
+///
+/// // find the encodings used by the first column chunk in the first row group
+/// let col_meta = metadata.row_group(0).column(0);
+/// let encodings = col_meta.encodings_mask();
+///
+/// // check to see if a particular encoding was used
+/// let used_rle = encodings.is_set(Encoding::RLE);
+///
+/// // check to see if all of a set of encodings were used
+/// let used_all = encodings.all_set([Encoding::RLE, Encoding::PLAIN].iter());
+///
+/// // convert mask to a Vec<Encoding>
+/// let encodings_vec = encodings.encodings().collect::<Vec<_>>();
+/// ```
+///
+/// [`ColumnMetaData`]: 
https://github.com/apache/parquet-format/blob/9fd57b59e0ce1a82a69237dcf8977d3e72a2965d/src/main/thrift/parquet.thrift#L875
+#[derive(Debug, Clone, Copy, Default, PartialEq, Eq)]
+pub struct EncodingMask(i32);
+
+impl EncodingMask {
+    /// Highest valued discriminant in the [`Encoding`] enum
+    const MAX_ENCODING: i32 = Encoding::BYTE_STREAM_SPLIT as i32;
+    /// A mask of the bit positions that must not be set (despite the name), used for validation.
+    /// This includes the never-used GROUP_VAR_INT encoding value of `1`.
+    const ALLOWED_MASK: u32 =
+        !(1u32 << (EncodingMask::MAX_ENCODING as u32 + 1)).wrapping_sub(1) | 1 
<< 1;
+
+    /// Attempt to create a new `EncodingMask` from an integer.
+    ///
+    /// This will return an error if a bit outside the allowable range is set.
+    pub fn try_new(val: i32) -> Result<Self> {
+        if val as u32 & Self::ALLOWED_MASK != 0 {
+            return Err(general_err!("Attempt to create invalid mask: 0x{:x}", 
val));
+        }
+        Ok(Self(val))
+    }
+
+    /// Return an integer representation of this `EncodingMask`.
+    pub fn as_i32(&self) -> i32 {
+        self.0
+    }
+
+    /// Create a new `EncodingMask` from a collection of [`Encoding`]s.
+    pub fn new_from_encodings<'a>(encodings: impl Iterator<Item = &'a 
Encoding>) -> Self {
+        let mut mask = 0;
+        for &e in encodings {
+            mask |= 1 << (e as i32);
+        }
+        Self(mask)
+    }
+
+    /// Test if a given [`Encoding`] is present in this mask.
+    pub fn is_set(&self, val: Encoding) -> bool {
+        self.0 & (1 << (val as i32)) != 0
+    }
+
+    /// Test if all [`Encoding`]s in a given set are present in this mask.
+    pub fn all_set<'a>(&self, mut encodings: impl Iterator<Item = &'a 
Encoding>) -> bool {
+        encodings.all(|&e| self.is_set(e))
+    }
+
+    /// Return an iterator over all [`Encoding`]s present in this mask.
+    pub fn encodings(&self) -> impl Iterator<Item = Encoding> {
+        Self::mask_to_encodings_iter(self.0)
+    }
+
+    fn mask_to_encodings_iter(mask: i32) -> impl Iterator<Item = Encoding> {
+        (0..=Self::MAX_ENCODING)
+            .filter(move |i| mask & (1 << i) != 0)
+            .map(i32_to_encoding)
+    }
+}
+
+impl HeapSize for EncodingMask {
+    fn heap_size(&self) -> usize {
+        0 // no heap allocations
+    }
+}
+
+impl<'a, R: ThriftCompactInputProtocol<'a>> ReadThrift<'a, R> for EncodingMask 
{
+    fn read_thrift(prot: &mut R) -> Result<Self> {
+        let mut mask = 0;
+
+        // This reads a Thrift `list<Encoding>` and turns it into a bitmask
+        let list_ident = prot.read_list_begin()?;
+        for _ in 0..list_ident.size {
+            let val = Encoding::read_thrift(prot)?;
+            mask |= 1 << val as i32;
+        }
+        Ok(Self(mask))
+    }
+}
+
+#[allow(deprecated)]
+fn i32_to_encoding(val: i32) -> Encoding {
+    match val {
+        0 => Encoding::PLAIN,
+        2 => Encoding::PLAIN_DICTIONARY,
+        3 => Encoding::RLE,
+        4 => Encoding::BIT_PACKED,
+        5 => Encoding::DELTA_BINARY_PACKED,
+        6 => Encoding::DELTA_LENGTH_BYTE_ARRAY,
+        7 => Encoding::DELTA_BYTE_ARRAY,
+        8 => Encoding::RLE_DICTIONARY,
+        9 => Encoding::BYTE_STREAM_SPLIT,
+        _ => panic!("Impossible encoding {val}"),
+    }
+}
+
 // ----------------------------------------------------------------------
 // Mirrors thrift enum `CompressionCodec`
 
@@ -2409,4 +2536,59 @@ mod tests {
         assert_eq!(EdgeInterpolationAlgorithm::ANDOYER.to_string(), "ANDOYER");
         assert_eq!(EdgeInterpolationAlgorithm::KARNEY.to_string(), "KARNEY");
     }
+
+    fn encodings_roundtrip(mut encodings: Vec<Encoding>) {
+        encodings.sort();
+        let mask = EncodingMask::new_from_encodings(encodings.iter());
+        assert!(mask.all_set(encodings.iter()));
+        let v = mask.encodings().collect::<Vec<_>>();
+        assert_eq!(v, encodings);
+    }
+
+    #[test]
+    fn test_encoding_roundtrip() {
+        encodings_roundtrip(
+            [
+                Encoding::RLE,
+                Encoding::PLAIN,
+                Encoding::DELTA_BINARY_PACKED,
+            ]
+            .into(),
+        );
+        encodings_roundtrip([Encoding::RLE_DICTIONARY, 
Encoding::PLAIN_DICTIONARY].into());
+        encodings_roundtrip([].into());
+        let encodings = [
+            Encoding::PLAIN,
+            Encoding::BIT_PACKED,
+            Encoding::RLE,
+            Encoding::DELTA_BINARY_PACKED,
+            Encoding::DELTA_BYTE_ARRAY,
+            Encoding::DELTA_LENGTH_BYTE_ARRAY,
+            Encoding::PLAIN_DICTIONARY,
+            Encoding::RLE_DICTIONARY,
+            Encoding::BYTE_STREAM_SPLIT,
+        ];
+        encodings_roundtrip(encodings.into());
+    }
+
+    #[test]
+    fn test_invalid_encoding_mask() {
+        // any set bits higher than the max should trigger an error
+        let res = EncodingMask::try_new(-1);
+        assert!(res.is_err());
+        let err = res.unwrap_err();
+        assert_eq!(
+            err.to_string(),
+            "Parquet error: Attempt to create invalid mask: 0xffffffff"
+        );
+
+        // test that GROUP_VAR_INT is disallowed
+        let res = EncodingMask::try_new(2);
+        assert!(res.is_err());
+        let err = res.unwrap_err();
+        assert_eq!(
+            err.to_string(),
+            "Parquet error: Attempt to create invalid mask: 0x2"
+        );
+    }
 }
diff --git a/parquet/src/column/writer/mod.rs b/parquet/src/column/writer/mod.rs
index 298a0633a9..fdb94962b6 100644
--- a/parquet/src/column/writer/mod.rs
+++ b/parquet/src/column/writer/mod.rs
@@ -27,7 +27,7 @@ use std::collections::{BTreeSet, VecDeque};
 use std::str;
 
 use crate::basic::{
-    BoundaryOrder, Compression, ConvertedType, Encoding, LogicalType, 
PageType, Type,
+    BoundaryOrder, Compression, ConvertedType, Encoding, EncodingMask, 
LogicalType, PageType, Type,
 };
 use crate::column::page::{CompressedPage, Page, PageWriteSpec, PageWriter};
 use crate::column::writer::encoder::{ColumnValueEncoder, 
ColumnValueEncoderImpl, ColumnValues};
@@ -1190,7 +1190,7 @@ impl<'a, E: ColumnValueEncoder> GenericColumnWriter<'a, 
E> {
 
         let mut builder = ColumnChunkMetaData::builder(self.descr.clone())
             .set_compression(self.codec)
-            .set_encodings(self.encodings.iter().cloned().collect())
+            
.set_encodings_mask(EncodingMask::new_from_encodings(self.encodings.iter()))
             .set_page_encoding_stats(self.encoding_stats.clone())
             .set_total_compressed_size(total_compressed_size)
             .set_total_uncompressed_size(total_uncompressed_size)
@@ -1734,7 +1734,10 @@ mod tests {
         assert_eq!(r.rows_written, 4);
 
         let metadata = r.metadata;
-        assert_eq!(metadata.encodings(), &vec![Encoding::PLAIN, 
Encoding::RLE]);
+        assert_eq!(
+            metadata.encodings().collect::<Vec<_>>(),
+            vec![Encoding::PLAIN, Encoding::RLE]
+        );
         assert_eq!(metadata.num_values(), 4); // just values
         assert_eq!(metadata.dictionary_page_offset(), None);
     }
@@ -2095,8 +2098,8 @@ mod tests {
 
         let metadata = r.metadata;
         assert_eq!(
-            metadata.encodings(),
-            &vec![Encoding::PLAIN, Encoding::RLE, Encoding::RLE_DICTIONARY]
+            metadata.encodings().collect::<Vec<_>>(),
+            vec![Encoding::PLAIN, Encoding::RLE, Encoding::RLE_DICTIONARY]
         );
         assert_eq!(metadata.num_values(), 4);
         assert_eq!(metadata.compressed_size(), 20);
@@ -2221,8 +2224,8 @@ mod tests {
 
         let metadata = r.metadata;
         assert_eq!(
-            metadata.encodings(),
-            &vec![Encoding::PLAIN, Encoding::RLE, Encoding::RLE_DICTIONARY]
+            metadata.encodings().collect::<Vec<_>>(),
+            vec![Encoding::PLAIN, Encoding::RLE, Encoding::RLE_DICTIONARY]
         );
         assert_eq!(metadata.num_values(), 4);
         assert_eq!(metadata.compressed_size(), 20);
@@ -4100,7 +4103,7 @@ mod tests {
             .build();
         let meta = column_write_and_get_metadata::<T>(props, data);
         assert_eq!(meta.dictionary_page_offset(), dictionary_page_offset);
-        assert_eq!(meta.encodings(), encodings);
+        assert_eq!(meta.encodings().collect::<Vec<_>>(), encodings);
         assert_eq!(meta.page_encoding_stats().unwrap(), page_encoding_stats);
     }
 
diff --git a/parquet/src/file/metadata/encryption.rs 
b/parquet/src/file/metadata/encryption.rs
new file mode 100644
index 0000000000..4c97640b67
--- /dev/null
+++ b/parquet/src/file/metadata/encryption.rs
@@ -0,0 +1,544 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+// a collection of generated structs used to parse thrift metadata
+
+use std::io::Write;
+
+use crate::{
+    basic::{Compression, EncodingMask},
+    encryption::decrypt::{FileDecryptionProperties, FileDecryptor},
+    errors::{ParquetError, Result},
+    file::{
+        column_crypto_metadata::ColumnCryptoMetaData,
+        metadata::{
+            HeapSize, LevelHistogram, PageEncodingStats, ParquetMetaData, 
RowGroupMetaData,
+            thrift_gen::{
+                GeospatialStatistics, SizeStatistics, Statistics, 
convert_geo_stats, convert_stats,
+                parquet_metadata_from_bytes,
+            },
+        },
+    },
+    parquet_thrift::{
+        ElementType, FieldType, ReadThrift, ThriftCompactInputProtocol,
+        ThriftCompactOutputProtocol, ThriftSliceInputProtocol, WriteThrift, 
WriteThriftField,
+        read_thrift_vec,
+    },
+    thrift_struct, thrift_union,
+};
+
+thrift_struct!(
+pub(crate) struct AesGcmV1 {
+  /// AAD prefix
+  1: optional binary aad_prefix
+
+  /// Unique file identifier part of AAD suffix
+  2: optional binary aad_file_unique
+
+  /// In files encrypted with AAD prefix without storing it,
+  /// readers must supply the prefix
+  3: optional bool supply_aad_prefix
+}
+);
+
+impl HeapSize for AesGcmV1 {
+    fn heap_size(&self) -> usize {
+        self.aad_prefix.heap_size()
+            + self.aad_file_unique.heap_size()
+            + self.supply_aad_prefix.heap_size()
+    }
+}
+
+thrift_struct!(
+pub(crate) struct AesGcmCtrV1 {
+  /// AAD prefix
+  1: optional binary aad_prefix
+
+  /// Unique file identifier part of AAD suffix
+  2: optional binary aad_file_unique
+
+  /// In files encrypted with AAD prefix without storing it,
+  /// readers must supply the prefix
+  3: optional bool supply_aad_prefix
+}
+);
+
+impl HeapSize for AesGcmCtrV1 {
+    fn heap_size(&self) -> usize {
+        self.aad_prefix.heap_size()
+            + self.aad_file_unique.heap_size()
+            + self.supply_aad_prefix.heap_size()
+    }
+}
+
+thrift_union!(
+union EncryptionAlgorithm {
+  1: (AesGcmV1) AES_GCM_V1
+  2: (AesGcmCtrV1) AES_GCM_CTR_V1
+}
+);
+
+impl HeapSize for EncryptionAlgorithm {
+    fn heap_size(&self) -> usize {
+        match self {
+            Self::AES_GCM_V1(gcm) => gcm.heap_size(),
+            Self::AES_GCM_CTR_V1(gcm_ctr) => gcm_ctr.heap_size(),
+        }
+    }
+}
+
+thrift_struct!(
+/// Crypto metadata for files with encrypted footer
+pub(crate) struct FileCryptoMetaData<'a> {
+  /// Encryption algorithm. This field is only used for files
+  /// with encrypted footer. Files with plaintext footer store algorithm id
+  /// inside footer (FileMetaData structure).
+  1: required EncryptionAlgorithm encryption_algorithm
+
+  /// Retrieval metadata of key used for encryption of footer,
+  /// and (possibly) columns.
+  2: optional binary<'a> key_metadata
+}
+);
+
+fn row_group_from_encrypted_thrift(
+    mut rg: RowGroupMetaData,
+    decryptor: Option<&FileDecryptor>,
+) -> Result<RowGroupMetaData> {
+    let schema_descr = rg.schema_descr;
+
+    if schema_descr.num_columns() != rg.columns.len() {
+        return Err(general_err!(
+            "Column count mismatch. Schema has {} columns while Row Group has 
{}",
+            schema_descr.num_columns(),
+            rg.columns.len()
+        ));
+    }
+    let total_byte_size = rg.total_byte_size;
+    let num_rows = rg.num_rows;
+    let mut columns = vec![];
+
+    for (i, (mut c, d)) in rg
+        .columns
+        .drain(0..)
+        .zip(schema_descr.columns())
+        .enumerate()
+    {
+        // Read encrypted metadata if it's present and we have a decryptor.
+        if let (true, Some(decryptor)) = 
(c.encrypted_column_metadata.is_some(), decryptor) {
+            let column_decryptor = match c.crypto_metadata() {
+                None => {
+                    return Err(general_err!(
+                        "No crypto_metadata is set for column '{}', which has 
encrypted metadata",
+                        d.path().string()
+                    ));
+                }
+                
Some(ColumnCryptoMetaData::ENCRYPTION_WITH_COLUMN_KEY(crypto_metadata)) => {
+                    let column_name = crypto_metadata.path_in_schema.join(".");
+                    decryptor.get_column_metadata_decryptor(
+                        column_name.as_str(),
+                        crypto_metadata.key_metadata.as_deref(),
+                    )?
+                }
+                Some(ColumnCryptoMetaData::ENCRYPTION_WITH_FOOTER_KEY) => {
+                    decryptor.get_footer_decryptor()?
+                }
+            };
+
+            let column_aad = crate::encryption::modules::create_module_aad(
+                decryptor.file_aad(),
+                crate::encryption::modules::ModuleType::ColumnMetaData,
+                rg.ordinal.unwrap() as usize,
+                i,
+                None,
+            )?;
+
+            // Take the encrypted column metadata as it is no longer needed.
+            let encrypted_column_metadata = c.encrypted_column_metadata.take();
+            let buf = encrypted_column_metadata.unwrap();
+            let decrypted_cc_buf = column_decryptor
+                .decrypt(&buf, column_aad.as_ref())
+                .map_err(|_| {
+                    general_err!(
+                        "Unable to decrypt column '{}', perhaps the column key 
is wrong?",
+                        d.path().string()
+                    )
+                })?;
+
+            // parse decrypted buffer and then replace fields in 'c'
+            let col_meta = read_column_metadata(decrypted_cc_buf.as_slice())?;
+
+            let (
+                unencoded_byte_array_data_bytes,
+                repetition_level_histogram,
+                definition_level_histogram,
+            ) = if let Some(size_stats) = col_meta.size_statistics {
+                (
+                    size_stats.unencoded_byte_array_data_bytes,
+                    size_stats.repetition_level_histogram,
+                    size_stats.definition_level_histogram,
+                )
+            } else {
+                (None, None, None)
+            };
+
+            let repetition_level_histogram = 
repetition_level_histogram.map(LevelHistogram::from);
+            let definition_level_histogram = 
definition_level_histogram.map(LevelHistogram::from);
+
+            c.encodings = col_meta.encodings;
+            c.compression = col_meta.codec;
+            c.num_values = col_meta.num_values;
+            c.total_uncompressed_size = col_meta.total_uncompressed_size;
+            c.total_compressed_size = col_meta.total_compressed_size;
+            c.data_page_offset = col_meta.data_page_offset;
+            c.index_page_offset = col_meta.index_page_offset;
+            c.dictionary_page_offset = col_meta.dictionary_page_offset;
+            c.statistics = convert_stats(d.physical_type(), 
col_meta.statistics)?;
+            c.encoding_stats = col_meta.encoding_stats;
+            c.bloom_filter_offset = col_meta.bloom_filter_offset;
+            c.bloom_filter_length = col_meta.bloom_filter_length;
+            c.unencoded_byte_array_data_bytes = 
unencoded_byte_array_data_bytes;
+            c.repetition_level_histogram = repetition_level_histogram;
+            c.definition_level_histogram = definition_level_histogram;
+            c.geo_statistics = 
convert_geo_stats(col_meta.geospatial_statistics);
+
+            columns.push(c);
+        } else {
+            columns.push(c);
+        }
+    }
+
+    let sorting_columns = rg.sorting_columns;
+    let file_offset = rg.file_offset;
+    let ordinal = rg.ordinal;
+
+    Ok(RowGroupMetaData {
+        columns,
+        num_rows,
+        sorting_columns,
+        total_byte_size,
+        schema_descr,
+        file_offset,
+        ordinal,
+    })
+}
+
+/// Decodes [`ParquetMetaData`] from the provided bytes, handling metadata 
that may be encrypted.
+///
+/// Typically this is used to decode the metadata from the end of a parquet
+/// file. The format of `buf` is the Thrift compact binary protocol, as 
specified
+/// by the [Parquet Spec]. Buffer can be encrypted with AES GCM or AES CTR
+/// ciphers as specified in the [Parquet Encryption Spec].
+///
+/// [Parquet Spec]: https://github.com/apache/parquet-format#metadata
+/// [Parquet Encryption Spec]: 
https://parquet.apache.org/docs/file-format/data-pages/encryption/
+pub(crate) fn parquet_metadata_with_encryption(
+    file_decryption_properties: Option<&FileDecryptionProperties>,
+    encrypted_footer: bool,
+    buf: &[u8],
+) -> Result<ParquetMetaData> {
+    use crate::file::metadata::ParquetMetaDataBuilder;
+
+    let mut buf = buf;
+    let mut file_decryptor = None;
+    let decrypted_fmd_buf;
+
+    if encrypted_footer {
+        let mut prot = ThriftSliceInputProtocol::new(buf);
+        if let Some(file_decryption_properties) = file_decryption_properties {
+            let t_file_crypto_metadata: FileCryptoMetaData =
+                FileCryptoMetaData::read_thrift(&mut prot)
+                    .map_err(|e| general_err!("Could not parse crypto 
metadata: {}", e))?;
+            let supply_aad_prefix = match 
&t_file_crypto_metadata.encryption_algorithm {
+                EncryptionAlgorithm::AES_GCM_V1(algo) => 
algo.supply_aad_prefix,
+                _ => Some(false),
+            }
+            .unwrap_or(false);
+            if supply_aad_prefix && 
file_decryption_properties.aad_prefix().is_none() {
+                return Err(general_err!(
+                    "Parquet file was encrypted with an AAD prefix that is not 
stored in the file, \
+                        but no AAD prefix was provided in the file decryption 
properties"
+                ));
+            }
+            let decryptor = get_file_decryptor(
+                t_file_crypto_metadata.encryption_algorithm,
+                t_file_crypto_metadata.key_metadata,
+                file_decryption_properties,
+            )?;
+            let footer_decryptor = decryptor.get_footer_decryptor();
+            let aad_footer = 
crate::encryption::modules::create_footer_aad(decryptor.file_aad())?;
+
+            decrypted_fmd_buf = footer_decryptor?
+                .decrypt(prot.as_slice().as_ref(), aad_footer.as_ref())
+                .map_err(|_| {
+                    general_err!(
+                        "Provided footer key and AAD were unable to decrypt 
parquet footer"
+                    )
+                })?;
+
+            buf = &decrypted_fmd_buf;
+            file_decryptor = Some(decryptor);
+        } else {
+            return Err(general_err!(
+                "Parquet file has an encrypted footer but decryption 
properties were not provided"
+            ));
+        }
+    }
+
+    let parquet_meta = parquet_metadata_from_bytes(buf)
+        .map_err(|e| general_err!("Could not parse metadata: {}", e))?;
+
+    let ParquetMetaData {
+        mut file_metadata,
+        row_groups,
+        column_index: _,
+        offset_index: _,
+        file_decryptor: _,
+    } = parquet_meta;
+
+    // Take the encryption algorithm and footer signing key metadata as they 
are no longer
+    // needed after this.
+    if let (Some(algo), Some(file_decryption_properties)) = (
+        file_metadata.encryption_algorithm.take(),
+        file_decryption_properties,
+    ) {
+        let footer_signing_key_metadata = 
file_metadata.footer_signing_key_metadata.take();
+
+        // File has a plaintext footer but encryption algorithm is set
+        let file_decryptor_value = get_file_decryptor(
+            *algo,
+            footer_signing_key_metadata.as_deref(),
+            file_decryption_properties,
+        )?;
+        if file_decryption_properties.check_plaintext_footer_integrity() && 
!encrypted_footer {
+            file_decryptor_value.verify_plaintext_footer_signature(buf)?;
+        }
+        file_decryptor = Some(file_decryptor_value);
+    }
+
+    // decrypt column chunk info
+    let row_groups = row_groups
+        .into_iter()
+        .map(|rg| row_group_from_encrypted_thrift(rg, file_decryptor.as_ref()))
+        .collect::<Result<Vec<_>>>()?;
+
+    let metadata = ParquetMetaDataBuilder::new(file_metadata)
+        .set_row_groups(row_groups)
+        .set_file_decryptor(file_decryptor)
+        .build();
+
+    Ok(metadata)
+}
+
+fn get_file_decryptor(
+    encryption_algorithm: EncryptionAlgorithm,
+    footer_key_metadata: Option<&[u8]>,
+    file_decryption_properties: &FileDecryptionProperties,
+) -> Result<FileDecryptor> {
+    match encryption_algorithm {
+        EncryptionAlgorithm::AES_GCM_V1(algo) => {
+            let aad_file_unique = algo
+                .aad_file_unique
+                .ok_or_else(|| general_err!("AAD unique file identifier is not 
set"))?;
+            let aad_prefix = if let Some(aad_prefix) = 
file_decryption_properties.aad_prefix() {
+                aad_prefix.clone()
+            } else {
+                algo.aad_prefix.map(|v| v.to_vec()).unwrap_or_default()
+            };
+            let aad_file_unique = aad_file_unique.to_vec();
+
+            FileDecryptor::new(
+                file_decryption_properties,
+                footer_key_metadata,
+                aad_file_unique,
+                aad_prefix,
+            )
+        }
+        EncryptionAlgorithm::AES_GCM_CTR_V1(_) => Err(nyi_err!(
+            "The AES_GCM_CTR_V1 encryption algorithm is not yet supported"
+        )),
+    }
+}
+
+#[derive(Clone, Debug, Eq, PartialEq)]
+struct ColumnMetaData<'a> {
+    encodings: EncodingMask,
+    codec: Compression,
+    num_values: i64,
+    total_uncompressed_size: i64,
+    total_compressed_size: i64,
+    data_page_offset: i64,
+    index_page_offset: Option<i64>,
+    dictionary_page_offset: Option<i64>,
+    statistics: Option<Statistics<'a>>,
+    encoding_stats: Option<Vec<PageEncodingStats>>,
+    bloom_filter_offset: Option<i64>,
+    bloom_filter_length: Option<i32>,
+    size_statistics: Option<SizeStatistics>,
+    geospatial_statistics: Option<GeospatialStatistics>,
+}
+
+fn read_column_metadata<'a>(buf: &'a [u8]) -> Result<ColumnMetaData<'a>> {
+    let mut prot = ThriftSliceInputProtocol::new(buf);
+
+    let mut encodings: Option<EncodingMask> = None;
+    let mut codec: Option<Compression> = None;
+    let mut num_values: Option<i64> = None;
+    let mut total_uncompressed_size: Option<i64> = None;
+    let mut total_compressed_size: Option<i64> = None;
+    let mut data_page_offset: Option<i64> = None;
+    let mut index_page_offset: Option<i64> = None;
+    let mut dictionary_page_offset: Option<i64> = None;
+    let mut statistics: Option<Statistics> = None;
+    let mut encoding_stats: Option<Vec<PageEncodingStats>> = None;
+    let mut bloom_filter_offset: Option<i64> = None;
+    let mut bloom_filter_length: Option<i32> = None;
+    let mut size_statistics: Option<SizeStatistics> = None;
+    let mut geospatial_statistics: Option<GeospatialStatistics> = None;
+
+    // `ColumnMetaData`. Read inline for performance's sake.
+    // struct ColumnMetaData {
+    //   1: required Type type
+    //   2: required list<Encoding> encodings
+    //   3: required list<string> path_in_schema
+    //   4: required CompressionCodec codec
+    //   5: required i64 num_values
+    //   6: required i64 total_uncompressed_size
+    //   7: required i64 total_compressed_size
+    //   8: optional list<KeyValue> key_value_metadata
+    //   9: required i64 data_page_offset
+    //   10: optional i64 index_page_offset
+    //   11: optional i64 dictionary_page_offset
+    //   12: optional Statistics statistics;
+    //   13: optional list<PageEncodingStats> encoding_stats;
+    //   14: optional i64 bloom_filter_offset;
+    //   15: optional i32 bloom_filter_length;
+    //   16: optional SizeStatistics size_statistics;
+    //   17: optional GeospatialStatistics geospatial_statistics;
+    // }
+    let mut last_field_id = 0i16;
+    loop {
+        let field_ident = prot.read_field_begin(last_field_id)?;
+        if field_ident.field_type == FieldType::Stop {
+            break;
+        }
+        match field_ident.id {
+            // 1: type is never used, we can use the column descriptor
+            2 => {
+                let val = EncodingMask::read_thrift(&mut prot)?;
+                encodings = Some(val);
+            }
+            // 3: path_in_schema is redundant
+            4 => {
+                codec = Some(Compression::read_thrift(&mut prot)?);
+            }
+            5 => {
+                num_values = Some(i64::read_thrift(&mut prot)?);
+            }
+            6 => {
+                total_uncompressed_size = Some(i64::read_thrift(&mut prot)?);
+            }
+            7 => {
+                total_compressed_size = Some(i64::read_thrift(&mut prot)?);
+            }
+            // 8: we don't expose this key value
+            9 => {
+                data_page_offset = Some(i64::read_thrift(&mut prot)?);
+            }
+            10 => {
+                index_page_offset = Some(i64::read_thrift(&mut prot)?);
+            }
+            11 => {
+                dictionary_page_offset = Some(i64::read_thrift(&mut prot)?);
+            }
+            12 => {
+                statistics = Some(Statistics::read_thrift(&mut prot)?);
+            }
+            13 => {
+                let val =
+                    read_thrift_vec::<PageEncodingStats, 
ThriftSliceInputProtocol>(&mut prot)?;
+                encoding_stats = Some(val);
+            }
+            14 => {
+                bloom_filter_offset = Some(i64::read_thrift(&mut prot)?);
+            }
+            15 => {
+                bloom_filter_length = Some(i32::read_thrift(&mut prot)?);
+            }
+            16 => {
+                let val = SizeStatistics::read_thrift(&mut prot)?;
+                size_statistics = Some(val);
+            }
+            17 => {
+                let val = GeospatialStatistics::read_thrift(&mut prot)?;
+                geospatial_statistics = Some(val);
+            }
+            _ => {
+                prot.skip(field_ident.field_type)?;
+            }
+        };
+        last_field_id = field_ident.id;
+    }
+
+    let Some(encodings) = encodings else {
+        return Err(ParquetError::General(
+            "Required field encodings is missing".to_owned(),
+        ));
+    };
+    let Some(codec) = codec else {
+        return Err(ParquetError::General(
+            "Required field codec is missing".to_owned(),
+        ));
+    };
+    let Some(num_values) = num_values else {
+        return Err(ParquetError::General(
+            "Required field num_values is missing".to_owned(),
+        ));
+    };
+    let Some(total_uncompressed_size) = total_uncompressed_size else {
+        return Err(ParquetError::General(
+            "Required field total_uncompressed_size is missing".to_owned(),
+        ));
+    };
+    let Some(total_compressed_size) = total_compressed_size else {
+        return Err(ParquetError::General(
+            "Required field total_compressed_size is missing".to_owned(),
+        ));
+    };
+    let Some(data_page_offset) = data_page_offset else {
+        return Err(ParquetError::General(
+            "Required field data_page_offset is missing".to_owned(),
+        ));
+    };
+
+    Ok(ColumnMetaData {
+        encodings,
+        num_values,
+        codec,
+        total_uncompressed_size,
+        total_compressed_size,
+        data_page_offset,
+        index_page_offset,
+        dictionary_page_offset,
+        statistics,
+        encoding_stats,
+        bloom_filter_offset,
+        bloom_filter_length,
+        size_statistics,
+        geospatial_statistics,
+    })
+}
diff --git a/parquet/src/file/metadata/mod.rs b/parquet/src/file/metadata/mod.rs
index 87585f2d71..6c6e980b86 100644
--- a/parquet/src/file/metadata/mod.rs
+++ b/parquet/src/file/metadata/mod.rs
@@ -87,6 +87,8 @@
 //!
 //!                         * Same name, different struct
 //! ```
+#[cfg(feature = "encryption")]
+mod encryption;
 mod footer_tail;
 mod memory;
 mod parser;
@@ -95,14 +97,14 @@ pub(crate) mod reader;
 pub(crate) mod thrift_gen;
 mod writer;
 
-use crate::basic::PageType;
+use crate::basic::{EncodingMask, PageType};
 #[cfg(feature = "encryption")]
 use crate::encryption::decrypt::FileDecryptor;
 #[cfg(feature = "encryption")]
 use crate::file::column_crypto_metadata::ColumnCryptoMetaData;
-pub(crate) use crate::file::metadata::memory::HeapSize;
 #[cfg(feature = "encryption")]
-use crate::file::metadata::thrift_gen::EncryptionAlgorithm;
+use crate::file::metadata::encryption::EncryptionAlgorithm;
+pub(crate) use crate::file::metadata::memory::HeapSize;
 use crate::file::page_index::column_index::{ByteArrayColumnIndex, 
PrimitiveColumnIndex};
 use crate::file::page_index::{column_index::ColumnIndexMetaData, 
offset_index::PageLocation};
 use crate::file::statistics::Statistics;
@@ -788,7 +790,7 @@ impl RowGroupMetaDataBuilder {
 #[derive(Debug, Clone, PartialEq)]
 pub struct ColumnChunkMetaData {
     column_descr: ColumnDescPtr,
-    encodings: Vec<Encoding>,
+    encodings: EncodingMask,
     file_path: Option<String>,
     file_offset: i64,
     num_values: i64,
@@ -968,7 +970,12 @@ impl ColumnChunkMetaData {
     }
 
     /// All encodings used for this column.
-    pub fn encodings(&self) -> &Vec<Encoding> {
+    pub fn encodings(&self) -> impl Iterator<Item = Encoding> {
+        self.encodings.encodings()
+    }
+
+    /// All encodings used for this column, returned as a bitmask.
+    pub fn encodings_mask(&self) -> &EncodingMask {
         &self.encodings
     }
 
@@ -1148,7 +1155,7 @@ impl ColumnChunkMetaDataBuilder {
     fn new(column_descr: ColumnDescPtr) -> Self {
         Self(ColumnChunkMetaData {
             column_descr,
-            encodings: Vec::new(),
+            encodings: Default::default(),
             file_path: None,
             file_offset: 0,
             num_values: 0,
@@ -1179,6 +1186,12 @@ impl ColumnChunkMetaDataBuilder {
 
     /// Sets list of encodings for this column chunk.
     pub fn set_encodings(mut self, encodings: Vec<Encoding>) -> Self {
+        self.0.encodings = EncodingMask::new_from_encodings(encodings.iter());
+        self
+    }
+
+    /// Sets the encodings mask for this column chunk.
+    pub fn set_encodings_mask(mut self, encodings: EncodingMask) -> Self {
         self.0.encodings = encodings;
         self
     }
@@ -1704,9 +1717,10 @@ mod tests {
     #[test]
     fn test_column_chunk_metadata_thrift_conversion() {
         let column_descr = get_test_schema_descr().column(0);
-
         let col_metadata = ColumnChunkMetaData::builder(column_descr.clone())
-            .set_encodings(vec![Encoding::PLAIN, Encoding::RLE])
+            .set_encodings_mask(EncodingMask::new_from_encodings(
+                [Encoding::PLAIN, Encoding::RLE].iter(),
+            ))
             .set_file_path("file_path".to_owned())
             .set_num_values(1000)
             .set_compression(Compression::SNAPPY)
@@ -1858,9 +1872,9 @@ mod tests {
             .build();
 
         #[cfg(not(feature = "encryption"))]
-        let base_expected_size = 2312;
+        let base_expected_size = 2248;
         #[cfg(feature = "encryption")]
-        let base_expected_size = 2480;
+        let base_expected_size = 2416;
 
         assert_eq!(parquet_meta.memory_size(), base_expected_size);
 
@@ -1889,9 +1903,9 @@ mod tests {
             .build();
 
         #[cfg(not(feature = "encryption"))]
-        let bigger_expected_size = 2738;
+        let bigger_expected_size = 2674;
         #[cfg(feature = "encryption")]
-        let bigger_expected_size = 2906;
+        let bigger_expected_size = 2842;
 
         // more set fields means more memory usage
         assert!(bigger_expected_size > base_expected_size);
diff --git a/parquet/src/file/metadata/parser.rs 
b/parquet/src/file/metadata/parser.rs
index 223ea3484c..1e30093909 100644
--- a/parquet/src/file/metadata/parser.rs
+++ b/parquet/src/file/metadata/parser.rs
@@ -72,7 +72,7 @@ mod inner {
             encrypted_footer: bool,
         ) -> Result<ParquetMetaData> {
             if encrypted_footer || self.file_decryption_properties.is_some() {
-                
crate::file::metadata::thrift_gen::parquet_metadata_with_encryption(
+                
crate::file::metadata::encryption::parquet_metadata_with_encryption(
                     self.file_decryption_properties.as_deref(),
                     encrypted_footer,
                     buf,
diff --git a/parquet/src/file/metadata/thrift_gen.rs 
b/parquet/src/file/metadata/thrift_gen.rs
index a107acfad8..36fbe42a90 100644
--- a/parquet/src/file/metadata/thrift_gen.rs
+++ b/parquet/src/file/metadata/thrift_gen.rs
@@ -20,9 +20,14 @@
 use std::io::Write;
 use std::sync::Arc;
 
+#[cfg(feature = "encryption")]
+use crate::file::{
+    column_crypto_metadata::ColumnCryptoMetaData, 
metadata::encryption::EncryptionAlgorithm,
+};
 use crate::{
     basic::{
-        ColumnOrder, Compression, ConvertedType, Encoding, LogicalType, 
PageType, Repetition, Type,
+        ColumnOrder, Compression, ConvertedType, Encoding, EncodingMask, 
LogicalType, PageType,
+        Repetition, Type,
     },
     data_type::{ByteArray, FixedLenByteArray, Int96},
     errors::{ParquetError, Result},
@@ -44,12 +49,6 @@ use crate::{
     thrift_struct,
     util::bit_util::FromBytes,
 };
-#[cfg(feature = "encryption")]
-use crate::{
-    encryption::decrypt::{FileDecryptionProperties, FileDecryptor},
-    file::{column_crypto_metadata::ColumnCryptoMetaData, metadata::HeapSize},
-    thrift_union,
-};
 
 // this needs to be visible to the schema conversion code
 thrift_struct!(
@@ -106,118 +105,8 @@ pub(crate) struct Statistics<'a> {
 }
 );
 
-// TODO(ets): move a lot of the encryption stuff to its own module
-#[cfg(feature = "encryption")]
-thrift_struct!(
-pub(crate) struct AesGcmV1 {
-  /// AAD prefix
-  1: optional binary aad_prefix
-
-  /// Unique file identifier part of AAD suffix
-  2: optional binary aad_file_unique
-
-  /// In files encrypted with AAD prefix without storing it,
-  /// readers must supply the prefix
-  3: optional bool supply_aad_prefix
-}
-);
-
-#[cfg(feature = "encryption")]
-impl HeapSize for AesGcmV1 {
-    fn heap_size(&self) -> usize {
-        self.aad_prefix.heap_size()
-            + self.aad_file_unique.heap_size()
-            + self.supply_aad_prefix.heap_size()
-    }
-}
-
-#[cfg(feature = "encryption")]
 thrift_struct!(
-pub(crate) struct AesGcmCtrV1 {
-  /// AAD prefix
-  1: optional binary aad_prefix
-
-  /// Unique file identifier part of AAD suffix
-  2: optional binary aad_file_unique
-
-  /// In files encrypted with AAD prefix without storing it,
-  /// readers must supply the prefix
-  3: optional bool supply_aad_prefix
-}
-);
-
-#[cfg(feature = "encryption")]
-impl HeapSize for AesGcmCtrV1 {
-    fn heap_size(&self) -> usize {
-        self.aad_prefix.heap_size()
-            + self.aad_file_unique.heap_size()
-            + self.supply_aad_prefix.heap_size()
-    }
-}
-
-#[cfg(feature = "encryption")]
-thrift_union!(
-union EncryptionAlgorithm {
-  1: (AesGcmV1) AES_GCM_V1
-  2: (AesGcmCtrV1) AES_GCM_CTR_V1
-}
-);
-
-#[cfg(feature = "encryption")]
-impl HeapSize for EncryptionAlgorithm {
-    fn heap_size(&self) -> usize {
-        match self {
-            Self::AES_GCM_V1(gcm) => gcm.heap_size(),
-            Self::AES_GCM_CTR_V1(gcm_ctr) => gcm_ctr.heap_size(),
-        }
-    }
-}
-
-#[cfg(feature = "encryption")]
-thrift_struct!(
-/// Crypto metadata for files with encrypted footer
-pub(crate) struct FileCryptoMetaData<'a> {
-  /// Encryption algorithm. This field is only used for files
-  /// with encrypted footer. Files with plaintext footer store algorithm id
-  /// inside footer (FileMetaData structure).
-  1: required EncryptionAlgorithm encryption_algorithm
-
-  /// Retrieval metadata of key used for encryption of footer,
-  /// and (possibly) columns.
-  2: optional binary<'a> key_metadata
-}
-);
-
-// the following are only used internally so are private
-#[cfg(feature = "encryption")]
-type CompressionCodec = Compression;
-#[cfg(feature = "encryption")]
-thrift_struct!(
-struct ColumnMetaData<'a> {
-  1: required Type r#type
-  2: required list<Encoding> encodings
-  // we don't expose path_in_schema so skip
-  //3: required list<string> path_in_schema
-  4: required CompressionCodec codec
-  5: required i64 num_values
-  6: required i64 total_uncompressed_size
-  7: required i64 total_compressed_size
-  // we don't expose key_value_metadata so skip
-  //8: optional list<KeyValue> key_value_metadata
-  9: required i64 data_page_offset
-  10: optional i64 index_page_offset
-  11: optional i64 dictionary_page_offset
-  12: optional Statistics<'a> statistics
-  13: optional list<PageEncodingStats> encoding_stats;
-  14: optional i64 bloom_filter_offset;
-  15: optional i32 bloom_filter_length;
-  16: optional SizeStatistics size_statistics;
-  17: optional GeospatialStatistics geospatial_statistics;
-}
-);
-
-thrift_struct!(
-struct BoundingBox {
+pub(super) struct BoundingBox {
   1: required double xmin;
   2: required double xmax;
   3: required double ymin;
@@ -230,21 +119,21 @@ struct BoundingBox {
 );
 
 thrift_struct!(
-struct GeospatialStatistics {
+pub(super) struct GeospatialStatistics {
   1: optional BoundingBox bbox;
   2: optional list<i32> geospatial_types;
 }
 );
 
 thrift_struct!(
-struct SizeStatistics {
+pub(super) struct SizeStatistics {
    1: optional i64 unencoded_byte_array_data_bytes;
    2: optional list<i64> repetition_level_histogram;
    3: optional list<i64> definition_level_histogram;
 }
 );
 
-fn convert_geo_stats(
+pub(super) fn convert_geo_stats(
     stats: Option<GeospatialStatistics>,
 ) -> Option<Box<crate::geospatial::statistics::GeospatialStatistics>> {
     stats.map(|st| {
@@ -434,269 +323,6 @@ pub(crate) fn convert_stats(
     })
 }
 
-#[cfg(feature = "encryption")]
-fn row_group_from_encrypted_thrift(
-    mut rg: RowGroupMetaData,
-    decryptor: Option<&FileDecryptor>,
-) -> Result<RowGroupMetaData> {
-    let schema_descr = rg.schema_descr;
-
-    if schema_descr.num_columns() != rg.columns.len() {
-        return Err(general_err!(
-            "Column count mismatch. Schema has {} columns while Row Group has 
{}",
-            schema_descr.num_columns(),
-            rg.columns.len()
-        ));
-    }
-    let total_byte_size = rg.total_byte_size;
-    let num_rows = rg.num_rows;
-    let mut columns = vec![];
-
-    for (i, (mut c, d)) in rg
-        .columns
-        .drain(0..)
-        .zip(schema_descr.columns())
-        .enumerate()
-    {
-        // Read encrypted metadata if it's present and we have a decryptor.
-        if let (true, Some(decryptor)) = 
(c.encrypted_column_metadata.is_some(), decryptor) {
-            let column_decryptor = match c.crypto_metadata() {
-                None => {
-                    return Err(general_err!(
-                        "No crypto_metadata is set for column '{}', which has 
encrypted metadata",
-                        d.path().string()
-                    ));
-                }
-                
Some(ColumnCryptoMetaData::ENCRYPTION_WITH_COLUMN_KEY(crypto_metadata)) => {
-                    let column_name = crypto_metadata.path_in_schema.join(".");
-                    decryptor.get_column_metadata_decryptor(
-                        column_name.as_str(),
-                        crypto_metadata.key_metadata.as_deref(),
-                    )?
-                }
-                Some(ColumnCryptoMetaData::ENCRYPTION_WITH_FOOTER_KEY) => {
-                    decryptor.get_footer_decryptor()?
-                }
-            };
-
-            let column_aad = crate::encryption::modules::create_module_aad(
-                decryptor.file_aad(),
-                crate::encryption::modules::ModuleType::ColumnMetaData,
-                rg.ordinal.unwrap() as usize,
-                i,
-                None,
-            )?;
-
-            // Take the encrypted column metadata as it is no longer needed.
-            let encrypted_column_metadata = c.encrypted_column_metadata.take();
-            let buf = encrypted_column_metadata.unwrap();
-            let decrypted_cc_buf = column_decryptor
-                .decrypt(&buf, column_aad.as_ref())
-                .map_err(|_| {
-                    general_err!(
-                        "Unable to decrypt column '{}', perhaps the column key 
is wrong?",
-                        d.path().string()
-                    )
-                })?;
-
-            // parse decrypted buffer and then replace fields in 'c'
-            let mut prot = 
ThriftSliceInputProtocol::new(decrypted_cc_buf.as_slice());
-            let col_meta = ColumnMetaData::read_thrift(&mut prot)?;
-
-            let (
-                unencoded_byte_array_data_bytes,
-                repetition_level_histogram,
-                definition_level_histogram,
-            ) = if let Some(size_stats) = col_meta.size_statistics {
-                (
-                    size_stats.unencoded_byte_array_data_bytes,
-                    size_stats.repetition_level_histogram,
-                    size_stats.definition_level_histogram,
-                )
-            } else {
-                (None, None, None)
-            };
-
-            let repetition_level_histogram = 
repetition_level_histogram.map(LevelHistogram::from);
-            let definition_level_histogram = 
definition_level_histogram.map(LevelHistogram::from);
-
-            c.encodings = col_meta.encodings;
-            c.compression = col_meta.codec;
-            c.num_values = col_meta.num_values;
-            c.total_uncompressed_size = col_meta.total_uncompressed_size;
-            c.total_compressed_size = col_meta.total_compressed_size;
-            c.data_page_offset = col_meta.data_page_offset;
-            c.index_page_offset = col_meta.index_page_offset;
-            c.dictionary_page_offset = col_meta.dictionary_page_offset;
-            c.statistics = convert_stats(col_meta.r#type, 
col_meta.statistics)?;
-            c.encoding_stats = col_meta.encoding_stats;
-            c.bloom_filter_offset = col_meta.bloom_filter_offset;
-            c.bloom_filter_length = col_meta.bloom_filter_length;
-            c.unencoded_byte_array_data_bytes = 
unencoded_byte_array_data_bytes;
-            c.repetition_level_histogram = repetition_level_histogram;
-            c.definition_level_histogram = definition_level_histogram;
-            c.geo_statistics = 
convert_geo_stats(col_meta.geospatial_statistics);
-
-            columns.push(c);
-        } else {
-            columns.push(c);
-        }
-    }
-
-    let sorting_columns = rg.sorting_columns;
-    let file_offset = rg.file_offset;
-    let ordinal = rg.ordinal;
-
-    Ok(RowGroupMetaData {
-        columns,
-        num_rows,
-        sorting_columns,
-        total_byte_size,
-        schema_descr,
-        file_offset,
-        ordinal,
-    })
-}
-
-#[cfg(feature = "encryption")]
-/// Decodes [`ParquetMetaData`] from the provided bytes, handling metadata 
that may be encrypted.
-///
-/// Typically this is used to decode the metadata from the end of a parquet
-/// file. The format of `buf` is the Thrift compact binary protocol, as 
specified
-/// by the [Parquet Spec]. Buffer can be encrypted with AES GCM or AES CTR
-/// ciphers as specfied in the [Parquet Encryption Spec].
-///
-/// [Parquet Spec]: https://github.com/apache/parquet-format#metadata
-/// [Parquet Encryption Spec]: 
https://parquet.apache.org/docs/file-format/data-pages/encryption/
-pub(crate) fn parquet_metadata_with_encryption(
-    file_decryption_properties: Option<&FileDecryptionProperties>,
-    encrypted_footer: bool,
-    buf: &[u8],
-) -> Result<ParquetMetaData> {
-    use crate::file::metadata::ParquetMetaDataBuilder;
-
-    let mut buf = buf;
-    let mut file_decryptor = None;
-    let decrypted_fmd_buf;
-
-    if encrypted_footer {
-        let mut prot = ThriftSliceInputProtocol::new(buf);
-        if let Some(file_decryption_properties) = file_decryption_properties {
-            let t_file_crypto_metadata: FileCryptoMetaData =
-                FileCryptoMetaData::read_thrift(&mut prot)
-                    .map_err(|e| general_err!("Could not parse crypto 
metadata: {}", e))?;
-            let supply_aad_prefix = match 
&t_file_crypto_metadata.encryption_algorithm {
-                EncryptionAlgorithm::AES_GCM_V1(algo) => 
algo.supply_aad_prefix,
-                _ => Some(false),
-            }
-            .unwrap_or(false);
-            if supply_aad_prefix && 
file_decryption_properties.aad_prefix().is_none() {
-                return Err(general_err!(
-                    "Parquet file was encrypted with an AAD prefix that is not 
stored in the file, \
-                        but no AAD prefix was provided in the file decryption 
properties"
-                ));
-            }
-            let decryptor = get_file_decryptor(
-                t_file_crypto_metadata.encryption_algorithm,
-                t_file_crypto_metadata.key_metadata,
-                file_decryption_properties,
-            )?;
-            let footer_decryptor = decryptor.get_footer_decryptor();
-            let aad_footer = 
crate::encryption::modules::create_footer_aad(decryptor.file_aad())?;
-
-            decrypted_fmd_buf = footer_decryptor?
-                .decrypt(prot.as_slice().as_ref(), aad_footer.as_ref())
-                .map_err(|_| {
-                    general_err!(
-                        "Provided footer key and AAD were unable to decrypt 
parquet footer"
-                    )
-                })?;
-
-            buf = &decrypted_fmd_buf;
-            file_decryptor = Some(decryptor);
-        } else {
-            return Err(general_err!(
-                "Parquet file has an encrypted footer but decryption 
properties were not provided"
-            ));
-        }
-    }
-
-    let parquet_meta = parquet_metadata_from_bytes(buf)
-        .map_err(|e| general_err!("Could not parse metadata: {}", e))?;
-
-    let ParquetMetaData {
-        mut file_metadata,
-        row_groups,
-        column_index: _,
-        offset_index: _,
-        file_decryptor: _,
-    } = parquet_meta;
-
-    // Take the encryption algorithm and footer signing key metadata as they 
are no longer
-    // needed after this.
-    if let (Some(algo), Some(file_decryption_properties)) = (
-        file_metadata.encryption_algorithm.take(),
-        file_decryption_properties,
-    ) {
-        let footer_signing_key_metadata = 
file_metadata.footer_signing_key_metadata.take();
-
-        // File has a plaintext footer but encryption algorithm is set
-        let file_decryptor_value = get_file_decryptor(
-            *algo,
-            footer_signing_key_metadata.as_deref(),
-            file_decryption_properties,
-        )?;
-        if file_decryption_properties.check_plaintext_footer_integrity() && 
!encrypted_footer {
-            file_decryptor_value.verify_plaintext_footer_signature(buf)?;
-        }
-        file_decryptor = Some(file_decryptor_value);
-    }
-
-    // decrypt column chunk info
-    let row_groups = row_groups
-        .into_iter()
-        .map(|rg| row_group_from_encrypted_thrift(rg, file_decryptor.as_ref()))
-        .collect::<Result<Vec<_>>>()?;
-
-    let metadata = ParquetMetaDataBuilder::new(file_metadata)
-        .set_row_groups(row_groups)
-        .set_file_decryptor(file_decryptor)
-        .build();
-
-    Ok(metadata)
-}
-
-#[cfg(feature = "encryption")]
-fn get_file_decryptor(
-    encryption_algorithm: EncryptionAlgorithm,
-    footer_key_metadata: Option<&[u8]>,
-    file_decryption_properties: &FileDecryptionProperties,
-) -> Result<FileDecryptor> {
-    match encryption_algorithm {
-        EncryptionAlgorithm::AES_GCM_V1(algo) => {
-            let aad_file_unique = algo
-                .aad_file_unique
-                .ok_or_else(|| general_err!("AAD unique file identifier is not 
set"))?;
-            let aad_prefix = if let Some(aad_prefix) = 
file_decryption_properties.aad_prefix() {
-                aad_prefix.clone()
-            } else {
-                algo.aad_prefix.map(|v| v.to_vec()).unwrap_or_default()
-            };
-            let aad_file_unique = aad_file_unique.to_vec();
-
-            FileDecryptor::new(
-                file_decryption_properties,
-                footer_key_metadata,
-                aad_file_unique,
-                aad_prefix,
-            )
-        }
-        EncryptionAlgorithm::AES_GCM_CTR_V1(_) => Err(nyi_err!(
-            "The AES_GCM_CTR_V1 encryption algorithm is not yet supported"
-        )),
-    }
-}
-
 // using ThriftSliceInputProtocol rather than ThriftCompactInputProtocl trait 
because
 // these are all internal and operate on slices.
 fn read_column_chunk<'a>(
@@ -716,7 +342,7 @@ fn read_column_chunk<'a>(
     let mut encrypted_column_metadata: Option<&[u8]> = None;
 
     // ColumnMetaData
-    let mut encodings: Option<Vec<Encoding>> = None;
+    let mut encodings: Option<EncodingMask> = None;
     let mut codec: Option<Compression> = None;
     let mut num_values: Option<i64> = None;
     let mut total_uncompressed_size: Option<i64> = None;
@@ -785,8 +411,7 @@ fn read_column_chunk<'a>(
                     match field_ident.id {
                         // 1: type is never used, we can use the column 
descriptor
                         2 => {
-                            let val =
-                                read_thrift_vec::<Encoding, 
ThriftSliceInputProtocol>(&mut *prot)?;
+                            let val = EncodingMask::read_thrift(&mut *prot)?;
                             encodings = Some(val);
                         }
                         // 3: path_in_schema is redundant
@@ -1637,7 +1262,10 @@ pub(crate) fn serialize_column_meta_data<W: Write>(
     use crate::file::statistics::page_stats_to_thrift;
 
     column_chunk.column_type().write_thrift_field(w, 1, 0)?;
-    column_chunk.encodings.write_thrift_field(w, 2, 1)?;
+    column_chunk
+        .encodings()
+        .collect::<Vec<_>>()
+        .write_thrift_field(w, 2, 1)?;
     let path = column_chunk.column_descr.path().parts();
     let path: Vec<&str> = path.iter().map(|v| v.as_str()).collect();
     path.write_thrift_field(w, 3, 2)?;
diff --git a/parquet/src/file/metadata/writer.rs 
b/parquet/src/file/metadata/writer.rs
index 35c69935a8..398f5419c6 100644
--- a/parquet/src/file/metadata/writer.rs
+++ b/parquet/src/file/metadata/writer.rs
@@ -31,7 +31,7 @@ use crate::{
         modules::{ModuleType, create_footer_aad, create_module_aad},
     },
     file::column_crypto_metadata::ColumnCryptoMetaData,
-    file::metadata::thrift_gen::{AesGcmV1, EncryptionAlgorithm, 
FileCryptoMetaData},
+    file::metadata::encryption::{AesGcmV1, EncryptionAlgorithm, 
FileCryptoMetaData},
 };
 use crate::{errors::Result, 
file::page_index::column_index::ColumnIndexMetaData};
 
diff --git a/parquet/src/file/writer.rs b/parquet/src/file/writer.rs
index a1008c9509..d261d9ec65 100644
--- a/parquet/src/file/writer.rs
+++ b/parquet/src/file/writer.rs
@@ -706,7 +706,7 @@ impl<'a, W: Write + Send> SerializedRowGroupWriter<'a, W> {
         let map_offset = |x| x - src_offset + write_offset as i64;
         let mut builder = 
ColumnChunkMetaData::builder(metadata.column_descr_ptr())
             .set_compression(metadata.compression())
-            .set_encodings(metadata.encodings().clone())
+            .set_encodings_mask(*metadata.encodings_mask())
             .set_total_compressed_size(metadata.compressed_size())
             .set_total_uncompressed_size(metadata.uncompressed_size())
             .set_num_values(metadata.num_values())
@@ -2390,6 +2390,7 @@ mod tests {
                     .row_group(0)
                     .column(x)
                     .encodings()
+                    .collect::<Vec<_>>()
                     .contains(&Encoding::BYTE_STREAM_SPLIT)
             );
         };
diff --git a/parquet/src/schema/printer.rs b/parquet/src/schema/printer.rs
index 0cc5df59f3..838d84a1b1 100644
--- a/parquet/src/schema/printer.rs
+++ b/parquet/src/schema/printer.rs
@@ -171,11 +171,7 @@ fn print_row_group_metadata(out: &mut dyn io::Write, 
rg_metadata: &RowGroupMetaD
 fn print_column_chunk_metadata(out: &mut dyn io::Write, cc_metadata: 
&ColumnChunkMetaData) {
     writeln!(out, "column type: {}", cc_metadata.column_type());
     writeln!(out, "column path: {}", cc_metadata.column_path());
-    let encoding_strs: Vec<_> = cc_metadata
-        .encodings()
-        .iter()
-        .map(|e| format!("{e}"))
-        .collect();
+    let encoding_strs: Vec<_> = cc_metadata.encodings().map(|e| 
format!("{e}")).collect();
     writeln!(out, "encodings: {}", encoding_strs.join(" "));
     let file_path_str = cc_metadata.file_path().unwrap_or("N/A");
     writeln!(out, "file path: {file_path_str}");

Reply via email to