alamb commented on a change in pull request #2099:
URL: https://github.com/apache/arrow-datafusion/pull/2099#discussion_r841063796
##########
File path: datafusion/core/src/execution/context.rs
##########
@@ -548,20 +560,19 @@ impl SessionContext {
/// Registers a Parquet data source so that it can be referenced from SQL statements
/// executed against this context.
- pub async fn register_parquet(&self, name: &str, uri: &str) -> Result<()> {
- let (target_partitions, enable_pruning) = {
+ pub async fn register_parquet(
+ &self,
+ name: &str,
+ uri: &str,
+ options: ParquetReadOptions<'_>,
+ ) -> Result<()> {
+ let (target_partitions, parquet_pruning) = {
Review comment:
Maybe we should eventually move the `parquet_pruning` option out of
`SessionContext` and into the `ParquetReadOptions` structure, as a follow-on PR.
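For anyone following along, a call site under the new signature would look roughly like this (a sketch: the table name and path are made up, and it assumes `ParquetReadOptions` is re-exported from the prelude the way `CsvReadOptions` is):
```rust
use datafusion::error::Result;
use datafusion::prelude::*;

#[tokio::main]
async fn main() -> Result<()> {
    let ctx = SessionContext::new();
    // "my_table" and the path are placeholders for illustration.
    ctx.register_parquet(
        "my_table",
        "/path/to/data.parquet",
        ParquetReadOptions::default().parquet_pruning(false),
    )
    .await?;
    Ok(())
}
```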
##########
File path: datafusion/core/src/execution/options.rs
##########
@@ -115,7 +128,57 @@ impl<'a> CsvReadOptions<'a> {
collect_stat: false,
file_extension: self.file_extension.to_owned(),
target_partitions,
+ table_partition_cols: self.table_partition_cols.clone(),
+ }
+ }
+}
+
+/// Parquet read options
+#[derive(Clone)]
+pub struct ParquetReadOptions<'a> {
+ /// File extension; only files with this extension are selected for data input.
+ /// Defaults to ".parquet".
+ pub file_extension: &'a str,
+ /// Partition Columns
+ pub table_partition_cols: Vec<String>,
+ /// Should DataFusion parquet reader using the predicate to prune data, following execution::context::SessionConfig
+ pub parquet_pruning: bool,
+}
+
+impl<'a> Default for ParquetReadOptions<'a> {
+ fn default() -> Self {
+ Self {
+ file_extension: DEFAULT_PARQUET_EXTENSION,
+ table_partition_cols: vec![],
+ parquet_pruning: ParquetFormat::default().enable_pruning(),
+ }
+ }
+}
+
+impl<'a> ParquetReadOptions<'a> {
+ /// Specify parquet_pruning
+ pub fn parquet_pruning(mut self, parquet_pruning: bool) -> Self {
+ self.parquet_pruning = parquet_pruning;
+ self
+ }
+
+ /// Specify table_partition_cols for partition pruning
+ pub fn table_partition_cols(mut self, table_partition_cols: Vec<String>) -> Self {
+ self.table_partition_cols = table_partition_cols;
+ self
+ }
+
+ /// Helper to convert these user facing options to `ListingTable` options
+ pub fn to_listing_options(&self, target_partitions: usize) -> ListingOptions {
Review comment:
👍
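For reference, I'd expect the body to mirror `CsvReadOptions::to_listing_options` above, something like this sketch (the `collect_stat` value and the `with_enable_pruning` call are my assumptions, not taken from this diff):
```rust
// Sketch only: field set copied from the CsvReadOptions version above.
pub fn to_listing_options(&self, target_partitions: usize) -> ListingOptions {
    let file_format = ParquetFormat::default()
        // assumed builder method for wiring parquet_pruning through
        .with_enable_pruning(self.parquet_pruning);
    ListingOptions {
        format: Arc::new(file_format),
        collect_stat: true, // assumption: parquet files carry useful statistics
        file_extension: self.file_extension.to_owned(),
        target_partitions,
        table_partition_cols: self.table_partition_cols.clone(),
    }
}
```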
##########
File path: datafusion/core/src/sql/parser.rs
##########
@@ -192,6 +194,35 @@ impl<'a> DFParser<'a> {
}
}
+ fn parse_partitions(&mut self) -> Result<Vec<String>, ParserError> {
+ let mut partitions: Vec<String> = vec![];
+ if !self.parser.consume_token(&Token::LParen)
+ || self.parser.consume_token(&Token::RParen)
+ {
+ return Ok(partitions);
+ }
+
+ loop {
Review comment:
Given that this code to parse a comma-separated list is duplicated in
`parse_columns` below, perhaps we could refactor it into a common function to
reduce the duplication -- not needed for this PR though.
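Something like this hypothetical helper could serve both call sites (`parse_comma_separated` is a made-up name; untested sketch):
```rust
// Hypothetical shared helper: parses `(item, item, ...)`, delegating each
// element to a caller-supplied parser. An absent or empty list yields vec![].
fn parse_comma_separated<T>(
    &mut self,
    mut parse_item: impl FnMut(&mut Self) -> Result<T, ParserError>,
) -> Result<Vec<T>, ParserError> {
    let mut items = vec![];
    if !self.parser.consume_token(&Token::LParen)
        || self.parser.consume_token(&Token::RParen)
    {
        return Ok(items);
    }
    loop {
        items.push(parse_item(self)?);
        let comma = self.parser.consume_token(&Token::Comma);
        if self.parser.consume_token(&Token::RParen) {
            break;
        } else if !comma {
            return Err(ParserError::ParserError(
                "Expected ',' or ')' after list item".to_string(),
            ));
        }
    }
    Ok(items)
}
```
`parse_partitions` and `parse_columns` would then just supply closures that parse a single identifier / column definition.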
##########
File path: datafusion/core/src/sql/parser.rs
##########
@@ -277,6 +308,12 @@ impl<'a> DFParser<'a> {
let has_header = self.parse_csv_has_header();
+ let has_partition = self.parse_has_partition();
+ let mut table_partition_cols: Vec<String> = vec![];
+ if has_partition {
+ table_partition_cols = self.parse_partitions()?;
+ }
Review comment:
```suggestion
        let table_partition_cols = if self.parse_has_partition() {
            self.parse_partitions()?
        } else {
            vec![]
        };
```
If you wanted to avoid a `mut`
##########
File path: datafusion/core/src/execution/options.rs
##########
@@ -43,8 +47,10 @@ pub struct CsvReadOptions<'a> {
/// Max number of rows to read from CSV files for schema inference if needed. Defaults to 1000.
pub schema_infer_max_records: usize,
/// File extension; only files with this extension are selected for data input.
- /// Defaults to ".csv".
+ /// Defaults to DEFAULT_CSV_EXTENSION.
Review comment:
👍
##########
File path: datafusion/core/src/logical_plan/builder.rs
##########
@@ -274,21 +275,12 @@ impl LogicalPlanBuilder {
pub async fn scan_parquet_with_name(
object_store: Arc<dyn ObjectStore>,
path: impl Into<String>,
+ options: ParquetReadOptions<'_>,
projection: Option<Vec<usize>>,
target_partitions: usize,
table_name: impl Into<String>,
) -> Result<Self> {
- // TODO remove hard coded enable_pruning
Review comment:
❤️
##########
File path: docs/source/user-guide/sql/ddl.md
##########
@@ -55,6 +55,21 @@ WITH HEADER ROW
LOCATION '/path/to/aggregate_test_100.csv';
```
+If data sources are already partitioned in Hive style, `PARTITIONED BY` can be used for partition pruning.
Review comment:
❤️
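The same capability is exposed through the new Rust options too; presumably something like this (assumes a `SessionContext` named `ctx`; the table name and columns are made up):
```rust
// Hypothetical: a Hive-partitioned layout such as /data/year=2021/month=01/part.parquet
ctx.register_parquet(
    "sales",
    "/data",
    ParquetReadOptions::default()
        .table_partition_cols(vec!["year".to_string(), "month".to_string()]),
)
.await?;
```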
##########
File path: datafusion/core/src/execution/options.rs
##########
@@ -115,7 +128,57 @@ impl<'a> CsvReadOptions<'a> {
collect_stat: false,
file_extension: self.file_extension.to_owned(),
target_partitions,
+ table_partition_cols: self.table_partition_cols.clone(),
+ }
+ }
+}
+
+/// Parquet read options
+#[derive(Clone)]
+pub struct ParquetReadOptions<'a> {
+ /// File extension; only files with this extension are selected for data input.
+ /// Defaults to ".parquet".
+ pub file_extension: &'a str,
+ /// Partition Columns
+ pub table_partition_cols: Vec<String>,
+ /// Should DataFusion parquet reader using the predicate to prune data, following execution::context::SessionConfig
Review comment:
```suggestion
/// Should DataFusion parquet reader using the predicate to prune data,
/// overridden by value on execution::context::SessionConfig
```
Maybe after this PR is merged, we can remove the option from `SessionConfig`.
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]