This is an automated email from the ASF dual-hosted git repository.

alamb pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/arrow-rs.git


The following commit(s) were added to refs/heads/main by this push:
     new 329f6c99c6 Allow retrieving Parquet decryption keys using the key metadata (#7286)
329f6c99c6 is described below

commit 329f6c99c65b0a60c7f5cf5e8219ce0c4dda62a1
Author: Adam Reeve <[email protected]>
AuthorDate: Tue Mar 25 06:19:14 2025 +1300

    Allow retrieving Parquet decryption keys using the key metadata (#7286)
    
    * Allow retrieving decryption keys from key metadata
    
    * Refactor to disallow setting keys and a key retriever
    
    * Remove unnecessary clone
    
    * Implement PartialEq explicitly for DecryptionKeys rather than FileDecryptionProperties
    
    * Add key retrieval methods to FileDecryptionProperties
    
    * Get column path from ColumnCryptoMetaData
    
    * Tidy up
    
    * Revert rename of parameter
---
 parquet/src/arrow/arrow_reader/mod.rs          |  43 ++---
 parquet/src/arrow/async_reader/mod.rs          |  49 ++----
 parquet/src/encryption/decrypt.rs              | 228 +++++++++++++++++++------
 parquet/src/file/column_crypto_metadata.rs     |  98 +++++++++++
 parquet/src/file/metadata/mod.rs               |  47 +++--
 parquet/src/file/metadata/reader.rs            |  15 +-
 parquet/src/file/mod.rs                        |   2 +
 parquet/tests/arrow_reader/encryption.rs       |  58 ++++++-
 parquet/tests/arrow_reader/encryption_async.rs |  94 +++++++++-
 parquet/tests/arrow_reader/encryption_util.rs  |  42 +++++
 10 files changed, 546 insertions(+), 130 deletions(-)

diff --git a/parquet/src/arrow/arrow_reader/mod.rs 
b/parquet/src/arrow/arrow_reader/mod.rs
index 588a8ea2fa..9514e868b1 100644
--- a/parquet/src/arrow/arrow_reader/mod.rs
+++ b/parquet/src/arrow/arrow_reader/mod.rs
@@ -700,36 +700,19 @@ impl<T: ChunkReader + 'static> Iterator for 
ReaderPageIterator<T> {
 
         #[cfg(feature = "encryption")]
         let crypto_context = if let Some(file_decryptor) = 
self.metadata.file_decryptor() {
-            let column_name = self
-                .metadata
-                .file_metadata()
-                .schema_descr()
-                .column(self.column_idx);
-
-            if file_decryptor.is_column_encrypted(column_name.name()) {
-                let data_decryptor = 
file_decryptor.get_column_data_decryptor(column_name.name());
-                let data_decryptor = match data_decryptor {
-                    Ok(data_decryptor) => data_decryptor,
-                    Err(err) => return Some(Err(err)),
-                };
-
-                let metadata_decryptor =
-                    
file_decryptor.get_column_metadata_decryptor(column_name.name());
-                let metadata_decryptor = match metadata_decryptor {
-                    Ok(metadata_decryptor) => metadata_decryptor,
-                    Err(err) => return Some(Err(err)),
-                };
-
-                let crypto_context = CryptoContext::new(
-                    rg_idx,
-                    self.column_idx,
-                    data_decryptor,
-                    metadata_decryptor,
-                    file_decryptor.file_aad().clone(),
-                );
-                Some(Arc::new(crypto_context))
-            } else {
-                None
+            match meta.crypto_metadata() {
+                Some(crypto_metadata) => {
+                    match CryptoContext::for_column(
+                        file_decryptor,
+                        crypto_metadata,
+                        rg_idx,
+                        self.column_idx,
+                    ) {
+                        Ok(context) => Some(Arc::new(context)),
+                        Err(err) => return Some(Err(err)),
+                    }
+                }
+                None => None,
             }
         } else {
             None
diff --git a/parquet/src/arrow/async_reader/mod.rs 
b/parquet/src/arrow/async_reader/mod.rs
index fd49ad2293..f9308dc4dc 100644
--- a/parquet/src/arrow/async_reader/mod.rs
+++ b/parquet/src/arrow/async_reader/mod.rs
@@ -1027,36 +1027,6 @@ impl RowGroups for InMemoryRowGroup<'_> {
     }
 
     fn column_chunks(&self, i: usize) -> Result<Box<dyn PageIterator>> {
-        #[cfg(feature = "encryption")]
-        let crypto_context = if let Some(file_decryptor) = 
self.metadata.clone().file_decryptor() {
-            let column_name = &self
-                .metadata
-                .clone()
-                .file_metadata()
-                .schema_descr()
-                .column(i);
-
-            if file_decryptor.is_column_encrypted(column_name.name()) {
-                let data_decryptor =
-                    
file_decryptor.get_column_data_decryptor(column_name.name())?;
-                let metadata_decryptor =
-                    
file_decryptor.get_column_metadata_decryptor(column_name.name())?;
-
-                let crypto_context = CryptoContext::new(
-                    self.row_group_idx,
-                    i,
-                    data_decryptor,
-                    metadata_decryptor,
-                    file_decryptor.file_aad().clone(),
-                );
-                Some(Arc::new(crypto_context))
-            } else {
-                None
-            }
-        } else {
-            None
-        };
-
         match &self.column_chunks[i] {
             None => Err(ParquetError::General(format!(
                 "Invalid column index {i}, column was not fetched"
@@ -1067,14 +1037,29 @@ impl RowGroups for InMemoryRowGroup<'_> {
                     // filter out empty offset indexes (old versions specified 
Some(vec![]) when no present)
                     .filter(|index| !index.is_empty())
                     .map(|index| index[i].page_locations.clone());
-                let metadata = self.metadata.row_group(self.row_group_idx);
+                let column_metadata = 
self.metadata.row_group(self.row_group_idx).column(i);
                 let page_reader = SerializedPageReader::new(
                     data.clone(),
-                    metadata.column(i),
+                    column_metadata,
                     self.row_count,
                     page_locations,
                 )?;
 
+                #[cfg(feature = "encryption")]
+                let crypto_context = if let Some(file_decryptor) = 
self.metadata.file_decryptor() {
+                    match column_metadata.crypto_metadata() {
+                        Some(crypto_metadata) => 
Some(Arc::new(CryptoContext::for_column(
+                            file_decryptor,
+                            crypto_metadata,
+                            self.row_group_idx,
+                            i,
+                        )?)),
+                        None => None,
+                    }
+                } else {
+                    None
+                };
+
                 #[cfg(feature = "encryption")]
                 let page_reader = 
page_reader.with_crypto_context(crypto_context);
 
diff --git a/parquet/src/encryption/decrypt.rs 
b/parquet/src/encryption/decrypt.rs
index d5bfe3cfc0..7674619f4d 100644
--- a/parquet/src/encryption/decrypt.rs
+++ b/parquet/src/encryption/decrypt.rs
@@ -18,10 +18,18 @@
 use crate::encryption::ciphers::{BlockDecryptor, RingGcmBlockDecryptor};
 use crate::encryption::modules::{create_module_aad, ModuleType};
 use crate::errors::{ParquetError, Result};
+use crate::file::column_crypto_metadata::ColumnCryptoMetaData;
+use std::borrow::Cow;
 use std::collections::HashMap;
+use std::fmt::Formatter;
 use std::io::Read;
 use std::sync::Arc;
 
+/// Trait for retrieving an encryption key using the key's metadata
+pub trait KeyRetriever: Send + Sync {
+    fn retrieve_key(&self, key_metadata: &[u8]) -> Result<Vec<u8>>;
+}
+
 pub fn read_and_decrypt<T: Read>(
     decryptor: &Arc<dyn BlockDecryptor>,
     input: &mut T,
@@ -53,22 +61,45 @@ pub(crate) struct CryptoContext {
 }
 
 impl CryptoContext {
-    pub(crate) fn new(
+    pub(crate) fn for_column(
+        file_decryptor: &FileDecryptor,
+        column_crypto_metadata: &ColumnCryptoMetaData,
         row_group_idx: usize,
         column_ordinal: usize,
-        data_decryptor: Arc<dyn BlockDecryptor>,
-        metadata_decryptor: Arc<dyn BlockDecryptor>,
-        file_aad: Vec<u8>,
-    ) -> Self {
-        Self {
+    ) -> Result<Self> {
+        let (data_decryptor, metadata_decryptor) = match 
column_crypto_metadata {
+            ColumnCryptoMetaData::EncryptionWithFooterKey => {
+                // TODO: In GCM-CTR mode will this need to be a non-GCM 
decryptor?
+                let data_decryptor = file_decryptor.get_footer_decryptor()?;
+                let metadata_decryptor = 
file_decryptor.get_footer_decryptor()?;
+                (data_decryptor, metadata_decryptor)
+            }
+            
ColumnCryptoMetaData::EncryptionWithColumnKey(column_key_encryption) => {
+                let key_metadata = &column_key_encryption.key_metadata;
+                let full_column_name;
+                let column_name = if 
column_key_encryption.path_in_schema.len() == 1 {
+                    &column_key_encryption.path_in_schema[0]
+                } else {
+                    full_column_name = 
column_key_encryption.path_in_schema.join(".");
+                    &full_column_name
+                };
+                let data_decryptor = file_decryptor
+                    .get_column_data_decryptor(column_name, 
key_metadata.as_deref())?;
+                let metadata_decryptor = file_decryptor
+                    .get_column_metadata_decryptor(column_name, 
key_metadata.as_deref())?;
+                (data_decryptor, metadata_decryptor)
+            }
+        };
+
+        Ok(CryptoContext {
             row_group_idx,
             column_ordinal,
             page_ordinal: None,
             dictionary_page: false,
             data_decryptor,
             metadata_decryptor,
-            file_aad,
-        }
+            file_aad: file_decryptor.file_aad().clone(),
+        })
     }
 
     pub(crate) fn with_page_ordinal(&self, page_ordinal: usize) -> Self {
@@ -136,49 +167,158 @@ impl CryptoContext {
     }
 }
 
-/// FileDecryptionProperties hold keys and AAD data required to decrypt a 
Parquet file.
-#[derive(Debug, Clone, PartialEq)]
-pub struct FileDecryptionProperties {
+#[derive(Clone, PartialEq)]
+struct ExplicitDecryptionKeys {
     footer_key: Vec<u8>,
     column_keys: HashMap<String, Vec<u8>>,
+}
+
+#[derive(Clone)]
+enum DecryptionKeys {
+    Explicit(ExplicitDecryptionKeys),
+    ViaRetriever(Arc<dyn KeyRetriever>),
+}
+
+impl PartialEq for DecryptionKeys {
+    fn eq(&self, other: &Self) -> bool {
+        match (self, other) {
+            (DecryptionKeys::Explicit(keys), 
DecryptionKeys::Explicit(other_keys)) => {
+                keys.footer_key == other_keys.footer_key
+                    && keys.column_keys == other_keys.column_keys
+            }
+            (DecryptionKeys::ViaRetriever(_), DecryptionKeys::ViaRetriever(_)) 
=> true,
+            _ => false,
+        }
+    }
+}
+
+/// FileDecryptionProperties hold keys and AAD data required to decrypt a 
Parquet file.
+#[derive(Clone, PartialEq)]
+pub struct FileDecryptionProperties {
+    keys: DecryptionKeys,
     pub(crate) aad_prefix: Option<Vec<u8>>,
 }
 
 impl FileDecryptionProperties {
-    /// Returns a new FileDecryptionProperties builder
+    /// Returns a new [`FileDecryptionProperties`] builder that will use the 
provided key to
+    /// decrypt footer metadata.
     pub fn builder(footer_key: Vec<u8>) -> DecryptionPropertiesBuilder {
         DecryptionPropertiesBuilder::new(footer_key)
     }
+
+    /// Returns a new [`FileDecryptionProperties`] builder that uses a 
[`KeyRetriever`]
+    /// to get decryption keys based on key metadata.
+    pub fn with_key_retriever(key_retriever: Arc<dyn KeyRetriever>) -> 
DecryptionPropertiesBuilder {
+        DecryptionPropertiesBuilder::new_with_key_retriever(key_retriever)
+    }
+
+    /// Get the encryption key for decrypting a file's footer,
+    /// and also column data if uniform encryption is used.
+    pub(crate) fn footer_key(&self, key_metadata: Option<&[u8]>) -> 
Result<Cow<Vec<u8>>> {
+        match &self.keys {
+            DecryptionKeys::Explicit(keys) => 
Ok(Cow::Borrowed(&keys.footer_key)),
+            DecryptionKeys::ViaRetriever(retriever) => {
+                let key = 
retriever.retrieve_key(key_metadata.unwrap_or_default())?;
+                Ok(Cow::Owned(key))
+            }
+        }
+    }
+
+    /// Get the column-specific encryption key for decrypting column data and 
metadata within a file
+    pub(crate) fn column_key(
+        &self,
+        column_name: &str,
+        key_metadata: Option<&[u8]>,
+    ) -> Result<Cow<Vec<u8>>> {
+        match &self.keys {
+            DecryptionKeys::Explicit(keys) => match 
keys.column_keys.get(column_name) {
+                None => Err(general_err!(
+                    "No column decryption key set for column '{}'",
+                    column_name
+                )),
+                Some(key) => Ok(Cow::Borrowed(key)),
+            },
+            DecryptionKeys::ViaRetriever(retriever) => {
+                let key = 
retriever.retrieve_key(key_metadata.unwrap_or_default())?;
+                Ok(Cow::Owned(key))
+            }
+        }
+    }
+}
+
+impl std::fmt::Debug for FileDecryptionProperties {
+    fn fmt(&self, f: &mut Formatter<'_>) -> std::fmt::Result {
+        write!(f, "FileDecryptionProperties {{ }}")
+    }
 }
 
+/// Builder for [`FileDecryptionProperties`]
 pub struct DecryptionPropertiesBuilder {
-    footer_key: Vec<u8>,
+    footer_key: Option<Vec<u8>>,
+    key_retriever: Option<Arc<dyn KeyRetriever>>,
     column_keys: HashMap<String, Vec<u8>>,
     aad_prefix: Option<Vec<u8>>,
 }
 
 impl DecryptionPropertiesBuilder {
+    /// Create a new [`DecryptionPropertiesBuilder`] builder that will use the 
provided key to
+    /// decrypt footer metadata.
     pub fn new(footer_key: Vec<u8>) -> DecryptionPropertiesBuilder {
         Self {
-            footer_key,
+            footer_key: Some(footer_key),
+            key_retriever: None,
             column_keys: HashMap::default(),
             aad_prefix: None,
         }
     }
 
+    /// Create a new [`DecryptionPropertiesBuilder`] by providing a 
[`KeyRetriever`] that
+    /// can be used to get decryption keys based on key metadata.
+    pub fn new_with_key_retriever(
+        key_retriever: Arc<dyn KeyRetriever>,
+    ) -> DecryptionPropertiesBuilder {
+        Self {
+            footer_key: None,
+            key_retriever: Some(key_retriever),
+            column_keys: HashMap::default(),
+            aad_prefix: None,
+        }
+    }
+
+    /// Finalize the builder and return created [`FileDecryptionProperties`]
     pub fn build(self) -> Result<FileDecryptionProperties> {
+        let keys = match (self.footer_key, self.key_retriever) {
+            (Some(footer_key), None) => 
DecryptionKeys::Explicit(ExplicitDecryptionKeys {
+                footer_key,
+                column_keys: self.column_keys,
+            }),
+            (None, Some(key_retriever)) => {
+                if !self.column_keys.is_empty() {
+                    return Err(general_err!(
+                        "Cannot specify column keys directly when using a key 
retriever"
+                    ));
+                }
+                DecryptionKeys::ViaRetriever(key_retriever)
+            }
+            _ => {
+                unreachable!()
+            }
+        };
         Ok(FileDecryptionProperties {
-            footer_key: self.footer_key,
-            column_keys: self.column_keys,
+            keys,
             aad_prefix: self.aad_prefix,
         })
     }
 
+    /// Specify the expected AAD prefix to be used for decryption.
+    /// This must be set if the file was written with an AAD prefix and the
+    /// prefix is not stored in the file metadata.
     pub fn with_aad_prefix(mut self, value: Vec<u8>) -> Self {
         self.aad_prefix = Some(value);
         self
     }
 
+    /// Specify the decryption key to use for a column
     pub fn with_column_key(mut self, column_name: &str, decryption_key: 
Vec<u8>) -> Self {
         self.column_keys
             .insert(column_name.to_string(), decryption_key);
@@ -189,72 +329,64 @@ impl DecryptionPropertiesBuilder {
 #[derive(Clone, Debug)]
 pub(crate) struct FileDecryptor {
     decryption_properties: FileDecryptionProperties,
-    footer_decryptor: Option<Arc<dyn BlockDecryptor>>,
+    footer_decryptor: Arc<dyn BlockDecryptor>,
     file_aad: Vec<u8>,
 }
 
 impl PartialEq for FileDecryptor {
     fn eq(&self, other: &Self) -> bool {
-        self.decryption_properties == other.decryption_properties
+        self.decryption_properties == other.decryption_properties && 
self.file_aad == other.file_aad
     }
 }
 
 impl FileDecryptor {
     pub(crate) fn new(
         decryption_properties: &FileDecryptionProperties,
+        footer_key_metadata: Option<&[u8]>,
         aad_file_unique: Vec<u8>,
         aad_prefix: Vec<u8>,
-    ) -> Result<Self, ParquetError> {
+    ) -> Result<Self> {
         let file_aad = [aad_prefix.as_slice(), 
aad_file_unique.as_slice()].concat();
-        // todo decr: if no key available yet (not set in properties, should 
be retrieved from metadata)
-        let footer_decryptor = 
RingGcmBlockDecryptor::new(&decryption_properties.footer_key)
-            .map_err(|e| {
-                general_err!(
-                    "Invalid footer key. {}",
-                    e.to_string().replace("Parquet error: ", "")
-                )
-            })?;
+        let footer_key = 
decryption_properties.footer_key(footer_key_metadata)?;
+        let footer_decryptor = 
RingGcmBlockDecryptor::new(&footer_key).map_err(|e| {
+            general_err!(
+                "Invalid footer key. {}",
+                e.to_string().replace("Parquet error: ", "")
+            )
+        })?;
+
         Ok(Self {
-            footer_decryptor: Some(Arc::new(footer_decryptor)),
+            footer_decryptor: Arc::new(footer_decryptor),
             decryption_properties: decryption_properties.clone(),
             file_aad,
         })
     }
 
-    pub(crate) fn get_footer_decryptor(&self) -> Result<Arc<dyn 
BlockDecryptor>, ParquetError> {
-        Ok(self.footer_decryptor.clone().unwrap())
+    pub(crate) fn get_footer_decryptor(&self) -> Result<Arc<dyn 
BlockDecryptor>> {
+        Ok(self.footer_decryptor.clone())
     }
 
     pub(crate) fn get_column_data_decryptor(
         &self,
         column_name: &str,
-    ) -> Result<Arc<dyn BlockDecryptor>, ParquetError> {
-        match self.decryption_properties.column_keys.get(column_name) {
-            Some(column_key) => 
Ok(Arc::new(RingGcmBlockDecryptor::new(column_key)?)),
-            None => self.get_footer_decryptor(),
-        }
+        key_metadata: Option<&[u8]>,
+    ) -> Result<Arc<dyn BlockDecryptor>> {
+        let column_key = self
+            .decryption_properties
+            .column_key(column_name, key_metadata)?;
+        Ok(Arc::new(RingGcmBlockDecryptor::new(&column_key)?))
     }
 
     pub(crate) fn get_column_metadata_decryptor(
         &self,
         column_name: &str,
-    ) -> Result<Arc<dyn BlockDecryptor>, ParquetError> {
+        key_metadata: Option<&[u8]>,
+    ) -> Result<Arc<dyn BlockDecryptor>> {
         // Once GCM CTR mode is implemented, data and metadata decryptors may 
be different
-        self.get_column_data_decryptor(column_name)
+        self.get_column_data_decryptor(column_name, key_metadata)
     }
 
     pub(crate) fn file_aad(&self) -> &Vec<u8> {
         &self.file_aad
     }
-
-    pub(crate) fn is_column_encrypted(&self, column_name: &str) -> bool {
-        // Column is encrypted if either uniform encryption is used or an 
encryption key is set for the column
-        match self.decryption_properties.column_keys.is_empty() {
-            false => self
-                .decryption_properties
-                .column_keys
-                .contains_key(column_name),
-            true => true,
-        }
-    }
 }
diff --git a/parquet/src/file/column_crypto_metadata.rs 
b/parquet/src/file/column_crypto_metadata.rs
new file mode 100644
index 0000000000..af670e675f
--- /dev/null
+++ b/parquet/src/file/column_crypto_metadata.rs
@@ -0,0 +1,98 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+//! Column chunk encryption metadata
+
+use crate::errors::Result;
+use crate::format::{
+    ColumnCryptoMetaData as TColumnCryptoMetaData,
+    EncryptionWithColumnKey as TEncryptionWithColumnKey,
+    EncryptionWithFooterKey as TEncryptionWithFooterKey,
+};
+
+/// ColumnCryptoMetadata for a column chunk
+#[derive(Clone, Debug, PartialEq, Eq)]
+pub enum ColumnCryptoMetaData {
+    /// The column is encrypted with the footer key
+    EncryptionWithFooterKey,
+    /// The column is encrypted with a column-specific key
+    EncryptionWithColumnKey(EncryptionWithColumnKey),
+}
+
+/// Encryption metadata for a column chunk encrypted with a column-specific key
+#[derive(Clone, Debug, PartialEq, Eq)]
+pub struct EncryptionWithColumnKey {
+    /// Path to the column in the Parquet schema
+    pub path_in_schema: Vec<String>,
+    /// Metadata required to retrieve the column encryption key
+    pub key_metadata: Option<Vec<u8>>,
+}
+
+/// Converts Thrift definition into `ColumnCryptoMetadata`.
+pub fn try_from_thrift(
+    thrift_column_crypto_metadata: &TColumnCryptoMetaData,
+) -> Result<ColumnCryptoMetaData> {
+    let crypto_metadata = match thrift_column_crypto_metadata {
+        TColumnCryptoMetaData::ENCRYPTIONWITHFOOTERKEY(_) => {
+            ColumnCryptoMetaData::EncryptionWithFooterKey
+        }
+        
TColumnCryptoMetaData::ENCRYPTIONWITHCOLUMNKEY(encryption_with_column_key) => {
+            
ColumnCryptoMetaData::EncryptionWithColumnKey(EncryptionWithColumnKey {
+                path_in_schema: 
encryption_with_column_key.path_in_schema.clone(),
+                key_metadata: encryption_with_column_key.key_metadata.clone(),
+            })
+        }
+    };
+    Ok(crypto_metadata)
+}
+
+/// Converts `ColumnCryptoMetadata` into Thrift definition.
+pub fn to_thrift(column_crypto_metadata: &ColumnCryptoMetaData) -> 
TColumnCryptoMetaData {
+    match column_crypto_metadata {
+        ColumnCryptoMetaData::EncryptionWithFooterKey => {
+            
TColumnCryptoMetaData::ENCRYPTIONWITHFOOTERKEY(TEncryptionWithFooterKey {})
+        }
+        
ColumnCryptoMetaData::EncryptionWithColumnKey(encryption_with_column_key) => {
+            
TColumnCryptoMetaData::ENCRYPTIONWITHCOLUMNKEY(TEncryptionWithColumnKey {
+                path_in_schema: 
encryption_with_column_key.path_in_schema.clone(),
+                key_metadata: encryption_with_column_key.key_metadata.clone(),
+            })
+        }
+    }
+}
+
+#[cfg(test)]
+mod tests {
+    use super::*;
+
+    #[test]
+    fn test_encryption_with_footer_key_from_thrift() {
+        let metadata = ColumnCryptoMetaData::EncryptionWithFooterKey;
+
+        assert_eq!(try_from_thrift(&to_thrift(&metadata)).unwrap(), metadata);
+    }
+
+    #[test]
+    fn test_encryption_with_column_key_from_thrift() {
+        let metadata = 
ColumnCryptoMetaData::EncryptionWithColumnKey(EncryptionWithColumnKey {
+            path_in_schema: vec!["abc".to_owned(), "def".to_owned()],
+            key_metadata: Some(vec![0, 1, 2, 3, 4, 5]),
+        });
+
+        assert_eq!(try_from_thrift(&to_thrift(&metadata)).unwrap(), metadata);
+    }
+}
diff --git a/parquet/src/file/metadata/mod.rs b/parquet/src/file/metadata/mod.rs
index 217685049e..81157399ac 100644
--- a/parquet/src/file/metadata/mod.rs
+++ b/parquet/src/file/metadata/mod.rs
@@ -102,13 +102,15 @@ use crate::encryption::{
     modules::{create_module_aad, ModuleType},
 };
 use crate::errors::{ParquetError, Result};
+#[cfg(feature = "encryption")]
+use crate::file::column_crypto_metadata::{self, ColumnCryptoMetaData};
 pub(crate) use crate::file::metadata::memory::HeapSize;
 use crate::file::page_encoding_stats::{self, PageEncodingStats};
 use crate::file::page_index::index::Index;
 use crate::file::page_index::offset_index::OffsetIndexMetaData;
 use crate::file::statistics::{self, Statistics};
 #[cfg(feature = "encryption")]
-use crate::format::ColumnCryptoMetaData;
+use crate::format::ColumnCryptoMetaData as TColumnCryptoMetaData;
 use crate::format::{
     BoundaryOrder, ColumnChunk, ColumnIndex, ColumnMetaData, OffsetIndex, 
PageLocation, RowGroup,
     SizeStatistics, SortingColumn,
@@ -657,11 +659,14 @@ impl RowGroupMetaData {
                             d.path().string()
                         ));
                     }
-                    
Some(ColumnCryptoMetaData::ENCRYPTIONWITHCOLUMNKEY(crypto_metadata)) => {
+                    
Some(TColumnCryptoMetaData::ENCRYPTIONWITHCOLUMNKEY(crypto_metadata)) => {
                         let column_name = 
crypto_metadata.path_in_schema.join(".");
-                        
decryptor.get_column_metadata_decryptor(column_name.as_str())?
+                        decryptor.get_column_metadata_decryptor(
+                            column_name.as_str(),
+                            crypto_metadata.key_metadata.as_deref(),
+                        )?
                     }
-                    Some(ColumnCryptoMetaData::ENCRYPTIONWITHFOOTERKEY(_)) => {
+                    Some(TColumnCryptoMetaData::ENCRYPTIONWITHFOOTERKEY(_)) => 
{
                         decryptor.get_footer_decryptor()?
                     }
                 };
@@ -675,10 +680,13 @@ impl RowGroupMetaData {
                 )?;
 
                 let buf = c.encrypted_column_metadata.clone().unwrap();
-                let decrypted_cc_buf =
-                    column_decryptor.decrypt(buf.as_slice(), 
column_aad.as_ref()).map_err(|_| {
-                        general_err!("Unable to decrypt column '{}', perhaps 
the column key is wrong or missing?",
-                            d.path().string())
+                let decrypted_cc_buf = column_decryptor
+                    .decrypt(buf.as_slice(), column_aad.as_ref())
+                    .map_err(|_| {
+                        general_err!(
+                            "Unable to decrypt column '{}', perhaps the column 
key is wrong?",
+                            d.path().string()
+                        )
                     })?;
 
                 let mut prot = 
TCompactSliceInputProtocol::new(decrypted_cc_buf.as_slice());
@@ -854,6 +862,8 @@ pub struct ColumnChunkMetaData {
     unencoded_byte_array_data_bytes: Option<i64>,
     repetition_level_histogram: Option<LevelHistogram>,
     definition_level_histogram: Option<LevelHistogram>,
+    #[cfg(feature = "encryption")]
+    column_crypto_metadata: Option<ColumnCryptoMetaData>,
 }
 
 /// Histograms for repetition and definition levels.
@@ -1143,6 +1153,12 @@ impl ColumnChunkMetaData {
         self.definition_level_histogram.as_ref()
     }
 
+    /// Returns the encryption metadata for this column chunk.
+    #[cfg(feature = "encryption")]
+    pub fn crypto_metadata(&self) -> Option<&ColumnCryptoMetaData> {
+        self.column_crypto_metadata.as_ref()
+    }
+
     /// Method to convert from Thrift.
     pub fn from_thrift(column_descr: ColumnDescPtr, cc: ColumnChunk) -> 
Result<Self> {
         if cc.meta_data.is_none() {
@@ -1197,6 +1213,13 @@ impl ColumnChunkMetaData {
         let repetition_level_histogram = 
repetition_level_histogram.map(LevelHistogram::from);
         let definition_level_histogram = 
definition_level_histogram.map(LevelHistogram::from);
 
+        #[cfg(feature = "encryption")]
+        let column_crypto_metadata = if let Some(crypto_metadata) = 
cc.crypto_metadata {
+            Some(column_crypto_metadata::try_from_thrift(&crypto_metadata)?)
+        } else {
+            None
+        };
+
         let result = ColumnChunkMetaData {
             column_descr,
             encodings,
@@ -1220,6 +1243,8 @@ impl ColumnChunkMetaData {
             unencoded_byte_array_data_bytes,
             repetition_level_histogram,
             definition_level_histogram,
+            #[cfg(feature = "encryption")]
+            column_crypto_metadata,
         };
         Ok(result)
     }
@@ -1343,6 +1368,8 @@ impl ColumnChunkMetaDataBuilder {
             unencoded_byte_array_data_bytes: None,
             repetition_level_histogram: None,
             definition_level_histogram: None,
+            #[cfg(feature = "encryption")]
+            column_crypto_metadata: None,
         })
     }
 
@@ -1954,7 +1981,7 @@ mod tests {
         #[cfg(not(feature = "encryption"))]
         let base_expected_size = 2312;
         #[cfg(feature = "encryption")]
-        let base_expected_size = 2448;
+        let base_expected_size = 2640;
 
         assert_eq!(parquet_meta.memory_size(), base_expected_size);
 
@@ -1984,7 +2011,7 @@ mod tests {
         #[cfg(not(feature = "encryption"))]
         let bigger_expected_size = 2816;
         #[cfg(feature = "encryption")]
-        let bigger_expected_size = 2952;
+        let bigger_expected_size = 3144;
 
         // more set fields means more memory usage
         assert!(bigger_expected_size > base_expected_size);
diff --git a/parquet/src/file/metadata/reader.rs 
b/parquet/src/file/metadata/reader.rs
index b80e76d792..8532b59667 100644
--- a/parquet/src/file/metadata/reader.rs
+++ b/parquet/src/file/metadata/reader.rs
@@ -734,6 +734,7 @@ impl ParquetMetaDataReader {
                         .map_err(|e| general_err!("Could not parse crypto 
metadata: {}", e))?;
                 let decryptor = get_file_decryptor(
                     t_file_crypto_metadata.encryption_algorithm,
+                    t_file_crypto_metadata.key_metadata.as_deref(),
                     file_decryption_properties,
                 )?;
                 let footer_decryptor = decryptor.get_footer_decryptor();
@@ -764,7 +765,11 @@ impl ParquetMetaDataReader {
             file_decryption_properties,
         ) {
             // File has a plaintext footer but encryption algorithm is set
-            file_decryptor = Some(get_file_decryptor(algo, 
file_decryption_properties)?);
+            file_decryptor = Some(get_file_decryptor(
+                algo,
+                t_file_metadata.footer_signing_key_metadata.as_deref(),
+                file_decryption_properties,
+            )?);
         }
 
         let mut row_groups = Vec::new();
@@ -863,6 +868,7 @@ impl ParquetMetaDataReader {
 #[cfg(feature = "encryption")]
 fn get_file_decryptor(
     encryption_algorithm: EncryptionAlgorithm,
+    footer_key_metadata: Option<&[u8]>,
     file_decryption_properties: &FileDecryptionProperties,
 ) -> Result<FileDecryptor> {
     match encryption_algorithm {
@@ -876,7 +882,12 @@ fn get_file_decryptor(
                 algo.aad_prefix.unwrap_or_default()
             };
 
-            FileDecryptor::new(file_decryption_properties, aad_file_unique, aad_prefix)
+            FileDecryptor::new(
+                file_decryption_properties,
+                footer_key_metadata,
+                aad_file_unique,
+                aad_prefix,
+            )
         }
         EncryptionAlgorithm::AESGCMCTRV1(_) => Err(nyi_err!(
             "The AES_GCM_CTR_V1 encryption algorithm is not yet supported"
diff --git a/parquet/src/file/mod.rs b/parquet/src/file/mod.rs
index b36ef752ae..94eeb2b22e 100644
--- a/parquet/src/file/mod.rs
+++ b/parquet/src/file/mod.rs
@@ -97,6 +97,8 @@
 //!     println!("{}", row.unwrap());
 //! }
 //! ```
+#[cfg(feature = "encryption")]
+pub mod column_crypto_metadata;
 pub mod footer;
 pub mod metadata;
 pub mod page_encoding_stats;
diff --git a/parquet/tests/arrow_reader/encryption.rs b/parquet/tests/arrow_reader/encryption.rs
index 1d633f7991..521212488a 100644
--- a/parquet/tests/arrow_reader/encryption.rs
+++ b/parquet/tests/arrow_reader/encryption.rs
@@ -17,13 +17,14 @@
 
 //! This module contains tests for reading encrypted Parquet files with the Arrow API
 
-use crate::encryption_util::verify_encryption_test_data;
+use crate::encryption_util::{verify_encryption_test_data, TestKeyRetriever};
 use arrow_array::RecordBatch;
 use parquet::arrow::arrow_reader::{
     ArrowReaderMetadata, ArrowReaderOptions, ParquetRecordBatchReaderBuilder,
 };
 use parquet::encryption::decrypt::FileDecryptionProperties;
 use std::fs::File;
+use std::sync::Arc;
 
 #[test]
 fn test_non_uniform_encryption_plaintext_footer() {
@@ -187,6 +188,61 @@ fn test_aes_ctr_encryption() {
     };
 }
 
+#[test]
+fn test_non_uniform_encryption_plaintext_footer_with_key_retriever() {
+    let test_data = arrow::util::test_util::parquet_test_data();
+    let path = format!("{test_data}/encrypt_columns_plaintext_footer.parquet.encrypted");
+    let file = File::open(path).unwrap();
+
+    let key_retriever = TestKeyRetriever::new()
+        .with_key("kf".to_owned(), "0123456789012345".as_bytes().to_vec())
+        .with_key("kc1".to_owned(), "1234567890123450".as_bytes().to_vec())
+        .with_key("kc2".to_owned(), "1234567890123451".as_bytes().to_vec());
+
+    let decryption_properties =
+        FileDecryptionProperties::with_key_retriever(Arc::new(key_retriever))
+            .build()
+            .unwrap();
+
+    verify_encryption_test_file_read(file, decryption_properties);
+}
+
+#[test]
+fn test_non_uniform_encryption_with_key_retriever() {
+    let test_data = arrow::util::test_util::parquet_test_data();
+    let path = format!("{test_data}/encrypt_columns_and_footer.parquet.encrypted");
+    let file = File::open(path).unwrap();
+
+    let key_retriever = TestKeyRetriever::new()
+        .with_key("kf".to_owned(), "0123456789012345".as_bytes().to_vec())
+        .with_key("kc1".to_owned(), "1234567890123450".as_bytes().to_vec())
+        .with_key("kc2".to_owned(), "1234567890123451".as_bytes().to_vec());
+
+    let decryption_properties =
+        FileDecryptionProperties::with_key_retriever(Arc::new(key_retriever))
+            .build()
+            .unwrap();
+
+    verify_encryption_test_file_read(file, decryption_properties);
+}
+
+#[test]
+fn test_uniform_encryption_with_key_retriever() {
+    let test_data = arrow::util::test_util::parquet_test_data();
+    let path = format!("{test_data}/uniform_encryption.parquet.encrypted");
+    let file = File::open(path).unwrap();
+
+    let key_retriever =
+        TestKeyRetriever::new().with_key("kf".to_owned(), "0123456789012345".as_bytes().to_vec());
+
+    let decryption_properties =
+        FileDecryptionProperties::with_key_retriever(Arc::new(key_retriever))
+            .build()
+            .unwrap();
+
+    verify_encryption_test_file_read(file, decryption_properties);
+}
+
 fn verify_encryption_test_file_read(file: File, decryption_properties: FileDecryptionProperties) {
     let options =
         ArrowReaderOptions::default().with_file_decryption_properties(decryption_properties);
diff --git a/parquet/tests/arrow_reader/encryption_async.rs b/parquet/tests/arrow_reader/encryption_async.rs
index 01ed0a9fc9..eeac10f574 100644
--- a/parquet/tests/arrow_reader/encryption_async.rs
+++ b/parquet/tests/arrow_reader/encryption_async.rs
@@ -17,12 +17,13 @@
 
 //! This module contains tests for reading encrypted Parquet files with the async Arrow API
 
-use crate::encryption_util::verify_encryption_test_data;
+use crate::encryption_util::{verify_encryption_test_data, TestKeyRetriever};
 use futures::TryStreamExt;
 use parquet::arrow::arrow_reader::{ArrowReaderMetadata, ArrowReaderOptions};
 use parquet::arrow::ParquetRecordBatchStreamBuilder;
 use parquet::encryption::decrypt::FileDecryptionProperties;
 use parquet::errors::ParquetError;
+use std::sync::Arc;
 use tokio::fs::File;
 
 #[tokio::test]
@@ -114,8 +115,14 @@ async fn test_misspecified_encryption_keys() {
     .await;
 
     // Missing column key
-    check_for_error("Parquet error: Unable to decrypt column 'double_field', perhaps the column key is wrong or missing?",
-                    &path, footer_key, "".as_bytes(), column_2_key).await;
+    check_for_error(
+        "Parquet error: No column decryption key set for column 'double_field'",
+        &path,
+        footer_key,
+        "".as_bytes(),
+        column_2_key,
+    )
+    .await;
 
     // Too short column key
     check_for_error(
@@ -128,12 +135,24 @@ async fn test_misspecified_encryption_keys() {
     .await;
 
     // Wrong column key
-    check_for_error("Parquet error: Unable to decrypt column 'double_field', perhaps the column key is wrong or missing?",
-                    &path, footer_key, "1123456789012345".as_bytes(), column_2_key).await;
+    check_for_error(
+        "Parquet error: Unable to decrypt column 'double_field', perhaps the column key is wrong?",
+        &path,
+        footer_key,
+        "1123456789012345".as_bytes(),
+        column_2_key,
+    )
+    .await;
 
     // Mixed up keys
-    check_for_error("Parquet error: Unable to decrypt column 'float_field', perhaps the column key is wrong or missing?",
-                    &path, footer_key, column_2_key, column_1_key).await;
+    check_for_error(
+        "Parquet error: Unable to decrypt column 'float_field', perhaps the column key is wrong?",
+        &path,
+        footer_key,
+        column_2_key,
+        column_1_key,
+    )
+    .await;
 }
 
 #[tokio::test]
@@ -268,6 +287,67 @@ async fn test_read_encrypted_file_from_object_store() {
     verify_encryption_test_data(record_batches, &metadata);
 }
 
+#[tokio::test]
+async fn test_non_uniform_encryption_plaintext_footer_with_key_retriever() {
+    let testdata = arrow::util::test_util::parquet_test_data();
+    let path = format!("{testdata}/encrypt_columns_plaintext_footer.parquet.encrypted");
+    let mut file = File::open(&path).await.unwrap();
+
+    let key_retriever = TestKeyRetriever::new()
+        .with_key("kf".to_owned(), "0123456789012345".as_bytes().to_vec())
+        .with_key("kc1".to_owned(), "1234567890123450".as_bytes().to_vec())
+        .with_key("kc2".to_owned(), "1234567890123451".as_bytes().to_vec());
+
+    let decryption_properties =
+        FileDecryptionProperties::with_key_retriever(Arc::new(key_retriever))
+            .build()
+            .unwrap();
+
+    verify_encryption_test_file_read_async(&mut file, decryption_properties)
+        .await
+        .unwrap();
+}
+
+#[tokio::test]
+async fn test_non_uniform_encryption_with_key_retriever() {
+    let testdata = arrow::util::test_util::parquet_test_data();
+    let path = format!("{testdata}/encrypt_columns_and_footer.parquet.encrypted");
+    let mut file = File::open(&path).await.unwrap();
+
+    let key_retriever = TestKeyRetriever::new()
+        .with_key("kf".to_owned(), "0123456789012345".as_bytes().to_vec())
+        .with_key("kc1".to_owned(), "1234567890123450".as_bytes().to_vec())
+        .with_key("kc2".to_owned(), "1234567890123451".as_bytes().to_vec());
+
+    let decryption_properties =
+        FileDecryptionProperties::with_key_retriever(Arc::new(key_retriever))
+            .build()
+            .unwrap();
+
+    verify_encryption_test_file_read_async(&mut file, decryption_properties)
+        .await
+        .unwrap();
+}
+
+#[tokio::test]
+async fn test_uniform_encryption_with_key_retriever() {
+    let testdata = arrow::util::test_util::parquet_test_data();
+    let path = format!("{testdata}/uniform_encryption.parquet.encrypted");
+    let mut file = File::open(&path).await.unwrap();
+
+    let key_retriever =
+        TestKeyRetriever::new().with_key("kf".to_owned(), "0123456789012345".as_bytes().to_vec());
+
+    let decryption_properties =
+        FileDecryptionProperties::with_key_retriever(Arc::new(key_retriever))
+            .build()
+            .unwrap();
+
+    verify_encryption_test_file_read_async(&mut file, decryption_properties)
+        .await
+        .unwrap();
+}
+
 async fn verify_encryption_test_file_read_async(
     file: &mut tokio::fs::File,
     decryption_properties: FileDecryptionProperties,
diff --git a/parquet/tests/arrow_reader/encryption_util.rs 
b/parquet/tests/arrow_reader/encryption_util.rs
index de21f13ca6..f0d1152fb0 100644
--- a/parquet/tests/arrow_reader/encryption_util.rs
+++ b/parquet/tests/arrow_reader/encryption_util.rs
@@ -17,7 +17,11 @@
 
 use arrow_array::cast::AsArray;
 use arrow_array::{types, RecordBatch};
+use parquet::encryption::decrypt::KeyRetriever;
+use parquet::errors::{ParquetError, Result};
 use parquet::file::metadata::ParquetMetaData;
+use std::collections::HashMap;
+use std::sync::Mutex;
 
 /// Verifies data read from an encrypted file from the parquet-testing repository
 pub fn verify_encryption_test_data(record_batches: Vec<RecordBatch>, metadata: &ParquetMetaData) {
@@ -83,3 +87,41 @@ pub fn verify_encryption_test_data(record_batches: Vec<RecordBatch>, metadata: &
 
     assert_eq!(row_count, file_metadata.num_rows() as usize);
 }
+
+/// A KeyRetriever to use in Parquet encryption tests,
+/// which stores a map from key names/metadata to encryption key bytes.
+pub struct TestKeyRetriever {
+    keys: Mutex<HashMap<String, Vec<u8>>>,
+}
+
+impl TestKeyRetriever {
+    pub fn new() -> Self {
+        Self {
+            keys: Mutex::new(HashMap::default()),
+        }
+    }
+
+    pub fn with_key(self, key_name: String, key: Vec<u8>) -> Self {
+        {
+            let mut keys = self.keys.lock().unwrap();
+            keys.insert(key_name, key);
+        }
+        self
+    }
+}
+
+impl KeyRetriever for TestKeyRetriever {
+    fn retrieve_key(&self, key_metadata: &[u8]) -> Result<Vec<u8>> {
+        let key_metadata = std::str::from_utf8(key_metadata).map_err(|e| {
+            ParquetError::General(format!("Could not convert key metadata to string: {}", e))
+        })?;
+        let keys = self.keys.lock().unwrap();
+        match keys.get(key_metadata) {
+            Some(key) => Ok(key.clone()),
+            None => Err(ParquetError::General(format!(
+                "Could not retrieve key for metadata {:?}",
+                key_metadata
+            ))),
+        }
+    }
+}


Reply via email to