This is an automated email from the ASF dual-hosted git repository.

alamb pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/arrow-datafusion.git


The following commit(s) were added to refs/heads/master by this push:
     new 6e0097d35 Expose parquet reader settings using normal DataFusion 
`ConfigOptions` (#3822)
6e0097d35 is described below

commit 6e0097d35391fea0d57c1d2ecfdef18437f681f4
Author: Andrew Lamb <[email protected]>
AuthorDate: Wed Oct 19 06:22:04 2022 -0400

    Expose parquet reader settings using normal DataFusion `ConfigOptions` 
(#3822)
    
    * Expose parquet reader settings as DataFusion config settings
    
    * fix logical conflict
    
    * Update tests
---
 benchmarks/src/bin/parquet_filter_pushdown.rs      |  56 +++++---
 datafusion/core/src/config.rs                      |  78 ++++++++---
 datafusion/core/src/datasource/file_format/mod.rs  |   2 +
 datafusion/core/src/datasource/listing/table.rs    |   1 +
 datafusion/core/src/execution/context.rs           |   9 +-
 datafusion/core/src/execution/options.rs           |  30 ++++-
 .../core/src/physical_optimizer/repartition.rs     |   2 +
 .../core/src/physical_plan/file_format/avro.rs     |   4 +
 .../src/physical_plan/file_format/file_stream.rs   |   2 +
 .../core/src/physical_plan/file_format/json.rs     |   4 +
 .../core/src/physical_plan/file_format/mod.rs      |  12 +-
 .../core/src/physical_plan/file_format/parquet.rs  | 143 ++++++++++++---------
 datafusion/core/src/test/mod.rs                    |   2 +
 datafusion/core/tests/custom_parquet_reader.rs     |   2 +
 datafusion/core/tests/row.rs                       |   2 +
 datafusion/core/tests/sql/information_schema.rs    |   3 +
 16 files changed, 250 insertions(+), 102 deletions(-)

diff --git a/benchmarks/src/bin/parquet_filter_pushdown.rs 
b/benchmarks/src/bin/parquet_filter_pushdown.rs
index e4bc9295e..f77cbc8fd 100644
--- a/benchmarks/src/bin/parquet_filter_pushdown.rs
+++ b/benchmarks/src/bin/parquet_filter_pushdown.rs
@@ -23,6 +23,10 @@ use arrow::datatypes::{DataType, Field, Int32Type, Schema, 
SchemaRef, TimeUnit};
 use arrow::record_batch::RecordBatch;
 use arrow::util::pretty;
 use datafusion::common::{Result, ToDFSchema};
+use datafusion::config::{
+    ConfigOptions, OPT_PARQUET_ENABLE_PAGE_INDEX, OPT_PARQUET_PUSHDOWN_FILTERS,
+    OPT_PARQUET_REORDER_FILTERS,
+};
 use datafusion::datasource::listing::{ListingTableUrl, PartitionedFile};
 use datafusion::datasource::object_store::ObjectStoreUrl;
 use datafusion::execution::context::ExecutionProps;
@@ -30,9 +34,7 @@ use datafusion::logical_expr::{lit, or, Expr};
 use datafusion::optimizer::utils::disjunction;
 use datafusion::physical_expr::create_physical_expr;
 use datafusion::physical_plan::collect;
-use datafusion::physical_plan::file_format::{
-    FileScanConfig, ParquetExec, ParquetScanOptions,
-};
+use datafusion::physical_plan::file_format::{FileScanConfig, ParquetExec};
 use datafusion::physical_plan::filter::FilterExec;
 use datafusion::prelude::{col, SessionConfig, SessionContext};
 use object_store::path::Path;
@@ -109,6 +111,13 @@ async fn main() -> Result<()> {
     Ok(())
 }
 
+#[derive(Debug, Clone)]
+struct ParquetScanOptions {
+    pushdown_filters: bool,
+    reorder_filters: bool,
+    enable_page_index: bool,
+}
+
 async fn run_benchmarks(
     ctx: &mut SessionContext,
     object_store_url: ObjectStoreUrl,
@@ -117,15 +126,21 @@ async fn run_benchmarks(
     debug: bool,
 ) -> Result<()> {
     let scan_options_matrix = vec![
-        ParquetScanOptions::default(),
-        ParquetScanOptions::default()
-            .with_page_index(true)
-            .with_pushdown_filters(true)
-            .with_reorder_predicates(true),
-        ParquetScanOptions::default()
-            .with_page_index(true)
-            .with_pushdown_filters(true)
-            .with_reorder_predicates(false),
+        ParquetScanOptions {
+            pushdown_filters: false,
+            reorder_filters: false,
+            enable_page_index: false,
+        },
+        ParquetScanOptions {
+            pushdown_filters: true,
+            reorder_filters: true,
+            enable_page_index: true,
+        },
+        ParquetScanOptions {
+            pushdown_filters: true,
+            reorder_filters: true,
+            enable_page_index: false,
+        },
     ];
 
     let filter_matrix = vec![
@@ -193,6 +208,18 @@ async fn exec_scan(
     debug: bool,
 ) -> Result<usize> {
     let schema = BatchBuilder::schema();
+
+    let ParquetScanOptions {
+        pushdown_filters,
+        reorder_filters,
+        enable_page_index,
+    } = scan_options;
+
+    let mut config_options = ConfigOptions::new();
+    config_options.set_bool(OPT_PARQUET_PUSHDOWN_FILTERS, pushdown_filters);
+    config_options.set_bool(OPT_PARQUET_REORDER_FILTERS, reorder_filters);
+    config_options.set_bool(OPT_PARQUET_ENABLE_PAGE_INDEX, enable_page_index);
+
     let scan_config = FileScanConfig {
         object_store_url,
         file_schema: schema.clone(),
@@ -206,6 +233,7 @@ async fn exec_scan(
         projection: None,
         limit: None,
         table_partition_cols: vec![],
+        config_options: config_options.into_shareable(),
     };
 
     let df_schema = schema.clone().to_dfschema()?;
@@ -217,9 +245,7 @@ async fn exec_scan(
         &ExecutionProps::default(),
     )?;
 
-    let parquet_exec = Arc::new(
-        ParquetExec::new(scan_config, Some(filter), 
None).with_scan_options(scan_options),
-    );
+    let parquet_exec = Arc::new(ParquetExec::new(scan_config, Some(filter), 
None));
 
     let exec = Arc::new(FilterExec::try_new(physical_filter_expr, 
parquet_exec)?);
 
diff --git a/datafusion/core/src/config.rs b/datafusion/core/src/config.rs
index 2a2139fb2..b95c12d3b 100644
--- a/datafusion/core/src/config.rs
+++ b/datafusion/core/src/config.rs
@@ -21,8 +21,10 @@ use arrow::datatypes::DataType;
 use datafusion_common::ScalarValue;
 use itertools::Itertools;
 use log::warn;
+use parking_lot::RwLock;
 use std::collections::HashMap;
 use std::env;
+use std::sync::Arc;
 
 /// Configuration option "datafusion.optimizer.filter_null_join_keys"
 pub const OPT_FILTER_NULL_JOIN_KEYS: &str = 
"datafusion.optimizer.filter_null_join_keys";
@@ -43,13 +45,25 @@ pub const OPT_COALESCE_BATCHES: &str = 
"datafusion.execution.coalesce_batches";
 pub const OPT_COALESCE_TARGET_BATCH_SIZE: &str =
     "datafusion.execution.coalesce_target_batch_size";
 
+/// Configuration option "datafusion.execution.time_zone"
+pub const OPT_TIME_ZONE: &str = "datafusion.execution.time_zone";
+
+/// Configuration option "datafusion.execution.parquet.pushdown_filters"
+pub const OPT_PARQUET_PUSHDOWN_FILTERS: &str =
+    "datafusion.execution.parquet.pushdown_filters";
+
+/// Configuration option "datafusion.execution.parquet.reorder_filters"
+pub const OPT_PARQUET_REORDER_FILTERS: &str =
+    "datafusion.execution.parquet.reorder_filters";
+
+/// Configuration option "datafusion.execution.parquet.enable_page_index"
+pub const OPT_PARQUET_ENABLE_PAGE_INDEX: &str =
+    "datafusion.execution.parquet.enable_page_index";
+
 /// Configuration option "datafusion.optimizer.skip_failed_rules"
 pub const OPT_OPTIMIZER_SKIP_FAILED_RULES: &str =
     "datafusion.optimizer.skip_failed_rules";
 
-/// Configuration option "datafusion.execution.time_zone"
-pub const OPT_TIME_ZONE: &str = "datafusion.execution.time_zone";
-
 /// Definition of a configuration option
 pub struct ConfigDefinition {
     /// key used to identifier this configuration option
@@ -173,11 +187,11 @@ impl BuiltInConfigs {
                 false,
             ),
             ConfigDefinition::new_u64(
-            OPT_BATCH_SIZE,
-            "Default batch size while creating new batches, it's especially 
useful for \
-            buffer-in-memory batches since creating tiny batches would results 
in too much metadata \
-            memory consumption.",
-            8192,
+                OPT_BATCH_SIZE,
+                "Default batch size while creating new batches, it's 
especially useful for \
+                 buffer-in-memory batches since creating tiny batches would 
results in too much metadata \
+                 memory consumption.",
+                8192,
             ),
             ConfigDefinition::new_bool(
                 OPT_COALESCE_BATCHES,
@@ -191,9 +205,35 @@ impl BuiltInConfigs {
              ConfigDefinition::new_u64(
                  OPT_COALESCE_TARGET_BATCH_SIZE,
                  format!("Target batch size when coalescing batches. Uses in 
conjunction with the \
-            configuration setting '{}'.", OPT_COALESCE_BATCHES),
+                          configuration setting '{}'.", OPT_COALESCE_BATCHES),
                  4096,
             ),
+            ConfigDefinition::new_string(
+                OPT_TIME_ZONE,
+                "The session time zone which some function require \
+                e.g. EXTRACT(HOUR from SOME_TIME) shift the underline datetime 
according to the time zone,
+                then extract the hour.",
+                "UTC".into()
+            ),
+            ConfigDefinition::new_bool(
+                OPT_PARQUET_PUSHDOWN_FILTERS,
+                "If true, filter expressions are be applied during the parquet 
decoding operation to \
+                 reduce the number of rows decoded.",
+                false,
+            ),
+            ConfigDefinition::new_bool(
+                OPT_PARQUET_REORDER_FILTERS,
+                "If true, filter expressions evaluated during the parquet 
decoding opearation \
+                 will be reordered heuristically to minimize the cost of 
evaluation. If false, \
+                 the filters are applied in the same order as written in the 
query.",
+                false,
+            ),
+            ConfigDefinition::new_bool(
+                OPT_PARQUET_ENABLE_PAGE_INDEX,
+                "If true, uses parquet data page level metadata (Page Index) 
statistics \
+                 to reduce the number of rows decoded.",
+                false,
+            ),
             ConfigDefinition::new_bool(
                 OPT_OPTIMIZER_SKIP_FAILED_RULES,
                 "When set to true, the logical plan optimizer will produce 
warning \
@@ -201,13 +241,7 @@ impl BuiltInConfigs {
                 rule. When set to false, any rules that produce errors will 
cause the query to fail.",
                 true
             ),
-            ConfigDefinition::new_string(
-                OPT_TIME_ZONE,
-                "The session time zone which some function require \
-                e.g. EXTRACT(HOUR from SOME_TIME) shift the underline datetime 
according to the time zone,
-                then extract the hour",
-                "UTC".into()
-            )]
+            ]
         }
     }
 
@@ -255,8 +289,16 @@ impl ConfigOptions {
         Self { options }
     }
 
-    /// Create new ConfigOptions struct, taking values from environment 
variables where possible.
-    /// For example, setting `DATAFUSION_EXECUTION_BATCH_SIZE` to control 
`datafusion.execution.batch_size`.
+    /// Create a new [`ConfigOptions`] wrapped in an RwLock and Arc
+    pub fn into_shareable(self) -> Arc<RwLock<Self>> {
+        Arc::new(RwLock::new(self))
+    }
+
+    /// Create new ConfigOptions struct, taking values from
+    /// environment variables where possible.
+    ///
+    /// For example, setting `DATAFUSION_EXECUTION_BATCH_SIZE` will
+    /// control `datafusion.execution.batch_size`.
     pub fn from_env() -> Self {
         let built_in = BuiltInConfigs::new();
         let mut options = 
HashMap::with_capacity(built_in.config_definitions.len());
diff --git a/datafusion/core/src/datasource/file_format/mod.rs 
b/datafusion/core/src/datasource/file_format/mod.rs
index 82f5b1df8..6775117e2 100644
--- a/datafusion/core/src/datasource/file_format/mod.rs
+++ b/datafusion/core/src/datasource/file_format/mod.rs
@@ -84,6 +84,7 @@ pub trait FileFormat: Send + Sync + fmt::Debug {
 #[cfg(test)]
 pub(crate) mod test_util {
     use super::*;
+    use crate::config::ConfigOptions;
     use crate::datasource::listing::PartitionedFile;
     use crate::datasource::object_store::ObjectStoreUrl;
     use crate::test::object_store::local_unpartitioned_file;
@@ -122,6 +123,7 @@ pub(crate) mod test_util {
                     projection,
                     limit,
                     table_partition_cols: vec![],
+                    config_options: ConfigOptions::new().into_shareable(),
                 },
                 &[],
             )
diff --git a/datafusion/core/src/datasource/listing/table.rs 
b/datafusion/core/src/datasource/listing/table.rs
index 3a0c4dcee..deaa09249 100644
--- a/datafusion/core/src/datasource/listing/table.rs
+++ b/datafusion/core/src/datasource/listing/table.rs
@@ -404,6 +404,7 @@ impl TableProvider for ListingTable {
                     projection: projection.clone(),
                     limit,
                     table_partition_cols: 
self.options.table_partition_cols.clone(),
+                    config_options: ctx.config.config_options(),
                 },
                 filters,
             )
diff --git a/datafusion/core/src/execution/context.rs 
b/datafusion/core/src/execution/context.rs
index f7fb0eb90..c50f79426 100644
--- a/datafusion/core/src/execution/context.rs
+++ b/datafusion/core/src/execution/context.rs
@@ -1184,7 +1184,7 @@ impl SessionConfig {
     /// Create an execution config with config options read from the 
environment
     pub fn from_env() -> Self {
         Self {
-            config_options: Arc::new(RwLock::new(ConfigOptions::from_env())),
+            config_options: ConfigOptions::from_env().into_shareable(),
             ..Default::default()
         }
     }
@@ -1324,6 +1324,13 @@ impl SessionConfig {
         map
     }
 
+    /// Return a handle to the shared configuration options.
+    ///
+    /// [`config_options`]: SessionContext::config_option
+    pub fn config_options(&self) -> Arc<RwLock<ConfigOptions>> {
+        self.config_options.clone()
+    }
+
     /// Add extensions.
     ///
     /// Extensions can be used to attach extra data to the session config -- 
e.g. tracing information or caches.
diff --git a/datafusion/core/src/execution/options.rs 
b/datafusion/core/src/execution/options.rs
index 9ddd3f1d6..150a20670 100644
--- a/datafusion/core/src/execution/options.rs
+++ b/datafusion/core/src/execution/options.rs
@@ -34,7 +34,12 @@ use crate::datasource::{
     listing::ListingOptions,
 };
 
-/// CSV file read option
+/// Options that control the reading of CSV files.
+///
+/// Note this structure is supplied when a datasource is created and
+/// cannot vary from statement to statement. For settings that
+/// can vary statement to statement see
+/// [`ConfigOptions`](crate::config::ConfigOptions).
 #[derive(Clone)]
 pub struct CsvReadOptions<'a> {
     /// Does the CSV file have a header?
@@ -150,7 +155,12 @@ impl<'a> CsvReadOptions<'a> {
     }
 }
 
-/// Parquet read options
+/// Options that control the reading of Parquet files.
+///
+/// Note this structure is supplied when a datasource is created and
+/// cannot vary from statement to statement. For settings that
+/// can vary statement to statement see
+/// [`ConfigOptions`](crate::config::ConfigOptions).
 #[derive(Clone)]
 pub struct ParquetReadOptions<'a> {
     /// File extension; only files with this extension are selected for data 
input.
@@ -160,10 +170,12 @@ pub struct ParquetReadOptions<'a> {
     pub table_partition_cols: Vec<String>,
     /// Should DataFusion parquet reader use the predicate to prune data,
     /// overridden by value on execution::context::SessionConfig
+    // TODO move this into ConfigOptions
     pub parquet_pruning: bool,
     /// Tell the parquet reader to skip any metadata that may be in
     /// the file Schema. This can help avoid schema conflicts due to
     /// metadata.  Defaults to true.
+    // TODO move this into ConfigOptions
     pub skip_metadata: bool,
 }
 
@@ -217,7 +229,12 @@ impl<'a> ParquetReadOptions<'a> {
     }
 }
 
-/// Avro read options
+/// Options that control the reading of AVRO files.
+///
+/// Note this structure is supplied when a datasource is created and
+/// cannot vary from statement to statement. For settings that
+/// can vary statement to statement see
+/// [`ConfigOptions`](crate::config::ConfigOptions).
 #[derive(Clone)]
 pub struct AvroReadOptions<'a> {
     /// The data source schema.
@@ -261,7 +278,12 @@ impl<'a> AvroReadOptions<'a> {
     }
 }
 
-/// Line-delimited JSON read options
+/// Options that control the reading of Line-delimited JSON files (NDJson)
+///
+/// Note this structure is supplied when a datasource is created and
+/// cannot vary from statement to statement. For settings that
+/// can vary statement to statement see
+/// [`ConfigOptions`](crate::config::ConfigOptions).
 #[derive(Clone)]
 pub struct NdJsonReadOptions<'a> {
     /// The data source schema.
diff --git a/datafusion/core/src/physical_optimizer/repartition.rs 
b/datafusion/core/src/physical_optimizer/repartition.rs
index 1d2b25908..839908d06 100644
--- a/datafusion/core/src/physical_optimizer/repartition.rs
+++ b/datafusion/core/src/physical_optimizer/repartition.rs
@@ -240,6 +240,7 @@ mod tests {
     use arrow::datatypes::{DataType, Field, Schema, SchemaRef};
 
     use super::*;
+    use crate::config::ConfigOptions;
     use crate::datasource::listing::PartitionedFile;
     use crate::datasource::object_store::ObjectStoreUrl;
     use crate::physical_plan::aggregates::{
@@ -269,6 +270,7 @@ mod tests {
                 projection: None,
                 limit: None,
                 table_partition_cols: vec![],
+                config_options: ConfigOptions::new().into_shareable(),
             },
             None,
             None,
diff --git a/datafusion/core/src/physical_plan/file_format/avro.rs 
b/datafusion/core/src/physical_plan/file_format/avro.rs
index 0b7841d88..2aab84fad 100644
--- a/datafusion/core/src/physical_plan/file_format/avro.rs
+++ b/datafusion/core/src/physical_plan/file_format/avro.rs
@@ -208,6 +208,7 @@ mod private {
 #[cfg(test)]
 #[cfg(feature = "avro")]
 mod tests {
+    use crate::config::ConfigOptions;
     use crate::datasource::file_format::{avro::AvroFormat, FileFormat};
     use crate::datasource::listing::PartitionedFile;
     use crate::datasource::object_store::ObjectStoreUrl;
@@ -237,6 +238,7 @@ mod tests {
             projection: Some(vec![0, 1, 2]),
             limit: None,
             table_partition_cols: vec![],
+            config_options: ConfigOptions::new().into_shareable(),
         });
         assert_eq!(avro_exec.output_partitioning().partition_count(), 1);
 
@@ -306,6 +308,7 @@ mod tests {
             projection,
             limit: None,
             table_partition_cols: vec![],
+            config_options: ConfigOptions::new().into_shareable(),
         });
         assert_eq!(avro_exec.output_partitioning().partition_count(), 1);
 
@@ -374,6 +377,7 @@ mod tests {
             statistics: Statistics::default(),
             limit: None,
             table_partition_cols: vec!["date".to_owned()],
+            config_options: ConfigOptions::new().into_shareable(),
         });
         assert_eq!(avro_exec.output_partitioning().partition_count(), 1);
 
diff --git a/datafusion/core/src/physical_plan/file_format/file_stream.rs 
b/datafusion/core/src/physical_plan/file_format/file_stream.rs
index 609e3b3a9..df12f3105 100644
--- a/datafusion/core/src/physical_plan/file_format/file_stream.rs
+++ b/datafusion/core/src/physical_plan/file_format/file_stream.rs
@@ -321,6 +321,7 @@ mod tests {
     use futures::StreamExt;
 
     use super::*;
+    use crate::config::ConfigOptions;
     use crate::datasource::object_store::ObjectStoreUrl;
     use crate::physical_plan::metrics::ExecutionPlanMetricsSet;
     use crate::prelude::SessionContext;
@@ -366,6 +367,7 @@ mod tests {
             projection: None,
             limit,
             table_partition_cols: vec![],
+            config_options: ConfigOptions::new().into_shareable(),
         };
 
         let file_stream = FileStream::new(
diff --git a/datafusion/core/src/physical_plan/file_format/json.rs 
b/datafusion/core/src/physical_plan/file_format/json.rs
index d207988f4..c8c5d71bd 100644
--- a/datafusion/core/src/physical_plan/file_format/json.rs
+++ b/datafusion/core/src/physical_plan/file_format/json.rs
@@ -256,6 +256,7 @@ mod tests {
     use object_store::local::LocalFileSystem;
 
     use crate::assert_batches_eq;
+    use crate::config::ConfigOptions;
     use crate::datasource::file_format::file_type::FileType;
     use crate::datasource::file_format::{json::JsonFormat, FileFormat};
     use crate::datasource::listing::PartitionedFile;
@@ -330,6 +331,7 @@ mod tests {
                 projection: None,
                 limit: Some(3),
                 table_partition_cols: vec![],
+                config_options: ConfigOptions::new().into_shareable(),
             },
             file_compression_type.to_owned(),
         );
@@ -405,6 +407,7 @@ mod tests {
                 projection: None,
                 limit: Some(3),
                 table_partition_cols: vec![],
+                config_options: ConfigOptions::new().into_shareable(),
             },
             file_compression_type.to_owned(),
         );
@@ -450,6 +453,7 @@ mod tests {
                 projection: Some(vec![0, 2]),
                 limit: None,
                 table_partition_cols: vec![],
+                config_options: ConfigOptions::new().into_shareable(),
             },
             file_compression_type.to_owned(),
         );
diff --git a/datafusion/core/src/physical_plan/file_format/mod.rs 
b/datafusion/core/src/physical_plan/file_format/mod.rs
index 2926f18a2..c33e2bc14 100644
--- a/datafusion/core/src/physical_plan/file_format/mod.rs
+++ b/datafusion/core/src/physical_plan/file_format/mod.rs
@@ -30,9 +30,7 @@ mod row_filter;
 pub(crate) use self::csv::plan_to_csv;
 pub use self::csv::CsvExec;
 pub(crate) use self::parquet::plan_to_parquet;
-pub use self::parquet::{
-    ParquetExec, ParquetFileMetrics, ParquetFileReaderFactory, 
ParquetScanOptions,
-};
+pub use self::parquet::{ParquetExec, ParquetFileMetrics, 
ParquetFileReaderFactory};
 use arrow::{
     array::{ArrayData, ArrayRef, DictionaryArray},
     buffer::Buffer,
@@ -44,9 +42,10 @@ pub use avro::AvroExec;
 pub use file_stream::{FileOpenFuture, FileOpener, FileStream};
 pub(crate) use json::plan_to_json;
 pub use json::NdJsonExec;
+use parking_lot::RwLock;
 
-use crate::datasource::listing::FileRange;
 use crate::datasource::{listing::PartitionedFile, 
object_store::ObjectStoreUrl};
+use crate::{config::ConfigOptions, datasource::listing::FileRange};
 use crate::{
     error::{DataFusionError, Result},
     scalar::ScalarValue,
@@ -91,6 +90,8 @@ pub struct FileScanConfig {
     pub limit: Option<usize>,
     /// The partitioning column names
     pub table_partition_cols: Vec<String>,
+    /// Configuration options passed to the physical plans
+    pub config_options: Arc<RwLock<ConfigOptions>>,
 }
 
 impl FileScanConfig {
@@ -413,7 +414,7 @@ pub struct FileMeta {
     pub object_meta: ObjectMeta,
     /// An optional file range for a more fine-grained parallel execution
     pub range: Option<FileRange>,
-    /// An optional field for user defined per object metadata  
+    /// An optional field for user defined per object metadata
     pub extensions: Option<Arc<dyn std::any::Any + Send + Sync>>,
 }
 
@@ -698,6 +699,7 @@ mod tests {
             projection,
             statistics,
             table_partition_cols,
+            config_options: ConfigOptions::new().into_shareable(),
         }
     }
 }
diff --git a/datafusion/core/src/physical_plan/file_format/parquet.rs 
b/datafusion/core/src/physical_plan/file_format/parquet.rs
index a5b146dff..f5bd89059 100644
--- a/datafusion/core/src/physical_plan/file_format/parquet.rs
+++ b/datafusion/core/src/physical_plan/file_format/parquet.rs
@@ -24,6 +24,9 @@ use std::ops::Range;
 use std::sync::Arc;
 use std::{any::Any, convert::TryInto};
 
+use crate::config::OPT_PARQUET_ENABLE_PAGE_INDEX;
+use crate::config::OPT_PARQUET_PUSHDOWN_FILTERS;
+use crate::config::OPT_PARQUET_REORDER_FILTERS;
 use crate::datasource::file_format::parquet::fetch_parquet_metadata;
 use crate::datasource::listing::FileRange;
 use crate::physical_plan::file_format::file_stream::{
@@ -72,43 +75,6 @@ use parquet::file::{
 use parquet::format::PageLocation;
 use parquet::schema::types::ColumnDescriptor;
 
-#[derive(Debug, Clone, Default)]
-/// Specify options for the parquet scan
-pub struct ParquetScanOptions {
-    /// If true, any available `pruning_predicate` will be converted to a 
`RowFilter`
-    /// and pushed down to the `ParquetRecordBatchStream`. This will enable 
row level
-    /// filter at the decoder level. Defaults to false
-    pushdown_filters: bool,
-    /// If true, the generated `RowFilter` may reorder the predicate `Expr`s 
to try and optimize
-    /// the cost of filter evaluation.
-    reorder_predicates: bool,
-    /// If enabled, the reader will read the page index
-    /// This is used to optimise filter pushdown
-    /// via `RowSelector` and `RowFilter` by
-    /// eliminating unnecessary IO and decoding
-    enable_page_index: bool,
-}
-
-impl ParquetScanOptions {
-    /// Set whether to pushdown pruning predicate to the parquet scan
-    pub fn with_pushdown_filters(mut self, pushdown_filters: bool) -> Self {
-        self.pushdown_filters = pushdown_filters;
-        self
-    }
-
-    /// Set whether to reorder pruning predicate expressions in order to 
minimize evaluation cost
-    pub fn with_reorder_predicates(mut self, reorder_predicates: bool) -> Self 
{
-        self.reorder_predicates = reorder_predicates;
-        self
-    }
-
-    /// Set whether to read page index when reading parquet
-    pub fn with_page_index(mut self, page_index: bool) -> Self {
-        self.enable_page_index = page_index;
-        self
-    }
-}
-
 /// Execution plan for scanning one or more Parquet partitions
 #[derive(Debug, Clone)]
 pub struct ParquetExec {
@@ -123,8 +89,6 @@ pub struct ParquetExec {
     metadata_size_hint: Option<usize>,
     /// Optional user defined parquet file reader factory
     parquet_file_reader_factory: Option<Arc<dyn ParquetFileReaderFactory>>,
-    /// Options to specify behavior of parquet scan
-    scan_options: ParquetScanOptions,
 }
 
 impl ParquetExec {
@@ -165,7 +129,6 @@ impl ParquetExec {
             pruning_predicate,
             metadata_size_hint,
             parquet_file_reader_factory: None,
-            scan_options: ParquetScanOptions::default(),
         }
     }
 
@@ -194,15 +157,71 @@ impl ParquetExec {
         self
     }
 
-    /// Configure `ParquetScanOptions`
-    pub fn with_scan_options(mut self, scan_options: ParquetScanOptions) -> 
Self {
-        self.scan_options = scan_options;
+    /// If true, any filter [`Expr`]s on the scan will be converted to a
+    /// [`RowFilter`](parquet::arrow::arrow_reader::RowFilter) in the
+    /// `ParquetRecordBatchStream`. These filters are applied by the
+    /// parquet decoder to skip unnecessarily decoding other columns
+    /// which would not pass the predicate. Defaults to false
+    pub fn with_pushdown_filters(self, pushdown_filters: bool) -> Self {
+        self.base_config
+            .config_options
+            .write()
+            .set_bool(OPT_PARQUET_PUSHDOWN_FILTERS, pushdown_filters);
+        self
+    }
+
+    /// Return the value described in [`Self::with_pushdown_filters`]
+    pub fn pushdown_filters(&self) -> bool {
+        self.base_config
+            .config_options
+            .read()
+            .get_bool(OPT_PARQUET_PUSHDOWN_FILTERS)
+            // default to false
+            .unwrap_or_default()
+    }
+
+    /// If true, the `RowFilter` made by `pushdown_filters` may try to
+    /// minimize the cost of filter evaluation by reordering the
+    /// predicate [`Expr`]s. If false, the predicates are applied in
+    /// the same order as specified in the query. Defaults to false.
+    pub fn with_reorder_filters(self, reorder_filters: bool) -> Self {
+        self.base_config
+            .config_options
+            .write()
+            .set_bool(OPT_PARQUET_REORDER_FILTERS, reorder_filters);
+        self
+    }
+
+    /// Return the value described in [`Self::with_reorder_filters`]
+    pub fn reorder_filters(&self) -> bool {
+        self.base_config
+            .config_options
+            .read()
+            .get_bool(OPT_PARQUET_REORDER_FILTERS)
+            // default to false
+            .unwrap_or_default()
+    }
+
+    /// If enabled, the reader will read the page index
+    /// This is used to optimise filter pushdown
+    /// via `RowSelector` and `RowFilter` by
+    /// eliminating unnecessary IO and decoding
+    pub fn with_enable_page_index(self, enable_page_index: bool) -> Self {
+        self.base_config
+            .config_options
+            .write()
+            .set_bool(OPT_PARQUET_ENABLE_PAGE_INDEX, enable_page_index);
         self
     }
 
-    /// Ref to the `ParquetScanOptions`
-    pub fn parquet_scan_options(&self) -> &ParquetScanOptions {
-        &self.scan_options
+    /// Return the value described in [`Self::with_enable_page_index`]
+    pub fn enable_page_index(&self) -> bool {
+        self.base_config
+            .config_options
+            .read()
+            .get_bool(OPT_PARQUET_ENABLE_PAGE_INDEX)
+            // default to false
+            .unwrap_or_default()
     }
 }
 
@@ -314,7 +333,9 @@ impl ExecutionPlan for ParquetExec {
             metadata_size_hint: self.metadata_size_hint,
             metrics: self.metrics.clone(),
             parquet_file_reader_factory,
-            scan_options: self.scan_options.clone(),
+            pushdown_filters: self.pushdown_filters(),
+            reorder_filters: self.reorder_filters(),
+            enable_page_index: self.enable_page_index(),
         };
 
         let stream = FileStream::new(
@@ -376,7 +397,9 @@ struct ParquetOpener {
     metadata_size_hint: Option<usize>,
     metrics: ExecutionPlanMetricsSet,
     parquet_file_reader_factory: Arc<dyn ParquetFileReaderFactory>,
-    scan_options: ParquetScanOptions,
+    pushdown_filters: bool,
+    reorder_filters: bool,
+    enable_page_index: bool,
 }
 
 impl FileOpener for ParquetOpener {
@@ -406,9 +429,9 @@ impl FileOpener for ParquetOpener {
         let projection = self.projection.clone();
         let pruning_predicate = self.pruning_predicate.clone();
         let table_schema = self.table_schema.clone();
-        let reorder_predicates = self.scan_options.reorder_predicates;
-        let pushdown_filters = self.scan_options.pushdown_filters;
-        let enable_page_index = self.scan_options.enable_page_index;
+        let reorder_predicates = self.reorder_filters;
+        let pushdown_filters = self.pushdown_filters;
+        let enable_page_index = self.enable_page_index;
 
         Ok(Box::pin(async move {
             let options = 
ArrowReaderOptions::new().with_page_index(enable_page_index);
@@ -1138,6 +1161,7 @@ pub async fn plan_to_parquet(
 #[cfg(test)]
 mod tests {
     use super::*;
+    use crate::config::ConfigOptions;
     use crate::datasource::file_format::parquet::test_util::store_parquet;
     use crate::datasource::file_format::test_util::scan_format;
     use crate::datasource::listing::{FileRange, PartitionedFile};
@@ -1203,17 +1227,16 @@ mod tests {
                 projection,
                 limit: None,
                 table_partition_cols: vec![],
+                config_options: ConfigOptions::new().into_shareable(),
             },
             predicate,
             None,
         );
 
         if pushdown_predicate {
-            parquet_exec = parquet_exec.with_scan_options(
-                ParquetScanOptions::default()
-                    .with_pushdown_filters(true)
-                    .with_reorder_predicates(true),
-            );
+            parquet_exec = parquet_exec
+                .with_pushdown_filters(true)
+                .with_reorder_filters(true);
         }
 
         let session_ctx = SessionContext::new();
@@ -1695,6 +1718,7 @@ mod tests {
                     projection: None,
                     limit: None,
                     table_partition_cols: vec![],
+                    config_options: ConfigOptions::new().into_shareable(),
                 },
                 None,
                 None,
@@ -1796,6 +1820,7 @@ mod tests {
                     "month".to_owned(),
                     "day".to_owned(),
                 ],
+                config_options: ConfigOptions::new().into_shareable(),
             },
             None,
             None,
@@ -1854,6 +1879,7 @@ mod tests {
                 projection: None,
                 limit: None,
                 table_partition_cols: vec![],
+                config_options: ConfigOptions::new().into_shareable(),
             },
             None,
             None,
@@ -2476,14 +2502,13 @@ mod tests {
                 projection: None,
                 limit: None,
                 table_partition_cols: vec![],
+                config_options: ConfigOptions::new().into_shareable(),
             },
             Some(filter),
             None,
         );
 
-        let parquet_exec_page_index = parquet_exec
-            .clone()
-            .with_scan_options(ParquetScanOptions::default().with_page_index(true));
+        let parquet_exec_page_index = parquet_exec.clone().with_enable_page_index(true);
 
         let mut results = parquet_exec_page_index.execute(0, task_ctx)?;
 
diff --git a/datafusion/core/src/test/mod.rs b/datafusion/core/src/test/mod.rs
index d6e2c05fc..bce277676 100644
--- a/datafusion/core/src/test/mod.rs
+++ b/datafusion/core/src/test/mod.rs
@@ -18,6 +18,7 @@
 //! Common unit test utility methods
 
 use crate::arrow::array::UInt32Array;
+use crate::config::ConfigOptions;
 use crate::datasource::file_format::file_type::{FileCompressionType, FileType};
 use crate::datasource::listing::PartitionedFile;
 use crate::datasource::object_store::ObjectStoreUrl;
@@ -165,6 +166,7 @@ pub fn partitioned_csv_config(
         projection: None,
         limit: None,
         table_partition_cols: vec![],
+        config_options: ConfigOptions::new().into_shareable(),
     })
 }
 
diff --git a/datafusion/core/tests/custom_parquet_reader.rs b/datafusion/core/tests/custom_parquet_reader.rs
index ac8c98381..ded5fad02 100644
--- a/datafusion/core/tests/custom_parquet_reader.rs
+++ b/datafusion/core/tests/custom_parquet_reader.rs
@@ -24,6 +24,7 @@ mod tests {
     use arrow::record_batch::RecordBatch;
     use bytes::Bytes;
     use datafusion::assert_batches_sorted_eq;
+    use datafusion::config::ConfigOptions;
     use datafusion::datasource::file_format::parquet::fetch_parquet_metadata;
     use datafusion::datasource::listing::PartitionedFile;
     use datafusion::datasource::object_store::ObjectStoreUrl;
@@ -88,6 +89,7 @@ mod tests {
                 projection: None,
                 limit: None,
                 table_partition_cols: vec![],
+                config_options: ConfigOptions::new().into_shareable(),
             },
             None,
             None,
diff --git a/datafusion/core/tests/row.rs b/datafusion/core/tests/row.rs
index 2c840321f..630c28a10 100644
--- a/datafusion/core/tests/row.rs
+++ b/datafusion/core/tests/row.rs
@@ -15,6 +15,7 @@
 // specific language governing permissions and limitations
 // under the License.
 
+use datafusion::config::ConfigOptions;
 use datafusion::datasource::file_format::parquet::ParquetFormat;
 use datafusion::datasource::file_format::FileFormat;
 use datafusion::datasource::object_store::ObjectStoreUrl;
@@ -105,6 +106,7 @@ async fn get_exec(
                 projection: projection.clone(),
                 limit,
                 table_partition_cols: vec![],
+                config_options: ConfigOptions::new().into_shareable(),
             },
             &[],
         )
diff --git a/datafusion/core/tests/sql/information_schema.rs b/datafusion/core/tests/sql/information_schema.rs
index d94a9cd56..873ead462 100644
--- a/datafusion/core/tests/sql/information_schema.rs
+++ b/datafusion/core/tests/sql/information_schema.rs
@@ -701,6 +701,9 @@ async fn show_all() {
         "| datafusion.execution.batch_size                 | 8192    |",
         "| datafusion.execution.coalesce_batches           | true    |",
         "| datafusion.execution.coalesce_target_batch_size | 4096    |",
+        "| datafusion.execution.parquet.enable_page_index  | false   |",
+        "| datafusion.execution.parquet.pushdown_filters   | false   |",
+        "| datafusion.execution.parquet.reorder_filters    | false   |",
         "| datafusion.execution.time_zone                  | UTC     |",
         "| datafusion.explain.logical_plan_only            | false   |",
         "| datafusion.explain.physical_plan_only           | false   |",


Reply via email to