seddonm1 commented on a change in pull request #8992:
URL: https://github.com/apache/arrow/pull/8992#discussion_r548198355
##########
File path: rust/datafusion/src/physical_plan/parquet.rs
##########
@@ -30,62 +30,117 @@ use crate::physical_plan::{common, Partitioning};
 use arrow::datatypes::{Schema, SchemaRef};
 use arrow::error::{ArrowError, Result as ArrowResult};
 use arrow::record_batch::RecordBatch;
-use parquet::file::metadata::ParquetMetaData;
 use parquet::file::reader::SerializedFileReader;
 
 use crossbeam::channel::{bounded, Receiver, RecvError, Sender};
 use fmt::Debug;
 use parquet::arrow::{ArrowReader, ParquetFileArrowReader};
 
+use crate::datasource::datasource::Statistics;
 use async_trait::async_trait;
 use futures::stream::Stream;
 
-/// Execution plan for scanning a Parquet file
+/// Execution plan for scanning one or more Parquet partitions
 #[derive(Debug, Clone)]
 pub struct ParquetExec {
-    /// Path to directory containing partitioned Parquet files with the same schema
-    filenames: Vec<String>,
+    /// Parquet partitions to read
+    partitions: Vec<ParquetPartition>,
     /// Schema after projection is applied
     schema: SchemaRef,
     /// Projection for which columns to load
     projection: Vec<usize>,
     /// Batch size
     batch_size: usize,
-    /// Parquet metadata
-    metadata: ParquetMetaData,
+    /// Statistics for the data set (sum of statistics for all partitions)
+    statistics: Statistics,
+}
+
+/// Represents one partition of a Parquet data set and this currently means one Parquet file.
+///
+/// In the future it would be good to support subsets of files based on ranges of row groups
+/// so that we can better parallelize reads of large files across available cores (see
+/// https://issues.apache.org/jira/browse/ARROW-10995).
+///
+/// We may also want to support reading Parquet files that are partitioned based on a key and
+/// in this case we would want this partition struct to represent multiple files for a given
+/// partition key (see https://issues.apache.org/jira/browse/ARROW-11019).
+#[derive(Debug, Clone)]
+pub struct ParquetPartition {
+    /// The Parquet filename for this partition
+    filename: String,
+    /// Statistics for this partition
+    statistics: Statistics,
 }
 
 impl ParquetExec {
-    /// Create a new Parquet reader execution plan
+    /// Create a new Parquet reader execution plan based on the specified Parquet filename or
+    /// directory containing Parquet files
     pub fn try_new(
         path: &str,
         projection: Option<Vec<usize>>,
         batch_size: usize,
     ) -> Result<Self> {
+        // build a list of filenames from the specified path, which could be a single file or
+        // a directory containing one or more parquet files
         let mut filenames: Vec<String> = vec![];
         common::build_file_list(path, &mut filenames, ".parquet")?;
         if filenames.is_empty() {
-            Err(DataFusionError::Plan("No files found".to_string()))
+            Err(DataFusionError::Plan(format!(
+                "No Parquet files found at path {}",
+                path
+            )))
         } else {
-            let file = File::open(&filenames[0])?;
-            let file_reader = Arc::new(SerializedFileReader::new(file)?);
-            let mut arrow_reader = ParquetFileArrowReader::new(file_reader);
-            let schema = arrow_reader.get_schema()?;
-            let metadata = arrow_reader.get_metadata();
-
-            Ok(Self::new(
-                filenames, schema, projection, batch_size, metadata,
-            ))
+            // build a list of Parquet partitions with statistics and gather all unique schemas
+            // used in this data set
+            let mut schemas: Vec<Schema> = vec![];
+            let mut partitions = vec![];
+            for filename in &filenames {
+                let file = File::open(filename)?;
+                let file_reader = Arc::new(SerializedFileReader::new(file)?);
+                let mut arrow_reader = ParquetFileArrowReader::new(file_reader);
+                let meta_data = arrow_reader.get_metadata();
+                // collect all the unique schemas in this data set
+                let schema = arrow_reader.get_schema()?;
+                if schemas.is_empty() || schema != schemas[0] {
+                    schemas.push(schema);
+                }
+                let mut num_rows = 0;
+                let mut total_byte_size = 0;
+                for i in 0..meta_data.num_row_groups() {
+                    let row_group_meta = meta_data.row_group(i);
+                    num_rows += row_group_meta.num_rows();
+                    total_byte_size += row_group_meta.total_byte_size();
+                }
+                let statistics = Statistics {
+                    num_rows: Some(num_rows as usize),
+                    total_byte_size: Some(total_byte_size as usize),
+                };
+                partitions.push(ParquetPartition {
+                    filename: filename.to_owned(),
+                    statistics,
+                });
+            }
+
+            // we currently get the schema information from the first file rather than do
+            // schema merging and this is a limitation.
+            // See https://issues.apache.org/jira/browse/ARROW-11017
+            if schemas.len() > 1 {
+                return Err(DataFusionError::Plan(format!(
+                    "The Parquet files in {} have {} different schemas and DataFusion does \
+                     not yet support schema merging", path, schemas.len())));
+            }
+            let schema = schemas[0].clone();
+
+            Ok(Self::new(partitions, schema, projection, batch_size))
         }
     }
 
-    /// Create a new Parquet reader execution plan with provided files and schema
+    /// Create a new Parquet reader execution plan with provided partitions and schema
     pub fn new(
-        filenames: Vec<String>,
+        partitions: Vec<ParquetPartition>,

Review comment:
   It may be premature to support that use case anyway until more of the core engine works.
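   As a quick sanity check of the statistics-gathering step, here is a minimal standalone sketch that mirrors what the new `try_new` does per partition. It uses the same parquet crate calls as the diff, but `file_stats` and the local `Statistics` struct are hypothetical stand-ins for the DataFusion types, and the filename is illustrative:

   ```rust
   // Standalone sketch (not part of the PR): gather per-file statistics the
   // same way the new ParquetExec::try_new does. `file_stats` and the local
   // `Statistics` struct are hypothetical stand-ins for the DataFusion types.
   use std::fs::File;
   use std::sync::Arc;

   use parquet::arrow::{ArrowReader, ParquetFileArrowReader};
   use parquet::file::reader::SerializedFileReader;

   #[derive(Debug)]
   struct Statistics {
       num_rows: Option<usize>,
       total_byte_size: Option<usize>,
   }

   fn file_stats(filename: &str) -> Result<Statistics, Box<dyn std::error::Error>> {
       let file = File::open(filename)?;
       let file_reader = Arc::new(SerializedFileReader::new(file)?);
       let mut arrow_reader = ParquetFileArrowReader::new(file_reader);

       // The PR also collects this schema for the uniqueness check.
       let _schema = arrow_reader.get_schema()?;

       // Sum row counts and byte sizes over every row group in the file;
       // these sums become the partition-level statistics.
       let meta_data = arrow_reader.get_metadata();
       let mut num_rows = 0i64;
       let mut total_byte_size = 0i64;
       for i in 0..meta_data.num_row_groups() {
           let row_group = meta_data.row_group(i);
           num_rows += row_group.num_rows();
           total_byte_size += row_group.total_byte_size();
       }

       Ok(Statistics {
           num_rows: Some(num_rows as usize),
           total_byte_size: Some(total_byte_size as usize),
       })
   }

   fn main() -> Result<(), Box<dyn std::error::Error>> {
       // Filename is illustrative.
       println!("{:?}", file_stats("example.parquet")?);
       Ok(())
   }
   ```

   If I read the Parquet footer metadata right, `total_byte_size` is the uncompressed size of the row group, which seems like the right input for cost-based planning.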
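   And a hypothetical call site for the revised constructor, assuming the module path in this branch (`datafusion::physical_plan::parquet`); the directory and batch size are illustrative:

   ```rust
   use datafusion::error::Result;
   use datafusion::physical_plan::parquet::ParquetExec;

   fn main() -> Result<()> {
       // Scans every `.parquet` file under the directory; errors with
       // "No Parquet files found at path ..." if there are none, or with the
       // schema-merging error (ARROW-11017) if the files disagree on schema.
       let _exec = ParquetExec::try_new("/data/events", None, 4096)?;
       Ok(())
   }
   ```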