This is an automated email from the ASF dual-hosted git repository.

alamb pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/arrow-rs.git


The following commit(s) were added to refs/heads/main by this push:
     new d5339f31a6 Add Parquet Modular decryption (read) support + `encryption` flag (#6637)
d5339f31a6 is described below

commit d5339f31a60a4bd8a4256e7120fe32603249d88e
Author: Rok Mihevc <r...@mihevc.org>
AuthorDate: Wed Mar 12 22:19:08 2025 +0100

    Add Parquet Modular decryption (read) support + `encryption` flag (#6637)
    
    * first commit
    
    * Use ParquetMetaDataReader
    
    * Fix CI
    
    * test
    
    * save progress
    
    * work
    
    * Review feedback
    
    * page decompression issue
    
    * add update_aad
    
    * Change encrypt and decrypt to return Results
    
    * Use correct page ordinal and module type in AADs
    
    * Tidy up ordinal types
    
    * Lint
    
    * Fix regular deserialization path
    
    * cleaning
    
    * Update data checks in test
    
    * start non-uniform decryption
    
    * Add missing doc comments
    
    * Make encryption an optional feature
    
    * Handle when a file is encrypted but encryption is disabled or no decryption properties are provided
    
    * Allow for plaintext footer
    
    * work
    
    * Fix method name
    
    * work
    
    * Minor
    
    * work
    
    * work
    
    * work
    
    * Fix reading to end of file
    
    * Refactor tests
    
    * Fix non-uniform encryption configuration
    
    * Don't use footer key for non-encrypted columns
    
    * Rebase and cleanup
    
    * Cleanup
    
    * Cleanup
    
    * Cleanup
    
    * Cleanup
    
    * Cleanup
    
    * Cleanup
    
    * lint
    
    * Remove encryption setup
    
    * Fix building with ring on wasm
    
    * Move file_decryptor into a separate module
    
    * lint
    
    * FileDecryptionProperties should have at least one key
    
    * Move ciphertext reading into the decryptor
    
    * More tidy up of footer key handling
    
    * Get column decryptors as RingGcmBlockDecryptor
    
    * Use Arc<dyn BlockDecryptor>
    
    * Fix file metadata tests
    
    * Handle reading plaintext footer files without decryption properties
    
    * Split up encryption modules further
    
    * Error instead of panic for AES-GCM-CTR
    
    * load_async
    
    * new_with_options
    
    * Add tests
    
    * get_metadata
    
    * Add CryptoContext to async_reader
    
    * Add row_group_ordinal to InMemoryRowGroup
    
    * Adjust docstrings
    
    * Apply suggestions from code review
    
    Co-authored-by: Adam Reeve <adre...@gmail.com>
    
    * Review feedback
    
    * move file_decryption_properties into ArrowReaderOptions
    
    * make create_page_aad method of CryptoContext
    
    * Review feedback
    
    * Infer ModuleType in create_page_aad
    
    * add create_page_header_aad
    
    * Review feedback
    
    * Update parquet/src/arrow/async_reader/store.rs
    
    Co-authored-by: Ed Seidl <etse...@users.noreply.github.com>
    
    * Review feedback
    
    * Update parquet/src/encryption/ciphers.rs
    
    Co-authored-by: Adam Reeve <adre...@gmail.com>
    
    * Review feedback
    
    * WIP: Decryption shouldn't change the API
    
    * WIP: Decryption shouldn't change the API
    
    * WIP: Decryption shouldn't change the API
    
    * WIP: Decryption shouldn't change the API
    
    * WIP: Decryption shouldn't change the API
    
    * WIP: Decryption shouldn't change the API
    
    * Review feedback
    
    * Handle common encryption errors
    
    Co-authored-by: Corwin Joy <corwin....@gmail.com>
    Co-authored-by: Adam Reeve <adre...@gmail.com>
    
    * Apply suggestions from code review
    
    Co-authored-by: Adam Reeve <adre...@gmail.com>
    
    * Update parquet/src/arrow/async_reader/mod.rs
    
    Co-authored-by: Adam Reeve <adre...@gmail.com>
    
    * Fix previous commit
    
    * Add TestReader::new, make fewer functions pub, add test_non_uniform_encryption_disabled_aad_storage
    
    * Review feedback
    
    * Add new CI check
    
    * Rename the decryption module to decrypt. We will later introduce an encryption module, which will have to be named encrypt to avoid clashing with the parent module name (encryption), and it would be odd to have sibling modules named encrypt and decryption.
    
    * Fix failures where encryption is enabled but no decryption properties are provided, or where encryption is disabled but the file is encrypted.
    
    * Apply suggestions from code review
    
    Co-authored-by: Adam Reeve <adre...@gmail.com>
    
    * get_metadata_with_encryption -> get_metadata_with_options
    
    * Use ParquetMetaData instead of RowGroupMetaData in InMemoryRowGroup. Change row_group_ordinal to row_group_idx.
    
    * Review feedback
    
    * Fixes
    
    * Continue refactoring away from encryption-specific APIs and fix async reader load
    
    * Add default get_metadata_with_options implementation
    
    * Minor tidy ups and test fix
    
    * Update parquet/src/encryption/mod.rs
    
    * Update parquet/src/lib.rs
    
    Co-authored-by: Rok Mihevc <r...@mihevc.org>
    
    ---------
    
    Co-authored-by: Gidon Gershinsky <ggershin...@apple.com>
    Co-authored-by: Adam Reeve <adre...@gmail.com>
    Co-authored-by: Ed Seidl <etse...@users.noreply.github.com>
    Co-authored-by: Corwin Joy <corwin....@gmail.com>
    Co-authored-by: Andrew Lamb <and...@nerdnetworks.org>
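
    A minimal usage sketch of the new read API, based on the tests in this
    commit. The file path and keys are placeholders, the public module path
    parquet::encryption::decrypt is assumed from the new modules below, and
    new_with_metadata is the pre-existing builder constructor:

        use std::fs::File;
        use parquet::arrow::arrow_reader::{
            ArrowReaderMetadata, ArrowReaderOptions, ParquetRecordBatchReaderBuilder,
        };
        use parquet::encryption::decrypt::FileDecryptionProperties;
        use parquet::errors::Result;

        fn read_encrypted(path: &str) -> Result<()> {
            // 16-byte AES-128 keys; the footer key is also used to verify
            // (or decrypt) the footer.
            let decryption_properties =
                FileDecryptionProperties::builder(b"0123456789012345".to_vec())
                    .with_column_key("double_field", b"1234567890123450".to_vec())
                    .build()?;
            let options = ArrowReaderOptions::new()
                .with_file_decryption_properties(decryption_properties);

            let file = File::open(path)?;
            // Load the (decrypted) metadata, then build a reader from it.
            let metadata = ArrowReaderMetadata::load(&file, options)?;
            let reader =
                ParquetRecordBatchReaderBuilder::new_with_metadata(file, metadata).build()?;
            for batch in reader {
                println!("read {} rows", batch?.num_rows());
            }
            Ok(())
        }
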
---
 .github/workflows/parquet.yml                      |   2 +
 parquet/Cargo.toml                                 |   5 +
 parquet/README.md                                  |   3 +
 parquet/src/arrow/arrow_reader/mod.rs              | 318 ++++++++++++++-
 parquet/src/arrow/async_reader/metadata.rs         |   3 +-
 parquet/src/arrow/async_reader/mod.rs              | 426 ++++++++++++++++++++-
 parquet/src/arrow/async_reader/store.rs            |  77 +++-
 parquet/src/column/page.rs                         |  10 +
 parquet/src/encryption/ciphers.rs                  |  63 +++
 parquet/src/encryption/decrypt.rs                  | 260 +++++++++++++
 .../src/{util/test_common => encryption}/mod.rs    |  11 +-
 parquet/src/encryption/modules.rs                  |  94 +++++
 parquet/src/errors.rs                              |   7 +
 parquet/src/file/footer.rs                         |   9 +-
 parquet/src/file/metadata/mod.rs                   | 129 ++++++-
 parquet/src/file/metadata/reader.rs                | 253 ++++++++++--
 parquet/src/file/mod.rs                            |   1 +
 parquet/src/file/serialized_reader.rs              | 141 ++++++-
 parquet/src/lib.rs                                 |   4 +
 parquet/src/util/test_common/encryption_util.rs    | 135 +++++++
 parquet/src/util/test_common/mod.rs                |   3 +
 21 files changed, 1872 insertions(+), 82 deletions(-)

diff --git a/.github/workflows/parquet.yml b/.github/workflows/parquet.yml
index 4c46fde198..96c7ab8f4e 100644
--- a/.github/workflows/parquet.yml
+++ b/.github/workflows/parquet.yml
@@ -111,6 +111,8 @@ jobs:
         run: cargo check -p parquet --all-targets --all-features
       - name: Check compilation  --all-targets --no-default-features 
--features json
         run: cargo check -p parquet --all-targets --no-default-features 
--features json
+      - name: Check compilation --no-default-features --features encryption 
--features async
+        run: cargo check -p parquet --no-default-features --features 
encryption --features async
 
   # test the parquet crate builds against wasm32 in stable rust
   wasm32-build:
diff --git a/parquet/Cargo.toml b/parquet/Cargo.toml
index e0a9be6791..ca84a1c72b 100644
--- a/parquet/Cargo.toml
+++ b/parquet/Cargo.toml
@@ -30,6 +30,8 @@ rust-version = { workspace = true }
 
 [target.'cfg(target_arch = "wasm32")'.dependencies]
 ahash = { version = "0.8", default-features = false, features = 
["compile-time-rng"] }
+# See https://github.com/briansmith/ring/issues/918#issuecomment-2077788925
+ring = { version = "0.17", default-features = false, features = 
["wasm32_unknown_unknown_js", "std"], optional = true }
 
 [target.'cfg(not(target_arch = "wasm32"))'.dependencies]
 ahash = { version = "0.8", default-features = false, features = 
["runtime-rng"] }
@@ -70,6 +72,7 @@ half = { version = "2.1", default-features = false, features 
= ["num-traits"] }
 sysinfo = { version = "0.33.0", optional = true, default-features = false, 
features = ["system"] }
 crc32fast = { version = "1.4.2", optional = true, default-features = false }
 simdutf8 = { version = "0.1.5", optional = true, default-features = false }
+ring = { version = "0.17", default-features = false, features = ["std"], 
optional = true }
 
 [dev-dependencies]
 base64 = { version = "0.22", default-features = false, features = ["std"] }
@@ -117,6 +120,8 @@ sysinfo = ["dep:sysinfo"]
 crc = ["dep:crc32fast"]
 # Enable SIMD UTF-8 validation
 simdutf8 = ["dep:simdutf8"]
+# Enable Parquet modular encryption support
+encryption = ["dep:ring"]
 
 
 [[example]]
diff --git a/parquet/README.md b/parquet/README.md
index 1224e52f3f..9245664b4e 100644
--- a/parquet/README.md
+++ b/parquet/README.md
@@ -63,6 +63,7 @@ The `parquet` crate provides the following features which may 
be enabled in your
 - `crc` - enables functionality to automatically verify checksums of each page 
(if present) when decoding
 - `experimental` - Experimental APIs which may change, even between minor 
releases
 - `simdutf8` (default) - Use the [`simdutf8`] crate for SIMD-accelerated UTF-8 
validation
+- `encryption` - support for reading / writing encrypted Parquet files
 
 [`arrow`]: https://crates.io/crates/arrow
 [`simdutf8`]: https://crates.io/crates/simdutf8
@@ -76,12 +77,14 @@ The `parquet` crate provides the following features which 
may be enabled in your
   - [x] Row record reader
   - [x] Arrow record reader
   - [x] Async support (to Arrow)
+  - [x] Encrypted files
 - [x] Statistics support
 - [x] Write support
   - [x] Primitive column value writers
   - [ ] Row record writer
   - [x] Arrow record writer
   - [x] Async support
+  - [ ] Encrypted files
 - [x] Predicate pushdown
 - [x] Parquet format 4.0.0 support
 
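A downstream crate would opt in to the new feature via its own manifest, for example (the wildcard version is just for illustration):

    [dependencies]
    parquet = { version = "*", features = ["encryption"] }
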
diff --git a/parquet/src/arrow/arrow_reader/mod.rs 
b/parquet/src/arrow/arrow_reader/mod.rs
index 6eba04c86f..2ee69dcf10 100644
--- a/parquet/src/arrow/arrow_reader/mod.rs
+++ b/parquet/src/arrow/arrow_reader/mod.rs
@@ -17,9 +17,6 @@
 
 //! Contains reader which reads parquet data into arrow [`RecordBatch`]
 
-use std::collections::VecDeque;
-use std::sync::Arc;
-
 use arrow_array::cast::AsArray;
 use arrow_array::Array;
 use arrow_array::{RecordBatch, RecordBatchReader};
@@ -27,12 +24,16 @@ use arrow_schema::{ArrowError, DataType as ArrowType, 
Schema, SchemaRef};
 use arrow_select::filter::prep_null_mask_filter;
 pub use filter::{ArrowPredicate, ArrowPredicateFn, RowFilter};
 pub use selection::{RowSelection, RowSelector};
+use std::collections::VecDeque;
+use std::sync::Arc;
 
 pub use crate::arrow::array_reader::RowGroups;
 use crate::arrow::array_reader::{build_array_reader, ArrayReader};
 use crate::arrow::schema::{parquet_to_arrow_schema_and_fields, ParquetField};
 use crate::arrow::{parquet_to_arrow_field_levels, FieldLevels, ProjectionMask};
 use crate::column::page::{PageIterator, PageReader};
+#[cfg(feature = "encryption")]
+use crate::encryption::decrypt::{CryptoContext, FileDecryptionProperties};
 use crate::errors::{ParquetError, Result};
 use crate::file::metadata::{ParquetMetaData, ParquetMetaDataReader};
 use crate::file::reader::{ChunkReader, SerializedPageReader};
@@ -252,6 +253,9 @@ pub struct ArrowReaderOptions {
     supplied_schema: Option<SchemaRef>,
     /// If true, attempt to read `OffsetIndex` and `ColumnIndex`
     pub(crate) page_index: bool,
+    /// If encryption is enabled, the file decryption properties can be 
provided
+    #[cfg(feature = "encryption")]
+    pub(crate) file_decryption_properties: Option<FileDecryptionProperties>,
 }
 
 impl ArrowReaderOptions {
@@ -317,7 +321,7 @@ impl ArrowReaderOptions {
     ///
     /// // Create the reader and read the data using the supplied schema.
     /// let mut reader = builder.build().unwrap();
-    /// let _batch = reader.next().unwrap().unwrap();   
+    /// let _batch = reader.next().unwrap().unwrap();
     /// ```
     pub fn with_schema(self, schema: SchemaRef) -> Self {
         Self {
@@ -342,6 +346,20 @@ impl ArrowReaderOptions {
     pub fn with_page_index(self, page_index: bool) -> Self {
         Self { page_index, ..self }
     }
+
+    /// Provide the file decryption properties to use when reading encrypted 
parquet files.
+    ///
+    /// If encryption is enabled and the file is encrypted, the 
`file_decryption_properties` must be provided.
+    #[cfg(feature = "encryption")]
+    pub fn with_file_decryption_properties(
+        self,
+        file_decryption_properties: FileDecryptionProperties,
+    ) -> Self {
+        Self {
+            file_decryption_properties: Some(file_decryption_properties),
+            ..self
+        }
+    }
 }
 
 /// The metadata necessary to construct a [`ArrowReaderBuilder`]
@@ -380,9 +398,11 @@ impl ArrowReaderMetadata {
     /// `Self::metadata` is missing the page index, this function will attempt
     /// to load the page index by making an object store request.
     pub fn load<T: ChunkReader>(reader: &T, options: ArrowReaderOptions) -> 
Result<Self> {
-        let metadata = ParquetMetaDataReader::new()
-            .with_page_indexes(options.page_index)
-            .parse_and_finish(reader)?;
+        let metadata = 
ParquetMetaDataReader::new().with_page_indexes(options.page_index);
+        #[cfg(feature = "encryption")]
+        let metadata =
+            
metadata.with_decryption_properties(options.file_decryption_properties.as_ref());
+        let metadata = metadata.parse_and_finish(reader)?;
         Self::try_new(Arc::new(metadata), options)
     }
 
@@ -553,6 +573,7 @@ impl<T: ChunkReader + 'static> 
ParquetRecordBatchReaderBuilder<T> {
     /// # use arrow_schema::{DataType, Field, Schema};
     /// # use parquet::arrow::arrow_reader::{ArrowReaderMetadata, 
ParquetRecordBatchReader, ParquetRecordBatchReaderBuilder};
     /// # use parquet::arrow::ArrowWriter;
+    /// #
     /// # let mut file: Vec<u8> = Vec::with_capacity(1024);
     /// # let schema = Arc::new(Schema::new(vec![Field::new("i32", 
DataType::Int32, false)]));
     /// # let mut writer = ArrowWriter::try_new(&mut file, schema.clone(), 
None).unwrap();
@@ -670,14 +691,55 @@ impl<T: ChunkReader + 'static> Iterator for 
ReaderPageIterator<T> {
         let meta = rg.column(self.column_idx);
         let offset_index = self.metadata.offset_index();
         // `offset_index` may not exist and `i[rg_idx]` will be empty.
-        // To avoid `i[rg_idx][self.oolumn_idx`] panic, we need to filter out 
empty `i[rg_idx]`.
+        // To avoid `i[rg_idx][self.column_idx`] panic, we need to filter out 
empty `i[rg_idx]`.
         let page_locations = offset_index
             .filter(|i| !i[rg_idx].is_empty())
             .map(|i| i[rg_idx][self.column_idx].page_locations.clone());
         let total_rows = rg.num_rows() as usize;
         let reader = self.reader.clone();
 
+        #[cfg(feature = "encryption")]
+        let crypto_context = if let Some(file_decryptor) = 
self.metadata.file_decryptor() {
+            let column_name = self
+                .metadata
+                .file_metadata()
+                .schema_descr()
+                .column(self.column_idx);
+
+            if file_decryptor.is_column_encrypted(column_name.name()) {
+                let data_decryptor = 
file_decryptor.get_column_data_decryptor(column_name.name());
+                let data_decryptor = match data_decryptor {
+                    Ok(data_decryptor) => data_decryptor,
+                    Err(err) => return Some(Err(err)),
+                };
+
+                let metadata_decryptor =
+                    
file_decryptor.get_column_metadata_decryptor(column_name.name());
+                let metadata_decryptor = match metadata_decryptor {
+                    Ok(metadata_decryptor) => metadata_decryptor,
+                    Err(err) => return Some(Err(err)),
+                };
+
+                let crypto_context = CryptoContext::new(
+                    rg_idx,
+                    self.column_idx,
+                    data_decryptor,
+                    metadata_decryptor,
+                    file_decryptor.file_aad().clone(),
+                );
+                Some(Arc::new(crypto_context))
+            } else {
+                None
+            }
+        } else {
+            None
+        };
+
         let ret = SerializedPageReader::new(reader, meta, total_rows, 
page_locations);
+
+        #[cfg(feature = "encryption")]
+        let ret = ret.map(|reader| reader.with_crypto_context(crypto_context));
+
         Some(ret.map(|x| Box::new(x) as _))
     }
 }
@@ -943,6 +1005,7 @@ mod tests {
     };
     use arrow_select::concat::concat_batches;
 
+    use crate::arrow::arrow_reader::ArrowReaderMetadata;
     use crate::arrow::arrow_reader::{
         ArrowPredicateFn, ArrowReaderBuilder, ArrowReaderOptions, 
ParquetRecordBatchReader,
         ParquetRecordBatchReaderBuilder, RowFilter, RowSelection, RowSelector,
@@ -955,11 +1018,15 @@ mod tests {
         BoolType, ByteArray, ByteArrayType, DataType, FixedLenByteArray, 
FixedLenByteArrayType,
         FloatType, Int32Type, Int64Type, Int96Type,
     };
+    #[cfg(feature = "encryption")]
+    use crate::encryption::decrypt::FileDecryptionProperties;
     use crate::errors::Result;
     use crate::file::properties::{EnabledStatistics, WriterProperties, 
WriterVersion};
     use crate::file::writer::SerializedFileWriter;
     use crate::schema::parser::parse_message_type;
     use crate::schema::types::{Type, TypePtr};
+    #[cfg(feature = "encryption")]
+    use 
crate::util::test_common::encryption_util::verify_encryption_test_file_read;
     use crate::util::test_common::rand_gen::RandGen;
 
     #[test]
@@ -1788,6 +1855,241 @@ mod tests {
         assert!(col.value(2).is_nan());
     }
 
+    #[test]
+    #[cfg(feature = "encryption")]
+    fn test_non_uniform_encryption_plaintext_footer() {
+        let testdata = arrow::util::test_util::parquet_test_data();
+        let path = 
format!("{testdata}/encrypt_columns_plaintext_footer.parquet.encrypted");
+        let file = File::open(path).unwrap();
+
+        // There is always a footer key even with a plaintext footer,
+        // but this is used for signing the footer.
+        let footer_key = "0123456789012345".as_bytes(); // 128bit/16
+        let column_1_key = "1234567890123450".as_bytes();
+        let column_2_key = "1234567890123451".as_bytes();
+
+        let decryption_properties = 
FileDecryptionProperties::builder(footer_key.to_vec())
+            .with_column_key("double_field", column_1_key.to_vec())
+            .with_column_key("float_field", column_2_key.to_vec())
+            .build()
+            .unwrap();
+
+        verify_encryption_test_file_read(file, decryption_properties);
+    }
+
+    #[test]
+    #[cfg(feature = "encryption")]
+    fn test_non_uniform_encryption_disabled_aad_storage() {
+        let testdata = arrow::util::test_util::parquet_test_data();
+        let path =
+            
format!("{testdata}/encrypt_columns_and_footer_disable_aad_storage.parquet.encrypted");
+        let file = File::open(path.clone()).unwrap();
+
+        let footer_key = "0123456789012345".as_bytes(); // 128bit/16
+        let column_1_key = "1234567890123450".as_bytes();
+        let column_2_key = "1234567890123451".as_bytes();
+
+        // Can read successfully when providing the correct AAD prefix
+        let decryption_properties = 
FileDecryptionProperties::builder(footer_key.to_vec())
+            .with_column_key("double_field", column_1_key.to_vec())
+            .with_column_key("float_field", column_2_key.to_vec())
+            .with_aad_prefix("tester".as_bytes().to_vec())
+            .build()
+            .unwrap();
+
+        verify_encryption_test_file_read(file, decryption_properties);
+
+        // Using wrong AAD prefix should fail
+        let decryption_properties = 
FileDecryptionProperties::builder(footer_key.to_vec())
+            .with_column_key("double_field", column_1_key.to_vec())
+            .with_column_key("float_field", column_2_key.to_vec())
+            .with_aad_prefix("wrong_aad_prefix".as_bytes().to_vec())
+            .build()
+            .unwrap();
+
+        let file = File::open(path.clone()).unwrap();
+        let options = ArrowReaderOptions::default()
+            .with_file_decryption_properties(decryption_properties.clone());
+        let result = ArrowReaderMetadata::load(&file, options.clone());
+        assert!(result.is_err());
+        assert_eq!(
+            result.unwrap_err().to_string(),
+            "Parquet error: Provided footer key and AAD were unable to decrypt 
parquet footer"
+        );
+
+        // Not providing any AAD prefix should fail as it isn't stored in the 
file
+        let decryption_properties = 
FileDecryptionProperties::builder(footer_key.to_vec())
+            .with_column_key("double_field", column_1_key.to_vec())
+            .with_column_key("float_field", column_2_key.to_vec())
+            .build()
+            .unwrap();
+
+        let file = File::open(path).unwrap();
+        let options = ArrowReaderOptions::default()
+            .with_file_decryption_properties(decryption_properties.clone());
+        let result = ArrowReaderMetadata::load(&file, options.clone());
+        assert!(result.is_err());
+        assert_eq!(
+            result.unwrap_err().to_string(),
+            "Parquet error: Provided footer key and AAD were unable to decrypt 
parquet footer"
+        );
+    }
+
+    #[test]
+    fn test_non_uniform_encryption_plaintext_footer_without_decryption() {
+        let testdata = arrow::util::test_util::parquet_test_data();
+        let path = 
format!("{testdata}/encrypt_columns_plaintext_footer.parquet.encrypted");
+        let file = File::open(&path).unwrap();
+
+        let metadata = ArrowReaderMetadata::load(&file, 
Default::default()).unwrap();
+        let file_metadata = metadata.metadata.file_metadata();
+
+        assert_eq!(file_metadata.num_rows(), 50);
+        assert_eq!(file_metadata.schema_descr().num_columns(), 8);
+        assert_eq!(
+            file_metadata.created_by().unwrap(),
+            "parquet-cpp-arrow version 19.0.0-SNAPSHOT"
+        );
+
+        metadata.metadata.row_groups().iter().for_each(|rg| {
+            assert_eq!(rg.num_columns(), 8);
+            assert_eq!(rg.num_rows(), 50);
+        });
+
+        // Should be able to read unencrypted columns. Test reading one column.
+        let builder = ParquetRecordBatchReaderBuilder::try_new(file).unwrap();
+        let mask = ProjectionMask::leaves(builder.parquet_schema(), [1]);
+        let record_reader = builder.with_projection(mask).build().unwrap();
+
+        let mut row_count = 0;
+        for batch in record_reader {
+            let batch = batch.unwrap();
+            row_count += batch.num_rows();
+
+            let time_col = batch
+                .column(0)
+                .as_primitive::<types::Time32MillisecondType>();
+            for (i, x) in time_col.iter().enumerate() {
+                assert_eq!(x.unwrap(), i as i32);
+            }
+        }
+
+        assert_eq!(row_count, file_metadata.num_rows() as usize);
+
+        // Reading an encrypted column should fail
+        let file = File::open(&path).unwrap();
+        let builder = ParquetRecordBatchReaderBuilder::try_new(file).unwrap();
+        let mask = ProjectionMask::leaves(builder.parquet_schema(), [4]);
+        let mut record_reader = builder.with_projection(mask).build().unwrap();
+
+        match record_reader.next() {
+            Some(Err(ArrowError::ParquetError(s))) => {
+                assert!(s.contains("protocol error"));
+            }
+            _ => {
+                panic!("Expected ArrowError::ParquetError");
+            }
+        };
+    }
+
+    #[test]
+    #[cfg(feature = "encryption")]
+    fn test_non_uniform_encryption() {
+        let testdata = arrow::util::test_util::parquet_test_data();
+        let path = 
format!("{testdata}/encrypt_columns_and_footer.parquet.encrypted");
+        let file = File::open(path).unwrap();
+
+        let footer_key = "0123456789012345".as_bytes(); // 128bit/16
+        let column_1_key = "1234567890123450".as_bytes();
+        let column_2_key = "1234567890123451".as_bytes();
+
+        let decryption_properties = 
FileDecryptionProperties::builder(footer_key.to_vec())
+            .with_column_key("double_field", column_1_key.to_vec())
+            .with_column_key("float_field", column_2_key.to_vec())
+            .build()
+            .unwrap();
+
+        verify_encryption_test_file_read(file, decryption_properties);
+    }
+
+    #[test]
+    #[cfg(feature = "encryption")]
+    fn test_uniform_encryption() {
+        let testdata = arrow::util::test_util::parquet_test_data();
+        let path = format!("{testdata}/uniform_encryption.parquet.encrypted");
+        let file = File::open(path).unwrap();
+
+        let key_code: &[u8] = "0123456789012345".as_bytes();
+        let decryption_properties = 
FileDecryptionProperties::builder(key_code.to_vec())
+            .build()
+            .unwrap();
+
+        verify_encryption_test_file_read(file, decryption_properties);
+    }
+
+    #[test]
+    #[cfg(not(feature = "encryption"))]
+    fn test_decrypting_without_encryption_flag_fails() {
+        let testdata = arrow::util::test_util::parquet_test_data();
+        let path = format!("{testdata}/uniform_encryption.parquet.encrypted");
+        let file = File::open(path).unwrap();
+
+        let options = ArrowReaderOptions::default();
+        let result = ArrowReaderMetadata::load(&file, options.clone());
+        assert!(result.is_err());
+        assert_eq!(
+            result.unwrap_err().to_string(),
+            "Parquet error: Parquet file has an encrypted footer but the 
encryption feature is disabled"
+        );
+    }
+
+    #[test]
+    #[cfg(feature = "encryption")]
+    fn test_decrypting_without_decryption_properties_fails() {
+        let testdata = arrow::util::test_util::parquet_test_data();
+        let path = format!("{testdata}/uniform_encryption.parquet.encrypted");
+        let file = File::open(path).unwrap();
+
+        let options = ArrowReaderOptions::default();
+        let result = ArrowReaderMetadata::load(&file, options.clone());
+        assert!(result.is_err());
+        assert_eq!(
+            result.unwrap_err().to_string(),
+            "Parquet error: Parquet file has an encrypted footer but no 
decryption properties were provided"
+        );
+    }
+
+    #[test]
+    #[cfg(feature = "encryption")]
+    fn test_aes_ctr_encryption() {
+        let testdata = arrow::util::test_util::parquet_test_data();
+        let path = 
format!("{testdata}/encrypt_columns_and_footer_ctr.parquet.encrypted");
+        let file = File::open(path).unwrap();
+
+        let footer_key = "0123456789012345".as_bytes();
+        let column_1_key = "1234567890123450".as_bytes();
+        let column_2_key = "1234567890123451".as_bytes();
+
+        let decryption_properties = 
FileDecryptionProperties::builder(footer_key.to_vec())
+            .with_column_key("double_field", column_1_key.to_vec())
+            .with_column_key("float_field", column_2_key.to_vec())
+            .build()
+            .unwrap();
+
+        let options =
+            
ArrowReaderOptions::default().with_file_decryption_properties(decryption_properties);
+        let metadata = ArrowReaderMetadata::load(&file, options);
+
+        match metadata {
+            Err(crate::errors::ParquetError::NYI(s)) => {
+                assert!(s.contains("AES_GCM_CTR_V1"));
+            }
+            _ => {
+                panic!("Expected ParquetError::NYI");
+            }
+        };
+    }
+
     #[test]
     fn test_read_float32_float64_byte_stream_split() {
         let path = format!(
diff --git a/parquet/src/arrow/async_reader/metadata.rs 
b/parquet/src/arrow/async_reader/metadata.rs
index 526818845b..71d2e57ddd 100644
--- a/parquet/src/arrow/async_reader/metadata.rs
+++ b/parquet/src/arrow/async_reader/metadata.rs
@@ -113,7 +113,8 @@ impl<F: MetadataFetch> MetadataLoader<F> {
         let mut footer = [0; FOOTER_SIZE];
         footer.copy_from_slice(&suffix[suffix_len - FOOTER_SIZE..suffix_len]);
 
-        let length = ParquetMetaDataReader::decode_footer(&footer)?;
+        let footer = ParquetMetaDataReader::decode_footer_tail(&footer)?;
+        let length = footer.metadata_length();
 
         if file_size < length + FOOTER_SIZE {
             return Err(ParquetError::EOF(format!(
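
For reference, a minimal sketch of the new footer-decoding flow used above; only the two accessors shown in this diff are used, and per the Parquet spec the footer magic is "PAR1" ("PARE" for an encrypted footer):

    use parquet::errors::Result;
    use parquet::file::metadata::ParquetMetaDataReader;
    use parquet::file::FOOTER_SIZE;

    /// `footer` holds the last FOOTER_SIZE (8) bytes of the file:
    /// the metadata length followed by the magic bytes.
    fn footer_info(footer: &[u8; FOOTER_SIZE]) -> Result<(usize, bool)> {
        let tail = ParquetMetaDataReader::decode_footer_tail(footer)?;
        // Length of the (possibly encrypted) metadata preceding the footer,
        // and whether the footer itself is encrypted.
        Ok((tail.metadata_length(), tail.is_encrypted_footer()))
    }
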
diff --git a/parquet/src/arrow/async_reader/mod.rs 
b/parquet/src/arrow/async_reader/mod.rs
index 2c8a59399d..9afd7d8355 100644
--- a/parquet/src/arrow/async_reader/mod.rs
+++ b/parquet/src/arrow/async_reader/mod.rs
@@ -52,7 +52,7 @@ use crate::bloom_filter::{
 };
 use crate::column::page::{PageIterator, PageReader};
 use crate::errors::{ParquetError, Result};
-use crate::file::metadata::{ParquetMetaData, ParquetMetaDataReader, 
RowGroupMetaData};
+use crate::file::metadata::{ParquetMetaData, ParquetMetaDataReader};
 use crate::file::page_index::offset_index::OffsetIndexMetaData;
 use crate::file::reader::{ChunkReader, Length, SerializedPageReader};
 use crate::file::FOOTER_SIZE;
@@ -61,6 +61,9 @@ use crate::format::{BloomFilterAlgorithm, 
BloomFilterCompression, BloomFilterHas
 mod metadata;
 pub use metadata::*;
 
+#[cfg(feature = "encryption")]
+use crate::encryption::decrypt::CryptoContext;
+
 #[cfg(feature = "object_store")]
 mod store;
 
@@ -104,6 +107,19 @@ pub trait AsyncFileReader: Send {
     /// allowing fine-grained control over how metadata is sourced, in 
particular allowing
     /// for caching, pre-fetching, catalog metadata, etc...
     fn get_metadata(&mut self) -> BoxFuture<'_, Result<Arc<ParquetMetaData>>>;
+
+    /// Provides asynchronous access to the [`ParquetMetaData`] of a parquet 
file,
+    /// allowing fine-grained control over how metadata is sourced, in 
particular allowing
+    /// for caching, pre-fetching, catalog metadata, decrypting, etc...
+    ///
+    /// By default calls `get_metadata()`
+    fn get_metadata_with_options<'a>(
+        &'a mut self,
+        options: &'a ArrowReaderOptions,
+    ) -> BoxFuture<'a, Result<Arc<ParquetMetaData>>> {
+        let _ = options;
+        self.get_metadata()
+    }
 }
 
 /// This allows Box<dyn AsyncFileReader + '_> to be used as an AsyncFileReader,
@@ -119,6 +135,13 @@ impl AsyncFileReader for Box<dyn AsyncFileReader + '_> {
     fn get_metadata(&mut self) -> BoxFuture<'_, Result<Arc<ParquetMetaData>>> {
         self.as_mut().get_metadata()
     }
+
+    fn get_metadata_with_options<'a>(
+        &'a mut self,
+        options: &'a ArrowReaderOptions,
+    ) -> BoxFuture<'a, Result<Arc<ParquetMetaData>>> {
+        self.as_mut().get_metadata_with_options(options)
+    }
 }
 
 impl<T: AsyncRead + AsyncSeek + Unpin + Send> AsyncFileReader for T {
@@ -138,6 +161,38 @@ impl<T: AsyncRead + AsyncSeek + Unpin + Send> 
AsyncFileReader for T {
         .boxed()
     }
 
+    fn get_metadata_with_options<'a>(
+        &'a mut self,
+        options: &'a ArrowReaderOptions,
+    ) -> BoxFuture<'a, Result<Arc<ParquetMetaData>>> {
+        const FOOTER_SIZE_I64: i64 = FOOTER_SIZE as i64;
+        async move {
+            self.seek(SeekFrom::End(-FOOTER_SIZE_I64)).await?;
+
+            let mut buf = [0_u8; FOOTER_SIZE];
+            self.read_exact(&mut buf).await?;
+
+            let footer = ParquetMetaDataReader::decode_footer_tail(&buf)?;
+            let metadata_len = footer.metadata_length();
+            self.seek(SeekFrom::End(-FOOTER_SIZE_I64 - metadata_len as i64))
+                .await?;
+
+            let mut buf = Vec::with_capacity(metadata_len);
+            self.take(metadata_len as _).read_to_end(&mut buf).await?;
+
+            let metadata_reader = ParquetMetaDataReader::new();
+
+            #[cfg(feature = "encryption")]
+            let metadata_reader = metadata_reader
+                
.with_decryption_properties(options.file_decryption_properties.as_ref());
+
+            let parquet_metadata = 
metadata_reader.decode_footer_metadata(&buf, &footer)?;
+
+            Ok(Arc::new(parquet_metadata))
+        }
+        .boxed()
+    }
+
     fn get_metadata(&mut self) -> BoxFuture<'_, Result<Arc<ParquetMetaData>>> {
         const FOOTER_SIZE_I64: i64 = FOOTER_SIZE as i64;
         async move {
@@ -146,7 +201,15 @@ impl<T: AsyncRead + AsyncSeek + Unpin + Send> 
AsyncFileReader for T {
             let mut buf = [0_u8; FOOTER_SIZE];
             self.read_exact(&mut buf).await?;
 
-            let metadata_len = ParquetMetaDataReader::decode_footer(&buf)?;
+            let footer = ParquetMetaDataReader::decode_footer_tail(&buf)?;
+            let metadata_len = footer.metadata_length();
+
+            if footer.is_encrypted_footer() {
+                return Err(general_err!(
+                    "Parquet file has an encrypted footer but decryption 
properties were not provided"
+                ));
+            }
+
             self.seek(SeekFrom::End(-FOOTER_SIZE_I64 - metadata_len as i64))
                 .await?;
 
@@ -175,7 +238,7 @@ impl ArrowReaderMetadata {
     ) -> Result<Self> {
         // TODO: this is all rather awkward. It would be nice if 
AsyncFileReader::get_metadata
         // took an argument to fetch the page indexes.
-        let mut metadata = input.get_metadata().await?;
+        let mut metadata = input.get_metadata_with_options(&options).await?;
 
         if options.page_index
             && metadata.column_index().is_none()
@@ -183,6 +246,13 @@ impl ArrowReaderMetadata {
         {
             let m = Arc::try_unwrap(metadata).unwrap_or_else(|e| 
e.as_ref().clone());
             let mut reader = 
ParquetMetaDataReader::new_with_metadata(m).with_page_indexes(true);
+
+            #[cfg(feature = "encryption")]
+            {
+                reader =
+                    
reader.with_decryption_properties(options.file_decryption_properties.as_ref());
+            }
+
             reader.load_page_index(input).await?;
             metadata = Arc::new(reader.finish()?)
         }
@@ -201,7 +271,7 @@ pub struct AsyncReader<T>(T);
 ///
 /// This builder  handles reading the parquet file metadata, allowing consumers
 /// to use this information to select what specific columns, row groups, etc...
-/// they wish to be read by the resulting stream
+/// they wish to be read by the resulting stream.
 ///
 /// See examples on [`ParquetRecordBatchStreamBuilder::new`]
 ///
@@ -344,7 +414,7 @@ impl<T: AsyncFileReader + Send + 'static> 
ParquetRecordBatchStreamBuilder<T> {
     }
 
     /// Create a new [`ParquetRecordBatchStreamBuilder`] with the provided 
async source
-    /// and [`ArrowReaderOptions`]
+    /// and [`ArrowReaderOptions`].
     pub async fn new_with_options(mut input: T, options: ArrowReaderOptions) 
-> Result<Self> {
         let metadata = ArrowReaderMetadata::load_async(&mut input, 
options).await?;
         Ok(Self::new_with_metadata(input, metadata))
@@ -563,11 +633,12 @@ where
             .map(|x| x[row_group_idx].as_slice());
 
         let mut row_group = InMemoryRowGroup {
-            metadata: meta,
             // schema: meta.schema_descr_ptr(),
             row_count: meta.num_rows() as usize,
             column_chunks: vec![None; meta.columns().len()],
             offset_index,
+            row_group_idx,
+            metadata: self.metadata.as_ref(),
         };
 
         if let Some(filter) = self.filter.as_mut() {
@@ -849,10 +920,11 @@ where
 
 /// An in-memory collection of column chunks
 struct InMemoryRowGroup<'a> {
-    metadata: &'a RowGroupMetaData,
     offset_index: Option<&'a [OffsetIndexMetaData]>,
     column_chunks: Vec<Option<Arc<ColumnChunkData>>>,
     row_count: usize,
+    row_group_idx: usize,
+    metadata: &'a ParquetMetaData,
 }
 
 impl InMemoryRowGroup<'_> {
@@ -863,6 +935,7 @@ impl InMemoryRowGroup<'_> {
         projection: &ProjectionMask,
         selection: Option<&RowSelection>,
     ) -> Result<()> {
+        let metadata = self.metadata.row_group(self.row_group_idx);
         if let Some((selection, offset_index)) = 
selection.zip(self.offset_index) {
             // If we have a `RowSelection` and an `OffsetIndex` then only 
fetch pages required for the
             // `RowSelection`
@@ -871,7 +944,7 @@ impl InMemoryRowGroup<'_> {
             let fetch_ranges = self
                 .column_chunks
                 .iter()
-                .zip(self.metadata.columns())
+                .zip(metadata.columns())
                 .enumerate()
                 .filter(|&(idx, (chunk, _chunk_meta))| {
                     chunk.is_none() && projection.leaf_included(idx)
@@ -910,7 +983,7 @@ impl InMemoryRowGroup<'_> {
                     }
 
                     *chunk = Some(Arc::new(ColumnChunkData::Sparse {
-                        length: self.metadata.column(idx).byte_range().1 as 
usize,
+                        length: metadata.column(idx).byte_range().1 as usize,
                         data: 
offsets.into_iter().zip(chunks.into_iter()).collect(),
                     }))
                 }
@@ -922,7 +995,7 @@ impl InMemoryRowGroup<'_> {
                 .enumerate()
                 .filter(|&(idx, chunk)| chunk.is_none() && 
projection.leaf_included(idx))
                 .map(|(idx, _chunk)| {
-                    let column = self.metadata.column(idx);
+                    let column = metadata.column(idx);
                     let (start, length) = column.byte_range();
                     start as usize..(start + length) as usize
                 })
@@ -937,7 +1010,7 @@ impl InMemoryRowGroup<'_> {
 
                 if let Some(data) = chunk_data.next() {
                     *chunk = Some(Arc::new(ColumnChunkData::Dense {
-                        offset: self.metadata.column(idx).byte_range().0 as 
usize,
+                        offset: metadata.column(idx).byte_range().0 as usize,
                         data,
                     }));
                 }
@@ -954,6 +1027,36 @@ impl RowGroups for InMemoryRowGroup<'_> {
     }
 
     fn column_chunks(&self, i: usize) -> Result<Box<dyn PageIterator>> {
+        #[cfg(feature = "encryption")]
+        let crypto_context = if let Some(file_decryptor) = 
self.metadata.clone().file_decryptor() {
+            let column_name = &self
+                .metadata
+                .clone()
+                .file_metadata()
+                .schema_descr()
+                .column(i);
+
+            if file_decryptor.is_column_encrypted(column_name.name()) {
+                let data_decryptor =
+                    
file_decryptor.get_column_data_decryptor(column_name.name())?;
+                let metadata_decryptor =
+                    
file_decryptor.get_column_metadata_decryptor(column_name.name())?;
+
+                let crypto_context = CryptoContext::new(
+                    self.row_group_idx,
+                    i,
+                    data_decryptor,
+                    metadata_decryptor,
+                    file_decryptor.file_aad().clone(),
+                );
+                Some(Arc::new(crypto_context))
+            } else {
+                None
+            }
+        } else {
+            None
+        };
+
         match &self.column_chunks[i] {
             None => Err(ParquetError::General(format!(
                 "Invalid column index {i}, column was not fetched"
@@ -964,12 +1067,18 @@ impl RowGroups for InMemoryRowGroup<'_> {
                    // filter out empty offset indexes (old versions specified Some(vec![]) when not present)
                     .filter(|index| !index.is_empty())
                     .map(|index| index[i].page_locations.clone());
-                let page_reader: Box<dyn PageReader> = 
Box::new(SerializedPageReader::new(
+                let metadata = self.metadata.row_group(self.row_group_idx);
+                let page_reader = SerializedPageReader::new(
                     data.clone(),
-                    self.metadata.column(i),
+                    metadata.column(i),
                     self.row_count,
                     page_locations,
-                )?);
+                )?;
+
+                #[cfg(feature = "encryption")]
+                let page_reader = 
page_reader.with_crypto_context(crypto_context);
+
+                let page_reader: Box<dyn PageReader> = Box::new(page_reader);
 
                 Ok(Box::new(ColumnChunkIterator {
                     reader: Some(Ok(page_reader)),
@@ -1055,14 +1164,20 @@ mod tests {
     use crate::arrow::arrow_reader::{
         ArrowPredicateFn, ParquetRecordBatchReaderBuilder, RowSelector,
     };
+    use crate::arrow::arrow_reader::{ArrowReaderMetadata, ArrowReaderOptions};
     use crate::arrow::schema::parquet_to_arrow_schema_and_fields;
     use crate::arrow::ArrowWriter;
+    #[cfg(feature = "encryption")]
+    use crate::encryption::decrypt::FileDecryptionProperties;
     use crate::file::metadata::ParquetMetaDataReader;
     use crate::file::properties::WriterProperties;
+    #[cfg(feature = "encryption")]
+    use 
crate::util::test_common::encryption_util::verify_encryption_test_file_read_async;
     use arrow::compute::kernels::cmp::eq;
     use arrow::error::Result as ArrowResult;
     use arrow_array::builder::{ListBuilder, StringBuilder};
     use arrow_array::cast::AsArray;
+    use arrow_array::types;
     use arrow_array::types::Int32Type;
     use arrow_array::{
         Array, ArrayRef, Int32Array, Int8Array, RecordBatchReader, Scalar, 
StringArray,
@@ -1074,6 +1189,7 @@ mod tests {
     use std::collections::HashMap;
     use std::sync::{Arc, Mutex};
     use tempfile::tempfile;
+    use tokio::fs::File;
 
     #[derive(Clone)]
     struct TestReader {
@@ -1091,6 +1207,13 @@ mod tests {
         fn get_metadata(&mut self) -> BoxFuture<'_, 
Result<Arc<ParquetMetaData>>> {
             futures::future::ready(Ok(self.metadata.clone())).boxed()
         }
+
+        fn get_metadata_with_options<'a>(
+            &'a mut self,
+            _options: &'a ArrowReaderOptions,
+        ) -> BoxFuture<'a, Result<Arc<ParquetMetaData>>> {
+            futures::future::ready(Ok(self.metadata.clone())).boxed()
+        }
     }
 
     #[tokio::test]
@@ -2336,4 +2459,279 @@ mod tests {
         let result = reader.try_collect::<Vec<_>>().await.unwrap();
         assert_eq!(result.len(), 1);
     }
+
+    #[tokio::test]
+    #[cfg(feature = "encryption")]
+    async fn test_non_uniform_encryption_plaintext_footer() {
+        let testdata = arrow::util::test_util::parquet_test_data();
+        let path = 
format!("{testdata}/encrypt_columns_plaintext_footer.parquet.encrypted");
+        let mut file = File::open(&path).await.unwrap();
+
+        // There is always a footer key even with a plaintext footer,
+        // but this is used for signing the footer.
+        let footer_key = "0123456789012345".as_bytes().to_vec(); // 128bit/16
+        let column_1_key = "1234567890123450".as_bytes().to_vec();
+        let column_2_key = "1234567890123451".as_bytes().to_vec();
+
+        let decryption_properties = 
FileDecryptionProperties::builder(footer_key)
+            .with_column_key("double_field", column_1_key)
+            .with_column_key("float_field", column_2_key)
+            .build()
+            .unwrap();
+
+        verify_encryption_test_file_read_async(&mut file, 
decryption_properties)
+            .await
+            .unwrap();
+    }
+
+    #[tokio::test]
+    #[cfg(feature = "encryption")]
+    async fn test_misspecified_encryption_keys() {
+        let testdata = arrow::util::test_util::parquet_test_data();
+        let path = 
format!("{testdata}/encrypt_columns_and_footer.parquet.encrypted");
+
+        // There is always a footer key even with a plaintext footer,
+        // but this is used for signing the footer.
+        let footer_key = "0123456789012345".as_bytes(); // 128bit/16
+        let column_1_key = "1234567890123450".as_bytes();
+        let column_2_key = "1234567890123451".as_bytes();
+
+        // read file with keys and check for expected error message
+        async fn check_for_error(
+            expected_message: &str,
+            path: &String,
+            footer_key: &[u8],
+            column_1_key: &[u8],
+            column_2_key: &[u8],
+        ) {
+            let mut file = File::open(&path).await.unwrap();
+
+            let mut decryption_properties = 
FileDecryptionProperties::builder(footer_key.to_vec());
+
+            if !column_1_key.is_empty() {
+                decryption_properties =
+                    decryption_properties.with_column_key("double_field", 
column_1_key.to_vec());
+            }
+
+            if !column_2_key.is_empty() {
+                decryption_properties =
+                    decryption_properties.with_column_key("float_field", 
column_2_key.to_vec());
+            }
+
+            let decryption_properties = decryption_properties.build().unwrap();
+
+            match verify_encryption_test_file_read_async(&mut file, 
decryption_properties).await {
+                Ok(_) => {
+                    panic!("did not get expected error")
+                }
+                Err(e) => {
+                    assert_eq!(e.to_string(), expected_message);
+                }
+            }
+        }
+
+        // Too short footer key
+        check_for_error(
+            "Parquet error: Invalid footer key. Failed to create AES key",
+            &path,
+            "bad_pwd".as_bytes(),
+            column_1_key,
+            column_2_key,
+        )
+        .await;
+
+        // Wrong footer key
+        check_for_error(
+            "Parquet error: Provided footer key and AAD were unable to decrypt 
parquet footer",
+            &path,
+            "1123456789012345".as_bytes(),
+            column_1_key,
+            column_2_key,
+        )
+        .await;
+
+        // Missing column key
+        check_for_error("Parquet error: Unable to decrypt column 
'double_field', perhaps the column key is wrong or missing?",
+                        &path, footer_key, "".as_bytes(), column_2_key).await;
+
+        // Too short column key
+        check_for_error(
+            "Parquet error: Failed to create AES key",
+            &path,
+            footer_key,
+            "abc".as_bytes(),
+            column_2_key,
+        )
+        .await;
+
+        // Wrong column key
+        check_for_error("Parquet error: Unable to decrypt column 
'double_field', perhaps the column key is wrong or missing?",
+                        &path, footer_key, "1123456789012345".as_bytes(), 
column_2_key).await;
+
+        // Mixed up keys
+        check_for_error("Parquet error: Unable to decrypt column 
'float_field', perhaps the column key is wrong or missing?",
+                        &path, footer_key, column_2_key, column_1_key).await;
+    }
+
+    #[tokio::test]
+    async fn test_non_uniform_encryption_plaintext_footer_without_decryption() 
{
+        let testdata = arrow::util::test_util::parquet_test_data();
+        let path = 
format!("{testdata}/encrypt_columns_plaintext_footer.parquet.encrypted");
+        let mut file = File::open(&path).await.unwrap();
+
+        let metadata = ArrowReaderMetadata::load_async(&mut file, 
Default::default())
+            .await
+            .unwrap();
+        let file_metadata = metadata.metadata.file_metadata();
+
+        assert_eq!(file_metadata.num_rows(), 50);
+        assert_eq!(file_metadata.schema_descr().num_columns(), 8);
+        assert_eq!(
+            file_metadata.created_by().unwrap(),
+            "parquet-cpp-arrow version 19.0.0-SNAPSHOT"
+        );
+
+        metadata.metadata.row_groups().iter().for_each(|rg| {
+            assert_eq!(rg.num_columns(), 8);
+            assert_eq!(rg.num_rows(), 50);
+        });
+
+        // Should be able to read unencrypted columns. Test reading one column.
+        let builder = 
ParquetRecordBatchStreamBuilder::new(file).await.unwrap();
+        let mask = ProjectionMask::leaves(builder.parquet_schema(), [1]);
+        let record_reader = builder.with_projection(mask).build().unwrap();
+        let record_batches = 
record_reader.try_collect::<Vec<_>>().await.unwrap();
+
+        let mut row_count = 0;
+        for batch in record_batches {
+            row_count += batch.num_rows();
+
+            let time_col = batch
+                .column(0)
+                .as_primitive::<types::Time32MillisecondType>();
+            for (i, x) in time_col.iter().enumerate() {
+                assert_eq!(x.unwrap(), i as i32);
+            }
+        }
+
+        assert_eq!(row_count, file_metadata.num_rows() as usize);
+
+        // Reading an encrypted column should fail
+        let file = File::open(&path).await.unwrap();
+        let builder = 
ParquetRecordBatchStreamBuilder::new(file).await.unwrap();
+        let mask = ProjectionMask::leaves(builder.parquet_schema(), [4]);
+        let mut record_reader = builder.with_projection(mask).build().unwrap();
+
+        match record_reader.next().await {
+            Some(Err(ParquetError::ArrowError(s))) => {
+                assert!(s.contains("protocol error"));
+            }
+            _ => {
+                panic!("Expected ParquetError::ArrowError");
+            }
+        };
+    }
+
+    #[tokio::test]
+    #[cfg(feature = "encryption")]
+    async fn test_non_uniform_encryption() {
+        let testdata = arrow::util::test_util::parquet_test_data();
+        let path = 
format!("{testdata}/encrypt_columns_and_footer.parquet.encrypted");
+        let mut file = File::open(&path).await.unwrap();
+
+        let footer_key = "0123456789012345".as_bytes().to_vec(); // 128bit/16
+        let column_1_key = "1234567890123450".as_bytes().to_vec();
+        let column_2_key = "1234567890123451".as_bytes().to_vec();
+
+        let decryption_properties = 
FileDecryptionProperties::builder(footer_key.to_vec())
+            .with_column_key("double_field", column_1_key)
+            .with_column_key("float_field", column_2_key)
+            .build()
+            .unwrap();
+
+        verify_encryption_test_file_read_async(&mut file, 
decryption_properties)
+            .await
+            .unwrap();
+    }
+
+    #[tokio::test]
+    #[cfg(feature = "encryption")]
+    async fn test_uniform_encryption() {
+        let testdata = arrow::util::test_util::parquet_test_data();
+        let path = format!("{testdata}/uniform_encryption.parquet.encrypted");
+        let mut file = File::open(&path).await.unwrap();
+
+        let key_code: &[u8] = "0123456789012345".as_bytes();
+        let decryption_properties = 
FileDecryptionProperties::builder(key_code.to_vec())
+            .build()
+            .unwrap();
+
+        verify_encryption_test_file_read_async(&mut file, 
decryption_properties)
+            .await
+            .unwrap();
+    }
+
+    #[tokio::test]
+    #[cfg(feature = "encryption")]
+    async fn test_aes_ctr_encryption() {
+        let testdata = arrow::util::test_util::parquet_test_data();
+        let path = 
format!("{testdata}/encrypt_columns_and_footer_ctr.parquet.encrypted");
+        let mut file = File::open(&path).await.unwrap();
+
+        let footer_key = "0123456789012345".as_bytes().to_vec();
+        let column_1_key = "1234567890123450".as_bytes().to_vec();
+        let column_2_key = "1234567890123451".as_bytes().to_vec();
+
+        let decryption_properties = 
FileDecryptionProperties::builder(footer_key)
+            .with_column_key("double_field", column_1_key)
+            .with_column_key("float_field", column_2_key)
+            .build()
+            .unwrap();
+
+        let options =
+            
ArrowReaderOptions::new().with_file_decryption_properties(decryption_properties);
+        let metadata = ArrowReaderMetadata::load_async(&mut file, 
options).await;
+
+        match metadata {
+            Err(ParquetError::NYI(s)) => {
+                assert!(s.contains("AES_GCM_CTR_V1"));
+            }
+            _ => {
+                panic!("Expected ParquetError::NYI");
+            }
+        };
+    }
+
+    #[tokio::test]
+    #[cfg(not(feature = "encryption"))]
+    async fn test_decrypting_without_encryption_flag_fails() {
+        let testdata = arrow::util::test_util::parquet_test_data();
+        let path = format!("{testdata}/uniform_encryption.parquet.encrypted");
+        let mut file = File::open(&path).await.unwrap();
+
+        let options = ArrowReaderOptions::new();
+        let result = ArrowReaderMetadata::load_async(&mut file, options).await;
+        assert!(result.is_err());
+        assert_eq!(
+            result.unwrap_err().to_string(),
+            "Parquet error: Parquet file has an encrypted footer but the 
encryption feature is disabled"
+        );
+    }
+
+    #[tokio::test]
+    #[cfg(feature = "encryption")]
+    async fn test_decrypting_without_decryption_properties_fails() {
+        let testdata = arrow::util::test_util::parquet_test_data();
+        let path = format!("{testdata}/uniform_encryption.parquet.encrypted");
+        let mut file = File::open(&path).await.unwrap();
+
+        let options = ArrowReaderOptions::new();
+        let result = ArrowReaderMetadata::load_async(&mut file, options).await;
+        assert!(result.is_err());
+        assert_eq!(
+            result.unwrap_err().to_string(),
+            "Parquet error: Parquet file has an encrypted footer but no 
decryption properties were provided"
+        );
+    }
 }
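
A usage sketch of the async path, mirroring the tests above; the file path and key are placeholders and the parquet::encryption::decrypt module path is assumed as before:

    use futures::TryStreamExt;
    use parquet::arrow::arrow_reader::ArrowReaderOptions;
    use parquet::arrow::ParquetRecordBatchStreamBuilder;
    use parquet::encryption::decrypt::FileDecryptionProperties;
    use parquet::errors::Result;

    async fn read_encrypted_async(path: &str) -> Result<()> {
        // Uniform encryption: a single key for the footer and all columns.
        let decryption_properties =
            FileDecryptionProperties::builder(b"0123456789012345".to_vec()).build()?;
        let options = ArrowReaderOptions::new()
            .with_file_decryption_properties(decryption_properties);

        // tokio::fs::File gets AsyncFileReader (and therefore the new
        // get_metadata_with_options) via the blanket AsyncRead + AsyncSeek impl.
        let file = tokio::fs::File::open(path).await?;
        let builder = ParquetRecordBatchStreamBuilder::new_with_options(file, options).await?;
        let batches: Vec<_> = builder.build()?.try_collect().await?;
        println!("read {} record batches", batches.len());
        Ok(())
    }
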
diff --git a/parquet/src/arrow/async_reader/store.rs 
b/parquet/src/arrow/async_reader/store.rs
index fd0397b5e1..6922f3d1f7 100644
--- a/parquet/src/arrow/async_reader/store.rs
+++ b/parquet/src/arrow/async_reader/store.rs
@@ -17,15 +17,15 @@
 
 use std::{ops::Range, sync::Arc};
 
+use crate::arrow::arrow_reader::ArrowReaderOptions;
+use crate::arrow::async_reader::AsyncFileReader;
+use crate::errors::{ParquetError, Result};
+use crate::file::metadata::{ParquetMetaData, ParquetMetaDataReader};
 use bytes::Bytes;
 use futures::{future::BoxFuture, FutureExt, TryFutureExt};
 use object_store::{path::Path, ObjectMeta, ObjectStore};
 use tokio::runtime::Handle;
 
-use crate::arrow::async_reader::AsyncFileReader;
-use crate::errors::{ParquetError, Result};
-use crate::file::metadata::{ParquetMetaData, ParquetMetaDataReader};
-
 /// Reads Parquet files in object storage using [`ObjectStore`].
 ///
 /// ```no_run
@@ -175,6 +175,27 @@ impl AsyncFileReader for ParquetObjectReader {
             Ok(Arc::new(metadata))
         })
     }
+
+    fn get_metadata_with_options<'a>(
+        &'a mut self,
+        options: &'a ArrowReaderOptions,
+    ) -> BoxFuture<'a, Result<Arc<ParquetMetaData>>> {
+        Box::pin(async move {
+            let file_size = self.meta.size;
+            let metadata = ParquetMetaDataReader::new()
+                .with_column_indexes(self.preload_column_index)
+                .with_offset_indexes(self.preload_offset_index)
+                .with_prefetch_hint(self.metadata_size_hint);
+
+            #[cfg(feature = "encryption")]
+            let metadata =
+                
metadata.with_decryption_properties(options.file_decryption_properties.as_ref());
+
+            let metadata = metadata.load_and_finish(self, file_size).await?;
+
+            Ok(Arc::new(metadata))
+        })
+    }
 }
 
 #[cfg(test)]
@@ -186,16 +207,17 @@ mod tests {
 
     use futures::TryStreamExt;
 
+    use crate::arrow::arrow_reader::ArrowReaderOptions;
+    use crate::arrow::async_reader::{AsyncFileReader, ParquetObjectReader};
+    use crate::arrow::ParquetRecordBatchStreamBuilder;
+    use crate::encryption::decrypt::FileDecryptionProperties;
+    use crate::errors::ParquetError;
     use arrow::util::test_util::parquet_test_data;
     use futures::FutureExt;
     use object_store::local::LocalFileSystem;
     use object_store::path::Path;
     use object_store::{ObjectMeta, ObjectStore};
 
-    use crate::arrow::async_reader::{AsyncFileReader, ParquetObjectReader};
-    use crate::arrow::ParquetRecordBatchStreamBuilder;
-    use crate::errors::ParquetError;
-
     async fn get_meta_store() -> (ObjectMeta, Arc<dyn ObjectStore>) {
         let res = parquet_test_data();
         let store = LocalFileSystem::new_with_prefix(res).unwrap();
@@ -208,6 +230,45 @@ mod tests {
         (meta, Arc::new(store) as Arc<dyn ObjectStore>)
     }
 
+    #[cfg(feature = "encryption")]
+    async fn get_encrypted_meta_store() -> (ObjectMeta, Arc<dyn ObjectStore>) {
+        let res = parquet_test_data();
+        let store = LocalFileSystem::new_with_prefix(res).unwrap();
+
+        let meta = store
+            .head(&Path::from("uniform_encryption.parquet.encrypted"))
+            .await
+            .unwrap();
+
+        (meta, Arc::new(store) as Arc<dyn ObjectStore>)
+    }
+
+    #[tokio::test]
+    #[cfg(feature = "encryption")]
+    async fn test_encrypted() {
+        let (meta, store) = get_encrypted_meta_store().await;
+
+        let key_code: &[u8] = "0123456789012345".as_bytes();
+        let decryption_properties = FileDecryptionProperties::builder(key_code.to_vec())
+            .build()
+            .unwrap();
+        let options =
+            ArrowReaderOptions::new().with_file_decryption_properties(decryption_properties);
+        let mut reader = ParquetObjectReader::new(store.clone(), meta.clone());
+        let metadata = reader.get_metadata_with_options(&options).await.unwrap();
+
+        assert_eq!(metadata.num_row_groups(), 1);
+
+        let reader = ParquetObjectReader::new(store, meta);
+        let builder = ParquetRecordBatchStreamBuilder::new_with_options(reader, options)
+            .await
+            .unwrap();
+        let batches: Vec<_> = builder.build().unwrap().try_collect().await.unwrap();
+
+        assert_eq!(batches.len(), 1);
+        assert_eq!(batches[0].num_rows(), 50);
+    }
+
     #[tokio::test]
     async fn test_simple() {
         let (meta, store) = get_meta_store().await;
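Note: `get_metadata_with_options` threads the same `ArrowReaderOptions` (and therefore any decryption keys) through object-store metadata loading; the pre-existing `get_metadata` path is unchanged for unencrypted files. Condensed from `test_encrypted` above, the call pattern is:

    // Key value copied from the test above; real keys would come from a KMS or config.
    let props = FileDecryptionProperties::builder(b"0123456789012345".to_vec()).build()?;
    let options = ArrowReaderOptions::new().with_file_decryption_properties(props);
    let mut reader = ParquetObjectReader::new(store, meta);
    let metadata = reader.get_metadata_with_options(&options).await?;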
diff --git a/parquet/src/column/page.rs b/parquet/src/column/page.rs
index 5c866318e1..b5afe6b933 100644
--- a/parquet/src/column/page.rs
+++ b/parquet/src/column/page.rs
@@ -90,6 +90,16 @@ impl Page {
         }
     }
 
+    /// Returns whether this page is any version of a data page
+    pub fn is_data_page(&self) -> bool {
+        matches!(self, Page::DataPage { .. } | Page::DataPageV2 { .. })
+    }
+
+    /// Returns whether this page is a dictionary page
+    pub fn is_dictionary_page(&self) -> bool {
+        matches!(self, Page::DictionaryPage { .. })
+    }
+
     /// Returns internal byte buffer reference for this page.
     pub fn buffer(&self) -> &Bytes {
         match self {
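Note: these two predicates let the page reader track per-page decryption state without matching on `Page` variants inline; the sketch below mirrors how `serialized_reader.rs` (later in this diff) uses them:

    // Data pages advance the AAD page ordinal; seeing the dictionary page
    // clears the "dictionary page expected first" flag.
    if page.is_data_page() {
        *page_ordinal += 1;
    } else if page.is_dictionary_page() {
        *require_dictionary = false;
    }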
diff --git a/parquet/src/encryption/ciphers.rs b/parquet/src/encryption/ciphers.rs
new file mode 100644
index 0000000000..9b5a04d622
--- /dev/null
+++ b/parquet/src/encryption/ciphers.rs
@@ -0,0 +1,63 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+use crate::errors::ParquetError::General;
+use crate::errors::Result;
+use ring::aead::{Aad, LessSafeKey, UnboundKey, AES_128_GCM};
+use std::fmt::Debug;
+
+const NONCE_LEN: usize = 12;
+const TAG_LEN: usize = 16;
+const SIZE_LEN: usize = 4;
+
+pub trait BlockDecryptor: Debug + Send + Sync {
+    fn decrypt(&self, length_and_ciphertext: &[u8], aad: &[u8]) -> Result<Vec<u8>>;
+}
+
+#[derive(Debug, Clone)]
+pub(crate) struct RingGcmBlockDecryptor {
+    key: LessSafeKey,
+}
+
+impl RingGcmBlockDecryptor {
+    pub(crate) fn new(key_bytes: &[u8]) -> Result<Self> {
+        // todo support other key sizes
+        let key = UnboundKey::new(&AES_128_GCM, key_bytes)
+            .map_err(|_| General("Failed to create AES key".to_string()))?;
+
+        Ok(Self {
+            key: LessSafeKey::new(key),
+        })
+    }
+}
+
+impl BlockDecryptor for RingGcmBlockDecryptor {
+    fn decrypt(&self, length_and_ciphertext: &[u8], aad: &[u8]) -> Result<Vec<u8>> {
+        let mut result = Vec::with_capacity(length_and_ciphertext.len() - SIZE_LEN - NONCE_LEN);
+        result.extend_from_slice(&length_and_ciphertext[SIZE_LEN + NONCE_LEN..]);
+
+        let nonce = ring::aead::Nonce::try_assume_unique_for_key(
+            &length_and_ciphertext[SIZE_LEN..SIZE_LEN + NONCE_LEN],
+        )?;
+
+        self.key.open_in_place(nonce, Aad::from(aad), &mut result)?;
+
+        // Truncate result to remove the tag
+        result.resize(result.len() - TAG_LEN, 0u8);
+        Ok(result)
+    }
+}
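Note: as read from `decrypt` above, an encrypted module is laid out as a 4-byte little-endian length, a 12-byte nonce, the ciphertext, and a 16-byte GCM authentication tag, so the recoverable plaintext size is:

    // [ len: 4 bytes LE | nonce: 12 bytes | ciphertext | tag: 16 bytes ]
    fn plaintext_len(length_and_ciphertext: &[u8]) -> usize {
        length_and_ciphertext.len() - SIZE_LEN - NONCE_LEN - TAG_LEN
    }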
diff --git a/parquet/src/encryption/decrypt.rs b/parquet/src/encryption/decrypt.rs
new file mode 100644
index 0000000000..d5bfe3cfc0
--- /dev/null
+++ b/parquet/src/encryption/decrypt.rs
@@ -0,0 +1,260 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+use crate::encryption::ciphers::{BlockDecryptor, RingGcmBlockDecryptor};
+use crate::encryption::modules::{create_module_aad, ModuleType};
+use crate::errors::{ParquetError, Result};
+use std::collections::HashMap;
+use std::io::Read;
+use std::sync::Arc;
+
+pub fn read_and_decrypt<T: Read>(
+    decryptor: &Arc<dyn BlockDecryptor>,
+    input: &mut T,
+    aad: &[u8],
+) -> Result<Vec<u8>> {
+    let mut len_bytes = [0; 4];
+    input.read_exact(&mut len_bytes)?;
+    let ciphertext_len = u32::from_le_bytes(len_bytes) as usize;
+    let mut ciphertext = vec![0; 4 + ciphertext_len];
+    input.read_exact(&mut ciphertext[4..])?;
+
+    decryptor.decrypt(&ciphertext, aad.as_ref())
+}
+
+// CryptoContext is a data structure that holds the context required to
+// decrypt parquet modules (data pages, dictionary pages, etc.).
+#[derive(Debug, Clone)]
+pub(crate) struct CryptoContext {
+    pub(crate) row_group_idx: usize,
+    pub(crate) column_ordinal: usize,
+    pub(crate) page_ordinal: Option<usize>,
+    pub(crate) dictionary_page: bool,
+    // We have separate data and metadata decryptors because
+    // in GCM CTR mode, the metadata and data pages use
+    // different algorithms.
+    data_decryptor: Arc<dyn BlockDecryptor>,
+    metadata_decryptor: Arc<dyn BlockDecryptor>,
+    file_aad: Vec<u8>,
+}
+
+impl CryptoContext {
+    pub(crate) fn new(
+        row_group_idx: usize,
+        column_ordinal: usize,
+        data_decryptor: Arc<dyn BlockDecryptor>,
+        metadata_decryptor: Arc<dyn BlockDecryptor>,
+        file_aad: Vec<u8>,
+    ) -> Self {
+        Self {
+            row_group_idx,
+            column_ordinal,
+            page_ordinal: None,
+            dictionary_page: false,
+            data_decryptor,
+            metadata_decryptor,
+            file_aad,
+        }
+    }
+
+    pub(crate) fn with_page_ordinal(&self, page_ordinal: usize) -> Self {
+        Self {
+            row_group_idx: self.row_group_idx,
+            column_ordinal: self.column_ordinal,
+            page_ordinal: Some(page_ordinal),
+            dictionary_page: false,
+            data_decryptor: self.data_decryptor.clone(),
+            metadata_decryptor: self.metadata_decryptor.clone(),
+            file_aad: self.file_aad.clone(),
+        }
+    }
+
+    pub(crate) fn create_page_header_aad(&self) -> Result<Vec<u8>> {
+        let module_type = if self.dictionary_page {
+            ModuleType::DictionaryPageHeader
+        } else {
+            ModuleType::DataPageHeader
+        };
+
+        create_module_aad(
+            self.file_aad(),
+            module_type,
+            self.row_group_idx,
+            self.column_ordinal,
+            self.page_ordinal,
+        )
+    }
+
+    pub(crate) fn create_page_aad(&self) -> Result<Vec<u8>> {
+        let module_type = if self.dictionary_page {
+            ModuleType::DictionaryPage
+        } else {
+            ModuleType::DataPage
+        };
+
+        create_module_aad(
+            self.file_aad(),
+            module_type,
+            self.row_group_idx,
+            self.column_ordinal,
+            self.page_ordinal,
+        )
+    }
+
+    pub(crate) fn for_dictionary_page(&self) -> Self {
+        Self {
+            row_group_idx: self.row_group_idx,
+            column_ordinal: self.column_ordinal,
+            page_ordinal: self.page_ordinal,
+            dictionary_page: true,
+            data_decryptor: self.data_decryptor.clone(),
+            metadata_decryptor: self.metadata_decryptor.clone(),
+            file_aad: self.file_aad.clone(),
+        }
+    }
+
+    pub(crate) fn data_decryptor(&self) -> &Arc<dyn BlockDecryptor> {
+        &self.data_decryptor
+    }
+
+    pub(crate) fn file_aad(&self) -> &Vec<u8> {
+        &self.file_aad
+    }
+}
+
+/// FileDecryptionProperties holds the keys and AAD data required to decrypt a Parquet file.
+#[derive(Debug, Clone, PartialEq)]
+pub struct FileDecryptionProperties {
+    footer_key: Vec<u8>,
+    column_keys: HashMap<String, Vec<u8>>,
+    pub(crate) aad_prefix: Option<Vec<u8>>,
+}
+
+impl FileDecryptionProperties {
+    /// Returns a new FileDecryptionProperties builder
+    pub fn builder(footer_key: Vec<u8>) -> DecryptionPropertiesBuilder {
+        DecryptionPropertiesBuilder::new(footer_key)
+    }
+}
+
+pub struct DecryptionPropertiesBuilder {
+    footer_key: Vec<u8>,
+    column_keys: HashMap<String, Vec<u8>>,
+    aad_prefix: Option<Vec<u8>>,
+}
+
+impl DecryptionPropertiesBuilder {
+    pub fn new(footer_key: Vec<u8>) -> DecryptionPropertiesBuilder {
+        Self {
+            footer_key,
+            column_keys: HashMap::default(),
+            aad_prefix: None,
+        }
+    }
+
+    pub fn build(self) -> Result<FileDecryptionProperties> {
+        Ok(FileDecryptionProperties {
+            footer_key: self.footer_key,
+            column_keys: self.column_keys,
+            aad_prefix: self.aad_prefix,
+        })
+    }
+
+    pub fn with_aad_prefix(mut self, value: Vec<u8>) -> Self {
+        self.aad_prefix = Some(value);
+        self
+    }
+
+    pub fn with_column_key(mut self, column_name: &str, decryption_key: Vec<u8>) -> Self {
+        self.column_keys
+            .insert(column_name.to_string(), decryption_key);
+        self
+    }
+}
+
+#[derive(Clone, Debug)]
+pub(crate) struct FileDecryptor {
+    decryption_properties: FileDecryptionProperties,
+    footer_decryptor: Option<Arc<dyn BlockDecryptor>>,
+    file_aad: Vec<u8>,
+}
+
+impl PartialEq for FileDecryptor {
+    fn eq(&self, other: &Self) -> bool {
+        self.decryption_properties == other.decryption_properties
+    }
+}
+
+impl FileDecryptor {
+    pub(crate) fn new(
+        decryption_properties: &FileDecryptionProperties,
+        aad_file_unique: Vec<u8>,
+        aad_prefix: Vec<u8>,
+    ) -> Result<Self, ParquetError> {
+        let file_aad = [aad_prefix.as_slice(), aad_file_unique.as_slice()].concat();
+        // todo decr: if no key available yet (not set in properties, should be retrieved from metadata)
+        let footer_decryptor = RingGcmBlockDecryptor::new(&decryption_properties.footer_key)
+            .map_err(|e| {
+                general_err!(
+                    "Invalid footer key. {}",
+                    e.to_string().replace("Parquet error: ", "")
+                )
+            })?;
+        Ok(Self {
+            footer_decryptor: Some(Arc::new(footer_decryptor)),
+            decryption_properties: decryption_properties.clone(),
+            file_aad,
+        })
+    }
+
+    pub(crate) fn get_footer_decryptor(&self) -> Result<Arc<dyn BlockDecryptor>, ParquetError> {
+        Ok(self.footer_decryptor.clone().unwrap())
+    }
+
+    pub(crate) fn get_column_data_decryptor(
+        &self,
+        column_name: &str,
+    ) -> Result<Arc<dyn BlockDecryptor>, ParquetError> {
+        match self.decryption_properties.column_keys.get(column_name) {
+            Some(column_key) => Ok(Arc::new(RingGcmBlockDecryptor::new(column_key)?)),
+            None => self.get_footer_decryptor(),
+        }
+    }
+
+    pub(crate) fn get_column_metadata_decryptor(
+        &self,
+        column_name: &str,
+    ) -> Result<Arc<dyn BlockDecryptor>, ParquetError> {
+        // Once GCM CTR mode is implemented, data and metadata decryptors may be different
+        self.get_column_data_decryptor(column_name)
+    }
+
+    pub(crate) fn file_aad(&self) -> &Vec<u8> {
+        &self.file_aad
+    }
+
+    pub(crate) fn is_column_encrypted(&self, column_name: &str) -> bool {
+        // Column is encrypted if either uniform encryption is used or an encryption key is set for the column
+        match self.decryption_properties.column_keys.is_empty() {
+            false => self
+                .decryption_properties
+                .column_keys
+                .contains_key(column_name),
+            true => true,
+        }
+    }
+}
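Note: the builder composes per-column keys and an optional AAD prefix on top of the mandatory footer key. A sketch with placeholder values (the keys and the column name "x" are hypothetical):

    // Hypothetical 16-byte keys, for illustration only.
    let footer_key = b"0123456789012345".to_vec();
    let column_key = b"1234567890123450".to_vec();

    let properties = FileDecryptionProperties::builder(footer_key)
        .with_column_key("x", column_key)
        .with_aad_prefix(b"tag".to_vec())
        .build()?;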
diff --git a/parquet/src/util/test_common/mod.rs b/parquet/src/encryption/mod.rs
similarity index 78%
copy from parquet/src/util/test_common/mod.rs
copy to parquet/src/encryption/mod.rs
index 8cfc1e6dd4..b4f4d6d221 100644
--- a/parquet/src/util/test_common/mod.rs
+++ b/parquet/src/encryption/mod.rs
@@ -15,10 +15,9 @@
 // specific language governing permissions and limitations
 // under the License.
 
-pub mod page_util;
+//! Encryption implementation specific to Parquet, as described
+//! in the [spec](https://github.com/apache/parquet-format/blob/master/Encryption.md).
 
-#[cfg(test)]
-pub mod file_util;
-
-#[cfg(test)]
-pub mod rand_gen;
+pub(crate) mod ciphers;
+pub mod decrypt;
+pub(crate) mod modules;
diff --git a/parquet/src/encryption/modules.rs b/parquet/src/encryption/modules.rs
new file mode 100644
index 0000000000..6bf9306b25
--- /dev/null
+++ b/parquet/src/encryption/modules.rs
@@ -0,0 +1,94 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+use crate::errors::ParquetError;
+
+#[derive(PartialEq)]
+pub(crate) enum ModuleType {
+    Footer = 0,
+    ColumnMetaData = 1,
+    DataPage = 2,
+    DictionaryPage = 3,
+    DataPageHeader = 4,
+    DictionaryPageHeader = 5,
+}
+
+pub fn create_footer_aad(file_aad: &[u8]) -> crate::errors::Result<Vec<u8>> {
+    create_module_aad(file_aad, ModuleType::Footer, 0, 0, None)
+}
+
+pub(crate) fn create_module_aad(
+    file_aad: &[u8],
+    module_type: ModuleType,
+    row_group_idx: usize,
+    column_ordinal: usize,
+    page_ordinal: Option<usize>,
+) -> crate::errors::Result<Vec<u8>> {
+    let module_buf = [module_type as u8];
+
+    if module_buf[0] == (ModuleType::Footer as u8) {
+        let mut aad = Vec::with_capacity(file_aad.len() + 1);
+        aad.extend_from_slice(file_aad);
+        aad.extend_from_slice(module_buf.as_ref());
+        return Ok(aad);
+    }
+
+    if row_group_idx > i16::MAX as usize {
+        return Err(general_err!(
+            "Encrypted parquet files can't have more than {} row groups: {}",
+            i16::MAX,
+            row_group_idx
+        ));
+    }
+    if column_ordinal > i16::MAX as usize {
+        return Err(general_err!(
+            "Encrypted parquet files can't have more than {} columns: {}",
+            i16::MAX,
+            column_ordinal
+        ));
+    }
+
+    if module_buf[0] != (ModuleType::DataPageHeader as u8)
+        && module_buf[0] != (ModuleType::DataPage as u8)
+    {
+        let mut aad = Vec::with_capacity(file_aad.len() + 5);
+        aad.extend_from_slice(file_aad);
+        aad.extend_from_slice(module_buf.as_ref());
+        aad.extend_from_slice((row_group_idx as i16).to_le_bytes().as_ref());
+        aad.extend_from_slice((column_ordinal as i16).to_le_bytes().as_ref());
+        return Ok(aad);
+    }
+
+    let page_ordinal =
+        page_ordinal.ok_or_else(|| general_err!("Page ordinal must be set for 
data pages"))?;
+
+    if page_ordinal > i16::MAX as usize {
+        return Err(general_err!(
+            "Encrypted parquet files can't have more than {} pages per column 
chunk: {}",
+            i16::MAX,
+            page_ordinal
+        ));
+    }
+
+    let mut aad = Vec::with_capacity(file_aad.len() + 7);
+    aad.extend_from_slice(file_aad);
+    aad.extend_from_slice(module_buf.as_ref());
+    aad.extend_from_slice((row_group_idx as i16).to_le_bytes().as_ref());
+    aad.extend_from_slice((column_ordinal as i16).to_le_bytes().as_ref());
+    aad.extend_from_slice((page_ordinal as i16).to_le_bytes().as_ref());
+    Ok(aad)
+}
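Note: the footer module AAD is just `file_aad || module_type`; column metadata and dictionary-page modules append the row group and column ordinals as little-endian i16, and data-page modules additionally append the page ordinal. A quick sanity check with a hypothetical 8-byte file AAD:

    let aad = create_module_aad(b"file-aad", ModuleType::DataPage, 0, 1, Some(2))?;
    // module_type = 2, row group 0, column 1, page 2:
    assert_eq!(&aad[8..], &[2u8, 0, 0, 1, 0, 2, 0]);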
diff --git a/parquet/src/errors.rs b/parquet/src/errors.rs
index d749287bba..4cb1f99c3c 100644
--- a/parquet/src/errors.rs
+++ b/parquet/src/errors.rs
@@ -132,6 +132,13 @@ impl From<object_store::Error> for ParquetError {
     }
 }
 
+#[cfg(feature = "encryption")]
+impl From<ring::error::Unspecified> for ParquetError {
+    fn from(e: ring::error::Unspecified) -> ParquetError {
+        ParquetError::External(Box::new(e))
+    }
+}
+
 /// A specialized `Result` for Parquet errors.
 pub type Result<T, E = ParquetError> = result::Result<T, E>;
 
diff --git a/parquet/src/file/footer.rs b/parquet/src/file/footer.rs
index bd31c9142f..85ef30cd0e 100644
--- a/parquet/src/file/footer.rs
+++ b/parquet/src/file/footer.rs
@@ -52,7 +52,7 @@ pub fn parse_metadata<R: ChunkReader>(chunk_reader: &R) -> Result<ParquetMetaDat
 /// Decodes [`ParquetMetaData`] from the provided bytes.
 ///
 /// Typically this is used to decode the metadata from the end of a parquet
-/// file. The format of `buf` is the Thift compact binary protocol, as specified
+/// file. The format of `buf` is the Thrift compact binary protocol, as specified
 /// by the [Parquet Spec].
 ///
 /// [Parquet Spec]: https://github.com/apache/parquet-format#metadata
@@ -72,7 +72,10 @@ pub fn decode_metadata(buf: &[u8]) -> Result<ParquetMetaData> {
 /// | len | 'PAR1' |
 /// +-----+--------+
 /// ```
-#[deprecated(since = "53.1.0", note = "Use 
ParquetMetaDataReader::decode_footer")]
+#[deprecated(
+    since = "53.1.0",
+    note = "Use ParquetMetaDataReader::decode_footer_tail"
+)]
 pub fn decode_footer(slice: &[u8; FOOTER_SIZE]) -> Result<usize> {
-    ParquetMetaDataReader::decode_footer(slice)
+    ParquetMetaDataReader::decode_footer_tail(slice).map(|f| f.metadata_length())
 }
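Note: migrating off the deprecated free function is mechanical, and `decode_footer_tail` additionally reports whether the footer is encrypted:

    let tail = ParquetMetaDataReader::decode_footer_tail(&footer)?;
    let metadata_len = tail.metadata_length();
    let encrypted = tail.is_encrypted_footer();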
diff --git a/parquet/src/file/metadata/mod.rs b/parquet/src/file/metadata/mod.rs
index 252cb99f3f..217685049e 100644
--- a/parquet/src/file/metadata/mod.rs
+++ b/parquet/src/file/metadata/mod.rs
@@ -95,26 +95,33 @@ mod memory;
 pub(crate) mod reader;
 mod writer;
 
-use std::ops::Range;
-use std::sync::Arc;
-
-use crate::format::{
-    BoundaryOrder, ColumnChunk, ColumnIndex, ColumnMetaData, OffsetIndex, PageLocation, RowGroup,
-    SizeStatistics, SortingColumn,
-};
-
 use crate::basic::{ColumnOrder, Compression, Encoding, Type};
+#[cfg(feature = "encryption")]
+use crate::encryption::{
+    decrypt::FileDecryptor,
+    modules::{create_module_aad, ModuleType},
+};
 use crate::errors::{ParquetError, Result};
 pub(crate) use crate::file::metadata::memory::HeapSize;
 use crate::file::page_encoding_stats::{self, PageEncodingStats};
 use crate::file::page_index::index::Index;
 use crate::file::page_index::offset_index::OffsetIndexMetaData;
 use crate::file::statistics::{self, Statistics};
+#[cfg(feature = "encryption")]
+use crate::format::ColumnCryptoMetaData;
+use crate::format::{
+    BoundaryOrder, ColumnChunk, ColumnIndex, ColumnMetaData, OffsetIndex, PageLocation, RowGroup,
+    SizeStatistics, SortingColumn,
+};
 use crate::schema::types::{
    ColumnDescPtr, ColumnDescriptor, ColumnPath, SchemaDescPtr, SchemaDescriptor,
     Type as SchemaType,
 };
+#[cfg(feature = "encryption")]
+use crate::thrift::{TCompactSliceInputProtocol, TSerializable};
 pub use reader::ParquetMetaDataReader;
+use std::ops::Range;
+use std::sync::Arc;
 pub use writer::ParquetMetaDataWriter;
 pub(crate) use writer::ThriftMetadataWriter;
 
@@ -174,6 +181,9 @@ pub struct ParquetMetaData {
     column_index: Option<ParquetColumnIndex>,
     /// Offset index for each page in each column chunk
     offset_index: Option<ParquetOffsetIndex>,
+    /// Optional file decryptor
+    #[cfg(feature = "encryption")]
+    file_decryptor: Option<FileDecryptor>,
 }
 
 impl ParquetMetaData {
@@ -183,11 +193,20 @@ impl ParquetMetaData {
         ParquetMetaData {
             file_metadata,
             row_groups,
+            #[cfg(feature = "encryption")]
+            file_decryptor: None,
             column_index: None,
             offset_index: None,
         }
     }
 
+    /// Adds [`FileDecryptor`] to this metadata instance to enable decryption of
+    /// encrypted data.
+    #[cfg(feature = "encryption")]
+    pub(crate) fn with_file_decryptor(&mut self, file_decryptor: Option<FileDecryptor>) {
+        self.file_decryptor = file_decryptor;
+    }
+
     /// Creates Parquet metadata from file metadata, a list of row
     /// group metadata, and the column index structures.
     #[deprecated(since = "53.1.0", note = "Use ParquetMetaDataBuilder")]
@@ -214,6 +233,12 @@ impl ParquetMetaData {
         &self.file_metadata
     }
 
+    /// Returns file decryptor as reference.
+    #[cfg(feature = "encryption")]
+    pub(crate) fn file_decryptor(&self) -> Option<&FileDecryptor> {
+        self.file_decryptor.as_ref()
+    }
+
     /// Returns number of row groups in this file.
     pub fn num_row_groups(&self) -> usize {
         self.row_groups.len()
@@ -599,6 +624,81 @@ impl RowGroupMetaData {
         self.file_offset
     }
 
+    /// Method to convert from encrypted Thrift.
+    #[cfg(feature = "encryption")]
+    fn from_encrypted_thrift(
+        schema_descr: SchemaDescPtr,
+        mut rg: RowGroup,
+        decryptor: Option<&FileDecryptor>,
+    ) -> Result<RowGroupMetaData> {
+        if schema_descr.num_columns() != rg.columns.len() {
+            return Err(general_err!(
+                "Column count mismatch. Schema has {} columns while Row Group 
has {}",
+                schema_descr.num_columns(),
+                rg.columns.len()
+            ));
+        }
+        let total_byte_size = rg.total_byte_size;
+        let num_rows = rg.num_rows;
+        let mut columns = vec![];
+
+        for (i, (mut c, d)) in rg
+            .columns
+            .drain(0..)
+            .zip(schema_descr.columns())
+            .enumerate()
+        {
+            // Read encrypted metadata if it's present and we have a decryptor.
+            if let (true, Some(decryptor)) = (c.encrypted_column_metadata.is_some(), decryptor) {
+                let column_decryptor = match c.crypto_metadata.as_ref() {
+                    None => {
+                        return Err(general_err!(
+                            "No crypto_metadata is set for column '{}', which has encrypted metadata",
+                            d.path().string()
+                        ));
+                    }
+                    Some(ColumnCryptoMetaData::ENCRYPTIONWITHCOLUMNKEY(crypto_metadata)) => {
+                        let column_name = crypto_metadata.path_in_schema.join(".");
+                        decryptor.get_column_metadata_decryptor(column_name.as_str())?
+                    }
+                    Some(ColumnCryptoMetaData::ENCRYPTIONWITHFOOTERKEY(_)) => {
+                        decryptor.get_footer_decryptor()?
+                    }
+                };
+
+                let column_aad = create_module_aad(
+                    decryptor.file_aad(),
+                    ModuleType::ColumnMetaData,
+                    rg.ordinal.unwrap() as usize,
+                    i,
+                    None,
+                )?;
+
+                let buf = c.encrypted_column_metadata.clone().unwrap();
+                let decrypted_cc_buf =
+                    column_decryptor.decrypt(buf.as_slice(), column_aad.as_ref()).map_err(|_| {
+                        general_err!("Unable to decrypt column '{}', perhaps the column key is wrong or missing?",
+                            d.path().string())
+                    })?;
+
+                let mut prot = TCompactSliceInputProtocol::new(decrypted_cc_buf.as_slice());
+                c.meta_data = Some(ColumnMetaData::read_from_in_protocol(&mut prot)?);
+            }
+            columns.push(ColumnChunkMetaData::from_thrift(d.clone(), c)?);
+        }
+
+        let sorting_columns = rg.sorting_columns;
+        Ok(RowGroupMetaData {
+            columns,
+            num_rows,
+            sorting_columns,
+            total_byte_size,
+            schema_descr,
+            file_offset: rg.file_offset,
+            ordinal: rg.ordinal,
+        })
+    }
+
     /// Method to convert from Thrift.
    pub fn from_thrift(schema_descr: SchemaDescPtr, mut rg: RowGroup) -> Result<RowGroupMetaData> {
         if schema_descr.num_columns() != rg.columns.len() {
@@ -611,10 +711,11 @@ impl RowGroupMetaData {
         let total_byte_size = rg.total_byte_size;
         let num_rows = rg.num_rows;
         let mut columns = vec![];
+
         for (c, d) in rg.columns.drain(0..).zip(schema_descr.columns()) {
-            let cc = ColumnChunkMetaData::from_thrift(d.clone(), c)?;
-            columns.push(cc);
+            columns.push(ColumnChunkMetaData::from_thrift(d.clone(), c)?);
         }
+
         let sorting_columns = rg.sorting_columns;
         Ok(RowGroupMetaData {
             columns,
@@ -1849,7 +1950,11 @@ mod tests {
         let parquet_meta = ParquetMetaDataBuilder::new(file_metadata.clone())
             .set_row_groups(row_group_meta_with_stats)
             .build();
+
+        #[cfg(not(feature = "encryption"))]
         let base_expected_size = 2312;
+        #[cfg(feature = "encryption")]
+        let base_expected_size = 2448;
 
         assert_eq!(parquet_meta.memory_size(), base_expected_size);
 
@@ -1876,7 +1981,11 @@ mod tests {
             ]]))
             .build();
 
+        #[cfg(not(feature = "encryption"))]
         let bigger_expected_size = 2816;
+        #[cfg(feature = "encryption")]
+        let bigger_expected_size = 2952;
+
         // more set fields means more memory usage
         assert!(bigger_expected_size > base_expected_size);
         assert_eq!(parquet_meta.memory_size(), bigger_expected_size);
diff --git a/parquet/src/file/metadata/reader.rs b/parquet/src/file/metadata/reader.rs
index d465a49c35..b80e76d792 100644
--- a/parquet/src/file/metadata/reader.rs
+++ b/parquet/src/file/metadata/reader.rs
@@ -20,13 +20,21 @@ use std::{io::Read, ops::Range, sync::Arc};
 use bytes::Bytes;
 
 use crate::basic::ColumnOrder;
+#[cfg(feature = "encryption")]
+use crate::encryption::{
+    decrypt::{FileDecryptionProperties, FileDecryptor},
+    modules::create_footer_aad,
+};
+
 use crate::errors::{ParquetError, Result};
 use crate::file::metadata::{FileMetaData, ParquetMetaData, RowGroupMetaData};
 use crate::file::page_index::index::Index;
 use crate::file::page_index::index_reader::{acc_range, decode_column_index, decode_offset_index};
 use crate::file::reader::ChunkReader;
-use crate::file::{FOOTER_SIZE, PARQUET_MAGIC};
+use crate::file::{FOOTER_SIZE, PARQUET_MAGIC, PARQUET_MAGIC_ENCR_FOOTER};
 use crate::format::{ColumnOrder as TColumnOrder, FileMetaData as TFileMetaData};
+#[cfg(feature = "encryption")]
+use crate::format::{EncryptionAlgorithm, FileCryptoMetaData as TFileCryptoMetaData};
 use crate::schema::types;
 use crate::schema::types::SchemaDescriptor;
 use crate::thrift::{TCompactSliceInputProtocol, TSerializable};
@@ -68,6 +76,28 @@ pub struct ParquetMetaDataReader {
    // Size of the serialized thrift metadata plus the 8 byte footer. Only set if
     // `self.parse_metadata` is called.
     metadata_size: Option<usize>,
+    #[cfg(feature = "encryption")]
+    file_decryption_properties: Option<FileDecryptionProperties>,
+}
+
+/// Describes how the footer metadata is stored
+///
+/// This is parsed from the last 8 bytes of the Parquet file
+pub struct FooterTail {
+    metadata_length: usize,
+    encrypted_footer: bool,
+}
+
+impl FooterTail {
+    /// The length of the footer metadata in bytes
+    pub fn metadata_length(&self) -> usize {
+        self.metadata_length
+    }
+
+    /// Whether the footer metadata is encrypted
+    pub fn is_encrypted_footer(&self) -> bool {
+        self.encrypted_footer
+    }
 }
 
 impl ParquetMetaDataReader {
@@ -126,6 +156,18 @@ impl ParquetMetaDataReader {
         self
     }
 
+    /// Provide the FileDecryptionProperties to use when decrypting the file.
+    ///
+    /// This is only necessary when the file is encrypted.
+    #[cfg(feature = "encryption")]
+    pub fn with_decryption_properties(
+        mut self,
+        properties: Option<&FileDecryptionProperties>,
+    ) -> Self {
+        self.file_decryption_properties = properties.cloned();
+        self
+    }
+
     /// Indicates whether this reader has a [`ParquetMetaData`] internally.
     pub fn has_metadata(&self) -> bool {
         self.metadata.is_some()
@@ -372,8 +414,9 @@ impl ParquetMetaDataReader {
         mut fetch: F,
         file_size: usize,
     ) -> Result<()> {
-        let (metadata, remainder) =
-            Self::load_metadata(&mut fetch, file_size, self.get_prefetch_size()).await?;
+        let (metadata, remainder) = self
+            .load_metadata(&mut fetch, file_size, self.get_prefetch_size())
+            .await?;
 
         self.metadata = Some(metadata);
 
@@ -510,7 +553,8 @@ impl ParquetMetaDataReader {
             .get_read(file_size - 8)?
             .read_exact(&mut footer)?;
 
-        let metadata_len = Self::decode_footer(&footer)?;
+        let footer = Self::decode_footer_tail(&footer)?;
+        let metadata_len = footer.metadata_length();
         let footer_metadata_len = FOOTER_SIZE + metadata_len;
         self.metadata_size = Some(footer_metadata_len);
 
@@ -519,7 +563,10 @@ impl ParquetMetaDataReader {
         }
 
         let start = file_size - footer_metadata_len as u64;
-        Self::decode_metadata(chunk_reader.get_bytes(start, metadata_len)?.as_ref())
+        self.decode_footer_metadata(
+            chunk_reader.get_bytes(start, metadata_len)?.as_ref(),
+            &footer,
+        )
     }
 
    /// Return the number of bytes to read in the initial pass. If `prefetch_size` has
@@ -537,6 +584,7 @@ impl ParquetMetaDataReader {
 
     #[cfg(all(feature = "async", feature = "arrow"))]
     async fn load_metadata<F: MetadataFetch>(
+        &self,
         fetch: &mut F,
         file_size: usize,
         prefetch: usize,
@@ -564,7 +612,8 @@ impl ParquetMetaDataReader {
         let mut footer = [0; FOOTER_SIZE];
         footer.copy_from_slice(&suffix[suffix_len - FOOTER_SIZE..suffix_len]);
 
-        let length = Self::decode_footer(&footer)?;
+        let footer = Self::decode_footer_tail(&footer)?;
+        let length = footer.metadata_length();
 
         if file_size < length + FOOTER_SIZE {
             return Err(eof_err!(
@@ -578,53 +627,188 @@ impl ParquetMetaDataReader {
         if length > suffix_len - FOOTER_SIZE {
             let metadata_start = file_size - length - FOOTER_SIZE;
            let meta = fetch.fetch(metadata_start..file_size - FOOTER_SIZE).await?;
-            Ok((Self::decode_metadata(&meta)?, None))
+            Ok((self.decode_footer_metadata(&meta, &footer)?, None))
         } else {
            let metadata_start = file_size - length - FOOTER_SIZE - footer_start;
             let slice = &suffix[metadata_start..suffix_len - FOOTER_SIZE];
             Ok((
-                Self::decode_metadata(slice)?,
+                self.decode_footer_metadata(slice, &footer)?,
                 Some((footer_start, suffix.slice(..metadata_start))),
             ))
         }
     }
 
-    /// Decodes the Parquet footer returning the metadata length in bytes
+    /// Decodes the end of the Parquet footer
     ///
-    /// A parquet footer is 8 bytes long and has the following layout:
+    /// There are 8 bytes at the end of the Parquet footer with the following layout:
     /// * 4 bytes for the metadata length
-    /// * 4 bytes for the magic bytes 'PAR1'
+    /// * 4 bytes for the magic bytes 'PAR1' or 'PARE' (encrypted footer)
     ///
     /// ```text
-    /// +-----+--------+
-    /// | len | 'PAR1' |
-    /// +-----+--------+
+    /// +-----+------------------+
+    /// | len | 'PAR1' or 'PARE' |
+    /// +-----+------------------+
     /// ```
-    pub fn decode_footer(slice: &[u8; FOOTER_SIZE]) -> Result<usize> {
-        // check this is indeed a parquet file
-        if slice[4..] != PARQUET_MAGIC {
+    pub fn decode_footer_tail(slice: &[u8; FOOTER_SIZE]) -> Result<FooterTail> {
+        let magic = &slice[4..];
+        let encrypted_footer = if magic == PARQUET_MAGIC_ENCR_FOOTER {
+            true
+        } else if magic == PARQUET_MAGIC {
+            false
+        } else {
             return Err(general_err!("Invalid Parquet file. Corrupt footer"));
-        }
-
+        };
         // get the metadata length from the footer
         let metadata_len = u32::from_le_bytes(slice[..4].try_into().unwrap());
-        // u32 won't be larger than usize in most cases
-        Ok(metadata_len as usize)
+        Ok(FooterTail {
+            // u32 won't be larger than usize in most cases
+            metadata_length: metadata_len as usize,
+            encrypted_footer,
+        })
+    }
+
+    /// Decodes the Parquet footer, returning the metadata length in bytes
+    #[deprecated(note = "use decode_footer_tail instead")]
+    pub fn decode_footer(slice: &[u8; FOOTER_SIZE]) -> Result<usize> {
+        Self::decode_footer_tail(slice).map(|f| f.metadata_length)
     }
 
     /// Decodes [`ParquetMetaData`] from the provided bytes.
     ///
+    /// Typically, this is used to decode the metadata from the end of a parquet
+    /// file. The format of `buf` is the Thrift compact binary protocol, as specified
+    /// by the [Parquet Spec].
+    ///
+    /// This method handles using either `decode_metadata` or
+    /// `decode_metadata_with_encryption` depending on whether the encryption
+    /// feature is enabled.
+    ///
+    /// [Parquet Spec]: https://github.com/apache/parquet-format#metadata
+    pub(crate) fn decode_footer_metadata(
+        &self,
+        buf: &[u8],
+        footer_tail: &FooterTail,
+    ) -> Result<ParquetMetaData> {
+        #[cfg(feature = "encryption")]
+        let result = Self::decode_metadata_with_encryption(
+            buf,
+            footer_tail.is_encrypted_footer(),
+            self.file_decryption_properties.as_ref(),
+        );
+        #[cfg(not(feature = "encryption"))]
+        let result = {
+            if footer_tail.is_encrypted_footer() {
+                Err(general_err!(
+                    "Parquet file has an encrypted footer but the encryption 
feature is disabled"
+                ))
+            } else {
+                Self::decode_metadata(buf)
+            }
+        };
+        result
+    }
+
+    /// Decodes [`ParquetMetaData`] from the provided bytes, handling metadata that may be encrypted.
+    ///
     /// Typically this is used to decode the metadata from the end of a parquet
-    /// file. The format of `buf` is the Thift compact binary protocol, as specified
+    /// file. The format of `buf` is the Thrift compact binary protocol, as specified
+    /// by the [Parquet Spec]. The buffer can be encrypted with AES GCM or AES CTR
+    /// ciphers as specified in the [Parquet Encryption Spec].
+    ///
+    /// [Parquet Spec]: https://github.com/apache/parquet-format#metadata
+    /// [Parquet Encryption Spec]: https://parquet.apache.org/docs/file-format/data-pages/encryption/
+    #[cfg(feature = "encryption")]
+    fn decode_metadata_with_encryption(
+        buf: &[u8],
+        encrypted_footer: bool,
+        file_decryption_properties: Option<&FileDecryptionProperties>,
+    ) -> Result<ParquetMetaData> {
+        let mut prot = TCompactSliceInputProtocol::new(buf);
+        let mut file_decryptor = None;
+        let decrypted_fmd_buf;
+
+        if encrypted_footer {
+            if let Some(file_decryption_properties) = file_decryption_properties {
+                let t_file_crypto_metadata: TFileCryptoMetaData =
+                    TFileCryptoMetaData::read_from_in_protocol(&mut prot)
+                        .map_err(|e| general_err!("Could not parse crypto 
metadata: {}", e))?;
+                let decryptor = get_file_decryptor(
+                    t_file_crypto_metadata.encryption_algorithm,
+                    file_decryption_properties,
+                )?;
+                let footer_decryptor = decryptor.get_footer_decryptor();
+                let aad_footer = create_footer_aad(decryptor.file_aad())?;
+
+                decrypted_fmd_buf = footer_decryptor?
+                    .decrypt(prot.as_slice().as_ref(), aad_footer.as_ref())
+                    .map_err(|_| {
+                        general_err!(
+                            "Provided footer key and AAD were unable to 
decrypt parquet footer"
+                        )
+                    })?;
+                prot = 
TCompactSliceInputProtocol::new(decrypted_fmd_buf.as_ref());
+
+                file_decryptor = Some(decryptor);
+            } else {
+                return Err(general_err!("Parquet file has an encrypted footer 
but no decryption properties were provided"));
+            }
+        }
+
+        let t_file_metadata: TFileMetaData = TFileMetaData::read_from_in_protocol(&mut prot)
+            .map_err(|e| general_err!("Could not parse metadata: {}", e))?;
+        let schema = types::from_thrift(&t_file_metadata.schema)?;
+        let schema_descr = Arc::new(SchemaDescriptor::new(schema));
+
+        if let (Some(algo), Some(file_decryption_properties)) = (
+            t_file_metadata.encryption_algorithm,
+            file_decryption_properties,
+        ) {
+            // File has a plaintext footer but encryption algorithm is set
+            file_decryptor = Some(get_file_decryptor(algo, file_decryption_properties)?);
+        }
+
+        let mut row_groups = Vec::new();
+        for rg in t_file_metadata.row_groups {
+            let r = RowGroupMetaData::from_encrypted_thrift(
+                schema_descr.clone(),
+                rg,
+                file_decryptor.as_ref(),
+            )?;
+            row_groups.push(r);
+        }
+        let column_orders =
+            Self::parse_column_orders(t_file_metadata.column_orders, &schema_descr)?;
+
+        let file_metadata = FileMetaData::new(
+            t_file_metadata.version,
+            t_file_metadata.num_rows,
+            t_file_metadata.created_by,
+            t_file_metadata.key_value_metadata,
+            schema_descr,
+            column_orders,
+        );
+        let mut metadata = ParquetMetaData::new(file_metadata, row_groups);
+
+        metadata.with_file_decryptor(file_decryptor);
+
+        Ok(metadata)
+    }
+
+    /// Decodes [`ParquetMetaData`] from the provided bytes.
+    ///
+    /// Typically this is used to decode the metadata from the end of a parquet
+    /// file. The format of `buf` is the Thrift compact binary protocol, as specified
     /// by the [Parquet Spec].
     ///
     /// [Parquet Spec]: https://github.com/apache/parquet-format#metadata
     pub fn decode_metadata(buf: &[u8]) -> Result<ParquetMetaData> {
         let mut prot = TCompactSliceInputProtocol::new(buf);
+
        let t_file_metadata: TFileMetaData = TFileMetaData::read_from_in_protocol(&mut prot)
             .map_err(|e| general_err!("Could not parse metadata: {}", e))?;
         let schema = types::from_thrift(&t_file_metadata.schema)?;
         let schema_descr = Arc::new(SchemaDescriptor::new(schema));
+
         let mut row_groups = Vec::new();
        for rg in t_file_metadata.row_groups {
            row_groups.push(RowGroupMetaData::from_thrift(schema_descr.clone(), rg)?);
@@ -640,6 +824,7 @@ impl ParquetMetaDataReader {
             schema_descr,
             column_orders,
         );
+
         Ok(ParquetMetaData::new(file_metadata, row_groups))
     }
 
@@ -675,6 +860,30 @@ impl ParquetMetaDataReader {
     }
 }
 
+#[cfg(feature = "encryption")]
+fn get_file_decryptor(
+    encryption_algorithm: EncryptionAlgorithm,
+    file_decryption_properties: &FileDecryptionProperties,
+) -> Result<FileDecryptor> {
+    match encryption_algorithm {
+        EncryptionAlgorithm::AESGCMV1(algo) => {
+            let aad_file_unique = algo
+                .aad_file_unique
+                .ok_or_else(|| general_err!("AAD unique file identifier is not 
set"))?;
+            let aad_prefix = if 
file_decryption_properties.aad_prefix.is_some() {
+                file_decryption_properties.aad_prefix.clone().unwrap()
+            } else {
+                algo.aad_prefix.unwrap_or_default()
+            };
+
+            FileDecryptor::new(file_decryption_properties, aad_file_unique, aad_prefix)
+        }
+        EncryptionAlgorithm::AESGCMCTRV1(_) => Err(nyi_err!(
+            "The AES_GCM_CTR_V1 encryption algorithm is not yet supported"
+        )),
+    }
+}
+
 #[cfg(test)]
 mod tests {
     use super::*;
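Note: putting the reader-side pieces together, loading possibly-encrypted metadata asynchronously follows the same shape as the `ParquetObjectReader` implementation earlier in this diff. A sketch, assuming `fetch`, `file_size` and `footer_key` come from the caller:

    let props = FileDecryptionProperties::builder(footer_key).build()?;
    let metadata = ParquetMetaDataReader::new()
        .with_prefetch_hint(Some(64 * 1024))
        .with_decryption_properties(Some(&props))
        .load_and_finish(fetch, file_size)
        .await?;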
diff --git a/parquet/src/file/mod.rs b/parquet/src/file/mod.rs
index 12ff35b516..b36ef752ae 100644
--- a/parquet/src/file/mod.rs
+++ b/parquet/src/file/mod.rs
@@ -110,3 +110,4 @@ pub mod writer;
 /// The length of the parquet footer in bytes
 pub const FOOTER_SIZE: usize = 8;
 const PARQUET_MAGIC: [u8; 4] = [b'P', b'A', b'R', b'1'];
+const PARQUET_MAGIC_ENCR_FOOTER: [u8; 4] = [b'P', b'A', b'R', b'E'];
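Note: these two magic values are what `decode_footer_tail` dispatches on; the last 8 bytes of a file are either

    // [ metadata_len: u32 LE | b"PAR1" ]  -> plaintext footer
    // [ metadata_len: u32 LE | b"PARE" ]  -> encrypted footer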
diff --git a/parquet/src/file/serialized_reader.rs b/parquet/src/file/serialized_reader.rs
index 81ba0a6646..2673f4ac52 100644
--- a/parquet/src/file/serialized_reader.rs
+++ b/parquet/src/file/serialized_reader.rs
@@ -18,14 +18,12 @@
 //! Contains implementations of the reader traits FileReader, RowGroupReader and PageReader
 //! Also contains implementations of the ChunkReader for files (with buffering) and byte arrays (RAM)
 
-use std::collections::VecDeque;
-use std::iter;
-use std::{fs::File, io::Read, path::Path, sync::Arc};
-
 use crate::basic::{Encoding, Type};
 use crate::bloom_filter::Sbbf;
 use crate::column::page::{Page, PageMetadata, PageReader};
 use crate::compression::{create_codec, Codec};
+#[cfg(feature = "encryption")]
+use crate::encryption::decrypt::{read_and_decrypt, CryptoContext};
 use crate::errors::{ParquetError, Result};
 use crate::file::page_index::offset_index::OffsetIndexMetaData;
 use crate::file::{
@@ -40,6 +38,9 @@ use crate::record::Row;
 use crate::schema::types::Type as SchemaType;
 use crate::thrift::{TCompactSliceInputProtocol, TSerializable};
 use bytes::Bytes;
+use std::collections::VecDeque;
+use std::iter;
+use std::{fs::File, io::Read, path::Path, sync::Arc};
 use thrift::protocol::TCompactInputProtocol;
 
 impl TryFrom<File> for SerializedFileReader<File> {
@@ -340,11 +341,53 @@ impl<R: 'static + ChunkReader> RowGroupReader for SerializedRowGroupReader<'_, R
 /// Reads a [`PageHeader`] from the provided [`Read`]
 pub(crate) fn read_page_header<T: Read>(input: &mut T) -> Result<PageHeader> {
     let mut prot = TCompactInputProtocol::new(input);
-    let page_header = PageHeader::read_from_in_protocol(&mut prot)?;
-    Ok(page_header)
+    Ok(PageHeader::read_from_in_protocol(&mut prot)?)
+}
+
+#[cfg(feature = "encryption")]
+pub(crate) fn read_encrypted_page_header<T: Read>(
+    input: &mut T,
+    crypto_context: Arc<CryptoContext>,
+) -> Result<PageHeader> {
+    let data_decryptor = crypto_context.data_decryptor();
+    let aad = crypto_context.create_page_header_aad()?;
+
+    let buf = read_and_decrypt(data_decryptor, input, aad.as_ref())?;
+
+    let mut prot = TCompactSliceInputProtocol::new(buf.as_slice());
+    Ok(PageHeader::read_from_in_protocol(&mut prot)?)
+}
+
+/// Reads a [`PageHeader`] from the provided [`Read`] returning the number of bytes read.
+/// If the page header is encrypted [`CryptoContext`] must be provided.
+#[cfg(feature = "encryption")]
+fn read_encrypted_page_header_len<T: Read>(
+    input: &mut T,
+    crypto_context: Option<Arc<CryptoContext>>,
+) -> Result<(usize, PageHeader)> {
+    /// A wrapper around a [`std::io::Read`] that keeps track of the bytes read
+    struct TrackedRead<R> {
+        inner: R,
+        bytes_read: usize,
+    }
+
+    impl<R: Read> Read for TrackedRead<R> {
+        fn read(&mut self, buf: &mut [u8]) -> std::io::Result<usize> {
+            let v = self.inner.read(buf)?;
+            self.bytes_read += v;
+            Ok(v)
+        }
+    }
+
+    let mut tracked = TrackedRead {
+        inner: input,
+        bytes_read: 0,
+    };
+    let header = read_encrypted_page_header(&mut tracked, crypto_context.unwrap())?;
+    Ok((tracked.bytes_read, header))
 }
 
-/// Reads a [`PageHeader`] from the provided [`Read`] returning the number of bytes read
+/// Reads a [`PageHeader`] from the provided [`Read`] returning the number of bytes read.
 fn read_page_header_len<T: Read>(input: &mut T) -> Result<(usize, PageHeader)> {
     /// A wrapper around a [`std::io::Read`] that keeps track of the bytes read
     struct TrackedRead<R> {
@@ -489,6 +532,12 @@ enum SerializedPageReaderState {
 
         // If the next page header has already been "peeked", we will cache it 
and it`s length here
         next_page_header: Option<Box<PageHeader>>,
+
+        /// The index of the data page within this column chunk
+        page_ordinal: usize,
+
+        /// Whether the next page is expected to be a dictionary page
+        require_dictionary: bool,
     },
     Pages {
         /// Remaining page locations
@@ -512,6 +561,10 @@ pub struct SerializedPageReader<R: ChunkReader> {
     physical_type: Type,
 
     state: SerializedPageReaderState,
+
+    /// Crypto context carrying objects required for decryption
+    #[cfg(feature = "encryption")]
+    crypto_context: Option<Arc<CryptoContext>>,
 }
 
 impl<R: ChunkReader> SerializedPageReader<R> {
@@ -526,6 +579,16 @@ impl<R: ChunkReader> SerializedPageReader<R> {
        SerializedPageReader::new_with_properties(reader, meta, total_rows, page_locations, props)
     }
 
+    /// Adds cryptographic information to the reader.
+    #[cfg(feature = "encryption")]
+    pub(crate) fn with_crypto_context(
+        mut self,
+        crypto_context: Option<Arc<CryptoContext>>,
+    ) -> Self {
+        self.crypto_context = crypto_context;
+        self
+    }
+
     /// Creates a new serialized page with custom options.
     pub fn new_with_properties(
         reader: Arc<R>,
@@ -558,14 +621,17 @@ impl<R: ChunkReader> SerializedPageReader<R> {
                 offset: start as usize,
                 remaining_bytes: len as usize,
                 next_page_header: None,
+                page_ordinal: 0,
+                require_dictionary: meta.dictionary_page_offset().is_some(),
             },
         };
-
         Ok(Self {
             reader,
             decompressor,
             state,
             physical_type: meta.column_type(),
+            #[cfg(feature = "encryption")]
+            crypto_context: None,
         })
     }
 
@@ -581,6 +647,7 @@ impl<R: ChunkReader> SerializedPageReader<R> {
                 offset,
                 remaining_bytes,
                 next_page_header,
+                ..
             } => {
                 loop {
                     if *remaining_bytes == 0 {
@@ -664,6 +731,8 @@ impl<R: ChunkReader> PageReader for SerializedPageReader<R> {
                     offset,
                     remaining_bytes: remaining,
                     next_page_header,
+                    page_ordinal,
+                    require_dictionary,
                 } => {
                     if *remaining == 0 {
                         return Ok(None);
@@ -673,7 +742,21 @@ impl<R: ChunkReader> PageReader for SerializedPageReader<R> {
                     let header = if let Some(header) = next_page_header.take() {
                         *header
                     } else {
+                        #[cfg(feature = "encryption")]
+                        let (header_len, header) = if self.crypto_context.is_some() {
+                            let crypto_context = page_crypto_context(
+                                &self.crypto_context,
+                                *page_ordinal,
+                                *require_dictionary,
+                            )?;
+                            read_encrypted_page_header_len(&mut read, crypto_context)?
+                        } else {
+                            read_page_header_len(&mut read)?
+                        };
+
+                        #[cfg(not(feature = "encryption"))]
                         let (header_len, header) = read_page_header_len(&mut read)?;
+
                         verify_page_header_len(header_len, *remaining)?;
                         *offset += header_len;
                         *remaining -= header_len;
@@ -703,12 +786,33 @@ impl<R: ChunkReader> PageReader for SerializedPageReader<R> {
                         ));
                     }
 
-                    decode_page(
+                    #[cfg(feature = "encryption")]
+                    let crypto_context = page_crypto_context(
+                        &self.crypto_context,
+                        *page_ordinal,
+                        *require_dictionary,
+                    )?;
+                    #[cfg(feature = "encryption")]
+                    let buffer: Vec<u8> = if let Some(crypto_context) = crypto_context {
+                        let decryptor = crypto_context.data_decryptor();
+                        let aad = crypto_context.create_page_aad()?;
+                        decryptor.decrypt(buffer.as_ref(), &aad)?
+                    } else {
+                        buffer
+                    };
+
+                    let page = decode_page(
                         header,
                         Bytes::from(buffer),
                         self.physical_type,
                         self.decompressor.as_mut(),
-                    )?
+                    )?;
+                    if page.is_data_page() {
+                        *page_ordinal += 1;
+                    } else if page.is_dictionary_page() {
+                        *require_dictionary = false;
+                    }
+                    page
                 }
                 SerializedPageReaderState::Pages {
                     page_locations,
@@ -751,6 +855,7 @@ impl<R: ChunkReader> PageReader for SerializedPageReader<R> {
                 offset,
                 remaining_bytes,
                 next_page_header,
+                ..
             } => {
                 loop {
                     if *remaining_bytes == 0 {
@@ -816,6 +921,7 @@ impl<R: ChunkReader> PageReader for SerializedPageReader<R> {
                 offset,
                 remaining_bytes,
                 next_page_header,
+                ..
             } => {
                 if let Some(buffered_header) = next_page_header.take() {
                     verify_page_size(
@@ -857,6 +963,21 @@ impl<R: ChunkReader> PageReader for SerializedPageReader<R> {
     }
 }
 
+#[cfg(feature = "encryption")]
+fn page_crypto_context(
+    crypto_context: &Option<Arc<CryptoContext>>,
+    page_ordinal: usize,
+    dictionary_page: bool,
+) -> Result<Option<Arc<CryptoContext>>> {
+    Ok(crypto_context.as_ref().map(|c| {
+        Arc::new(if dictionary_page {
+            c.for_dictionary_page()
+        } else {
+            c.with_page_ordinal(page_ordinal)
+        })
+    }))
+}
+
 #[cfg(test)]
 mod tests {
     use std::collections::HashSet;
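Note: `page_crypto_context` is the single point where page position becomes AAD input. Per column chunk the flow implemented above is:

    // dictionary page -> ctx.for_dictionary_page()   (no page ordinal in the AAD)
    // data page n     -> ctx.with_page_ordinal(n)    (n counts data pages only)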
diff --git a/parquet/src/lib.rs b/parquet/src/lib.rs
index bd86ca3636..86c88ff954 100644
--- a/parquet/src/lib.rs
+++ b/parquet/src/lib.rs
@@ -145,6 +145,10 @@ pub mod column;
 experimental!(mod compression);
 experimental!(mod encodings);
 pub mod bloom_filter;
+
+#[cfg(feature = "encryption")]
+experimental!(pub mod encryption);
+
 pub mod file;
 pub mod record;
 pub mod schema;
diff --git a/parquet/src/util/test_common/encryption_util.rs b/parquet/src/util/test_common/encryption_util.rs
new file mode 100644
index 0000000000..2f6e5bc456
--- /dev/null
+++ b/parquet/src/util/test_common/encryption_util.rs
@@ -0,0 +1,135 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+use crate::arrow::arrow_reader::{
+    ArrowReaderMetadata, ArrowReaderOptions, ParquetRecordBatchReaderBuilder,
+};
+use crate::arrow::ParquetRecordBatchStreamBuilder;
+use crate::encryption::decrypt::FileDecryptionProperties;
+use crate::errors::ParquetError;
+use crate::file::metadata::FileMetaData;
+use arrow_array::cast::AsArray;
+use arrow_array::{types, RecordBatch};
+use futures::TryStreamExt;
+use std::fs::File;
+
+pub(crate) fn verify_encryption_test_file_read(
+    file: File,
+    decryption_properties: FileDecryptionProperties,
+) {
+    let options = ArrowReaderOptions::default()
+        .with_file_decryption_properties(decryption_properties.clone());
+    let metadata = ArrowReaderMetadata::load(&file, options.clone()).unwrap();
+    let file_metadata = metadata.metadata.file_metadata();
+
+    let builder = ParquetRecordBatchReaderBuilder::try_new_with_options(file, options).unwrap();
+    let record_reader = builder.build().unwrap();
+    let record_batches = record_reader
+        .map(|x| x.unwrap())
+        .collect::<Vec<RecordBatch>>();
+
+    verify_encryption_test_data(record_batches, file_metadata.clone(), metadata);
+}
+
+pub(crate) async fn verify_encryption_test_file_read_async(
+    file: &mut tokio::fs::File,
+    decryption_properties: FileDecryptionProperties,
+) -> Result<(), ParquetError> {
+    let options = ArrowReaderOptions::new().with_file_decryption_properties(decryption_properties);
+
+    let metadata = ArrowReaderMetadata::load_async(file, options.clone()).await?;
+    let arrow_reader_metadata = ArrowReaderMetadata::load_async(file, options).await?;
+    let file_metadata = metadata.metadata.file_metadata();
+
+    let record_reader = ParquetRecordBatchStreamBuilder::new_with_metadata(
+        file.try_clone().await?,
+        arrow_reader_metadata.clone(),
+    )
+    .build()?;
+    let record_batches = record_reader.try_collect::<Vec<_>>().await?;
+
+    verify_encryption_test_data(record_batches, file_metadata.clone(), metadata);
+    Ok(())
+}
+
+/// Verifies data read from an encrypted test file in the parquet-testing repository
+fn verify_encryption_test_data(
+    record_batches: Vec<RecordBatch>,
+    file_metadata: FileMetaData,
+    metadata: ArrowReaderMetadata,
+) {
+    assert_eq!(file_metadata.num_rows(), 50);
+    assert_eq!(file_metadata.schema_descr().num_columns(), 8);
+
+    metadata.metadata.row_groups().iter().for_each(|rg| {
+        assert_eq!(rg.num_columns(), 8);
+        assert_eq!(rg.num_rows(), 50);
+    });
+
+    let mut row_count = 0;
+    for batch in record_batches {
+        row_count += batch.num_rows();
+
+        let bool_col = batch.column(0).as_boolean();
+        let time_col = batch
+            .column(1)
+            .as_primitive::<types::Time32MillisecondType>();
+        let list_col = batch.column(2).as_list::<i32>();
+        let timestamp_col = batch
+            .column(3)
+            .as_primitive::<types::TimestampNanosecondType>();
+        let f32_col = batch.column(4).as_primitive::<types::Float32Type>();
+        let f64_col = batch.column(5).as_primitive::<types::Float64Type>();
+        let binary_col = batch.column(6).as_binary::<i32>();
+        let fixed_size_binary_col = batch.column(7).as_fixed_size_binary();
+
+        for (i, x) in bool_col.iter().enumerate() {
+            assert_eq!(x.unwrap(), i % 2 == 0);
+        }
+        for (i, x) in time_col.iter().enumerate() {
+            assert_eq!(x.unwrap(), i as i32);
+        }
+        for (i, list_item) in list_col.iter().enumerate() {
+            let list_item = list_item.unwrap();
+            let list_item = list_item.as_primitive::<types::Int64Type>();
+            assert_eq!(list_item.len(), 2);
+            assert_eq!(list_item.value(0), ((i * 2) * 1000000000000) as i64);
+            assert_eq!(list_item.value(1), ((i * 2 + 1) * 1000000000000) as i64);
+        }
+        for x in timestamp_col.iter() {
+            assert!(x.is_some());
+        }
+        for (i, x) in f32_col.iter().enumerate() {
+            assert_eq!(x.unwrap(), i as f32 * 1.1f32);
+        }
+        for (i, x) in f64_col.iter().enumerate() {
+            assert_eq!(x.unwrap(), i as f64 * 1.1111111f64);
+        }
+        for (i, x) in binary_col.iter().enumerate() {
+            assert_eq!(x.is_some(), i % 2 == 0);
+            if let Some(x) = x {
+                assert_eq!(&x[0..7], b"parquet");
+            }
+        }
+        for (i, x) in fixed_size_binary_col.iter().enumerate() {
+            assert_eq!(x.unwrap(), &[i as u8; 10]);
+        }
+    }
+
+    assert_eq!(row_count, file_metadata.num_rows() as usize);
+}
diff --git a/parquet/src/util/test_common/mod.rs b/parquet/src/util/test_common/mod.rs
index 8cfc1e6dd4..ac36118c37 100644
--- a/parquet/src/util/test_common/mod.rs
+++ b/parquet/src/util/test_common/mod.rs
@@ -22,3 +22,6 @@ pub mod file_util;
 
 #[cfg(test)]
 pub mod rand_gen;
+
+#[cfg(all(test, feature = "encryption", feature = "arrow"))]
+pub mod encryption_util;
