This is an automated email from the ASF dual-hosted git repository.

houqp pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/arrow-datafusion.git


The following commit(s) were added to refs/heads/master by this push:
     new 161fcd8  Simplify file struct abstractions (#1120)
161fcd8 is described below

commit 161fcd88f3f827d3c2d10d5de60ec84863d4a162
Author: rdettai <[email protected]>
AuthorDate: Sun Oct 17 21:18:16 2021 +0200

    Simplify file struct abstractions (#1120)
    
    * [refacto] simplify file struct abstractions
    
    * [doc] explicit desc for file_groups
---
 ballista/rust/core/proto/ballista.proto            |  4 +-
 .../core/src/serde/physical_plan/from_proto.rs     | 43 ++++++------
 .../rust/core/src/serde/physical_plan/to_proto.rs  | 20 ++++--
 datafusion/src/datasource/datasource.rs            |  3 +
 datafusion/src/datasource/mod.rs                   | 16 -----
 datafusion/src/physical_plan/file_format/mod.rs    | 23 ++++++
 .../src/physical_plan/file_format/parquet.rs       | 81 +++++-----------------
 7 files changed, 79 insertions(+), 111 deletions(-)

diff --git a/ballista/rust/core/proto/ballista.proto 
b/ballista/rust/core/proto/ballista.proto
index 33638fd..49b65cf 100644
--- a/ballista/rust/core/proto/ballista.proto
+++ b/ballista/rust/core/proto/ballista.proto
@@ -596,7 +596,7 @@ message FilterExecNode {
   PhysicalExprNode expr = 2;
 }
 
-message FilePartition {
+message FileGroup {
   repeated PartitionedFile files = 1;
 }
 
@@ -606,7 +606,7 @@ message ScanLimit {
 }
 
 message ParquetScanExecNode {
-  repeated FilePartition partitions = 1;
+  repeated FileGroup file_groups = 1;
   Schema schema = 2;
   uint32 batch_size = 4;
   repeated uint32 projection = 6;
diff --git a/ballista/rust/core/src/serde/physical_plan/from_proto.rs 
b/ballista/rust/core/src/serde/physical_plan/from_proto.rs
index 680e419..75dd915 100644
--- a/ballista/rust/core/src/serde/physical_plan/from_proto.rs
+++ b/ballista/rust/core/src/serde/physical_plan/from_proto.rs
@@ -37,7 +37,7 @@ use datafusion::catalog::catalog::{
 };
 use datafusion::datasource::object_store::local::LocalFileSystem;
 use datafusion::datasource::object_store::{FileMeta, ObjectStoreRegistry, 
SizedFile};
-use datafusion::datasource::{FilePartition, PartitionedFile};
+use datafusion::datasource::PartitionedFile;
 use datafusion::execution::context::{
     ExecutionConfig, ExecutionContextState, ExecutionProps,
 };
@@ -127,8 +127,8 @@ impl TryInto<Arc<dyn ExecutionPlan>> for 
&protobuf::PhysicalPlanNode {
                     Arc::new(LocalFileSystem {}),
                     scan.files
                         .iter()
-                        .map(|f| f.try_into())
-                        .collect::<Result<Vec<PartitionedFile>, _>>()?,
+                        .map(|f| f.into())
+                        .collect::<Vec<PartitionedFile>>(),
                     statistics,
                     schema,
                     scan.has_header,
@@ -145,13 +145,10 @@ impl TryInto<Arc<dyn ExecutionPlan>> for 
&protobuf::PhysicalPlanNode {
 
                 Ok(Arc::new(ParquetExec::new(
                     Arc::new(LocalFileSystem {}),
-                    scan.partitions
+                    scan.file_groups
                         .iter()
-                        .map(|p| {
-                            let it = p.files.iter().map(|f| f.try_into());
-                            it.collect::<Result<Vec<PartitionedFile>, _>>()
-                        })
-                        .collect::<Result<Vec<Vec<PartitionedFile>>, _>>()?,
+                        .map(|p| p.into())
+                        .collect::<Vec<Vec<PartitionedFile>>>(),
                     statistics,
                     schema,
                     Some(projection),
@@ -170,8 +167,8 @@ impl TryInto<Arc<dyn ExecutionPlan>> for 
&protobuf::PhysicalPlanNode {
                     Arc::new(LocalFileSystem {}),
                     scan.files
                         .iter()
-                        .map(|f| f.try_into())
-                        .collect::<Result<Vec<PartitionedFile>, _>>()?,
+                        .map(|f| f.into())
+                        .collect::<Vec<PartitionedFile>>(),
                     statistics,
                     schema,
                     Some(projection),
@@ -741,23 +738,27 @@ pub fn parse_protobuf_hash_partitioning(
     }
 }
 
-impl TryInto<PartitionedFile> for &protobuf::PartitionedFile {
-    type Error = BallistaError;
-
-    fn try_into(self) -> Result<PartitionedFile, Self::Error> {
-        Ok(PartitionedFile {
+impl From<&protobuf::PartitionedFile> for PartitionedFile {
+    fn from(val: &protobuf::PartitionedFile) -> Self {
+        PartitionedFile {
             file_meta: FileMeta {
                 sized_file: SizedFile {
-                    path: self.path.clone(),
-                    size: self.size,
+                    path: val.path.clone(),
+                    size: val.size,
                 },
-                last_modified: if self.last_modified_ns == 0 {
+                last_modified: if val.last_modified_ns == 0 {
                     None
                 } else {
-                    Some(Utc.timestamp_nanos(self.last_modified_ns as i64))
+                    Some(Utc.timestamp_nanos(val.last_modified_ns as i64))
                 },
             },
-        })
+        }
+    }
+}
+
+impl From<&protobuf::FileGroup> for Vec<PartitionedFile> {
+    fn from(val: &protobuf::FileGroup) -> Self {
+        val.files.iter().map(|f| f.into()).collect()
     }
 }
 
diff --git a/ballista/rust/core/src/serde/physical_plan/to_proto.rs 
b/ballista/rust/core/src/serde/physical_plan/to_proto.rs
index 020b688..e5e6347 100644
--- a/ballista/rust/core/src/serde/physical_plan/to_proto.rs
+++ b/ballista/rust/core/src/serde/physical_plan/to_proto.rs
@@ -275,18 +275,16 @@ impl TryInto<protobuf::PhysicalPlanNode> for Arc<dyn 
ExecutionPlan> {
                 )),
             })
         } else if let Some(exec) = plan.downcast_ref::<ParquetExec>() {
-            let partitions = exec
-                .partitions()
-                .into_iter()
-                .map(|p| protobuf::FilePartition {
-                    files: p.iter().map(|f| f.into()).collect(),
-                })
+            let file_groups = exec
+                .file_groups()
+                .iter()
+                .map(|p| p.as_slice().into())
                 .collect();
 
             Ok(protobuf::PhysicalPlanNode {
                 physical_plan_type: Some(PhysicalPlanType::ParquetScan(
                     protobuf::ParquetScanExecNode {
-                        partitions,
+                        file_groups,
                         statistics: Some((&exec.statistics()).into()),
                         limit: exec
                             .limit()
@@ -688,6 +686,14 @@ impl From<&PartitionedFile> for protobuf::PartitionedFile {
     }
 }
 
+impl From<&[PartitionedFile]> for protobuf::FileGroup {
+    fn from(gr: &[PartitionedFile]) -> protobuf::FileGroup {
+        protobuf::FileGroup {
+            files: gr.iter().map(|f| f.into()).collect(),
+        }
+    }
+}
+
 impl From<&ColumnStatistics> for protobuf::ColumnStats {
     fn from(cs: &ColumnStatistics) -> protobuf::ColumnStats {
         protobuf::ColumnStats {
diff --git a/datafusion/src/datasource/datasource.rs 
b/datafusion/src/datasource/datasource.rs
index 918200f..823b408 100644
--- a/datafusion/src/datasource/datasource.rs
+++ b/datafusion/src/datasource/datasource.rs
@@ -71,6 +71,9 @@ pub trait TableProvider: Sync + Send {
     }
 
     /// Create an ExecutionPlan that will scan the table.
+    /// The table provider will usually be responsible for grouping
+    /// the source data into partitions that can be efficiently
+    /// parallelized or distributed.
     async fn scan(
         &self,
         projection: &Option<Vec<usize>>,
diff --git a/datafusion/src/datasource/mod.rs b/datafusion/src/datasource/mod.rs
index b607469..2e5330f 100644
--- a/datafusion/src/datasource/mod.rs
+++ b/datafusion/src/datasource/mod.rs
@@ -155,22 +155,6 @@ impl std::fmt::Display for PartitionedFile {
     }
 }
 
-#[derive(Debug, Clone)]
-/// A collection of files that should be read in a single task
-pub struct FilePartition {
-    /// The index of the partition among all partitions
-    pub index: usize,
-    /// The contained files of the partition
-    pub files: Vec<PartitionedFile>,
-}
-
-impl std::fmt::Display for FilePartition {
-    fn fmt(&self, f: &mut std::fmt::Formatter) -> std::fmt::Result {
-        let files: Vec<String> = self.files.iter().map(|f| 
f.to_string()).collect();
-        write!(f, "{}", files.join(", "))
-    }
-}
-
 fn create_max_min_accs(
     schema: &Schema,
 ) -> (Vec<Option<MaxAccumulator>>, Vec<Option<MinAccumulator>>) {
diff --git a/datafusion/src/physical_plan/file_format/mod.rs 
b/datafusion/src/physical_plan/file_format/mod.rs
index aa9359c..aa4f116 100644
--- a/datafusion/src/physical_plan/file_format/mod.rs
+++ b/datafusion/src/physical_plan/file_format/mod.rs
@@ -26,3 +26,26 @@ pub use self::parquet::ParquetExec;
 pub use avro::AvroExec;
 pub use csv::CsvExec;
 pub use json::NdJsonExec;
+
+use crate::datasource::PartitionedFile;
+use std::fmt::{Display, Formatter, Result};
+
+/// A wrapper to customize partitioned file display
+#[derive(Debug)]
+struct FileGroupsDisplay<'a>(&'a [Vec<PartitionedFile>]);
+
+impl<'a> Display for FileGroupsDisplay<'a> {
+    fn fmt(&self, f: &mut Formatter) -> Result {
+        let parts: Vec<_> = self
+            .0
+            .iter()
+            .map(|pp| {
+                pp.iter()
+                    .map(|pf| pf.file_meta.path())
+                    .collect::<Vec<_>>()
+                    .join(", ")
+            })
+            .collect();
+        write!(f, "[{}]", parts.join(", "))
+    }
+}
diff --git a/datafusion/src/physical_plan/file_format/parquet.rs 
b/datafusion/src/physical_plan/file_format/parquet.rs
index c011f33..d07d2a9 100644
--- a/datafusion/src/physical_plan/file_format/parquet.rs
+++ b/datafusion/src/physical_plan/file_format/parquet.rs
@@ -23,6 +23,7 @@ use std::{any::Any, convert::TryInto};
 
 use crate::datasource::file_format::parquet::ChunkObjectReader;
 use crate::datasource::object_store::ObjectStore;
+use crate::datasource::PartitionedFile;
 use crate::{
     error::{DataFusionError, Result},
     logical_plan::{Column, Expr},
@@ -59,14 +60,13 @@ use tokio::{
 
 use async_trait::async_trait;
 
-use crate::datasource::{FilePartition, PartitionedFile};
-
 /// Execution plan for scanning one or more Parquet partitions
 #[derive(Debug, Clone)]
 pub struct ParquetExec {
     object_store: Arc<dyn ObjectStore>,
-    /// Parquet partitions to read
-    partitions: Vec<ParquetPartition>,
+    /// Grouped list of files. Each group will be processed together by one
+    /// partition of the `ExecutionPlan`.
+    file_groups: Vec<Vec<PartitionedFile>>,
     /// Schema after projection is applied
     schema: SchemaRef,
     /// Projection for which columns to load
@@ -83,23 +83,6 @@ pub struct ParquetExec {
     limit: Option<usize>,
 }
 
-/// Represents one partition of a Parquet data set and this currently means 
one Parquet file.
-///
-/// In the future it would be good to support subsets of files based on ranges 
of row groups
-/// so that we can better parallelize reads of large files across available 
cores (see
-/// [ARROW-10995](https://issues.apache.org/jira/browse/ARROW-10995)).
-///
-/// We may also want to support reading Parquet files that are partitioned 
based on a key and
-/// in this case we would want this partition struct to represent multiple 
files for a given
-/// partition key (see 
[ARROW-11019](https://issues.apache.org/jira/browse/ARROW-11019)).
-#[derive(Debug, Clone)]
-pub struct ParquetPartition {
-    /// The Parquet filename for this partition
-    pub file_partition: FilePartition,
-    /// Execution metrics
-    metrics: ExecutionPlanMetricsSet,
-}
-
 /// Stores metrics about the parquet execution for a particular parquet file
 #[derive(Debug, Clone)]
 struct ParquetFileMetrics {
@@ -115,7 +98,7 @@ impl ParquetExec {
     #[allow(clippy::too_many_arguments)]
     pub fn new(
         object_store: Arc<dyn ObjectStore>,
-        files: Vec<Vec<PartitionedFile>>,
+        file_groups: Vec<Vec<PartitionedFile>>,
         statistics: Statistics,
         schema: SchemaRef,
         projection: Option<Vec<usize>>,
@@ -123,16 +106,8 @@ impl ParquetExec {
         batch_size: usize,
         limit: Option<usize>,
     ) -> Self {
-        debug!("Creating ParquetExec, desc: {:?}, projection {:?}, predicate: 
{:?}, limit: {:?}",
-        files, projection, predicate, limit);
-
-        let metrics = ExecutionPlanMetricsSet::new();
-
-        let partitions = files
-            .into_iter()
-            .enumerate()
-            .map(|(i, f)| ParquetPartition::new(f, i, metrics.clone()))
-            .collect::<Vec<_>>();
+        debug!("Creating ParquetExec, files: {:?}, projection {:?}, predicate: 
{:?}, limit: {:?}",
+        file_groups, projection, predicate, limit);
 
         let metrics = ExecutionPlanMetricsSet::new();
         let predicate_creation_errors =
@@ -162,7 +137,7 @@ impl ParquetExec {
 
         Self {
             object_store,
-            partitions,
+            file_groups,
             schema: projected_schema,
             projection,
             metrics,
@@ -204,11 +179,8 @@ impl ParquetExec {
     }
 
     /// List of data files
-    pub fn partitions(&self) -> Vec<&[PartitionedFile]> {
-        self.partitions
-            .iter()
-            .map(|fp| fp.file_partition.files.as_slice())
-            .collect()
+    pub fn file_groups(&self) -> &[Vec<PartitionedFile>] {
+        &self.file_groups
     }
     /// Optional projection for which columns to load
     pub fn projection(&self) -> &[usize] {
@@ -225,20 +197,6 @@ impl ParquetExec {
     }
 }
 
-impl ParquetPartition {
-    /// Create a new parquet partition
-    pub fn new(
-        files: Vec<PartitionedFile>,
-        index: usize,
-        metrics: ExecutionPlanMetricsSet,
-    ) -> Self {
-        Self {
-            file_partition: FilePartition { index, files },
-            metrics,
-        }
-    }
-}
-
 impl ParquetFileMetrics {
     /// Create new metrics
     pub fn new(
@@ -279,7 +237,7 @@ impl ExecutionPlan for ParquetExec {
 
     /// Get the output partitioning of this plan
     fn output_partitioning(&self) -> Partitioning {
-        Partitioning::UnknownPartitioning(self.partitions.len())
+        Partitioning::UnknownPartitioning(self.file_groups.len())
     }
 
     fn with_new_children(
@@ -304,7 +262,7 @@ impl ExecutionPlan for ParquetExec {
             Receiver<ArrowResult<RecordBatch>>,
         ) = channel(2);
 
-        let partition = self.partitions[partition_index].clone();
+        let partition = self.file_groups[partition_index].clone();
         let metrics = self.metrics.clone();
         let projection = self.projection.clone();
         let predicate_builder = self.predicate_builder.clone();
@@ -342,18 +300,12 @@ impl ExecutionPlan for ParquetExec {
     ) -> std::fmt::Result {
         match t {
             DisplayFormatType::Default => {
-                let files: Vec<_> = self
-                    .partitions
-                    .iter()
-                    .map(|pp| format!("{}", pp.file_partition))
-                    .collect();
-
                 write!(
                     f,
-                    "ParquetExec: batch_size={}, limit={:?}, partitions=[{}]",
+                    "ParquetExec: batch_size={}, limit={:?}, partitions={}",
                     self.batch_size,
                     self.limit,
-                    files.join(", ")
+                    super::FileGroupsDisplay(&self.file_groups)
                 )
             }
         }
@@ -497,7 +449,7 @@ fn build_row_group_predicate(
 fn read_partition(
     object_store: &dyn ObjectStore,
     partition_index: usize,
-    partition: ParquetPartition,
+    partition: Vec<PartitionedFile>,
     metrics: ExecutionPlanMetricsSet,
     projection: &[usize],
     predicate_builder: &Option<PruningPredicate>,
@@ -506,8 +458,7 @@ fn read_partition(
     limit: Option<usize>,
 ) -> Result<()> {
     let mut total_rows = 0;
-    let all_files = partition.file_partition.files;
-    'outer: for partitioned_file in all_files {
+    'outer: for partitioned_file in partition {
         let file_metrics = ParquetFileMetrics::new(
             partition_index,
             &*partitioned_file.file_meta.path(),

Reply via email to