alamb commented on a change in pull request #1141:
URL: https://github.com/apache/arrow-datafusion/pull/1141#discussion_r738724739



##########
File path: ballista/rust/core/proto/ballista.proto
##########
@@ -613,33 +614,28 @@ message ScanLimit {
   uint32 limit = 1;
 }
 
-message ParquetScanExecNode {
+message FileScanExecConf {
   repeated FileGroup file_groups = 1;
   Schema schema = 2;
-  uint32 batch_size = 4;
-  repeated uint32 projection = 6;
-  ScanLimit limit = 7;
-  Statistics statistics = 8;
+  uint32 batch_size = 3;
+  repeated uint32 projection = 4;
+  ScanLimit limit = 5;
+  Statistics statistics = 6;
+  repeated string table_partition_cols = 7;
+}
+
+message ParquetScanExecNode {
+  FileScanExecConf base_conf = 1;

Review comment:
       👍 

##########
File path: ballista/rust/core/src/serde/physical_plan/to_proto.rs
##########
@@ -244,90 +246,29 @@ impl TryInto<protobuf::PhysicalPlanNode> for Arc<dyn 
ExecutionPlan> {
                 ))),
             })
         } else if let Some(exec) = plan.downcast_ref::<CsvExec>() {
-            let file_groups = exec
-                .file_groups()
-                .iter()
-                .map(|p| p.as_slice().into())
-                .collect();
             Ok(protobuf::PhysicalPlanNode {
                 physical_plan_type: Some(PhysicalPlanType::CsvScan(
                     protobuf::CsvScanExecNode {
-                        file_groups,
-                        statistics: Some((&exec.statistics()).into()),
-                        limit: exec
-                            .limit()
-                            .map(|l| protobuf::ScanLimit { limit: l as u32 }),
-                        projection: exec
-                            .projection()
-                            .as_ref()
-                            .ok_or_else(|| {
-                                BallistaError::General(
-                                    "projection in CsvExec does not 
exist.".to_owned(),
-                                )
-                            })?
-                            .iter()
-                            .map(|n| *n as u32)
-                            .collect(),
-                        schema: Some(exec.file_schema().as_ref().into()),
+                        base_conf: Some(exec.base_config().try_into()?),
                         has_header: exec.has_header(),
                         delimiter: byte_to_string(exec.delimiter())?,
-                        batch_size: exec.batch_size() as u32,
                     },
                 )),
             })
         } else if let Some(exec) = plan.downcast_ref::<ParquetExec>() {
-            let file_groups = exec
-                .file_groups()
-                .iter()
-                .map(|p| p.as_slice().into())
-                .collect();
-
             Ok(protobuf::PhysicalPlanNode {
                 physical_plan_type: Some(PhysicalPlanType::ParquetScan(
                     protobuf::ParquetScanExecNode {
-                        file_groups,
-                        statistics: Some((&exec.statistics()).into()),
-                        limit: exec
-                            .limit()
-                            .map(|l| protobuf::ScanLimit { limit: l as u32 }),
-                        schema: Some(exec.schema().as_ref().into()),
-                        projection: exec
-                            .projection()
-                            .as_ref()
-                            .iter()
-                            .map(|n| *n as u32)
-                            .collect(),
-                        batch_size: exec.batch_size() as u32,
+                        base_conf: Some(exec.base_config().try_into()?),
+                        // TODO serialize predicates

Review comment:
       is this a TODO you plan for this PR? Or a follow on one?

##########
File path: datafusion/src/datasource/listing/table.rs
##########
@@ -51,9 +52,9 @@ pub struct ListingOptions {
     /// partitioning expected should be named "a" and "b":
     /// - If there is a third level of partitioning it will be ignored.
     /// - Files that don't follow this partitioning will be ignored.
-    /// Note that only `DataType::Utf8` is supported for the column type.
-    /// TODO implement case where partitions.len() > 0
-    pub partitions: Vec<String>,
+    /// Note that only `DEFAULT_PARTITION_COLUMN_DATATYPE` is currently
+    /// supported for the column type.

Review comment:
       as mentioned elsewhere I think it would be fine to say "these are always 
dictionary coded string columns" rather than "currently" which hints at 
changing it in the future. 

##########
File path: ballista/rust/core/proto/ballista.proto
##########
@@ -613,33 +614,28 @@ message ScanLimit {
   uint32 limit = 1;
 }
 
-message ParquetScanExecNode {
+message FileScanExecConf {

Review comment:
       makes sense to me

##########
File path: datafusion/src/physical_plan/file_format/mod.rs
##########
@@ -24,19 +24,134 @@ mod json;
 mod parquet;
 
 pub use self::parquet::ParquetExec;
+use arrow::{
+    array::{ArrayData, ArrayRef, DictionaryArray, UInt8BufferBuilder},
+    buffer::Buffer,
+    datatypes::{DataType, Field, Schema, SchemaRef, UInt8Type},
+    error::{ArrowError, Result as ArrowResult},
+    record_batch::RecordBatch,
+};
 pub use avro::AvroExec;
 pub use csv::CsvExec;
 pub use json::NdJsonExec;
 
-use crate::datasource::PartitionedFile;
-use std::fmt::{Display, Formatter, Result};
+use crate::{
+    datasource::{object_store::ObjectStore, PartitionedFile},
+    scalar::ScalarValue,
+};
+use std::{
+    collections::HashMap,
+    fmt::{Display, Formatter, Result as FmtResult},
+    sync::Arc,
+    vec,
+};
+
+use super::{ColumnStatistics, Statistics};
+
+lazy_static! {
+    /// The datatype used for all partitioning columns for now
+    pub static ref DEFAULT_PARTITION_COLUMN_DATATYPE: DataType = 
DataType::Dictionary(Box::new(DataType::UInt8), Box::new(DataType::Utf8));
+}
+
+/// The base configurations to provide when creating a physical plan for
+/// any given file format.
+#[derive(Debug, Clone)]
+pub struct PhysicalPlanConfig {
+    /// Store from which the `files` should be fetched
+    pub object_store: Arc<dyn ObjectStore>,
+    /// Schema before projection. It contains the columns that are expected
+    /// to be in the files without the table partition columns.
+    pub file_schema: SchemaRef,
+    /// List of files to be processed, grouped into partitions
+    pub file_groups: Vec<Vec<PartitionedFile>>,
+    /// Estimated overall statistics of the files, taking `filters` into 
account.
+    pub statistics: Statistics,
+    /// Columns on which to project the data. Indexes that are higher than the
+    /// number of columns of `file_schema` refer to `table_partition_cols`.
+    pub projection: Option<Vec<usize>>,
+    /// The maximum number of records per arrow column
+    pub batch_size: usize,
+    /// The minimum number of records required from this source plan
+    pub limit: Option<usize>,
+    /// The partitioning column names
+    pub table_partition_cols: Vec<String>,
+}
+
+impl PhysicalPlanConfig {
+    /// Project the schema and the statistics on the given column indices
+    fn project(&self) -> (SchemaRef, Statistics) {

Review comment:
       The one thing that seems a little less than ideal to me is that we had a 
nice separation between file format related options and the storage of them. 
Partitioning columns seems to be more related to the storage rather than the 
format 🤔 
   
   But on the other hand most of the rest of the fields in `PhysicalPlanConfig` 
are not directly related to the format either.
   
   I don't have any concrete suggestion here

##########
File path: ballista/rust/core/src/serde/physical_plan/to_proto.rs
##########
@@ -722,3 +675,32 @@ impl From<&Statistics> for protobuf::Statistics {
         }
     }
 }
+
+impl TryFrom<&PhysicalPlanConfig> for protobuf::FileScanExecConf {
+    type Error = BallistaError;
+    fn try_from(
+        conf: &PhysicalPlanConfig,
+    ) -> Result<protobuf::FileScanExecConf, Self::Error> {
+        let file_groups = conf
+            .file_groups
+            .iter()
+            .map(|p| p.as_slice().try_into())
+            .collect::<Result<Vec<_>, _>>()?;
+
+        Ok(protobuf::FileScanExecConf {
+            file_groups,
+            statistics: Some((&conf.statistics).into()),
+            limit: conf.limit.map(|l| protobuf::ScanLimit { limit: l as u32 }),
+            projection: conf
+                .projection
+                .as_ref()
+                .unwrap_or(&vec![])

Review comment:
       it looks like an improvement to me, but previously the code would error 
if `projection: None` was passed and this code will simply convert that to an 
empty list.
   
   Is that intentional?

##########
File path: datafusion/tests/path_partition.rs
##########
@@ -0,0 +1,392 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+//! Test queries on partitioned datasets
+
+use std::{fs, io, sync::Arc};
+
+use async_trait::async_trait;
+use datafusion::{
+    assert_batches_sorted_eq,
+    datasource::{
+        file_format::{csv::CsvFormat, parquet::ParquetFormat},
+        listing::{ListingOptions, ListingTable},
+        object_store::{
+            local::LocalFileSystem, FileMeta, FileMetaStream, ListEntryStream,
+            ObjectReader, ObjectStore, SizedFile,
+        },
+    },
+    error::{DataFusionError, Result},
+    physical_plan::ColumnStatistics,
+    prelude::ExecutionContext,
+    test_util::{arrow_test_data, parquet_test_data},
+};
+use futures::{stream, StreamExt};
+
+mod common;
+
+#[tokio::test]
+async fn csv_filter_with_file_col() -> Result<()> {
+    let mut ctx = ExecutionContext::new();
+
+    register_partitioned_aggregate_csv(
+        &mut ctx,
+        &[

Review comment:
       so cool

##########
File path: datafusion/src/datasource/listing/helpers.rs
##########
@@ -0,0 +1,729 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+//! Helper functions for the table implementation
+
+use std::sync::Arc;
+
+use arrow::{
+    array::{
+        Array, ArrayBuilder, ArrayRef, Date64Array, Date64Builder, StringArray,
+        StringBuilder, UInt64Array, UInt64Builder,
+    },
+    datatypes::{DataType, Field, Schema},
+    record_batch::RecordBatch,
+};
+use chrono::{TimeZone, Utc};
+use futures::{
+    stream::{self},
+    StreamExt, TryStreamExt,
+};
+use log::debug;
+
+use crate::{
+    error::Result,
+    execution::context::ExecutionContext,
+    logical_plan::{self, Expr},
+    physical_plan::functions::Volatility,
+    scalar::ScalarValue,
+};
+
+use crate::datasource::{
+    object_store::{FileMeta, ObjectStore, SizedFile},
+    MemTable, PartitionedFile, PartitionedFileStream,
+};
+
+const FILE_SIZE_COLUMN_NAME: &str = "_df_part_file_size_";
+const FILE_PATH_COLUMN_NAME: &str = "_df_part_file_path_";
+const FILE_MODIFIED_COLUMN_NAME: &str = "_df_part_file_modified_";
+
+/// Check whether the given expression can be resolved using only the columns 
`col_names`.
+/// This means that if this function returns true:
+/// - the table provider can filter the table partition values with this 
expression
+/// - the expression can be marked as `TableProviderFilterPushDown::Exact` 
once this filtering
+/// was performed
+pub fn expr_applicable_for_cols(col_names: &[String], expr: &Expr) -> bool {

Review comment:
       I think this could be much more concisely be expressed using 
`ExpressionVisitor` -- perhaps following the model in 
https://github.com/apache/arrow-datafusion/blob/2c8e65bcca9fe41ad80116d99ad974c86cb59654/datafusion/src/optimizer/utils.rs#L60-L98
   
   Where you can skip all the boiler plate related to recursing through the 
different Expr types and make it clear what types of Expr really matter. 

##########
File path: datafusion/src/physical_plan/file_format/mod.rs
##########
@@ -50,3 +165,260 @@ impl<'a> Display for FileGroupsDisplay<'a> {
         write!(f, "[{}]", parts.join(", "))
     }
 }
+
+/// A helper that projects partition columns into the file record batches
+struct PartitionColumnProjector {
+    /// An Arrow buffer initialized to zeros that represents the key array of 
all partition
+    /// columns (partition columns are materialized by dictionary arrays with 
only one
+    /// value in the dictionary, thus all the keys are equal to zero).
+    key_buffer_cache: Option<Buffer>,
+    /// Mapping between the indexes in the list of partition columns and the 
target
+    /// schema. Sorted by index in the target schema so that we can iterate on 
it to
+    /// insert the partition columns in the target record batch.
+    projected_partition_indexes: Vec<(usize, usize)>,
+    /// The schema of the table once the projection was applied.
+    projected_schema: SchemaRef,
+}
+
+impl PartitionColumnProjector {
+    // Create a projector to insert the partitioning columns into batches read 
from files
+    // - projected_schema: the target schema with both file and partitioning 
columns
+    // - table_partition_cols: all the partitioning column names
+    fn new(projected_schema: SchemaRef, table_partition_cols: &[String]) -> 
Self {
+        let mut idx_map = HashMap::new();
+        for (partition_idx, partition_name) in 
table_partition_cols.iter().enumerate() {
+            if let Ok(schema_idx) = projected_schema.index_of(partition_name) {
+                idx_map.insert(partition_idx, schema_idx);
+            }
+        }
+
+        let mut projected_partition_indexes: Vec<_> = 
idx_map.into_iter().collect();
+        projected_partition_indexes.sort_by(|(_, a), (_, b)| a.cmp(b));
+
+        Self {
+            projected_partition_indexes,
+            key_buffer_cache: None,
+            projected_schema,
+        }
+    }
+
+    // Transform the batch read from the fileby inserting the partitioning 
columns

Review comment:
       ```suggestion
       // Transform the batch read from the file by inserting the partitioning 
columns
   ```

##########
File path: datafusion/tests/path_partition.rs
##########
@@ -0,0 +1,235 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+//! Test queries on partitioned datasets

Review comment:
       I think it is fine -- sql.rs as you say is too large already

##########
File path: datafusion/src/physical_plan/sort_preserving_merge.rs
##########
@@ -936,15 +936,18 @@ mod tests {
             test::create_partitioned_csv("aggregate_test_100.csv", 
partitions).unwrap();
 
         let csv = Arc::new(CsvExec::new(
-            Arc::new(LocalFileSystem {}),
-            files,
-            Statistics::default(),
-            Arc::clone(&schema),
+            PhysicalPlanConfig {

Review comment:
       man the same code was copied over and over and over -- lol

##########
File path: datafusion/src/datasource/file_format/avro.rs
##########
@@ -61,16 +62,9 @@ impl FileFormat for AvroFormat {
     async fn create_physical_plan(
         &self,
         conf: PhysicalPlanConfig,
+        _filters: &[Expr],

Review comment:
       this is neat that the creation of the physical plan gets the filters

##########
File path: datafusion/src/physical_plan/file_format/mod.rs
##########
@@ -50,3 +165,260 @@ impl<'a> Display for FileGroupsDisplay<'a> {
         write!(f, "[{}]", parts.join(", "))
     }
 }
+
+/// A helper that projects partition columns into the file record batches
+struct PartitionColumnProjector {
+    /// An Arrow buffer initialized to zeros that represents the key array of 
all partition
+    /// columns (partition columns are materialized by dictionary arrays with 
only one
+    /// value in the dictionary, thus all the keys are equal to zero).
+    key_buffer_cache: Option<Buffer>,
+    /// Mapping between the indexes in the list of partition columns and the 
target
+    /// schema. Sorted by index in the target schema so that we can iterate on 
it to
+    /// insert the partition columns in the target record batch.
+    projected_partition_indexes: Vec<(usize, usize)>,
+    /// The schema of the table once the projection was applied.
+    projected_schema: SchemaRef,
+}
+
+impl PartitionColumnProjector {
+    // Create a projector to insert the partitioning columns into batches read 
from files
+    // - projected_schema: the target schema with both file and partitioning 
columns
+    // - table_partition_cols: all the partitioning column names
+    fn new(projected_schema: SchemaRef, table_partition_cols: &[String]) -> 
Self {
+        let mut idx_map = HashMap::new();
+        for (partition_idx, partition_name) in 
table_partition_cols.iter().enumerate() {
+            if let Ok(schema_idx) = projected_schema.index_of(partition_name) {
+                idx_map.insert(partition_idx, schema_idx);
+            }
+        }
+
+        let mut projected_partition_indexes: Vec<_> = 
idx_map.into_iter().collect();
+        projected_partition_indexes.sort_by(|(_, a), (_, b)| a.cmp(b));
+
+        Self {
+            projected_partition_indexes,
+            key_buffer_cache: None,
+            projected_schema,
+        }
+    }
+
+    // Transform the batch read from the fileby inserting the partitioning 
columns
+    // to the right positions as deduced from `projected_schema`
+    // - file_batch: batch read from the file, with internal projection applied
+    // - partition_values: the list of partition values, one for each 
partition column
+    fn project(
+        &mut self,
+        file_batch: RecordBatch,
+        partition_values: &[ScalarValue],
+    ) -> ArrowResult<RecordBatch> {
+        let expected_cols =
+            self.projected_schema.fields().len() - 
self.projected_partition_indexes.len();
+
+        if file_batch.columns().len() != expected_cols {
+            return Err(ArrowError::SchemaError(format!(
+                "Unexpected batch schema from file, expected {} cols but got 
{}",
+                expected_cols,
+                file_batch.columns().len()
+            )));
+        }
+
+        let mut cols = file_batch.columns().to_vec();
+        for &(pidx, sidx) in &self.projected_partition_indexes {
+            cols.insert(
+                sidx,
+                create_dict_array(
+                    &mut self.key_buffer_cache,
+                    &partition_values[pidx],
+                    file_batch.num_rows(),
+                ),
+            )
+        }
+        RecordBatch::try_new(Arc::clone(&self.projected_schema), cols)
+    }
+}
+
+fn create_dict_array(
+    key_buffer_cache: &mut Option<Buffer>,
+    val: &ScalarValue,
+    len: usize,
+) -> ArrayRef {
+    // build value dictionary
+    let dict_vals = val.to_array();
+
+    // build keys array
+    let sliced_key_buffer = match key_buffer_cache {

Review comment:
       👍 

##########
File path: datafusion/src/physical_plan/file_format/mod.rs
##########
@@ -50,3 +165,260 @@ impl<'a> Display for FileGroupsDisplay<'a> {
         write!(f, "[{}]", parts.join(", "))
     }
 }
+
+/// A helper that projects partition columns into the file record batches
+struct PartitionColumnProjector {
+    /// An Arrow buffer initialized to zeros that represents the key array of 
all partition
+    /// columns (partition columns are materialized by dictionary arrays with 
only one
+    /// value in the dictionary, thus all the keys are equal to zero).
+    key_buffer_cache: Option<Buffer>,
+    /// Mapping between the indexes in the list of partition columns and the 
target
+    /// schema. Sorted by index in the target schema so that we can iterate on 
it to
+    /// insert the partition columns in the target record batch.
+    projected_partition_indexes: Vec<(usize, usize)>,
+    /// The schema of the table once the projection was applied.
+    projected_schema: SchemaRef,
+}
+
+impl PartitionColumnProjector {
+    // Create a projector to insert the partitioning columns into batches read 
from files
+    // - projected_schema: the target schema with both file and partitioning 
columns
+    // - table_partition_cols: all the partitioning column names
+    fn new(projected_schema: SchemaRef, table_partition_cols: &[String]) -> 
Self {
+        let mut idx_map = HashMap::new();
+        for (partition_idx, partition_name) in 
table_partition_cols.iter().enumerate() {
+            if let Ok(schema_idx) = projected_schema.index_of(partition_name) {
+                idx_map.insert(partition_idx, schema_idx);
+            }
+        }
+
+        let mut projected_partition_indexes: Vec<_> = 
idx_map.into_iter().collect();
+        projected_partition_indexes.sort_by(|(_, a), (_, b)| a.cmp(b));
+
+        Self {
+            projected_partition_indexes,
+            key_buffer_cache: None,
+            projected_schema,
+        }
+    }
+
+    // Transform the batch read from the fileby inserting the partitioning 
columns
+    // to the right positions as deduced from `projected_schema`
+    // - file_batch: batch read from the file, with internal projection applied
+    // - partition_values: the list of partition values, one for each 
partition column
+    fn project(
+        &mut self,
+        file_batch: RecordBatch,
+        partition_values: &[ScalarValue],
+    ) -> ArrowResult<RecordBatch> {
+        let expected_cols =
+            self.projected_schema.fields().len() - 
self.projected_partition_indexes.len();
+
+        if file_batch.columns().len() != expected_cols {
+            return Err(ArrowError::SchemaError(format!(
+                "Unexpected batch schema from file, expected {} cols but got 
{}",
+                expected_cols,
+                file_batch.columns().len()
+            )));
+        }
+
+        let mut cols = file_batch.columns().to_vec();
+        for &(pidx, sidx) in &self.projected_partition_indexes {
+            cols.insert(
+                sidx,
+                create_dict_array(
+                    &mut self.key_buffer_cache,
+                    &partition_values[pidx],
+                    file_batch.num_rows(),
+                ),
+            )
+        }
+        RecordBatch::try_new(Arc::clone(&self.projected_schema), cols)
+    }
+}
+
+fn create_dict_array(
+    key_buffer_cache: &mut Option<Buffer>,
+    val: &ScalarValue,
+    len: usize,
+) -> ArrayRef {
+    // build value dictionary
+    let dict_vals = val.to_array();
+
+    // build keys array
+    let sliced_key_buffer = match key_buffer_cache {
+        Some(buf) if buf.len() >= len => buf.slice(buf.len() - len),
+        _ => {
+            let mut key_buffer_builder = UInt8BufferBuilder::new(len);
+            key_buffer_builder.advance(len); // keys are all 0
+            key_buffer_cache.insert(key_buffer_builder.finish()).clone()
+        }
+    };
+
+    // create data type
+    let data_type =
+        DataType::Dictionary(Box::new(DataType::UInt8), 
Box::new(val.get_datatype()));
+
+    debug_assert_eq!(data_type, *DEFAULT_PARTITION_COLUMN_DATATYPE);
+
+    // assemble pieces together
+    let mut builder = ArrayData::builder(data_type)
+        .len(len)
+        .add_buffer(sliced_key_buffer);
+    builder = builder.add_child_data(dict_vals.data().clone());
+    Arc::new(DictionaryArray::<UInt8Type>::from(builder.build().unwrap()))
+}
+
+#[cfg(test)]
+mod tests {
+    use crate::test::{
+        aggr_test_schema, build_table_i32, columns, 
object_store::TestObjectStore,
+    };
+
+    use super::*;
+
+    #[test]
+    fn physical_plan_config_no_projection() {
+        let file_schema = aggr_test_schema();
+        let conf = config_for_projection(
+            Arc::clone(&file_schema),
+            None,
+            Statistics::default(),
+            vec!["date".to_owned()],
+        );
+
+        let (proj_schema, proj_statistics) = conf.project();
+        assert_eq!(proj_schema.fields().len(), file_schema.fields().len() + 1);
+        assert_eq!(
+            proj_schema.field(file_schema.fields().len()).name(),
+            "date",
+            "partition columns are the last columns"
+        );
+        assert_eq!(
+            proj_statistics
+                .column_statistics
+                .expect("projection creates column statistics")
+                .len(),
+            file_schema.fields().len() + 1
+        );
+        // TODO implement tests for partition column statistics once 
implemented
+
+        let col_names = conf.projected_file_column_names();
+        assert_eq!(col_names, None);
+
+        let col_indices = conf.file_column_projection_indices();
+        assert_eq!(col_indices, None);
+    }
+
+    #[test]
+    fn physical_plan_config_with_projection() {
+        let file_schema = aggr_test_schema();
+        let conf = config_for_projection(
+            Arc::clone(&file_schema),
+            Some(vec![file_schema.fields().len(), 0]),
+            Statistics {
+                num_rows: Some(10),
+                // assign the column index to distinct_count to help assert
+                // the source statistic after the projection
+                column_statistics: Some(
+                    (0..file_schema.fields().len())
+                        .map(|i| ColumnStatistics {
+                            distinct_count: Some(i),
+                            ..Default::default()
+                        })
+                        .collect(),
+                ),
+                ..Default::default()
+            },
+            vec!["date".to_owned()],
+        );
+
+        let (proj_schema, proj_statistics) = conf.project();
+        assert_eq!(
+            columns(&proj_schema),
+            vec!["date".to_owned(), "c1".to_owned()]
+        );
+        let proj_stat_cols = proj_statistics
+            .column_statistics
+            .expect("projection creates column statistics");
+        assert_eq!(proj_stat_cols.len(), 2);
+        // TODO implement tests for proj_stat_cols[0] once partition column
+        // statistics are implemented
+        assert_eq!(proj_stat_cols[1].distinct_count, Some(0));
+
+        let col_names = conf.projected_file_column_names();
+        assert_eq!(col_names, Some(vec!["c1".to_owned()]));
+
+        let col_indices = conf.file_column_projection_indices();
+        assert_eq!(col_indices, Some(vec![0]));
+    }
+
+    #[test]
+    fn partition_column_projector() {
+        let file_batch = build_table_i32(
+            ("a", &vec![0, 1, 2]),
+            ("b", &vec![-2, -1, 0]),
+            ("c", &vec![10, 11, 12]),
+        );
+        let partition_cols =
+            vec!["year".to_owned(), "month".to_owned(), "day".to_owned()];
+        // create a projected schema
+        let conf = config_for_projection(
+            file_batch.schema(),
+            // keep all cols from file and 2 from partitioning
+            Some(vec![
+                0,
+                1,
+                2,
+                file_batch.schema().fields().len(),
+                file_batch.schema().fields().len() + 2,
+            ]),
+            Statistics::default(),
+            partition_cols.clone(),
+        );
+        let (proj_schema, _) = conf.project();
+        // created a projector for that projected schema
+        let mut proj = PartitionColumnProjector::new(proj_schema, 
&partition_cols);
+        let projected_batch = proj
+            .project(
+                // file_batch is ok here because we kept all the file cols in 
the projection
+                file_batch,
+                &[
+                    ScalarValue::Utf8(Some("2021".to_owned())),
+                    ScalarValue::Utf8(Some("10".to_owned())),
+                    ScalarValue::Utf8(Some("26".to_owned())),
+                ],
+            )
+            .expect("Projection of partition columns into record batch 
failed");
+
+        let expected = vec![
+            "+---+----+----+------+-----+",
+            "| a | b  | c  | year | day |",
+            "+---+----+----+------+-----+",
+            "| 0 | -2 | 10 | 2021 | 26  |",
+            "| 1 | -1 | 11 | 2021 | 26  |",
+            "| 2 | 0  | 12 | 2021 | 26  |",
+            "+---+----+----+------+-----+",
+        ];
+        crate::assert_batches_eq!(expected, &[projected_batch]);

Review comment:
       I think adding a test here that you can feed a second batch through 
(that is larger than the first batch size) and get the expected output would be 
valuable -- that is an important edge case 

##########
File path: datafusion/tests/path_partition.rs
##########
@@ -0,0 +1,392 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+//! Test queries on partitioned datasets
+
+use std::{fs, io, sync::Arc};
+
+use async_trait::async_trait;
+use datafusion::{
+    assert_batches_sorted_eq,
+    datasource::{
+        file_format::{csv::CsvFormat, parquet::ParquetFormat},
+        listing::{ListingOptions, ListingTable},
+        object_store::{
+            local::LocalFileSystem, FileMeta, FileMetaStream, ListEntryStream,
+            ObjectReader, ObjectStore, SizedFile,
+        },
+    },
+    error::{DataFusionError, Result},
+    physical_plan::ColumnStatistics,
+    prelude::ExecutionContext,
+    test_util::{arrow_test_data, parquet_test_data},
+};
+use futures::{stream, StreamExt};
+
+mod common;
+
+#[tokio::test]
+async fn csv_filter_with_file_col() -> Result<()> {
+    let mut ctx = ExecutionContext::new();
+
+    register_partitioned_aggregate_csv(
+        &mut ctx,
+        &[
+            "mytable/date=2021-10-27/file.csv",
+            "mytable/date=2021-10-28/file.csv",
+        ],
+        &["date"],
+        "mytable",
+    );
+
+    let result = ctx
+        .sql("SELECT c1, c2 FROM t WHERE date='2021-10-27' and date!=c1 LIMIT 
5")
+        .await?
+        .collect()
+        .await?;
+
+    let expected = vec![
+        "+----+----+",
+        "| c1 | c2 |",
+        "+----+----+",
+        "| a  | 1  |",
+        "| b  | 1  |",
+        "| b  | 5  |",
+        "| c  | 2  |",
+        "| d  | 5  |",
+        "+----+----+",
+    ];
+    assert_batches_sorted_eq!(expected, &result);
+
+    Ok(())
+}
+
+#[tokio::test]
+async fn csv_projection_on_partition() -> Result<()> {
+    let mut ctx = ExecutionContext::new();
+
+    register_partitioned_aggregate_csv(
+        &mut ctx,
+        &[
+            "mytable/date=2021-10-27/file.csv",
+            "mytable/date=2021-10-28/file.csv",
+        ],
+        &["date"],
+        "mytable",
+    );
+
+    let result = ctx
+        .sql("SELECT c1, date FROM t WHERE date='2021-10-27' LIMIT 5")
+        .await?
+        .collect()
+        .await?;
+
+    let expected = vec![
+        "+----+------------+",
+        "| c1 | date       |",
+        "+----+------------+",
+        "| a  | 2021-10-27 |",
+        "| b  | 2021-10-27 |",
+        "| b  | 2021-10-27 |",
+        "| c  | 2021-10-27 |",
+        "| d  | 2021-10-27 |",
+        "+----+------------+",
+    ];
+    assert_batches_sorted_eq!(expected, &result);
+
+    Ok(())
+}
+
+#[tokio::test]
+async fn csv_grouping_by_partition() -> Result<()> {
+    let mut ctx = ExecutionContext::new();
+
+    register_partitioned_aggregate_csv(
+        &mut ctx,
+        &[
+            "mytable/date=2021-10-26/file.csv",
+            "mytable/date=2021-10-27/file.csv",
+            "mytable/date=2021-10-28/file.csv",
+        ],
+        &["date"],
+        "mytable",
+    );
+
+    let result = ctx
+        .sql("SELECT date, count(*), count(distinct(c1)) FROM t WHERE 
date<='2021-10-27' GROUP BY date")
+        .await?
+        .collect()
+        .await?;
+
+    let expected = vec![
+        "+------------+-----------------+----------------------+",
+        "| date       | COUNT(UInt8(1)) | COUNT(DISTINCT t.c1) |",
+        "+------------+-----------------+----------------------+",
+        "| 2021-10-26 | 100             | 5                    |",
+        "| 2021-10-27 | 100             | 5                    |",
+        "+------------+-----------------+----------------------+",
+    ];
+    assert_batches_sorted_eq!(expected, &result);
+
+    Ok(())
+}
+
+#[tokio::test]
+async fn parquet_multiple_partitions() -> Result<()> {
+    let mut ctx = ExecutionContext::new();
+
+    register_partitioned_alltypes_parquet(
+        &mut ctx,
+        &[
+            "year=2021/month=09/day=09/file.parquet",
+            "year=2021/month=10/day=09/file.parquet",
+            "year=2021/month=10/day=28/file.parquet",
+        ],
+        &["year", "month", "day"],
+        "",
+        "alltypes_plain.parquet",
+    )
+    .await;
+
+    let result = ctx

Review comment:
       this is very cool

##########
File path: datafusion/src/datasource/listing/table.rs
##########
@@ -162,58 +174,76 @@ impl TableProvider for ListingTable {
         filters: &[Expr],
         limit: Option<usize>,
     ) -> Result<Arc<dyn ExecutionPlan>> {
-        // TODO object_store_registry should be provided as param here
-        let (partitioned_file_lists, statistics) = self
-            .list_files_for_scan(
-                Arc::clone(&self.object_store),
-                &self.path,
-                filters,
-                limit,
-            )
-            .await?;
+        let (partitioned_file_lists, statistics) =
+            self.list_files_for_scan(filters, limit).await?;
+
+        // if no files need to be read, return an `EmptyExec`
+        if partitioned_file_lists.is_empty() {
+            let schema = self.schema();
+            let projected_schema = match &projection {
+                None => schema,
+                Some(p) => Arc::new(Schema::new(
+                    p.iter().map(|i| schema.field(*i).clone()).collect(),
+                )),
+            };
+            return Ok(Arc::new(EmptyExec::new(false, projected_schema)));
+        }
+
         // create the execution plan
         self.options
             .format
-            .create_physical_plan(PhysicalPlanConfig {
-                object_store: Arc::clone(&self.object_store),
-                schema: self.schema(),
-                files: partitioned_file_lists,
-                statistics,
-                projection: projection.clone(),
-                batch_size,
-                filters: filters.to_vec(),
-                limit,
-            })
+            .create_physical_plan(
+                PhysicalPlanConfig {
+                    object_store: Arc::clone(&self.object_store),
+                    file_schema: Arc::clone(&self.file_schema),
+                    file_groups: partitioned_file_lists,
+                    statistics,
+                    projection: projection.clone(),
+                    batch_size,
+                    limit,
+                    table_partition_cols: 
self.options.table_partition_cols.clone(),
+                },
+                filters,
+            )
             .await
     }
 
     fn supports_filter_pushdown(
         &self,
-        _filter: &Expr,
+        filter: &Expr,
     ) -> Result<TableProviderFilterPushDown> {
-        Ok(TableProviderFilterPushDown::Inexact)
+        if expr_applicable_for_cols(&self.options.table_partition_cols, 
filter) {

Review comment:
       ❤️ 

##########
File path: datafusion/src/physical_plan/file_format/mod.rs
##########
@@ -24,19 +24,134 @@ mod json;
 mod parquet;
 
 pub use self::parquet::ParquetExec;
+use arrow::{
+    array::{ArrayData, ArrayRef, DictionaryArray, UInt8BufferBuilder},
+    buffer::Buffer,
+    datatypes::{DataType, Field, Schema, SchemaRef, UInt8Type},
+    error::{ArrowError, Result as ArrowResult},
+    record_batch::RecordBatch,
+};
 pub use avro::AvroExec;
 pub use csv::CsvExec;
 pub use json::NdJsonExec;
 
-use crate::datasource::PartitionedFile;
-use std::fmt::{Display, Formatter, Result};
+use crate::{
+    datasource::{object_store::ObjectStore, PartitionedFile},
+    scalar::ScalarValue,
+};
+use std::{
+    collections::HashMap,
+    fmt::{Display, Formatter, Result as FmtResult},
+    sync::Arc,
+    vec,
+};
+
+use super::{ColumnStatistics, Statistics};
+
+lazy_static! {
+    /// The datatype used for all partitioning columns for now
+    pub static ref DEFAULT_PARTITION_COLUMN_DATATYPE: DataType = 
DataType::Dictionary(Box::new(DataType::UInt8), Box::new(DataType::Utf8));

Review comment:
       Given that the partition columns come directly from a file system path 
typically, having them always be a `Utf8` type makes sense -- using dictionary 
encoding to make them more efficient is 👍 
   
   On other words I am not at all sure any other type for partitioning columns 
makes sense and so we could perhaps remove these comments

##########
File path: datafusion/src/datasource/listing/helpers.rs
##########
@@ -0,0 +1,729 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+//! Helper functions for the table implementation
+
+use std::sync::Arc;
+
+use arrow::{
+    array::{
+        Array, ArrayBuilder, ArrayRef, Date64Array, Date64Builder, StringArray,
+        StringBuilder, UInt64Array, UInt64Builder,
+    },
+    datatypes::{DataType, Field, Schema},
+    record_batch::RecordBatch,
+};
+use chrono::{TimeZone, Utc};
+use futures::{
+    stream::{self},
+    StreamExt, TryStreamExt,
+};
+use log::debug;
+
+use crate::{
+    error::Result,
+    execution::context::ExecutionContext,
+    logical_plan::{self, Expr},
+    physical_plan::functions::Volatility,
+    scalar::ScalarValue,
+};
+
+use crate::datasource::{
+    object_store::{FileMeta, ObjectStore, SizedFile},
+    MemTable, PartitionedFile, PartitionedFileStream,
+};
+
+const FILE_SIZE_COLUMN_NAME: &str = "_df_part_file_size_";
+const FILE_PATH_COLUMN_NAME: &str = "_df_part_file_path_";
+const FILE_MODIFIED_COLUMN_NAME: &str = "_df_part_file_modified_";
+
+/// Check whether the given expression can be resolved using only the columns 
`col_names`.
+/// This means that if this function returns true:
+/// - the table provider can filter the table partition values with this 
expression
+/// - the expression can be marked as `TableProviderFilterPushDown::Exact` 
once this filtering
+/// was performed
+pub fn expr_applicable_for_cols(col_names: &[String], expr: &Expr) -> bool {
+    match expr {
+        // leaf
+        Expr::Literal(_) => true,
+        // TODO how to handle qualified / unqualified names?
+        Expr::Column(logical_plan::Column { ref name, .. }) => 
col_names.contains(name),
+        // unary
+        Expr::Alias(child, _)
+        | Expr::Not(child)
+        | Expr::IsNotNull(child)
+        | Expr::IsNull(child)
+        | Expr::Negative(child)
+        | Expr::Cast { expr: child, .. }
+        | Expr::TryCast { expr: child, .. } => 
expr_applicable_for_cols(col_names, child),
+        // binary
+        Expr::BinaryExpr {
+            ref left,
+            ref right,
+            ..
+        } => {
+            expr_applicable_for_cols(col_names, left)
+                && expr_applicable_for_cols(col_names, right)
+        }
+        // ternary
+        Expr::Between {
+            expr: item,
+            low,
+            high,
+            ..
+        } => {
+            expr_applicable_for_cols(col_names, item)
+                && expr_applicable_for_cols(col_names, low)
+                && expr_applicable_for_cols(col_names, high)
+        }
+        // variadic
+        Expr::ScalarFunction { fun, args } => match fun.volatility() {
+            Volatility::Immutable => args
+                .iter()
+                .all(|arg| expr_applicable_for_cols(col_names, arg)),
+            // TODO: Stable functions could be `applicable`, but that would 
require access to the context
+            Volatility::Stable => false,
+            Volatility::Volatile => false,
+        },
+        Expr::ScalarUDF { fun, args } => match fun.signature.volatility {
+            Volatility::Immutable => args
+                .iter()
+                .all(|arg| expr_applicable_for_cols(col_names, arg)),
+            // TODO: Stable functions could be `applicable`, but that would 
require access to the context
+            Volatility::Stable => false,
+            Volatility::Volatile => false,
+        },
+        Expr::InList {
+            expr: item, list, ..
+        } => {
+            expr_applicable_for_cols(col_names, item)
+                && list.iter().all(|e| expr_applicable_for_cols(col_names, e))
+        }
+        Expr::Case {
+            expr,
+            when_then_expr,
+            else_expr,
+        } => {
+            let expr_constant = expr
+                .as_ref()
+                .map(|e| expr_applicable_for_cols(col_names, e))
+                .unwrap_or(true);
+            let else_constant = else_expr
+                .as_ref()
+                .map(|e| expr_applicable_for_cols(col_names, e))
+                .unwrap_or(true);
+            let when_then_constant = when_then_expr.iter().all(|(w, th)| {
+                expr_applicable_for_cols(col_names, w)
+                    && expr_applicable_for_cols(col_names, th)
+            });
+            expr_constant && else_constant && when_then_constant
+        }
+        // TODO other expressions are not handled yet:
+        // - AGGREGATE, WINDOW and SORT should not end up in filter 
conditions, except maybe in some edge cases
+        // - Can `Wildcard` be considered as a `Literal`?
+        // - ScalarVariable could be `applicable`, but that would require 
access to the context
+        _ => false,
+    }
+}
+
+/// Partition the list of files into `n` groups
+pub fn split_files(
+    partitioned_files: Vec<PartitionedFile>,
+    n: usize,
+) -> Vec<Vec<PartitionedFile>> {
+    if partitioned_files.is_empty() {
+        return vec![];
+    }
+    let chunk_size = (partitioned_files.len() + n - 1) / n;
+    partitioned_files
+        .chunks(chunk_size)
+        .map(|c| c.to_vec())
+        .collect()
+}
+
+/// Discover the partitions on the given path and prune out files
+/// that belong to irrelevant partitions using `filters` expressions.
+/// `filters` might contain expressions that can be resolved only at the
+/// file level (e.g. Parquet row group pruning).
+///
+/// TODO for tables with many files (10k+), it will usually more efficient
+/// to first list the folders relative to the first partition dimension,
+/// prune those, then list only the contain of the remaining folders.
+pub async fn pruned_partition_list(
+    store: &dyn ObjectStore,
+    table_path: &str,
+    filters: &[Expr],
+    file_extension: &str,
+    table_partition_cols: &[String],
+) -> Result<PartitionedFileStream> {
+    // if no partition col => simply list all the files
+    if table_partition_cols.is_empty() {
+        return Ok(Box::pin(
+            store
+                .list_file_with_suffix(table_path, file_extension)
+                .await?
+                .map(|f| {
+                    Ok(PartitionedFile {
+                        partition_values: vec![],
+                        file_meta: f?,
+                    })
+                }),
+        ));
+    }
+
+    let applicable_filters: Vec<_> = filters
+        .iter()
+        .filter(|f| expr_applicable_for_cols(table_partition_cols, f))
+        .collect();
+    let stream_path = table_path.to_owned();
+    if applicable_filters.is_empty() {
+        // parse the partition values while listing all the files
+        // TODO we might avoid parsing the partition values if they are not 
used in any projection

Review comment:
       I don't think it will matter -- the time to fetch the listing from 
object store will far dominate the parsing cost I think

##########
File path: datafusion/src/datasource/listing/mod.rs
##########
@@ -0,0 +1,24 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+//! A table that uses the `ObjectStore` listing capability
+//! to get the list of files to process.
+
+mod helpers;
+mod table;

Review comment:
       (FWIW this is the kind of change that is an improvement but generates a 
large diff and makes it harder to understand what changed in the logic -- if 
you are looking for ways to make your PRs smaller, one way might be to make a 
first PR that just split up the existing code into two modules) -- then while 
that is being reviewed you can rebase your WIP branch on top of it and carry on

##########
File path: datafusion/src/physical_plan/file_format/mod.rs
##########
@@ -50,3 +165,260 @@ impl<'a> Display for FileGroupsDisplay<'a> {
         write!(f, "[{}]", parts.join(", "))
     }
 }
+
+/// A helper that projects partition columns into the file record batches
+struct PartitionColumnProjector {

Review comment:
       I recommend putting the context and design rationale from this comment 
into the code as a doc comment




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]


Reply via email to