This is an automated email from the ASF dual-hosted git repository.

alamb pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/datafusion.git


The following commit(s) were added to refs/heads/main by this push:
     new cbc0614209 feat: Dynamic Parquet encryption and decryption properties (#16779)
cbc0614209 is described below

commit cbc0614209a14a2ea335942de335b38e4d71443e
Author: Adam Reeve <adam.re...@gr-oss.io>
AuthorDate: Sat Aug 9 06:40:41 2025 +1200

    feat: Dynamic Parquet encryption and decryption properties (#16779)
    
    * Add encryption factory API for more flexible encryption configuration
    
    * Remove extra DynEncryptionFactory trait
    
    * Tidy up example
    
    * Tidy ups
    
    * Fix "unnecessary qualification" errors in example
    
    * Fix toml format with taplo
    
    * Fix broken link in docs
    
    * Clippy fixes
    
    * Update examples README
    
    * Add extra validation of table encryption
    
    * clippy fix
    
    * Remove register_parquet_encryption_factory from SessionContext
    
    * Remove new dependency from example
    
    * Update examples readme
    
    * Run taplo format
    
    * Fix outdated method reference in comment
    
    * Extra comment
    
    ---------
    
    Co-authored-by: Andrew Lamb <and...@nerdnetworks.org>
---
 Cargo.lock                                         |   3 +
 datafusion-examples/Cargo.toml                     |   2 +
 datafusion-examples/README.md                      |   1 +
 .../examples/parquet_encrypted_with_kms.rs         | 301 +++++++++++++++++++++
 datafusion/common/src/config.rs                    |  90 ++++++
 datafusion/common/src/encryption.rs                |  29 +-
 .../common/src/file_options/parquet_writer.rs      |   9 +-
 datafusion/core/tests/memory_limit/mod.rs          |   4 +
 datafusion/core/tests/parquet/encryption.rs        | 254 ++++++++++++++++-
 datafusion/core/tests/parquet/mod.rs               |   1 +
 datafusion/datasource-parquet/Cargo.toml           |   1 +
 datafusion/datasource-parquet/src/file_format.rs   | 165 +++++++++--
 datafusion/datasource-parquet/src/opener.rs        |  56 +++-
 datafusion/datasource-parquet/src/source.rs        |  42 ++-
 datafusion/execution/Cargo.toml                    |   6 +
 datafusion/execution/src/lib.rs                    |   2 +
 datafusion/execution/src/parquet_encryption.rs     |  81 ++++++
 datafusion/execution/src/runtime_env.rs            |  40 +++
 18 files changed, 1020 insertions(+), 67 deletions(-)
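In short, the workflow this commit enables looks like the following (a condensed sketch of the `parquet_encrypted_with_kms.rs` example added below; `TestEncryptionFactory` and `EncryptionConfig` are the types defined in that example):

```rust
use std::sync::Arc;
use datafusion::config::TableParquetOptions;
use datafusion::dataframe::DataFrameWriteOptions;
use datafusion::error::Result;
use datafusion::prelude::SessionContext;

// `TestEncryptionFactory` and `EncryptionConfig` are defined in the
// parquet_encrypted_with_kms.rs example added below.
async fn write_with_factory(ctx: &SessionContext, table_path: &str) -> Result<()> {
    // 1. Register the factory under an identifier in the runtime environment.
    ctx.runtime_env().register_parquet_encryption_factory(
        "example.mock_kms_encryption",
        Arc::new(TestEncryptionFactory::default()),
    );

    // 2. Point the table's Parquet crypto options at the registered factory.
    let mut parquet_options = TableParquetOptions::new();
    parquet_options.crypto.configure_factory(
        "example.mock_kms_encryption",
        &EncryptionConfig::default(),
    );

    // 3. Write as usual; encryption properties are generated per file by the factory.
    ctx.table("test_data")
        .await?
        .write_parquet(table_path, DataFrameWriteOptions::new(), Some(parquet_options))
        .await?;
    Ok(())
}
```

Reading works the same way: configure the factory on the read-side `TableParquetOptions` (or set `format.crypto.factory_id` via SQL), and the factory's `get_file_decryption_properties` supplies the keys.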

diff --git a/Cargo.lock b/Cargo.lock
index c06d5e6cfc..ff9bcf0f71 100644
--- a/Cargo.lock
+++ b/Cargo.lock
@@ -2172,6 +2172,7 @@ dependencies = [
  "arrow-flight",
  "arrow-schema",
  "async-trait",
+ "base64 0.22.1",
  "bytes",
  "dashmap",
  "datafusion",
@@ -2184,6 +2185,7 @@ dependencies = [
  "nix",
  "object_store",
  "prost",
+ "rand 0.9.2",
  "serde_json",
  "tempfile",
  "test-utils",
@@ -2209,6 +2211,7 @@ dependencies = [
  "log",
  "object_store",
  "parking_lot",
+ "parquet",
  "rand 0.9.2",
  "tempfile",
  "url",
diff --git a/datafusion-examples/Cargo.toml b/datafusion-examples/Cargo.toml
index c46c24d0c9..409fc12bcb 100644
--- a/datafusion-examples/Cargo.toml
+++ b/datafusion-examples/Cargo.toml
@@ -65,6 +65,7 @@ async-trait = { workspace = true }
 bytes = { workspace = true }
 dashmap = { workspace = true }
 # note only use main datafusion crate for examples
+base64 = "0.22.1"
 datafusion = { workspace = true, default-features = true }
 datafusion-ffi = { workspace = true }
 datafusion-proto = { workspace = true }
@@ -74,6 +75,7 @@ log = { workspace = true }
 mimalloc = { version = "0.1", default-features = false }
 object_store = { workspace = true, features = ["aws", "http"] }
 prost = { workspace = true }
+rand = { workspace = true }
 serde_json = { workspace = true }
 tempfile = { workspace = true }
 test-utils = { path = "../test-utils" }
diff --git a/datafusion-examples/README.md b/datafusion-examples/README.md
index 02f83b9bd0..75a53bc568 100644
--- a/datafusion-examples/README.md
+++ b/datafusion-examples/README.md
@@ -68,6 +68,7 @@ cargo run --example dataframe
 - [`optimizer_rule.rs`](examples/optimizer_rule.rs): Use a custom OptimizerRule to replace certain predicates
 - [`parquet_embedded_index.rs`](examples/parquet_embedded_index.rs): Store a custom index inside a Parquet file and use it to speed up queries
 - [`parquet_encrypted.rs`](examples/parquet_encrypted.rs): Read and write encrypted Parquet files using DataFusion
+- [`parquet_encrypted_with_kms.rs`](examples/parquet_encrypted_with_kms.rs): Read and write encrypted Parquet files using an encryption factory
 - [`parquet_index.rs`](examples/parquet_index.rs): Create a secondary index over several parquet files and use it to speed up queries
 - [`parquet_exec_visitor.rs`](examples/parquet_exec_visitor.rs): Extract statistics by visiting an ExecutionPlan after execution
 - [`parse_sql_expr.rs`](examples/parse_sql_expr.rs): Parse SQL text into DataFusion `Expr`.
diff --git a/datafusion-examples/examples/parquet_encrypted_with_kms.rs b/datafusion-examples/examples/parquet_encrypted_with_kms.rs
new file mode 100644
index 0000000000..d30608ce7a
--- /dev/null
+++ b/datafusion-examples/examples/parquet_encrypted_with_kms.rs
@@ -0,0 +1,301 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+use arrow::array::{ArrayRef, Int32Array, RecordBatch, StringArray};
+use arrow_schema::SchemaRef;
+use base64::Engine;
+use datafusion::common::extensions_options;
+use datafusion::config::{EncryptionFactoryOptions, TableParquetOptions};
+use datafusion::dataframe::DataFrameWriteOptions;
+use datafusion::datasource::file_format::parquet::ParquetFormat;
+use datafusion::datasource::listing::ListingOptions;
+use datafusion::error::Result;
+use datafusion::execution::parquet_encryption::EncryptionFactory;
+use datafusion::parquet::encryption::decrypt::KeyRetriever;
+use datafusion::parquet::encryption::{
+    decrypt::FileDecryptionProperties, encrypt::FileEncryptionProperties,
+};
+use datafusion::prelude::SessionContext;
+use futures::StreamExt;
+use object_store::path::Path;
+use rand::rand_core::{OsRng, TryRngCore};
+use std::collections::HashSet;
+use std::sync::Arc;
+use tempfile::TempDir;
+
+const ENCRYPTION_FACTORY_ID: &str = "example.mock_kms_encryption";
+
+/// This example demonstrates reading and writing Parquet files that
+/// are encrypted using Parquet Modular Encryption.
+///
+/// Compared to the `parquet_encrypted` example, where AES keys
+/// are specified directly, this example implements an `EncryptionFactory` that
+/// generates encryption keys dynamically per file.
+/// Encryption key metadata is stored inline in the Parquet files and is used to determine
+/// the decryption keys when reading the files.
+///
+/// In this example, encryption keys are simply stored base64 encoded in the Parquet metadata,
+/// which is not a secure way to store encryption keys.
+/// For production use, it is recommended to use a key-management service (KMS) to encrypt
+/// data encryption keys.
+#[tokio::main]
+async fn main() -> Result<()> {
+    let ctx = SessionContext::new();
+
+    // Register an `EncryptionFactory` implementation to be used for Parquet encryption
+    // in the runtime environment.
+    // `EncryptionFactory` instances are registered with a name to identify them so
+    // they can later be referenced in configuration options, and it's possible to register
+    // multiple different factories to handle different ways of encrypting Parquet.
+    let encryption_factory = TestEncryptionFactory::default();
+    ctx.runtime_env().register_parquet_encryption_factory(
+        ENCRYPTION_FACTORY_ID,
+        Arc::new(encryption_factory),
+    );
+
+    // Register some simple test data
+    let a: ArrayRef = Arc::new(StringArray::from(vec!["a", "b", "c", "d"]));
+    let b: ArrayRef = Arc::new(Int32Array::from(vec![1, 10, 10, 100]));
+    let c: ArrayRef = Arc::new(Int32Array::from(vec![2, 20, 20, 200]));
+    let batch = RecordBatch::try_from_iter(vec![("a", a), ("b", b), ("c", c)])?;
+    ctx.register_batch("test_data", batch)?;
+
+    {
+        // Write and read encrypted Parquet with the programmatic API
+        let tmpdir = TempDir::new()?;
+        let table_path = format!("{}/", tmpdir.path().to_str().unwrap());
+        write_encrypted(&ctx, &table_path).await?;
+        read_encrypted(&ctx, &table_path).await?;
+    }
+
+    {
+        // Write and read encrypted Parquet with the SQL API
+        let tmpdir = TempDir::new()?;
+        let table_path = format!("{}/", tmpdir.path().to_str().unwrap());
+        write_encrypted_with_sql(&ctx, &table_path).await?;
+        read_encrypted_with_sql(&ctx, &table_path).await?;
+    }
+
+    Ok(())
+}
+
+/// Write an encrypted Parquet file
+async fn write_encrypted(ctx: &SessionContext, table_path: &str) -> Result<()> {
+    let df = ctx.table("test_data").await?;
+
+    let mut parquet_options = TableParquetOptions::new();
+    // We specify that we want to use Parquet encryption by setting the identifier of the
+    // encryption factory to use and providing the factory-specific configuration.
+    // Our encryption factory only requires specifying the columns to encrypt.
+    let encryption_config = EncryptionConfig {
+        encrypted_columns: "b,c".to_owned(),
+    };
+    parquet_options
+        .crypto
+        .configure_factory(ENCRYPTION_FACTORY_ID, &encryption_config);
+
+    df.write_parquet(
+        table_path,
+        DataFrameWriteOptions::new(),
+        Some(parquet_options),
+    )
+    .await?;
+
+    println!("Encrypted Parquet written to {table_path}");
+    Ok(())
+}
+
+/// Read from an encrypted Parquet file
+async fn read_encrypted(ctx: &SessionContext, table_path: &str) -> Result<()> {
+    let mut parquet_options = TableParquetOptions::new();
+    // Specify the encryption factory to use for decrypting Parquet.
+    // In this example, we don't require any additional configuration options when reading
+    // as we only need the key metadata from the Parquet files to determine the decryption keys.
+    parquet_options
+        .crypto
+        .configure_factory(ENCRYPTION_FACTORY_ID, &EncryptionConfig::default());
+
+    let file_format = ParquetFormat::default().with_options(parquet_options);
+    let listing_options = ListingOptions::new(Arc::new(file_format));
+
+    ctx.register_listing_table(
+        "encrypted_parquet_table",
+        &table_path,
+        listing_options.clone(),
+        None,
+        None,
+    )
+    .await?;
+
+    let mut batch_stream = ctx
+        .table("encrypted_parquet_table")
+        .await?
+        .execute_stream()
+        .await?;
+    println!("Reading encrypted Parquet as a RecordBatch stream");
+    while let Some(batch) = batch_stream.next().await {
+        let batch = batch?;
+        println!("Read batch with {} rows", batch.num_rows());
+    }
+
+    println!("Finished reading");
+    Ok(())
+}
+
+/// Write an encrypted Parquet file using only SQL syntax with string configuration
+async fn write_encrypted_with_sql(ctx:
+    let query = format!(
+        "COPY test_data \
+        TO '{table_path}' \
+        STORED AS parquet
+        OPTIONS (\
+            'format.crypto.factory_id' '{ENCRYPTION_FACTORY_ID}', \
+            'format.crypto.factory_options.encrypted_columns' 'b,c' \
+        )"
+    );
+    let _ = ctx.sql(&query).await?.collect().await?;
+
+    println!("Encrypted Parquet written to {table_path}");
+    Ok(())
+}
+
+/// Read from an encrypted Parquet file using only the SQL API and string-based configuration
+async fn read_encrypted_with_sql(ctx:
+    let ddl = format!(
+        "CREATE EXTERNAL TABLE encrypted_parquet_table_2 \
+        STORED AS PARQUET LOCATION '{table_path}' OPTIONS (\
+        'format.crypto.factory_id' '{ENCRYPTION_FACTORY_ID}' \
+        )"
+    );
+    ctx.sql(&ddl).await?;
+    let df = ctx.sql("SELECT * FROM encrypted_parquet_table_2").await?;
+    let mut batch_stream = df.execute_stream().await?;
+
+    println!("Reading encrypted Parquet as a RecordBatch stream");
+    while let Some(batch) = batch_stream.next().await {
+        let batch = batch?;
+        println!("Read batch with {} rows", batch.num_rows());
+    }
+    println!("Finished reading");
+    Ok(())
+}
+
+// Options used to configure our example encryption factory
+extensions_options! {
+    struct EncryptionConfig {
+        /// Comma-separated list of columns to encrypt
+        pub encrypted_columns: String, default = "".to_owned()
+    }
+}
+
+/// Mock implementation of an `EncryptionFactory` that stores encryption keys
+/// base64 encoded in the Parquet encryption metadata.
+/// For production use, integrating with a key-management service to encrypt
+/// data encryption keys is recommended.
+#[derive(Default, Debug)]
+struct TestEncryptionFactory {}
+
+/// `EncryptionFactory` is a DataFusion trait for types that generate
+/// file encryption and decryption properties.
+impl EncryptionFactory for TestEncryptionFactory {
+    /// Generate file encryption properties to use when writing a Parquet file.
+    /// The `schema` is provided so that it may be used to dynamically configure
+    /// per-column encryption keys.
+    /// The file path is also available. We don't use the path in this example,
+    /// but other implementations may want to use this to compute an
+    /// AAD prefix for the file, or to allow use of external key material
+    /// (where key metadata is stored in a JSON file alongside Parquet files).
+    fn get_file_encryption_properties(
+        &self,
+        options: &EncryptionFactoryOptions,
+        schema: &SchemaRef,
+        _file_path: &Path,
+    ) -> Result<Option<FileEncryptionProperties>> {
+        let config: EncryptionConfig = options.to_extension_options()?;
+
+        // Generate a random encryption key for this file.
+        let mut key = vec![0u8; 16];
+        OsRng.try_fill_bytes(&mut key).unwrap();
+
+        // Generate the key metadata that allows retrieving the key when reading the file.
+        let key_metadata = wrap_key(&key);
+
+        let mut builder = FileEncryptionProperties::builder(key.to_vec())
+            .with_footer_key_metadata(key_metadata.clone());
+
+        let encrypted_columns: HashSet<&str> =
+            config.encrypted_columns.split(",").collect();
+        if !encrypted_columns.is_empty() {
+            // Set up per-column encryption.
+            for field in schema.fields().iter() {
+                if encrypted_columns.contains(field.name().as_str()) {
+                    // Here we re-use the same key for all encrypted columns,
+                    // but new keys could also be generated per column.
+                    builder = builder.with_column_key_and_metadata(
+                        field.name().as_str(),
+                        key.clone(),
+                        key_metadata.clone(),
+                    );
+                }
+            }
+        }
+
+        let encryption_properties = builder.build()?;
+
+        Ok(Some(encryption_properties))
+    }
+
+    /// Generate file decryption properties to use when reading a Parquet file.
+    /// Rather than provide the AES keys directly for decryption, we set a `KeyRetriever`
+    /// that can determine the keys using the encryption metadata.
+    fn get_file_decryption_properties(
+        &self,
+        _options: &EncryptionFactoryOptions,
+        _file_path: &Path,
+    ) -> Result<Option<FileDecryptionProperties>> {
+        let decryption_properties =
+            FileDecryptionProperties::with_key_retriever(Arc::new(TestKeyRetriever {}))
+                .build()?;
+        Ok(Some(decryption_properties))
+    }
+}
+
+/// Mock implementation of encrypting a key that simply base64 encodes the key.
+/// Note that this is not a secure way to store encryption keys,
+/// and for production use keys should be encrypted with a KMS.
+fn wrap_key(key: &[u8]) -> Vec<u8> {
+    base64::prelude::BASE64_STANDARD
+        .encode(key)
+        .as_bytes()
+        .to_vec()
+}
+
+struct TestKeyRetriever {}
+
+impl KeyRetriever for TestKeyRetriever {
+    /// Get a data encryption key using the metadata stored in the Parquet file.
+    fn retrieve_key(
+        &self,
+        key_metadata: &[u8],
+    ) -> datafusion::parquet::errors::Result<Vec<u8>> {
+        let key_metadata = std::str::from_utf8(key_metadata)?;
+        let key = base64::prelude::BASE64_STANDARD
+            .decode(key_metadata)
+            .unwrap();
+        Ok(key)
+    }
+}
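Following the convention of the README entries above, the new example can be run with `cargo run --example parquet_encrypted_with_kms`.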
diff --git a/datafusion/common/src/config.rs b/datafusion/common/src/config.rs
index be2a734d37..939d13d969 100644
--- a/datafusion/common/src/config.rs
+++ b/datafusion/common/src/config.rs
@@ -194,6 +194,7 @@ macro_rules! config_namespace {
         }
     }
 }
+
 config_namespace! {
     /// Options related to catalog and directory scanning
     ///
@@ -676,6 +677,31 @@ config_namespace! {
 
         /// Optional file encryption properties
         pub file_encryption: Option<ConfigFileEncryptionProperties>, default = None
+
+        /// Identifier for the encryption factory to use to create file encryption and decryption properties.
+        /// Encryption factories can be registered in the runtime environment with
+        /// `RuntimeEnv::register_parquet_encryption_factory`.
+        pub factory_id: Option<String>, default = None
+
+        /// Any encryption factory specific options
+        pub factory_options: EncryptionFactoryOptions, default = EncryptionFactoryOptions::default()
+    }
+}
+
+impl ParquetEncryptionOptions {
+    /// Specify the encryption factory to use for Parquet modular encryption, along with its configuration
+    pub fn configure_factory(
+        &mut self,
+        factory_id: &str,
+        config: &impl ExtensionOptions,
+    ) {
+        self.factory_id = Some(factory_id.to_owned());
+        self.factory_options.options.clear();
+        for entry in config.entries() {
+            if let Some(value) = entry.value {
+                self.factory_options.options.insert(entry.key, value);
+            }
+        }
     }
 }
 
@@ -2382,6 +2408,40 @@ impl From<&FileDecryptionProperties> for ConfigFileDecryptionProperties {
     }
 }
 
+/// Holds implementation-specific options for an encryption factory
+#[derive(Clone, Debug, Default, PartialEq)]
+pub struct EncryptionFactoryOptions {
+    pub options: HashMap<String, String>,
+}
+
+impl ConfigField for EncryptionFactoryOptions {
+    fn visit<V: Visit>(&self, v: &mut V, key: &str, _description: &'static str) {
+        for (option_key, option_value) in &self.options {
+            v.some(
+                &format!("{key}.{option_key}"),
+                option_value,
+                "Encryption factory specific option",
+            );
+        }
+    }
+
+    fn set(&mut self, key: &str, value: &str) -> Result<()> {
+        self.options.insert(key.to_owned(), value.to_owned());
+        Ok(())
+    }
+}
+
+impl EncryptionFactoryOptions {
+    /// Convert these encryption factory options to an [`ExtensionOptions`] instance.
+    pub fn to_extension_options<T: ExtensionOptions + Default>(&self) -> Result<T> {
+        let mut options = T::default();
+        for (key, value) in &self.options {
+            options.set(key, value)?;
+        }
+        Ok(options)
+    }
+}
+
 config_namespace! {
     /// Options controlling CSV format
     pub struct CsvOptions {
@@ -2821,6 +2881,36 @@ mod tests {
         );
     }
 
+    #[cfg(feature = "parquet_encryption")]
+    #[test]
+    fn parquet_encryption_factory_config() {
+        let mut parquet_options = crate::config::TableParquetOptions::default();
+
+        assert_eq!(parquet_options.crypto.factory_id, None);
+        assert_eq!(parquet_options.crypto.factory_options.options.len(), 0);
+
+        let mut input_config = TestExtensionConfig::default();
+        input_config
+            .properties
+            .insert("key1".to_string(), "value 1".to_string());
+        input_config
+            .properties
+            .insert("key2".to_string(), "value 2".to_string());
+
+        parquet_options
+            .crypto
+            .configure_factory("example_factory", &input_config);
+
+        assert_eq!(
+            parquet_options.crypto.factory_id,
+            Some("example_factory".to_string())
+        );
+        let factory_options = &parquet_options.crypto.factory_options.options;
+        assert_eq!(factory_options.len(), 2);
+        assert_eq!(factory_options.get("key1"), Some(&"value 1".to_string()));
+        assert_eq!(factory_options.get("key2"), Some(&"value 2".to_string()));
+    }
+
     #[cfg(feature = "parquet")]
     #[test]
     fn parquet_table_options_config_entry() {
diff --git a/datafusion/common/src/encryption.rs b/datafusion/common/src/encryption.rs
index 5d50d4a9ef..5dd603a081 100644
--- a/datafusion/common/src/encryption.rs
+++ b/datafusion/common/src/encryption.rs
@@ -28,24 +28,7 @@ pub struct FileDecryptionProperties;
 #[cfg(not(feature = "parquet_encryption"))]
 pub struct FileEncryptionProperties;
 
-#[cfg(feature = "parquet")]
-use crate::config::ParquetEncryptionOptions;
 pub use crate::config::{ConfigFileDecryptionProperties, ConfigFileEncryptionProperties};
-#[cfg(feature = "parquet")]
-use parquet::file::properties::WriterPropertiesBuilder;
-
-#[cfg(feature = "parquet")]
-pub fn add_crypto_to_writer_properties(
-    #[allow(unused)] crypto: &ParquetEncryptionOptions,
-    #[allow(unused_mut)] mut builder: WriterPropertiesBuilder,
-) -> WriterPropertiesBuilder {
-    #[cfg(feature = "parquet_encryption")]
-    if let Some(file_encryption_properties) = &crypto.file_encryption {
-        builder = builder
-            .with_file_encryption_properties(file_encryption_properties.clone().into());
-    }
-    builder
-}
 
 #[cfg(feature = "parquet_encryption")]
 pub fn map_encryption_to_config_encryption(
@@ -63,14 +46,14 @@ pub fn map_encryption_to_config_encryption(
 
 #[cfg(feature = "parquet_encryption")]
 pub fn map_config_decryption_to_decryption(
-    decryption: Option<&ConfigFileDecryptionProperties>,
-) -> Option<FileDecryptionProperties> {
-    decryption.map(|fd| fd.clone().into())
+    decryption: &ConfigFileDecryptionProperties,
+) -> FileDecryptionProperties {
+    decryption.clone().into()
 }
 
 #[cfg(not(feature = "parquet_encryption"))]
 pub fn map_config_decryption_to_decryption(
-    _decryption: Option<&ConfigFileDecryptionProperties>,
-) -> Option<FileDecryptionProperties> {
-    None
+    _decryption: &ConfigFileDecryptionProperties,
+) -> FileDecryptionProperties {
+    FileDecryptionProperties {}
 }
diff --git a/datafusion/common/src/file_options/parquet_writer.rs b/datafusion/common/src/file_options/parquet_writer.rs
index 91683ccb1b..185826aef4 100644
--- a/datafusion/common/src/file_options/parquet_writer.rs
+++ b/datafusion/common/src/file_options/parquet_writer.rs
@@ -27,7 +27,6 @@ use crate::{
 
 use arrow::datatypes::Schema;
 // TODO: handle once deprecated
-use crate::encryption::add_crypto_to_writer_properties;
 #[allow(deprecated)]
 use parquet::{
     arrow::ARROW_SCHEMA_META_KEY,
@@ -90,19 +89,19 @@ impl TryFrom<&TableParquetOptions> for WriterPropertiesBuilder {
     /// Convert the session's [`TableParquetOptions`] into a single write action's [`WriterPropertiesBuilder`].
     ///
     /// The returned [`WriterPropertiesBuilder`] includes customizations applicable per column.
+    /// Note that any encryption options are ignored as building the `FileEncryptionProperties`
+    /// might require other inputs besides the [`TableParquetOptions`].
     fn try_from(table_parquet_options: &TableParquetOptions) -> Result<Self> {
         // Table options include kv_metadata and col-specific options
         let TableParquetOptions {
             global,
             column_specific_options,
             key_value_metadata,
-            crypto,
+            crypto: _,
         } = table_parquet_options;
 
         let mut builder = global.into_writer_properties_builder()?;
 
-        builder = add_crypto_to_writer_properties(crypto, builder);
-
         // check that the arrow schema is present in the kv_metadata, if configured to do so
         if !global.skip_arrow_metadata
             && !key_value_metadata.contains_key(ARROW_SCHEMA_META_KEY)
@@ -617,6 +616,8 @@ mod tests {
             crypto: ParquetEncryptionOptions {
                 file_encryption: fep,
                 file_decryption: None,
+                factory_id: None,
+                factory_options: Default::default(),
             },
         }
     }
diff --git a/datafusion/core/tests/memory_limit/mod.rs b/datafusion/core/tests/memory_limit/mod.rs
index 3cc177feac..b4b88ba5aa 100644
--- a/datafusion/core/tests/memory_limit/mod.rs
+++ b/datafusion/core/tests/memory_limit/mod.rs
@@ -567,6 +567,10 @@ async fn setup_context(
         disk_manager: Arc::new(disk_manager),
         cache_manager: runtime.cache_manager.clone(),
         object_store_registry: runtime.object_store_registry.clone(),
+        #[cfg(feature = "parquet_encryption")]
+        parquet_encryption_factory_registry: runtime
+            .parquet_encryption_factory_registry
+            .clone(),
     });
 
     let config = SessionConfig::new()
diff --git a/datafusion/core/tests/parquet/encryption.rs b/datafusion/core/tests/parquet/encryption.rs
index 8e90b9aaa9..a71a4f1ea2 100644
--- a/datafusion/core/tests/parquet/encryption.rs
+++ b/datafusion/core/tests/parquet/encryption.rs
@@ -15,27 +15,29 @@
 // specific language governing permissions and limitations
 // under the License.
 
-//! non trivial integration testing for parquet predicate pushdown
-//!
-//! Testing hints: If you run this test with --nocapture it will tell you where
-//! the generated parquet file went. You can then test it and try out various queries
-//! datafusion-cli like:
-//!
-//! ```sql
-//! create external table data stored as parquet location 'data.parquet';
-//! select * from data limit 10;
-//! ```
+//! Tests for reading and writing Parquet files that use Parquet modular encryption
 
+use arrow::array::{ArrayRef, Int32Array, StringArray};
 use arrow::record_batch::RecordBatch;
+use arrow_schema::{DataType, SchemaRef};
+use datafusion::dataframe::DataFrameWriteOptions;
+use datafusion::datasource::listing::ListingOptions;
 use datafusion::prelude::{ParquetReadOptions, SessionContext};
-use std::fs::File;
-use std::path::{Path, PathBuf};
-use std::sync::Arc;
-
+use datafusion_common::config::{EncryptionFactoryOptions, TableParquetOptions};
+use datafusion_common::{assert_batches_sorted_eq, DataFusionError};
+use datafusion_datasource_parquet::ParquetFormat;
+use datafusion_execution::parquet_encryption::EncryptionFactory;
+use parquet::arrow::arrow_reader::{ArrowReaderMetadata, ArrowReaderOptions};
 use parquet::arrow::ArrowWriter;
 use parquet::encryption::decrypt::FileDecryptionProperties;
 use parquet::encryption::encrypt::FileEncryptionProperties;
+use parquet::file::column_crypto_metadata::ColumnCryptoMetaData;
 use parquet::file::properties::WriterProperties;
+use std::collections::HashMap;
+use std::fs::File;
+use std::path::{Path, PathBuf};
+use std::sync::atomic::{AtomicU8, Ordering};
+use std::sync::{Arc, Mutex};
 use tempfile::TempDir;
 
 async fn read_parquet_test_data<'a, T: Into<String>>(
@@ -74,7 +76,6 @@ pub fn write_batches(
     Ok(num_rows)
 }
 
-#[cfg(feature = "parquet_encryption")]
 #[tokio::test]
 async fn round_trip_encryption() {
     let ctx: SessionContext = SessionContext::new();
@@ -128,3 +129,226 @@ async fn round_trip_encryption() {
 
     assert_eq!(num_rows_written, num_rows_read);
 }
+
+#[tokio::test]
+async fn round_trip_parquet_with_encryption_factory() {
+    let ctx = SessionContext::new();
+    let encryption_factory = Arc::new(MockEncryptionFactory::default());
+    ctx.runtime_env().register_parquet_encryption_factory(
+        "test_encryption_factory",
+        Arc::clone(&encryption_factory) as Arc<dyn EncryptionFactory>,
+    );
+
+    let tmpdir = TempDir::new().unwrap();
+
+    // Register some simple test data
+    let strings: ArrayRef =
+        Arc::new(StringArray::from(vec!["a", "b", "c", "a", "b", "c"]));
+    let x1: ArrayRef = Arc::new(Int32Array::from(vec![1, 10, 11, 100, 101, 111]));
+    let x2: ArrayRef = Arc::new(Int32Array::from(vec![1, 2, 3, 4, 5, 6]));
+    let batch =
+        RecordBatch::try_from_iter(vec![("string", strings), ("x1", x1), ("x2", x2)])
+            .unwrap();
+    let test_data_schema = batch.schema();
+    ctx.register_batch("test_data", batch).unwrap();
+    let df = ctx.table("test_data").await.unwrap();
+
+    // Write encrypted Parquet, partitioned by string column into separate files
+    let mut parquet_options = TableParquetOptions::new();
+    parquet_options.crypto.factory_id = Some("test_encryption_factory".to_string());
+    parquet_options
+        .crypto
+        .factory_options
+        .options
+        .insert("test_key".to_string(), "test value".to_string());
+
+    let df_write_options =
+        DataFrameWriteOptions::default().with_partition_by(vec!["string".to_string()]);
+    df.write_parquet(
+        tmpdir.path().to_str().unwrap(),
+        df_write_options,
+        Some(parquet_options.clone()),
+    )
+    .await
+    .unwrap();
+
+    // Crypto factory should have generated one key per partition file
+    assert_eq!(encryption_factory.encryption_keys.lock().unwrap().len(), 3);
+
+    verify_table_encrypted(tmpdir.path(), &encryption_factory).unwrap();
+
+    // Registering table without decryption properties should fail
+    let table_path = format!("file://{}/", tmpdir.path().to_str().unwrap());
+    let without_decryption_register = ctx
+        .register_listing_table(
+            "parquet_missing_decryption",
+            &table_path,
+            ListingOptions::new(Arc::new(ParquetFormat::default())),
+            None,
+            None,
+        )
+        .await;
+    assert!(matches!(
+        without_decryption_register.unwrap_err(),
+        DataFusionError::ParquetError(_)
+    ));
+
+    // Registering table succeeds if schema is provided
+    ctx.register_listing_table(
+        "parquet_missing_decryption",
+        &table_path,
+        ListingOptions::new(Arc::new(ParquetFormat::default())),
+        Some(test_data_schema),
+        None,
+    )
+    .await
+    .unwrap();
+
+    // But trying to read from the table should fail
+    let without_decryption_read = ctx
+        .table("parquet_missing_decryption")
+        .await
+        .unwrap()
+        .collect()
+        .await;
+    assert!(matches!(
+        without_decryption_read.unwrap_err(),
+        DataFusionError::ParquetError(_)
+    ));
+
+    // Register table with encryption factory specified
+    let listing_options = ListingOptions::new(Arc::new(
+        ParquetFormat::default().with_options(parquet_options),
+    ))
+    .with_table_partition_cols(vec![("string".to_string(), DataType::Utf8)]);
+    ctx.register_listing_table(
+        "parquet_with_decryption",
+        &table_path,
+        listing_options,
+        None,
+        None,
+    )
+    .await
+    .unwrap();
+
+    // Can read correct data when encryption factory has been specified
+    let table = ctx
+        .table("parquet_with_decryption")
+        .await
+        .unwrap()
+        .collect()
+        .await
+        .unwrap();
+
+    let expected = [
+        "+-----+----+--------+",
+        "| x1  | x2 | string |",
+        "+-----+----+--------+",
+        "| 1   | 1  | a      |",
+        "| 100 | 4  | a      |",
+        "| 10  | 2  | b      |",
+        "| 101 | 5  | b      |",
+        "| 11  | 3  | c      |",
+        "| 111 | 6  | c      |",
+        "+-----+----+--------+",
+    ];
+    assert_batches_sorted_eq!(expected, &table);
+}
+
+fn verify_table_encrypted(
+    table_path: &Path,
+    encryption_factory: &Arc<MockEncryptionFactory>,
+) -> datafusion_common::Result<()> {
+    let mut directories = vec![table_path.to_path_buf()];
+    let mut files_visited = 0;
+    while let Some(directory) = directories.pop() {
+        for entry in std::fs::read_dir(&directory)? {
+            let path = entry?.path();
+            if path.is_dir() {
+                directories.push(path);
+            } else {
+                verify_file_encrypted(&path, encryption_factory)?;
+                files_visited += 1;
+            }
+        }
+    }
+    assert!(files_visited > 0);
+    Ok(())
+}
+
+fn verify_file_encrypted(
+    file_path: &Path,
+    encryption_factory: &Arc<MockEncryptionFactory>,
+) -> datafusion_common::Result<()> {
+    let mut options = EncryptionFactoryOptions::default();
+    options
+        .options
+        .insert("test_key".to_string(), "test value".to_string());
+    let object_path = object_store::path::Path::from(file_path.to_str().unwrap());
+    let decryption_properties = encryption_factory
+        .get_file_decryption_properties(&options, &object_path)?
+        .unwrap();
+
+    let reader_options =
+        ArrowReaderOptions::new().with_file_decryption_properties(decryption_properties);
+    let file = File::open(file_path)?;
+    let reader_metadata = ArrowReaderMetadata::load(&file, reader_options)?;
+    let metadata = reader_metadata.metadata();
+    assert!(metadata.num_row_groups() > 0);
+    for row_group in metadata.row_groups() {
+        assert!(row_group.num_columns() > 0);
+        for col in row_group.columns() {
+            assert!(matches!(
+                col.crypto_metadata(),
+                Some(ColumnCryptoMetaData::EncryptionWithFooterKey)
+            ));
+        }
+    }
+    Ok(())
+}
+
+/// Encryption factory implementation for use in tests,
+/// which generates encryption keys in a sequence
+#[derive(Debug, Default)]
+struct MockEncryptionFactory {
+    pub encryption_keys: Mutex<HashMap<object_store::path::Path, Vec<u8>>>,
+    pub counter: AtomicU8,
+}
+
+impl EncryptionFactory for MockEncryptionFactory {
+    fn get_file_encryption_properties(
+        &self,
+        config: &EncryptionFactoryOptions,
+        _schema: &SchemaRef,
+        file_path: &object_store::path::Path,
+    ) -> datafusion_common::Result<Option<FileEncryptionProperties>> {
+        assert_eq!(
+            config.options.get("test_key"),
+            Some(&"test value".to_string())
+        );
+        let file_idx = self.counter.fetch_add(1, Ordering::Relaxed);
+        let key = vec![file_idx, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0];
+        let mut keys = self.encryption_keys.lock().unwrap();
+        keys.insert(file_path.clone(), key.clone());
+        let encryption_properties = FileEncryptionProperties::builder(key).build()?;
+        Ok(Some(encryption_properties))
+    }
+
+    fn get_file_decryption_properties(
+        &self,
+        config: &EncryptionFactoryOptions,
+        file_path: &object_store::path::Path,
+    ) -> datafusion_common::Result<Option<FileDecryptionProperties>> {
+        assert_eq!(
+            config.options.get("test_key"),
+            Some(&"test value".to_string())
+        );
+        let keys = self.encryption_keys.lock().unwrap();
+        let key = keys.get(file_path).ok_or_else(|| {
+            DataFusionError::Execution(format!("No key for file {file_path:?}"))
+        })?;
+        let decryption_properties =
+            FileDecryptionProperties::builder(key.clone()).build()?;
+        Ok(Some(decryption_properties))
+    }
+}
diff --git a/datafusion/core/tests/parquet/mod.rs b/datafusion/core/tests/parquet/mod.rs
index 82444e8b6e..c44d14abd3 100644
--- a/datafusion/core/tests/parquet/mod.rs
+++ b/datafusion/core/tests/parquet/mod.rs
@@ -43,6 +43,7 @@ use std::sync::Arc;
 use tempfile::NamedTempFile;
 
 mod custom_reader;
+#[cfg(feature = "parquet_encryption")]
 mod encryption;
 mod external_access_plan;
 mod file_statistics;
diff --git a/datafusion/datasource-parquet/Cargo.toml b/datafusion/datasource-parquet/Cargo.toml
index 8a75a445c8..6bccd76b60 100644
--- a/datafusion/datasource-parquet/Cargo.toml
+++ b/datafusion/datasource-parquet/Cargo.toml
@@ -71,5 +71,6 @@ path = "src/mod.rs"
 parquet_encryption = [
     "parquet/encryption",
     "datafusion-common/parquet_encryption",
+    "datafusion-execution/parquet_encryption",
     "dep:hex",
 ]
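Note that the `parquet_encryption` feature of this crate now also enables the new `parquet_encryption` feature of `datafusion-execution` (declared later in this patch), so the encryption-factory registry is available wherever the Parquet data source is feature-enabled.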
diff --git a/datafusion/datasource-parquet/src/file_format.rs b/datafusion/datasource-parquet/src/file_format.rs
index 8276a3a8ff..e7c4494685 100644
--- a/datafusion/datasource-parquet/src/file_format.rs
+++ b/datafusion/datasource-parquet/src/file_format.rs
@@ -39,9 +39,9 @@ use datafusion_datasource::write::demux::DemuxedStreamReceiver;
 use arrow::compute::sum;
 use arrow::datatypes::{DataType, Field, FieldRef};
 use datafusion_common::config::{ConfigField, ConfigFileType, TableParquetOptions};
-use datafusion_common::encryption::{
-    map_config_decryption_to_decryption, FileDecryptionProperties,
-};
+#[cfg(feature = "parquet_encryption")]
+use datafusion_common::encryption::map_config_decryption_to_decryption;
+use datafusion_common::encryption::FileDecryptionProperties;
 use datafusion_common::parsers::CompressionTypeVariant;
 use datafusion_common::stats::Precision;
 use datafusion_common::{
@@ -68,6 +68,7 @@ use crate::source::{parse_coerce_int96_string, ParquetSource};
 use async_trait::async_trait;
 use bytes::Bytes;
 use datafusion_datasource::source::DataSourceExec;
+use datafusion_execution::runtime_env::RuntimeEnv;
 use futures::future::BoxFuture;
 use futures::{FutureExt, StreamExt, TryStreamExt};
 use log::debug;
@@ -305,24 +306,60 @@ fn clear_metadata(
 }
 
 async fn fetch_schema_with_location(
+    state: &dyn Session,
     store: &dyn ObjectStore,
+    options: &TableParquetOptions,
     file: &ObjectMeta,
     metadata_size_hint: Option<usize>,
-    file_decryption_properties: Option<&FileDecryptionProperties>,
     coerce_int96: Option<TimeUnit>,
 ) -> Result<(Path, Schema)> {
+    let file_decryption_properties =
+        get_file_decryption_properties(state, options, &file.location)?;
     let loc_path = file.location.clone();
     let schema = fetch_schema(
         store,
         file,
         metadata_size_hint,
-        file_decryption_properties,
+        file_decryption_properties.as_ref(),
         coerce_int96,
     )
     .await?;
     Ok((loc_path, schema))
 }
 
+#[cfg(feature = "parquet_encryption")]
+fn get_file_decryption_properties(
+    state: &dyn Session,
+    options: &TableParquetOptions,
+    file_path: &Path,
+) -> Result<Option<FileDecryptionProperties>> {
+    let file_decryption_properties: Option<FileDecryptionProperties> =
+        match &options.crypto.file_decryption {
+            Some(cfd) => Some(map_config_decryption_to_decryption(cfd)),
+            None => match &options.crypto.factory_id {
+                Some(factory_id) => {
+                    let factory =
+                        state.runtime_env().parquet_encryption_factory(factory_id)?;
+                    factory.get_file_decryption_properties(
+                        &options.crypto.factory_options,
+                        file_path,
+                    )?
+                }
+                None => None,
+            },
+        };
+    Ok(file_decryption_properties)
+}
+
+#[cfg(not(feature = "parquet_encryption"))]
+fn get_file_decryption_properties(
+    _state: &dyn Session,
+    _options: &TableParquetOptions,
+    _file_path: &Path,
+) -> Result<Option<FileDecryptionProperties>> {
+    Ok(None)
+}
+
 #[async_trait]
 impl FileFormat for ParquetFormat {
     fn as_any(&self) -> &dyn Any {
@@ -358,18 +395,15 @@ impl FileFormat for ParquetFormat {
             Some(time_unit) => Some(parse_coerce_int96_string(time_unit.as_str())?),
             None => None,
         };
-        let file_decryption_properties: Option<FileDecryptionProperties> =
-            map_config_decryption_to_decryption(
-                self.options.crypto.file_decryption.as_ref(),
-            );
 
         let mut schemas: Vec<_> = futures::stream::iter(objects)
             .map(|object| {
                 fetch_schema_with_location(
+                    state,
                     store.as_ref(),
+                    &self.options,
                     object,
                     self.metadata_size_hint(),
-                    file_decryption_properties.as_ref(),
                     coerce_int96,
                 )
             })
@@ -414,15 +448,13 @@ impl FileFormat for ParquetFormat {
 
     async fn infer_stats(
         &self,
-        _state: &dyn Session,
+        state: &dyn Session,
         store: &Arc<dyn ObjectStore>,
         table_schema: SchemaRef,
         object: &ObjectMeta,
     ) -> Result<Statistics> {
-        let file_decryption_properties: Option<FileDecryptionProperties> =
-            map_config_decryption_to_decryption(
-                self.options.crypto.file_decryption.as_ref(),
-            );
+        let file_decryption_properties =
+            get_file_decryption_properties(state, &self.options, &object.location)?;
         let stats = fetch_statistics(
             store.as_ref(),
             table_schema,
@@ -459,6 +491,9 @@ impl FileFormat for ParquetFormat {
         if let Some(metadata_size_hint) = metadata_size_hint {
             source = source.with_metadata_size_hint(metadata_size_hint)
         }
+
+        source = self.set_source_encryption_factory(source, state)?;
+
         // Apply schema adapter factory before building the new config
         let file_source = source.apply_schema_adapter(&conf)?;
 
@@ -489,6 +524,41 @@ impl FileFormat for ParquetFormat {
     }
 }
 
+#[cfg(feature = "parquet_encryption")]
+impl ParquetFormat {
+    fn set_source_encryption_factory(
+        &self,
+        source: ParquetSource,
+        state: &dyn Session,
+    ) -> Result<ParquetSource> {
+        if let Some(encryption_factory_id) = &self.options.crypto.factory_id {
+            Ok(source.with_encryption_factory(
+                state
+                    .runtime_env()
+                    .parquet_encryption_factory(encryption_factory_id)?,
+            ))
+        } else {
+            Ok(source)
+        }
+    }
+}
+
+#[cfg(not(feature = "parquet_encryption"))]
+impl ParquetFormat {
+    fn set_source_encryption_factory(
+        &self,
+        source: ParquetSource,
+        _state: &dyn Session,
+    ) -> Result<ParquetSource> {
+        if let Some(encryption_factory_id) = &self.options.crypto.factory_id {
+            Err(DataFusionError::Configuration(
+                format!("Parquet encryption factory id is set to '{encryption_factory_id}' but the parquet_encryption feature is disabled")))
+        } else {
+            Ok(source)
+        }
+    }
+}
+
 /// Apply necessary schema type coercions to make file schema match table schema.
 ///
 /// This function performs two main types of transformations in a single pass:
@@ -1243,7 +1313,11 @@ impl ParquetSink {
 
     /// Create writer properties based upon configuration settings,
     /// including partitioning and the inclusion of arrow schema metadata.
-    fn create_writer_props(&self) -> Result<WriterProperties> {
+    fn create_writer_props(
+        &self,
+        runtime: &Arc<RuntimeEnv>,
+        path: &Path,
+    ) -> Result<WriterProperties> {
         let schema = if self.parquet_options.global.allow_single_file_parallelism {
             // If parallelizing writes, we may also be doing hive style partitioning
             // into multiple files which impacts the schema per file.
@@ -1260,7 +1334,15 @@ impl ParquetSink {
             parquet_opts.arrow_schema(schema);
         }
 
-        Ok(WriterPropertiesBuilder::try_from(&parquet_opts)?.build())
+        let mut builder = WriterPropertiesBuilder::try_from(&parquet_opts)?;
+        builder = set_writer_encryption_properties(
+            builder,
+            runtime,
+            &parquet_opts,
+            schema,
+            path,
+        )?;
+        Ok(builder.build())
     }
 
     /// Creates an AsyncArrowWriter which serializes a parquet file to an ObjectStore
@@ -1299,6 +1381,48 @@ impl ParquetSink {
     }
 }
 
+#[cfg(feature = "parquet_encryption")]
+fn set_writer_encryption_properties(
+    builder: WriterPropertiesBuilder,
+    runtime: &Arc<RuntimeEnv>,
+    parquet_opts: &TableParquetOptions,
+    schema: &Arc<Schema>,
+    path: &Path,
+) -> Result<WriterPropertiesBuilder> {
+    if let Some(file_encryption_properties) = &parquet_opts.crypto.file_encryption {
+        // Encryption properties have been specified directly
+        return Ok(builder
+            .with_file_encryption_properties(file_encryption_properties.clone().into()));
+    } else if let Some(encryption_factory_id) = &parquet_opts.crypto.factory_id.as_ref() {
+        // Encryption properties will be generated by an encryption factory
+        let encryption_factory =
+            runtime.parquet_encryption_factory(encryption_factory_id)?;
+        let file_encryption_properties = encryption_factory
+            .get_file_encryption_properties(
+                &parquet_opts.crypto.factory_options,
+                schema,
+                path,
+            )?;
+        if let Some(file_encryption_properties) = file_encryption_properties {
+            return Ok(
+                builder.with_file_encryption_properties(file_encryption_properties)
+            );
+        }
+    }
+    Ok(builder)
+}
+
+#[cfg(not(feature = "parquet_encryption"))]
+fn set_writer_encryption_properties(
+    builder: WriterPropertiesBuilder,
+    _runtime: &Arc<RuntimeEnv>,
+    _parquet_opts: &TableParquetOptions,
+    _schema: &Arc<Schema>,
+    _path: &Path,
+) -> Result<WriterPropertiesBuilder> {
+    Ok(builder)
+}
+
 #[async_trait]
 impl FileSink for ParquetSink {
     fn config(&self) -> &FileSinkConfig {
@@ -1316,7 +1440,9 @@ impl FileSink for ParquetSink {
         let mut allow_single_file_parallelism =
             parquet_opts.global.allow_single_file_parallelism;
 
-        if parquet_opts.crypto.file_encryption.is_some() {
+        if parquet_opts.crypto.file_encryption.is_some()
+            || parquet_opts.crypto.factory_id.is_some()
+        {
             // For now, arrow-rs does not support parallel writes with encryption
             // See https://github.com/apache/arrow-rs/issues/7359
             allow_single_file_parallelism = false;
@@ -1326,7 +1452,7 @@ impl FileSink for ParquetSink {
             std::result::Result<(Path, FileMetaData), DataFusionError>,
         > = JoinSet::new();
 
-        let parquet_props = self.create_writer_props()?;
+        let runtime = context.runtime_env();
         let parallel_options = ParallelParquetWriterOptions {
             max_parallel_row_groups: parquet_opts
                 .global
@@ -1337,6 +1463,7 @@ impl FileSink for ParquetSink {
         };
 
         while let Some((path, mut rx)) = file_stream_rx.recv().await {
+            let parquet_props = self.create_writer_props(&runtime, &path)?;
             if !allow_single_file_parallelism {
                 let mut writer = self
                     .create_async_arrow_writer(
diff --git a/datafusion/datasource-parquet/src/opener.rs b/datafusion/datasource-parquet/src/opener.rs
index 7c208d1426..62dc0fccc2 100644
--- a/datafusion/datasource-parquet/src/opener.rs
+++ b/datafusion/datasource-parquet/src/opener.rs
@@ -43,6 +43,10 @@ use datafusion_physical_expr_common::physical_expr::{
 use datafusion_physical_plan::metrics::{Count, ExecutionPlanMetricsSet, MetricBuilder};
 use datafusion_pruning::{build_pruning_predicate, FilePruner, PruningPredicate};
 
+#[cfg(feature = "parquet_encryption")]
+use datafusion_common::config::EncryptionFactoryOptions;
+#[cfg(feature = "parquet_encryption")]
+use datafusion_execution::parquet_encryption::EncryptionFactory;
 use futures::{StreamExt, TryStreamExt};
 use itertools::Itertools;
 use log::debug;
@@ -96,13 +100,18 @@ pub(super) struct ParquetOpener {
     pub file_decryption_properties: Option<Arc<FileDecryptionProperties>>,
     /// Rewrite expressions in the context of the file schema
     pub(crate) expr_adapter_factory: Option<Arc<dyn PhysicalExprAdapterFactory>>,
+    /// Optional factory to create file decryption properties dynamically
+    #[cfg(feature = "parquet_encryption")]
+    pub encryption_factory:
+        Option<(Arc<dyn EncryptionFactory>, EncryptionFactoryOptions)>,
 }
 
 impl FileOpener for ParquetOpener {
     fn open(&self, file_meta: FileMeta, file: PartitionedFile) -> Result<FileOpenFuture> {
         let file_range = file_meta.range.clone();
         let extensions = file_meta.extensions.clone();
-        let file_name = file_meta.location().to_string();
+        let file_location = file_meta.location();
+        let file_name = file_location.to_string();
         let file_metrics =
             ParquetFileMetrics::new(self.partition_index, &file_name, &self.metrics);
 
@@ -141,7 +150,8 @@ impl FileOpener for ParquetOpener {
         let mut predicate_file_schema = Arc::clone(&self.logical_file_schema);
 
         let mut enable_page_index = self.enable_page_index;
-        let file_decryption_properties = self.file_decryption_properties.clone();
+        let file_decryption_properties =
+            self.get_file_decryption_properties(file_location)?;
 
         // For now, page index does not work with encrypted files. See:
         // https://github.com/apache/arrow-rs/issues/7629
@@ -411,6 +421,36 @@ impl FileOpener for ParquetOpener {
     }
 }
 
+#[cfg(feature = "parquet_encryption")]
+impl ParquetOpener {
+    fn get_file_decryption_properties(
+        &self,
+        file_location: &object_store::path::Path,
+    ) -> Result<Option<Arc<FileDecryptionProperties>>> {
+        match &self.file_decryption_properties {
+            Some(file_decryption_properties) => {
+                Ok(Some(Arc::clone(file_decryption_properties)))
+            }
+            None => match &self.encryption_factory {
+                Some((encryption_factory, encryption_config)) => Ok(encryption_factory
+                    .get_file_decryption_properties(encryption_config, file_location)?
+                    .map(Arc::new)),
+                None => Ok(None),
+            },
+        }
+    }
+}
+
+#[cfg(not(feature = "parquet_encryption"))]
+impl ParquetOpener {
+    fn get_file_decryption_properties(
+        &self,
+        _file_location: &object_store::path::Path,
+    ) -> Result<Option<Arc<FileDecryptionProperties>>> {
+        Ok(self.file_decryption_properties.clone())
+    }
+}
+
 /// Return the initial [`ParquetAccessPlan`]
 ///
 /// If the user has supplied one as an extension, use that
@@ -672,6 +712,8 @@ mod test {
                 coerce_int96: None,
                 file_decryption_properties: None,
                 expr_adapter_factory: Some(Arc::new(DefaultPhysicalExprAdapterFactory)),
+                #[cfg(feature = "parquet_encryption")]
+                encryption_factory: None,
             }
         };
 
@@ -758,6 +800,8 @@ mod test {
                 coerce_int96: None,
                 file_decryption_properties: None,
                 expr_adapter_factory: Some(Arc::new(DefaultPhysicalExprAdapterFactory)),
+                #[cfg(feature = "parquet_encryption")]
+                encryption_factory: None,
             }
         };
 
@@ -860,6 +904,8 @@ mod test {
                 coerce_int96: None,
                 file_decryption_properties: None,
                 expr_adapter_factory: Some(Arc::new(DefaultPhysicalExprAdapterFactory)),
+                #[cfg(feature = "parquet_encryption")]
+                encryption_factory: None,
             }
         };
         let make_meta = || FileMeta {
@@ -972,6 +1018,8 @@ mod test {
                 coerce_int96: None,
                 file_decryption_properties: None,
                 expr_adapter_factory: Some(Arc::new(DefaultPhysicalExprAdapterFactory)),
+                #[cfg(feature = "parquet_encryption")]
+                encryption_factory: None,
             }
         };
 
@@ -1085,6 +1133,8 @@ mod test {
                 coerce_int96: None,
                 file_decryption_properties: None,
                 expr_adapter_factory: Some(Arc::new(DefaultPhysicalExprAdapterFactory)),
+                #[cfg(feature = "parquet_encryption")]
+                encryption_factory: None,
             }
         };
 
@@ -1265,6 +1315,8 @@ mod test {
             coerce_int96: None,
             file_decryption_properties: None,
             expr_adapter_factory: None,
+            #[cfg(feature = "parquet_encryption")]
+            encryption_factory: None,
         };
 
         let predicate = logical2physical(&col("a").eq(lit(1u64)), &table_schema);
diff --git a/datafusion/datasource-parquet/src/source.rs b/datafusion/datasource-parquet/src/source.rs
index 430cb5ce54..366d42700f 100644
--- a/datafusion/datasource-parquet/src/source.rs
+++ b/datafusion/datasource-parquet/src/source.rs
@@ -27,6 +27,8 @@ use crate::row_filter::can_expr_be_pushed_down_with_schemas;
 use crate::DefaultParquetFileReaderFactory;
 use crate::ParquetFileReaderFactory;
 use datafusion_common::config::ConfigOptions;
+#[cfg(feature = "parquet_encryption")]
+use datafusion_common::config::EncryptionFactoryOptions;
 use datafusion_datasource::as_file_source;
 use datafusion_datasource::file_stream::FileOpener;
 use datafusion_datasource::schema_adapter::{
@@ -51,6 +53,8 @@ use datafusion_physical_plan::metrics::ExecutionPlanMetricsSet;
 use datafusion_physical_plan::DisplayFormatType;
 
 use datafusion_common::encryption::map_config_decryption_to_decryption;
+#[cfg(feature = "parquet_encryption")]
+use datafusion_execution::parquet_encryption::EncryptionFactory;
 use itertools::Itertools;
 use object_store::ObjectStore;
 
@@ -282,6 +286,8 @@ pub struct ParquetSource {
     /// Optional hint for the size of the parquet metadata
     pub(crate) metadata_size_hint: Option<usize>,
     pub(crate) projected_statistics: Option<Statistics>,
+    #[cfg(feature = "parquet_encryption")]
+    pub(crate) encryption_factory: Option<Arc<dyn EncryptionFactory>>,
 }
 
 impl ParquetSource {
@@ -320,6 +326,16 @@ impl ParquetSource {
         conf
     }
 
+    /// Set the encryption factory to use to generate file decryption properties
+    #[cfg(feature = "parquet_encryption")]
+    pub fn with_encryption_factory(
+        mut self,
+        encryption_factory: Arc<dyn EncryptionFactory>,
+    ) -> Self {
+        self.encryption_factory = Some(encryption_factory);
+        self
+    }
+
     /// Options passed to the parquet reader for this scan
     pub fn table_parquet_options(&self) -> &TableParquetOptions {
         &self.table_parquet_options
@@ -431,6 +447,19 @@ impl ParquetSource {
             Ok(file_source)
         }
     }
+
+    #[cfg(feature = "parquet_encryption")]
+    fn get_encryption_factory_with_config(
+        &self,
+    ) -> Option<(Arc<dyn EncryptionFactory>, EncryptionFactoryOptions)> {
+        match &self.encryption_factory {
+            None => None,
+            Some(factory) => Some((
+                Arc::clone(factory),
+                self.table_parquet_options.crypto.factory_options.clone(),
+            )),
+        }
+    }
 }
 
 /// Parses datafusion.common.config.ParquetOptions.coerce_int96 String to an arrow_schema.datatype.TimeUnit
@@ -512,10 +541,13 @@ impl FileSource for ParquetSource {
                 Arc::new(DefaultParquetFileReaderFactory::new(object_store)) as _
             });
 
-        let file_decryption_properties = map_config_decryption_to_decryption(
-            self.table_parquet_options().crypto.file_decryption.as_ref(),
-        )
-        .map(Arc::new);
+        let file_decryption_properties = self
+            .table_parquet_options()
+            .crypto
+            .file_decryption
+            .as_ref()
+            .map(map_config_decryption_to_decryption)
+            .map(Arc::new);
 
         let coerce_int96 = self
             .table_parquet_options
@@ -546,6 +578,8 @@ impl FileSource for ParquetSource {
             coerce_int96,
             file_decryption_properties,
             expr_adapter_factory,
+            #[cfg(feature = "parquet_encryption")]
+            encryption_factory: self.get_encryption_factory_with_config(),
         })
     }
 
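For illustration, here is a minimal sketch of how the new with_encryption_factory builder method might be used. This is hypothetical code, not part of this commit: it assumes the parquet_encryption feature is enabled, that these import paths match the crate re-exports, and that `factory` is any EncryptionFactory implementation.

    // Hypothetical usage sketch for the new builder method.
    use std::sync::Arc;

    use datafusion_common::config::TableParquetOptions;
    use datafusion_datasource_parquet::source::ParquetSource;
    use datafusion_execution::parquet_encryption::EncryptionFactory;

    fn parquet_source_with_encryption(
        factory: Arc<dyn EncryptionFactory>,
    ) -> ParquetSource {
        // Factory-specific settings travel alongside the factory in
        // TableParquetOptions::crypto.factory_options, which
        // get_encryption_factory_with_config pairs back up at scan time.
        ParquetSource::new(TableParquetOptions::default())
            .with_encryption_factory(factory)
    }
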
diff --git a/datafusion/execution/Cargo.toml b/datafusion/execution/Cargo.toml
index 5988d3a336..f6d02615e3 100644
--- a/datafusion/execution/Cargo.toml
+++ b/datafusion/execution/Cargo.toml
@@ -37,6 +37,11 @@ workspace = true
 [lib]
 name = "datafusion_execution"
 
+[features]
+parquet_encryption = [
+    "parquet/encryption",
+]
+
 [dependencies]
 arrow = { workspace = true }
 dashmap = { workspace = true }
@@ -46,6 +51,7 @@ futures = { workspace = true }
 log = { workspace = true }
 object_store = { workspace = true, features = ["fs"] }
 parking_lot = { workspace = true }
+parquet = { workspace = true, optional = true }
 rand = { workspace = true }
 tempfile = { workspace = true }
 url = { workspace = true }
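A downstream crate would opt in to the new feature roughly like this (hypothetical manifest entry; the version requirement is illustrative only):

    [dependencies]
    datafusion-execution = { version = "*", features = ["parquet_encryption"] }
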
diff --git a/datafusion/execution/src/lib.rs b/datafusion/execution/src/lib.rs
index 6a0a4b6322..e971e838a6 100644
--- a/datafusion/execution/src/lib.rs
+++ b/datafusion/execution/src/lib.rs
@@ -31,6 +31,8 @@ pub mod config;
 pub mod disk_manager;
 pub mod memory_pool;
 pub mod object_store;
+#[cfg(feature = "parquet_encryption")]
+pub mod parquet_encryption;
 pub mod runtime_env;
 mod stream;
 mod task;
diff --git a/datafusion/execution/src/parquet_encryption.rs b/datafusion/execution/src/parquet_encryption.rs
new file mode 100644
index 0000000000..13a18390d0
--- /dev/null
+++ b/datafusion/execution/src/parquet_encryption.rs
@@ -0,0 +1,81 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+use arrow::datatypes::SchemaRef;
+use dashmap::DashMap;
+use datafusion_common::config::EncryptionFactoryOptions;
+use datafusion_common::error::Result;
+use datafusion_common::DataFusionError;
+use object_store::path::Path;
+use parquet::encryption::decrypt::FileDecryptionProperties;
+use parquet::encryption::encrypt::FileEncryptionProperties;
+use std::sync::Arc;
+
+/// Trait for types that generate file encryption and decryption properties to
+/// write and read encrypted Parquet files.
+/// This allows flexibility in how encryption keys are managed, for example, to
+/// integrate with a user's key management service (KMS).
+/// For example usage, see the [`parquet_encrypted_with_kms` example].
+///
+/// [`parquet_encrypted_with_kms` example]: https://github.com/apache/datafusion/blob/main/datafusion-examples/examples/parquet_encrypted_with_kms.rs
+pub trait EncryptionFactory: Send + Sync + std::fmt::Debug + 'static {
+    /// Generate file encryption properties to use when writing a Parquet file.
+    fn get_file_encryption_properties(
+        &self,
+        config: &EncryptionFactoryOptions,
+        schema: &SchemaRef,
+        file_path: &Path,
+    ) -> Result<Option<FileEncryptionProperties>>;
+
+    /// Generate file decryption properties to use when reading a Parquet file.
+    fn get_file_decryption_properties(
+        &self,
+        config: &EncryptionFactoryOptions,
+        file_path: &Path,
+    ) -> Result<Option<FileDecryptionProperties>>;
+}
+
+/// Stores [`EncryptionFactory`] implementations that can be retrieved by a unique string identifier
+#[derive(Clone, Debug, Default)]
+pub struct EncryptionFactoryRegistry {
+    factories: DashMap<String, Arc<dyn EncryptionFactory>>,
+}
+
+impl EncryptionFactoryRegistry {
+    /// Register an [`EncryptionFactory`] with an associated identifier that can be later
+    /// used to configure encryption when reading or writing Parquet.
+    /// If an encryption factory with the same identifier was already registered, it is replaced and returned.
+    pub fn register_factory(
+        &self,
+        id: &str,
+        factory: Arc<dyn EncryptionFactory>,
+    ) -> Option<Arc<dyn EncryptionFactory>> {
+        self.factories.insert(id.to_owned(), factory)
+    }
+
+    /// Retrieve an [`EncryptionFactory`] by its identifier
+    pub fn get_factory(&self, id: &str) -> Result<Arc<dyn EncryptionFactory>> {
+        self.factories
+            .get(id)
+            .map(|f| Arc::clone(f.value()))
+            .ok_or_else(|| {
+                DataFusionError::Internal(format!(
+                    "No Parquet encryption factory found for id '{id}'"
+                ))
+            })
+    }
+}
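To make the trait concrete, here is a minimal sketch of an implementor that derives both property types from one fixed key, standing in for a real KMS round trip. StaticKeyFactory and the hard-coded key are hypothetical and not part of this commit; they only illustrate the trait's shape.

    use arrow::datatypes::SchemaRef;
    use datafusion_common::config::EncryptionFactoryOptions;
    use datafusion_common::error::Result;
    use datafusion_common::DataFusionError;
    use datafusion_execution::parquet_encryption::EncryptionFactory;
    use object_store::path::Path;
    use parquet::encryption::decrypt::FileDecryptionProperties;
    use parquet::encryption::encrypt::FileEncryptionProperties;

    // Hypothetical: a single hard-coded AES-128 key. A real factory would
    // fetch or unwrap per-file keys via a KMS, possibly driven by `config`.
    const STATIC_KEY: &[u8; 16] = b"0123456789abcdef";

    #[derive(Debug)]
    struct StaticKeyFactory;

    impl EncryptionFactory for StaticKeyFactory {
        fn get_file_encryption_properties(
            &self,
            _config: &EncryptionFactoryOptions,
            _schema: &SchemaRef,
            _file_path: &Path,
        ) -> Result<Option<FileEncryptionProperties>> {
            // Encrypt the footer (and by default all columns) with the key.
            FileEncryptionProperties::builder(STATIC_KEY.to_vec())
                .build()
                .map(Some)
                .map_err(|e| DataFusionError::External(Box::new(e)))
        }

        fn get_file_decryption_properties(
            &self,
            _config: &EncryptionFactoryOptions,
            _file_path: &Path,
        ) -> Result<Option<FileDecryptionProperties>> {
            FileDecryptionProperties::builder(STATIC_KEY.to_vec())
                .build()
                .map(Some)
                .map_err(|e| DataFusionError::External(Box::new(e)))
        }
    }
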
diff --git a/datafusion/execution/src/runtime_env.rs b/datafusion/execution/src/runtime_env.rs
index 9115665dce..9c5de42bcd 100644
--- a/datafusion/execution/src/runtime_env.rs
+++ b/datafusion/execution/src/runtime_env.rs
@@ -29,6 +29,8 @@ use crate::{
 };
 
 use crate::cache::cache_manager::{CacheManager, CacheManagerConfig};
+#[cfg(feature = "parquet_encryption")]
+use crate::parquet_encryption::{EncryptionFactory, EncryptionFactoryRegistry};
 use datafusion_common::{config::ConfigEntry, Result};
 use object_store::ObjectStore;
 use std::path::PathBuf;
@@ -78,6 +80,9 @@ pub struct RuntimeEnv {
     pub cache_manager: Arc<CacheManager>,
     /// Object Store Registry
     pub object_store_registry: Arc<dyn ObjectStoreRegistry>,
+    /// Parquet encryption factory registry
+    #[cfg(feature = "parquet_encryption")]
+    pub parquet_encryption_factory_registry: Arc<EncryptionFactoryRegistry>,
 }
 
 impl Debug for RuntimeEnv {
@@ -142,6 +147,28 @@ impl RuntimeEnv {
     pub fn object_store(&self, url: impl AsRef<Url>) -> Result<Arc<dyn ObjectStore>> {
         self.object_store_registry.get_store(url.as_ref())
     }
+
+    /// Register an [`EncryptionFactory`] with an associated identifier that can be later
+    /// used to configure encryption when reading or writing Parquet.
+    /// If an encryption factory with the same identifier was already registered, it is replaced and returned.
+    #[cfg(feature = "parquet_encryption")]
+    pub fn register_parquet_encryption_factory(
+        &self,
+        id: &str,
+        encryption_factory: Arc<dyn EncryptionFactory>,
+    ) -> Option<Arc<dyn EncryptionFactory>> {
+        self.parquet_encryption_factory_registry
+            .register_factory(id, encryption_factory)
+    }
+
+    /// Retrieve an [`EncryptionFactory`] by its identifier
+    #[cfg(feature = "parquet_encryption")]
+    pub fn parquet_encryption_factory(
+        &self,
+        id: &str,
+    ) -> Result<Arc<dyn EncryptionFactory>> {
+        self.parquet_encryption_factory_registry.get_factory(id)
+    }
 }
 
 impl Default for RuntimeEnv {
@@ -168,6 +195,9 @@ pub struct RuntimeEnvBuilder {
     pub cache_manager: CacheManagerConfig,
     /// ObjectStoreRegistry to get object store based on url
     pub object_store_registry: Arc<dyn ObjectStoreRegistry>,
+    /// Parquet encryption factory registry
+    #[cfg(feature = "parquet_encryption")]
+    pub parquet_encryption_factory_registry: Arc<EncryptionFactoryRegistry>,
 }
 
 impl Default for RuntimeEnvBuilder {
@@ -185,6 +215,8 @@ impl RuntimeEnvBuilder {
             memory_pool: Default::default(),
             cache_manager: Default::default(),
             object_store_registry: Arc::new(DefaultObjectStoreRegistry::default()),
+            #[cfg(feature = "parquet_encryption")]
+            parquet_encryption_factory_registry: Default::default(),
         }
     }
 
@@ -265,6 +297,8 @@ impl RuntimeEnvBuilder {
             memory_pool,
             cache_manager,
             object_store_registry,
+            #[cfg(feature = "parquet_encryption")]
+            parquet_encryption_factory_registry,
         } = self;
         let memory_pool =
             memory_pool.unwrap_or_else(|| Arc::new(UnboundedMemoryPool::default()));
@@ -279,6 +313,8 @@ impl RuntimeEnvBuilder {
             },
             cache_manager: CacheManager::try_new(&cache_manager)?,
             object_store_registry,
+            #[cfg(feature = "parquet_encryption")]
+            parquet_encryption_factory_registry,
         })
     }
 
@@ -309,6 +345,10 @@ impl RuntimeEnvBuilder {
             memory_pool: Some(Arc::clone(&runtime_env.memory_pool)),
             cache_manager: cache_config,
             object_store_registry: Arc::clone(&runtime_env.object_store_registry),
+            #[cfg(feature = "parquet_encryption")]
+            parquet_encryption_factory_registry: Arc::clone(
+                &runtime_env.parquet_encryption_factory_registry,
+            ),
         }
     }
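Putting the new registry methods together, a sketch of registration and lookup on a freshly built RuntimeEnv (again hypothetical and feature-gated on parquet_encryption; StaticKeyFactory is the implementor sketched above):

    use std::sync::Arc;

    use datafusion_common::Result;
    use datafusion_execution::runtime_env::RuntimeEnvBuilder;

    fn register_and_lookup() -> Result<()> {
        let runtime_env = RuntimeEnvBuilder::new().build()?;
        // register_parquet_encryption_factory returns the factory previously
        // registered under this id, if any.
        let previous = runtime_env
            .register_parquet_encryption_factory("static_key", Arc::new(StaticKeyFactory));
        assert!(previous.is_none());
        // Later, e.g. when configuring a scan or write, the factory is
        // retrieved again by its identifier.
        let _factory = runtime_env.parquet_encryption_factory("static_key")?;
        Ok(())
    }
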
 


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscr...@datafusion.apache.org
For additional commands, e-mail: commits-h...@datafusion.apache.org
