Dandandan commented on a change in pull request #8992:
URL: https://github.com/apache/arrow/pull/8992#discussion_r548155265
##########
File path: rust/datafusion/src/physical_plan/parquet.rs
##########

```diff
@@ -30,62 +30,117 @@
 use crate::physical_plan::{common, Partitioning};
 use arrow::datatypes::{Schema, SchemaRef};
 use arrow::error::{ArrowError, Result as ArrowResult};
 use arrow::record_batch::RecordBatch;
-use parquet::file::metadata::ParquetMetaData;
 use parquet::file::reader::SerializedFileReader;

 use crossbeam::channel::{bounded, Receiver, RecvError, Sender};
 use fmt::Debug;
 use parquet::arrow::{ArrowReader, ParquetFileArrowReader};

+use crate::datasource::datasource::Statistics;
 use async_trait::async_trait;
 use futures::stream::Stream;

-/// Execution plan for scanning a Parquet file
+/// Execution plan for scanning one or more Parquet partitions
 #[derive(Debug, Clone)]
 pub struct ParquetExec {
-    /// Path to directory containing partitioned Parquet files with the same schema
-    filenames: Vec<String>,
+    /// Parquet partitions to read
+    partitions: Vec<ParquetPartition>,
     /// Schema after projection is applied
     schema: SchemaRef,
     /// Projection for which columns to load
     projection: Vec<usize>,
     /// Batch size
     batch_size: usize,
-    /// Parquet metadata
-    metadata: ParquetMetaData,
+    /// Statistics for the data set (sum of statistics for all partitions)
+    statistics: Statistics,
+}
+
+/// Represents one partition of a Parquet data set and this currently means one Parquet file.
+///
+/// In the future it would be good to support subsets of files based on ranges of row groups
+/// so that we can better parallelize reads of large files across available cores (see
+/// https://issues.apache.org/jira/browse/ARROW-10995).
+///
+/// We may also want to support reading Parquet files that are partitioned based on a key and
+/// in this case we would want this partition struct to represent multiple files for a given
+/// partition key (see https://issues.apache.org/jira/browse/ARROW-11019).
+#[derive(Debug, Clone)]
+pub struct ParquetPartition {
+    /// The Parquet filename for this partition
+    filename: String,
+    /// Statistics for this partition
+    statistics: Statistics,
 }

 impl ParquetExec {
-    /// Create a new Parquet reader execution plan
+    /// Create a new Parquet reader execution plan based on the specified Parquet filename or
+    /// directory containing Parquet files
     pub fn try_new(
         path: &str,
         projection: Option<Vec<usize>>,
         batch_size: usize,
     ) -> Result<Self> {
+        // build a list of filenames from the specified path, which could be a single file or
+        // a directory containing one or more parquet files
         let mut filenames: Vec<String> = vec![];
         common::build_file_list(path, &mut filenames, ".parquet")?;
         if filenames.is_empty() {
-            Err(DataFusionError::Plan("No files found".to_string()))
+            Err(DataFusionError::Plan(format!(
+                "No Parquet files found at path {}",
+                path
+            )))
         } else {
-            let file = File::open(&filenames[0])?;
-            let file_reader = Arc::new(SerializedFileReader::new(file)?);
-            let mut arrow_reader = ParquetFileArrowReader::new(file_reader);
-            let schema = arrow_reader.get_schema()?;
-            let metadata = arrow_reader.get_metadata();
-
-            Ok(Self::new(
-                filenames, schema, projection, batch_size, metadata,
-            ))
+            // build a list of Parquet partitions with statistics and gather all unique schemas
+            // used in this data set
+            let mut schemas: Vec<Schema> = vec![];
+            let mut partitions = vec![];
```

Review comment:
   Could use `with_capacity`
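For illustration, a minimal, self-contained sketch of what the reviewer is suggesting. It uses `String` as a stand-in for `ParquetPartition`, and a hard-coded `filenames` vector in place of the result of `common::build_file_list`; this is not the exact code from the PR:

```rust
fn main() {
    // Stand-in for the file list built in `ParquetExec::try_new`.
    let filenames: Vec<String> = vec!["a.parquet".to_string(), "b.parquet".to_string()];

    // `Vec::with_capacity` reserves space for the expected number of elements up
    // front, so the vector does not reallocate as it grows to that size. Since
    // one partition is created per file, `filenames.len()` is a natural hint.
    let mut partitions: Vec<String> = Vec::with_capacity(filenames.len());
    for filename in &filenames {
        partitions.push(filename.clone());
    }

    assert_eq!(partitions.len(), filenames.len());
    assert!(partitions.capacity() >= filenames.len());
}
```

The capacity hint is purely an optimization: `push` still works normally past the initial capacity, so an inexact estimate is safe.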