alamb commented on a change in pull request #932:
URL: https://github.com/apache/arrow-datafusion/pull/932#discussion_r697989669
##########
File path: datafusion/src/datasource/mod.rs
##########
@@ -36,3 +43,247 @@ pub(crate) enum Source<R = Box<dyn std::io::Read + Send + Sync + 'static>> {
/// Read data from a reader
Reader(std::sync::Mutex<Option<R>>),
}
+
+#[derive(Debug, Clone)]
+/// A single file that should be read, along with its schema, statistics
+/// and partition column values that need to be appended to each row.
+pub struct PartitionedFile {
+ /// Path for the file (e.g. URL, filesystem path, etc)
+ pub file_path: String,
+ /// Statistics of the file
+ pub statistics: Statistics,
+ // Values of partition columns to be appended to each row
+ // pub partition_value: Option<Vec<ScalarValue>>,
+ // We may include row group range here for a more fine-grained parallel execution
+}
+
+impl From<String> for PartitionedFile {
+ fn from(file_path: String) -> Self {
+ Self {
+ file_path,
+ statistics: Default::default(),
+ }
+ }
+}
+
+impl std::fmt::Display for PartitionedFile {
+ fn fmt(&self, f: &mut std::fmt::Formatter) -> std::fmt::Result {
+ write!(f, "{}", self.file_path)
+ }
+}
+
+#[derive(Debug, Clone)]
+/// A collection of files that should be read in a single task
+pub struct FilePartition {
+ /// The index of the partition among all partitions
+ pub index: usize,
+ /// The contained files of the partition
+ pub files: Vec<PartitionedFile>,
+}
+
+impl std::fmt::Display for FilePartition {
+ fn fmt(&self, f: &mut std::fmt::Formatter) -> std::fmt::Result {
+ let files: Vec<String> = self.files.iter().map(|f| f.to_string()).collect();
+ write!(f, "{}", files.join(", "))
+ }
+}
+
+#[derive(Debug, Clone)]
+/// All source files with the same schema that exist in a path
+pub struct TableDescriptor {
+ /// root path of the table
+ pub path: String,
+ /// All source files in the path
+ pub partition_files: Vec<PartitionedFile>,
+ /// The schema of the files
+ pub schema: SchemaRef,
+}
+
+/// Returned partitioned file with its schema
+pub type FileAndSchema = (PartitionedFile, Schema);
+
+/// Builder for ['TableDescriptor'] inside given path
+pub trait TableDescriptorBuilder {
+ /// Construct a ['TableDescriptor'] from the provided path
+ fn build_table_desc(
+ path: &str,
+ ext: &str,
+ provided_schema: Option<Schema>,
+ collect_statistics: bool,
+ ) -> Result<TableDescriptor> {
+ let filenames = build_file_list(path, ext)?;
+ if filenames.is_empty() {
+ return Err(DataFusionError::Plan(format!(
+ "No file (with .{} extension) found at path {}",
+ ext, path
+ )));
+ }
+
+ // build a list of Parquet partitions with statistics and gather all unique schemas
+ // used in this data set
+ let mut schemas: Vec<Schema> = vec![];
+ let mut contains_file = false;
+
+ let partitioned_files = filenames
+ .iter()
+ .map(|file_path| {
+ contains_file = true;
+ let result = if collect_statistics {
+ let (pf, schema) = Self::file_meta(file_path)?;
+ if schemas.is_empty() {
+ schemas.push(schema);
+ } else if schema != schemas[0] {
+ // we currently get the schema information from the first file rather than do
+ // schema merging and this is a limitation.
+ // See https://issues.apache.org/jira/browse/ARROW-11017
+ return Err(DataFusionError::Plan(format!(
+ "The file {} have different schema from the first
file and DataFusion does \
+ not yet support schema merging",
+ file_path
+ )));
+ }
+ pf
+ } else {
+ PartitionedFile {
+ file_path: file_path.to_owned(),
+ statistics: Statistics::default(),
+ }
+ };
+
+ Ok(result)
+ }).collect::<Result<Vec<PartitionedFile>>>();
+
+ if !contains_file {
+ return Err(DataFusionError::Plan(format!(
+ "No file (with .{} extension) found at path {}",
+ ext, path
+ )));
+ }
+
+ let result_schema = provided_schema.unwrap_or_else(|| schemas.pop().unwrap());
+
+ Ok(TableDescriptor {
+ path: path.to_string(),
+ partition_files: partitioned_files?,
+ schema: Arc::new(result_schema),
+ })
+ }
+
+ /// Get all metadata for a source file, including schema, statistics, partitions, etc.
+ fn file_meta(file_path: &str) -> Result<FileAndSchema>;
+}
+
+/// Get all files as well as the summary statistics when a limit is provided
Review comment:
```suggestion
/// Get all files as well as the summary statistics.
/// If the optional `limit` is provided, includes only as many files
/// as are needed to read up to `limit` rows.
```
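To make the suggested wording concrete, here is a rough sketch of the intended behavior (the function name is made up; it only assumes the `PartitionedFile` struct and the `num_rows` field of `Statistics` shown in the diff above): take files in order until their accumulated row counts are enough to satisfy the limit.
```rust
// Illustrative sketch only, not the actual DataFusion implementation.
fn files_for_limit(files: Vec<PartitionedFile>, limit: Option<usize>) -> Vec<PartitionedFile> {
    let limit = match limit {
        Some(limit) => limit,
        // No limit: every file is needed.
        None => return files,
    };
    let mut rows_so_far = 0usize;
    let mut selected = Vec::new();
    for file in files {
        // A file with an unknown row count contributes 0 here, so later files
        // are still included rather than being pruned incorrectly.
        let rows = file.statistics.num_rows.unwrap_or(0);
        selected.push(file);
        rows_so_far = rows_so_far.saturating_add(rows);
        if rows_so_far >= limit {
            break;
        }
    }
    selected
}
```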
##########
File path: datafusion/src/datasource/mod.rs
##########
@@ -36,3 +43,247 @@ pub(crate) enum Source<R = Box<dyn std::io::Read + Send + Sync + 'static>> {
/// Read data from a reader
Reader(std::sync::Mutex<Option<R>>),
}
+
+#[derive(Debug, Clone)]
+/// A single file that should be read, along with its schema, statistics
+/// and partition column values that need to be appended to each row.
+pub struct PartitionedFile {
+ /// Path for the file (e.g. URL, filesystem path, etc)
+ pub file_path: String,
Review comment:
I wondered about calling this `file_path` as opposed to `uri`, but given that `path` is the term used in
https://github.com/apache/arrow-datafusion/pull/950/files#diff-45af7ea371f36434653f767a522a50aabf41bf34e4e11117103532363305793dR73
I like the consistency.
##########
File path: datafusion/src/datasource/mod.rs
##########
@@ -36,3 +43,247 @@ pub(crate) enum Source<R = Box<dyn std::io::Read + Send + Sync + 'static>> {
/// Read data from a reader
Reader(std::sync::Mutex<Option<R>>),
}
+
+#[derive(Debug, Clone)]
+/// A single file that should be read, along with its schema, statistics
+/// and partition column values that need to be appended to each row.
+pub struct PartitionedFile {
+ /// Path for the file (e.g. URL, filesystem path, etc)
+ pub file_path: String,
+ /// Statistics of the file
+ pub statistics: Statistics,
+ // Values of partition columns to be appended to each row
Review comment:
I think that in order to take full advantage of partition values (which might span multiple columns, for example), more information about the partitioning scheme will be needed (e.g. what expression is used to generate the partitioning values). Adding partitioning support to DataFusion's planning / execution is probably worth its own discussion
(that is to say, I agree with postponing adding anything partition specific).
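To make that concrete, here is a purely hypothetical sketch of the kind of extra metadata a partitioning scheme might carry; none of these types exist in DataFusion and the names are made up for illustration.
```rust
use datafusion::logical_plan::Expr;
use datafusion::scalar::ScalarValue;

/// Hypothetical: how a table is partitioned (e.g. hive-style directory partitioning).
pub struct PartitioningScheme {
    /// Columns the data is partitioned on, e.g. ["year", "month"]
    pub partition_columns: Vec<String>,
    /// Expressions used to derive each partition value (e.g. from the file path)
    pub partition_exprs: Vec<Expr>,
}

/// Hypothetical: the concrete partition values for one file, appended to every
/// row read from that file.
pub struct PartitionValues {
    pub values: Vec<ScalarValue>,
}
```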
##########
File path: datafusion/src/physical_plan/parquet.rs
##########
@@ -27,14 +27,14 @@ use crate::{
logical_plan::{Column, Expr},
physical_optimizer::pruning::{PruningPredicate, PruningStatistics},
physical_plan::{
- common, DisplayFormatType, ExecutionPlan, Partitioning, SendableRecordBatchStream,
Review comment:
I really like how the statistics- and schema-related code has been moved out of `physical_plan` and into `datasource`.
##########
File path: datafusion/src/datasource/mod.rs
##########
@@ -36,3 +43,247 @@ pub(crate) enum Source<R = Box<dyn std::io::Read + Send + Sync + 'static>> {
/// Read data from a reader
Reader(std::sync::Mutex<Option<R>>),
}
+
+#[derive(Debug, Clone)]
+/// A single file that should be read, along with its schema, statistics
+/// and partition column values that need to be appended to each row.
+pub struct PartitionedFile {
+ /// Path for the file (e.g. URL, filesystem path, etc)
+ pub file_path: String,
+ /// Statistics of the file
+ pub statistics: Statistics,
+ // Values of partition columns to be appended to each row
+ // pub partition_value: Option<Vec<ScalarValue>>,
+ // We may include row group range here for a more fine-grained parallel execution
+}
+
+impl From<String> for PartitionedFile {
+ fn from(file_path: String) -> Self {
+ Self {
+ file_path,
+ statistics: Default::default(),
+ }
+ }
+}
+
+impl std::fmt::Display for PartitionedFile {
+ fn fmt(&self, f: &mut std::fmt::Formatter) -> std::fmt::Result {
+ write!(f, "{}", self.file_path)
+ }
+}
+
+#[derive(Debug, Clone)]
+/// A collection of files that should be read in a single task
+pub struct FilePartition {
+ /// The index of the partition among all partitions
+ pub index: usize,
+ /// The contained files of the partition
+ pub files: Vec<PartitionedFile>,
+}
+
+impl std::fmt::Display for FilePartition {
+ fn fmt(&self, f: &mut std::fmt::Formatter) -> std::fmt::Result {
+ let files: Vec<String> = self.files.iter().map(|f| f.to_string()).collect();
+ write!(f, "{}", files.join(", "))
+ }
+}
+
+#[derive(Debug, Clone)]
+/// All source files with the same schema that exist in a path
+pub struct TableDescriptor {
+ /// root path of the table
+ pub path: String,
+ /// All source files in the path
+ pub partition_files: Vec<PartitionedFile>,
+ /// The schema of the files
+ pub schema: SchemaRef,
+}
+
+/// Returned partitioned file with its schema
+pub type FileAndSchema = (PartitionedFile, Schema);
Review comment:
```suggestion
pub struct FileAndSchema {
    file: PartitionedFile,
    schema: Schema,
}
```
I personally find `struct`s with named fields easier to work with than tuples, where the reader has to remember what the `0` and `1` offsets mean.
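For illustration only (reusing `file_meta` from the diff above), the named struct makes the call site self-documenting compared to the tuple alias:
```rust
// With the suggested struct, the destructuring names the pieces:
let FileAndSchema { file, schema } = Self::file_meta(file_path)?;

// With the tuple alias, the reader has to remember which position is which:
let (file, schema) = Self::file_meta(file_path)?;
```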
##########
File path: datafusion/src/datasource/mod.rs
##########
@@ -36,3 +43,247 @@ pub(crate) enum Source<R = Box<dyn std::io::Read + Send + Sync + 'static>> {
/// Read data from a reader
Reader(std::sync::Mutex<Option<R>>),
}
+
+#[derive(Debug, Clone)]
+/// A single file that should be read, along with its schema, statistics
+/// and partition column values that need to be appended to each row.
+pub struct PartitionedFile {
+ /// Path for the file (e.g. URL, filesystem path, etc)
+ pub file_path: String,
+ /// Statistics of the file
+ pub statistics: Statistics,
Review comment:
I think this makes sense to keep as `Statistics` rather than, say, `Option<Statistics>`, since all the fields are already `Option`.
##########
File path: datafusion/src/datasource/mod.rs
##########
@@ -36,3 +43,247 @@ pub(crate) enum Source<R = Box<dyn std::io::Read + Send + Sync + 'static>> {
/// Read data from a reader
Reader(std::sync::Mutex<Option<R>>),
}
+
+#[derive(Debug, Clone)]
+/// A single file that should be read, along with its schema, statistics
+/// and partition column values that need to be appended to each row.
+pub struct PartitionedFile {
+ /// Path for the file (e.g. URL, filesystem path, etc)
+ pub file_path: String,
+ /// Statistics of the file
+ pub statistics: Statistics,
+ // Values of partition columns to be appended to each row
+ // pub partition_value: Option<Vec<ScalarValue>>,
+ // We may include row group range here for a more fine-grained parallel execution
+}
+
+impl From<String> for PartitionedFile {
+ fn from(file_path: String) -> Self {
+ Self {
+ file_path,
+ statistics: Default::default(),
+ }
+ }
+}
+
+impl std::fmt::Display for PartitionedFile {
+ fn fmt(&self, f: &mut std::fmt::Formatter) -> std::fmt::Result {
+ write!(f, "{}", self.file_path)
+ }
+}
+
+#[derive(Debug, Clone)]
+/// A collection of files that should be read in a single task
+pub struct FilePartition {
+ /// The index of the partition among all partitions
+ pub index: usize,
+ /// The contained files of the partition
+ pub files: Vec<PartitionedFile>,
+}
+
+impl std::fmt::Display for FilePartition {
+ fn fmt(&self, f: &mut std::fmt::Formatter) -> std::fmt::Result {
+ let files: Vec<String> = self.files.iter().map(|f| f.to_string()).collect();
+ write!(f, "{}", files.join(", "))
+ }
+}
+
+#[derive(Debug, Clone)]
+/// All source files with the same schema that exist in a path
+pub struct TableDescriptor {
+ /// root path of the table
+ pub path: String,
+ /// All source files in the path
+ pub partition_files: Vec<PartitionedFile>,
+ /// The schema of the files
+ pub schema: SchemaRef,
+}
+
+/// Returned partitioned file with its schema
+pub type FileAndSchema = (PartitionedFile, Schema);
+
+/// Builder for ['TableDescriptor'] inside given path
+pub trait TableDescriptorBuilder {
+ /// Construct a ['TableDescriptor'] from the provided path
+ fn build_table_desc(
+ path: &str,
+ ext: &str,
+ provided_schema: Option<Schema>,
+ collect_statistics: bool,
+ ) -> Result<TableDescriptor> {
+ let filenames = build_file_list(path, ext)?;
+ if filenames.is_empty() {
+ return Err(DataFusionError::Plan(format!(
+ "No file (with .{} extension) found at path {}",
+ ext, path
+ )));
+ }
+
+ // build a list of Parquet partitions with statistics and gather all unique schemas
Review comment:
```suggestion
// build a list of partitions with statistics and gather all unique schemas
```
##########
File path: datafusion/src/logical_plan/builder.rs
##########
@@ -162,14 +162,15 @@ impl LogicalPlanBuilder {
) -> Result<Self> {
let table_schema = Arc::new(table_schema.clone());
let provider = Arc::new(EmptyTable::new(table_schema));
- Self::scan(name.unwrap_or(UNNAMED_TABLE), provider, projection)
+ Self::scan(name.unwrap_or(UNNAMED_TABLE), provider, projection, None)
}
/// Convert a table provider into a builder with a TableScan
pub fn scan(
table_name: impl Into<String>,
provider: Arc<dyn TableProvider>,
projection: Option<Vec<usize>>,
+ filters: Option<Vec<Expr>>,
Review comment:
I think this argument is likely to confuse users and should be removed.
For example, as a user of `LogicalPlanBuilder` I would probably assume that the following plan returns only rows where `a < 5`:
```rust
// Build a plan that looks like it would return only rows with `a < 5`
let plan = LogicalPlanBuilder::scan("table", provider, None, Some(vec![col("a").lt(lit(5))]))?;
```
However, I am pretty sure it could (and often would) also return rows with `a >= 5`. This is because `filters` added to a `TableScan` node are optional: the provider may skip rows that do not pass the predicate, but is not required to. Indeed, even for the parquet provider, the filters are only used for row group pruning, which may or may not be able to filter out rows.
I think we could solve this in one of two ways:
1. Leave the `scan` signature alone and rely on the predicate pushdown optimization to push filters appropriately down to the scan (my preference, as it is simpler for users); see the sketch after this list.
2. Rename this argument to something like `optional_filters_for_performance` and document what it does more carefully. I think it would be challenging to explain, as it might or might not do anything depending on how the data is laid out.
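A minimal sketch of option 1 (assuming the pre-existing three-argument `scan` signature plus the builder's `filter` and `build` methods): express the predicate as an explicit `Filter` node and let the predicate pushdown optimizer move it into the scan where the provider can take advantage of it.
```rust
use datafusion::logical_plan::{col, lit, LogicalPlanBuilder};

// The filter is a real plan node here, so rows with `a >= 5` are guaranteed to
// be removed even if the provider cannot prune them; the optimizer may still
// push the predicate down to the scan as a performance optimization.
let plan = LogicalPlanBuilder::scan("table", provider, None)?
    .filter(col("a").lt(lit(5)))?
    .build()?;
```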