This is an automated email from the ASF dual-hosted git repository.

alamb pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/datafusion.git
The following commit(s) were added to refs/heads/main by this push: new cbc0614209 feat: Dynamic Parquet encryption and decryption properties (#16779) cbc0614209 is described below commit cbc0614209a14a2ea335942de335b38e4d71443e Author: Adam Reeve <adam.re...@gr-oss.io> AuthorDate: Sat Aug 9 06:40:41 2025 +1200 feat: Dynamic Parquet encryption and decryption properties (#16779) * Add encryption factory API for more flexible encryption configuration * Remove extra DynEncryptionFactory trait * Tidy up example * Tidy ups * Fix "unnecessary qualification" errors in example * Fix toml format with taplo * Fix broken link in docs * Clippy fixes * Update examples README * Add extra validation of table encryption * clippy fix * Remove register_parquet_encryption_factory from SessionContext * Remove new dependency from example * Update examples readme * Run taplo format * Fix outdated method reference in comment * Extra comment --------- Co-authored-by: Andrew Lamb <and...@nerdnetworks.org> --- Cargo.lock | 3 + datafusion-examples/Cargo.toml | 2 + datafusion-examples/README.md | 1 + .../examples/parquet_encrypted_with_kms.rs | 301 +++++++++++++++++++++ datafusion/common/src/config.rs | 90 ++++++ datafusion/common/src/encryption.rs | 29 +- .../common/src/file_options/parquet_writer.rs | 9 +- datafusion/core/tests/memory_limit/mod.rs | 4 + datafusion/core/tests/parquet/encryption.rs | 254 ++++++++++++++++- datafusion/core/tests/parquet/mod.rs | 1 + datafusion/datasource-parquet/Cargo.toml | 1 + datafusion/datasource-parquet/src/file_format.rs | 165 +++++++++-- datafusion/datasource-parquet/src/opener.rs | 56 +++- datafusion/datasource-parquet/src/source.rs | 42 ++- datafusion/execution/Cargo.toml | 6 + datafusion/execution/src/lib.rs | 2 + datafusion/execution/src/parquet_encryption.rs | 81 ++++++ datafusion/execution/src/runtime_env.rs | 40 +++ 18 files changed, 1020 insertions(+), 67 deletions(-) diff --git a/Cargo.lock b/Cargo.lock index c06d5e6cfc..ff9bcf0f71 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -2172,6 +2172,7 @@ dependencies = [ "arrow-flight", "arrow-schema", "async-trait", + "base64 0.22.1", "bytes", "dashmap", "datafusion", @@ -2184,6 +2185,7 @@ dependencies = [ "nix", "object_store", "prost", + "rand 0.9.2", "serde_json", "tempfile", "test-utils", @@ -2209,6 +2211,7 @@ dependencies = [ "log", "object_store", "parking_lot", + "parquet", "rand 0.9.2", "tempfile", "url", diff --git a/datafusion-examples/Cargo.toml b/datafusion-examples/Cargo.toml index c46c24d0c9..409fc12bcb 100644 --- a/datafusion-examples/Cargo.toml +++ b/datafusion-examples/Cargo.toml @@ -65,6 +65,7 @@ async-trait = { workspace = true } bytes = { workspace = true } dashmap = { workspace = true } # note only use main datafusion crate for examples +base64 = "0.22.1" datafusion = { workspace = true, default-features = true } datafusion-ffi = { workspace = true } datafusion-proto = { workspace = true } @@ -74,6 +75,7 @@ log = { workspace = true } mimalloc = { version = "0.1", default-features = false } object_store = { workspace = true, features = ["aws", "http"] } prost = { workspace = true } +rand = { workspace = true } serde_json = { workspace = true } tempfile = { workspace = true } test-utils = { path = "../test-utils" } diff --git a/datafusion-examples/README.md b/datafusion-examples/README.md index 02f83b9bd0..75a53bc568 100644 --- a/datafusion-examples/README.md +++ b/datafusion-examples/README.md @@ -68,6 +68,7 @@ cargo run --example dataframe - [`optimizer_rule.rs`](examples/optimizer_rule.rs): Use a custom 
OptimizerRule to replace certain predicates - [`parquet_embedded_index.rs`](examples/parquet_embedded_index.rs): Store a custom index inside a Parquet file and use it to speed up queries - [`parquet_encrypted.rs`](examples/parquet_encrypted.rs): Read and write encrypted Parquet files using DataFusion +- [`parquet_encrypted_with_kms.rs`](examples/parquet_encrypted_with_kms.rs): Read and write encrypted Parquet files using an encryption factory - [`parquet_index.rs`](examples/parquet_index.rs): Create an secondary index over several parquet files and use it to speed up queries - [`parquet_exec_visitor.rs`](examples/parquet_exec_visitor.rs): Extract statistics by visiting an ExecutionPlan after execution - [`parse_sql_expr.rs`](examples/parse_sql_expr.rs): Parse SQL text into DataFusion `Expr`. diff --git a/datafusion-examples/examples/parquet_encrypted_with_kms.rs b/datafusion-examples/examples/parquet_encrypted_with_kms.rs new file mode 100644 index 0000000000..d30608ce7a --- /dev/null +++ b/datafusion-examples/examples/parquet_encrypted_with_kms.rs @@ -0,0 +1,301 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +use arrow::array::{ArrayRef, Int32Array, RecordBatch, StringArray}; +use arrow_schema::SchemaRef; +use base64::Engine; +use datafusion::common::extensions_options; +use datafusion::config::{EncryptionFactoryOptions, TableParquetOptions}; +use datafusion::dataframe::DataFrameWriteOptions; +use datafusion::datasource::file_format::parquet::ParquetFormat; +use datafusion::datasource::listing::ListingOptions; +use datafusion::error::Result; +use datafusion::execution::parquet_encryption::EncryptionFactory; +use datafusion::parquet::encryption::decrypt::KeyRetriever; +use datafusion::parquet::encryption::{ + decrypt::FileDecryptionProperties, encrypt::FileEncryptionProperties, +}; +use datafusion::prelude::SessionContext; +use futures::StreamExt; +use object_store::path::Path; +use rand::rand_core::{OsRng, TryRngCore}; +use std::collections::HashSet; +use std::sync::Arc; +use tempfile::TempDir; + +const ENCRYPTION_FACTORY_ID: &str = "example.mock_kms_encryption"; + +/// This example demonstrates reading and writing Parquet files that +/// are encrypted using Parquet Modular Encryption. +/// +/// Compared to the `parquet_encrypted` example, where AES keys +/// are specified directly, this example implements an `EncryptionFactory` that +/// generates encryption keys dynamically per file. +/// Encryption key metadata is stored inline in the Parquet files and is used to determine +/// the decryption keys when reading the files. +/// +/// In this example, encryption keys are simply stored base64 encoded in the Parquet metadata, +/// which is not a secure way to store encryption keys. 
+/// For production use, it is recommended to use a key-management service (KMS) to encrypt +/// data encryption keys. +#[tokio::main] +async fn main() -> Result<()> { + let ctx = SessionContext::new(); + + // Register an `EncryptionFactory` implementation to be used for Parquet encryption + // in the runtime environment. + // `EncryptionFactory` instances are registered with a name to identify them so + // they can be later referenced in configuration options, and it's possible to register + // multiple different factories to handle different ways of encrypting Parquet. + let encryption_factory = TestEncryptionFactory::default(); + ctx.runtime_env().register_parquet_encryption_factory( + ENCRYPTION_FACTORY_ID, + Arc::new(encryption_factory), + ); + + // Register some simple test data + let a: ArrayRef = Arc::new(StringArray::from(vec!["a", "b", "c", "d"])); + let b: ArrayRef = Arc::new(Int32Array::from(vec![1, 10, 10, 100])); + let c: ArrayRef = Arc::new(Int32Array::from(vec![2, 20, 20, 200])); + let batch = RecordBatch::try_from_iter(vec![("a", a), ("b", b), ("c", c)])?; + ctx.register_batch("test_data", batch)?; + + { + // Write and read encrypted Parquet with the programmatic API + let tmpdir = TempDir::new()?; + let table_path = format!("{}/", tmpdir.path().to_str().unwrap()); + write_encrypted(&ctx, &table_path).await?; + read_encrypted(&ctx, &table_path).await?; + } + + { + // Write and read encrypted Parquet with the SQL API + let tmpdir = TempDir::new()?; + let table_path = format!("{}/", tmpdir.path().to_str().unwrap()); + write_encrypted_with_sql(&ctx, &table_path).await?; + read_encrypted_with_sql(&ctx, &table_path).await?; + } + + Ok(()) +} + +/// Write an encrypted Parquet file +async fn write_encrypted(ctx: &SessionContext, table_path: &str) -> Result<()> { + let df = ctx.table("test_data").await?; + + let mut parquet_options = TableParquetOptions::new(); + // We specify that we want to use Parquet encryption by setting the identifier of the + // encryption factory to use and providing the factory-specific configuration. + // Our encryption factory only requires specifying the columns to encrypt. + let encryption_config = EncryptionConfig { + encrypted_columns: "b,c".to_owned(), + }; + parquet_options + .crypto + .configure_factory(ENCRYPTION_FACTORY_ID, &encryption_config); + + df.write_parquet( + table_path, + DataFrameWriteOptions::new(), + Some(parquet_options), + ) + .await?; + + println!("Encrypted Parquet written to {table_path}"); + Ok(()) +} + +/// Read from an encrypted Parquet file +async fn read_encrypted(ctx: &SessionContext, table_path: &str) -> Result<()> { + let mut parquet_options = TableParquetOptions::new(); + // Specify the encryption factory to use for decrypting Parquet. + // In this example, we don't require any additional configuration options when reading + // as we only need the key metadata from the Parquet files to determine the decryption keys. + parquet_options + .crypto + .configure_factory(ENCRYPTION_FACTORY_ID, &EncryptionConfig::default()); + + let file_format = ParquetFormat::default().with_options(parquet_options); + let listing_options = ListingOptions::new(Arc::new(file_format)); + + ctx.register_listing_table( + "encrypted_parquet_table", + &table_path, + listing_options.clone(), + None, + None, + ) + .await?; + + let mut batch_stream = ctx + .table("encrypted_parquet_table") + .await? 
+ .execute_stream() + .await?; + println!("Reading encrypted Parquet as a RecordBatch stream"); + while let Some(batch) = batch_stream.next().await { + let batch = batch?; + println!("Read batch with {} rows", batch.num_rows()); + } + + println!("Finished reading"); + Ok(()) +} + +/// Write an encrypted Parquet file using only SQL syntax with string configuration +async fn write_encrypted_with_sql(ctx: &SessionContext, table_path: &str) -> Result<()> { + let query = format!( + "COPY test_data \ + TO '{table_path}' \ + STORED AS parquet + OPTIONS (\ + 'format.crypto.factory_id' '{ENCRYPTION_FACTORY_ID}', \ + 'format.crypto.factory_options.encrypted_columns' 'b,c' \ + )" + ); + let _ = ctx.sql(&query).await?.collect().await?; + + println!("Encrypted Parquet written to {table_path}"); + Ok(()) +} + +/// Read from an encrypted Parquet file using only the SQL API and string-based configuration +async fn read_encrypted_with_sql(ctx: &SessionContext, table_path: &str) -> Result<()> { + let ddl = format!( + "CREATE EXTERNAL TABLE encrypted_parquet_table_2 \ + STORED AS PARQUET LOCATION '{table_path}' OPTIONS (\ + 'format.crypto.factory_id' '{ENCRYPTION_FACTORY_ID}' \ + )" + ); + ctx.sql(&ddl).await?; + let df = ctx.sql("SELECT * FROM encrypted_parquet_table_2").await?; + let mut batch_stream = df.execute_stream().await?; + + println!("Reading encrypted Parquet as a RecordBatch stream"); + while let Some(batch) = batch_stream.next().await { + let batch = batch?; + println!("Read batch with {} rows", batch.num_rows()); + } + println!("Finished reading"); + Ok(()) +} + +// Options used to configure our example encryption factory +extensions_options! { + struct EncryptionConfig { + /// Comma-separated list of columns to encrypt + pub encrypted_columns: String, default = "".to_owned() + } +} + +/// Mock implementation of an `EncryptionFactory` that stores encryption keys +/// base64 encoded in the Parquet encryption metadata. +/// For production use, integrating with a key-management service to encrypt +/// data encryption keys is recommended. +#[derive(Default, Debug)] +struct TestEncryptionFactory {} + +/// `EncryptionFactory` is a DataFusion trait for types that generate +/// file encryption and decryption properties. +impl EncryptionFactory for TestEncryptionFactory { + /// Generate file encryption properties to use when writing a Parquet file. + /// The `schema` is provided so that it may be used to dynamically configure + /// per-column encryption keys. + /// The file path is also available. We don't use the path in this example, + /// but other implementations may want to use this to compute an + /// AAD prefix for the file, or to allow use of external key material + /// (where key metadata is stored in a JSON file alongside Parquet files). + fn get_file_encryption_properties( + &self, + options: &EncryptionFactoryOptions, + schema: &SchemaRef, + _file_path: &Path, + ) -> Result<Option<FileEncryptionProperties>> { + let config: EncryptionConfig = options.to_extension_options()?; + + // Generate a random encryption key for this file. + let mut key = vec![0u8; 16]; + OsRng.try_fill_bytes(&mut key).unwrap(); + + // Generate the key metadata that allows retrieving the key when reading the file. 
+ let key_metadata = wrap_key(&key); + + let mut builder = FileEncryptionProperties::builder(key.to_vec()) + .with_footer_key_metadata(key_metadata.clone()); + + let encrypted_columns: HashSet<&str> = + config.encrypted_columns.split(",").collect(); + if !encrypted_columns.is_empty() { + // Set up per-column encryption. + for field in schema.fields().iter() { + if encrypted_columns.contains(field.name().as_str()) { + // Here we re-use the same key for all encrypted columns, + // but new keys could also be generated per column. + builder = builder.with_column_key_and_metadata( + field.name().as_str(), + key.clone(), + key_metadata.clone(), + ); + } + } + } + + let encryption_properties = builder.build()?; + + Ok(Some(encryption_properties)) + } + + /// Generate file decryption properties to use when reading a Parquet file. + /// Rather than provide the AES keys directly for decryption, we set a `KeyRetriever` + /// that can determine the keys using the encryption metadata. + fn get_file_decryption_properties( + &self, + _options: &EncryptionFactoryOptions, + _file_path: &Path, + ) -> Result<Option<FileDecryptionProperties>> { + let decryption_properties = + FileDecryptionProperties::with_key_retriever(Arc::new(TestKeyRetriever {})) + .build()?; + Ok(Some(decryption_properties)) + } +} + +/// Mock implementation of encrypting a key that simply base64 encodes the key. +/// Note that this is not a secure way to store encryption keys, +/// and for production use keys should be encrypted with a KMS. +fn wrap_key(key: &[u8]) -> Vec<u8> { + base64::prelude::BASE64_STANDARD + .encode(key) + .as_bytes() + .to_vec() +} + +struct TestKeyRetriever {} + +impl KeyRetriever for TestKeyRetriever { + /// Get a data encryption key using the metadata stored in the Parquet file. + fn retrieve_key( + &self, + key_metadata: &[u8], + ) -> datafusion::parquet::errors::Result<Vec<u8>> { + let key_metadata = std::str::from_utf8(key_metadata)?; + let key = base64::prelude::BASE64_STANDARD + .decode(key_metadata) + .unwrap(); + Ok(key) + } +} diff --git a/datafusion/common/src/config.rs b/datafusion/common/src/config.rs index be2a734d37..939d13d969 100644 --- a/datafusion/common/src/config.rs +++ b/datafusion/common/src/config.rs @@ -194,6 +194,7 @@ macro_rules! config_namespace { } } } + config_namespace! { /// Options related to catalog and directory scanning /// @@ -676,6 +677,31 @@ config_namespace! { /// Optional file encryption properties pub file_encryption: Option<ConfigFileEncryptionProperties>, default = None + + /// Identifier for the encryption factory to use to create file encryption and decryption properties. + /// Encryption factories can be registered in the runtime environment with + /// `RuntimeEnv::register_parquet_encryption_factory`. 
+ pub factory_id: Option<String>, default = None + + /// Any encryption factory specific options + pub factory_options: EncryptionFactoryOptions, default = EncryptionFactoryOptions::default() + } +} + +impl ParquetEncryptionOptions { + /// Specify the encryption factory to use for Parquet modular encryption, along with its configuration + pub fn configure_factory( + &mut self, + factory_id: &str, + config: &impl ExtensionOptions, + ) { + self.factory_id = Some(factory_id.to_owned()); + self.factory_options.options.clear(); + for entry in config.entries() { + if let Some(value) = entry.value { + self.factory_options.options.insert(entry.key, value); + } + } } } @@ -2382,6 +2408,40 @@ impl From<&FileDecryptionProperties> for ConfigFileDecryptionProperties { } } +/// Holds implementation-specific options for an encryption factory +#[derive(Clone, Debug, Default, PartialEq)] +pub struct EncryptionFactoryOptions { + pub options: HashMap<String, String>, +} + +impl ConfigField for EncryptionFactoryOptions { + fn visit<V: Visit>(&self, v: &mut V, key: &str, _description: &'static str) { + for (option_key, option_value) in &self.options { + v.some( + &format!("{key}.{option_key}"), + option_value, + "Encryption factory specific option", + ); + } + } + + fn set(&mut self, key: &str, value: &str) -> Result<()> { + self.options.insert(key.to_owned(), value.to_owned()); + Ok(()) + } +} + +impl EncryptionFactoryOptions { + /// Convert these encryption factory options to an [`ExtensionOptions`] instance. + pub fn to_extension_options<T: ExtensionOptions + Default>(&self) -> Result<T> { + let mut options = T::default(); + for (key, value) in &self.options { + options.set(key, value)?; + } + Ok(options) + } +} + config_namespace! { /// Options controlling CSV format pub struct CsvOptions { @@ -2821,6 +2881,36 @@ mod tests { ); } + #[cfg(feature = "parquet_encryption")] + #[test] + fn parquet_encryption_factory_config() { + let mut parquet_options = crate::config::TableParquetOptions::default(); + + assert_eq!(parquet_options.crypto.factory_id, None); + assert_eq!(parquet_options.crypto.factory_options.options.len(), 0); + + let mut input_config = TestExtensionConfig::default(); + input_config + .properties + .insert("key1".to_string(), "value 1".to_string()); + input_config + .properties + .insert("key2".to_string(), "value 2".to_string()); + + parquet_options + .crypto + .configure_factory("example_factory", &input_config); + + assert_eq!( + parquet_options.crypto.factory_id, + Some("example_factory".to_string()) + ); + let factory_options = &parquet_options.crypto.factory_options.options; + assert_eq!(factory_options.len(), 2); + assert_eq!(factory_options.get("key1"), Some(&"value 1".to_string())); + assert_eq!(factory_options.get("key2"), Some(&"value 2".to_string())); + } + #[cfg(feature = "parquet")] #[test] fn parquet_table_options_config_entry() { diff --git a/datafusion/common/src/encryption.rs b/datafusion/common/src/encryption.rs index 5d50d4a9ef..5dd603a081 100644 --- a/datafusion/common/src/encryption.rs +++ b/datafusion/common/src/encryption.rs @@ -28,24 +28,7 @@ pub struct FileDecryptionProperties; #[cfg(not(feature = "parquet_encryption"))] pub struct FileEncryptionProperties; -#[cfg(feature = "parquet")] -use crate::config::ParquetEncryptionOptions; pub use crate::config::{ConfigFileDecryptionProperties, ConfigFileEncryptionProperties}; -#[cfg(feature = "parquet")] -use parquet::file::properties::WriterPropertiesBuilder; - -#[cfg(feature = "parquet")] -pub fn 
add_crypto_to_writer_properties( - #[allow(unused)] crypto: &ParquetEncryptionOptions, - #[allow(unused_mut)] mut builder: WriterPropertiesBuilder, -) -> WriterPropertiesBuilder { - #[cfg(feature = "parquet_encryption")] - if let Some(file_encryption_properties) = &crypto.file_encryption { - builder = builder - .with_file_encryption_properties(file_encryption_properties.clone().into()); - } - builder -} #[cfg(feature = "parquet_encryption")] pub fn map_encryption_to_config_encryption( @@ -63,14 +46,14 @@ pub fn map_encryption_to_config_encryption( #[cfg(feature = "parquet_encryption")] pub fn map_config_decryption_to_decryption( - decryption: Option<&ConfigFileDecryptionProperties>, -) -> Option<FileDecryptionProperties> { - decryption.map(|fd| fd.clone().into()) + decryption: &ConfigFileDecryptionProperties, +) -> FileDecryptionProperties { + decryption.clone().into() } #[cfg(not(feature = "parquet_encryption"))] pub fn map_config_decryption_to_decryption( - _decryption: Option<&ConfigFileDecryptionProperties>, -) -> Option<FileDecryptionProperties> { - None + _decryption: &ConfigFileDecryptionProperties, +) -> FileDecryptionProperties { + FileDecryptionProperties {} } diff --git a/datafusion/common/src/file_options/parquet_writer.rs b/datafusion/common/src/file_options/parquet_writer.rs index 91683ccb1b..185826aef4 100644 --- a/datafusion/common/src/file_options/parquet_writer.rs +++ b/datafusion/common/src/file_options/parquet_writer.rs @@ -27,7 +27,6 @@ use crate::{ use arrow::datatypes::Schema; // TODO: handle once deprecated -use crate::encryption::add_crypto_to_writer_properties; #[allow(deprecated)] use parquet::{ arrow::ARROW_SCHEMA_META_KEY, @@ -90,19 +89,19 @@ impl TryFrom<&TableParquetOptions> for WriterPropertiesBuilder { /// Convert the session's [`TableParquetOptions`] into a single write action's [`WriterPropertiesBuilder`]. /// /// The returned [`WriterPropertiesBuilder`] includes customizations applicable per column. + /// Note that any encryption options are ignored as building the `FileEncryptionProperties` + /// might require other inputs besides the [`TableParquetOptions`]. 
fn try_from(table_parquet_options: &TableParquetOptions) -> Result<Self> { // Table options include kv_metadata and col-specific options let TableParquetOptions { global, column_specific_options, key_value_metadata, - crypto, + crypto: _, } = table_parquet_options; let mut builder = global.into_writer_properties_builder()?; - builder = add_crypto_to_writer_properties(crypto, builder); - // check that the arrow schema is present in the kv_metadata, if configured to do so if !global.skip_arrow_metadata && !key_value_metadata.contains_key(ARROW_SCHEMA_META_KEY) @@ -617,6 +616,8 @@ mod tests { crypto: ParquetEncryptionOptions { file_encryption: fep, file_decryption: None, + factory_id: None, + factory_options: Default::default(), }, } } diff --git a/datafusion/core/tests/memory_limit/mod.rs b/datafusion/core/tests/memory_limit/mod.rs index 3cc177feac..b4b88ba5aa 100644 --- a/datafusion/core/tests/memory_limit/mod.rs +++ b/datafusion/core/tests/memory_limit/mod.rs @@ -567,6 +567,10 @@ async fn setup_context( disk_manager: Arc::new(disk_manager), cache_manager: runtime.cache_manager.clone(), object_store_registry: runtime.object_store_registry.clone(), + #[cfg(feature = "parquet_encryption")] + parquet_encryption_factory_registry: runtime + .parquet_encryption_factory_registry + .clone(), }); let config = SessionConfig::new() diff --git a/datafusion/core/tests/parquet/encryption.rs b/datafusion/core/tests/parquet/encryption.rs index 8e90b9aaa9..a71a4f1ea2 100644 --- a/datafusion/core/tests/parquet/encryption.rs +++ b/datafusion/core/tests/parquet/encryption.rs @@ -15,27 +15,29 @@ // specific language governing permissions and limitations // under the License. -//! non trivial integration testing for parquet predicate pushdown -//! -//! Testing hints: If you run this test with --nocapture it will tell you where -//! the generated parquet file went. You can then test it and try out various queries -//! datafusion-cli like: -//! -//! ```sql -//! create external table data stored as parquet location 'data.parquet'; -//! select * from data limit 10; -//! ``` +//! 
Tests for reading and writing Parquet files that use Parquet modular encryption +use arrow::array::{ArrayRef, Int32Array, StringArray}; use arrow::record_batch::RecordBatch; +use arrow_schema::{DataType, SchemaRef}; +use datafusion::dataframe::DataFrameWriteOptions; +use datafusion::datasource::listing::ListingOptions; use datafusion::prelude::{ParquetReadOptions, SessionContext}; -use std::fs::File; -use std::path::{Path, PathBuf}; -use std::sync::Arc; - +use datafusion_common::config::{EncryptionFactoryOptions, TableParquetOptions}; +use datafusion_common::{assert_batches_sorted_eq, DataFusionError}; +use datafusion_datasource_parquet::ParquetFormat; +use datafusion_execution::parquet_encryption::EncryptionFactory; +use parquet::arrow::arrow_reader::{ArrowReaderMetadata, ArrowReaderOptions}; use parquet::arrow::ArrowWriter; use parquet::encryption::decrypt::FileDecryptionProperties; use parquet::encryption::encrypt::FileEncryptionProperties; +use parquet::file::column_crypto_metadata::ColumnCryptoMetaData; use parquet::file::properties::WriterProperties; +use std::collections::HashMap; +use std::fs::File; +use std::path::{Path, PathBuf}; +use std::sync::atomic::{AtomicU8, Ordering}; +use std::sync::{Arc, Mutex}; use tempfile::TempDir; async fn read_parquet_test_data<'a, T: Into<String>>( @@ -74,7 +76,6 @@ pub fn write_batches( Ok(num_rows) } -#[cfg(feature = "parquet_encryption")] #[tokio::test] async fn round_trip_encryption() { let ctx: SessionContext = SessionContext::new(); @@ -128,3 +129,226 @@ async fn round_trip_encryption() { assert_eq!(num_rows_written, num_rows_read); } + +#[tokio::test] +async fn round_trip_parquet_with_encryption_factory() { + let ctx = SessionContext::new(); + let encryption_factory = Arc::new(MockEncryptionFactory::default()); + ctx.runtime_env().register_parquet_encryption_factory( + "test_encryption_factory", + Arc::clone(&encryption_factory) as Arc<dyn EncryptionFactory>, + ); + + let tmpdir = TempDir::new().unwrap(); + + // Register some simple test data + let strings: ArrayRef = + Arc::new(StringArray::from(vec!["a", "b", "c", "a", "b", "c"])); + let x1: ArrayRef = Arc::new(Int32Array::from(vec![1, 10, 11, 100, 101, 111])); + let x2: ArrayRef = Arc::new(Int32Array::from(vec![1, 2, 3, 4, 5, 6])); + let batch = + RecordBatch::try_from_iter(vec![("string", strings), ("x1", x1), ("x2", x2)]) + .unwrap(); + let test_data_schema = batch.schema(); + ctx.register_batch("test_data", batch).unwrap(); + let df = ctx.table("test_data").await.unwrap(); + + // Write encrypted Parquet, partitioned by string column into separate files + let mut parquet_options = TableParquetOptions::new(); + parquet_options.crypto.factory_id = Some("test_encryption_factory".to_string()); + parquet_options + .crypto + .factory_options + .options + .insert("test_key".to_string(), "test value".to_string()); + + let df_write_options = + DataFrameWriteOptions::default().with_partition_by(vec!["string".to_string()]); + df.write_parquet( + tmpdir.path().to_str().unwrap(), + df_write_options, + Some(parquet_options.clone()), + ) + .await + .unwrap(); + + // Crypto factory should have generated one key per partition file + assert_eq!(encryption_factory.encryption_keys.lock().unwrap().len(), 3); + + verify_table_encrypted(tmpdir.path(), &encryption_factory).unwrap(); + + // Registering table without decryption properties should fail + let table_path = format!("file://{}/", tmpdir.path().to_str().unwrap()); + let without_decryption_register = ctx + .register_listing_table( + 
"parquet_missing_decryption", + &table_path, + ListingOptions::new(Arc::new(ParquetFormat::default())), + None, + None, + ) + .await; + assert!(matches!( + without_decryption_register.unwrap_err(), + DataFusionError::ParquetError(_) + )); + + // Registering table succeeds if schema is provided + ctx.register_listing_table( + "parquet_missing_decryption", + &table_path, + ListingOptions::new(Arc::new(ParquetFormat::default())), + Some(test_data_schema), + None, + ) + .await + .unwrap(); + + // But trying to read from the table should fail + let without_decryption_read = ctx + .table("parquet_missing_decryption") + .await + .unwrap() + .collect() + .await; + assert!(matches!( + without_decryption_read.unwrap_err(), + DataFusionError::ParquetError(_) + )); + + // Register table with encryption factory specified + let listing_options = ListingOptions::new(Arc::new( + ParquetFormat::default().with_options(parquet_options), + )) + .with_table_partition_cols(vec![("string".to_string(), DataType::Utf8)]); + ctx.register_listing_table( + "parquet_with_decryption", + &table_path, + listing_options, + None, + None, + ) + .await + .unwrap(); + + // Can read correct data when encryption factory has been specified + let table = ctx + .table("parquet_with_decryption") + .await + .unwrap() + .collect() + .await + .unwrap(); + + let expected = [ + "+-----+----+--------+", + "| x1 | x2 | string |", + "+-----+----+--------+", + "| 1 | 1 | a |", + "| 100 | 4 | a |", + "| 10 | 2 | b |", + "| 101 | 5 | b |", + "| 11 | 3 | c |", + "| 111 | 6 | c |", + "+-----+----+--------+", + ]; + assert_batches_sorted_eq!(expected, &table); +} + +fn verify_table_encrypted( + table_path: &Path, + encryption_factory: &Arc<MockEncryptionFactory>, +) -> datafusion_common::Result<()> { + let mut directories = vec![table_path.to_path_buf()]; + let mut files_visited = 0; + while let Some(directory) = directories.pop() { + for entry in std::fs::read_dir(&directory)? { + let path = entry?.path(); + if path.is_dir() { + directories.push(path); + } else { + verify_file_encrypted(&path, encryption_factory)?; + files_visited += 1; + } + } + } + assert!(files_visited > 0); + Ok(()) +} + +fn verify_file_encrypted( + file_path: &Path, + encryption_factory: &Arc<MockEncryptionFactory>, +) -> datafusion_common::Result<()> { + let mut options = EncryptionFactoryOptions::default(); + options + .options + .insert("test_key".to_string(), "test value".to_string()); + let object_path = object_store::path::Path::from(file_path.to_str().unwrap()); + let decryption_properties = encryption_factory + .get_file_decryption_properties(&options, &object_path)? 
+ .unwrap(); + + let reader_options = + ArrowReaderOptions::new().with_file_decryption_properties(decryption_properties); + let file = File::open(file_path)?; + let reader_metadata = ArrowReaderMetadata::load(&file, reader_options)?; + let metadata = reader_metadata.metadata(); + assert!(metadata.num_row_groups() > 0); + for row_group in metadata.row_groups() { + assert!(row_group.num_columns() > 0); + for col in row_group.columns() { + assert!(matches!( + col.crypto_metadata(), + Some(ColumnCryptoMetaData::EncryptionWithFooterKey) + )); + } + } + Ok(()) +} + +/// Encryption factory implementation for use in tests, +/// which generates encryption keys in a sequence +#[derive(Debug, Default)] +struct MockEncryptionFactory { + pub encryption_keys: Mutex<HashMap<object_store::path::Path, Vec<u8>>>, + pub counter: AtomicU8, +} + +impl EncryptionFactory for MockEncryptionFactory { + fn get_file_encryption_properties( + &self, + config: &EncryptionFactoryOptions, + _schema: &SchemaRef, + file_path: &object_store::path::Path, + ) -> datafusion_common::Result<Option<FileEncryptionProperties>> { + assert_eq!( + config.options.get("test_key"), + Some(&"test value".to_string()) + ); + let file_idx = self.counter.fetch_add(1, Ordering::Relaxed); + let key = vec![file_idx, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0]; + let mut keys = self.encryption_keys.lock().unwrap(); + keys.insert(file_path.clone(), key.clone()); + let encryption_properties = FileEncryptionProperties::builder(key).build()?; + Ok(Some(encryption_properties)) + } + + fn get_file_decryption_properties( + &self, + config: &EncryptionFactoryOptions, + file_path: &object_store::path::Path, + ) -> datafusion_common::Result<Option<FileDecryptionProperties>> { + assert_eq!( + config.options.get("test_key"), + Some(&"test value".to_string()) + ); + let keys = self.encryption_keys.lock().unwrap(); + let key = keys.get(file_path).ok_or_else(|| { + DataFusionError::Execution(format!("No key for file {file_path:?}")) + })?; + let decryption_properties = + FileDecryptionProperties::builder(key.clone()).build()?; + Ok(Some(decryption_properties)) + } +} diff --git a/datafusion/core/tests/parquet/mod.rs b/datafusion/core/tests/parquet/mod.rs index 82444e8b6e..c44d14abd3 100644 --- a/datafusion/core/tests/parquet/mod.rs +++ b/datafusion/core/tests/parquet/mod.rs @@ -43,6 +43,7 @@ use std::sync::Arc; use tempfile::NamedTempFile; mod custom_reader; +#[cfg(feature = "parquet_encryption")] mod encryption; mod external_access_plan; mod file_statistics; diff --git a/datafusion/datasource-parquet/Cargo.toml b/datafusion/datasource-parquet/Cargo.toml index 8a75a445c8..6bccd76b60 100644 --- a/datafusion/datasource-parquet/Cargo.toml +++ b/datafusion/datasource-parquet/Cargo.toml @@ -71,5 +71,6 @@ path = "src/mod.rs" parquet_encryption = [ "parquet/encryption", "datafusion-common/parquet_encryption", + "datafusion-execution/parquet_encryption", "dep:hex", ] diff --git a/datafusion/datasource-parquet/src/file_format.rs b/datafusion/datasource-parquet/src/file_format.rs index 8276a3a8ff..e7c4494685 100644 --- a/datafusion/datasource-parquet/src/file_format.rs +++ b/datafusion/datasource-parquet/src/file_format.rs @@ -39,9 +39,9 @@ use datafusion_datasource::write::demux::DemuxedStreamReceiver; use arrow::compute::sum; use arrow::datatypes::{DataType, Field, FieldRef}; use datafusion_common::config::{ConfigField, ConfigFileType, TableParquetOptions}; -use datafusion_common::encryption::{ - map_config_decryption_to_decryption, FileDecryptionProperties, -}; 
+#[cfg(feature = "parquet_encryption")] +use datafusion_common::encryption::map_config_decryption_to_decryption; +use datafusion_common::encryption::FileDecryptionProperties; use datafusion_common::parsers::CompressionTypeVariant; use datafusion_common::stats::Precision; use datafusion_common::{ @@ -68,6 +68,7 @@ use crate::source::{parse_coerce_int96_string, ParquetSource}; use async_trait::async_trait; use bytes::Bytes; use datafusion_datasource::source::DataSourceExec; +use datafusion_execution::runtime_env::RuntimeEnv; use futures::future::BoxFuture; use futures::{FutureExt, StreamExt, TryStreamExt}; use log::debug; @@ -305,24 +306,60 @@ fn clear_metadata( } async fn fetch_schema_with_location( + state: &dyn Session, store: &dyn ObjectStore, + options: &TableParquetOptions, file: &ObjectMeta, metadata_size_hint: Option<usize>, - file_decryption_properties: Option<&FileDecryptionProperties>, coerce_int96: Option<TimeUnit>, ) -> Result<(Path, Schema)> { + let file_decryption_properties = + get_file_decryption_properties(state, options, &file.location)?; let loc_path = file.location.clone(); let schema = fetch_schema( store, file, metadata_size_hint, - file_decryption_properties, + file_decryption_properties.as_ref(), coerce_int96, ) .await?; Ok((loc_path, schema)) } +#[cfg(feature = "parquet_encryption")] +fn get_file_decryption_properties( + state: &dyn Session, + options: &TableParquetOptions, + file_path: &Path, +) -> Result<Option<FileDecryptionProperties>> { + let file_decryption_properties: Option<FileDecryptionProperties> = + match &options.crypto.file_decryption { + Some(cfd) => Some(map_config_decryption_to_decryption(cfd)), + None => match &options.crypto.factory_id { + Some(factory_id) => { + let factory = + state.runtime_env().parquet_encryption_factory(factory_id)?; + factory.get_file_decryption_properties( + &options.crypto.factory_options, + file_path, + )? 
+ } + None => None, + }, + }; + Ok(file_decryption_properties) +} + +#[cfg(not(feature = "parquet_encryption"))] +fn get_file_decryption_properties( + _state: &dyn Session, + _options: &TableParquetOptions, + _file_path: &Path, +) -> Result<Option<FileDecryptionProperties>> { + Ok(None) +} + #[async_trait] impl FileFormat for ParquetFormat { fn as_any(&self) -> &dyn Any { @@ -358,18 +395,15 @@ impl FileFormat for ParquetFormat { Some(time_unit) => Some(parse_coerce_int96_string(time_unit.as_str())?), None => None, }; - let file_decryption_properties: Option<FileDecryptionProperties> = - map_config_decryption_to_decryption( - self.options.crypto.file_decryption.as_ref(), - ); let mut schemas: Vec<_> = futures::stream::iter(objects) .map(|object| { fetch_schema_with_location( + state, store.as_ref(), + &self.options, object, self.metadata_size_hint(), - file_decryption_properties.as_ref(), coerce_int96, ) }) @@ -414,15 +448,13 @@ impl FileFormat for ParquetFormat { async fn infer_stats( &self, - _state: &dyn Session, + state: &dyn Session, store: &Arc<dyn ObjectStore>, table_schema: SchemaRef, object: &ObjectMeta, ) -> Result<Statistics> { - let file_decryption_properties: Option<FileDecryptionProperties> = - map_config_decryption_to_decryption( - self.options.crypto.file_decryption.as_ref(), - ); + let file_decryption_properties = + get_file_decryption_properties(state, &self.options, &object.location)?; let stats = fetch_statistics( store.as_ref(), table_schema, @@ -459,6 +491,9 @@ impl FileFormat for ParquetFormat { if let Some(metadata_size_hint) = metadata_size_hint { source = source.with_metadata_size_hint(metadata_size_hint) } + + source = self.set_source_encryption_factory(source, state)?; + // Apply schema adapter factory before building the new config let file_source = source.apply_schema_adapter(&conf)?; @@ -489,6 +524,41 @@ impl FileFormat for ParquetFormat { } } +#[cfg(feature = "parquet_encryption")] +impl ParquetFormat { + fn set_source_encryption_factory( + &self, + source: ParquetSource, + state: &dyn Session, + ) -> Result<ParquetSource> { + if let Some(encryption_factory_id) = &self.options.crypto.factory_id { + Ok(source.with_encryption_factory( + state + .runtime_env() + .parquet_encryption_factory(encryption_factory_id)?, + )) + } else { + Ok(source) + } + } +} + +#[cfg(not(feature = "parquet_encryption"))] +impl ParquetFormat { + fn set_source_encryption_factory( + &self, + source: ParquetSource, + _state: &dyn Session, + ) -> Result<ParquetSource> { + if let Some(encryption_factory_id) = &self.options.crypto.factory_id { + Err(DataFusionError::Configuration( + format!("Parquet encryption factory id is set to '{encryption_factory_id}' but the parquet_encryption feature is disabled"))) + } else { + Ok(source) + } + } +} + /// Apply necessary schema type coercions to make file schema match table schema. /// /// This function performs two main types of transformations in a single pass: @@ -1243,7 +1313,11 @@ impl ParquetSink { /// Create writer properties based upon configuration settings, /// including partitioning and the inclusion of arrow schema metadata. - fn create_writer_props(&self) -> Result<WriterProperties> { + fn create_writer_props( + &self, + runtime: &Arc<RuntimeEnv>, + path: &Path, + ) -> Result<WriterProperties> { let schema = if self.parquet_options.global.allow_single_file_parallelism { // If parallelizing writes, we may be also be doing hive style partitioning // into multiple files which impacts the schema per file. 
@@ -1260,7 +1334,15 @@ impl ParquetSink { parquet_opts.arrow_schema(schema); } - Ok(WriterPropertiesBuilder::try_from(&parquet_opts)?.build()) + let mut builder = WriterPropertiesBuilder::try_from(&parquet_opts)?; + builder = set_writer_encryption_properties( + builder, + runtime, + &parquet_opts, + schema, + path, + )?; + Ok(builder.build()) } /// Creates an AsyncArrowWriter which serializes a parquet file to an ObjectStore @@ -1299,6 +1381,48 @@ impl ParquetSink { } } +#[cfg(feature = "parquet_encryption")] +fn set_writer_encryption_properties( + builder: WriterPropertiesBuilder, + runtime: &Arc<RuntimeEnv>, + parquet_opts: &TableParquetOptions, + schema: &Arc<Schema>, + path: &Path, +) -> Result<WriterPropertiesBuilder> { + if let Some(file_encryption_properties) = &parquet_opts.crypto.file_encryption { + // Encryption properties have been specified directly + return Ok(builder + .with_file_encryption_properties(file_encryption_properties.clone().into())); + } else if let Some(encryption_factory_id) = &parquet_opts.crypto.factory_id.as_ref() { + // Encryption properties will be generated by an encryption factory + let encryption_factory = + runtime.parquet_encryption_factory(encryption_factory_id)?; + let file_encryption_properties = encryption_factory + .get_file_encryption_properties( + &parquet_opts.crypto.factory_options, + schema, + path, + )?; + if let Some(file_encryption_properties) = file_encryption_properties { + return Ok( + builder.with_file_encryption_properties(file_encryption_properties) + ); + } + } + Ok(builder) +} + +#[cfg(not(feature = "parquet_encryption"))] +fn set_writer_encryption_properties( + builder: WriterPropertiesBuilder, + _runtime: &Arc<RuntimeEnv>, + _parquet_opts: &TableParquetOptions, + _schema: &Arc<Schema>, + _path: &Path, +) -> Result<WriterPropertiesBuilder> { + Ok(builder) +} + #[async_trait] impl FileSink for ParquetSink { fn config(&self) -> &FileSinkConfig { @@ -1316,7 +1440,9 @@ impl FileSink for ParquetSink { let mut allow_single_file_parallelism = parquet_opts.global.allow_single_file_parallelism; - if parquet_opts.crypto.file_encryption.is_some() { + if parquet_opts.crypto.file_encryption.is_some() + || parquet_opts.crypto.factory_id.is_some() + { // For now, arrow-rs does not support parallel writes with encryption // See https://github.com/apache/arrow-rs/issues/7359 allow_single_file_parallelism = false; @@ -1326,7 +1452,7 @@ impl FileSink for ParquetSink { std::result::Result<(Path, FileMetaData), DataFusionError>, > = JoinSet::new(); - let parquet_props = self.create_writer_props()?; + let runtime = context.runtime_env(); let parallel_options = ParallelParquetWriterOptions { max_parallel_row_groups: parquet_opts .global @@ -1337,6 +1463,7 @@ impl FileSink for ParquetSink { }; while let Some((path, mut rx)) = file_stream_rx.recv().await { + let parquet_props = self.create_writer_props(&runtime, &path)?; if !allow_single_file_parallelism { let mut writer = self .create_async_arrow_writer( diff --git a/datafusion/datasource-parquet/src/opener.rs b/datafusion/datasource-parquet/src/opener.rs index 7c208d1426..62dc0fccc2 100644 --- a/datafusion/datasource-parquet/src/opener.rs +++ b/datafusion/datasource-parquet/src/opener.rs @@ -43,6 +43,10 @@ use datafusion_physical_expr_common::physical_expr::{ use datafusion_physical_plan::metrics::{Count, ExecutionPlanMetricsSet, MetricBuilder}; use datafusion_pruning::{build_pruning_predicate, FilePruner, PruningPredicate}; +#[cfg(feature = "parquet_encryption")] +use 
datafusion_common::config::EncryptionFactoryOptions; +#[cfg(feature = "parquet_encryption")] +use datafusion_execution::parquet_encryption::EncryptionFactory; use futures::{StreamExt, TryStreamExt}; use itertools::Itertools; use log::debug; @@ -96,13 +100,18 @@ pub(super) struct ParquetOpener { pub file_decryption_properties: Option<Arc<FileDecryptionProperties>>, /// Rewrite expressions in the context of the file schema pub(crate) expr_adapter_factory: Option<Arc<dyn PhysicalExprAdapterFactory>>, + /// Optional factory to create file decryption properties dynamically + #[cfg(feature = "parquet_encryption")] + pub encryption_factory: + Option<(Arc<dyn EncryptionFactory>, EncryptionFactoryOptions)>, } impl FileOpener for ParquetOpener { fn open(&self, file_meta: FileMeta, file: PartitionedFile) -> Result<FileOpenFuture> { let file_range = file_meta.range.clone(); let extensions = file_meta.extensions.clone(); - let file_name = file_meta.location().to_string(); + let file_location = file_meta.location(); + let file_name = file_location.to_string(); let file_metrics = ParquetFileMetrics::new(self.partition_index, &file_name, &self.metrics); @@ -141,7 +150,8 @@ impl FileOpener for ParquetOpener { let mut predicate_file_schema = Arc::clone(&self.logical_file_schema); let mut enable_page_index = self.enable_page_index; - let file_decryption_properties = self.file_decryption_properties.clone(); + let file_decryption_properties = + self.get_file_decryption_properties(file_location)?; // For now, page index does not work with encrypted files. See: // https://github.com/apache/arrow-rs/issues/7629 @@ -411,6 +421,36 @@ impl FileOpener for ParquetOpener { } } +#[cfg(feature = "parquet_encryption")] +impl ParquetOpener { + fn get_file_decryption_properties( + &self, + file_location: &object_store::path::Path, + ) -> Result<Option<Arc<FileDecryptionProperties>>> { + match &self.file_decryption_properties { + Some(file_decryption_properties) => { + Ok(Some(Arc::clone(file_decryption_properties))) + } + None => match &self.encryption_factory { + Some((encryption_factory, encryption_config)) => Ok(encryption_factory + .get_file_decryption_properties(encryption_config, file_location)? 
+ .map(Arc::new)), + None => Ok(None), + }, + } + } +} + +#[cfg(not(feature = "parquet_encryption"))] +impl ParquetOpener { + fn get_file_decryption_properties( + &self, + _file_location: &object_store::path::Path, + ) -> Result<Option<Arc<FileDecryptionProperties>>> { + Ok(self.file_decryption_properties.clone()) + } +} + /// Return the initial [`ParquetAccessPlan`] /// /// If the user has supplied one as an extension, use that @@ -672,6 +712,8 @@ mod test { coerce_int96: None, file_decryption_properties: None, expr_adapter_factory: Some(Arc::new(DefaultPhysicalExprAdapterFactory)), + #[cfg(feature = "parquet_encryption")] + encryption_factory: None, } }; @@ -758,6 +800,8 @@ mod test { coerce_int96: None, file_decryption_properties: None, expr_adapter_factory: Some(Arc::new(DefaultPhysicalExprAdapterFactory)), + #[cfg(feature = "parquet_encryption")] + encryption_factory: None, } }; @@ -860,6 +904,8 @@ mod test { coerce_int96: None, file_decryption_properties: None, expr_adapter_factory: Some(Arc::new(DefaultPhysicalExprAdapterFactory)), + #[cfg(feature = "parquet_encryption")] + encryption_factory: None, } }; let make_meta = || FileMeta { @@ -972,6 +1018,8 @@ mod test { coerce_int96: None, file_decryption_properties: None, expr_adapter_factory: Some(Arc::new(DefaultPhysicalExprAdapterFactory)), + #[cfg(feature = "parquet_encryption")] + encryption_factory: None, } }; @@ -1085,6 +1133,8 @@ mod test { coerce_int96: None, file_decryption_properties: None, expr_adapter_factory: Some(Arc::new(DefaultPhysicalExprAdapterFactory)), + #[cfg(feature = "parquet_encryption")] + encryption_factory: None, } }; @@ -1265,6 +1315,8 @@ mod test { coerce_int96: None, file_decryption_properties: None, expr_adapter_factory: None, + #[cfg(feature = "parquet_encryption")] + encryption_factory: None, }; let predicate = logical2physical(&col("a").eq(lit(1u64)), &table_schema); diff --git a/datafusion/datasource-parquet/src/source.rs b/datafusion/datasource-parquet/src/source.rs index 430cb5ce54..366d42700f 100644 --- a/datafusion/datasource-parquet/src/source.rs +++ b/datafusion/datasource-parquet/src/source.rs @@ -27,6 +27,8 @@ use crate::row_filter::can_expr_be_pushed_down_with_schemas; use crate::DefaultParquetFileReaderFactory; use crate::ParquetFileReaderFactory; use datafusion_common::config::ConfigOptions; +#[cfg(feature = "parquet_encryption")] +use datafusion_common::config::EncryptionFactoryOptions; use datafusion_datasource::as_file_source; use datafusion_datasource::file_stream::FileOpener; use datafusion_datasource::schema_adapter::{ @@ -51,6 +53,8 @@ use datafusion_physical_plan::metrics::ExecutionPlanMetricsSet; use datafusion_physical_plan::DisplayFormatType; use datafusion_common::encryption::map_config_decryption_to_decryption; +#[cfg(feature = "parquet_encryption")] +use datafusion_execution::parquet_encryption::EncryptionFactory; use itertools::Itertools; use object_store::ObjectStore; @@ -282,6 +286,8 @@ pub struct ParquetSource { /// Optional hint for the size of the parquet metadata pub(crate) metadata_size_hint: Option<usize>, pub(crate) projected_statistics: Option<Statistics>, + #[cfg(feature = "parquet_encryption")] + pub(crate) encryption_factory: Option<Arc<dyn EncryptionFactory>>, } impl ParquetSource { @@ -320,6 +326,16 @@ impl ParquetSource { conf } + /// Set the encryption factory to use to generate file decryption properties + #[cfg(feature = "parquet_encryption")] + pub fn with_encryption_factory( + mut self, + encryption_factory: Arc<dyn EncryptionFactory>, + ) -> Self { + 
self.encryption_factory = Some(encryption_factory); + self + } + /// Options passed to the parquet reader for this scan pub fn table_parquet_options(&self) -> &TableParquetOptions { &self.table_parquet_options @@ -431,6 +447,19 @@ impl ParquetSource { Ok(file_source) } } + + #[cfg(feature = "parquet_encryption")] + fn get_encryption_factory_with_config( + &self, + ) -> Option<(Arc<dyn EncryptionFactory>, EncryptionFactoryOptions)> { + match &self.encryption_factory { + None => None, + Some(factory) => Some(( + Arc::clone(factory), + self.table_parquet_options.crypto.factory_options.clone(), + )), + } + } } /// Parses datafusion.common.config.ParquetOptions.coerce_int96 String to a arrow_schema.datatype.TimeUnit @@ -512,10 +541,13 @@ impl FileSource for ParquetSource { Arc::new(DefaultParquetFileReaderFactory::new(object_store)) as _ }); - let file_decryption_properties = map_config_decryption_to_decryption( - self.table_parquet_options().crypto.file_decryption.as_ref(), - ) - .map(Arc::new); + let file_decryption_properties = self + .table_parquet_options() + .crypto + .file_decryption + .as_ref() + .map(map_config_decryption_to_decryption) + .map(Arc::new); let coerce_int96 = self .table_parquet_options @@ -546,6 +578,8 @@ impl FileSource for ParquetSource { coerce_int96, file_decryption_properties, expr_adapter_factory, + #[cfg(feature = "parquet_encryption")] + encryption_factory: self.get_encryption_factory_with_config(), }) } diff --git a/datafusion/execution/Cargo.toml b/datafusion/execution/Cargo.toml index 5988d3a336..f6d02615e3 100644 --- a/datafusion/execution/Cargo.toml +++ b/datafusion/execution/Cargo.toml @@ -37,6 +37,11 @@ workspace = true [lib] name = "datafusion_execution" +[features] +parquet_encryption = [ + "parquet/encryption", +] + [dependencies] arrow = { workspace = true } dashmap = { workspace = true } @@ -46,6 +51,7 @@ futures = { workspace = true } log = { workspace = true } object_store = { workspace = true, features = ["fs"] } parking_lot = { workspace = true } +parquet = { workspace = true, optional = true } rand = { workspace = true } tempfile = { workspace = true } url = { workspace = true } diff --git a/datafusion/execution/src/lib.rs b/datafusion/execution/src/lib.rs index 6a0a4b6322..e971e838a6 100644 --- a/datafusion/execution/src/lib.rs +++ b/datafusion/execution/src/lib.rs @@ -31,6 +31,8 @@ pub mod config; pub mod disk_manager; pub mod memory_pool; pub mod object_store; +#[cfg(feature = "parquet_encryption")] +pub mod parquet_encryption; pub mod runtime_env; mod stream; mod task; diff --git a/datafusion/execution/src/parquet_encryption.rs b/datafusion/execution/src/parquet_encryption.rs new file mode 100644 index 0000000000..13a18390d0 --- /dev/null +++ b/datafusion/execution/src/parquet_encryption.rs @@ -0,0 +1,81 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. 
See the License for the +// specific language governing permissions and limitations +// under the License. + +use arrow::datatypes::SchemaRef; +use dashmap::DashMap; +use datafusion_common::config::EncryptionFactoryOptions; +use datafusion_common::error::Result; +use datafusion_common::DataFusionError; +use object_store::path::Path; +use parquet::encryption::decrypt::FileDecryptionProperties; +use parquet::encryption::encrypt::FileEncryptionProperties; +use std::sync::Arc; + +/// Trait for types that generate file encryption and decryption properties to +/// write and read encrypted Parquet files. +/// This allows flexibility in how encryption keys are managed, for example, to +/// integrate with a user's key management service (KMS). +/// For example usage, see the [`parquet_encrypted_with_kms` example]. +/// +/// [`parquet_encrypted_with_kms` example]: https://github.com/apache/datafusion/blob/main/datafusion-examples/examples/parquet_encrypted_with_kms.rs +pub trait EncryptionFactory: Send + Sync + std::fmt::Debug + 'static { + /// Generate file encryption properties to use when writing a Parquet file. + fn get_file_encryption_properties( + &self, + config: &EncryptionFactoryOptions, + schema: &SchemaRef, + file_path: &Path, + ) -> Result<Option<FileEncryptionProperties>>; + + /// Generate file decryption properties to use when reading a Parquet file. + fn get_file_decryption_properties( + &self, + config: &EncryptionFactoryOptions, + file_path: &Path, + ) -> Result<Option<FileDecryptionProperties>>; +} + +/// Stores [`EncryptionFactory`] implementations that can be retrieved by a unique string identifier +#[derive(Clone, Debug, Default)] +pub struct EncryptionFactoryRegistry { + factories: DashMap<String, Arc<dyn EncryptionFactory>>, +} + +impl EncryptionFactoryRegistry { + /// Register an [`EncryptionFactory`] with an associated identifier that can be later + /// used to configure encryption when reading or writing Parquet. + /// If an encryption factory with the same identifier was already registered, it is replaced and returned. 
+ pub fn register_factory( + &self, + id: &str, + factory: Arc<dyn EncryptionFactory>, + ) -> Option<Arc<dyn EncryptionFactory>> { + self.factories.insert(id.to_owned(), factory) + } + + /// Retrieve an [`EncryptionFactory`] by its identifier + pub fn get_factory(&self, id: &str) -> Result<Arc<dyn EncryptionFactory>> { + self.factories + .get(id) + .map(|f| Arc::clone(f.value())) + .ok_or_else(|| { + DataFusionError::Internal(format!( + "No Parquet encryption factory found for id '{id}'" + )) + }) + } +} diff --git a/datafusion/execution/src/runtime_env.rs b/datafusion/execution/src/runtime_env.rs index 9115665dce..9c5de42bcd 100644 --- a/datafusion/execution/src/runtime_env.rs +++ b/datafusion/execution/src/runtime_env.rs @@ -29,6 +29,8 @@ use crate::{ }; use crate::cache::cache_manager::{CacheManager, CacheManagerConfig}; +#[cfg(feature = "parquet_encryption")] +use crate::parquet_encryption::{EncryptionFactory, EncryptionFactoryRegistry}; use datafusion_common::{config::ConfigEntry, Result}; use object_store::ObjectStore; use std::path::PathBuf; @@ -78,6 +80,9 @@ pub struct RuntimeEnv { pub cache_manager: Arc<CacheManager>, /// Object Store Registry pub object_store_registry: Arc<dyn ObjectStoreRegistry>, + /// Parquet encryption factory registry + #[cfg(feature = "parquet_encryption")] + pub parquet_encryption_factory_registry: Arc<EncryptionFactoryRegistry>, } impl Debug for RuntimeEnv { @@ -142,6 +147,28 @@ impl RuntimeEnv { pub fn object_store(&self, url: impl AsRef<Url>) -> Result<Arc<dyn ObjectStore>> { self.object_store_registry.get_store(url.as_ref()) } + + /// Register an [`EncryptionFactory`] with an associated identifier that can be later + /// used to configure encryption when reading or writing Parquet. + /// If an encryption factory with the same identifier was already registered, it is replaced and returned. 
+ #[cfg(feature = "parquet_encryption")] + pub fn register_parquet_encryption_factory( + &self, + id: &str, + encryption_factory: Arc<dyn EncryptionFactory>, + ) -> Option<Arc<dyn EncryptionFactory>> { + self.parquet_encryption_factory_registry + .register_factory(id, encryption_factory) + } + + /// Retrieve an [`EncryptionFactory`] by its identifier + #[cfg(feature = "parquet_encryption")] + pub fn parquet_encryption_factory( + &self, + id: &str, + ) -> Result<Arc<dyn EncryptionFactory>> { + self.parquet_encryption_factory_registry.get_factory(id) + } } impl Default for RuntimeEnv { @@ -168,6 +195,9 @@ pub struct RuntimeEnvBuilder { pub cache_manager: CacheManagerConfig, /// ObjectStoreRegistry to get object store based on url pub object_store_registry: Arc<dyn ObjectStoreRegistry>, + /// Parquet encryption factory registry + #[cfg(feature = "parquet_encryption")] + pub parquet_encryption_factory_registry: Arc<EncryptionFactoryRegistry>, } impl Default for RuntimeEnvBuilder { @@ -185,6 +215,8 @@ impl RuntimeEnvBuilder { memory_pool: Default::default(), cache_manager: Default::default(), object_store_registry: Arc::new(DefaultObjectStoreRegistry::default()), + #[cfg(feature = "parquet_encryption")] + parquet_encryption_factory_registry: Default::default(), } } @@ -265,6 +297,8 @@ impl RuntimeEnvBuilder { memory_pool, cache_manager, object_store_registry, + #[cfg(feature = "parquet_encryption")] + parquet_encryption_factory_registry, } = self; let memory_pool = memory_pool.unwrap_or_else(|| Arc::new(UnboundedMemoryPool::default())); @@ -279,6 +313,8 @@ impl RuntimeEnvBuilder { }, cache_manager: CacheManager::try_new(&cache_manager)?, object_store_registry, + #[cfg(feature = "parquet_encryption")] + parquet_encryption_factory_registry, }) } @@ -309,6 +345,10 @@ impl RuntimeEnvBuilder { memory_pool: Some(Arc::clone(&runtime_env.memory_pool)), cache_manager: cache_config, object_store_registry: Arc::clone(&runtime_env.object_store_registry), + #[cfg(feature = "parquet_encryption")] + parquet_encryption_factory_registry: Arc::clone( + &runtime_env.parquet_encryption_factory_registry, + ), } }
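
For readers skimming this patch, the new write-side API boils down to the short sketch below, condensed from the parquet_encrypted_with_kms.rs example added in this commit. MyKmsEncryptionFactory and MyEncryptionConfig are hypothetical stand-ins for an EncryptionFactory implementation and its extensions_options! configuration; in the example above these roles are played by TestEncryptionFactory and EncryptionConfig.

    use std::sync::Arc;
    use datafusion::config::TableParquetOptions;
    use datafusion::dataframe::DataFrameWriteOptions;
    use datafusion::error::Result;
    use datafusion::prelude::SessionContext;

    async fn write_encrypted_sketch(ctx: &SessionContext, table_path: &str) -> Result<()> {
        // Register the factory in the runtime environment under a string id
        // (MyKmsEncryptionFactory is a hypothetical EncryptionFactory impl;
        // see TestEncryptionFactory in the example for a concrete one).
        ctx.runtime_env().register_parquet_encryption_factory(
            "example.mock_kms_encryption",
            Arc::new(MyKmsEncryptionFactory::default()),
        );

        // Reference the factory, plus any factory-specific options, in the
        // crypto options instead of supplying AES keys directly.
        let mut parquet_options = TableParquetOptions::new();
        parquet_options.crypto.configure_factory(
            "example.mock_kms_encryption",
            &MyEncryptionConfig::default(),
        );

        // Write as usual; the factory generates encryption properties per output file.
        ctx.table("test_data")
            .await?
            .write_parquet(table_path, DataFrameWriteOptions::new(), Some(parquet_options))
            .await?;
        Ok(())
    }

The same factory can be selected purely from SQL with the new string options, e.g. 'format.crypto.factory_id' and 'format.crypto.factory_options.encrypted_columns', as shown in write_encrypted_with_sql and read_encrypted_with_sql above; on the read side, ParquetFormat::default().with_options(parquet_options) wires the factory into a listing table.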