This is an automated email from the ASF dual-hosted git repository.
alamb pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/arrow-rs.git
The following commit(s) were added to refs/heads/main by this push:
new 329f6c99c6 Allow retrieving Parquet decryption keys using the key metadata (#7286)
329f6c99c6 is described below
commit 329f6c99c65b0a60c7f5cf5e8219ce0c4dda62a1
Author: Adam Reeve <[email protected]>
AuthorDate: Tue Mar 25 06:19:14 2025 +1300
Allow retrieving Parquet decryption keys using the key metadata (#7286)
* Allow retrieving decryption keys from key metadata
* Refactor to disallow setting keys and a key retriever
* Remove unnecessary clone
* Implement PartialEq explicitly for DecryptionKeys rather than FileDecryptionProperties
* Add key retrieval methods to FileDecryptionProperties
* Get column path from ColumnCryptoMetaData
* Tidy up
* Revert rename of parameter
---
parquet/src/arrow/arrow_reader/mod.rs | 43 ++---
parquet/src/arrow/async_reader/mod.rs | 49 ++----
parquet/src/encryption/decrypt.rs | 228 +++++++++++++++++++------
parquet/src/file/column_crypto_metadata.rs | 98 +++++++++++
parquet/src/file/metadata/mod.rs | 47 +++--
parquet/src/file/metadata/reader.rs | 15 +-
parquet/src/file/mod.rs | 2 +
parquet/tests/arrow_reader/encryption.rs | 58 ++++++-
parquet/tests/arrow_reader/encryption_async.rs | 94 +++++++++-
parquet/tests/arrow_reader/encryption_util.rs | 42 +++++
10 files changed, 546 insertions(+), 130 deletions(-)
diff --git a/parquet/src/arrow/arrow_reader/mod.rs
b/parquet/src/arrow/arrow_reader/mod.rs
index 588a8ea2fa..9514e868b1 100644
--- a/parquet/src/arrow/arrow_reader/mod.rs
+++ b/parquet/src/arrow/arrow_reader/mod.rs
@@ -700,36 +700,19 @@ impl<T: ChunkReader + 'static> Iterator for
ReaderPageIterator<T> {
#[cfg(feature = "encryption")]
let crypto_context = if let Some(file_decryptor) =
self.metadata.file_decryptor() {
- let column_name = self
- .metadata
- .file_metadata()
- .schema_descr()
- .column(self.column_idx);
-
- if file_decryptor.is_column_encrypted(column_name.name()) {
- let data_decryptor =
file_decryptor.get_column_data_decryptor(column_name.name());
- let data_decryptor = match data_decryptor {
- Ok(data_decryptor) => data_decryptor,
- Err(err) => return Some(Err(err)),
- };
-
- let metadata_decryptor =
-
file_decryptor.get_column_metadata_decryptor(column_name.name());
- let metadata_decryptor = match metadata_decryptor {
- Ok(metadata_decryptor) => metadata_decryptor,
- Err(err) => return Some(Err(err)),
- };
-
- let crypto_context = CryptoContext::new(
- rg_idx,
- self.column_idx,
- data_decryptor,
- metadata_decryptor,
- file_decryptor.file_aad().clone(),
- );
- Some(Arc::new(crypto_context))
- } else {
- None
+ match meta.crypto_metadata() {
+ Some(crypto_metadata) => {
+ match CryptoContext::for_column(
+ file_decryptor,
+ crypto_metadata,
+ rg_idx,
+ self.column_idx,
+ ) {
+ Ok(context) => Some(Arc::new(context)),
+ Err(err) => return Some(Err(err)),
+ }
+ }
+ None => None,
}
} else {
None
diff --git a/parquet/src/arrow/async_reader/mod.rs
b/parquet/src/arrow/async_reader/mod.rs
index fd49ad2293..f9308dc4dc 100644
--- a/parquet/src/arrow/async_reader/mod.rs
+++ b/parquet/src/arrow/async_reader/mod.rs
@@ -1027,36 +1027,6 @@ impl RowGroups for InMemoryRowGroup<'_> {
}
fn column_chunks(&self, i: usize) -> Result<Box<dyn PageIterator>> {
- #[cfg(feature = "encryption")]
- let crypto_context = if let Some(file_decryptor) =
self.metadata.clone().file_decryptor() {
- let column_name = &self
- .metadata
- .clone()
- .file_metadata()
- .schema_descr()
- .column(i);
-
- if file_decryptor.is_column_encrypted(column_name.name()) {
- let data_decryptor =
-
file_decryptor.get_column_data_decryptor(column_name.name())?;
- let metadata_decryptor =
-
file_decryptor.get_column_metadata_decryptor(column_name.name())?;
-
- let crypto_context = CryptoContext::new(
- self.row_group_idx,
- i,
- data_decryptor,
- metadata_decryptor,
- file_decryptor.file_aad().clone(),
- );
- Some(Arc::new(crypto_context))
- } else {
- None
- }
- } else {
- None
- };
-
match &self.column_chunks[i] {
None => Err(ParquetError::General(format!(
"Invalid column index {i}, column was not fetched"
@@ -1067,14 +1037,29 @@ impl RowGroups for InMemoryRowGroup<'_> {
// filter out empty offset indexes (old versions specified
Some(vec![]) when no present)
.filter(|index| !index.is_empty())
.map(|index| index[i].page_locations.clone());
- let metadata = self.metadata.row_group(self.row_group_idx);
+ let column_metadata =
self.metadata.row_group(self.row_group_idx).column(i);
let page_reader = SerializedPageReader::new(
data.clone(),
- metadata.column(i),
+ column_metadata,
self.row_count,
page_locations,
)?;
+ #[cfg(feature = "encryption")]
+ let crypto_context = if let Some(file_decryptor) =
self.metadata.file_decryptor() {
+ match column_metadata.crypto_metadata() {
+ Some(crypto_metadata) =>
Some(Arc::new(CryptoContext::for_column(
+ file_decryptor,
+ crypto_metadata,
+ self.row_group_idx,
+ i,
+ )?)),
+ None => None,
+ }
+ } else {
+ None
+ };
+
#[cfg(feature = "encryption")]
let page_reader =
page_reader.with_crypto_context(crypto_context);
diff --git a/parquet/src/encryption/decrypt.rs
b/parquet/src/encryption/decrypt.rs
index d5bfe3cfc0..7674619f4d 100644
--- a/parquet/src/encryption/decrypt.rs
+++ b/parquet/src/encryption/decrypt.rs
@@ -18,10 +18,18 @@
use crate::encryption::ciphers::{BlockDecryptor, RingGcmBlockDecryptor};
use crate::encryption::modules::{create_module_aad, ModuleType};
use crate::errors::{ParquetError, Result};
+use crate::file::column_crypto_metadata::ColumnCryptoMetaData;
+use std::borrow::Cow;
use std::collections::HashMap;
+use std::fmt::Formatter;
use std::io::Read;
use std::sync::Arc;
+/// Trait for retrieving an encryption key using the key's metadata
+pub trait KeyRetriever: Send + Sync {
+ fn retrieve_key(&self, key_metadata: &[u8]) -> Result<Vec<u8>>;
+}
+
pub fn read_and_decrypt<T: Read>(
decryptor: &Arc<dyn BlockDecryptor>,
input: &mut T,
@@ -53,22 +61,45 @@ pub(crate) struct CryptoContext {
}
impl CryptoContext {
- pub(crate) fn new(
+ pub(crate) fn for_column(
+ file_decryptor: &FileDecryptor,
+ column_crypto_metadata: &ColumnCryptoMetaData,
row_group_idx: usize,
column_ordinal: usize,
- data_decryptor: Arc<dyn BlockDecryptor>,
- metadata_decryptor: Arc<dyn BlockDecryptor>,
- file_aad: Vec<u8>,
- ) -> Self {
- Self {
+ ) -> Result<Self> {
+ let (data_decryptor, metadata_decryptor) = match
column_crypto_metadata {
+ ColumnCryptoMetaData::EncryptionWithFooterKey => {
+ // TODO: In GCM-CTR mode will this need to be a non-GCM
decryptor?
+ let data_decryptor = file_decryptor.get_footer_decryptor()?;
+ let metadata_decryptor =
file_decryptor.get_footer_decryptor()?;
+ (data_decryptor, metadata_decryptor)
+ }
+
ColumnCryptoMetaData::EncryptionWithColumnKey(column_key_encryption) => {
+ let key_metadata = &column_key_encryption.key_metadata;
+ let full_column_name;
+ let column_name = if
column_key_encryption.path_in_schema.len() == 1 {
+ &column_key_encryption.path_in_schema[0]
+ } else {
+ full_column_name =
column_key_encryption.path_in_schema.join(".");
+ &full_column_name
+ };
+ let data_decryptor = file_decryptor
+ .get_column_data_decryptor(column_name,
key_metadata.as_deref())?;
+ let metadata_decryptor = file_decryptor
+ .get_column_metadata_decryptor(column_name,
key_metadata.as_deref())?;
+ (data_decryptor, metadata_decryptor)
+ }
+ };
+
+ Ok(CryptoContext {
row_group_idx,
column_ordinal,
page_ordinal: None,
dictionary_page: false,
data_decryptor,
metadata_decryptor,
- file_aad,
- }
+ file_aad: file_decryptor.file_aad().clone(),
+ })
}
pub(crate) fn with_page_ordinal(&self, page_ordinal: usize) -> Self {
@@ -136,49 +167,158 @@ impl CryptoContext {
}
}
-/// FileDecryptionProperties hold keys and AAD data required to decrypt a
Parquet file.
-#[derive(Debug, Clone, PartialEq)]
-pub struct FileDecryptionProperties {
+#[derive(Clone, PartialEq)]
+struct ExplicitDecryptionKeys {
footer_key: Vec<u8>,
column_keys: HashMap<String, Vec<u8>>,
+}
+
+#[derive(Clone)]
+enum DecryptionKeys {
+ Explicit(ExplicitDecryptionKeys),
+ ViaRetriever(Arc<dyn KeyRetriever>),
+}
+
+impl PartialEq for DecryptionKeys {
+ fn eq(&self, other: &Self) -> bool {
+ match (self, other) {
+ (DecryptionKeys::Explicit(keys),
DecryptionKeys::Explicit(other_keys)) => {
+ keys.footer_key == other_keys.footer_key
+ && keys.column_keys == other_keys.column_keys
+ }
+ (DecryptionKeys::ViaRetriever(_), DecryptionKeys::ViaRetriever(_))
=> true,
+ _ => false,
+ }
+ }
+}
+
+/// FileDecryptionProperties hold keys and AAD data required to decrypt a
Parquet file.
+#[derive(Clone, PartialEq)]
+pub struct FileDecryptionProperties {
+ keys: DecryptionKeys,
pub(crate) aad_prefix: Option<Vec<u8>>,
}
impl FileDecryptionProperties {
- /// Returns a new FileDecryptionProperties builder
+ /// Returns a new [`FileDecryptionProperties`] builder that will use the
provided key to
+ /// decrypt footer metadata.
pub fn builder(footer_key: Vec<u8>) -> DecryptionPropertiesBuilder {
DecryptionPropertiesBuilder::new(footer_key)
}
+
+ /// Returns a new [`FileDecryptionProperties`] builder that uses a
[`KeyRetriever`]
+ /// to get decryption keys based on key metadata.
+ pub fn with_key_retriever(key_retriever: Arc<dyn KeyRetriever>) ->
DecryptionPropertiesBuilder {
+ DecryptionPropertiesBuilder::new_with_key_retriever(key_retriever)
+ }
+
+ /// Get the encryption key for decrypting a file's footer,
+ /// and also column data if uniform encryption is used.
+ pub(crate) fn footer_key(&self, key_metadata: Option<&[u8]>) ->
Result<Cow<Vec<u8>>> {
+ match &self.keys {
+ DecryptionKeys::Explicit(keys) =>
Ok(Cow::Borrowed(&keys.footer_key)),
+ DecryptionKeys::ViaRetriever(retriever) => {
+ let key =
retriever.retrieve_key(key_metadata.unwrap_or_default())?;
+ Ok(Cow::Owned(key))
+ }
+ }
+ }
+
+ /// Get the column-specific encryption key for decrypting column data and
metadata within a file
+ pub(crate) fn column_key(
+ &self,
+ column_name: &str,
+ key_metadata: Option<&[u8]>,
+ ) -> Result<Cow<Vec<u8>>> {
+ match &self.keys {
+ DecryptionKeys::Explicit(keys) => match
keys.column_keys.get(column_name) {
+ None => Err(general_err!(
+ "No column decryption key set for column '{}'",
+ column_name
+ )),
+ Some(key) => Ok(Cow::Borrowed(key)),
+ },
+ DecryptionKeys::ViaRetriever(retriever) => {
+ let key =
retriever.retrieve_key(key_metadata.unwrap_or_default())?;
+ Ok(Cow::Owned(key))
+ }
+ }
+ }
+}
+
+impl std::fmt::Debug for FileDecryptionProperties {
+ fn fmt(&self, f: &mut Formatter<'_>) -> std::fmt::Result {
+ write!(f, "FileDecryptionProperties {{ }}")
+ }
}
+/// Builder for [`FileDecryptionProperties`]
pub struct DecryptionPropertiesBuilder {
- footer_key: Vec<u8>,
+ footer_key: Option<Vec<u8>>,
+ key_retriever: Option<Arc<dyn KeyRetriever>>,
column_keys: HashMap<String, Vec<u8>>,
aad_prefix: Option<Vec<u8>>,
}
impl DecryptionPropertiesBuilder {
+ /// Create a new [`DecryptionPropertiesBuilder`] builder that will use the
provided key to
+ /// decrypt footer metadata.
pub fn new(footer_key: Vec<u8>) -> DecryptionPropertiesBuilder {
Self {
- footer_key,
+ footer_key: Some(footer_key),
+ key_retriever: None,
column_keys: HashMap::default(),
aad_prefix: None,
}
}
+ /// Create a new [`DecryptionPropertiesBuilder`] by providing a
[`KeyRetriever`] that
+ /// can be used to get decryption keys based on key metadata.
+ pub fn new_with_key_retriever(
+ key_retriever: Arc<dyn KeyRetriever>,
+ ) -> DecryptionPropertiesBuilder {
+ Self {
+ footer_key: None,
+ key_retriever: Some(key_retriever),
+ column_keys: HashMap::default(),
+ aad_prefix: None,
+ }
+ }
+
+ /// Finalize the builder and return created [`FileDecryptionProperties`]
pub fn build(self) -> Result<FileDecryptionProperties> {
+ let keys = match (self.footer_key, self.key_retriever) {
+ (Some(footer_key), None) =>
DecryptionKeys::Explicit(ExplicitDecryptionKeys {
+ footer_key,
+ column_keys: self.column_keys,
+ }),
+ (None, Some(key_retriever)) => {
+ if !self.column_keys.is_empty() {
+ return Err(general_err!(
+ "Cannot specify column keys directly when using a key
retriever"
+ ));
+ }
+ DecryptionKeys::ViaRetriever(key_retriever)
+ }
+ _ => {
+ unreachable!()
+ }
+ };
Ok(FileDecryptionProperties {
- footer_key: self.footer_key,
- column_keys: self.column_keys,
+ keys,
aad_prefix: self.aad_prefix,
})
}
+ /// Specify the expected AAD prefix to be used for decryption.
+ /// This must be set if the file was written with an AAD prefix and the
+ /// prefix is not stored in the file metadata.
pub fn with_aad_prefix(mut self, value: Vec<u8>) -> Self {
self.aad_prefix = Some(value);
self
}
+ /// Specify the decryption key to use for a column
pub fn with_column_key(mut self, column_name: &str, decryption_key:
Vec<u8>) -> Self {
self.column_keys
.insert(column_name.to_string(), decryption_key);
@@ -189,72 +329,64 @@ impl DecryptionPropertiesBuilder {
#[derive(Clone, Debug)]
pub(crate) struct FileDecryptor {
decryption_properties: FileDecryptionProperties,
- footer_decryptor: Option<Arc<dyn BlockDecryptor>>,
+ footer_decryptor: Arc<dyn BlockDecryptor>,
file_aad: Vec<u8>,
}
impl PartialEq for FileDecryptor {
fn eq(&self, other: &Self) -> bool {
- self.decryption_properties == other.decryption_properties
+ self.decryption_properties == other.decryption_properties &&
self.file_aad == other.file_aad
}
}
impl FileDecryptor {
pub(crate) fn new(
decryption_properties: &FileDecryptionProperties,
+ footer_key_metadata: Option<&[u8]>,
aad_file_unique: Vec<u8>,
aad_prefix: Vec<u8>,
- ) -> Result<Self, ParquetError> {
+ ) -> Result<Self> {
let file_aad = [aad_prefix.as_slice(),
aad_file_unique.as_slice()].concat();
- // todo decr: if no key available yet (not set in properties, should
be retrieved from metadata)
- let footer_decryptor =
RingGcmBlockDecryptor::new(&decryption_properties.footer_key)
- .map_err(|e| {
- general_err!(
- "Invalid footer key. {}",
- e.to_string().replace("Parquet error: ", "")
- )
- })?;
+ let footer_key =
decryption_properties.footer_key(footer_key_metadata)?;
+ let footer_decryptor =
RingGcmBlockDecryptor::new(&footer_key).map_err(|e| {
+ general_err!(
+ "Invalid footer key. {}",
+ e.to_string().replace("Parquet error: ", "")
+ )
+ })?;
+
Ok(Self {
- footer_decryptor: Some(Arc::new(footer_decryptor)),
+ footer_decryptor: Arc::new(footer_decryptor),
decryption_properties: decryption_properties.clone(),
file_aad,
})
}
- pub(crate) fn get_footer_decryptor(&self) -> Result<Arc<dyn
BlockDecryptor>, ParquetError> {
- Ok(self.footer_decryptor.clone().unwrap())
+ pub(crate) fn get_footer_decryptor(&self) -> Result<Arc<dyn
BlockDecryptor>> {
+ Ok(self.footer_decryptor.clone())
}
pub(crate) fn get_column_data_decryptor(
&self,
column_name: &str,
- ) -> Result<Arc<dyn BlockDecryptor>, ParquetError> {
- match self.decryption_properties.column_keys.get(column_name) {
- Some(column_key) =>
Ok(Arc::new(RingGcmBlockDecryptor::new(column_key)?)),
- None => self.get_footer_decryptor(),
- }
+ key_metadata: Option<&[u8]>,
+ ) -> Result<Arc<dyn BlockDecryptor>> {
+ let column_key = self
+ .decryption_properties
+ .column_key(column_name, key_metadata)?;
+ Ok(Arc::new(RingGcmBlockDecryptor::new(&column_key)?))
}
pub(crate) fn get_column_metadata_decryptor(
&self,
column_name: &str,
- ) -> Result<Arc<dyn BlockDecryptor>, ParquetError> {
+ key_metadata: Option<&[u8]>,
+ ) -> Result<Arc<dyn BlockDecryptor>> {
// Once GCM CTR mode is implemented, data and metadata decryptors may
be different
- self.get_column_data_decryptor(column_name)
+ self.get_column_data_decryptor(column_name, key_metadata)
}
pub(crate) fn file_aad(&self) -> &Vec<u8> {
&self.file_aad
}
-
- pub(crate) fn is_column_encrypted(&self, column_name: &str) -> bool {
- // Column is encrypted if either uniform encryption is used or an
encryption key is set for the column
- match self.decryption_properties.column_keys.is_empty() {
- false => self
- .decryption_properties
- .column_keys
- .contains_key(column_name),
- true => true,
- }
- }
}
diff --git a/parquet/src/file/column_crypto_metadata.rs
b/parquet/src/file/column_crypto_metadata.rs
new file mode 100644
index 0000000000..af670e675f
--- /dev/null
+++ b/parquet/src/file/column_crypto_metadata.rs
@@ -0,0 +1,98 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements. See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership. The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License. You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied. See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+//! Column chunk encryption metadata
+
+use crate::errors::Result;
+use crate::format::{
+ ColumnCryptoMetaData as TColumnCryptoMetaData,
+ EncryptionWithColumnKey as TEncryptionWithColumnKey,
+ EncryptionWithFooterKey as TEncryptionWithFooterKey,
+};
+
+/// ColumnCryptoMetadata for a column chunk
+#[derive(Clone, Debug, PartialEq, Eq)]
+pub enum ColumnCryptoMetaData {
+ /// The column is encrypted with the footer key
+ EncryptionWithFooterKey,
+ /// The column is encrypted with a column-specific key
+ EncryptionWithColumnKey(EncryptionWithColumnKey),
+}
+
+/// Encryption metadata for a column chunk encrypted with a column-specific key
+#[derive(Clone, Debug, PartialEq, Eq)]
+pub struct EncryptionWithColumnKey {
+ /// Path to the column in the Parquet schema
+ pub path_in_schema: Vec<String>,
+ /// Metadata required to retrieve the column encryption key
+ pub key_metadata: Option<Vec<u8>>,
+}
+
+/// Converts Thrift definition into `ColumnCryptoMetadata`.
+pub fn try_from_thrift(
+ thrift_column_crypto_metadata: &TColumnCryptoMetaData,
+) -> Result<ColumnCryptoMetaData> {
+ let crypto_metadata = match thrift_column_crypto_metadata {
+ TColumnCryptoMetaData::ENCRYPTIONWITHFOOTERKEY(_) => {
+ ColumnCryptoMetaData::EncryptionWithFooterKey
+ }
+
TColumnCryptoMetaData::ENCRYPTIONWITHCOLUMNKEY(encryption_with_column_key) => {
+
ColumnCryptoMetaData::EncryptionWithColumnKey(EncryptionWithColumnKey {
+ path_in_schema:
encryption_with_column_key.path_in_schema.clone(),
+ key_metadata: encryption_with_column_key.key_metadata.clone(),
+ })
+ }
+ };
+ Ok(crypto_metadata)
+}
+
+/// Converts `ColumnCryptoMetadata` into Thrift definition.
+pub fn to_thrift(column_crypto_metadata: &ColumnCryptoMetaData) ->
TColumnCryptoMetaData {
+ match column_crypto_metadata {
+ ColumnCryptoMetaData::EncryptionWithFooterKey => {
+
TColumnCryptoMetaData::ENCRYPTIONWITHFOOTERKEY(TEncryptionWithFooterKey {})
+ }
+
ColumnCryptoMetaData::EncryptionWithColumnKey(encryption_with_column_key) => {
+
TColumnCryptoMetaData::ENCRYPTIONWITHCOLUMNKEY(TEncryptionWithColumnKey {
+ path_in_schema:
encryption_with_column_key.path_in_schema.clone(),
+ key_metadata: encryption_with_column_key.key_metadata.clone(),
+ })
+ }
+ }
+}
+
+#[cfg(test)]
+mod tests {
+ use super::*;
+
+ #[test]
+ fn test_encryption_with_footer_key_from_thrift() {
+ let metadata = ColumnCryptoMetaData::EncryptionWithFooterKey;
+
+ assert_eq!(try_from_thrift(&to_thrift(&metadata)).unwrap(), metadata);
+ }
+
+ #[test]
+ fn test_encryption_with_column_key_from_thrift() {
+ let metadata =
ColumnCryptoMetaData::EncryptionWithColumnKey(EncryptionWithColumnKey {
+ path_in_schema: vec!["abc".to_owned(), "def".to_owned()],
+ key_metadata: Some(vec![0, 1, 2, 3, 4, 5]),
+ });
+
+ assert_eq!(try_from_thrift(&to_thrift(&metadata)).unwrap(), metadata);
+ }
+}
diff --git a/parquet/src/file/metadata/mod.rs b/parquet/src/file/metadata/mod.rs
index 217685049e..81157399ac 100644
--- a/parquet/src/file/metadata/mod.rs
+++ b/parquet/src/file/metadata/mod.rs
@@ -102,13 +102,15 @@ use crate::encryption::{
modules::{create_module_aad, ModuleType},
};
use crate::errors::{ParquetError, Result};
+#[cfg(feature = "encryption")]
+use crate::file::column_crypto_metadata::{self, ColumnCryptoMetaData};
pub(crate) use crate::file::metadata::memory::HeapSize;
use crate::file::page_encoding_stats::{self, PageEncodingStats};
use crate::file::page_index::index::Index;
use crate::file::page_index::offset_index::OffsetIndexMetaData;
use crate::file::statistics::{self, Statistics};
#[cfg(feature = "encryption")]
-use crate::format::ColumnCryptoMetaData;
+use crate::format::ColumnCryptoMetaData as TColumnCryptoMetaData;
use crate::format::{
BoundaryOrder, ColumnChunk, ColumnIndex, ColumnMetaData, OffsetIndex,
PageLocation, RowGroup,
SizeStatistics, SortingColumn,
@@ -657,11 +659,14 @@ impl RowGroupMetaData {
d.path().string()
));
}
-
Some(ColumnCryptoMetaData::ENCRYPTIONWITHCOLUMNKEY(crypto_metadata)) => {
+
Some(TColumnCryptoMetaData::ENCRYPTIONWITHCOLUMNKEY(crypto_metadata)) => {
let column_name =
crypto_metadata.path_in_schema.join(".");
-
decryptor.get_column_metadata_decryptor(column_name.as_str())?
+ decryptor.get_column_metadata_decryptor(
+ column_name.as_str(),
+ crypto_metadata.key_metadata.as_deref(),
+ )?
}
- Some(ColumnCryptoMetaData::ENCRYPTIONWITHFOOTERKEY(_)) => {
+ Some(TColumnCryptoMetaData::ENCRYPTIONWITHFOOTERKEY(_)) =>
{
decryptor.get_footer_decryptor()?
}
};
@@ -675,10 +680,13 @@ impl RowGroupMetaData {
)?;
let buf = c.encrypted_column_metadata.clone().unwrap();
- let decrypted_cc_buf =
- column_decryptor.decrypt(buf.as_slice(),
column_aad.as_ref()).map_err(|_| {
- general_err!("Unable to decrypt column '{}', perhaps
the column key is wrong or missing?",
- d.path().string())
+ let decrypted_cc_buf = column_decryptor
+ .decrypt(buf.as_slice(), column_aad.as_ref())
+ .map_err(|_| {
+ general_err!(
+ "Unable to decrypt column '{}', perhaps the column
key is wrong?",
+ d.path().string()
+ )
})?;
let mut prot =
TCompactSliceInputProtocol::new(decrypted_cc_buf.as_slice());
@@ -854,6 +862,8 @@ pub struct ColumnChunkMetaData {
unencoded_byte_array_data_bytes: Option<i64>,
repetition_level_histogram: Option<LevelHistogram>,
definition_level_histogram: Option<LevelHistogram>,
+ #[cfg(feature = "encryption")]
+ column_crypto_metadata: Option<ColumnCryptoMetaData>,
}
/// Histograms for repetition and definition levels.
@@ -1143,6 +1153,12 @@ impl ColumnChunkMetaData {
self.definition_level_histogram.as_ref()
}
+ /// Returns the encryption metadata for this column chunk.
+ #[cfg(feature = "encryption")]
+ pub fn crypto_metadata(&self) -> Option<&ColumnCryptoMetaData> {
+ self.column_crypto_metadata.as_ref()
+ }
+
/// Method to convert from Thrift.
pub fn from_thrift(column_descr: ColumnDescPtr, cc: ColumnChunk) ->
Result<Self> {
if cc.meta_data.is_none() {
@@ -1197,6 +1213,13 @@ impl ColumnChunkMetaData {
let repetition_level_histogram =
repetition_level_histogram.map(LevelHistogram::from);
let definition_level_histogram =
definition_level_histogram.map(LevelHistogram::from);
+ #[cfg(feature = "encryption")]
+ let column_crypto_metadata = if let Some(crypto_metadata) =
cc.crypto_metadata {
+ Some(column_crypto_metadata::try_from_thrift(&crypto_metadata)?)
+ } else {
+ None
+ };
+
let result = ColumnChunkMetaData {
column_descr,
encodings,
@@ -1220,6 +1243,8 @@ impl ColumnChunkMetaData {
unencoded_byte_array_data_bytes,
repetition_level_histogram,
definition_level_histogram,
+ #[cfg(feature = "encryption")]
+ column_crypto_metadata,
};
Ok(result)
}
@@ -1343,6 +1368,8 @@ impl ColumnChunkMetaDataBuilder {
unencoded_byte_array_data_bytes: None,
repetition_level_histogram: None,
definition_level_histogram: None,
+ #[cfg(feature = "encryption")]
+ column_crypto_metadata: None,
})
}
@@ -1954,7 +1981,7 @@ mod tests {
#[cfg(not(feature = "encryption"))]
let base_expected_size = 2312;
#[cfg(feature = "encryption")]
- let base_expected_size = 2448;
+ let base_expected_size = 2640;
assert_eq!(parquet_meta.memory_size(), base_expected_size);
@@ -1984,7 +2011,7 @@ mod tests {
#[cfg(not(feature = "encryption"))]
let bigger_expected_size = 2816;
#[cfg(feature = "encryption")]
- let bigger_expected_size = 2952;
+ let bigger_expected_size = 3144;
// more set fields means more memory usage
assert!(bigger_expected_size > base_expected_size);
diff --git a/parquet/src/file/metadata/reader.rs
b/parquet/src/file/metadata/reader.rs
index b80e76d792..8532b59667 100644
--- a/parquet/src/file/metadata/reader.rs
+++ b/parquet/src/file/metadata/reader.rs
@@ -734,6 +734,7 @@ impl ParquetMetaDataReader {
.map_err(|e| general_err!("Could not parse crypto
metadata: {}", e))?;
let decryptor = get_file_decryptor(
t_file_crypto_metadata.encryption_algorithm,
+ t_file_crypto_metadata.key_metadata.as_deref(),
file_decryption_properties,
)?;
let footer_decryptor = decryptor.get_footer_decryptor();
@@ -764,7 +765,11 @@ impl ParquetMetaDataReader {
file_decryption_properties,
) {
// File has a plaintext footer but encryption algorithm is set
- file_decryptor = Some(get_file_decryptor(algo,
file_decryption_properties)?);
+ file_decryptor = Some(get_file_decryptor(
+ algo,
+ t_file_metadata.footer_signing_key_metadata.as_deref(),
+ file_decryption_properties,
+ )?);
}
let mut row_groups = Vec::new();
@@ -863,6 +868,7 @@ impl ParquetMetaDataReader {
#[cfg(feature = "encryption")]
fn get_file_decryptor(
encryption_algorithm: EncryptionAlgorithm,
+ footer_key_metadata: Option<&[u8]>,
file_decryption_properties: &FileDecryptionProperties,
) -> Result<FileDecryptor> {
match encryption_algorithm {
@@ -876,7 +882,12 @@ fn get_file_decryptor(
algo.aad_prefix.unwrap_or_default()
};
- FileDecryptor::new(file_decryption_properties, aad_file_unique,
aad_prefix)
+ FileDecryptor::new(
+ file_decryption_properties,
+ footer_key_metadata,
+ aad_file_unique,
+ aad_prefix,
+ )
}
EncryptionAlgorithm::AESGCMCTRV1(_) => Err(nyi_err!(
"The AES_GCM_CTR_V1 encryption algorithm is not yet supported"
diff --git a/parquet/src/file/mod.rs b/parquet/src/file/mod.rs
index b36ef752ae..94eeb2b22e 100644
--- a/parquet/src/file/mod.rs
+++ b/parquet/src/file/mod.rs
@@ -97,6 +97,8 @@
//! println!("{}", row.unwrap());
//! }
//! ```
+#[cfg(feature = "encryption")]
+pub mod column_crypto_metadata;
pub mod footer;
pub mod metadata;
pub mod page_encoding_stats;
diff --git a/parquet/tests/arrow_reader/encryption.rs
b/parquet/tests/arrow_reader/encryption.rs
index 1d633f7991..521212488a 100644
--- a/parquet/tests/arrow_reader/encryption.rs
+++ b/parquet/tests/arrow_reader/encryption.rs
@@ -17,13 +17,14 @@
//! This module contains tests for reading encrypted Parquet files with the
Arrow API
-use crate::encryption_util::verify_encryption_test_data;
+use crate::encryption_util::{verify_encryption_test_data, TestKeyRetriever};
use arrow_array::RecordBatch;
use parquet::arrow::arrow_reader::{
ArrowReaderMetadata, ArrowReaderOptions, ParquetRecordBatchReaderBuilder,
};
use parquet::encryption::decrypt::FileDecryptionProperties;
use std::fs::File;
+use std::sync::Arc;
#[test]
fn test_non_uniform_encryption_plaintext_footer() {
@@ -187,6 +188,61 @@ fn test_aes_ctr_encryption() {
};
}
+#[test]
+fn test_non_uniform_encryption_plaintext_footer_with_key_retriever() {
+ let test_data = arrow::util::test_util::parquet_test_data();
+ let path =
format!("{test_data}/encrypt_columns_plaintext_footer.parquet.encrypted");
+ let file = File::open(path).unwrap();
+
+ let key_retriever = TestKeyRetriever::new()
+ .with_key("kf".to_owned(), "0123456789012345".as_bytes().to_vec())
+ .with_key("kc1".to_owned(), "1234567890123450".as_bytes().to_vec())
+ .with_key("kc2".to_owned(), "1234567890123451".as_bytes().to_vec());
+
+ let decryption_properties =
+ FileDecryptionProperties::with_key_retriever(Arc::new(key_retriever))
+ .build()
+ .unwrap();
+
+ verify_encryption_test_file_read(file, decryption_properties);
+}
+
+#[test]
+fn test_non_uniform_encryption_with_key_retriever() {
+ let test_data = arrow::util::test_util::parquet_test_data();
+ let path =
format!("{test_data}/encrypt_columns_and_footer.parquet.encrypted");
+ let file = File::open(path).unwrap();
+
+ let key_retriever = TestKeyRetriever::new()
+ .with_key("kf".to_owned(), "0123456789012345".as_bytes().to_vec())
+ .with_key("kc1".to_owned(), "1234567890123450".as_bytes().to_vec())
+ .with_key("kc2".to_owned(), "1234567890123451".as_bytes().to_vec());
+
+ let decryption_properties =
+ FileDecryptionProperties::with_key_retriever(Arc::new(key_retriever))
+ .build()
+ .unwrap();
+
+ verify_encryption_test_file_read(file, decryption_properties);
+}
+
+#[test]
+fn test_uniform_encryption_with_key_retriever() {
+ let test_data = arrow::util::test_util::parquet_test_data();
+ let path = format!("{test_data}/uniform_encryption.parquet.encrypted");
+ let file = File::open(path).unwrap();
+
+ let key_retriever =
+ TestKeyRetriever::new().with_key("kf".to_owned(),
"0123456789012345".as_bytes().to_vec());
+
+ let decryption_properties =
+ FileDecryptionProperties::with_key_retriever(Arc::new(key_retriever))
+ .build()
+ .unwrap();
+
+ verify_encryption_test_file_read(file, decryption_properties);
+}
+
fn verify_encryption_test_file_read(file: File, decryption_properties:
FileDecryptionProperties) {
let options =
ArrowReaderOptions::default().with_file_decryption_properties(decryption_properties);
diff --git a/parquet/tests/arrow_reader/encryption_async.rs
b/parquet/tests/arrow_reader/encryption_async.rs
index 01ed0a9fc9..eeac10f574 100644
--- a/parquet/tests/arrow_reader/encryption_async.rs
+++ b/parquet/tests/arrow_reader/encryption_async.rs
@@ -17,12 +17,13 @@
//! This module contains tests for reading encrypted Parquet files with the
async Arrow API
-use crate::encryption_util::verify_encryption_test_data;
+use crate::encryption_util::{verify_encryption_test_data, TestKeyRetriever};
use futures::TryStreamExt;
use parquet::arrow::arrow_reader::{ArrowReaderMetadata, ArrowReaderOptions};
use parquet::arrow::ParquetRecordBatchStreamBuilder;
use parquet::encryption::decrypt::FileDecryptionProperties;
use parquet::errors::ParquetError;
+use std::sync::Arc;
use tokio::fs::File;
#[tokio::test]
@@ -114,8 +115,14 @@ async fn test_misspecified_encryption_keys() {
.await;
// Missing column key
- check_for_error("Parquet error: Unable to decrypt column 'double_field',
perhaps the column key is wrong or missing?",
- &path, footer_key, "".as_bytes(), column_2_key).await;
+ check_for_error(
+ "Parquet error: No column decryption key set for column
'double_field'",
+ &path,
+ footer_key,
+ "".as_bytes(),
+ column_2_key,
+ )
+ .await;
// Too short column key
check_for_error(
@@ -128,12 +135,24 @@ async fn test_misspecified_encryption_keys() {
.await;
// Wrong column key
- check_for_error("Parquet error: Unable to decrypt column 'double_field',
perhaps the column key is wrong or missing?",
- &path, footer_key, "1123456789012345".as_bytes(),
column_2_key).await;
+ check_for_error(
+ "Parquet error: Unable to decrypt column 'double_field', perhaps the
column key is wrong?",
+ &path,
+ footer_key,
+ "1123456789012345".as_bytes(),
+ column_2_key,
+ )
+ .await;
// Mixed up keys
- check_for_error("Parquet error: Unable to decrypt column 'float_field',
perhaps the column key is wrong or missing?",
- &path, footer_key, column_2_key, column_1_key).await;
+ check_for_error(
+ "Parquet error: Unable to decrypt column 'float_field', perhaps the
column key is wrong?",
+ &path,
+ footer_key,
+ column_2_key,
+ column_1_key,
+ )
+ .await;
}
#[tokio::test]
@@ -268,6 +287,67 @@ async fn test_read_encrypted_file_from_object_store() {
verify_encryption_test_data(record_batches, &metadata);
}
+#[tokio::test]
+async fn test_non_uniform_encryption_plaintext_footer_with_key_retriever() {
+ let testdata = arrow::util::test_util::parquet_test_data();
+ let path =
format!("{testdata}/encrypt_columns_plaintext_footer.parquet.encrypted");
+ let mut file = File::open(&path).await.unwrap();
+
+ let key_retriever = TestKeyRetriever::new()
+ .with_key("kf".to_owned(), "0123456789012345".as_bytes().to_vec())
+ .with_key("kc1".to_owned(), "1234567890123450".as_bytes().to_vec())
+ .with_key("kc2".to_owned(), "1234567890123451".as_bytes().to_vec());
+
+ let decryption_properties =
+ FileDecryptionProperties::with_key_retriever(Arc::new(key_retriever))
+ .build()
+ .unwrap();
+
+ verify_encryption_test_file_read_async(&mut file, decryption_properties)
+ .await
+ .unwrap();
+}
+
+#[tokio::test]
+async fn test_non_uniform_encryption_with_key_retriever() {
+ let testdata = arrow::util::test_util::parquet_test_data();
+ let path =
format!("{testdata}/encrypt_columns_and_footer.parquet.encrypted");
+ let mut file = File::open(&path).await.unwrap();
+
+ let key_retriever = TestKeyRetriever::new()
+ .with_key("kf".to_owned(), "0123456789012345".as_bytes().to_vec())
+ .with_key("kc1".to_owned(), "1234567890123450".as_bytes().to_vec())
+ .with_key("kc2".to_owned(), "1234567890123451".as_bytes().to_vec());
+
+ let decryption_properties =
+ FileDecryptionProperties::with_key_retriever(Arc::new(key_retriever))
+ .build()
+ .unwrap();
+
+ verify_encryption_test_file_read_async(&mut file, decryption_properties)
+ .await
+ .unwrap();
+}
+
+#[tokio::test]
+async fn test_uniform_encryption_with_key_retriever() {
+ let testdata = arrow::util::test_util::parquet_test_data();
+ let path = format!("{testdata}/uniform_encryption.parquet.encrypted");
+ let mut file = File::open(&path).await.unwrap();
+
+ let key_retriever =
+ TestKeyRetriever::new().with_key("kf".to_owned(),
"0123456789012345".as_bytes().to_vec());
+
+ let decryption_properties =
+ FileDecryptionProperties::with_key_retriever(Arc::new(key_retriever))
+ .build()
+ .unwrap();
+
+ verify_encryption_test_file_read_async(&mut file, decryption_properties)
+ .await
+ .unwrap();
+}
+
async fn verify_encryption_test_file_read_async(
file: &mut tokio::fs::File,
decryption_properties: FileDecryptionProperties,
diff --git a/parquet/tests/arrow_reader/encryption_util.rs
b/parquet/tests/arrow_reader/encryption_util.rs
index de21f13ca6..f0d1152fb0 100644
--- a/parquet/tests/arrow_reader/encryption_util.rs
+++ b/parquet/tests/arrow_reader/encryption_util.rs
@@ -17,7 +17,11 @@
use arrow_array::cast::AsArray;
use arrow_array::{types, RecordBatch};
+use parquet::encryption::decrypt::KeyRetriever;
+use parquet::errors::{ParquetError, Result};
use parquet::file::metadata::ParquetMetaData;
+use std::collections::HashMap;
+use std::sync::Mutex;
/// Verifies data read from an encrypted file from the parquet-testing
repository
pub fn verify_encryption_test_data(record_batches: Vec<RecordBatch>, metadata:
&ParquetMetaData) {
@@ -83,3 +87,41 @@ pub fn verify_encryption_test_data(record_batches:
Vec<RecordBatch>, metadata: &
assert_eq!(row_count, file_metadata.num_rows() as usize);
}
+
+/// A KeyRetriever to use in Parquet encryption tests,
+/// which stores a map from key names/metadata to encryption key bytes.
+pub struct TestKeyRetriever {
+ keys: Mutex<HashMap<String, Vec<u8>>>,
+}
+
+impl TestKeyRetriever {
+ pub fn new() -> Self {
+ Self {
+ keys: Mutex::new(HashMap::default()),
+ }
+ }
+
+ pub fn with_key(self, key_name: String, key: Vec<u8>) -> Self {
+ {
+ let mut keys = self.keys.lock().unwrap();
+ keys.insert(key_name, key);
+ }
+ self
+ }
+}
+
+impl KeyRetriever for TestKeyRetriever {
+ fn retrieve_key(&self, key_metadata: &[u8]) -> Result<Vec<u8>> {
+ let key_metadata = std::str::from_utf8(key_metadata).map_err(|e| {
+ ParquetError::General(format!("Could not convert key metadata to
string: {}", e))
+ })?;
+ let keys = self.keys.lock().unwrap();
+ match keys.get(key_metadata) {
+ Some(key) => Ok(key.clone()),
+ None => Err(ParquetError::General(format!(
+ "Could not retrieve key for metadata {:?}",
+ key_metadata
+ ))),
+ }
+ }
+}