This is an automated email from the ASF dual-hosted git repository.
alamb pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/arrow-datafusion.git
The following commit(s) were added to refs/heads/master by this push:
new 6e0097d35 Expose parquet reader settings using normal DataFusion
`ConfigOptions` (#3822)
6e0097d35 is described below
commit 6e0097d35391fea0d57c1d2ecfdef18437f681f4
Author: Andrew Lamb <[email protected]>
AuthorDate: Wed Oct 19 06:22:04 2022 -0400
Expose parquet reader settings using normal DataFusion `ConfigOptions`
(#3822)
* Expose parquet reader settings as DataFusion config settings
* fix logical conflict
* Update tests
---
benchmarks/src/bin/parquet_filter_pushdown.rs | 56 +++++---
datafusion/core/src/config.rs | 78 ++++++++---
datafusion/core/src/datasource/file_format/mod.rs | 2 +
datafusion/core/src/datasource/listing/table.rs | 1 +
datafusion/core/src/execution/context.rs | 9 +-
datafusion/core/src/execution/options.rs | 30 ++++-
.../core/src/physical_optimizer/repartition.rs | 2 +
.../core/src/physical_plan/file_format/avro.rs | 4 +
.../src/physical_plan/file_format/file_stream.rs | 2 +
.../core/src/physical_plan/file_format/json.rs | 4 +
.../core/src/physical_plan/file_format/mod.rs | 12 +-
.../core/src/physical_plan/file_format/parquet.rs | 143 ++++++++++++---------
datafusion/core/src/test/mod.rs | 2 +
datafusion/core/tests/custom_parquet_reader.rs | 2 +
datafusion/core/tests/row.rs | 2 +
datafusion/core/tests/sql/information_schema.rs | 3 +
16 files changed, 250 insertions(+), 102 deletions(-)
diff --git a/benchmarks/src/bin/parquet_filter_pushdown.rs
b/benchmarks/src/bin/parquet_filter_pushdown.rs
index e4bc9295e..f77cbc8fd 100644
--- a/benchmarks/src/bin/parquet_filter_pushdown.rs
+++ b/benchmarks/src/bin/parquet_filter_pushdown.rs
@@ -23,6 +23,10 @@ use arrow::datatypes::{DataType, Field, Int32Type, Schema,
SchemaRef, TimeUnit};
use arrow::record_batch::RecordBatch;
use arrow::util::pretty;
use datafusion::common::{Result, ToDFSchema};
+use datafusion::config::{
+ ConfigOptions, OPT_PARQUET_ENABLE_PAGE_INDEX, OPT_PARQUET_PUSHDOWN_FILTERS,
+ OPT_PARQUET_REORDER_FILTERS,
+};
use datafusion::datasource::listing::{ListingTableUrl, PartitionedFile};
use datafusion::datasource::object_store::ObjectStoreUrl;
use datafusion::execution::context::ExecutionProps;
@@ -30,9 +34,7 @@ use datafusion::logical_expr::{lit, or, Expr};
use datafusion::optimizer::utils::disjunction;
use datafusion::physical_expr::create_physical_expr;
use datafusion::physical_plan::collect;
-use datafusion::physical_plan::file_format::{
- FileScanConfig, ParquetExec, ParquetScanOptions,
-};
+use datafusion::physical_plan::file_format::{FileScanConfig, ParquetExec};
use datafusion::physical_plan::filter::FilterExec;
use datafusion::prelude::{col, SessionConfig, SessionContext};
use object_store::path::Path;
@@ -109,6 +111,13 @@ async fn main() -> Result<()> {
Ok(())
}
+#[derive(Debug, Clone)]
+struct ParquetScanOptions {
+ pushdown_filters: bool,
+ reorder_filters: bool,
+ enable_page_index: bool,
+}
+
async fn run_benchmarks(
ctx: &mut SessionContext,
object_store_url: ObjectStoreUrl,
@@ -117,15 +126,21 @@ async fn run_benchmarks(
debug: bool,
) -> Result<()> {
let scan_options_matrix = vec![
- ParquetScanOptions::default(),
- ParquetScanOptions::default()
- .with_page_index(true)
- .with_pushdown_filters(true)
- .with_reorder_predicates(true),
- ParquetScanOptions::default()
- .with_page_index(true)
- .with_pushdown_filters(true)
- .with_reorder_predicates(false),
+ ParquetScanOptions {
+ pushdown_filters: false,
+ reorder_filters: false,
+ enable_page_index: false,
+ },
+ ParquetScanOptions {
+ pushdown_filters: true,
+ reorder_filters: true,
+ enable_page_index: true,
+ },
+ ParquetScanOptions {
+ pushdown_filters: true,
+ reorder_filters: true,
+ enable_page_index: false,
+ },
];
let filter_matrix = vec![
@@ -193,6 +208,18 @@ async fn exec_scan(
debug: bool,
) -> Result<usize> {
let schema = BatchBuilder::schema();
+
+ let ParquetScanOptions {
+ pushdown_filters,
+ reorder_filters,
+ enable_page_index,
+ } = scan_options;
+
+ let mut config_options = ConfigOptions::new();
+ config_options.set_bool(OPT_PARQUET_PUSHDOWN_FILTERS, pushdown_filters);
+ config_options.set_bool(OPT_PARQUET_REORDER_FILTERS, reorder_filters);
+ config_options.set_bool(OPT_PARQUET_ENABLE_PAGE_INDEX, enable_page_index);
+
let scan_config = FileScanConfig {
object_store_url,
file_schema: schema.clone(),
@@ -206,6 +233,7 @@ async fn exec_scan(
projection: None,
limit: None,
table_partition_cols: vec![],
+ config_options: config_options.into_shareable(),
};
let df_schema = schema.clone().to_dfschema()?;
@@ -217,9 +245,7 @@ async fn exec_scan(
&ExecutionProps::default(),
)?;
- let parquet_exec = Arc::new(
- ParquetExec::new(scan_config, Some(filter),
None).with_scan_options(scan_options),
- );
+ let parquet_exec = Arc::new(ParquetExec::new(scan_config, Some(filter),
None));
let exec = Arc::new(FilterExec::try_new(physical_filter_expr,
parquet_exec)?);
diff --git a/datafusion/core/src/config.rs b/datafusion/core/src/config.rs
index 2a2139fb2..b95c12d3b 100644
--- a/datafusion/core/src/config.rs
+++ b/datafusion/core/src/config.rs
@@ -21,8 +21,10 @@ use arrow::datatypes::DataType;
use datafusion_common::ScalarValue;
use itertools::Itertools;
use log::warn;
+use parking_lot::RwLock;
use std::collections::HashMap;
use std::env;
+use std::sync::Arc;
/// Configuration option "datafusion.optimizer.filter_null_join_keys"
pub const OPT_FILTER_NULL_JOIN_KEYS: &str =
"datafusion.optimizer.filter_null_join_keys";
@@ -43,13 +45,25 @@ pub const OPT_COALESCE_BATCHES: &str =
"datafusion.execution.coalesce_batches";
pub const OPT_COALESCE_TARGET_BATCH_SIZE: &str =
"datafusion.execution.coalesce_target_batch_size";
+/// Configuration option "datafusion.execution.time_zone"
+pub const OPT_TIME_ZONE: &str = "datafusion.execution.time_zone";
+
+/// Configuration option "datafusion.execution.parquet.pushdown_filters"
+pub const OPT_PARQUET_PUSHDOWN_FILTERS: &str =
+ "datafusion.execution.parquet.pushdown_filters";
+
+/// Configuration option "datafusion.execution.parquet.reorder_filters"
+pub const OPT_PARQUET_REORDER_FILTERS: &str =
+ "datafusion.execution.parquet.reorder_filters";
+
+/// Configuration option "datafusion.execution.parquet.enable_page_index"
+pub const OPT_PARQUET_ENABLE_PAGE_INDEX: &str =
+ "datafusion.execution.parquet.enable_page_index";
+
/// Configuration option "datafusion.optimizer.skip_failed_rules"
pub const OPT_OPTIMIZER_SKIP_FAILED_RULES: &str =
"datafusion.optimizer.skip_failed_rules";
-/// Configuration option "datafusion.execution.time_zone"
-pub const OPT_TIME_ZONE: &str = "datafusion.execution.time_zone";
-
/// Definition of a configuration option
pub struct ConfigDefinition {
/// key used to identifier this configuration option
@@ -173,11 +187,11 @@ impl BuiltInConfigs {
false,
),
ConfigDefinition::new_u64(
- OPT_BATCH_SIZE,
- "Default batch size while creating new batches, it's especially
useful for \
- buffer-in-memory batches since creating tiny batches would results
in too much metadata \
- memory consumption.",
- 8192,
+ OPT_BATCH_SIZE,
+ "Default batch size while creating new batches, it's
especially useful for \
+ buffer-in-memory batches since creating tiny batches would
result in too much metadata \
+ memory consumption.",
+ 8192,
),
ConfigDefinition::new_bool(
OPT_COALESCE_BATCHES,
@@ -191,9 +205,35 @@ impl BuiltInConfigs {
ConfigDefinition::new_u64(
OPT_COALESCE_TARGET_BATCH_SIZE,
format!("Target batch size when coalescing batches. Uses in
conjunction with the \
- configuration setting '{}'.", OPT_COALESCE_BATCHES),
+ configuration setting '{}'.", OPT_COALESCE_BATCHES),
4096,
),
+ ConfigDefinition::new_string(
+ OPT_TIME_ZONE,
+                "The session time zone which some functions require \
+                e.g. EXTRACT(HOUR from SOME_TIME) shifts the underlying datetime
according to the time zone,
+ then extract the hour.",
+ "UTC".into()
+ ),
+ ConfigDefinition::new_bool(
+ OPT_PARQUET_PUSHDOWN_FILTERS,
+                "If true, filter expressions will be applied during the parquet
decoding operation to \
+ reduce the number of rows decoded.",
+ false,
+ ),
+ ConfigDefinition::new_bool(
+ OPT_PARQUET_REORDER_FILTERS,
+ "If true, filter expressions evaluated during the parquet
decoding operation \
+ will be reordered heuristically to minimize the cost of
evaluation. If false, \
+ the filters are applied in the same order as written in the
query.",
+ false,
+ ),
+ ConfigDefinition::new_bool(
+ OPT_PARQUET_ENABLE_PAGE_INDEX,
+ "If true, uses parquet data page level metadata (Page Index)
statistics \
+ to reduce the number of rows decoded.",
+ false,
+ ),
ConfigDefinition::new_bool(
OPT_OPTIMIZER_SKIP_FAILED_RULES,
"When set to true, the logical plan optimizer will produce
warning \
@@ -201,13 +241,7 @@ impl BuiltInConfigs {
rule. When set to false, any rules that produce errors will
cause the query to fail.",
true
),
- ConfigDefinition::new_string(
- OPT_TIME_ZONE,
- "The session time zone which some function require \
- e.g. EXTRACT(HOUR from SOME_TIME) shift the underline datetime
according to the time zone,
- then extract the hour",
- "UTC".into()
- )]
+ ]
}
}
@@ -255,8 +289,16 @@ impl ConfigOptions {
Self { options }
}
- /// Create new ConfigOptions struct, taking values from environment
variables where possible.
- /// For example, setting `DATAFUSION_EXECUTION_BATCH_SIZE` to control
`datafusion.execution.batch_size`.
+ /// Create a new [`ConfigOptions`] wrapped in an RwLock and Arc
+ pub fn into_shareable(self) -> Arc<RwLock<Self>> {
+ Arc::new(RwLock::new(self))
+ }
+
+ /// Create new ConfigOptions struct, taking values from
+ /// environment variables where possible.
+ ///
+ /// For example, setting `DATAFUSION_EXECUTION_BATCH_SIZE` will
+ /// control `datafusion.execution.batch_size`.
pub fn from_env() -> Self {
let built_in = BuiltInConfigs::new();
let mut options =
HashMap::with_capacity(built_in.config_definitions.len());
diff --git a/datafusion/core/src/datasource/file_format/mod.rs
b/datafusion/core/src/datasource/file_format/mod.rs
index 82f5b1df8..6775117e2 100644
--- a/datafusion/core/src/datasource/file_format/mod.rs
+++ b/datafusion/core/src/datasource/file_format/mod.rs
@@ -84,6 +84,7 @@ pub trait FileFormat: Send + Sync + fmt::Debug {
#[cfg(test)]
pub(crate) mod test_util {
use super::*;
+ use crate::config::ConfigOptions;
use crate::datasource::listing::PartitionedFile;
use crate::datasource::object_store::ObjectStoreUrl;
use crate::test::object_store::local_unpartitioned_file;
@@ -122,6 +123,7 @@ pub(crate) mod test_util {
projection,
limit,
table_partition_cols: vec![],
+ config_options: ConfigOptions::new().into_shareable(),
},
&[],
)
diff --git a/datafusion/core/src/datasource/listing/table.rs
b/datafusion/core/src/datasource/listing/table.rs
index 3a0c4dcee..deaa09249 100644
--- a/datafusion/core/src/datasource/listing/table.rs
+++ b/datafusion/core/src/datasource/listing/table.rs
@@ -404,6 +404,7 @@ impl TableProvider for ListingTable {
projection: projection.clone(),
limit,
table_partition_cols:
self.options.table_partition_cols.clone(),
+ config_options: ctx.config.config_options(),
},
filters,
)
diff --git a/datafusion/core/src/execution/context.rs
b/datafusion/core/src/execution/context.rs
index f7fb0eb90..c50f79426 100644
--- a/datafusion/core/src/execution/context.rs
+++ b/datafusion/core/src/execution/context.rs
@@ -1184,7 +1184,7 @@ impl SessionConfig {
/// Create an execution config with config options read from the
environment
pub fn from_env() -> Self {
Self {
- config_options: Arc::new(RwLock::new(ConfigOptions::from_env())),
+ config_options: ConfigOptions::from_env().into_shareable(),
..Default::default()
}
}
@@ -1324,6 +1324,13 @@ impl SessionConfig {
map
}
+ /// Return a handle to the shared configuration options.
+ ///
+ /// [`config_options`]: SessionContext::config_option
+ pub fn config_options(&self) -> Arc<RwLock<ConfigOptions>> {
+ self.config_options.clone()
+ }
+
/// Add extensions.
///
/// Extensions can be used to attach extra data to the session config --
e.g. tracing information or caches.
diff --git a/datafusion/core/src/execution/options.rs
b/datafusion/core/src/execution/options.rs
index 9ddd3f1d6..150a20670 100644
--- a/datafusion/core/src/execution/options.rs
+++ b/datafusion/core/src/execution/options.rs
@@ -34,7 +34,12 @@ use crate::datasource::{
listing::ListingOptions,
};
-/// CSV file read option
+/// Options that control the reading of CSV files.
+///
+/// Note this structure is supplied when a datasource is created and
+/// can not vary from statement to statement. For settings that
+/// can vary statement to statement see
+/// [`ConfigOptions`](crate::config::ConfigOptions).
#[derive(Clone)]
pub struct CsvReadOptions<'a> {
/// Does the CSV file have a header?
@@ -150,7 +155,12 @@ impl<'a> CsvReadOptions<'a> {
}
}
-/// Parquet read options
+/// Options that control the reading of Parquet files.
+///
+/// Note this structure is supplied when a datasource is created and
+/// can not vary from statement to statement. For settings that
+/// can vary statement to statement see
+/// [`ConfigOptions`](crate::config::ConfigOptions).
#[derive(Clone)]
pub struct ParquetReadOptions<'a> {
/// File extension; only files with this extension are selected for data
input.
@@ -160,10 +170,12 @@ pub struct ParquetReadOptions<'a> {
pub table_partition_cols: Vec<String>,
/// Should DataFusion parquet reader use the predicate to prune data,
/// overridden by value on execution::context::SessionConfig
+ // TODO move this into ConfigOptions
pub parquet_pruning: bool,
/// Tell the parquet reader to skip any metadata that may be in
/// the file Schema. This can help avoid schema conflicts due to
/// metadata. Defaults to true.
+ // TODO move this into ConfigOptions
pub skip_metadata: bool,
}
@@ -217,7 +229,12 @@ impl<'a> ParquetReadOptions<'a> {
}
}
-/// Avro read options
+/// Options that control the reading of AVRO files.
+///
+/// Note this structure is supplied when a datasource is created and
+/// can not vary from statement to statement. For settings that
+/// can vary statement to statement see
+/// [`ConfigOptions`](crate::config::ConfigOptions).
#[derive(Clone)]
pub struct AvroReadOptions<'a> {
/// The data source schema.
@@ -261,7 +278,12 @@ impl<'a> AvroReadOptions<'a> {
}
}
-/// Line-delimited JSON read options
+/// Options that control the reading of Line-delimited JSON files (NDJson)
+///
+/// Note this structure is supplied when a datasource is created and
+/// can not vary from statement to statement. For settings that
+/// can vary statement to statement see
+/// [`ConfigOptions`](crate::config::ConfigOptions).
#[derive(Clone)]
pub struct NdJsonReadOptions<'a> {
/// The data source schema.
diff --git a/datafusion/core/src/physical_optimizer/repartition.rs
b/datafusion/core/src/physical_optimizer/repartition.rs
index 1d2b25908..839908d06 100644
--- a/datafusion/core/src/physical_optimizer/repartition.rs
+++ b/datafusion/core/src/physical_optimizer/repartition.rs
@@ -240,6 +240,7 @@ mod tests {
use arrow::datatypes::{DataType, Field, Schema, SchemaRef};
use super::*;
+ use crate::config::ConfigOptions;
use crate::datasource::listing::PartitionedFile;
use crate::datasource::object_store::ObjectStoreUrl;
use crate::physical_plan::aggregates::{
@@ -269,6 +270,7 @@ mod tests {
projection: None,
limit: None,
table_partition_cols: vec![],
+ config_options: ConfigOptions::new().into_shareable(),
},
None,
None,
diff --git a/datafusion/core/src/physical_plan/file_format/avro.rs
b/datafusion/core/src/physical_plan/file_format/avro.rs
index 0b7841d88..2aab84fad 100644
--- a/datafusion/core/src/physical_plan/file_format/avro.rs
+++ b/datafusion/core/src/physical_plan/file_format/avro.rs
@@ -208,6 +208,7 @@ mod private {
#[cfg(test)]
#[cfg(feature = "avro")]
mod tests {
+ use crate::config::ConfigOptions;
use crate::datasource::file_format::{avro::AvroFormat, FileFormat};
use crate::datasource::listing::PartitionedFile;
use crate::datasource::object_store::ObjectStoreUrl;
@@ -237,6 +238,7 @@ mod tests {
projection: Some(vec![0, 1, 2]),
limit: None,
table_partition_cols: vec![],
+ config_options: ConfigOptions::new().into_shareable(),
});
assert_eq!(avro_exec.output_partitioning().partition_count(), 1);
@@ -306,6 +308,7 @@ mod tests {
projection,
limit: None,
table_partition_cols: vec![],
+ config_options: ConfigOptions::new().into_shareable(),
});
assert_eq!(avro_exec.output_partitioning().partition_count(), 1);
@@ -374,6 +377,7 @@ mod tests {
statistics: Statistics::default(),
limit: None,
table_partition_cols: vec!["date".to_owned()],
+ config_options: ConfigOptions::new().into_shareable(),
});
assert_eq!(avro_exec.output_partitioning().partition_count(), 1);
diff --git a/datafusion/core/src/physical_plan/file_format/file_stream.rs
b/datafusion/core/src/physical_plan/file_format/file_stream.rs
index 609e3b3a9..df12f3105 100644
--- a/datafusion/core/src/physical_plan/file_format/file_stream.rs
+++ b/datafusion/core/src/physical_plan/file_format/file_stream.rs
@@ -321,6 +321,7 @@ mod tests {
use futures::StreamExt;
use super::*;
+ use crate::config::ConfigOptions;
use crate::datasource::object_store::ObjectStoreUrl;
use crate::physical_plan::metrics::ExecutionPlanMetricsSet;
use crate::prelude::SessionContext;
@@ -366,6 +367,7 @@ mod tests {
projection: None,
limit,
table_partition_cols: vec![],
+ config_options: ConfigOptions::new().into_shareable(),
};
let file_stream = FileStream::new(
diff --git a/datafusion/core/src/physical_plan/file_format/json.rs
b/datafusion/core/src/physical_plan/file_format/json.rs
index d207988f4..c8c5d71bd 100644
--- a/datafusion/core/src/physical_plan/file_format/json.rs
+++ b/datafusion/core/src/physical_plan/file_format/json.rs
@@ -256,6 +256,7 @@ mod tests {
use object_store::local::LocalFileSystem;
use crate::assert_batches_eq;
+ use crate::config::ConfigOptions;
use crate::datasource::file_format::file_type::FileType;
use crate::datasource::file_format::{json::JsonFormat, FileFormat};
use crate::datasource::listing::PartitionedFile;
@@ -330,6 +331,7 @@ mod tests {
projection: None,
limit: Some(3),
table_partition_cols: vec![],
+ config_options: ConfigOptions::new().into_shareable(),
},
file_compression_type.to_owned(),
);
@@ -405,6 +407,7 @@ mod tests {
projection: None,
limit: Some(3),
table_partition_cols: vec![],
+ config_options: ConfigOptions::new().into_shareable(),
},
file_compression_type.to_owned(),
);
@@ -450,6 +453,7 @@ mod tests {
projection: Some(vec![0, 2]),
limit: None,
table_partition_cols: vec![],
+ config_options: ConfigOptions::new().into_shareable(),
},
file_compression_type.to_owned(),
);
diff --git a/datafusion/core/src/physical_plan/file_format/mod.rs
b/datafusion/core/src/physical_plan/file_format/mod.rs
index 2926f18a2..c33e2bc14 100644
--- a/datafusion/core/src/physical_plan/file_format/mod.rs
+++ b/datafusion/core/src/physical_plan/file_format/mod.rs
@@ -30,9 +30,7 @@ mod row_filter;
pub(crate) use self::csv::plan_to_csv;
pub use self::csv::CsvExec;
pub(crate) use self::parquet::plan_to_parquet;
-pub use self::parquet::{
- ParquetExec, ParquetFileMetrics, ParquetFileReaderFactory,
ParquetScanOptions,
-};
+pub use self::parquet::{ParquetExec, ParquetFileMetrics,
ParquetFileReaderFactory};
use arrow::{
array::{ArrayData, ArrayRef, DictionaryArray},
buffer::Buffer,
@@ -44,9 +42,10 @@ pub use avro::AvroExec;
pub use file_stream::{FileOpenFuture, FileOpener, FileStream};
pub(crate) use json::plan_to_json;
pub use json::NdJsonExec;
+use parking_lot::RwLock;
-use crate::datasource::listing::FileRange;
use crate::datasource::{listing::PartitionedFile,
object_store::ObjectStoreUrl};
+use crate::{config::ConfigOptions, datasource::listing::FileRange};
use crate::{
error::{DataFusionError, Result},
scalar::ScalarValue,
@@ -91,6 +90,8 @@ pub struct FileScanConfig {
pub limit: Option<usize>,
/// The partitioning column names
pub table_partition_cols: Vec<String>,
+ /// Configuration options passed to the physical plans
+ pub config_options: Arc<RwLock<ConfigOptions>>,
}
impl FileScanConfig {
@@ -413,7 +414,7 @@ pub struct FileMeta {
pub object_meta: ObjectMeta,
/// An optional file range for a more fine-grained parallel execution
pub range: Option<FileRange>,
- /// An optional field for user defined per object metadata
+ /// An optional field for user defined per object metadata
pub extensions: Option<Arc<dyn std::any::Any + Send + Sync>>,
}
@@ -698,6 +699,7 @@ mod tests {
projection,
statistics,
table_partition_cols,
+ config_options: ConfigOptions::new().into_shareable(),
}
}
}
diff --git a/datafusion/core/src/physical_plan/file_format/parquet.rs
b/datafusion/core/src/physical_plan/file_format/parquet.rs
index a5b146dff..f5bd89059 100644
--- a/datafusion/core/src/physical_plan/file_format/parquet.rs
+++ b/datafusion/core/src/physical_plan/file_format/parquet.rs
@@ -24,6 +24,9 @@ use std::ops::Range;
use std::sync::Arc;
use std::{any::Any, convert::TryInto};
+use crate::config::OPT_PARQUET_ENABLE_PAGE_INDEX;
+use crate::config::OPT_PARQUET_PUSHDOWN_FILTERS;
+use crate::config::OPT_PARQUET_REORDER_FILTERS;
use crate::datasource::file_format::parquet::fetch_parquet_metadata;
use crate::datasource::listing::FileRange;
use crate::physical_plan::file_format::file_stream::{
@@ -72,43 +75,6 @@ use parquet::file::{
use parquet::format::PageLocation;
use parquet::schema::types::ColumnDescriptor;
-#[derive(Debug, Clone, Default)]
-/// Specify options for the parquet scan
-pub struct ParquetScanOptions {
- /// If true, any available `pruning_predicate` will be converted to a
`RowFilter`
- /// and pushed down to the `ParquetRecordBatchStream`. This will enable
row level
- /// filter at the decoder level. Defaults to false
- pushdown_filters: bool,
- /// If true, the generated `RowFilter` may reorder the predicate `Expr`s
to try and optimize
- /// the cost of filter evaluation.
- reorder_predicates: bool,
- /// If enabled, the reader will read the page index
- /// This is used to optimise filter pushdown
- /// via `RowSelector` and `RowFilter` by
- /// eliminating unnecessary IO and decoding
- enable_page_index: bool,
-}
-
-impl ParquetScanOptions {
- /// Set whether to pushdown pruning predicate to the parquet scan
- pub fn with_pushdown_filters(mut self, pushdown_filters: bool) -> Self {
- self.pushdown_filters = pushdown_filters;
- self
- }
-
- /// Set whether to reorder pruning predicate expressions in order to
minimize evaluation cost
- pub fn with_reorder_predicates(mut self, reorder_predicates: bool) -> Self
{
- self.reorder_predicates = reorder_predicates;
- self
- }
-
- /// Set whether to read page index when reading parquet
- pub fn with_page_index(mut self, page_index: bool) -> Self {
- self.enable_page_index = page_index;
- self
- }
-}
-
/// Execution plan for scanning one or more Parquet partitions
#[derive(Debug, Clone)]
pub struct ParquetExec {
@@ -123,8 +89,6 @@ pub struct ParquetExec {
metadata_size_hint: Option<usize>,
/// Optional user defined parquet file reader factory
parquet_file_reader_factory: Option<Arc<dyn ParquetFileReaderFactory>>,
- /// Options to specify behavior of parquet scan
- scan_options: ParquetScanOptions,
}
impl ParquetExec {
@@ -165,7 +129,6 @@ impl ParquetExec {
pruning_predicate,
metadata_size_hint,
parquet_file_reader_factory: None,
- scan_options: ParquetScanOptions::default(),
}
}
@@ -194,15 +157,71 @@ impl ParquetExec {
self
}
- /// Configure `ParquetScanOptions`
- pub fn with_scan_options(mut self, scan_options: ParquetScanOptions) ->
Self {
- self.scan_options = scan_options;
+ /// If true, any filter [`Expr`]s on the scan will converted to a
+ /// [`RowFilter`](parquet::arrow::arrow_reader::RowFilter) in the
+ /// `ParquetRecordBatchStream`. These filters are applied by the
+    /// parquet decoder to skip unnecessarily decoding other columns
+ /// which would not pass the predicate. Defaults to false
+ pub fn with_pushdown_filters(self, pushdown_filters: bool) -> Self {
+ self.base_config
+ .config_options
+ .write()
+ .set_bool(OPT_PARQUET_PUSHDOWN_FILTERS, pushdown_filters);
+ self
+ }
+
+ /// Return the value described in [`Self::with_pushdown_filters`]
+ pub fn pushdown_filters(&self) -> bool {
+ self.base_config
+ .config_options
+ .read()
+ .get_bool(OPT_PARQUET_PUSHDOWN_FILTERS)
+ // default to false
+ .unwrap_or_default()
+ }
+
+ /// If true, the `RowFilter` made by `pushdown_filters` may try to
+ /// minimize the cost of filter evaluation by reordering the
+ /// predicate [`Expr`]s. If false, the predicates are applied in
+ /// the same order as specified in the query. Defaults to false.
+ pub fn with_reorder_filters(self, reorder_filters: bool) -> Self {
+ self.base_config
+ .config_options
+ .write()
+ .set_bool(OPT_PARQUET_REORDER_FILTERS, reorder_filters);
+ self
+ }
+
+ /// Return the value described in [`Self::with_reorder_filters`]
+ pub fn reorder_filters(&self) -> bool {
+ self.base_config
+ .config_options
+ .read()
+ .get_bool(OPT_PARQUET_REORDER_FILTERS)
+ // default to false
+ .unwrap_or_default()
+ }
+
+ /// If enabled, the reader will read the page index
+ /// This is used to optimise filter pushdown
+ /// via `RowSelector` and `RowFilter` by
+ /// eliminating unnecessary IO and decoding
+ pub fn with_enable_page_index(self, enable_page_index: bool) -> Self {
+ self.base_config
+ .config_options
+ .write()
+ .set_bool(OPT_PARQUET_ENABLE_PAGE_INDEX, enable_page_index);
self
}
- /// Ref to the `ParquetScanOptions`
- pub fn parquet_scan_options(&self) -> &ParquetScanOptions {
- &self.scan_options
+ /// Return the value described in [`Self::with_enable_page_index`]
+ pub fn enable_page_index(&self) -> bool {
+ self.base_config
+ .config_options
+ .read()
+ .get_bool(OPT_PARQUET_ENABLE_PAGE_INDEX)
+ // default to false
+ .unwrap_or_default()
}
}
@@ -314,7 +333,9 @@ impl ExecutionPlan for ParquetExec {
metadata_size_hint: self.metadata_size_hint,
metrics: self.metrics.clone(),
parquet_file_reader_factory,
- scan_options: self.scan_options.clone(),
+ pushdown_filters: self.pushdown_filters(),
+ reorder_filters: self.reorder_filters(),
+ enable_page_index: self.enable_page_index(),
};
let stream = FileStream::new(
@@ -376,7 +397,9 @@ struct ParquetOpener {
metadata_size_hint: Option<usize>,
metrics: ExecutionPlanMetricsSet,
parquet_file_reader_factory: Arc<dyn ParquetFileReaderFactory>,
- scan_options: ParquetScanOptions,
+ pushdown_filters: bool,
+ reorder_filters: bool,
+ enable_page_index: bool,
}
impl FileOpener for ParquetOpener {
@@ -406,9 +429,9 @@ impl FileOpener for ParquetOpener {
let projection = self.projection.clone();
let pruning_predicate = self.pruning_predicate.clone();
let table_schema = self.table_schema.clone();
- let reorder_predicates = self.scan_options.reorder_predicates;
- let pushdown_filters = self.scan_options.pushdown_filters;
- let enable_page_index = self.scan_options.enable_page_index;
+ let reorder_predicates = self.reorder_filters;
+ let pushdown_filters = self.pushdown_filters;
+ let enable_page_index = self.enable_page_index;
Ok(Box::pin(async move {
let options =
ArrowReaderOptions::new().with_page_index(enable_page_index);
@@ -1138,6 +1161,7 @@ pub async fn plan_to_parquet(
#[cfg(test)]
mod tests {
use super::*;
+ use crate::config::ConfigOptions;
use crate::datasource::file_format::parquet::test_util::store_parquet;
use crate::datasource::file_format::test_util::scan_format;
use crate::datasource::listing::{FileRange, PartitionedFile};
@@ -1203,17 +1227,16 @@ mod tests {
projection,
limit: None,
table_partition_cols: vec![],
+ config_options: ConfigOptions::new().into_shareable(),
},
predicate,
None,
);
if pushdown_predicate {
- parquet_exec = parquet_exec.with_scan_options(
- ParquetScanOptions::default()
- .with_pushdown_filters(true)
- .with_reorder_predicates(true),
- );
+ parquet_exec = parquet_exec
+ .with_pushdown_filters(true)
+ .with_reorder_filters(true);
}
let session_ctx = SessionContext::new();
@@ -1695,6 +1718,7 @@ mod tests {
projection: None,
limit: None,
table_partition_cols: vec![],
+ config_options: ConfigOptions::new().into_shareable(),
},
None,
None,
@@ -1796,6 +1820,7 @@ mod tests {
"month".to_owned(),
"day".to_owned(),
],
+ config_options: ConfigOptions::new().into_shareable(),
},
None,
None,
@@ -1854,6 +1879,7 @@ mod tests {
projection: None,
limit: None,
table_partition_cols: vec![],
+ config_options: ConfigOptions::new().into_shareable(),
},
None,
None,
@@ -2476,14 +2502,13 @@ mod tests {
projection: None,
limit: None,
table_partition_cols: vec![],
+ config_options: ConfigOptions::new().into_shareable(),
},
Some(filter),
None,
);
- let parquet_exec_page_index = parquet_exec
- .clone()
-
.with_scan_options(ParquetScanOptions::default().with_page_index(true));
+ let parquet_exec_page_index =
parquet_exec.clone().with_enable_page_index(true);
let mut results = parquet_exec_page_index.execute(0, task_ctx)?;
diff --git a/datafusion/core/src/test/mod.rs b/datafusion/core/src/test/mod.rs
index d6e2c05fc..bce277676 100644
--- a/datafusion/core/src/test/mod.rs
+++ b/datafusion/core/src/test/mod.rs
@@ -18,6 +18,7 @@
//! Common unit test utility methods
use crate::arrow::array::UInt32Array;
+use crate::config::ConfigOptions;
use crate::datasource::file_format::file_type::{FileCompressionType, FileType};
use crate::datasource::listing::PartitionedFile;
use crate::datasource::object_store::ObjectStoreUrl;
@@ -165,6 +166,7 @@ pub fn partitioned_csv_config(
projection: None,
limit: None,
table_partition_cols: vec![],
+ config_options: ConfigOptions::new().into_shareable(),
})
}
diff --git a/datafusion/core/tests/custom_parquet_reader.rs
b/datafusion/core/tests/custom_parquet_reader.rs
index ac8c98381..ded5fad02 100644
--- a/datafusion/core/tests/custom_parquet_reader.rs
+++ b/datafusion/core/tests/custom_parquet_reader.rs
@@ -24,6 +24,7 @@ mod tests {
use arrow::record_batch::RecordBatch;
use bytes::Bytes;
use datafusion::assert_batches_sorted_eq;
+ use datafusion::config::ConfigOptions;
use datafusion::datasource::file_format::parquet::fetch_parquet_metadata;
use datafusion::datasource::listing::PartitionedFile;
use datafusion::datasource::object_store::ObjectStoreUrl;
@@ -88,6 +89,7 @@ mod tests {
projection: None,
limit: None,
table_partition_cols: vec![],
+ config_options: ConfigOptions::new().into_shareable(),
},
None,
None,
diff --git a/datafusion/core/tests/row.rs b/datafusion/core/tests/row.rs
index 2c840321f..630c28a10 100644
--- a/datafusion/core/tests/row.rs
+++ b/datafusion/core/tests/row.rs
@@ -15,6 +15,7 @@
// specific language governing permissions and limitations
// under the License.
+use datafusion::config::ConfigOptions;
use datafusion::datasource::file_format::parquet::ParquetFormat;
use datafusion::datasource::file_format::FileFormat;
use datafusion::datasource::object_store::ObjectStoreUrl;
@@ -105,6 +106,7 @@ async fn get_exec(
projection: projection.clone(),
limit,
table_partition_cols: vec![],
+ config_options: ConfigOptions::new().into_shareable(),
},
&[],
)
diff --git a/datafusion/core/tests/sql/information_schema.rs
b/datafusion/core/tests/sql/information_schema.rs
index d94a9cd56..873ead462 100644
--- a/datafusion/core/tests/sql/information_schema.rs
+++ b/datafusion/core/tests/sql/information_schema.rs
@@ -701,6 +701,9 @@ async fn show_all() {
"| datafusion.execution.batch_size | 8192 |",
"| datafusion.execution.coalesce_batches | true |",
"| datafusion.execution.coalesce_target_batch_size | 4096 |",
+ "| datafusion.execution.parquet.enable_page_index | false |",
+ "| datafusion.execution.parquet.pushdown_filters | false |",
+ "| datafusion.execution.parquet.reorder_filters | false |",
"| datafusion.execution.time_zone | UTC |",
"| datafusion.explain.logical_plan_only | false |",
"| datafusion.explain.physical_plan_only | false |",