alamb commented on code in PR #3822:
URL: https://github.com/apache/arrow-datafusion/pull/3822#discussion_r994848144
##########
benchmarks/src/bin/parquet_filter_pushdown.rs:
##########
@@ -109,6 +111,13 @@ async fn main() -> Result<()> {
Ok(())
}
+#[derive(Debug, Clone)]
+struct ParquetScanOptions {
Review Comment:
this was replicated for the benchmark code as I felt such a struct was the
easiest to understand for this matrix strategy
##########
datafusion/core/src/physical_plan/file_format/parquet.rs:
##########
@@ -69,43 +72,6 @@ use parquet::file::{
};
use parquet::schema::types::ColumnDescriptor;
-#[derive(Debug, Clone, Default)]
-/// Specify options for the parquet scan
-pub struct ParquetScanOptions {
Review Comment:
The key change to this PR is removing this structure and instead reading the
values from a `ConfigOptions` that is threaded down.
You can see in this PR there is already a structure for configuring parquet
reading (`ParquetReadOptions`) so I actually think this will make the code less
confusing to work with going forward.
##########
datafusion/core/src/physical_plan/file_format/parquet.rs:
##########
@@ -944,17 +968,16 @@ mod tests {
projection,
limit: None,
table_partition_cols: vec![],
+ config_options: ConfigOptions::new().into_shareable(),
},
predicate,
None,
);
if pushdown_predicate {
- parquet_exec = parquet_exec.with_scan_options(
- ParquetScanOptions::default()
- .with_pushdown_filters(true)
- .with_reorder_predicates(true),
- );
+ parquet_exec = parquet_exec
Review Comment:
Here is a good illustration of how the API changes (I think for the better)
##########
datafusion/core/src/execution/options.rs:
##########
@@ -160,10 +170,12 @@ pub struct ParquetReadOptions<'a> {
pub table_partition_cols: Vec<String>,
/// Should DataFusion parquet reader use the predicate to prune data,
/// overridden by value on execution::context::SessionConfig
+ // TODO move this into ConfigOptions
Review Comment:
I will do this as a follow on PR
##########
datafusion/core/src/physical_plan/file_format/parquet.rs:
##########
@@ -403,9 +426,9 @@ impl FileOpener for ParquetOpener {
let projection = self.projection.clone();
let pruning_predicate = self.pruning_predicate.clone();
let table_schema = self.table_schema.clone();
- let reorder_predicates = self.scan_options.reorder_predicates;
- let pushdown_filters = self.scan_options.pushdown_filters;
- let enable_page_index = self.scan_options.enable_page_index;
+ let reorder_predicates = self.reorder_filters;
Review Comment:
I also took the opportunity to change to consistently use the word `filters`
rather than `filters` and `predciates`
##########
datafusion/core/src/config.rs:
##########
@@ -255,8 +289,16 @@ impl ConfigOptions {
Self { options }
}
- /// Create new ConfigOptions struct, taking values from environment
variables where possible.
- /// For example, setting `DATAFUSION_EXECUTION_BATCH_SIZE` to control
`datafusion.execution.batch_size`.
+ /// Create a new [`ConfigOptions`] wrapped in an RwLock and Arc
+ pub fn into_shareable(self) -> Arc<RwLock<Self>> {
+ Arc::new(RwLock::new(self))
+ }
+
+ /// Create new ConfigOptions struct, taking values from
+ /// environment variables where possible.
+ ///
+ /// For example, setting `DATAFUSION_EXECUTION_BATCH_SIZE` will
Review Comment:
I will add some documentation about this to the datafusion-cli docs as I
couldn't find it when I was looking
##########
datafusion/core/src/config.rs:
##########
@@ -191,23 +205,43 @@ impl BuiltInConfigs {
ConfigDefinition::new_u64(
OPT_COALESCE_TARGET_BATCH_SIZE,
format!("Target batch size when coalescing batches. Uses in
conjunction with the \
- configuration setting '{}'.", OPT_COALESCE_BATCHES),
+ configuration setting '{}'.", OPT_COALESCE_BATCHES),
4096,
),
+ ConfigDefinition::new_string(
Review Comment:
I moved this to be with the other settings
##########
datafusion/core/src/physical_plan/file_format/parquet.rs:
##########
@@ -191,15 +154,71 @@ impl ParquetExec {
self
}
- /// Configure `ParquetScanOptions`
- pub fn with_scan_options(mut self, scan_options: ParquetScanOptions) ->
Self {
- self.scan_options = scan_options;
+ /// If true, any filter [`Expr`]s on the scan will converted to a
+ /// [`RowFilter`](parquet::arrow::arrow_reader::RowFilter) in the
+ /// `ParquetRecordBatchStream`. These filters are applied by the
+ /// parquet decoder to skip unecessairly decoding other columns
+ /// which would not pass the predicate. Defaults to false
+ pub fn with_pushdown_filters(self, pushdown_filters: bool) -> Self {
Review Comment:
This just uses the slightly messier config API to get/set the settings
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]