This is an automated email from the ASF dual-hosted git repository.

alamb pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/datafusion.git


The following commit(s) were added to refs/heads/main by this push:
     new d66d6b9eb8 feat: Parquet modular encryption (#16351)
d66d6b9eb8 is described below

commit d66d6b9eb83a4643bc8d1ee9c2da7f0fa644c0cf
Author: Corwin Joy <corwin....@gmail.com>
AuthorDate: Sat Jun 28 06:47:46 2025 -0700

    feat: Parquet modular encryption (#16351)
    
    * Initial commit to form PR for datafusion encryption support
    
    * Add tests for encryption configuration
    
    * Apply cargo fmt
    
    * Add a roundtrip encryption test to the parquet tests.
    
    * cargo fmt
    
    * Update test to add decryption parameter to called functions.
    
    * Try to get DataFrame.write_parquet to work with encryption. Doesn't quite work yet; column encryption is broken.
    
    * Update datafusion/datasource-parquet/src/opener.rs
    
    Co-authored-by: Adam Reeve <adre...@gmail.com>
    
    * Update datafusion/datasource-parquet/src/source.rs
    
    Co-authored-by: Adam Reeve <adre...@gmail.com>
    
    * Fix write test in parquet.rs
    
    * Simplify encryption test. Remove unused imports.
    
    * Run cargo fmt.
    
    * Further streamline roundtrip test.
    
    * Change From methods for FileEncryptionProperties and FileDecryptionProperties to use references.
    
    * Change encryption config to directly hold column keys using custom config fields.
    
    * Fix generated field names in visit for encryptor and decryptor to use "." instead of "::"
    
    * 1. Disable parallel writes with encryption.
    2. Fix unused header warning in config.rs.
    3. Fix test case in encryption.rs to call conversion to ConfigFileDecryptionProperties correctly.
    
    * cargo fmt
    
    * Update datafusion/common/src/file_options/parquet_writer.rs
    
    Co-authored-by: Copilot <175728472+copi...@users.noreply.github.com>
    
    * fix variables shown in information schema test.
    
    * Back out bad suggestion from Copilot
    
    * Remove unused serde reference
    Add an example to read and write encrypted parquet files.
    
    * cargo fmt
    
    * change file_format.rs to use global encryption options in struct.
    
    * Turn off page_index for encrypted example. Get encrypted example working with filter.
    
    * Tidy up example output.
    
    * Add missing license. Run taplo format
    
    * Update configs.md by running dev/update_config_docs.sh
    
    * Cargo fmt + clippy changes.
    
    * Add filter test for encrypted files.
    
    * Cargo clippy changes.
    
    * Fix link in README.md
    
    * Add issue tag for parallel writes.
    
    * Move file encryption and decryption properties out of global options
    
    * Use config_namespace_with_hashmap for column encryption/decryption props
    
    * Remove outdated docs on crypto settings.
    
    Signed-off-by: Corwin Joy <corwin....@gmail.com>
    
    * 1. Add docs for using encryption configuration.
    2. Add example SQL for using encryption from CLI.
    3. Fix removed variables in test for configuration information.
    4. Clippy and cargo fmt.
    
    Signed-off-by: Corwin Joy <corwin....@gmail.com>
    
    * Update code to add missing ParquetOpener parameter due to merge from main
    
    Signed-off-by: Corwin Joy <corwin....@gmail.com>
    
    * Add CLI documentation for Parquet options and provide an encryption example
    
    Signed-off-by: Corwin Joy <corwin....@gmail.com>
    
    * Use ConfigFileDecryptionProperties in ParquetReadOptions
    
    Signed-off-by: Adam Reeve <adam.re...@gr-oss.io>
    
    * Implement default for ConfigFileEncryptionProperties
    
    Signed-off-by: Corwin Joy <corwin....@gmail.com>
    
    * Add sqllogictest for parquet with encryption
    
    Signed-off-by: Corwin Joy <corwin....@gmail.com>
    
    * Apply prettier changes from CI
    
    Signed-off-by: Corwin Joy <corwin....@gmail.com>
    
    * fix logical conflict
    
    * fix another logical conflict
    
    ---------
    
    Signed-off-by: Corwin Joy <corwin....@gmail.com>
    Signed-off-by: Adam Reeve <adam.re...@gr-oss.io>
    Co-authored-by: Adam Reeve <adre...@gmail.com>
    Co-authored-by: Copilot <175728472+copi...@users.noreply.github.com>
    Co-authored-by: Adam Reeve <adam.re...@gr-oss.io>
    Co-authored-by: Andrew Lamb <and...@nerdnetworks.org>
---
 Cargo.lock                                         |   2 +
 Cargo.toml                                         |   1 +
 benchmarks/src/bin/dfbench.rs                      |   4 +-
 benchmarks/src/bin/imdb.rs                         |   2 +-
 benchmarks/src/bin/tpch.rs                         |   2 +-
 datafusion-examples/README.md                      |   1 +
 datafusion-examples/examples/parquet_encrypted.rs  | 119 +++++
 datafusion/common/Cargo.toml                       |   1 +
 datafusion/common/src/config.rs                    | 487 ++++++++++++++++++++-
 .../common/src/file_options/parquet_writer.rs      |  20 +-
 datafusion/core/src/dataframe/parquet.rs           |  68 +++
 .../core/src/datasource/file_format/options.rs     |  20 +-
 .../core/src/datasource/file_format/parquet.rs     |  42 +-
 datafusion/core/tests/parquet/custom_reader.rs     |   1 +
 datafusion/core/tests/parquet/encryption.rs        | 129 ++++++
 datafusion/core/tests/parquet/mod.rs               |   1 +
 datafusion/datasource-parquet/src/file_format.rs   |  55 ++-
 datafusion/datasource-parquet/src/opener.rs        |  26 +-
 datafusion/datasource-parquet/src/source.rs        |   8 +
 datafusion/proto-common/src/from_proto/mod.rs      |   1 +
 datafusion/proto/src/logical_plan/file_formats.rs  |   1 +
 .../sqllogictest/test_files/encrypted_parquet.slt  |  89 ++++
 docs/source/user-guide/cli/datasources.md          |  41 ++
 23 files changed, 1088 insertions(+), 33 deletions(-)
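
For orientation before the full diff: the shape of the new API, condensed from the `datafusion-examples/examples/parquet_encrypted.rs` file added below. This is an editorial sketch of code that appears in this commit, not an additional file; `roundtrip` and its `path` parameter are illustrative names only.

```rust
use datafusion::config::TableParquetOptions;
use datafusion::dataframe::{DataFrame, DataFrameWriteOptions};
use datafusion::parquet::encryption::decrypt::FileDecryptionProperties;
use datafusion::parquet::encryption::encrypt::FileEncryptionProperties;
use datafusion::prelude::{ParquetReadOptions, SessionContext};

async fn roundtrip(df: DataFrame, path: &str) -> datafusion::common::Result<()> {
    // 16-byte key = AES-128; real deployments would source keys from a KMS
    let key = b"0123456789012345".to_vec();
    let encrypt = FileEncryptionProperties::builder(key.clone()).build()?;
    let decrypt = FileDecryptionProperties::builder(key).build()?;

    // Write: attach encryption properties to the Parquet writer options
    let mut options = TableParquetOptions::default();
    options.crypto.file_encryption = Some((&encrypt).into());
    df.write_parquet(
        path,
        DataFrameWriteOptions::new().with_single_file_output(true),
        Some(options),
    )
    .await?;

    // Read: attach decryption properties to the read options
    let ctx = SessionContext::new();
    let read_options =
        ParquetReadOptions::default().file_decryption_properties((&decrypt).into());
    ctx.read_parquet(path, read_options).await?.show().await
}
```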

diff --git a/Cargo.lock b/Cargo.lock
index 03003e088d..6a4652af66 100644
--- a/Cargo.lock
+++ b/Cargo.lock
@@ -1997,6 +1997,7 @@ dependencies = [
  "chrono",
  "half",
  "hashbrown 0.14.5",
+ "hex",
  "indexmap 2.10.0",
  "insta",
  "libc",
@@ -4484,6 +4485,7 @@ dependencies = [
  "num-bigint",
  "object_store",
  "paste",
+ "ring",
  "seq-macro",
  "simdutf8",
  "snap",
diff --git a/Cargo.toml b/Cargo.toml
index 19f44a6836..6c8c50d571 100644
--- a/Cargo.toml
+++ b/Cargo.toml
parquet = { version = "55.1.0", default-features = false, features = [
     "arrow",
     "async",
     "object_store",
+    "encryption",
 ] }
 pbjson = { version = "0.7.0" }
 pbjson-types = "0.7"
diff --git a/benchmarks/src/bin/dfbench.rs b/benchmarks/src/bin/dfbench.rs
index 06337cb758..41b64063c0 100644
--- a/benchmarks/src/bin/dfbench.rs
+++ b/benchmarks/src/bin/dfbench.rs
@@ -60,11 +60,11 @@ pub async fn main() -> Result<()> {
         Options::Cancellation(opt) => opt.run().await,
         Options::Clickbench(opt) => opt.run().await,
         Options::H2o(opt) => opt.run().await,
-        Options::Imdb(opt) => opt.run().await,
+        Options::Imdb(opt) => Box::pin(opt.run()).await,
         Options::ParquetFilter(opt) => opt.run().await,
         Options::Sort(opt) => opt.run().await,
         Options::SortTpch(opt) => opt.run().await,
-        Options::Tpch(opt) => opt.run().await,
+        Options::Tpch(opt) => Box::pin(opt.run()).await,
         Options::TpchConvert(opt) => opt.run().await,
     }
 }
diff --git a/benchmarks/src/bin/imdb.rs b/benchmarks/src/bin/imdb.rs
index 13421f8a89..5ce99928df 100644
--- a/benchmarks/src/bin/imdb.rs
+++ b/benchmarks/src/bin/imdb.rs
@@ -53,7 +53,7 @@ pub async fn main() -> Result<()> {
     env_logger::init();
     match ImdbOpt::from_args() {
         ImdbOpt::Benchmark(BenchmarkSubCommandOpt::DataFusionBenchmark(opt)) => {
-            opt.run().await
+            Box::pin(opt.run()).await
         }
         ImdbOpt::Convert(opt) => opt.run().await,
     }
diff --git a/benchmarks/src/bin/tpch.rs b/benchmarks/src/bin/tpch.rs
index 3270b082cf..ca2bb8e57c 100644
--- a/benchmarks/src/bin/tpch.rs
+++ b/benchmarks/src/bin/tpch.rs
@@ -58,7 +58,7 @@ async fn main() -> Result<()> {
     env_logger::init();
     match TpchOpt::from_args() {
         TpchOpt::Benchmark(BenchmarkSubCommandOpt::DataFusionBenchmark(opt)) => {
-            opt.run().await
+            Box::pin(opt.run()).await
         }
         TpchOpt::Convert(opt) => opt.run().await,
     }
diff --git a/datafusion-examples/README.md b/datafusion-examples/README.md
index 3ba4c77cd8..285762bb57 100644
--- a/datafusion-examples/README.md
+++ b/datafusion-examples/README.md
@@ -65,6 +65,7 @@ cargo run --example dataframe
 - [`flight_sql_server.rs`](examples/flight/flight_sql_server.rs): Run DataFusion as a standalone process and execute SQL queries from JDBC clients
 - [`function_factory.rs`](examples/function_factory.rs): Register `CREATE FUNCTION` handler to implement SQL macros
 - [`optimizer_rule.rs`](examples/optimizer_rule.rs): Use a custom OptimizerRule to replace certain predicates
+- [`parquet_encrypted.rs`](examples/parquet_encrypted.rs): Read and write encrypted Parquet files using DataFusion
 - [`parquet_index.rs`](examples/parquet_index.rs): Create a secondary index over several parquet files and use it to speed up queries
 - [`parquet_exec_visitor.rs`](examples/parquet_exec_visitor.rs): Extract statistics by visiting an ExecutionPlan after execution
 - [`parse_sql_expr.rs`](examples/parse_sql_expr.rs): Parse SQL text into DataFusion `Expr`.
diff --git a/datafusion-examples/examples/parquet_encrypted.rs b/datafusion-examples/examples/parquet_encrypted.rs
new file mode 100644
index 0000000000..e9e239b7a1
--- /dev/null
+++ b/datafusion-examples/examples/parquet_encrypted.rs
@@ -0,0 +1,119 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+use datafusion::common::DataFusionError;
+use datafusion::config::TableParquetOptions;
+use datafusion::dataframe::{DataFrame, DataFrameWriteOptions};
+use datafusion::logical_expr::{col, lit};
+use datafusion::parquet::encryption::decrypt::FileDecryptionProperties;
+use datafusion::parquet::encryption::encrypt::FileEncryptionProperties;
+use datafusion::prelude::{ParquetReadOptions, SessionContext};
+use tempfile::TempDir;
+
+#[tokio::main]
+async fn main() -> datafusion::common::Result<()> {
+    // The SessionContext is the main high level API for interacting with DataFusion
+    let ctx = SessionContext::new();
+
+    // Find the local path of "alltypes_plain.parquet"
+    let testdata = datafusion::test_util::parquet_test_data();
+    let filename = &format!("{testdata}/alltypes_plain.parquet");
+
+    // Read the sample parquet file
+    let parquet_df = ctx
+        .read_parquet(filename, ParquetReadOptions::default())
+        .await?;
+
+    // Show information from the dataframe
+    println!(
+        "==============================================================================="
+    );
+    println!("Original Parquet DataFrame:");
+    query_dataframe(&parquet_df).await?;
+
+    // Setup encryption and decryption properties
+    let (encrypt, decrypt) = setup_encryption(&parquet_df)?;
+
+    // Create a temporary file location for the encrypted parquet file
+    let tmp_dir = TempDir::new()?;
+    let tempfile = tmp_dir.path().join("alltypes_plain-encrypted.parquet");
+    let tempfile_str = tempfile.into_os_string().into_string().unwrap();
+
+    // Write encrypted parquet
+    let mut options = TableParquetOptions::default();
+    options.crypto.file_encryption = Some((&encrypt).into());
+    parquet_df
+        .write_parquet(
+            tempfile_str.as_str(),
+            DataFrameWriteOptions::new().with_single_file_output(true),
+            Some(options),
+        )
+        .await?;
+
+    // Read encrypted parquet
+    let ctx: SessionContext = SessionContext::new();
+    let read_options =
+        ParquetReadOptions::default().file_decryption_properties((&decrypt).into());
+
+    let encrypted_parquet_df = ctx.read_parquet(tempfile_str, read_options).await?;
+
+    // Show information from the dataframe
+    println!("\n\n===============================================================================");
+    println!("Encrypted Parquet DataFrame:");
+    query_dataframe(&encrypted_parquet_df).await?;
+
+    Ok(())
+}
+
+// Show information from the dataframe
+async fn query_dataframe(df: &DataFrame) -> Result<(), DataFusionError> {
+    // show its schema using 'describe'
+    println!("Schema:");
+    df.clone().describe().await?.show().await?;
+
+    // Select three columns and filter the results
+    // so that only rows where id > 1 are returned
+    println!("\nSelected rows and columns:");
+    df.clone()
+        .select_columns(&["id", "bool_col", "timestamp_col"])?
+        .filter(col("id").gt(lit(5)))?
+        .show()
+        .await?;
+
+    Ok(())
+}
+
+// Setup encryption and decryption properties
+fn setup_encryption(
+    parquet_df: &DataFrame,
+) -> Result<(FileEncryptionProperties, FileDecryptionProperties), DataFusionError> {
+    let schema = parquet_df.schema();
+    let footer_key = b"0123456789012345".to_vec(); // 128bit/16
+    let column_key = b"1234567890123450".to_vec(); // 128bit/16
+
+    let mut encrypt = FileEncryptionProperties::builder(footer_key.clone());
+    let mut decrypt = FileDecryptionProperties::builder(footer_key.clone());
+
+    for field in schema.fields().iter() {
+        encrypt = encrypt.with_column_key(field.name().as_str(), column_key.clone());
+        decrypt = decrypt.with_column_key(field.name().as_str(), column_key.clone());
+    }
+
+    let encrypt = encrypt.build()?;
+    let decrypt = decrypt.build()?;
+    Ok((encrypt, decrypt))
+}
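
Assuming the standard examples layout referenced in the README entry above, this example would be run with `cargo run --example parquet_encrypted`.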
diff --git a/datafusion/common/Cargo.toml b/datafusion/common/Cargo.toml
index f4089c6a8c..b356f249b7 100644
--- a/datafusion/common/Cargo.toml
+++ b/datafusion/common/Cargo.toml
@@ -58,6 +58,7 @@ base64 = "0.22.1"
 chrono = { workspace = true }
 half = { workspace = true }
 hashbrown = { workspace = true }
+hex = "0.4.3"
 indexmap = { workspace = true }
 libc = "0.2.174"
 log = { workspace = true }
diff --git a/datafusion/common/src/config.rs b/datafusion/common/src/config.rs
index d612c9be15..6618c6aeec 100644
--- a/datafusion/common/src/config.rs
+++ b/datafusion/common/src/config.rs
@@ -29,6 +29,13 @@ use std::error::Error;
 use std::fmt::{self, Display};
 use std::str::FromStr;
 
+#[cfg(feature = "parquet")]
+use hex;
+#[cfg(feature = "parquet")]
+use parquet::encryption::decrypt::FileDecryptionProperties;
+#[cfg(feature = "parquet")]
+use parquet::encryption::encrypt::FileEncryptionProperties;
+
 /// A macro that wraps a configuration struct and automatically derives
 /// [`Default`] and [`ConfigField`] for it, allowing it to be used
 /// in the [`ConfigOptions`] configuration tree.
@@ -189,7 +196,6 @@ macro_rules! config_namespace {
         }
     }
 }
-
 config_namespace! {
     /// Options related to catalog and directory scanning
     ///
@@ -661,6 +667,17 @@ config_namespace! {
     }
 }
 
+config_namespace! {
+    /// Options for configuring Parquet Modular Encryption
+    pub struct ParquetEncryptionOptions {
+        /// Optional file decryption properties
+        pub file_decryption: Option<ConfigFileDecryptionProperties>, default = None
+
+        /// Optional file encryption properties
+        pub file_encryption: Option<ConfigFileEncryptionProperties>, default = None
+    }
+}
+
 config_namespace! {
     /// Options related to query optimization
     ///
@@ -1819,6 +1836,24 @@ pub struct TableParquetOptions {
     /// )
     /// ```
     pub key_value_metadata: HashMap<String, Option<String>>,
+    /// Options for configuring Parquet modular encryption
+    /// See ConfigFileEncryptionProperties and ConfigFileDecryptionProperties in datafusion/common/src/config.rs
+    /// These can be set via 'format.crypto', for example:
+    /// ```sql
+    /// OPTIONS (
+    ///    'format.crypto.file_encryption.encrypt_footer' 'true',
+    ///    'format.crypto.file_encryption.footer_key_as_hex' '30313233343536373839303132333435',  -- b"0123456789012345"
+    ///    'format.crypto.file_encryption.column_key_as_hex::double_field' '31323334353637383930313233343530', -- b"1234567890123450"
+    ///    'format.crypto.file_encryption.column_key_as_hex::float_field' '31323334353637383930313233343531', -- b"1234567890123451"
+    ///     -- Same for decryption
+    ///    'format.crypto.file_decryption.footer_key_as_hex' '30313233343536373839303132333435', -- b"0123456789012345"
+    ///    'format.crypto.file_decryption.column_key_as_hex::double_field' '31323334353637383930313233343530', -- b"1234567890123450"
+    ///    'format.crypto.file_decryption.column_key_as_hex::float_field' '31323334353637383930313233343531', -- b"1234567890123451"
+    /// )
+    /// ```
+    /// See datafusion-cli/tests/sql/encrypted_parquet.sql for a more complete example.
+    /// Note that keys must be provided in hex format since they are binary strings.
+    pub crypto: ParquetEncryptionOptions,
 }
 
 impl TableParquetOptions {
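
Because keys are binary but the config API is string-based, they travel hex-encoded, as the doc comment above notes. A minimal sketch of how the hex values line up with the string keys (names match the config structs added in this hunk; the sketch itself is not part of the commit):

```rust
use datafusion_common::config::{ConfigField, TableParquetOptions};

fn configure_encryption() -> datafusion_common::Result<()> {
    let footer_key = b"0123456789012345"; // 16 bytes = AES-128
    // hex::encode yields exactly the string shown in the OPTIONS example above
    assert_eq!(hex::encode(footer_key), "30313233343536373839303132333435");

    let mut opts = TableParquetOptions::default();
    // set() routes "crypto."-prefixed keys to ParquetEncryptionOptions
    opts.set("crypto.file_encryption.encrypt_footer", "true")?;
    opts.set(
        "crypto.file_encryption.footer_key_as_hex",
        &hex::encode(footer_key),
    )?;
    // Per-column keys use the "::column_name" suffix
    opts.set(
        "crypto.file_encryption.column_key_as_hex::double_field",
        &hex::encode(b"1234567890123450"),
    )?;
    Ok(())
}
```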
@@ -1846,7 +1881,9 @@ impl ConfigField for TableParquetOptions {
     fn visit<V: Visit>(&self, v: &mut V, key_prefix: &str, description: &'static str) {
         self.global.visit(v, key_prefix, description);
         self.column_specific_options
-            .visit(v, key_prefix, description)
+            .visit(v, key_prefix, description);
+        self.crypto
+            .visit(v, &format!("{key_prefix}.crypto"), description);
     }
 
     fn set(&mut self, key: &str, value: &str) -> Result<()> {
@@ -1867,6 +1904,8 @@ impl ConfigField for TableParquetOptions {
             };
             self.key_value_metadata.insert(k, Some(value.into()));
             Ok(())
+        } else if let Some(crypto_feature) = key.strip_prefix("crypto.") {
+            self.crypto.set(crypto_feature, value)
         } else if key.contains("::") {
             self.column_specific_options.set(key, value)
         } else {
@@ -2017,6 +2056,322 @@ config_namespace_with_hashmap! {
     }
 }
 
+#[derive(Clone, Debug, PartialEq)]
+pub struct ConfigFileEncryptionProperties {
+    /// Should the parquet footer be encrypted
+    /// default is true
+    pub encrypt_footer: bool,
+    /// Key to use for the parquet footer encoded in hex format
+    pub footer_key_as_hex: String,
+    /// Metadata information for footer key
+    pub footer_key_metadata_as_hex: String,
+    /// HashMap of column names --> (key in hex format, metadata)
+    pub column_encryption_properties: HashMap<String, ColumnEncryptionProperties>,
+    /// AAD prefix string uniquely identifies the file and prevents file swapping
+    pub aad_prefix_as_hex: String,
+    /// If true, store the AAD prefix in the file
+    /// default is false
+    pub store_aad_prefix: bool,
+}
+
+// Setup to match EncryptionPropertiesBuilder::new()
+impl Default for ConfigFileEncryptionProperties {
+    fn default() -> Self {
+        ConfigFileEncryptionProperties {
+            encrypt_footer: true,
+            footer_key_as_hex: String::new(),
+            footer_key_metadata_as_hex: String::new(),
+            column_encryption_properties: Default::default(),
+            aad_prefix_as_hex: String::new(),
+            store_aad_prefix: false,
+        }
+    }
+}
+
+config_namespace_with_hashmap! {
+    pub struct ColumnEncryptionProperties {
+        /// Per column encryption key
+        pub column_key_as_hex: String, default = "".to_string()
+        /// Per column encryption key metadata
+        pub column_metadata_as_hex: Option<String>, default = None
+    }
+}
+
+impl ConfigField for ConfigFileEncryptionProperties {
+    fn visit<V: Visit>(&self, v: &mut V, key_prefix: &str, _description: &'static str) {
+        let key = format!("{key_prefix}.encrypt_footer");
+        let desc = "Encrypt the footer";
+        self.encrypt_footer.visit(v, key.as_str(), desc);
+
+        let key = format!("{key_prefix}.footer_key_as_hex");
+        let desc = "Key to use for the parquet footer";
+        self.footer_key_as_hex.visit(v, key.as_str(), desc);
+
+        let key = format!("{key_prefix}.footer_key_metadata_as_hex");
+        let desc = "Metadata to use for the parquet footer";
+        self.footer_key_metadata_as_hex.visit(v, key.as_str(), desc);
+
+        let key = format!("{key_prefix}.aad_prefix_as_hex");
+        let desc = "AAD prefix to use";
+        self.aad_prefix_as_hex.visit(v, key.as_str(), desc);
+
+        let key = format!("{key_prefix}.store_aad_prefix");
+        let desc = "If true, store the AAD prefix";
+        self.store_aad_prefix.visit(v, key.as_str(), desc);
+    }
+
+    fn set(&mut self, key: &str, value: &str) -> Result<()> {
+        // Any hex encoded values must be pre-encoded using
+        // hex::encode() before calling set.
+
+        if key.contains("::") {
+            // Handle any column specific properties
+            return self.column_encryption_properties.set(key, value);
+        };
+
+        let (key, rem) = key.split_once('.').unwrap_or((key, ""));
+        match key {
+            "encrypt_footer" => self.encrypt_footer.set(rem, value.as_ref()),
+            "footer_key_as_hex" => self.footer_key_as_hex.set(rem, 
value.as_ref()),
+            "footer_key_metadata_as_hex" => {
+                self.footer_key_metadata_as_hex.set(rem, value.as_ref())
+            }
+            "aad_prefix_as_hex" => self.aad_prefix_as_hex.set(rem, 
value.as_ref()),
+            "store_aad_prefix" => self.store_aad_prefix.set(rem, 
value.as_ref()),
+            _ => _config_err!(
+                "Config value \"{}\" not found on 
ConfigFileEncryptionProperties",
+                key
+            ),
+        }
+    }
+}
+
+#[cfg(feature = "parquet")]
+impl From<ConfigFileEncryptionProperties> for FileEncryptionProperties {
+    fn from(val: ConfigFileEncryptionProperties) -> Self {
+        let mut fep = FileEncryptionProperties::builder(
+            hex::decode(val.footer_key_as_hex).unwrap(),
+        )
+        .with_plaintext_footer(!val.encrypt_footer)
+        .with_aad_prefix_storage(val.store_aad_prefix);
+
+        if !val.footer_key_metadata_as_hex.is_empty() {
+            fep = fep.with_footer_key_metadata(
+                hex::decode(&val.footer_key_metadata_as_hex)
+                    .expect("Invalid footer key metadata"),
+            );
+        }
+
+        for (column_name, encryption_props) in val.column_encryption_properties.iter() {
+            let encryption_key = hex::decode(&encryption_props.column_key_as_hex)
+                .expect("Invalid column encryption key");
+            let key_metadata = encryption_props
+                .column_metadata_as_hex
+                .as_ref()
+                .map(|x| hex::decode(x).expect("Invalid column metadata"));
+            match key_metadata {
+                Some(key_metadata) => {
+                    fep = fep.with_column_key_and_metadata(
+                        column_name,
+                        encryption_key,
+                        key_metadata,
+                    );
+                }
+                None => {
+                    fep = fep.with_column_key(column_name, encryption_key);
+                }
+            }
+        }
+
+        if !val.aad_prefix_as_hex.is_empty() {
+            let aad_prefix: Vec<u8> =
+                hex::decode(&val.aad_prefix_as_hex).expect("Invalid AAD prefix");
+            fep = fep.with_aad_prefix(aad_prefix);
+        }
+        fep.build().unwrap()
+    }
+}
+
+#[cfg(feature = "parquet")]
+impl From<&FileEncryptionProperties> for ConfigFileEncryptionProperties {
+    fn from(f: &FileEncryptionProperties) -> Self {
+        let (column_names_vec, column_keys_vec, column_metas_vec) = f.column_keys();
+
+        let mut column_encryption_properties: HashMap<
+            String,
+            ColumnEncryptionProperties,
+        > = HashMap::new();
+
+        for (i, column_name) in column_names_vec.iter().enumerate() {
+            let column_key_as_hex = hex::encode(&column_keys_vec[i]);
+            let column_metadata_as_hex: Option<String> =
+                column_metas_vec.get(i).map(hex::encode);
+            column_encryption_properties.insert(
+                column_name.clone(),
+                ColumnEncryptionProperties {
+                    column_key_as_hex,
+                    column_metadata_as_hex,
+                },
+            );
+        }
+        let mut aad_prefix: Vec<u8> = Vec::new();
+        if let Some(prefix) = f.aad_prefix() {
+            aad_prefix = prefix.clone();
+        }
+        ConfigFileEncryptionProperties {
+            encrypt_footer: f.encrypt_footer(),
+            footer_key_as_hex: hex::encode(f.footer_key()),
+            footer_key_metadata_as_hex: f
+                .footer_key_metadata()
+                .map(hex::encode)
+                .unwrap_or_default(),
+            column_encryption_properties,
+            aad_prefix_as_hex: hex::encode(aad_prefix),
+            store_aad_prefix: f.store_aad_prefix(),
+        }
+    }
+}
+
+#[derive(Clone, Debug, PartialEq)]
+pub struct ConfigFileDecryptionProperties {
+    /// Binary string to use for the parquet footer encoded in hex format
+    pub footer_key_as_hex: String,
+    /// HashMap of column names --> key in hex format
+    pub column_decryption_properties: HashMap<String, ColumnDecryptionProperties>,
+    /// AAD prefix string uniquely identifies the file and prevents file swapping
+    pub aad_prefix_as_hex: String,
+    /// If true, then verify signature for files with plaintext footers.
+    /// default = true
+    pub footer_signature_verification: bool,
+}
+
+config_namespace_with_hashmap! {
+    pub struct ColumnDecryptionProperties {
+        /// Per column encryption key
+        pub column_key_as_hex: String, default = "".to_string()
+    }
+}
+
+// Setup to match DecryptionPropertiesBuilder::new()
+impl Default for ConfigFileDecryptionProperties {
+    fn default() -> Self {
+        ConfigFileDecryptionProperties {
+            footer_key_as_hex: String::new(),
+            column_decryption_properties: Default::default(),
+            aad_prefix_as_hex: String::new(),
+            footer_signature_verification: true,
+        }
+    }
+}
+
+impl ConfigField for ConfigFileDecryptionProperties {
+    fn visit<V: Visit>(&self, v: &mut V, key_prefix: &str, _description: &'static str) {
+        let key = format!("{key_prefix}.footer_key_as_hex");
+        let desc = "Key to use for the parquet footer";
+        self.footer_key_as_hex.visit(v, key.as_str(), desc);
+
+        let key = format!("{key_prefix}.aad_prefix_as_hex");
+        let desc = "AAD prefix to use";
+        self.aad_prefix_as_hex.visit(v, key.as_str(), desc);
+
+        let key = format!("{key_prefix}.footer_signature_verification");
+        let desc = "If true, verify the footer signature";
+        self.footer_signature_verification
+            .visit(v, key.as_str(), desc);
+
+        self.column_decryption_properties.visit(v, key_prefix, desc);
+    }
+
+    fn set(&mut self, key: &str, value: &str) -> Result<()> {
+        // Any hex encoded values must be pre-encoded using
+        // hex::encode() before calling set.
+
+        if key.contains("::") {
+            // Handle any column specific properties
+            return self.column_decryption_properties.set(key, value);
+        };
+
+        let (key, rem) = key.split_once('.').unwrap_or((key, ""));
+        match key {
+            "footer_key_as_hex" => self.footer_key_as_hex.set(rem, 
value.as_ref()),
+            "aad_prefix_as_hex" => self.aad_prefix_as_hex.set(rem, 
value.as_ref()),
+            "footer_signature_verification" => {
+                self.footer_signature_verification.set(rem, value.as_ref())
+            }
+            _ => _config_err!(
+                "Config value \"{}\" not found on 
ConfigFileEncryptionProperties",
+                key
+            ),
+        }
+    }
+}
+
+#[cfg(feature = "parquet")]
+impl From<ConfigFileDecryptionProperties> for FileDecryptionProperties {
+    fn from(val: ConfigFileDecryptionProperties) -> Self {
+        let mut column_names: Vec<&str> = Vec::new();
+        let mut column_keys: Vec<Vec<u8>> = Vec::new();
+
+        for (col_name, decryption_properties) in val.column_decryption_properties.iter() {
+            column_names.push(col_name.as_str());
+            column_keys.push(
+                hex::decode(&decryption_properties.column_key_as_hex)
+                    .expect("Invalid column decryption key"),
+            );
+        }
+
+        let mut fep = FileDecryptionProperties::builder(
+            hex::decode(val.footer_key_as_hex).expect("Invalid footer key"),
+        )
+        .with_column_keys(column_names, column_keys)
+        .unwrap();
+
+        if !val.footer_signature_verification {
+            fep = fep.disable_footer_signature_verification();
+        }
+
+        if !val.aad_prefix_as_hex.is_empty() {
+            let aad_prefix =
+                hex::decode(&val.aad_prefix_as_hex).expect("Invalid AAD prefix");
+            fep = fep.with_aad_prefix(aad_prefix);
+        }
+
+        fep.build().unwrap()
+    }
+}
+
+#[cfg(feature = "parquet")]
+impl From<&FileDecryptionProperties> for ConfigFileDecryptionProperties {
+    fn from(f: &FileDecryptionProperties) -> Self {
+        let (column_names_vec, column_keys_vec) = f.column_keys();
+        let mut column_decryption_properties: HashMap<
+            String,
+            ColumnDecryptionProperties,
+        > = HashMap::new();
+        for (i, column_name) in column_names_vec.iter().enumerate() {
+            let props = ColumnDecryptionProperties {
+                column_key_as_hex: hex::encode(column_keys_vec[i].clone()),
+            };
+            column_decryption_properties.insert(column_name.clone(), props);
+        }
+
+        let mut aad_prefix: Vec<u8> = Vec::new();
+        if let Some(prefix) = f.aad_prefix() {
+            aad_prefix = prefix.clone();
+        }
+        ConfigFileDecryptionProperties {
+            footer_key_as_hex: hex::encode(
+                f.footer_key(None).unwrap_or_default().as_ref(),
+            ),
+            column_decryption_properties,
+            aad_prefix_as_hex: hex::encode(aad_prefix),
+            footer_signature_verification: f.check_plaintext_footer_integrity(),
+        }
+    }
+}
+
 config_namespace! {
     /// Options controlling CSV format
     pub struct CsvOptions {
@@ -2199,13 +2554,12 @@ impl Display for OutputFormat {
 
 #[cfg(test)]
 mod tests {
-    use std::any::Any;
-    use std::collections::HashMap;
-
     use crate::config::{
         ConfigEntry, ConfigExtension, ConfigField, ConfigFileType, 
ExtensionOptions,
         Extensions, TableOptions,
     };
+    use std::any::Any;
+    use std::collections::HashMap;
 
     #[derive(Default, Debug, Clone)]
     pub struct TestExtensionConfig {
@@ -2334,6 +2688,129 @@ mod tests {
         );
     }
 
+    #[cfg(feature = "parquet")]
+    #[test]
+    fn parquet_table_encryption() {
+        use crate::config::{
+            ConfigFileDecryptionProperties, ConfigFileEncryptionProperties,
+        };
+        use parquet::encryption::decrypt::FileDecryptionProperties;
+        use parquet::encryption::encrypt::FileEncryptionProperties;
+
+        let footer_key = b"0123456789012345".to_vec(); // 128bit/16
+        let column_names = vec!["double_field", "float_field"];
+        let column_keys =
+            vec![b"1234567890123450".to_vec(), b"1234567890123451".to_vec()];
+
+        let file_encryption_properties =
+            FileEncryptionProperties::builder(footer_key.clone())
+                .with_column_keys(column_names.clone(), column_keys.clone())
+                .unwrap()
+                .build()
+                .unwrap();
+
+        let decryption_properties = FileDecryptionProperties::builder(footer_key.clone())
+            .with_column_keys(column_names.clone(), column_keys.clone())
+            .unwrap()
+            .build()
+            .unwrap();
+
+        // Test round-trip
+        let config_encrypt: ConfigFileEncryptionProperties =
+            (&file_encryption_properties).into();
+        let encryption_properties_built: FileEncryptionProperties =
+            config_encrypt.clone().into();
+        assert_eq!(file_encryption_properties, encryption_properties_built);
+
+        let config_decrypt: ConfigFileDecryptionProperties =
+            (&decryption_properties).into();
+        let decryption_properties_built: FileDecryptionProperties =
+            config_decrypt.clone().into();
+        assert_eq!(decryption_properties, decryption_properties_built);
+
+        ///////////////////////////////////////////////////////////////////////////////////
+        // Test encryption config
+
+        // Display original encryption config
+        // println!("{:#?}", config_encrypt);
+
+        let mut table_config = TableOptions::new();
+        table_config.set_config_format(ConfigFileType::PARQUET);
+        table_config
+            .parquet
+            .set(
+                "crypto.file_encryption.encrypt_footer",
+                config_encrypt.encrypt_footer.to_string().as_str(),
+            )
+            .unwrap();
+        table_config
+            .parquet
+            .set(
+                "crypto.file_encryption.footer_key_as_hex",
+                config_encrypt.footer_key_as_hex.as_str(),
+            )
+            .unwrap();
+
+        for (i, col_name) in column_names.iter().enumerate() {
+            let key = format!("crypto.file_encryption.column_key_as_hex::{col_name}");
+            let value = hex::encode(column_keys[i].clone());
+            table_config
+                .parquet
+                .set(key.as_str(), value.as_str())
+                .unwrap();
+        }
+
+        // Print matching final encryption config
+        // println!("{:#?}", table_config.parquet.crypto.file_encryption);
+
+        assert_eq!(
+            table_config.parquet.crypto.file_encryption,
+            Some(config_encrypt)
+        );
+
+        ///////////////////////////////////////////////////////////////////////////////////
+        // Test decryption config
+
+        // Display original decryption config
+        // println!("{:#?}", config_decrypt);
+
+        let mut table_config = TableOptions::new();
+        table_config.set_config_format(ConfigFileType::PARQUET);
+        table_config
+            .parquet
+            .set(
+                "crypto.file_decryption.footer_key_as_hex",
+                config_decrypt.footer_key_as_hex.as_str(),
+            )
+            .unwrap();
+
+        for (i, col_name) in column_names.iter().enumerate() {
+            let key = format!("crypto.file_decryption.column_key_as_hex::{col_name}");
+            let value = hex::encode(column_keys[i].clone());
+            table_config
+                .parquet
+                .set(key.as_str(), value.as_str())
+                .unwrap();
+        }
+
+        // Print matching final decryption config
+        // println!("{:#?}", table_config.parquet.crypto.file_decryption);
+
+        assert_eq!(
+            table_config.parquet.crypto.file_decryption,
+            Some(config_decrypt.clone())
+        );
+
+        // Set config directly
+        let mut table_config = TableOptions::new();
+        table_config.set_config_format(ConfigFileType::PARQUET);
+        table_config.parquet.crypto.file_decryption = Some(config_decrypt.clone());
+        assert_eq!(
+            table_config.parquet.crypto.file_decryption,
+            Some(config_decrypt.clone())
+        );
+    }
+
     #[cfg(feature = "parquet")]
     #[test]
     fn parquet_table_options_config_entry() {
diff --git a/datafusion/common/src/file_options/parquet_writer.rs b/datafusion/common/src/file_options/parquet_writer.rs
index 07e763f0ee..60f0f4abb0 100644
--- a/datafusion/common/src/file_options/parquet_writer.rs
+++ b/datafusion/common/src/file_options/parquet_writer.rs
@@ -95,10 +95,17 @@ impl TryFrom<&TableParquetOptions> for WriterPropertiesBuilder {
             global,
             column_specific_options,
             key_value_metadata,
+            crypto,
         } = table_parquet_options;
 
         let mut builder = global.into_writer_properties_builder()?;
 
+        if let Some(file_encryption_properties) = &crypto.file_encryption {
+            builder = builder.with_file_encryption_properties(
+                file_encryption_properties.clone().into(),
+            );
+        }
+
         // check that the arrow schema is present in the kv_metadata, if configured to do so
         if !global.skip_arrow_metadata
             && !key_value_metadata.contains_key(ARROW_SCHEMA_META_KEY)
@@ -449,7 +456,10 @@ mod tests {
     };
     use std::collections::HashMap;
 
-    use crate::config::{ParquetColumnOptions, ParquetOptions};
+    use crate::config::{
+        ConfigFileEncryptionProperties, ParquetColumnOptions, ParquetEncryptionOptions,
+        ParquetOptions,
+    };
 
     use super::*;
 
@@ -580,6 +590,9 @@ mod tests {
             HashMap::from([(COL_NAME.into(), configured_col_props)])
         };
 
+        let fep: Option<ConfigFileEncryptionProperties> =
+            props.file_encryption_properties().map(|fe| fe.into());
+
         #[allow(deprecated)] // max_statistics_size
         TableParquetOptions {
             global: ParquetOptions {
@@ -627,6 +640,10 @@ mod tests {
             },
             column_specific_options,
             key_value_metadata,
+            crypto: ParquetEncryptionOptions {
+                file_encryption: fep,
+                file_decryption: None,
+            },
         }
     }
 
@@ -681,6 +698,7 @@ mod tests {
             )]
             .into(),
             key_value_metadata: [(key, value)].into(),
+            crypto: Default::default(),
         };
 
         let writer_props = WriterPropertiesBuilder::try_from(&table_parquet_opts)
diff --git a/datafusion/core/src/dataframe/parquet.rs b/datafusion/core/src/dataframe/parquet.rs
index 1bb5444ca0..a2bec74ee1 100644
--- a/datafusion/core/src/dataframe/parquet.rs
+++ b/datafusion/core/src/dataframe/parquet.rs
@@ -246,4 +246,72 @@ mod tests {
 
         Ok(())
     }
+
+    #[tokio::test]
+    async fn roundtrip_parquet_with_encryption() -> Result<()> {
+        use parquet::encryption::decrypt::FileDecryptionProperties;
+        use parquet::encryption::encrypt::FileEncryptionProperties;
+
+        let test_df = test_util::test_table().await?;
+
+        let schema = test_df.schema();
+        let footer_key = b"0123456789012345".to_vec(); // 128bit/16
+        let column_key = b"1234567890123450".to_vec(); // 128bit/16
+
+        let mut encrypt = FileEncryptionProperties::builder(footer_key.clone());
+        let mut decrypt = FileDecryptionProperties::builder(footer_key.clone());
+
+        for field in schema.fields().iter() {
+            encrypt = encrypt.with_column_key(field.name().as_str(), column_key.clone());
+            decrypt = decrypt.with_column_key(field.name().as_str(), column_key.clone());
+        }
+
+        let encrypt = encrypt.build()?;
+        let decrypt = decrypt.build()?;
+
+        let df = test_df.clone();
+        let tmp_dir = TempDir::new()?;
+        let tempfile = tmp_dir.path().join("roundtrip.parquet");
+        let tempfile_str = tempfile.into_os_string().into_string().unwrap();
+
+        // Write encrypted parquet using write_parquet
+        let mut options = TableParquetOptions::default();
+        options.crypto.file_encryption = Some((&encrypt).into());
+
+        df.write_parquet(
+            tempfile_str.as_str(),
+            DataFrameWriteOptions::new().with_single_file_output(true),
+            Some(options),
+        )
+        .await?;
+        let num_rows_written = test_df.count().await?;
+
+        // Read encrypted parquet
+        let ctx: SessionContext = SessionContext::new();
+        let read_options =
+            ParquetReadOptions::default().file_decryption_properties((&decrypt).into());
+
+        ctx.register_parquet("roundtrip_parquet", &tempfile_str, read_options.clone())
+            .await?;
+
+        let df_enc = ctx.sql("SELECT * FROM roundtrip_parquet").await?;
+        let num_rows_read = df_enc.count().await?;
+
+        assert_eq!(num_rows_read, num_rows_written);
+
+        // Read encrypted parquet and subset rows + columns
+        let encrypted_parquet_df = ctx.read_parquet(tempfile_str, read_options).await?;
+
+        // Select three columns and filter the results
+        // Test that the filter works as expected
+        let selected = encrypted_parquet_df
+            .clone()
+            .select_columns(&["c1", "c2", "c3"])?
+            .filter(col("c2").gt(lit(4)))?;
+
+        let num_rows_selected = selected.count().await?;
+        assert_eq!(num_rows_selected, 14);
+
+        Ok(())
+    }
 }
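
Note the `with_single_file_output(true)` in both the example and the test above: per the commit message, parallelized writes are disabled when encryption is enabled, which is what the final `ParquetSink` hunk of this diff (overriding `allow_single_file_parallelism` when `crypto.file_encryption` is set) appears to implement.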
diff --git a/datafusion/core/src/datasource/file_format/options.rs b/datafusion/core/src/datasource/file_format/options.rs
index 9aaf1cf598..02b792823a 100644
--- a/datafusion/core/src/datasource/file_format/options.rs
+++ b/datafusion/core/src/datasource/file_format/options.rs
@@ -34,7 +34,7 @@ use crate::error::Result;
 use crate::execution::context::{SessionConfig, SessionState};
 
 use arrow::datatypes::{DataType, Schema, SchemaRef};
-use datafusion_common::config::TableOptions;
+use datafusion_common::config::{ConfigFileDecryptionProperties, TableOptions};
 use datafusion_common::{
     DEFAULT_ARROW_EXTENSION, DEFAULT_AVRO_EXTENSION, DEFAULT_CSV_EXTENSION,
     DEFAULT_JSON_EXTENSION, DEFAULT_PARQUET_EXTENSION,
@@ -252,6 +252,8 @@ pub struct ParquetReadOptions<'a> {
     pub schema: Option<&'a Schema>,
     /// Indicates how the file is sorted
     pub file_sort_order: Vec<Vec<SortExpr>>,
+    /// Properties for decryption of Parquet files that use modular encryption
+    pub file_decryption_properties: Option<ConfigFileDecryptionProperties>,
 }
 
 impl Default for ParquetReadOptions<'_> {
@@ -263,6 +265,7 @@ impl Default for ParquetReadOptions<'_> {
             skip_metadata: None,
             schema: None,
             file_sort_order: vec![],
+            file_decryption_properties: None,
         }
     }
 }
@@ -313,6 +316,15 @@ impl<'a> ParquetReadOptions<'a> {
         self.file_sort_order = file_sort_order;
         self
     }
+
+    /// Configure file decryption properties for reading encrypted Parquet files
+    pub fn file_decryption_properties(
+        mut self,
+        file_decryption_properties: ConfigFileDecryptionProperties,
+    ) -> Self {
+        self.file_decryption_properties = Some(file_decryption_properties);
+        self
+    }
 }
 
 /// Options that control the reading of ARROW files.
@@ -574,7 +586,11 @@ impl ReadOptions<'_> for ParquetReadOptions<'_> {
         config: &SessionConfig,
         table_options: TableOptions,
     ) -> ListingOptions {
-        let mut file_format = ParquetFormat::new().with_options(table_options.parquet);
+        let mut options = table_options.parquet;
+        if let Some(file_decryption_properties) = &self.file_decryption_properties {
+            options.crypto.file_decryption = Some(file_decryption_properties.clone());
+        }
+        let mut file_format = ParquetFormat::new().with_options(options);
 
         if let Some(parquet_pruning) = self.parquet_pruning {
             file_format = file_format.with_enable_pruning(parquet_pruning)
diff --git a/datafusion/core/src/datasource/file_format/parquet.rs b/datafusion/core/src/datasource/file_format/parquet.rs
index dcd20f8a26..8a2db3431f 100644
--- a/datafusion/core/src/datasource/file_format/parquet.rs
+++ b/datafusion/core/src/datasource/file_format/parquet.rs
@@ -196,7 +196,8 @@ mod tests {
         let schema = format.infer_schema(&ctx, &store, &meta).await.unwrap();
 
         let stats =
-            fetch_statistics(store.as_ref(), schema.clone(), &meta[0], None).await?;
+            fetch_statistics(store.as_ref(), schema.clone(), &meta[0], None, None)
+                .await?;
 
         assert_eq!(stats.num_rows, Precision::Exact(3));
         let c1_stats = &stats.column_statistics[0];
@@ -204,7 +205,8 @@ mod tests {
         assert_eq!(c1_stats.null_count, Precision::Exact(1));
         assert_eq!(c2_stats.null_count, Precision::Exact(3));
 
-        let stats = fetch_statistics(store.as_ref(), schema, &meta[1], None).await?;
+        let stats =
+            fetch_statistics(store.as_ref(), schema, &meta[1], None, None).await?;
         assert_eq!(stats.num_rows, Precision::Exact(3));
         let c1_stats = &stats.column_statistics[0];
         let c2_stats = &stats.column_statistics[1];
@@ -376,9 +378,14 @@ mod tests {
 
         // Use a size hint larger than the parquet footer but smaller than the actual metadata, requiring a second fetch
         // for the remaining metadata
-        fetch_parquet_metadata(store.as_ref() as &dyn ObjectStore, &meta[0], Some(9))
-            .await
-            .expect("error reading metadata with hint");
+        fetch_parquet_metadata(
+            store.as_ref() as &dyn ObjectStore,
+            &meta[0],
+            Some(9),
+            None,
+        )
+        .await
+        .expect("error reading metadata with hint");
 
         assert_eq!(store.request_count(), 2);
 
@@ -396,9 +403,14 @@ mod tests {
             .await
             .unwrap();
 
-        let stats =
-            fetch_statistics(store.upcast().as_ref(), schema.clone(), &meta[0], Some(9))
-                .await?;
+        let stats = fetch_statistics(
+            store.upcast().as_ref(),
+            schema.clone(),
+            &meta[0],
+            Some(9),
+            None,
+        )
+        .await?;
 
         assert_eq!(stats.num_rows, Precision::Exact(3));
         let c1_stats = &stats.column_statistics[0];
@@ -413,7 +425,7 @@ mod tests {
         // Use the file size as the hint so we can get the full metadata from the first fetch
         let size_hint = meta[0].size as usize;
 
-        fetch_parquet_metadata(store.upcast().as_ref(), &meta[0], Some(size_hint))
+        fetch_parquet_metadata(store.upcast().as_ref(), &meta[0], Some(size_hint), None)
             .await
             .expect("error reading metadata with hint");
 
@@ -432,6 +444,7 @@ mod tests {
             schema.clone(),
             &meta[0],
             Some(size_hint),
+            None,
         )
         .await?;
 
@@ -448,7 +461,7 @@ mod tests {
         // Use a size hint larger than the file size to make sure we don't panic
         let size_hint = (meta[0].size + 100) as usize;
 
-        fetch_parquet_metadata(store.upcast().as_ref(), &meta[0], Some(size_hint))
+        fetch_parquet_metadata(store.upcast().as_ref(), &meta[0], Some(size_hint), None)
             .await
             .expect("error reading metadata with hint");
 
@@ -487,7 +500,8 @@ mod tests {
         let schema = format.infer_schema(&state, &store, &files).await.unwrap();
 
         // Fetch statistics for first file
-        let pq_meta = fetch_parquet_metadata(store.as_ref(), &files[0], None).await?;
+        let pq_meta =
+            fetch_parquet_metadata(store.as_ref(), &files[0], None, None).await?;
         let stats = statistics_from_parquet_meta_calc(&pq_meta, schema.clone())?;
         assert_eq!(stats.num_rows, Precision::Exact(4));
 
@@ -545,7 +559,8 @@ mod tests {
         };
 
         // Fetch statistics for first file
-        let pq_meta = fetch_parquet_metadata(store.as_ref(), &files[0], None).await?;
+        let pq_meta =
+            fetch_parquet_metadata(store.as_ref(), &files[0], None, None).await?;
         let stats = statistics_from_parquet_meta_calc(&pq_meta, schema.clone())?;
         assert_eq!(stats.num_rows, Precision::Exact(3));
         // column c1
@@ -571,7 +586,8 @@ mod tests {
         assert_eq!(c2_stats.min_value, Precision::Exact(null_i64.clone()));
 
         // Fetch statistics for second file
-        let pq_meta = fetch_parquet_metadata(store.as_ref(), &files[1], None).await?;
+        let pq_meta =
+            fetch_parquet_metadata(store.as_ref(), &files[1], None, None).await?;
         let stats = statistics_from_parquet_meta_calc(&pq_meta, schema.clone())?;
         assert_eq!(stats.num_rows, Precision::Exact(3));
         // column c1: missing from the file so the table treats all 3 rows as null
diff --git a/datafusion/core/tests/parquet/custom_reader.rs b/datafusion/core/tests/parquet/custom_reader.rs
index 761a78a29f..5fc3513ff7 100644
--- a/datafusion/core/tests/parquet/custom_reader.rs
+++ b/datafusion/core/tests/parquet/custom_reader.rs
@@ -241,6 +241,7 @@ impl AsyncFileReader for ParquetFileReader {
                 self.store.as_ref(),
                 &self.meta,
                 self.metadata_size_hint,
+                None,
             )
             .await
             .map_err(|e| {
diff --git a/datafusion/core/tests/parquet/encryption.rs b/datafusion/core/tests/parquet/encryption.rs
new file mode 100644
index 0000000000..203c985428
--- /dev/null
+++ b/datafusion/core/tests/parquet/encryption.rs
@@ -0,0 +1,129 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+//! non trivial integration testing for parquet encryption
+//!
+//! Testing hints: If you run this test with --nocapture it will tell you where
+//! the generated parquet file went. You can then test it and try out various queries
+//! with datafusion-cli like:
+//!
+//! ```sql
+//! create external table data stored as parquet location 'data.parquet';
+//! select * from data limit 10;
+//! ```
+
+use arrow::record_batch::RecordBatch;
+use datafusion::prelude::{ParquetReadOptions, SessionContext};
+use std::fs::File;
+use std::path::{Path, PathBuf};
+use std::sync::Arc;
+
+use parquet::arrow::ArrowWriter;
+use parquet::encryption::decrypt::FileDecryptionProperties;
+use parquet::encryption::encrypt::FileEncryptionProperties;
+use parquet::file::properties::WriterProperties;
+use tempfile::TempDir;
+
+async fn read_parquet_test_data<'a, T: Into<String>>(
+    path: T,
+    ctx: &SessionContext,
+    options: ParquetReadOptions<'a>,
+) -> Vec<RecordBatch> {
+    ctx.read_parquet(path.into(), options)
+        .await
+        .unwrap()
+        .collect()
+        .await
+        .unwrap()
+}
+
+pub fn write_batches(
+    path: PathBuf,
+    props: WriterProperties,
+    batches: impl IntoIterator<Item = RecordBatch>,
+) -> datafusion_common::Result<usize> {
+    let mut batches = batches.into_iter();
+    let first_batch = batches.next().expect("need at least one record batch");
+    let schema = first_batch.schema();
+
+    let file = File::create(&path)?;
+    let mut writer = ArrowWriter::try_new(file, Arc::clone(&schema), Some(props))?;
+
+    writer.write(&first_batch)?;
+    let mut num_rows = first_batch.num_rows();
+
+    for batch in batches {
+        writer.write(&batch)?;
+        num_rows += batch.num_rows();
+    }
+    writer.close()?;
+    Ok(num_rows)
+}
+
+#[tokio::test]
+async fn round_trip_encryption() {
+    let ctx: SessionContext = SessionContext::new();
+
+    let options = ParquetReadOptions::default();
+    let batches = read_parquet_test_data(
+        "tests/data/filter_pushdown/single_file.gz.parquet",
+        &ctx,
+        options,
+    )
+    .await;
+
+    let schema = batches[0].schema();
+    let footer_key = b"0123456789012345".to_vec(); // 128bit/16
+    let column_key = b"1234567890123450".to_vec(); // 128bit/16
+
+    let mut encrypt = FileEncryptionProperties::builder(footer_key.clone());
+    let mut decrypt = FileDecryptionProperties::builder(footer_key.clone());
+
+    for field in schema.fields.iter() {
+        encrypt = encrypt.with_column_key(field.name().as_str(), column_key.clone());
+        decrypt = decrypt.with_column_key(field.name().as_str(), column_key.clone());
+    }
+    let encrypt = encrypt.build().unwrap();
+    let decrypt = decrypt.build().unwrap();
+
+    // Write encrypted parquet
+    let props = WriterProperties::builder()
+        .with_file_encryption_properties(encrypt)
+        .build();
+
+    let tempdir = TempDir::new_in(Path::new(".")).unwrap();
+    let tempfile = tempdir.path().join("data.parquet");
+    let num_rows_written = write_batches(tempfile.clone(), props, batches).unwrap();
+
+    // Read encrypted parquet
+    let ctx: SessionContext = SessionContext::new();
+    let options =
+        ParquetReadOptions::default().file_decryption_properties((&decrypt).into());
+
+    let encrypted_batches = read_parquet_test_data(
+        tempfile.into_os_string().into_string().unwrap(),
+        &ctx,
+        options,
+    )
+    .await;
+
+    let num_rows_read = encrypted_batches
+        .iter()
+        .fold(0, |acc, x| acc + x.num_rows());
+
+    assert_eq!(num_rows_written, num_rows_read);
+}
diff --git a/datafusion/core/tests/parquet/mod.rs b/datafusion/core/tests/parquet/mod.rs
index 098dc366ea..94d6d152a3 100644
--- a/datafusion/core/tests/parquet/mod.rs
+++ b/datafusion/core/tests/parquet/mod.rs
@@ -43,6 +43,7 @@ use std::sync::Arc;
 use tempfile::NamedTempFile;
 
 mod custom_reader;
+mod encryption;
 mod external_access_plan;
 mod file_statistics;
 mod filter_pushdown;
diff --git a/datafusion/datasource-parquet/src/file_format.rs b/datafusion/datasource-parquet/src/file_format.rs
index 647fbc8d05..59663fe510 100644
--- a/datafusion/datasource-parquet/src/file_format.rs
+++ b/datafusion/datasource-parquet/src/file_format.rs
@@ -78,6 +78,7 @@ use parquet::arrow::arrow_writer::{
 use parquet::arrow::async_reader::MetadataFetch;
 use parquet::arrow::{parquet_to_arrow_schema, ArrowSchemaConverter, AsyncArrowWriter};
 use parquet::basic::Type;
+use parquet::encryption::decrypt::FileDecryptionProperties;
 use parquet::errors::ParquetError;
 use parquet::file::metadata::{ParquetMetaData, ParquetMetaDataReader, RowGroupMetaData};
 use parquet::file::properties::{WriterProperties, WriterPropertiesBuilder};
@@ -303,10 +304,18 @@ async fn fetch_schema_with_location(
     store: &dyn ObjectStore,
     file: &ObjectMeta,
     metadata_size_hint: Option<usize>,
+    file_decryption_properties: Option<&FileDecryptionProperties>,
     coerce_int96: Option<TimeUnit>,
 ) -> Result<(Path, Schema)> {
     let loc_path = file.location.clone();
-    let schema = fetch_schema(store, file, metadata_size_hint, coerce_int96).await?;
+    let schema = fetch_schema(
+        store,
+        file,
+        metadata_size_hint,
+        file_decryption_properties,
+        coerce_int96,
+    )
+    .await?;
     Ok((loc_path, schema))
 }
 
@@ -341,12 +350,22 @@ impl FileFormat for ParquetFormat {
             Some(time_unit) => Some(parse_coerce_int96_string(time_unit.as_str())?),
             None => None,
         };
+        let config_file_decryption_properties = &self.options.crypto.file_decryption;
+        let file_decryption_properties: Option<FileDecryptionProperties> =
+            match config_file_decryption_properties {
+                Some(cfd) => {
+                    let fd: FileDecryptionProperties = cfd.clone().into();
+                    Some(fd)
+                }
+                None => None,
+            };
         let mut schemas: Vec<_> = futures::stream::iter(objects)
             .map(|object| {
                 fetch_schema_with_location(
                     store.as_ref(),
                     object,
                     self.metadata_size_hint(),
+                    file_decryption_properties.as_ref(),
                     coerce_int96,
                 )
             })
@@ -396,11 +415,21 @@ impl FileFormat for ParquetFormat {
         table_schema: SchemaRef,
         object: &ObjectMeta,
     ) -> Result<Statistics> {
+        let config_file_decryption_properties = &self.options.crypto.file_decryption;
+        let file_decryption_properties: Option<FileDecryptionProperties> =
+            match config_file_decryption_properties {
+                Some(cfd) => {
+                    let fd: FileDecryptionProperties = cfd.clone().into();
+                    Some(fd)
+                }
+                None => None,
+            };
         let stats = fetch_statistics(
             store.as_ref(),
             table_schema,
             object,
             self.metadata_size_hint(),
+            file_decryption_properties.as_ref(),
         )
         .await?;
         Ok(stats)
@@ -930,12 +959,14 @@ pub async fn fetch_parquet_metadata(
     store: &dyn ObjectStore,
     meta: &ObjectMeta,
     size_hint: Option<usize>,
+    decryption_properties: Option<&FileDecryptionProperties>,
 ) -> Result<ParquetMetaData> {
     let file_size = meta.size;
     let fetch = ObjectStoreFetch::new(store, meta);
 
     ParquetMetaDataReader::new()
         .with_prefetch_hint(size_hint)
+        .with_decryption_properties(decryption_properties)
         .load_and_finish(fetch, file_size)
         .await
         .map_err(DataFusionError::from)
@@ -946,9 +977,16 @@ async fn fetch_schema(
     store: &dyn ObjectStore,
     file: &ObjectMeta,
     metadata_size_hint: Option<usize>,
+    file_decryption_properties: Option<&FileDecryptionProperties>,
     coerce_int96: Option<TimeUnit>,
 ) -> Result<Schema> {
-    let metadata = fetch_parquet_metadata(store, file, metadata_size_hint).await?;
+    let metadata = fetch_parquet_metadata(
+        store,
+        file,
+        metadata_size_hint,
+        file_decryption_properties,
+    )
+    .await?;
     let file_metadata = metadata.file_metadata();
     let schema = parquet_to_arrow_schema(
         file_metadata.schema_descr(),
@@ -970,8 +1008,11 @@ pub async fn fetch_statistics(
     table_schema: SchemaRef,
     file: &ObjectMeta,
     metadata_size_hint: Option<usize>,
+    decryption_properties: Option<&FileDecryptionProperties>,
 ) -> Result<Statistics> {
-    let metadata = fetch_parquet_metadata(store, file, metadata_size_hint).await?;
+    let metadata =
+        fetch_parquet_metadata(store, file, metadata_size_hint, decryption_properties)
+            .await?;
     statistics_from_parquet_meta_calc(&metadata, table_schema)
 }
 
@@ -1261,9 +1302,15 @@ impl FileSink for ParquetSink {
         object_store: Arc<dyn ObjectStore>,
     ) -> Result<u64> {
         let parquet_opts = &self.parquet_options;
-        let allow_single_file_parallelism =
+        let mut allow_single_file_parallelism =
             parquet_opts.global.allow_single_file_parallelism;
 
+        if parquet_opts.crypto.file_encryption.is_some() {
+            // For now, arrow-rs does not support parallel writes with encryption
+            // See https://github.com/apache/arrow-rs/issues/7359
+            allow_single_file_parallelism = false;
+        }
+
         let mut file_write_tasks: JoinSet<
             std::result::Result<(Path, FileMetaData), DataFusionError>,
         > = JoinSet::new();
diff --git a/datafusion/datasource-parquet/src/opener.rs b/datafusion/datasource-parquet/src/opener.rs
index e7e8d52070..561be82cf7 100644
--- a/datafusion/datasource-parquet/src/opener.rs
+++ b/datafusion/datasource-parquet/src/opener.rs
@@ -45,6 +45,7 @@ use log::debug;
 use parquet::arrow::arrow_reader::{ArrowReaderMetadata, ArrowReaderOptions};
 use parquet::arrow::async_reader::AsyncFileReader;
 use parquet::arrow::{ParquetRecordBatchStreamBuilder, ProjectionMask};
+use parquet::encryption::decrypt::FileDecryptionProperties;
 use parquet::file::metadata::ParquetMetaDataReader;
 
 /// Implements [`FileOpener`] for a parquet file
@@ -88,13 +89,12 @@ pub(super) struct ParquetOpener {
     pub enable_row_group_stats_pruning: bool,
     /// Coerce INT96 timestamps to specific TimeUnit
     pub coerce_int96: Option<TimeUnit>,
+    /// Optional parquet FileDecryptionProperties
+    pub file_decryption_properties: Option<Arc<FileDecryptionProperties>>,
 }
 
 impl FileOpener for ParquetOpener {
     fn open(&self, file_meta: FileMeta, file: PartitionedFile) -> Result<FileOpenFuture> {
-        let predicate_creation_errors = MetricBuilder::new(&self.metrics)
-            .global_counter("num_predicate_creation_errors");
-
         let file_range = file_meta.range.clone();
         let extensions = file_meta.extensions.clone();
         let file_name = file_meta.location().to_string();
@@ -128,7 +128,17 @@ impl FileOpener for ParquetOpener {
         let enable_row_group_stats_pruning = self.enable_row_group_stats_pruning;
         let limit = self.limit;
 
-        let enable_page_index = self.enable_page_index;
+        let predicate_creation_errors = MetricBuilder::new(&self.metrics)
+            .global_counter("num_predicate_creation_errors");
+
+        let mut enable_page_index = self.enable_page_index;
+        let file_decryption_properties = self.file_decryption_properties.clone();
+
+        // For now, page index does not work with encrypted files. See:
+        // https://github.com/apache/arrow-rs/issues/7629
+        if file_decryption_properties.is_some() {
+            enable_page_index = false;
+        }
 
         Ok(Box::pin(async move {
             // Prune this file using the file level statistics and partition values.
@@ -171,6 +181,9 @@ impl FileOpener for ParquetOpener {
             // pruning predicates. Thus default to not requesting if from the
             // underlying reader.
             let mut options = ArrowReaderOptions::new().with_page_index(false);
+            if let Some(fd_val) = file_decryption_properties {
+                options = options.with_file_decryption_properties((*fd_val).clone());
+            }
             let mut metadata_timer = file_metrics.metadata_load_time.timer();
 
             // Begin by loading the metadata from the underlying reader (note
@@ -607,6 +620,7 @@ mod test {
                 schema_adapter_factory: Arc::new(DefaultSchemaAdapterFactory),
                 enable_row_group_stats_pruning: true,
                 coerce_int96: None,
+                file_decryption_properties: None,
             }
         };
 
@@ -691,6 +705,7 @@ mod test {
                 schema_adapter_factory: Arc::new(DefaultSchemaAdapterFactory),
                 enable_row_group_stats_pruning: true,
                 coerce_int96: None,
+                file_decryption_properties: None,
             }
         };
 
@@ -791,6 +806,7 @@ mod test {
                 schema_adapter_factory: Arc::new(DefaultSchemaAdapterFactory),
                 enable_row_group_stats_pruning: true,
                 coerce_int96: None,
+                file_decryption_properties: None,
             }
         };
         let make_meta = || FileMeta {
@@ -901,6 +917,7 @@ mod test {
                 schema_adapter_factory: Arc::new(DefaultSchemaAdapterFactory),
                 enable_row_group_stats_pruning: false, // note that this is false!
                 coerce_int96: None,
+                file_decryption_properties: None,
             }
         };
 
@@ -1012,6 +1029,7 @@ mod test {
                 schema_adapter_factory: Arc::new(DefaultSchemaAdapterFactory),
                 enable_row_group_stats_pruning: true,
                 coerce_int96: None,
+                file_decryption_properties: None,
             }
         };
 
diff --git a/datafusion/datasource-parquet/src/source.rs b/datafusion/datasource-parquet/src/source.rs
index 0ffdb7f8ac..f2e7823151 100644
--- a/datafusion/datasource-parquet/src/source.rs
+++ b/datafusion/datasource-parquet/src/source.rs
@@ -475,6 +475,13 @@ impl FileSource for ParquetSource {
                 Arc::new(DefaultParquetFileReaderFactory::new(object_store)) as _
             });
 
+        let file_decryption_properties = self
+            .table_parquet_options()
+            .crypto
+            .file_decryption
+            .as_ref()
+            .map(|props| Arc::new(props.clone().into()));
+
         let coerce_int96 = self
             .table_parquet_options
             .global
@@ -502,6 +509,7 @@ impl FileSource for ParquetSource {
             enable_row_group_stats_pruning: self.table_parquet_options.global.pruning,
             schema_adapter_factory,
             coerce_int96,
+            file_decryption_properties,
         })
     }
 
diff --git a/datafusion/proto-common/src/from_proto/mod.rs b/datafusion/proto-common/src/from_proto/mod.rs
index bd969db316..0823e15026 100644
--- a/datafusion/proto-common/src/from_proto/mod.rs
+++ b/datafusion/proto-common/src/from_proto/mod.rs
@@ -1066,6 +1066,7 @@ impl TryFrom<&protobuf::TableParquetOptions> for TableParquetOptions {
                 .unwrap(),
             column_specific_options,
             key_value_metadata: Default::default(),
+            crypto: Default::default(),
         })
     }
 }
diff --git a/datafusion/proto/src/logical_plan/file_formats.rs b/datafusion/proto/src/logical_plan/file_formats.rs
index d3f6511ec9..620442c79e 100644
--- a/datafusion/proto/src/logical_plan/file_formats.rs
+++ b/datafusion/proto/src/logical_plan/file_formats.rs
@@ -576,6 +576,7 @@ impl From<&TableParquetOptionsProto> for TableParquetOptions {
                 .iter()
                 .map(|(k, v)| (k.clone(), Some(v.clone())))
                 .collect(),
+            crypto: Default::default(),
         }
     }
 }
diff --git a/datafusion/sqllogictest/test_files/encrypted_parquet.slt b/datafusion/sqllogictest/test_files/encrypted_parquet.slt
new file mode 100644
index 0000000000..d580b7d1ad
--- /dev/null
+++ b/datafusion/sqllogictest/test_files/encrypted_parquet.slt
@@ -0,0 +1,89 @@
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements.  See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership.  The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License.  You may obtain a copy of the License at
+
+#   http://www.apache.org/licenses/LICENSE-2.0
+
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied.  See the License for the
+# specific language governing permissions and limitations
+# under the License.
+
+# Test Parquet encryption and decryption in DataFusion SQL.
+# See datafusion/common/src/config.rs for the equivalent Rust code.
+
+statement count 0
+CREATE EXTERNAL TABLE encrypted_parquet_table
+(
+double_field double,
+float_field float
+)
+STORED AS PARQUET LOCATION 'test_files/scratch/encrypted_parquet/' OPTIONS (
+    -- Configure encryption for reading and writing Parquet files
+    -- Encryption properties
+    'format.crypto.file_encryption.encrypt_footer' 'true',
+    'format.crypto.file_encryption.footer_key_as_hex' '30313233343536373839303132333435', -- b"0123456789012345"
+    'format.crypto.file_encryption.column_key_as_hex::double_field' '31323334353637383930313233343530', -- b"1234567890123450"
+    'format.crypto.file_encryption.column_key_as_hex::float_field' '31323334353637383930313233343531', -- b"1234567890123451"
+    -- Decryption properties
+    'format.crypto.file_decryption.footer_key_as_hex' '30313233343536373839303132333435', -- b"0123456789012345"
+    'format.crypto.file_decryption.column_key_as_hex::double_field' '31323334353637383930313233343530', -- b"1234567890123450"
+    'format.crypto.file_decryption.column_key_as_hex::float_field' '31323334353637383930313233343531', -- b"1234567890123451"
+)
+
+statement count 0
+CREATE TABLE temp_table (
+    double_field double,
+    float_field float
+)
+
+query I
+INSERT INTO temp_table VALUES(-1.0, -1.0)
+----
+1
+
+query I
+INSERT INTO temp_table VALUES(1.0, 2.0)
+----
+1
+
+query I
+INSERT INTO temp_table VALUES(3.0, 4.0)
+----
+1
+
+query I
+INSERT INTO temp_table VALUES(5.0, 6.0)
+----
+1
+
+query I
+INSERT INTO TABLE encrypted_parquet_table(double_field, float_field) SELECT * FROM temp_table
+----
+4
+
+query RR
+SELECT * FROM encrypted_parquet_table
+WHERE double_field > 0.0 AND float_field > 0.0
+ORDER BY double_field
+----
+1 2
+3 4
+5 6
+
+statement count 0
+CREATE EXTERNAL TABLE parquet_table
+(
+double_field double,
+float_field float
+)
+STORED AS PARQUET LOCATION 'test_files/scratch/encrypted_parquet/'
+
query error DataFusion error: Parquet error: Parquet error: Parquet file has an encrypted footer but decryption properties were not provided
+SELECT * FROM parquet_table
diff --git a/docs/source/user-guide/cli/datasources.md b/docs/source/user-guide/cli/datasources.md
index afc4f6c0c5..c15b8a5e46 100644
--- a/docs/source/user-guide/cli/datasources.md
+++ b/docs/source/user-guide/cli/datasources.md
@@ -190,6 +190,47 @@ STORED AS PARQUET
 LOCATION '/mnt/nyctaxi/';
 ```
 
+### Parquet Specific Options
+
+You can specify additional options for Parquet files using the `OPTIONS` clause.
+For example, to read and write a Parquet directory with encryption settings you could use:
+
+```sql
+CREATE EXTERNAL TABLE encrypted_parquet_table
+(
+double_field double,
+float_field float
+)
+STORED AS PARQUET LOCATION 'pq/' OPTIONS (
+    -- encryption
+    'format.crypto.file_encryption.encrypt_footer' 'true',
+    'format.crypto.file_encryption.footer_key_as_hex' '30313233343536373839303132333435', -- b"0123456789012345"
+    'format.crypto.file_encryption.column_key_as_hex::double_field' '31323334353637383930313233343530', -- b"1234567890123450"
+    'format.crypto.file_encryption.column_key_as_hex::float_field' '31323334353637383930313233343531', -- b"1234567890123451"
+    -- decryption
+    'format.crypto.file_decryption.footer_key_as_hex' '30313233343536373839303132333435', -- b"0123456789012345"
+    'format.crypto.file_decryption.column_key_as_hex::double_field' '31323334353637383930313233343530', -- b"1234567890123450"
+    'format.crypto.file_decryption.column_key_as_hex::float_field' '31323334353637383930313233343531', -- b"1234567890123451"
+);
+```
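+
+As a usage sketch (adapted from the `encrypted_parquet.slt` test added in this
+commit; the values and the `plain_parquet_table` name below are illustrative),
+data is encrypted on write and transparently decrypted on read:
+
+```sql
+INSERT INTO encrypted_parquet_table VALUES (1.0, 2.0), (3.0, 4.0);
+
+SELECT * FROM encrypted_parquet_table WHERE double_field > 0.0;
+```
+
+Reading the same location without decryption options fails, because the file
+footers themselves are encrypted:
+
+```sql
+-- `plain_parquet_table` is a hypothetical table over the same location,
+-- created without the crypto options
+SELECT * FROM plain_parquet_table;
+-- Error: Parquet file has an encrypted footer but decryption properties were not provided
+```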
+
+Here the keys are specified in hexadecimal format because they are binary data. These can be encoded in SQL using:
+
+```sql
+select encode('0123456789012345', 'hex');
+/*
++----------------------------------------------+
+| encode(Utf8("0123456789012345"),Utf8("hex")) |
++----------------------------------------------+
+| 30313233343536373839303132333435             |
++----------------------------------------------+
+*/
+```
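+
+The example keys above are 16 bytes (128-bit AES), so each hex string is 32
+digits. To sanity-check a key, `decode` reverses the encoding (a sketch; how
+the binary result is rendered depends on the client):
+
+```sql
+select decode('30313233343536373839303132333435', 'hex');
+```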
+
+For more details on the available options, refer to the Rust
+[TableParquetOptions](https://docs.rs/datafusion/latest/datafusion/common/config/struct.TableParquetOptions.html)
+documentation in DataFusion.
+
 ## CSV
 
 DataFusion will infer the CSV schema automatically or you can provide it explicitly.

