This is an automated email from the ASF dual-hosted git repository.
houqp pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/arrow-datafusion.git
The following commit(s) were added to refs/heads/master by this push:
new 161fcd8 Simplify file struct abstractions (#1120)
161fcd8 is described below
commit 161fcd88f3f827d3c2d10d5de60ec84863d4a162
Author: rdettai <[email protected]>
AuthorDate: Sun Oct 17 21:18:16 2021 +0200
Simplify file struct abstractions (#1120)
* [refacto] simplify file struct abstractions
* [doc] explicit desc for file_groups
---
ballista/rust/core/proto/ballista.proto | 4 +-
.../core/src/serde/physical_plan/from_proto.rs | 43 ++++++------
.../rust/core/src/serde/physical_plan/to_proto.rs | 20 ++++--
datafusion/src/datasource/datasource.rs | 3 +
datafusion/src/datasource/mod.rs | 16 -----
datafusion/src/physical_plan/file_format/mod.rs | 23 ++++++
.../src/physical_plan/file_format/parquet.rs | 81 +++++-----------------
7 files changed, 79 insertions(+), 111 deletions(-)
diff --git a/ballista/rust/core/proto/ballista.proto
b/ballista/rust/core/proto/ballista.proto
index 33638fd..49b65cf 100644
--- a/ballista/rust/core/proto/ballista.proto
+++ b/ballista/rust/core/proto/ballista.proto
@@ -596,7 +596,7 @@ message FilterExecNode {
PhysicalExprNode expr = 2;
}
-message FilePartition {
+message FileGroup {
repeated PartitionedFile files = 1;
}
@@ -606,7 +606,7 @@ message ScanLimit {
}
message ParquetScanExecNode {
- repeated FilePartition partitions = 1;
+ repeated FileGroup file_groups = 1;
Schema schema = 2;
uint32 batch_size = 4;
repeated uint32 projection = 6;
diff --git a/ballista/rust/core/src/serde/physical_plan/from_proto.rs
b/ballista/rust/core/src/serde/physical_plan/from_proto.rs
index 680e419..75dd915 100644
--- a/ballista/rust/core/src/serde/physical_plan/from_proto.rs
+++ b/ballista/rust/core/src/serde/physical_plan/from_proto.rs
@@ -37,7 +37,7 @@ use datafusion::catalog::catalog::{
};
use datafusion::datasource::object_store::local::LocalFileSystem;
use datafusion::datasource::object_store::{FileMeta, ObjectStoreRegistry,
SizedFile};
-use datafusion::datasource::{FilePartition, PartitionedFile};
+use datafusion::datasource::PartitionedFile;
use datafusion::execution::context::{
ExecutionConfig, ExecutionContextState, ExecutionProps,
};
@@ -127,8 +127,8 @@ impl TryInto<Arc<dyn ExecutionPlan>> for
&protobuf::PhysicalPlanNode {
Arc::new(LocalFileSystem {}),
scan.files
.iter()
- .map(|f| f.try_into())
- .collect::<Result<Vec<PartitionedFile>, _>>()?,
+ .map(|f| f.into())
+ .collect::<Vec<PartitionedFile>>(),
statistics,
schema,
scan.has_header,
@@ -145,13 +145,10 @@ impl TryInto<Arc<dyn ExecutionPlan>> for
&protobuf::PhysicalPlanNode {
Ok(Arc::new(ParquetExec::new(
Arc::new(LocalFileSystem {}),
- scan.partitions
+ scan.file_groups
.iter()
- .map(|p| {
- let it = p.files.iter().map(|f| f.try_into());
- it.collect::<Result<Vec<PartitionedFile>, _>>()
- })
- .collect::<Result<Vec<Vec<PartitionedFile>>, _>>()?,
+ .map(|p| p.into())
+ .collect::<Vec<Vec<PartitionedFile>>>(),
statistics,
schema,
Some(projection),
@@ -170,8 +167,8 @@ impl TryInto<Arc<dyn ExecutionPlan>> for
&protobuf::PhysicalPlanNode {
Arc::new(LocalFileSystem {}),
scan.files
.iter()
- .map(|f| f.try_into())
- .collect::<Result<Vec<PartitionedFile>, _>>()?,
+ .map(|f| f.into())
+ .collect::<Vec<PartitionedFile>>(),
statistics,
schema,
Some(projection),
@@ -741,23 +738,27 @@ pub fn parse_protobuf_hash_partitioning(
}
}
-impl TryInto<PartitionedFile> for &protobuf::PartitionedFile {
- type Error = BallistaError;
-
- fn try_into(self) -> Result<PartitionedFile, Self::Error> {
- Ok(PartitionedFile {
+impl From<&protobuf::PartitionedFile> for PartitionedFile {
+ fn from(val: &protobuf::PartitionedFile) -> Self {
+ PartitionedFile {
file_meta: FileMeta {
sized_file: SizedFile {
- path: self.path.clone(),
- size: self.size,
+ path: val.path.clone(),
+ size: val.size,
},
- last_modified: if self.last_modified_ns == 0 {
+ last_modified: if val.last_modified_ns == 0 {
None
} else {
- Some(Utc.timestamp_nanos(self.last_modified_ns as i64))
+ Some(Utc.timestamp_nanos(val.last_modified_ns as i64))
},
},
- })
+ }
+ }
+}
+
+impl From<&protobuf::FileGroup> for Vec<PartitionedFile> {
+ fn from(val: &protobuf::FileGroup) -> Self {
+ val.files.iter().map(|f| f.into()).collect()
}
}
diff --git a/ballista/rust/core/src/serde/physical_plan/to_proto.rs
b/ballista/rust/core/src/serde/physical_plan/to_proto.rs
index 020b688..e5e6347 100644
--- a/ballista/rust/core/src/serde/physical_plan/to_proto.rs
+++ b/ballista/rust/core/src/serde/physical_plan/to_proto.rs
@@ -275,18 +275,16 @@ impl TryInto<protobuf::PhysicalPlanNode> for Arc<dyn
ExecutionPlan> {
)),
})
} else if let Some(exec) = plan.downcast_ref::<ParquetExec>() {
- let partitions = exec
- .partitions()
- .into_iter()
- .map(|p| protobuf::FilePartition {
- files: p.iter().map(|f| f.into()).collect(),
- })
+ let file_groups = exec
+ .file_groups()
+ .iter()
+ .map(|p| p.as_slice().into())
.collect();
Ok(protobuf::PhysicalPlanNode {
physical_plan_type: Some(PhysicalPlanType::ParquetScan(
protobuf::ParquetScanExecNode {
- partitions,
+ file_groups,
statistics: Some((&exec.statistics()).into()),
limit: exec
.limit()
@@ -688,6 +686,14 @@ impl From<&PartitionedFile> for protobuf::PartitionedFile {
}
}
+impl From<&[PartitionedFile]> for protobuf::FileGroup {
+ fn from(gr: &[PartitionedFile]) -> protobuf::FileGroup {
+ protobuf::FileGroup {
+ files: gr.iter().map(|f| f.into()).collect(),
+ }
+ }
+}
+
impl From<&ColumnStatistics> for protobuf::ColumnStats {
fn from(cs: &ColumnStatistics) -> protobuf::ColumnStats {
protobuf::ColumnStats {
diff --git a/datafusion/src/datasource/datasource.rs
b/datafusion/src/datasource/datasource.rs
index 918200f..823b408 100644
--- a/datafusion/src/datasource/datasource.rs
+++ b/datafusion/src/datasource/datasource.rs
@@ -71,6 +71,9 @@ pub trait TableProvider: Sync + Send {
}
/// Create an ExecutionPlan that will scan the table.
+ /// The table provider will usually be responsible for grouping
+ /// the source data into partitions that can be efficiently
+ /// parallelized or distributed.
async fn scan(
&self,
projection: &Option<Vec<usize>>,
diff --git a/datafusion/src/datasource/mod.rs b/datafusion/src/datasource/mod.rs
index b607469..2e5330f 100644
--- a/datafusion/src/datasource/mod.rs
+++ b/datafusion/src/datasource/mod.rs
@@ -155,22 +155,6 @@ impl std::fmt::Display for PartitionedFile {
}
}
-#[derive(Debug, Clone)]
-/// A collection of files that should be read in a single task
-pub struct FilePartition {
- /// The index of the partition among all partitions
- pub index: usize,
- /// The contained files of the partition
- pub files: Vec<PartitionedFile>,
-}
-
-impl std::fmt::Display for FilePartition {
- fn fmt(&self, f: &mut std::fmt::Formatter) -> std::fmt::Result {
- let files: Vec<String> = self.files.iter().map(|f|
f.to_string()).collect();
- write!(f, "{}", files.join(", "))
- }
-}
-
fn create_max_min_accs(
schema: &Schema,
) -> (Vec<Option<MaxAccumulator>>, Vec<Option<MinAccumulator>>) {
diff --git a/datafusion/src/physical_plan/file_format/mod.rs
b/datafusion/src/physical_plan/file_format/mod.rs
index aa9359c..aa4f116 100644
--- a/datafusion/src/physical_plan/file_format/mod.rs
+++ b/datafusion/src/physical_plan/file_format/mod.rs
@@ -26,3 +26,26 @@ pub use self::parquet::ParquetExec;
pub use avro::AvroExec;
pub use csv::CsvExec;
pub use json::NdJsonExec;
+
+use crate::datasource::PartitionedFile;
+use std::fmt::{Display, Formatter, Result};
+
+/// A wrapper to customize partitioned file display
+#[derive(Debug)]
+struct FileGroupsDisplay<'a>(&'a [Vec<PartitionedFile>]);
+
+impl<'a> Display for FileGroupsDisplay<'a> {
+ fn fmt(&self, f: &mut Formatter) -> Result {
+ let parts: Vec<_> = self
+ .0
+ .iter()
+ .map(|pp| {
+ pp.iter()
+ .map(|pf| pf.file_meta.path())
+ .collect::<Vec<_>>()
+ .join(", ")
+ })
+ .collect();
+ write!(f, "[{}]", parts.join(", "))
+ }
+}
diff --git a/datafusion/src/physical_plan/file_format/parquet.rs
b/datafusion/src/physical_plan/file_format/parquet.rs
index c011f33..d07d2a9 100644
--- a/datafusion/src/physical_plan/file_format/parquet.rs
+++ b/datafusion/src/physical_plan/file_format/parquet.rs
@@ -23,6 +23,7 @@ use std::{any::Any, convert::TryInto};
use crate::datasource::file_format::parquet::ChunkObjectReader;
use crate::datasource::object_store::ObjectStore;
+use crate::datasource::PartitionedFile;
use crate::{
error::{DataFusionError, Result},
logical_plan::{Column, Expr},
@@ -59,14 +60,13 @@ use tokio::{
use async_trait::async_trait;
-use crate::datasource::{FilePartition, PartitionedFile};
-
/// Execution plan for scanning one or more Parquet partitions
#[derive(Debug, Clone)]
pub struct ParquetExec {
object_store: Arc<dyn ObjectStore>,
- /// Parquet partitions to read
- partitions: Vec<ParquetPartition>,
+ /// Grouped list of files. Each group will be processed together by one
+ /// partition of the `ExecutionPlan`.
+ file_groups: Vec<Vec<PartitionedFile>>,
/// Schema after projection is applied
schema: SchemaRef,
/// Projection for which columns to load
@@ -83,23 +83,6 @@ pub struct ParquetExec {
limit: Option<usize>,
}
-/// Represents one partition of a Parquet data set and this currently means
one Parquet file.
-///
-/// In the future it would be good to support subsets of files based on ranges
of row groups
-/// so that we can better parallelize reads of large files across available
cores (see
-/// [ARROW-10995](https://issues.apache.org/jira/browse/ARROW-10995)).
-///
-/// We may also want to support reading Parquet files that are partitioned
based on a key and
-/// in this case we would want this partition struct to represent multiple
files for a given
-/// partition key (see
[ARROW-11019](https://issues.apache.org/jira/browse/ARROW-11019)).
-#[derive(Debug, Clone)]
-pub struct ParquetPartition {
- /// The Parquet filename for this partition
- pub file_partition: FilePartition,
- /// Execution metrics
- metrics: ExecutionPlanMetricsSet,
-}
-
/// Stores metrics about the parquet execution for a particular parquet file
#[derive(Debug, Clone)]
struct ParquetFileMetrics {
@@ -115,7 +98,7 @@ impl ParquetExec {
#[allow(clippy::too_many_arguments)]
pub fn new(
object_store: Arc<dyn ObjectStore>,
- files: Vec<Vec<PartitionedFile>>,
+ file_groups: Vec<Vec<PartitionedFile>>,
statistics: Statistics,
schema: SchemaRef,
projection: Option<Vec<usize>>,
@@ -123,16 +106,8 @@ impl ParquetExec {
batch_size: usize,
limit: Option<usize>,
) -> Self {
- debug!("Creating ParquetExec, desc: {:?}, projection {:?}, predicate:
{:?}, limit: {:?}",
- files, projection, predicate, limit);
-
- let metrics = ExecutionPlanMetricsSet::new();
-
- let partitions = files
- .into_iter()
- .enumerate()
- .map(|(i, f)| ParquetPartition::new(f, i, metrics.clone()))
- .collect::<Vec<_>>();
+ debug!("Creating ParquetExec, files: {:?}, projection {:?}, predicate:
{:?}, limit: {:?}",
+ file_groups, projection, predicate, limit);
let metrics = ExecutionPlanMetricsSet::new();
let predicate_creation_errors =
@@ -162,7 +137,7 @@ impl ParquetExec {
Self {
object_store,
- partitions,
+ file_groups,
schema: projected_schema,
projection,
metrics,
@@ -204,11 +179,8 @@ impl ParquetExec {
}
/// List of data files
- pub fn partitions(&self) -> Vec<&[PartitionedFile]> {
- self.partitions
- .iter()
- .map(|fp| fp.file_partition.files.as_slice())
- .collect()
+ pub fn file_groups(&self) -> &[Vec<PartitionedFile>] {
+ &self.file_groups
}
/// Optional projection for which columns to load
pub fn projection(&self) -> &[usize] {
@@ -225,20 +197,6 @@ impl ParquetExec {
}
}
-impl ParquetPartition {
- /// Create a new parquet partition
- pub fn new(
- files: Vec<PartitionedFile>,
- index: usize,
- metrics: ExecutionPlanMetricsSet,
- ) -> Self {
- Self {
- file_partition: FilePartition { index, files },
- metrics,
- }
- }
-}
-
impl ParquetFileMetrics {
/// Create new metrics
pub fn new(
@@ -279,7 +237,7 @@ impl ExecutionPlan for ParquetExec {
/// Get the output partitioning of this plan
fn output_partitioning(&self) -> Partitioning {
- Partitioning::UnknownPartitioning(self.partitions.len())
+ Partitioning::UnknownPartitioning(self.file_groups.len())
}
fn with_new_children(
@@ -304,7 +262,7 @@ impl ExecutionPlan for ParquetExec {
Receiver<ArrowResult<RecordBatch>>,
) = channel(2);
- let partition = self.partitions[partition_index].clone();
+ let partition = self.file_groups[partition_index].clone();
let metrics = self.metrics.clone();
let projection = self.projection.clone();
let predicate_builder = self.predicate_builder.clone();
@@ -342,18 +300,12 @@ impl ExecutionPlan for ParquetExec {
) -> std::fmt::Result {
match t {
DisplayFormatType::Default => {
- let files: Vec<_> = self
- .partitions
- .iter()
- .map(|pp| format!("{}", pp.file_partition))
- .collect();
-
write!(
f,
- "ParquetExec: batch_size={}, limit={:?}, partitions=[{}]",
+ "ParquetExec: batch_size={}, limit={:?}, partitions={}",
self.batch_size,
self.limit,
- files.join(", ")
+ super::FileGroupsDisplay(&self.file_groups)
)
}
}
@@ -497,7 +449,7 @@ fn build_row_group_predicate(
fn read_partition(
object_store: &dyn ObjectStore,
partition_index: usize,
- partition: ParquetPartition,
+ partition: Vec<PartitionedFile>,
metrics: ExecutionPlanMetricsSet,
projection: &[usize],
predicate_builder: &Option<PruningPredicate>,
@@ -506,8 +458,7 @@ fn read_partition(
limit: Option<usize>,
) -> Result<()> {
let mut total_rows = 0;
- let all_files = partition.file_partition.files;
- 'outer: for partitioned_file in all_files {
+ 'outer: for partitioned_file in partition {
let file_metrics = ParquetFileMetrics::new(
partition_index,
&*partitioned_file.file_meta.path(),