Dandandan commented on a change in pull request #8992:
URL: https://github.com/apache/arrow/pull/8992#discussion_r548155265
##########
File path: rust/datafusion/src/physical_plan/parquet.rs
##########
@@ -30,62 +30,117 @@ use crate::physical_plan::{common, Partitioning};
use arrow::datatypes::{Schema, SchemaRef};
use arrow::error::{ArrowError, Result as ArrowResult};
use arrow::record_batch::RecordBatch;
-use parquet::file::metadata::ParquetMetaData;
use parquet::file::reader::SerializedFileReader;
use crossbeam::channel::{bounded, Receiver, RecvError, Sender};
use fmt::Debug;
use parquet::arrow::{ArrowReader, ParquetFileArrowReader};
+use crate::datasource::datasource::Statistics;
use async_trait::async_trait;
use futures::stream::Stream;
-/// Execution plan for scanning a Parquet file
+/// Execution plan for scanning one or more Parquet partitions
#[derive(Debug, Clone)]
pub struct ParquetExec {
- /// Path to directory containing partitioned Parquet files with the same schema
- filenames: Vec<String>,
+ /// Parquet partitions to read
+ partitions: Vec<ParquetPartition>,
/// Schema after projection is applied
schema: SchemaRef,
/// Projection for which columns to load
projection: Vec<usize>,
/// Batch size
batch_size: usize,
- /// Parquet metadata
- metadata: ParquetMetaData,
+ /// Statistics for the data set (sum of statistics for all partitions)
+ statistics: Statistics,
+}
+
+/// Represents one partition of a Parquet data set, which currently means one Parquet file.
+///
+/// In the future it would be good to support subsets of files based on ranges of row groups
+/// so that we can better parallelize reads of large files across available cores (see
+/// https://issues.apache.org/jira/browse/ARROW-10995).
+///
+/// We may also want to support reading Parquet files that are partitioned based on a key, in
+/// which case this partition struct would represent multiple files for a given partition key
+/// (see https://issues.apache.org/jira/browse/ARROW-11019).
+#[derive(Debug, Clone)]
+pub struct ParquetPartition {
+ /// The Parquet filename for this partition
+ filename: String,
+ /// Statistics for this partition
+ statistics: Statistics,
}
impl ParquetExec {
- /// Create a new Parquet reader execution plan
+ /// Create a new Parquet reader execution plan based on the specified Parquet filename or
+ /// directory containing Parquet files
pub fn try_new(
path: &str,
projection: Option<Vec<usize>>,
batch_size: usize,
) -> Result<Self> {
+ // build a list of filenames from the specified path, which could be a single file or
+ // a directory containing one or more parquet files
let mut filenames: Vec<String> = vec![];
common::build_file_list(path, &mut filenames, ".parquet")?;
if filenames.is_empty() {
- Err(DataFusionError::Plan("No files found".to_string()))
+ Err(DataFusionError::Plan(format!(
+ "No Parquet files found at path {}",
+ path
+ )))
} else {
- let file = File::open(&filenames[0])?;
- let file_reader = Arc::new(SerializedFileReader::new(file)?);
- let mut arrow_reader = ParquetFileArrowReader::new(file_reader);
- let schema = arrow_reader.get_schema()?;
- let metadata = arrow_reader.get_metadata();
-
- Ok(Self::new(
- filenames, schema, projection, batch_size, metadata,
- ))
+ // build a list of Parquet partitions with statistics and gather all unique schemas
+ // used in this data set
+ let mut schemas: Vec<Schema> = vec![];
+ let mut partitions = vec![];
##########
Review comment:
Could use `with_capacity`
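
For illustration, a minimal sketch of what this suggestion could look like in the code above (hypothetical, not the committed change): since `filenames.len()` is known before the loop, both vectors can be pre-allocated up front.

```rust
// Hypothetical sketch of the reviewer's suggestion, assuming the surrounding
// code from the diff: pre-allocate instead of starting from empty vectors.
// `filenames.len()` is exactly the number of partitions and an upper bound
// on the number of distinct schemas, so reallocation during the loop is avoided.
let mut schemas: Vec<Schema> = Vec::with_capacity(filenames.len());
let mut partitions: Vec<ParquetPartition> = Vec::with_capacity(filenames.len());
```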