This is an automated email from the ASF dual-hosted git repository.

alamb pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/arrow-rs.git


The following commit(s) were added to refs/heads/main by this push:
     new accb1cf12a [parquet] Allow more encryption algorithms (#9203)
accb1cf12a is described below

commit accb1cf12a33878f179f8bb3c78c1a740fb97216
Author: hsiang-c <[email protected]>
AuthorDate: Wed May 20 10:51:53 2026 -0700

    [parquet] Allow more encryption algorithms (#9203)
    
    # Which issue does this PR close?
    
    <!--
    We generally require a GitHub issue to be filed for all bug fixes and
    enhancements and this helps us generate change logs for our releases.
    You can link an issue to this PR using the GitHub syntax.
    -->
    
    - Closes #9202.
    
    # Rationale for this change
    
    <!--
    Why are you proposing this change? If this is already explained clearly
    in the issue then this section is not needed.
    Explaining clearly why changes are proposed helps reviewers understand
    your changes and offer better suggestions for fixes.
    -->
    
    - Iceberg
    [spec](https://iceberg.apache.org/gcm-stream-spec/#encryption-algorithm)
    supports AES key sizes of 128, 192 and 256 bits. Iceberg Rust depends on
    `arrow-rs` for Parquet I/O, I'd like to start supporting AES 256 with
    this PR.
    
    # What changes are included in this PR?
    
    <!--
    There is no need to duplicate the description in the issue here but it
    is sometimes worth providing a summary of the individual changes in this
    PR.
    -->
    - `RingGcmBlockEncryptor` and `RingGcmBlockDecryptor` will pick AES-128
    or AES-256 based on key size
    - Refactor `encryption_async.rs` and `encryption.rs` to test both
    AES-128 and AES-256 encrypted parquet files
    
    
    # Are these changes tested?
    
    <!--
    We typically require tests for all PRs in order to:
    1. Prevent the code from being accidentally broken by subsequent changes
    2. Serve as another way to document the expected behavior of the code
    
    If tests are not included in your PR, please explain why (for example,
    are they covered by existing tests)?
    -->
    
    Yes, unit test and on AES-256 encrypted Parquet files defined in
    https://github.com/apache/parquet-testing/tree/master/data/aes256
    
    # Are there any user-facing changes?
    
    <!--
    If there are user-facing changes then we may require documentation to be
    updated before approving the PR.
    
    If there are any breaking changes to public APIs, please call them out.
    -->
    
    No
---
 parquet-testing                              |   2 +-
 parquet/src/encryption/ciphers.rs            |  35 +-
 parquet/tests/encryption/encryption.rs       | 892 +++++++++++++++++----------
 parquet/tests/encryption/encryption_async.rs | 542 ++++++++++------
 parquet/tests/encryption/encryption_util.rs  |  80 +++
 5 files changed, 1015 insertions(+), 536 deletions(-)

diff --git a/parquet-testing b/parquet-testing
index a3d96a65e1..5a6cf84678 160000
--- a/parquet-testing
+++ b/parquet-testing
@@ -1 +1 @@
-Subproject commit a3d96a65e11e2bbca7d22a894e8313ede90a33a3
+Subproject commit 5a6cf84678df3af65c231109b78b86ba9bf495df
diff --git a/parquet/src/encryption/ciphers.rs 
b/parquet/src/encryption/ciphers.rs
index a94c72dcd5..beb3acda26 100644
--- a/parquet/src/encryption/ciphers.rs
+++ b/parquet/src/encryption/ciphers.rs
@@ -19,7 +19,7 @@ use crate::errors::ParquetError;
 use crate::errors::ParquetError::General;
 use crate::errors::Result;
 use crate::file::metadata::HeapSize;
-use ring::aead::{AES_128_GCM, Aad, LessSafeKey, NonceSequence, UnboundKey};
+use ring::aead::{AES_128_GCM, AES_256_GCM, Aad, LessSafeKey, NonceSequence, 
UnboundKey};
 use ring::rand::{SecureRandom, SystemRandom};
 use std::fmt::Debug;
 
@@ -40,10 +40,20 @@ pub(crate) struct RingGcmBlockDecryptor {
 }
 
 impl RingGcmBlockDecryptor {
+    /// Create a new `RingGcmBlockDecryptor` with a given key.
     pub(crate) fn new(key_bytes: &[u8]) -> Result<Self> {
-        // todo support other key sizes
-        let key = UnboundKey::new(&AES_128_GCM, key_bytes)
-            .map_err(|_| General("Failed to create AES key".to_string()))?;
+        let algorithm = if key_bytes.len() == AES_128_GCM.key_len() {
+            &AES_128_GCM
+        } else if key_bytes.len() == AES_256_GCM.key_len() {
+            &AES_256_GCM
+        } else {
+            return Err(general_err!(
+                "Error creating RingGcmBlockDecryptor with unsupported key 
length: {}",
+                key_bytes.len()
+            ));
+        };
+        let key = UnboundKey::new(algorithm, key_bytes)
+            .map_err(|_| general_err!("Failed to create {:?} key", 
algorithm))?;
 
         Ok(Self {
             key: LessSafeKey::new(key),
@@ -144,10 +154,19 @@ impl RingGcmBlockEncryptor {
     /// return an error if it wraps around.
     pub(crate) fn new(key_bytes: &[u8]) -> Result<Self> {
         let rng = SystemRandom::new();
-
-        // todo support other key sizes
-        let key = UnboundKey::new(&AES_128_GCM, key_bytes)
-            .map_err(|e| general_err!("Error creating AES key: {}", e))?;
+        let algorithm = if key_bytes.len() == AES_128_GCM.key_len() {
+            &AES_128_GCM
+        } else if key_bytes.len() == AES_256_GCM.key_len() {
+            &AES_256_GCM
+        } else {
+            return Err(general_err!(
+                "Error creating RingGcmBlockEncryptor with unsupported key 
length: {}",
+                key_bytes.len()
+            ));
+        };
+
+        let key = UnboundKey::new(algorithm, key_bytes)
+            .map_err(|e| general_err!("Error creating {:?} key: {}", 
algorithm, e))?;
         let nonce = CounterNonce::new(&rng)?;
 
         Ok(Self {
diff --git a/parquet/tests/encryption/encryption.rs 
b/parquet/tests/encryption/encryption.rs
index c26a0edee6..edd26f2961 100644
--- a/parquet/tests/encryption/encryption.rs
+++ b/parquet/tests/encryption/encryption.rs
@@ -17,7 +17,12 @@
 
 //! This module contains tests for reading encrypted Parquet files with the 
Arrow API
 
+use crate::encryption_util;
 use crate::encryption_util::{
+    AES_128_COLUMN_KEYS, AES_128_COLUMN_NAME_KEYS, AES_128_COLUMN_NAMES, 
AES_128_FOOTER_KEY,
+    AES_128_FOOTER_KEY_NAME, AES_128_KEY_NAME_KEY, AES_128_KEY_NAMES, 
AES_256_COLUMN_KEYS,
+    AES_256_COLUMN_NAME_KEYS, AES_256_COLUMN_NAMES, AES_256_FOOTER_KEY, 
AES_256_FOOTER_KEY_NAME,
+    AES_256_KEY_NAME_KEY, AES_256_KEY_NAMES, BAD_AES_128_FOOTER_KEY, 
BAD_AES_256_FOOTER_KEY,
     TestKeyRetriever, read_and_roundtrip_to_encrypted_file, 
verify_column_indexes,
     verify_encryption_test_file_read,
 };
@@ -43,120 +48,131 @@ use std::sync::Arc;
 
 #[test]
 fn test_non_uniform_encryption_plaintext_footer() {
-    let test_data = arrow::util::test_util::parquet_test_data();
-    let path = 
format!("{test_data}/encrypt_columns_plaintext_footer.parquet.encrypted");
-    let file = File::open(path).unwrap();
+    fn non_uniform_encryption_plaintext_footer(footer_key: &[u8], column_keys: 
&[(&str, &[u8])]) {
+        let path = encryption_util::encrypted_data_path(
+            footer_key,
+            "encrypt_columns_plaintext_footer.parquet.encrypted",
+        );
+        let file = File::open(path).unwrap();
+        let mut builder = 
FileDecryptionProperties::builder(footer_key.to_vec());
+        for (column_name, key) in column_keys {
+            builder = builder.with_column_key(column_name, key.to_vec());
+        }
+        let decryption_properties = builder.build().unwrap();
+        verify_encryption_test_file_read(file, decryption_properties);
+    }
 
-    // There is always a footer key even with a plaintext footer,
+    // AES-128: there is always a footer key even with a plaintext footer,
     // but this is used for signing the footer.
-    let footer_key = "0123456789012345".as_bytes(); // 128bit/16
-    let column_1_key = "1234567890123450".as_bytes();
-    let column_2_key = "1234567890123451".as_bytes();
-
-    let decryption_properties = 
FileDecryptionProperties::builder(footer_key.to_vec())
-        .with_column_key("double_field", column_1_key.to_vec())
-        .with_column_key("float_field", column_2_key.to_vec())
-        .build()
-        .unwrap();
+    non_uniform_encryption_plaintext_footer(AES_128_FOOTER_KEY, 
AES_128_COLUMN_NAME_KEYS);
 
-    verify_encryption_test_file_read(file, decryption_properties);
+    // AES-256
+    non_uniform_encryption_plaintext_footer(AES_256_FOOTER_KEY, 
AES_256_COLUMN_NAME_KEYS);
 }
 
 #[test]
 fn test_plaintext_footer_signature_verification() {
-    let test_data = arrow::util::test_util::parquet_test_data();
-    let path = 
format!("{test_data}/encrypt_columns_plaintext_footer.parquet.encrypted");
-    let file = File::open(path.clone()).unwrap();
-
-    let footer_key = "0000000000000000".as_bytes(); // 128bit/16
-    let column_1_key = "1234567890123450".as_bytes();
-    let column_2_key = "1234567890123451".as_bytes();
-
-    let decryption_properties = 
FileDecryptionProperties::builder(footer_key.to_vec())
-        .disable_footer_signature_verification()
-        .with_column_key("double_field", column_1_key.to_vec())
-        .with_column_key("float_field", column_2_key.to_vec())
-        .build()
-        .unwrap();
-
-    verify_encryption_test_file_read(file, decryption_properties);
+    fn plaintext_footer_signature_verification(footer_key: &[u8], column_keys: 
&[(&str, &[u8])]) {
+        let path = encryption_util::encrypted_data_path(
+            footer_key,
+            "encrypt_columns_plaintext_footer.parquet.encrypted",
+        );
+        let file = File::open(path.clone()).unwrap();
+        let mut disable_footer_signature_verification_builder =
+            FileDecryptionProperties::builder(footer_key.to_vec())
+                .disable_footer_signature_verification();
+        for (column_name, key) in column_keys {
+            disable_footer_signature_verification_builder =
+                disable_footer_signature_verification_builder
+                    .with_column_key(column_name, key.to_vec());
+        }
+        let decryption_properties = 
disable_footer_signature_verification_builder
+            .build()
+            .unwrap();
+        verify_encryption_test_file_read(file, decryption_properties);
 
-    let file = File::open(path.clone()).unwrap();
+        let mut builder = 
FileDecryptionProperties::builder(footer_key.to_vec());
+        for (column_name, key) in column_keys {
+            builder = builder.with_column_key(column_name, key.to_vec());
+        }
 
-    let decryption_properties = 
FileDecryptionProperties::builder(footer_key.to_vec())
-        .with_column_key("double_field", column_1_key.to_vec())
-        .with_column_key("float_field", column_2_key.to_vec())
-        .build()
-        .unwrap();
+        let file = File::open(path.clone()).unwrap();
+        let decryption_properties = builder.build().unwrap();
+        let options =
+            
ArrowReaderOptions::default().with_file_decryption_properties(decryption_properties);
+        let result = ArrowReaderMetadata::load(&file, options.clone());
+        assert!(result.is_err());
+        assert!(
+            result
+                .unwrap_err()
+                .to_string()
+                .starts_with("Parquet error: Footer signature verification 
failed. Computed: [")
+        );
+    }
 
-    let options =
-        
ArrowReaderOptions::default().with_file_decryption_properties(decryption_properties);
-    let result = ArrowReaderMetadata::load(&file, options.clone());
-    assert!(result.is_err());
-    assert!(
-        result
-            .unwrap_err()
-            .to_string()
-            .starts_with("Parquet error: Footer signature verification failed. 
Computed: [")
-    );
+    plaintext_footer_signature_verification(BAD_AES_128_FOOTER_KEY, 
AES_128_COLUMN_NAME_KEYS);
+    plaintext_footer_signature_verification(BAD_AES_256_FOOTER_KEY, 
AES_256_COLUMN_NAME_KEYS)
 }
 
 #[test]
 fn test_non_uniform_encryption_disabled_aad_storage() {
-    let test_data = arrow::util::test_util::parquet_test_data();
-    let path =
-        
format!("{test_data}/encrypt_columns_and_footer_disable_aad_storage.parquet.encrypted");
-    let file = File::open(path.clone()).unwrap();
-
-    let footer_key = b"0123456789012345".to_vec(); // 128bit/16
-    let column_1_key = b"1234567890123450".to_vec();
-    let column_2_key = b"1234567890123451".to_vec();
-
-    // Can read successfully when providing the correct AAD prefix
-    let decryption_properties = 
FileDecryptionProperties::builder(footer_key.clone())
-        .with_column_key("double_field", column_1_key.clone())
-        .with_column_key("float_field", column_2_key.clone())
-        .with_aad_prefix(b"tester".to_vec())
-        .build()
-        .unwrap();
-
-    verify_encryption_test_file_read(file, decryption_properties);
-
-    // Using wrong AAD prefix should fail
-    let decryption_properties = 
FileDecryptionProperties::builder(footer_key.clone())
-        .with_column_key("double_field", column_1_key.clone())
-        .with_column_key("float_field", column_2_key.clone())
-        .with_aad_prefix(b"wrong_aad_prefix".to_vec())
-        .build()
-        .unwrap();
+    fn non_uniform_encryption_disabled_aad_storage(
+        footer_key: &[u8],
+        column_keys: &[(&str, &[u8])],
+    ) {
+        let path = encryption_util::encrypted_data_path(
+            footer_key,
+            "encrypt_columns_and_footer_disable_aad_storage.parquet.encrypted",
+        );
 
-    let file = File::open(path.clone()).unwrap();
-    let options = ArrowReaderOptions::default()
-        .with_file_decryption_properties(decryption_properties.clone());
-    let result = ArrowReaderMetadata::load(&file, options.clone());
-    assert!(result.is_err());
-    assert_eq!(
-        result.unwrap_err().to_string(),
-        "Parquet error: Provided footer key and AAD were unable to decrypt 
parquet footer"
-    );
+        // Can read successfully when providing the correct AAD prefix
+        let file = File::open(path.clone()).unwrap();
+        let mut builder = 
FileDecryptionProperties::builder(footer_key.to_vec())
+            .with_aad_prefix(b"tester".to_vec());
+        for (column_name, key) in column_keys {
+            builder = builder.with_column_key(column_name, key.to_vec());
+        }
+        let decryption_properties = builder.build().unwrap();
+        verify_encryption_test_file_read(file, decryption_properties);
+
+        // Using wrong AAD prefix should fail
+        let file = File::open(path.clone()).unwrap();
+        let mut wrong_aad_builder = 
FileDecryptionProperties::builder(footer_key.to_vec())
+            .with_aad_prefix(b"wrong_aad_prefix".to_vec());
+        for (column_name, key) in column_keys {
+            wrong_aad_builder = wrong_aad_builder.with_column_key(column_name, 
key.to_vec());
+        }
+        let decryption_properties = wrong_aad_builder.build().unwrap();
+        let options =
+            
ArrowReaderOptions::default().with_file_decryption_properties(decryption_properties);
+        let result = ArrowReaderMetadata::load(&file, options.clone());
+        assert!(result.is_err());
+        std::assert_eq!(
+            result.unwrap_err().to_string(),
+            "Parquet error: Provided footer key and AAD were unable to decrypt 
parquet footer"
+        );
 
-    // Not providing any AAD prefix should fail as it isn't stored in the file
-    let decryption_properties = FileDecryptionProperties::builder(footer_key)
-        .with_column_key("double_field", column_1_key)
-        .with_column_key("float_field", column_2_key)
-        .build()
-        .unwrap();
+        // Not providing any AAD prefix should fail as it isn't stored in the 
file
+        let mut no_aad_builder = 
FileDecryptionProperties::builder(footer_key.to_vec());
+        for (column_name, key) in column_keys {
+            no_aad_builder = no_aad_builder.with_column_key(column_name, 
key.to_vec());
+        }
+        let decryption_properties = no_aad_builder.build().unwrap();
 
-    let file = File::open(path).unwrap();
-    let options =
-        
ArrowReaderOptions::default().with_file_decryption_properties(decryption_properties);
-    let result = ArrowReaderMetadata::load(&file, options.clone());
-    assert!(result.is_err());
-    assert_eq!(
-        result.unwrap_err().to_string(),
-        "Parquet error: Parquet file was encrypted with an AAD prefix that is 
not stored in the file, \
+        let file = File::open(path).unwrap();
+        let options =
+            
ArrowReaderOptions::default().with_file_decryption_properties(decryption_properties);
+        let result = ArrowReaderMetadata::load(&file, options.clone());
+        assert!(result.is_err());
+        assert_eq!(
+            result.unwrap_err().to_string(),
+            "Parquet error: Parquet file was encrypted with an AAD prefix that 
is not stored in the file, \
         but no AAD prefix was provided in the file decryption properties"
-    );
+        );
+    }
+
+    non_uniform_encryption_disabled_aad_storage(AES_128_FOOTER_KEY, 
AES_128_COLUMN_NAME_KEYS);
+    non_uniform_encryption_disabled_aad_storage(AES_256_FOOTER_KEY, 
AES_256_COLUMN_NAME_KEYS);
 }
 
 #[test]
@@ -167,33 +183,44 @@ fn test_plaintext_footer_read_without_decryption() {
 
 #[test]
 fn test_non_uniform_encryption() {
-    let test_data = arrow::util::test_util::parquet_test_data();
-    let path = 
format!("{test_data}/encrypt_columns_and_footer.parquet.encrypted");
-    let file = File::open(path).unwrap();
+    fn non_uniform_encryption(footer_key: &[u8], column_keys: &[(&str, 
&[u8])]) {
+        let path = encryption_util::encrypted_data_path(
+            footer_key,
+            "encrypt_columns_and_footer.parquet.encrypted",
+        );
+        let file = File::open(path).unwrap();
 
-    let footer_key = b"0123456789012345".to_vec(); // 128bit/16
-    let column_1_key = b"1234567890123450".to_vec();
-    let column_2_key = b"1234567890123451".to_vec();
+        let mut builder = 
FileDecryptionProperties::builder(footer_key.to_vec());
+        for (column_name, key) in column_keys {
+            builder = builder.with_column_key(column_name, key.to_vec());
+        }
+        let decryption_properties = builder.build().unwrap();
 
-    let decryption_properties = FileDecryptionProperties::builder(footer_key)
-        .with_column_key("double_field", column_1_key)
-        .with_column_key("float_field", column_2_key)
-        .build()
-        .unwrap();
+        verify_encryption_test_file_read(file, decryption_properties);
+    }
 
-    verify_encryption_test_file_read(file, decryption_properties);
+    non_uniform_encryption(AES_128_FOOTER_KEY, AES_128_COLUMN_NAME_KEYS);
+    non_uniform_encryption(AES_256_FOOTER_KEY, AES_256_COLUMN_NAME_KEYS);
 }
 
 #[test]
 fn test_uniform_encryption() {
-    let test_data = arrow::util::test_util::parquet_test_data();
-    let path = format!("{test_data}/uniform_encryption.parquet.encrypted");
-    let file = File::open(path).unwrap();
+    fn uniform_encryption(footer_key: &[u8]) {
+        let path = encryption_util::encrypted_data_path(
+            footer_key,
+            "uniform_encryption.parquet.encrypted",
+        );
+        let file = File::open(path).unwrap();
 
-    let key_code = b"0123456789012345".to_vec();
-    let decryption_properties = 
FileDecryptionProperties::builder(key_code).build().unwrap();
+        let decryption_properties = 
FileDecryptionProperties::builder(footer_key.to_vec())
+            .build()
+            .unwrap();
+
+        verify_encryption_test_file_read(file, decryption_properties);
+    }
 
-    verify_encryption_test_file_read(file, decryption_properties);
+    uniform_encryption(AES_128_FOOTER_KEY);
+    uniform_encryption(AES_256_FOOTER_KEY);
 }
 
 #[test]
@@ -213,173 +240,298 @@ fn 
test_decrypting_without_decryption_properties_fails() {
 
 #[test]
 fn test_aes_ctr_encryption() {
-    let test_data = arrow::util::test_util::parquet_test_data();
-    let path = 
format!("{test_data}/encrypt_columns_and_footer_ctr.parquet.encrypted");
-    let file = File::open(path).unwrap();
+    fn aes_ctr_encryption(footer_key: &[u8], column_keys: &[(&str, &[u8])]) {
+        let path = encryption_util::encrypted_data_path(
+            footer_key,
+            "encrypt_columns_and_footer_ctr.parquet.encrypted",
+        );
+        let file = File::open(path).unwrap();
 
-    let footer_key = b"0123456789012345".to_vec();
-    let column_1_key = b"1234567890123450".to_vec();
-    let column_2_key = b"1234567890123451".to_vec();
+        let mut builder = 
FileDecryptionProperties::builder(footer_key.to_vec());
+        for (column_name, key) in column_keys {
+            builder = builder.with_column_key(column_name, key.to_vec());
+        }
+        let decryption_properties = builder.build().unwrap();
 
-    let decryption_properties = FileDecryptionProperties::builder(footer_key)
-        .with_column_key("double_field", column_1_key)
-        .with_column_key("float_field", column_2_key)
-        .build()
-        .unwrap();
+        let options =
+            
ArrowReaderOptions::new().with_file_decryption_properties(decryption_properties);
+        let metadata = ArrowReaderMetadata::load(&file, options);
 
-    let options =
-        
ArrowReaderOptions::default().with_file_decryption_properties(decryption_properties);
-    let metadata = ArrowReaderMetadata::load(&file, options);
+        match metadata {
+            Err(ParquetError::NYI(s)) => {
+                assert!(s.contains("AES_GCM_CTR_V1"));
+            }
+            _ => {
+                panic!("Expected ParquetError::NYI");
+            }
+        };
+    }
 
-    match metadata {
-        Err(parquet::errors::ParquetError::NYI(s)) => {
-            assert!(s.contains("AES_GCM_CTR_V1"));
-        }
-        _ => {
-            panic!("Expected ParquetError::NYI");
-        }
-    };
+    aes_ctr_encryption(AES_128_FOOTER_KEY, AES_128_COLUMN_NAME_KEYS);
+    aes_ctr_encryption(AES_256_FOOTER_KEY, AES_256_COLUMN_NAME_KEYS);
 }
 
 #[test]
 fn test_non_uniform_encryption_plaintext_footer_with_key_retriever() {
-    let test_data = arrow::util::test_util::parquet_test_data();
-    let path = 
format!("{test_data}/encrypt_columns_plaintext_footer.parquet.encrypted");
-    let file = File::open(path).unwrap();
+    fn non_uniform_encryption_plaintext_footer_with_key_retriever(
+        footer_key: &[u8],
+        keys: &[(&str, &[u8])],
+    ) {
+        let path = encryption_util::encrypted_data_path(
+            footer_key,
+            "encrypt_columns_plaintext_footer.parquet.encrypted",
+        );
+        let file = File::open(path).unwrap();
 
-    let key_retriever = TestKeyRetriever::new()
-        .with_key("kf".to_owned(), "0123456789012345".as_bytes().to_vec())
-        .with_key("kc1".to_owned(), "1234567890123450".as_bytes().to_vec())
-        .with_key("kc2".to_owned(), "1234567890123451".as_bytes().to_vec());
+        let mut key_retriever = TestKeyRetriever::new();
+        for (key_name, key) in keys {
+            key_retriever = key_retriever.with_key((*key_name).to_owned(), 
(*key).to_vec());
+        }
 
-    let decryption_properties =
-        FileDecryptionProperties::with_key_retriever(Arc::new(key_retriever))
-            .build()
-            .unwrap();
+        let decryption_properties =
+            
FileDecryptionProperties::with_key_retriever(Arc::new(key_retriever))
+                .build()
+                .unwrap();
 
-    verify_encryption_test_file_read(file, decryption_properties);
+        verify_encryption_test_file_read(file, decryption_properties);
+    }
+
+    non_uniform_encryption_plaintext_footer_with_key_retriever(
+        AES_128_FOOTER_KEY,
+        AES_128_KEY_NAME_KEY,
+    );
+
+    non_uniform_encryption_plaintext_footer_with_key_retriever(
+        AES_256_FOOTER_KEY,
+        AES_256_KEY_NAME_KEY,
+    );
 }
 
 #[test]
-fn test_uniform_encryption_plaintext_footer_with_key_retriever() {
-    let test_data = arrow::util::test_util::parquet_test_data();
+fn test_roundtrip_non_uniform_encryption_plaintext_footer_with_key_retriever() 
{
+    fn roundtrip_non_uniform_encryption_plaintext_footer_with_key_retriever(
+        footer_key: &[u8],
+        footer_key_metadata: &str,
+        wrong_footer_key: &[u8],
+        dec_column_keys: &[(&str, &[u8])],
+        enc_column_keys: &[(&str, &[u8], &str)],
+    ) {
+        let path = encryption_util::encrypted_data_path(
+            footer_key,
+            "encrypt_columns_plaintext_footer.parquet.encrypted",
+        );
+        let file = File::open(path).unwrap();
 
-    // Read example data with key retriever
-    let path = 
format!("{test_data}/encrypt_columns_plaintext_footer.parquet.encrypted");
-    let file = File::open(path).unwrap();
+        let mut key_retriever =
+            TestKeyRetriever::new().with_key(footer_key_metadata.to_owned(), 
footer_key.to_vec());
 
-    let key_retriever = Arc::new(
-        TestKeyRetriever::new()
-            .with_key("kf".to_owned(), b"0123456789012345".to_vec())
-            .with_key("kc1".to_owned(), b"1234567890123450".to_vec())
-            .with_key("kc2".to_owned(), b"1234567890123451".to_vec()),
-    );
+        for (key_name, key) in dec_column_keys {
+            key_retriever = key_retriever.with_key((*key_name).to_owned(), 
(*key).to_vec());
+        }
 
-    let decryption_properties = 
FileDecryptionProperties::with_key_retriever(key_retriever.clone())
-        .build()
-        .unwrap();
+        let decryption_properties =
+            
FileDecryptionProperties::with_key_retriever(Arc::new(key_retriever))
+                .build()
+                .unwrap();
 
-    let options =
-        
ArrowReaderOptions::default().with_file_decryption_properties(decryption_properties);
-    let metadata = ArrowReaderMetadata::load(&file, options.clone()).unwrap();
+        let options = ArrowReaderOptions::default()
+            .with_file_decryption_properties(decryption_properties.clone());
+        let metadata = ArrowReaderMetadata::load(&file, 
options.clone()).unwrap();
+
+        // Write data into temporary file with plaintext footer and footer key 
metadata
+        let temp_file = tempfile::tempfile().unwrap();
+        let mut encryption_properties_builder =
+            FileEncryptionProperties::builder(footer_key.to_vec())
+                .with_footer_key_metadata(footer_key_metadata.into());
+        for (column_name, key, metadata) in enc_column_keys {
+            encryption_properties_builder = encryption_properties_builder
+                .with_column_key_and_metadata(column_name, key.to_vec(), 
(*metadata).into());
+        }
+        let encryption_properties = encryption_properties_builder
+            .with_plaintext_footer(true)
+            .build()
+            .unwrap();
 
-    // Write data into temporary file with plaintext footer and footer key 
metadata
-    let temp_file = tempfile::tempfile().unwrap();
-    let encryption_properties = 
FileEncryptionProperties::builder(b"0123456789012345".to_vec())
-        .with_footer_key_metadata("kf".into())
-        .with_column_key_and_metadata("double_field", 
b"1234567890123450".to_vec(), b"kc1".into())
-        .with_column_key_and_metadata("float_field", 
b"1234567890123451".to_vec(), b"kc2".into())
-        .with_plaintext_footer(true)
-        .build()
-        .unwrap();
+        let builder = 
ParquetRecordBatchReaderBuilder::try_new_with_options(file, options).unwrap();
+        let batch_reader = builder.build().unwrap();
+        let batches = batch_reader
+            .collect::<parquet::errors::Result<Vec<RecordBatch>, _>>()
+            .unwrap();
 
-    let builder = ParquetRecordBatchReaderBuilder::try_new_with_options(file, 
options).unwrap();
-    let batch_reader = builder.build().unwrap();
-    let batches = batch_reader
-        .collect::<parquet::errors::Result<Vec<RecordBatch>, _>>()
+        let props = WriterProperties::builder()
+            .with_file_encryption_properties(encryption_properties)
+            .build();
+
+        let mut writer = ArrowWriter::try_new(
+            temp_file.try_clone().unwrap(),
+            metadata.schema().clone(),
+            Some(props),
+        )
         .unwrap();
+        for batch in batches {
+            writer.write(&batch).unwrap();
+        }
 
-    let props = WriterProperties::builder()
-        .with_file_encryption_properties(encryption_properties)
-        .build();
+        writer.close().unwrap();
 
-    let mut writer = ArrowWriter::try_new(
-        temp_file.try_clone().unwrap(),
-        metadata.schema().clone(),
-        Some(props),
-    )
-    .unwrap();
-    for batch in batches {
-        writer.write(&batch).unwrap();
-    }
+        // Read temporary file with plaintext metadata using key retriever
+        let options = ArrowReaderOptions::default()
+            .with_file_decryption_properties(decryption_properties.clone());
+        let _ = ArrowReaderMetadata::load(&temp_file, 
options.clone()).unwrap();
 
-    writer.close().unwrap();
+        // Read temporary file with plaintext metadata using key retriever 
with invalid key
+        let mut key_retriever = TestKeyRetriever::new()
+            .with_key(footer_key_metadata.to_owned(), 
wrong_footer_key.to_vec());
 
-    // Read temporary file with plaintext metadata using key retriever
-    let decryption_properties = 
FileDecryptionProperties::with_key_retriever(key_retriever)
-        .build()
-        .unwrap();
+        for (key_name, key) in dec_column_keys {
+            key_retriever = key_retriever.with_key((*key_name).to_owned(), 
(*key).to_vec());
+        }
+        let decryption_properties =
+            
FileDecryptionProperties::with_key_retriever(Arc::new(key_retriever))
+                .build()
+                .unwrap();
+        let options =
+            
ArrowReaderOptions::default().with_file_decryption_properties(decryption_properties);
+        let result = ArrowReaderMetadata::load(&temp_file, options.clone());
+        assert!(result.is_err());
+        assert!(
+            result
+                .unwrap_err()
+                .to_string()
+                .starts_with("Parquet error: Footer signature verification 
failed. Computed: [")
+        );
+    }
 
-    let options =
-        
ArrowReaderOptions::default().with_file_decryption_properties(decryption_properties);
-    let _ = ArrowReaderMetadata::load(&temp_file, options.clone()).unwrap();
-
-    // Read temporary file with plaintext metadata using key retriever with 
invalid key
-    let key_retriever = Arc::new(
-        TestKeyRetriever::new()
-            .with_key("kf".to_owned(), b"0133756789012345".to_vec())
-            .with_key("kc1".to_owned(), b"1234567890123450".to_vec())
-            .with_key("kc2".to_owned(), b"1234567890123451".to_vec()),
+    roundtrip_non_uniform_encryption_plaintext_footer_with_key_retriever(
+        AES_128_FOOTER_KEY,
+        AES_128_FOOTER_KEY_NAME,
+        BAD_AES_128_FOOTER_KEY,
+        &[
+            (AES_128_KEY_NAMES[0], AES_128_COLUMN_KEYS[0]),
+            (AES_128_KEY_NAMES[1], AES_128_COLUMN_KEYS[1]),
+        ],
+        &[
+            (
+                AES_128_COLUMN_NAMES[0],
+                AES_128_COLUMN_KEYS[0],
+                AES_128_KEY_NAMES[0],
+            ),
+            (
+                AES_128_COLUMN_NAMES[1],
+                AES_128_COLUMN_KEYS[1],
+                AES_128_KEY_NAMES[1],
+            ),
+        ],
     );
-    let decryption_properties = 
FileDecryptionProperties::with_key_retriever(key_retriever)
-        .build()
-        .unwrap();
-    let options =
-        
ArrowReaderOptions::default().with_file_decryption_properties(decryption_properties);
-    let result = ArrowReaderMetadata::load(&temp_file, options.clone());
-    assert!(result.is_err());
-    assert!(
-        result
-            .unwrap_err()
-            .to_string()
-            .starts_with("Parquet error: Footer signature verification failed. 
Computed: [")
+
+    roundtrip_non_uniform_encryption_plaintext_footer_with_key_retriever(
+        AES_256_FOOTER_KEY,
+        AES_256_FOOTER_KEY_NAME,
+        BAD_AES_256_FOOTER_KEY,
+        &[
+            (AES_256_KEY_NAMES[0], AES_256_COLUMN_KEYS[0]),
+            (AES_256_KEY_NAMES[1], AES_256_COLUMN_KEYS[1]),
+            (AES_256_KEY_NAMES[2], AES_256_COLUMN_KEYS[2]),
+            (AES_256_KEY_NAMES[3], AES_256_COLUMN_KEYS[3]),
+            (AES_256_KEY_NAMES[4], AES_256_COLUMN_KEYS[4]),
+            (AES_256_KEY_NAMES[5], AES_256_COLUMN_KEYS[5]),
+            (AES_256_KEY_NAMES[6], AES_256_COLUMN_KEYS[6]),
+            (AES_256_KEY_NAMES[7], AES_256_COLUMN_KEYS[7]),
+        ],
+        &[
+            (
+                AES_256_COLUMN_NAMES[0],
+                AES_256_COLUMN_KEYS[0],
+                AES_256_KEY_NAMES[0],
+            ),
+            (
+                AES_256_COLUMN_NAMES[1],
+                AES_256_COLUMN_KEYS[1],
+                AES_256_KEY_NAMES[1],
+            ),
+            (
+                AES_256_COLUMN_NAMES[2],
+                AES_256_COLUMN_KEYS[2],
+                AES_256_KEY_NAMES[2],
+            ),
+            (
+                AES_256_COLUMN_NAMES[3],
+                AES_256_COLUMN_KEYS[3],
+                AES_256_KEY_NAMES[3],
+            ),
+            (
+                AES_256_COLUMN_NAMES[4],
+                AES_256_COLUMN_KEYS[4],
+                AES_256_KEY_NAMES[4],
+            ),
+            (
+                AES_256_COLUMN_NAMES[5],
+                AES_256_COLUMN_KEYS[5],
+                AES_256_KEY_NAMES[5],
+            ),
+            (
+                AES_256_COLUMN_NAMES[6],
+                AES_256_COLUMN_KEYS[6],
+                AES_256_KEY_NAMES[6],
+            ),
+            (
+                AES_256_COLUMN_NAMES[7],
+                AES_256_COLUMN_KEYS[7],
+                AES_256_KEY_NAMES[7],
+            ),
+        ],
     );
 }
 
 #[test]
 fn test_non_uniform_encryption_with_key_retriever() {
-    let test_data = arrow::util::test_util::parquet_test_data();
-    let path = 
format!("{test_data}/encrypt_columns_and_footer.parquet.encrypted");
-    let file = File::open(path).unwrap();
+    fn non_uniform_encryption_with_key_retriever(footer_key: &[u8], keys: 
&[(&str, &[u8])]) {
+        let path = encryption_util::encrypted_data_path(
+            footer_key,
+            "encrypt_columns_and_footer.parquet.encrypted",
+        );
+        let file = File::open(path).unwrap();
 
-    let key_retriever = TestKeyRetriever::new()
-        .with_key("kf".to_owned(), "0123456789012345".as_bytes().to_vec())
-        .with_key("kc1".to_owned(), "1234567890123450".as_bytes().to_vec())
-        .with_key("kc2".to_owned(), "1234567890123451".as_bytes().to_vec());
+        let mut key_retriever = TestKeyRetriever::new();
+        for (key_name, key) in keys {
+            key_retriever = key_retriever.with_key((*key_name).to_owned(), 
(*key).to_vec());
+        }
 
-    let decryption_properties =
-        FileDecryptionProperties::with_key_retriever(Arc::new(key_retriever))
-            .build()
-            .unwrap();
+        let decryption_properties =
+            
FileDecryptionProperties::with_key_retriever(Arc::new(key_retriever))
+                .build()
+                .unwrap();
+
+        verify_encryption_test_file_read(file, decryption_properties);
+    }
+
+    non_uniform_encryption_with_key_retriever(AES_128_FOOTER_KEY, 
AES_128_KEY_NAME_KEY);
 
-    verify_encryption_test_file_read(file, decryption_properties);
+    non_uniform_encryption_with_key_retriever(AES_256_FOOTER_KEY, 
AES_256_KEY_NAME_KEY);
 }
 
 #[test]
 fn test_uniform_encryption_with_key_retriever() {
-    let test_data = arrow::util::test_util::parquet_test_data();
-    let path = format!("{test_data}/uniform_encryption.parquet.encrypted");
-    let file = File::open(path).unwrap();
+    fn uniform_encryption_with_key_retriever(key_name: &str, footer_key: 
&[u8]) {
+        let path = encryption_util::encrypted_data_path(
+            footer_key,
+            "uniform_encryption.parquet.encrypted",
+        );
+        let file = File::open(path).unwrap();
 
-    let key_retriever =
-        TestKeyRetriever::new().with_key("kf".to_owned(), 
"0123456789012345".as_bytes().to_vec());
+        let key_retriever =
+            TestKeyRetriever::new().with_key(key_name.to_owned(), 
footer_key.to_vec());
 
-    let decryption_properties =
-        FileDecryptionProperties::with_key_retriever(Arc::new(key_retriever))
-            .build()
-            .unwrap();
+        let decryption_properties =
+            
FileDecryptionProperties::with_key_retriever(Arc::new(key_retriever))
+                .build()
+                .unwrap();
+
+        verify_encryption_test_file_read(file, decryption_properties);
+    }
 
-    verify_encryption_test_file_read(file, decryption_properties);
+    uniform_encryption_with_key_retriever(AES_128_FOOTER_KEY_NAME, 
AES_128_FOOTER_KEY);
+    uniform_encryption_with_key_retriever(AES_256_FOOTER_KEY_NAME, 
AES_256_FOOTER_KEY);
 }
 
 fn row_group_sizes(metadata: &ParquetMetaData) -> Vec<i64> {
@@ -426,7 +578,7 @@ fn uniform_encryption_roundtrip(
 
     let file = tempfile::tempfile()?;
 
-    let footer_key = b"0123456789012345";
+    let footer_key = AES_128_FOOTER_KEY;
     let file_encryption_properties =
         FileEncryptionProperties::builder(footer_key.to_vec()).build()?;
 
@@ -531,7 +683,7 @@ fn uniform_encryption_page_skipping(page_index: bool) -> 
parquet::errors::Result
 
     let file = tempfile::tempfile()?;
 
-    let footer_key = b"0123456789012345";
+    let footer_key = AES_128_FOOTER_KEY;
     let file_encryption_properties =
         FileEncryptionProperties::builder(footer_key.to_vec()).build()?;
 
@@ -618,27 +770,50 @@ fn uniform_encryption_page_skipping(page_index: bool) -> 
parquet::errors::Result
 
 #[test]
 fn test_write_non_uniform_encryption() {
-    let testdata = arrow::util::test_util::parquet_test_data();
-    let path = 
format!("{testdata}/encrypt_columns_and_footer.parquet.encrypted");
-    let file = File::open(path).unwrap();
+    fn write_non_uniform_encryption(
+        footer_key: &[u8],
+        column_names: Vec<&str>,
+        column_keys: Vec<Vec<u8>>,
+        encryption_column_keys: &[(&str, &[u8])],
+    ) {
+        let path = encryption_util::encrypted_data_path(
+            footer_key,
+            "encrypt_columns_and_footer.parquet.encrypted",
+        );
+        let file = File::open(path).unwrap();
 
-    let footer_key = b"0123456789012345".to_vec(); // 128bit/16
-    let column_names = vec!["double_field", "float_field"];
-    let column_keys = vec![b"1234567890123450".to_vec(), 
b"1234567890123451".to_vec()];
+        let decryption_properties = 
FileDecryptionProperties::builder(footer_key.to_vec())
+            .with_column_keys(column_names.to_vec(), column_keys.clone())
+            .unwrap()
+            .build()
+            .unwrap();
 
-    let decryption_properties = 
FileDecryptionProperties::builder(footer_key.clone())
-        .with_column_keys(column_names.clone(), column_keys.clone())
-        .unwrap()
-        .build()
-        .unwrap();
+        let mut builder = 
FileEncryptionProperties::builder(footer_key.to_vec());
+        for (column_name, key) in encryption_column_keys {
+            builder = builder.with_column_key(column_name, key.to_vec());
+        }
+        let file_encryption_properties = builder.build().unwrap();
 
-    let file_encryption_properties = 
FileEncryptionProperties::builder(footer_key)
-        .with_column_keys(column_names, column_keys)
-        .unwrap()
-        .build()
-        .unwrap();
+        read_and_roundtrip_to_encrypted_file(
+            &file,
+            decryption_properties,
+            file_encryption_properties,
+        );
+    }
 
-    read_and_roundtrip_to_encrypted_file(&file, decryption_properties, 
file_encryption_properties);
+    write_non_uniform_encryption(
+        AES_128_FOOTER_KEY,
+        AES_128_COLUMN_NAMES.to_vec(),
+        AES_128_COLUMN_KEYS.iter().map(|&s| s.to_vec()).collect(),
+        AES_128_COLUMN_NAME_KEYS,
+    );
+
+    write_non_uniform_encryption(
+        AES_256_FOOTER_KEY,
+        AES_256_COLUMN_NAMES.to_vec(),
+        AES_256_COLUMN_KEYS.iter().map(|&s| s.to_vec()).collect(),
+        AES_256_COLUMN_NAME_KEYS,
+    );
 }
 
 #[test]
@@ -647,20 +822,20 @@ fn test_write_uniform_encryption_plaintext_footer() {
     let path = 
format!("{testdata}/encrypt_columns_plaintext_footer.parquet.encrypted");
     let file = File::open(path).unwrap();
 
-    let footer_key = b"0123456789012345".to_vec(); // 128bit/16
-    let wrong_footer_key = b"0000000000000000".to_vec(); // 128bit/16
-    let column_1_key = b"1234567890123450".to_vec();
-    let column_2_key = b"1234567890123451".to_vec();
+    let footer_key = AES_128_FOOTER_KEY.to_vec();
+    let wrong_footer_key = BAD_AES_128_FOOTER_KEY.to_vec();
+    let column_1_key = AES_128_COLUMN_KEYS[0].to_vec();
+    let column_2_key = AES_128_COLUMN_KEYS[1].to_vec();
 
     let decryption_properties = 
FileDecryptionProperties::builder(footer_key.clone())
-        .with_column_key("double_field", column_1_key.clone())
-        .with_column_key("float_field", column_2_key.clone())
+        .with_column_key(AES_128_COLUMN_NAMES[0], column_1_key.clone())
+        .with_column_key(AES_128_COLUMN_NAMES[1], column_2_key.clone())
         .build()
         .unwrap();
 
     let wrong_decryption_properties = 
FileDecryptionProperties::builder(wrong_footer_key)
-        .with_column_key("double_field", column_1_key)
-        .with_column_key("float_field", column_2_key)
+        .with_column_key(AES_128_COLUMN_NAMES[0], column_1_key)
+        .with_column_key(AES_128_COLUMN_NAMES[1], column_2_key)
         .build()
         .unwrap();
 
@@ -721,8 +896,8 @@ fn test_write_uniform_encryption_plaintext_footer() {
 
 #[test]
 pub fn test_column_statistics_with_plaintext_footer() {
-    let footer_key = b"0123456789012345".to_vec();
-    let column_key = b"1234567890123450".to_vec();
+    let footer_key = AES_128_FOOTER_KEY.to_vec();
+    let column_key = AES_128_COLUMN_KEYS[0].to_vec();
 
     // Encrypt with a plaintext footer and column-specific keys
     let encryption_properties = 
FileEncryptionProperties::builder(footer_key.clone())
@@ -755,7 +930,7 @@ pub fn test_column_statistics_with_plaintext_footer() {
 
 #[test]
 pub fn test_column_statistics_with_plaintext_footer_and_uniform_encryption() {
-    let footer_key = b"0123456789012345".to_vec();
+    let footer_key = AES_128_FOOTER_KEY.to_vec();
 
     // Write with uniform encryption and a plaintext footer.
     let encryption_properties = 
FileEncryptionProperties::builder(footer_key.clone())
@@ -780,8 +955,8 @@ pub fn 
test_column_statistics_with_plaintext_footer_and_uniform_encryption() {
 
 #[test]
 pub fn test_column_statistics_with_encrypted_footer() {
-    let footer_key = b"0123456789012345".to_vec();
-    let column_key = b"1234567890123450".to_vec();
+    let footer_key = AES_128_FOOTER_KEY.to_vec();
+    let column_key = AES_128_COLUMN_KEYS[0].to_vec();
 
     // Encrypt with an encrypted footer and column-specific keys
     let encryption_properties = 
FileEncryptionProperties::builder(footer_key.clone())
@@ -810,7 +985,7 @@ pub fn test_column_statistics_with_encrypted_footer() {
 
 #[test]
 pub fn test_column_statistics_with_encrypted_footer_and_uniform_encryption() {
-    let footer_key = b"0123456789012345".to_vec();
+    let footer_key = AES_128_FOOTER_KEY.to_vec();
 
     // Encrypt with an encrypted footer and uniform encryption
     let encryption_properties = 
FileEncryptionProperties::builder(footer_key.clone())
@@ -934,64 +1109,94 @@ fn write_and_read_stats(
 
 #[test]
 fn test_write_uniform_encryption() {
-    let testdata = arrow::util::test_util::parquet_test_data();
-    let path = format!("{testdata}/uniform_encryption.parquet.encrypted");
-    let file = File::open(path).unwrap();
+    fn write_uniform_encryption(footer_key: &[u8]) {
+        let path = encryption_util::encrypted_data_path(
+            footer_key,
+            "uniform_encryption.parquet.encrypted",
+        );
+        let file = File::open(path).unwrap();
 
-    let footer_key = b"0123456789012345".to_vec(); // 128bit/16
+        let decryption_properties = 
FileDecryptionProperties::builder(footer_key.to_vec())
+            .build()
+            .unwrap();
 
-    let decryption_properties = 
FileDecryptionProperties::builder(footer_key.clone())
-        .build()
-        .unwrap();
+        let file_encryption_properties = 
FileEncryptionProperties::builder(footer_key.to_vec())
+            .build()
+            .unwrap();
 
-    let file_encryption_properties = 
FileEncryptionProperties::builder(footer_key)
-        .build()
-        .unwrap();
+        read_and_roundtrip_to_encrypted_file(
+            &file,
+            decryption_properties,
+            file_encryption_properties,
+        );
+    }
 
-    read_and_roundtrip_to_encrypted_file(&file, decryption_properties, 
file_encryption_properties);
+    write_uniform_encryption(AES_128_FOOTER_KEY);
+    write_uniform_encryption(AES_256_FOOTER_KEY);
 }
 
 #[test]
 fn test_write_non_uniform_encryption_column_missmatch() {
-    let testdata = arrow::util::test_util::parquet_test_data();
-    let path = 
format!("{testdata}/encrypt_columns_and_footer.parquet.encrypted");
+    fn write_non_uniform_encryption_column_missmatch(
+        footer_key: &[u8],
+        column_keys: &[(&str, &[u8])],
+        encryption_column_keys: &[(&str, &[u8])],
+    ) {
+        let path = encryption_util::encrypted_data_path(
+            footer_key,
+            "encrypt_columns_and_footer.parquet.encrypted",
+        );
 
-    let footer_key = b"0123456789012345".to_vec(); // 128bit/16
-    let column_1_key = b"1234567890123450".to_vec();
-    let column_2_key = b"1234567890123451".to_vec();
+        let mut decryption_builder = 
FileDecryptionProperties::builder(footer_key.to_vec());
+        for (column_name, key) in column_keys {
+            decryption_builder = 
decryption_builder.with_column_key(column_name, key.to_vec());
+        }
 
-    let decryption_properties = 
FileDecryptionProperties::builder(footer_key.clone())
-        .with_column_key("double_field", column_1_key.clone())
-        .with_column_key("float_field", column_2_key.clone())
-        .build()
-        .unwrap();
+        let decryption_properties = decryption_builder.build().unwrap();
 
-    let file_encryption_properties = 
FileEncryptionProperties::builder(footer_key)
-        .with_column_key("double_field", column_1_key.clone())
-        .with_column_key("other_field", column_1_key)
-        .with_column_key("yet_another_field", column_2_key)
-        .build()
-        .unwrap();
+        let mut file_encryption_builder = 
FileEncryptionProperties::builder(footer_key.to_vec())
+            .with_column_key("other_field", 
encryption_column_keys[0].1.to_vec())
+            .with_column_key("yet_another_field", 
encryption_column_keys[1].1.to_vec());
 
-    let temp_file = tempfile::tempfile().unwrap();
+        for (column_name, key) in encryption_column_keys {
+            file_encryption_builder =
+                file_encryption_builder.with_column_key(column_name, 
key.to_vec());
+        }
 
-    // read example data
-    let file = File::open(path).unwrap();
-    let options = ArrowReaderOptions::default()
-        .with_file_decryption_properties(decryption_properties.clone());
-    let metadata = ArrowReaderMetadata::load(&file, options.clone()).unwrap();
-    let props = WriterProperties::builder()
-        .with_file_encryption_properties(file_encryption_properties)
-        .build();
+        let file_encryption_properties = 
file_encryption_builder.build().unwrap();
 
-    let result = ArrowWriter::try_new(
-        temp_file.try_clone().unwrap(),
-        metadata.schema().clone(),
-        Some(props),
+        let temp_file = tempfile::tempfile().unwrap();
+
+        // read example data
+        let file = File::open(path).unwrap();
+        let options = ArrowReaderOptions::default()
+            .with_file_decryption_properties(decryption_properties.clone());
+        let metadata = ArrowReaderMetadata::load(&file, 
options.clone()).unwrap();
+        let props = WriterProperties::builder()
+            .with_file_encryption_properties(file_encryption_properties)
+            .build();
+
+        let result = ArrowWriter::try_new(
+            temp_file.try_clone().unwrap(),
+            metadata.schema().clone(),
+            Some(props),
+        );
+        assert_eq!(
+            result.unwrap_err().to_string(),
+            "Parquet error: The following columns with encryption keys 
specified were not found in the schema: other_field, yet_another_field"
+        );
+    }
+
+    write_non_uniform_encryption_column_missmatch(
+        AES_128_FOOTER_KEY,
+        AES_128_COLUMN_NAME_KEYS,
+        AES_128_COLUMN_NAME_KEYS,
     );
-    assert_eq!(
-        result.unwrap_err().to_string(),
-        "Parquet error: The following columns with encryption keys specified 
were not found in the schema: other_field, yet_another_field"
+
+    write_non_uniform_encryption_column_missmatch(
+        AES_256_FOOTER_KEY,
+        AES_256_COLUMN_NAME_KEYS,
+        AES_256_COLUMN_NAME_KEYS,
     );
 }
 
@@ -1014,7 +1219,7 @@ fn test_write_encrypted_column() {
     let file: File = tempfile::tempfile().unwrap();
 
     let builder = WriterProperties::builder();
-    let footer_key: &[u8] = "0123456789012345".as_bytes();
+    let footer_key: &[u8] = AES_128_FOOTER_KEY;
     let file_encryption_properties = 
FileEncryptionProperties::builder(footer_key.to_vec())
         .build()
         .unwrap();
@@ -1117,9 +1322,9 @@ fn test_write_encrypted_struct_field() {
     // keys need to be specified for each leaf-level Parquet column using the 
full "." separated
     // column path.
     let builder = WriterProperties::builder();
-    let footer_key = b"0123456789012345".to_vec();
-    let column_key_1 = b"1234567890123450".to_vec();
-    let column_key_2 = b"1234567890123451".to_vec();
+    let footer_key = AES_128_FOOTER_KEY.to_vec();
+    let column_key_1 = AES_128_COLUMN_KEYS[0].to_vec();
+    let column_key_2 = AES_128_COLUMN_KEYS[1].to_vec();
     let file_encryption_properties = 
FileEncryptionProperties::builder(footer_key.clone())
         .with_column_key("struct_col.int64_col", column_key_1.clone())
         .with_column_key("struct_col.float64_col", column_key_2.clone())
@@ -1184,8 +1389,8 @@ pub fn 
test_retrieve_row_group_statistics_after_encrypted_write() {
 
     let temp_file = tempfile::tempfile().unwrap();
 
-    let footer_key = b"0123456789012345".to_vec();
-    let column_key = b"1234567890123450".to_vec();
+    let footer_key = AES_128_FOOTER_KEY.to_vec();
+    let column_key = AES_128_COLUMN_KEYS[0].to_vec();
     let file_encryption_properties = 
FileEncryptionProperties::builder(footer_key.clone())
         .with_column_key("x", column_key.clone())
         .build()
@@ -1218,33 +1423,40 @@ pub fn 
test_retrieve_row_group_statistics_after_encrypted_write() {
 
 #[test]
 fn test_decrypt_page_index_uniform() {
-    let test_data = arrow::util::test_util::parquet_test_data();
-    let path = format!("{test_data}/uniform_encryption.parquet.encrypted");
+    fn decrypt_page_index_uniform(footer_key: &[u8]) {
+        let path = encryption_util::encrypted_data_path(
+            footer_key,
+            "uniform_encryption.parquet.encrypted",
+        );
+        let decryption_properties = 
FileDecryptionProperties::builder(footer_key.to_vec())
+            .build()
+            .unwrap();
 
-    let key_code: &[u8] = "0123456789012345".as_bytes();
-    let decryption_properties = 
FileDecryptionProperties::builder(key_code.to_vec())
-        .build()
-        .unwrap();
+        test_decrypt_page_index(&path, decryption_properties).unwrap();
+    }
 
-    test_decrypt_page_index(&path, decryption_properties).unwrap();
+    decrypt_page_index_uniform(AES_128_FOOTER_KEY);
+    decrypt_page_index_uniform(AES_256_FOOTER_KEY);
 }
 
 #[test]
 fn test_decrypt_page_index_non_uniform() {
-    let test_data = arrow::util::test_util::parquet_test_data();
-    let path = 
format!("{test_data}/encrypt_columns_and_footer.parquet.encrypted");
-
-    let footer_key = "0123456789012345".as_bytes().to_vec();
-    let column_1_key = "1234567890123450".as_bytes().to_vec();
-    let column_2_key = "1234567890123451".as_bytes().to_vec();
+    fn decrypt_page_index_non_uniform(footer_key: &[u8], column_keys: &[(&str, 
&[u8])]) {
+        let path = encryption_util::encrypted_data_path(
+            footer_key,
+            "encrypt_columns_and_footer.parquet.encrypted",
+        );
+        let mut builder = 
FileDecryptionProperties::builder(footer_key.to_vec());
+        for (column_name, key) in column_keys {
+            builder = builder.with_column_key(column_name, key.to_vec());
+        }
+        let decryption_properties = builder.build().unwrap();
+        test_decrypt_page_index(&path, decryption_properties).unwrap();
+    }
 
-    let decryption_properties = 
FileDecryptionProperties::builder(footer_key.to_vec())
-        .with_column_key("double_field", column_1_key)
-        .with_column_key("float_field", column_2_key)
-        .build()
-        .unwrap();
+    decrypt_page_index_non_uniform(AES_128_FOOTER_KEY, 
AES_128_COLUMN_NAME_KEYS);
 
-    test_decrypt_page_index(&path, decryption_properties).unwrap();
+    decrypt_page_index_non_uniform(AES_256_FOOTER_KEY, 
AES_256_COLUMN_NAME_KEYS);
 }
 
 fn test_decrypt_page_index(
diff --git a/parquet/tests/encryption/encryption_async.rs 
b/parquet/tests/encryption/encryption_async.rs
index 48c844afb9..f86ab59bf7 100644
--- a/parquet/tests/encryption/encryption_async.rs
+++ b/parquet/tests/encryption/encryption_async.rs
@@ -17,9 +17,13 @@
 
 //! This module contains tests for reading encrypted Parquet files with the 
async Arrow API
 
+use crate::encryption_util;
 use crate::encryption_util::{
-    TestKeyRetriever, read_encrypted_file, verify_column_indexes,
-    verify_encryption_double_test_data, verify_encryption_test_data,
+    AES_128_COLUMN_KEYS, AES_128_COLUMN_NAME_KEYS, AES_128_COLUMN_NAMES, 
AES_128_FOOTER_KEY,
+    AES_128_FOOTER_KEY_NAME, AES_128_KEY_NAME_KEY, AES_256_COLUMN_KEYS, 
AES_256_COLUMN_NAME_KEYS,
+    AES_256_COLUMN_NAMES, AES_256_FOOTER_KEY, AES_256_FOOTER_KEY_NAME, 
AES_256_KEY_NAME_KEY,
+    BAD_AES_128_FOOTER_KEY, BAD_AES_256_FOOTER_KEY, TestKeyRetriever, 
read_encrypted_file,
+    verify_column_indexes, verify_encryption_double_test_data, 
verify_encryption_test_data,
 };
 use arrow_array::RecordBatch;
 use arrow_schema::Schema;
@@ -47,61 +51,64 @@ use tokio::task::JoinHandle;
 
 #[tokio::test]
 async fn test_non_uniform_encryption_plaintext_footer() {
-    let test_data = arrow::util::test_util::parquet_test_data();
-    let path = 
format!("{test_data}/encrypt_columns_plaintext_footer.parquet.encrypted");
-    let mut file = File::open(&path).await.unwrap();
+    async fn non_uniform_encryption_plaintext_footer(
+        footer_key: &[u8],
+        column_keys: &[(&str, &[u8])],
+    ) {
+        let path = encryption_util::encrypted_data_path(
+            footer_key,
+            "encrypt_columns_plaintext_footer.parquet.encrypted",
+        );
+        let mut file = File::open(&path).await.unwrap();
+        let mut builder = 
FileDecryptionProperties::builder(footer_key.to_vec());
+        for (column_name, key) in column_keys {
+            builder = builder.with_column_key(column_name, key.to_vec());
+        }
+        let decryption_properties = builder.build().unwrap();
+        verify_encryption_test_file_read_async(&mut file, 
decryption_properties)
+            .await
+            .unwrap();
+    }
 
-    // There is always a footer key even with a plaintext footer,
+    // AES-128: there is always a footer key even with a plaintext footer,
     // but this is used for signing the footer.
-    let footer_key = "0123456789012345".as_bytes().to_vec(); // 128bit/16
-    let column_1_key = "1234567890123450".as_bytes().to_vec();
-    let column_2_key = "1234567890123451".as_bytes().to_vec();
-
-    let decryption_properties = FileDecryptionProperties::builder(footer_key)
-        .with_column_key("double_field", column_1_key)
-        .with_column_key("float_field", column_2_key)
-        .build()
-        .unwrap();
+    non_uniform_encryption_plaintext_footer(AES_128_FOOTER_KEY, 
AES_128_COLUMN_NAME_KEYS).await;
 
-    verify_encryption_test_file_read_async(&mut file, decryption_properties)
-        .await
-        .unwrap();
+    // AES-256
+    non_uniform_encryption_plaintext_footer(AES_256_FOOTER_KEY, 
AES_256_COLUMN_NAME_KEYS).await;
 }
 
 #[tokio::test]
 async fn test_misspecified_encryption_keys() {
-    let test_data = arrow::util::test_util::parquet_test_data();
-    let path = 
format!("{test_data}/encrypt_columns_and_footer.parquet.encrypted");
-
-    // There is always a footer key even with a plaintext footer,
-    // but this is used for signing the footer.
-    let footer_key = "0123456789012345".as_bytes(); // 128bit/16
-    let column_1_key = "1234567890123450".as_bytes();
-    let column_2_key = "1234567890123451".as_bytes();
-
     // read file with keys and check for expected error message
     async fn check_for_error(
         expected_message: &str,
-        path: &String,
         footer_key: &[u8],
         column_1_key: &[u8],
         column_2_key: &[u8],
+        additional_column_keys: &[(&str, &[u8])],
     ) {
+        let path = encryption_util::encrypted_data_path(
+            footer_key,
+            "encrypt_columns_and_footer.parquet.encrypted",
+        );
         let mut file = File::open(&path).await.unwrap();
 
-        let mut decryption_properties = 
FileDecryptionProperties::builder(footer_key.to_vec());
+        let mut builder = 
FileDecryptionProperties::builder(footer_key.to_vec());
 
         if !column_1_key.is_empty() {
-            decryption_properties =
-                decryption_properties.with_column_key("double_field", 
column_1_key.to_vec());
+            builder = builder.with_column_key(AES_128_COLUMN_NAMES[0], 
column_1_key.to_vec());
         }
 
         if !column_2_key.is_empty() {
-            decryption_properties =
-                decryption_properties.with_column_key("float_field", 
column_2_key.to_vec());
+            builder = builder.with_column_key(AES_128_COLUMN_NAMES[1], 
column_2_key.to_vec());
+        }
+
+        for (column_name, key) in additional_column_keys {
+            builder = builder.with_column_key(column_name, key.to_vec());
         }
 
-        let decryption_properties = decryption_properties.build().unwrap();
+        let decryption_properties = builder.build().unwrap();
 
         match verify_encryption_test_file_read_async(&mut file, 
decryption_properties).await {
             Ok(_) => {
@@ -113,63 +120,149 @@ async fn test_misspecified_encryption_keys() {
         }
     }
 
+    // There is always a footer key even with a plaintext footer,
+    // but this is used for signing the footer.
+    let footer_key = AES_128_FOOTER_KEY;
+    let column_1_key = AES_128_COLUMN_KEYS[0];
+    let column_2_key = AES_128_COLUMN_KEYS[1];
+    let empty_column_key = &[];
+
     // Too short footer key
     check_for_error(
-        "Parquet error: Invalid footer key. Failed to create AES key",
-        &path,
-        "bad_pwd".as_bytes(),
+        format!("Parquet error: Invalid footer key. Error creating 
RingGcmBlockDecryptor with unsupported key length: {}", 
"bad_pwd".len()).as_str(),
+        b"bad_pwd",
         column_1_key,
         column_2_key,
+        empty_column_key
     )
     .await;
 
     // Wrong footer key
     check_for_error(
         "Parquet error: Provided footer key and AAD were unable to decrypt 
parquet footer",
-        &path,
-        "1123456789012345".as_bytes(),
+        BAD_AES_128_FOOTER_KEY,
         column_1_key,
         column_2_key,
+        empty_column_key,
     )
     .await;
 
     // Missing column key
     check_for_error(
         "Parquet error: No column decryption key set for encrypted column 
'double_field'",
-        &path,
         footer_key,
         "".as_bytes(),
         column_2_key,
+        empty_column_key,
     )
     .await;
 
     // Too short column key
     check_for_error(
-        "Parquet error: Failed to create AES key",
-        &path,
+        format!(
+            "Parquet error: Error creating RingGcmBlockDecryptor with 
unsupported key length: {}",
+            "abc".len()
+        )
+        .as_str(),
         footer_key,
         "abc".as_bytes(),
         column_2_key,
+        empty_column_key,
     )
     .await;
 
     // Wrong column key
     check_for_error(
         "Parquet error: Unable to decrypt column 'double_field', perhaps the 
column key is wrong?",
-        &path,
         footer_key,
         "1123456789012345".as_bytes(),
         column_2_key,
+        empty_column_key,
     )
     .await;
 
     // Mixed up keys
     check_for_error(
         "Parquet error: Unable to decrypt column 'float_field', perhaps the 
column key is wrong?",
-        &path,
         footer_key,
         column_2_key,
         column_1_key,
+        empty_column_key,
+    )
+    .await;
+
+    let aes256_footer_key = AES_256_FOOTER_KEY;
+    let aes256_column_1_key = AES_256_COLUMN_KEYS[0];
+    let aes256_column_2_key = AES_256_COLUMN_KEYS[1];
+    let additional_column_keys = &[
+        (AES_256_COLUMN_NAMES[2], AES_256_COLUMN_KEYS[2]),
+        (AES_256_COLUMN_NAMES[3], AES_256_COLUMN_KEYS[3]),
+        (AES_256_COLUMN_NAMES[4], AES_256_COLUMN_KEYS[4]),
+        (AES_256_COLUMN_NAMES[5], AES_256_COLUMN_KEYS[5]),
+        (AES_256_COLUMN_NAMES[6], AES_256_COLUMN_KEYS[6]),
+        (AES_256_COLUMN_NAMES[7], AES_256_COLUMN_KEYS[7]),
+    ];
+
+    // Too short footer key
+    check_for_error(
+        format!("Parquet error: Invalid footer key. Error creating 
RingGcmBlockDecryptor with unsupported key length: {}", 
"bad_pwd".len()).as_str(),
+        b"bad_pwd",
+        aes256_column_1_key,
+        aes256_column_2_key,
+        additional_column_keys
+    ).await;
+
+    // Wrong footer key
+    check_for_error(
+        "Parquet error: Provided footer key and AAD were unable to decrypt 
parquet footer",
+        BAD_AES_256_FOOTER_KEY,
+        aes256_column_1_key,
+        aes256_column_2_key,
+        additional_column_keys,
+    )
+    .await;
+
+    // Missing column key
+    check_for_error(
+        "Parquet error: No column decryption key set for encrypted column 
'double_field'",
+        aes256_footer_key,
+        "".as_bytes(),
+        aes256_column_2_key,
+        additional_column_keys,
+    )
+    .await;
+
+    // Too short column key
+    check_for_error(
+        format!(
+            "Parquet error: Error creating RingGcmBlockDecryptor with 
unsupported key length: {}",
+            "abc".len()
+        )
+        .as_str(),
+        aes256_footer_key,
+        "abc".as_bytes(),
+        aes256_column_2_key,
+        additional_column_keys,
+    )
+    .await;
+
+    // Wrong column key
+    check_for_error(
+        "Parquet error: Unable to decrypt column 'double_field', perhaps the 
column key is wrong?",
+        aes256_footer_key,
+        "22345678901234567890123456789012".as_bytes(),
+        aes256_column_2_key,
+        additional_column_keys,
+    )
+    .await;
+
+    // Mixed up keys
+    check_for_error(
+        "Parquet error: Unable to decrypt column 'float_field', perhaps the 
column key is wrong?",
+        aes256_footer_key,
+        aes256_column_2_key,
+        aes256_column_1_key,
+        additional_column_keys,
     )
     .await;
 }
@@ -183,68 +276,81 @@ async fn test_plaintext_footer_read_without_decryption() {
 
 #[tokio::test]
 async fn test_non_uniform_encryption() {
-    let test_data = arrow::util::test_util::parquet_test_data();
-    let path = 
format!("{test_data}/encrypt_columns_and_footer.parquet.encrypted");
-    let mut file = File::open(&path).await.unwrap();
+    async fn non_uniform_encryption(footer_key: &[u8], column_keys: &[(&str, 
&[u8])]) {
+        let path = encryption_util::encrypted_data_path(
+            footer_key,
+            "encrypt_columns_and_footer.parquet.encrypted",
+        );
+        let mut file = File::open(&path).await.unwrap();
 
-    let footer_key = "0123456789012345".as_bytes().to_vec(); // 128bit/16
-    let column_1_key = "1234567890123450".as_bytes().to_vec();
-    let column_2_key = "1234567890123451".as_bytes().to_vec();
+        let mut builder = 
FileDecryptionProperties::builder(footer_key.to_vec());
+        for (column_name, key) in column_keys {
+            builder = builder.with_column_key(column_name, key.to_vec());
+        }
+        let decryption_properties = builder.build().unwrap();
 
-    let decryption_properties = 
FileDecryptionProperties::builder(footer_key.to_vec())
-        .with_column_key("double_field", column_1_key)
-        .with_column_key("float_field", column_2_key)
-        .build()
-        .unwrap();
+        verify_encryption_test_file_read_async(&mut file, 
decryption_properties)
+            .await
+            .unwrap();
+    }
 
-    verify_encryption_test_file_read_async(&mut file, decryption_properties)
-        .await
-        .unwrap();
+    non_uniform_encryption(AES_128_FOOTER_KEY, AES_128_COLUMN_NAME_KEYS).await;
+    non_uniform_encryption(AES_256_FOOTER_KEY, AES_256_COLUMN_NAME_KEYS).await;
 }
 
 #[tokio::test]
 async fn test_uniform_encryption() {
-    let test_data = arrow::util::test_util::parquet_test_data();
-    let path = format!("{test_data}/uniform_encryption.parquet.encrypted");
-    let mut file = File::open(&path).await.unwrap();
+    async fn uniform_encryption(footer_key: &[u8]) {
+        let path = encryption_util::encrypted_data_path(
+            footer_key,
+            "uniform_encryption.parquet.encrypted",
+        );
+        let mut file = File::open(&path).await.unwrap();
 
-    let key_code: &[u8] = "0123456789012345".as_bytes();
-    let decryption_properties = 
FileDecryptionProperties::builder(key_code.to_vec())
-        .build()
-        .unwrap();
+        let decryption_properties = 
FileDecryptionProperties::builder(footer_key.to_vec())
+            .build()
+            .unwrap();
 
-    verify_encryption_test_file_read_async(&mut file, decryption_properties)
-        .await
-        .unwrap();
+        verify_encryption_test_file_read_async(&mut file, 
decryption_properties)
+            .await
+            .unwrap();
+    }
+
+    uniform_encryption(AES_128_FOOTER_KEY).await;
+    uniform_encryption(AES_256_FOOTER_KEY).await;
 }
 
 #[tokio::test]
 async fn test_aes_ctr_encryption() {
-    let test_data = arrow::util::test_util::parquet_test_data();
-    let path = 
format!("{test_data}/encrypt_columns_and_footer_ctr.parquet.encrypted");
-    let mut file = File::open(&path).await.unwrap();
+    async fn aes_ctr_encryption(footer_key: &[u8], column_keys: &[(&str, 
&[u8])]) {
+        let path = encryption_util::encrypted_data_path(
+            footer_key,
+            "encrypt_columns_and_footer_ctr.parquet.encrypted",
+        );
+        let mut file = File::open(&path).await.unwrap();
 
-    let footer_key = "0123456789012345".as_bytes().to_vec();
-    let column_1_key = "1234567890123450".as_bytes().to_vec();
-    //let column_2_key = "1234567890123451".as_bytes().to_vec();
+        let mut builder = 
FileDecryptionProperties::builder(footer_key.to_vec());
+        for (column_name, key) in column_keys {
+            builder = builder.with_column_key(column_name, key.to_vec());
+        }
+        let decryption_properties = builder.build().unwrap();
 
-    let decryption_properties = FileDecryptionProperties::builder(footer_key)
-        .with_column_key("double_field", column_1_key.clone())
-        .with_column_key("float_field", column_1_key)
-        .build()
-        .unwrap();
+        let options =
+            
ArrowReaderOptions::new().with_file_decryption_properties(decryption_properties);
+        let metadata = ArrowReaderMetadata::load_async(&mut file, 
options).await;
 
-    let options = 
ArrowReaderOptions::new().with_file_decryption_properties(decryption_properties);
-    let metadata = ArrowReaderMetadata::load_async(&mut file, options).await;
+        match metadata {
+            Err(ParquetError::NYI(s)) => {
+                assert!(s.contains("AES_GCM_CTR_V1"));
+            }
+            _ => {
+                panic!("Expected ParquetError::NYI");
+            }
+        };
+    }
 
-    match metadata {
-        Err(ParquetError::NYI(s)) => {
-            assert!(s.contains("AES_GCM_CTR_V1"));
-        }
-        _ => {
-            panic!("Expected ParquetError::NYI");
-        }
-    };
+    aes_ctr_encryption(AES_128_FOOTER_KEY, AES_128_COLUMN_NAME_KEYS).await;
+    aes_ctr_encryption(AES_256_FOOTER_KEY, AES_256_COLUMN_NAME_KEYS).await;
 }
 
 #[tokio::test]
@@ -264,32 +370,54 @@ async fn 
test_decrypting_without_decryption_properties_fails() {
 
 #[tokio::test]
 async fn test_write_non_uniform_encryption() {
-    let testdata = arrow::util::test_util::parquet_test_data();
-    let path = 
format!("{testdata}/encrypt_columns_and_footer.parquet.encrypted");
+    async fn write_non_uniform_encryption(
+        footer_key: &[u8],
+        column_names: Vec<&str>,
+        column_keys: Vec<Vec<u8>>,
+        encryption_column_keys: &[(&str, &[u8])],
+    ) {
+        let path = encryption_util::encrypted_data_path(
+            footer_key,
+            "encrypt_columns_and_footer.parquet.encrypted",
+        );
+
+        let decryption_properties = 
FileDecryptionProperties::builder(footer_key.to_vec())
+            .with_column_keys(column_names.to_vec(), column_keys.clone())
+            .unwrap()
+            .build()
+            .unwrap();
 
-    let footer_key = b"0123456789012345".to_vec(); // 128bit/16
-    let column_names = vec!["double_field", "float_field"];
-    let column_keys = vec![b"1234567890123450".to_vec(), 
b"1234567890123451".to_vec()];
+        let mut builder = 
FileEncryptionProperties::builder(footer_key.to_vec());
+        for (column_name, key) in encryption_column_keys {
+            builder = builder.with_column_key(column_name, key.to_vec());
+        }
+        let file_encryption_properties = builder.build().unwrap();
 
-    let decryption_properties = 
FileDecryptionProperties::builder(footer_key.clone())
-        .with_column_keys(column_names.clone(), column_keys.clone())
-        .unwrap()
-        .build()
+        read_and_roundtrip_to_encrypted_file_async(
+            &path,
+            decryption_properties,
+            file_encryption_properties,
+        )
+        .await
         .unwrap();
+    }
 
-    let file_encryption_properties = 
FileEncryptionProperties::builder(footer_key)
-        .with_column_keys(column_names, column_keys)
-        .unwrap()
-        .build()
-        .unwrap();
+    write_non_uniform_encryption(
+        AES_128_FOOTER_KEY,
+        AES_128_COLUMN_NAMES.to_vec(),
+        AES_128_COLUMN_KEYS.iter().map(|&s| s.to_vec()).collect(),
+        AES_128_COLUMN_NAME_KEYS,
+    )
+    .await;
 
-    read_and_roundtrip_to_encrypted_file_async(
-        &path,
-        decryption_properties,
-        file_encryption_properties,
+    // AES-256
+    write_non_uniform_encryption(
+        AES_256_FOOTER_KEY,
+        AES_256_COLUMN_NAMES.to_vec(),
+        AES_256_COLUMN_KEYS.iter().map(|&s| s.to_vec()).collect(),
+        AES_256_COLUMN_NAME_KEYS,
     )
-    .await
-    .unwrap();
+    .await;
 }
 
 #[cfg(feature = "object_store")]
@@ -338,98 +466,138 @@ async fn test_read_encrypted_file_from_object_store() {
 
 #[tokio::test]
 async fn test_non_uniform_encryption_plaintext_footer_with_key_retriever() {
-    let testdata = arrow::util::test_util::parquet_test_data();
-    let path = 
format!("{testdata}/encrypt_columns_plaintext_footer.parquet.encrypted");
-    let mut file = File::open(&path).await.unwrap();
+    async fn non_uniform_encryption_plaintext_footer_with_key_retriever(
+        footer_key: &[u8],
+        keys: &[(&str, &[u8])],
+    ) {
+        let path = encryption_util::encrypted_data_path(
+            footer_key,
+            "encrypt_columns_plaintext_footer.parquet.encrypted",
+        );
+        let mut file = File::open(&path).await.unwrap();
 
-    let key_retriever = TestKeyRetriever::new()
-        .with_key("kf".to_owned(), "0123456789012345".as_bytes().to_vec())
-        .with_key("kc1".to_owned(), "1234567890123450".as_bytes().to_vec())
-        .with_key("kc2".to_owned(), "1234567890123451".as_bytes().to_vec());
+        let mut key_retriever = TestKeyRetriever::new();
+        for (key_name, key) in keys {
+            key_retriever = key_retriever.with_key((*key_name).to_owned(), 
(*key).to_vec());
+        }
 
-    let decryption_properties =
-        FileDecryptionProperties::with_key_retriever(Arc::new(key_retriever))
-            .build()
+        let decryption_properties =
+            
FileDecryptionProperties::with_key_retriever(Arc::new(key_retriever))
+                .build()
+                .unwrap();
+
+        verify_encryption_test_file_read_async(&mut file, 
decryption_properties)
+            .await
             .unwrap();
+    }
 
-    verify_encryption_test_file_read_async(&mut file, decryption_properties)
-        .await
-        .unwrap();
+    non_uniform_encryption_plaintext_footer_with_key_retriever(
+        AES_128_FOOTER_KEY,
+        AES_128_KEY_NAME_KEY,
+    )
+    .await;
+
+    non_uniform_encryption_plaintext_footer_with_key_retriever(
+        AES_256_FOOTER_KEY,
+        AES_256_KEY_NAME_KEY,
+    )
+    .await;
 }
 
 #[tokio::test]
 async fn test_non_uniform_encryption_with_key_retriever() {
-    let testdata = arrow::util::test_util::parquet_test_data();
-    let path = 
format!("{testdata}/encrypt_columns_and_footer.parquet.encrypted");
-    let mut file = File::open(&path).await.unwrap();
+    async fn non_uniform_encryption_with_key_retriever(footer_key: &[u8], 
keys: &[(&str, &[u8])]) {
+        let path = encryption_util::encrypted_data_path(
+            footer_key,
+            "encrypt_columns_and_footer.parquet.encrypted",
+        );
+        let mut file = File::open(&path).await.unwrap();
 
-    let key_retriever = TestKeyRetriever::new()
-        .with_key("kf".to_owned(), "0123456789012345".as_bytes().to_vec())
-        .with_key("kc1".to_owned(), "1234567890123450".as_bytes().to_vec())
-        .with_key("kc2".to_owned(), "1234567890123451".as_bytes().to_vec());
+        let mut key_retriever = TestKeyRetriever::new();
+        for (key_name, key) in keys {
+            key_retriever = key_retriever.with_key((*key_name).to_owned(), 
(*key).to_vec());
+        }
 
-    let decryption_properties =
-        FileDecryptionProperties::with_key_retriever(Arc::new(key_retriever))
-            .build()
+        let decryption_properties =
+            
FileDecryptionProperties::with_key_retriever(Arc::new(key_retriever))
+                .build()
+                .unwrap();
+
+        verify_encryption_test_file_read_async(&mut file, 
decryption_properties)
+            .await
             .unwrap();
+    }
 
-    verify_encryption_test_file_read_async(&mut file, decryption_properties)
-        .await
-        .unwrap();
+    non_uniform_encryption_with_key_retriever(AES_128_FOOTER_KEY, 
AES_128_KEY_NAME_KEY).await;
+    non_uniform_encryption_with_key_retriever(AES_256_FOOTER_KEY, 
AES_256_KEY_NAME_KEY).await;
 }
 
 #[tokio::test]
 async fn test_uniform_encryption_with_key_retriever() {
-    let testdata = arrow::util::test_util::parquet_test_data();
-    let path = format!("{testdata}/uniform_encryption.parquet.encrypted");
-    let mut file = File::open(&path).await.unwrap();
+    async fn uniform_encryption_with_key_retriever(key_name: &str, footer_key: 
&[u8]) {
+        let path = encryption_util::encrypted_data_path(
+            footer_key,
+            "uniform_encryption.parquet.encrypted",
+        );
+        let mut file = File::open(&path).await.unwrap();
 
-    let key_retriever =
-        TestKeyRetriever::new().with_key("kf".to_owned(), 
"0123456789012345".as_bytes().to_vec());
+        let key_retriever =
+            TestKeyRetriever::new().with_key(key_name.to_owned(), 
footer_key.to_vec());
 
-    let decryption_properties =
-        FileDecryptionProperties::with_key_retriever(Arc::new(key_retriever))
-            .build()
+        let decryption_properties =
+            
FileDecryptionProperties::with_key_retriever(Arc::new(key_retriever))
+                .build()
+                .unwrap();
+
+        verify_encryption_test_file_read_async(&mut file, 
decryption_properties)
+            .await
             .unwrap();
+    }
 
-    verify_encryption_test_file_read_async(&mut file, decryption_properties)
-        .await
-        .unwrap();
+    uniform_encryption_with_key_retriever(AES_128_FOOTER_KEY_NAME, 
AES_128_FOOTER_KEY).await;
+    uniform_encryption_with_key_retriever(AES_256_FOOTER_KEY_NAME, 
AES_256_FOOTER_KEY).await;
 }
 
 #[tokio::test]
 async fn test_decrypt_page_index_uniform() {
-    let test_data = arrow::util::test_util::parquet_test_data();
-    let path = format!("{test_data}/uniform_encryption.parquet.encrypted");
+    async fn decrypt_page_index_uniform(footer_key: &[u8]) {
+        let path = encryption_util::encrypted_data_path(
+            footer_key,
+            "uniform_encryption.parquet.encrypted",
+        );
+        let decryption_properties = 
FileDecryptionProperties::builder(footer_key.to_vec())
+            .build()
+            .unwrap();
 
-    let key_code: &[u8] = "0123456789012345".as_bytes();
-    let decryption_properties = 
FileDecryptionProperties::builder(key_code.to_vec())
-        .build()
-        .unwrap();
+        test_decrypt_page_index(&path, decryption_properties)
+            .await
+            .unwrap();
+    }
 
-    test_decrypt_page_index(&path, decryption_properties)
-        .await
-        .unwrap();
+    decrypt_page_index_uniform(AES_128_FOOTER_KEY).await;
+    decrypt_page_index_uniform(AES_256_FOOTER_KEY).await;
 }
 
 #[tokio::test]
 async fn test_decrypt_page_index_non_uniform() {
-    let test_data = arrow::util::test_util::parquet_test_data();
-    let path = 
format!("{test_data}/encrypt_columns_and_footer.parquet.encrypted");
-
-    let footer_key = "0123456789012345".as_bytes().to_vec();
-    let column_1_key = "1234567890123450".as_bytes().to_vec();
-    let column_2_key = "1234567890123451".as_bytes().to_vec();
+    async fn decrypt_page_index_non_uniform(footer_key: &[u8], column_keys: 
&[(&str, &[u8])]) {
+        let path = encryption_util::encrypted_data_path(
+            footer_key,
+            "encrypt_columns_and_footer.parquet.encrypted",
+        );
+        let mut builder = 
FileDecryptionProperties::builder(footer_key.to_vec());
+        for (column_name, key) in column_keys {
+            builder = builder.with_column_key(column_name, key.to_vec());
+        }
+        let decryption_properties = builder.build().unwrap();
+        test_decrypt_page_index(&path, decryption_properties)
+            .await
+            .unwrap();
+    }
 
-    let decryption_properties = 
FileDecryptionProperties::builder(footer_key.to_vec())
-        .with_column_key("double_field", column_1_key)
-        .with_column_key("float_field", column_2_key)
-        .build()
-        .unwrap();
+    decrypt_page_index_non_uniform(AES_128_FOOTER_KEY, 
AES_128_COLUMN_NAME_KEYS).await;
 
-    test_decrypt_page_index(&path, decryption_properties)
-        .await
-        .unwrap();
+    decrypt_page_index_non_uniform(AES_256_FOOTER_KEY, 
AES_256_COLUMN_NAME_KEYS).await;
 }
 
 async fn test_decrypt_page_index(
@@ -675,14 +843,14 @@ async fn 
test_concurrent_encrypted_writing_over_multiple_row_groups() {
     let path = 
format!("{testdata}/encrypt_columns_and_footer.parquet.encrypted");
     let file = std::fs::File::open(path).unwrap();
 
-    let file_encryption_properties = 
FileEncryptionProperties::builder(b"0123456789012345".into())
-        .with_column_key("double_field", b"1234567890123450".into())
-        .with_column_key("float_field", b"1234567890123451".into())
+    let file_encryption_properties = 
FileEncryptionProperties::builder(AES_128_FOOTER_KEY.into())
+        .with_column_key(AES_128_COLUMN_NAMES[0], 
AES_128_COLUMN_KEYS[0].into())
+        .with_column_key(AES_128_COLUMN_NAMES[1], 
AES_128_COLUMN_KEYS[1].into())
         .build()
         .unwrap();
-    let decryption_properties = 
FileDecryptionProperties::builder(b"0123456789012345".into())
-        .with_column_key("double_field", b"1234567890123450".into())
-        .with_column_key("float_field", b"1234567890123451".into())
+    let decryption_properties = 
FileDecryptionProperties::builder(AES_128_FOOTER_KEY.into())
+        .with_column_key(AES_128_COLUMN_NAMES[0], 
AES_128_COLUMN_KEYS[0].into())
+        .with_column_key(AES_128_COLUMN_NAMES[1], 
AES_128_COLUMN_KEYS[1].into())
         .build()
         .unwrap();
 
@@ -748,14 +916,14 @@ async fn test_multi_threaded_encrypted_writing() {
     let path = 
format!("{testdata}/encrypt_columns_and_footer.parquet.encrypted");
     let file = std::fs::File::open(path).unwrap();
 
-    let file_encryption_properties = 
FileEncryptionProperties::builder(b"0123456789012345".into())
-        .with_column_key("double_field", b"1234567890123450".into())
-        .with_column_key("float_field", b"1234567890123451".into())
+    let file_encryption_properties = 
FileEncryptionProperties::builder(AES_128_FOOTER_KEY.into())
+        .with_column_key(AES_128_COLUMN_NAMES[0], 
AES_128_COLUMN_KEYS[0].into())
+        .with_column_key(AES_128_COLUMN_NAMES[1], 
AES_128_COLUMN_KEYS[1].into())
         .build()
         .unwrap();
-    let decryption_properties = 
FileDecryptionProperties::builder(b"0123456789012345".into())
-        .with_column_key("double_field", b"1234567890123450".into())
-        .with_column_key("float_field", b"1234567890123451".into())
+    let decryption_properties = 
FileDecryptionProperties::builder(AES_128_FOOTER_KEY.into())
+        .with_column_key(AES_128_COLUMN_NAMES[0], 
AES_128_COLUMN_KEYS[0].into())
+        .with_column_key(AES_128_COLUMN_NAMES[1], 
AES_128_COLUMN_KEYS[1].into())
         .build()
         .unwrap();
 
@@ -849,14 +1017,14 @@ async fn 
test_multi_threaded_encrypted_writing_deprecated() {
     let path = 
format!("{testdata}/encrypt_columns_and_footer.parquet.encrypted");
     let file = std::fs::File::open(path).unwrap();
 
-    let file_encryption_properties = 
FileEncryptionProperties::builder(b"0123456789012345".into())
-        .with_column_key("double_field", b"1234567890123450".into())
-        .with_column_key("float_field", b"1234567890123451".into())
+    let file_encryption_properties = 
FileEncryptionProperties::builder(AES_128_FOOTER_KEY.into())
+        .with_column_key(AES_128_COLUMN_NAMES[0], 
AES_128_COLUMN_KEYS[0].into())
+        .with_column_key(AES_128_COLUMN_NAMES[1], 
AES_128_COLUMN_KEYS[1].into())
         .build()
         .unwrap();
-    let decryption_properties = 
FileDecryptionProperties::builder(b"0123456789012345".into())
-        .with_column_key("double_field", b"1234567890123450".into())
-        .with_column_key("float_field", b"1234567890123451".into())
+    let decryption_properties = 
FileDecryptionProperties::builder(AES_128_FOOTER_KEY.into())
+        .with_column_key(AES_128_COLUMN_NAMES[0], 
AES_128_COLUMN_KEYS[0].into())
+        .with_column_key(AES_128_COLUMN_NAMES[1], 
AES_128_COLUMN_KEYS[1].into())
         .build()
         .unwrap();
 
diff --git a/parquet/tests/encryption/encryption_util.rs 
b/parquet/tests/encryption/encryption_util.rs
index 7f4cc5a9da..f0fe66651d 100644
--- a/parquet/tests/encryption/encryption_util.rs
+++ b/parquet/tests/encryption/encryption_util.rs
@@ -26,10 +26,79 @@ use parquet::encryption::encrypt::FileEncryptionProperties;
 use parquet::errors::{ParquetError, Result};
 use parquet::file::metadata::ParquetMetaData;
 use parquet::file::properties::WriterProperties;
+use ring::aead::AES_256_GCM;
 use std::collections::HashMap;
 use std::fs::File;
 use std::sync::{Arc, Mutex};
 
+pub(crate) const AES_128_FOOTER_KEY: &[u8; 16] = b"0123456789012345"; // 
128bit/16
+pub(crate) const BAD_AES_128_FOOTER_KEY: &[u8; 16] = b"0000000000000000";
+pub(crate) const AES_128_FOOTER_KEY_NAME: &str = "kf";
+pub(crate) const AES_128_KEY_NAMES: [&str; 2] = ["kc1", "kc2"];
+pub(crate) const AES_128_COLUMN_NAMES: [&str; 2] = ["double_field", 
"float_field"];
+pub(crate) const AES_128_COLUMN_KEYS: [&[u8; 16]; 2] = [b"1234567890123450", 
b"1234567890123451"];
+
+pub(crate) const AES_128_COLUMN_NAME_KEYS: &[(&str, &[u8]); 2] = &[
+    (AES_128_COLUMN_NAMES[0], AES_128_COLUMN_KEYS[0]),
+    (AES_128_COLUMN_NAMES[1], AES_128_COLUMN_KEYS[1]),
+];
+
+pub(crate) const AES_128_KEY_NAME_KEY: &[(&str, &[u8]); 3] = &[
+    (AES_128_FOOTER_KEY_NAME, AES_128_FOOTER_KEY),
+    (AES_128_KEY_NAMES[0], AES_128_COLUMN_KEYS[0]),
+    (AES_128_KEY_NAMES[1], AES_128_COLUMN_KEYS[1]),
+];
+
+pub(crate) const AES_256_FOOTER_KEY: &[u8; 32] = 
b"01234567890123456789012345678901"; // 256bit/32
+pub(crate) const BAD_AES_256_FOOTER_KEY: &[u8; 32] = 
b"00000000000000000000000000000000";
+pub(crate) const AES_256_FOOTER_KEY_NAME: &str = "kf";
+pub(crate) const AES_256_KEY_NAMES: [&str; 8] =
+    ["kc1", "kc2", "kc3", "kc4", "kc5", "kc6", "kc7", "kc8"];
+
+pub(crate) const AES_256_COLUMN_NAMES: [&str; 8] = [
+    "double_field",
+    "float_field",
+    "boolean_field",
+    "int32_field",
+    "ba_field",
+    "flba_field",
+    "int64_field.list.element",
+    "int96_field",
+];
+pub(crate) const AES_256_COLUMN_KEYS: [&[u8]; 8] = [
+    b"12345678901234567890123456789012",
+    b"12345678901234567890123456789013",
+    b"12345678901234567890123456789014",
+    b"12345678901234567890123456789015",
+    b"12345678901234567890123456789016",
+    b"12345678901234567890123456789017",
+    b"12345678901234567890123456789018",
+    b"12345678901234567890123456789019",
+];
+
+pub(crate) const AES_256_COLUMN_NAME_KEYS: &[(&str, &[u8]); 8] = &[
+    (AES_256_COLUMN_NAMES[0], AES_256_COLUMN_KEYS[0]),
+    (AES_256_COLUMN_NAMES[1], AES_256_COLUMN_KEYS[1]),
+    (AES_256_COLUMN_NAMES[2], AES_256_COLUMN_KEYS[2]),
+    (AES_256_COLUMN_NAMES[3], AES_256_COLUMN_KEYS[3]),
+    (AES_256_COLUMN_NAMES[4], AES_256_COLUMN_KEYS[4]),
+    (AES_256_COLUMN_NAMES[5], AES_256_COLUMN_KEYS[5]),
+    (AES_256_COLUMN_NAMES[6], AES_256_COLUMN_KEYS[6]),
+    (AES_256_COLUMN_NAMES[7], AES_256_COLUMN_KEYS[7]),
+];
+
+pub(crate) const AES_256_KEY_NAME_KEY: &[(&str, &[u8]); 9] = &[
+    (AES_256_FOOTER_KEY_NAME, AES_256_FOOTER_KEY),
+    (AES_256_KEY_NAMES[0], AES_256_COLUMN_KEYS[0]),
+    (AES_256_KEY_NAMES[1], AES_256_COLUMN_KEYS[1]),
+    (AES_256_KEY_NAMES[2], AES_256_COLUMN_KEYS[2]),
+    (AES_256_KEY_NAMES[3], AES_256_COLUMN_KEYS[3]),
+    (AES_256_KEY_NAMES[4], AES_256_COLUMN_KEYS[4]),
+    (AES_256_KEY_NAMES[5], AES_256_COLUMN_KEYS[5]),
+    (AES_256_KEY_NAMES[6], AES_256_COLUMN_KEYS[6]),
+    (AES_256_KEY_NAMES[7], AES_256_COLUMN_KEYS[7]),
+];
+
 pub(crate) fn verify_encryption_double_test_data(
     record_batches: Vec<RecordBatch>,
     metadata: &ParquetMetaData,
@@ -317,3 +386,14 @@ impl KeyRetriever for TestKeyRetriever {
         }
     }
 }
+
+pub fn encrypted_data_path(footer_key: &[u8], file_name: &str) -> String {
+    let test_data = arrow::util::test_util::parquet_test_data();
+    let subpath = if AES_256_GCM.key_len() == footer_key.len() {
+        "aes256/"
+    } else {
+        ""
+    };
+    let path = format!("{test_data}/{subpath}/{file_name}");
+    path
+}


Reply via email to