This is an automated email from the ASF dual-hosted git repository.

alamb pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/datafusion.git


The following commit(s) were added to refs/heads/main by this push:
     new 9ca09cf8f7 Allow `FileSource`-specific repartitioning (#14754)
9ca09cf8f7 is described below

commit 9ca09cf8f769a3f0a64dbc87ec84eb6fe08b36f6
Author: Adam Gutglick <[email protected]>
AuthorDate: Fri Feb 21 12:23:44 2025 +0000

    Allow `FileSource`-specific repartitioning (#14754)
    
    * FileSource specific repartitioning
    
    * fix doc typo
    
    * remove
    
    * Avro doesn't support repartitioning
---
 datafusion/core/src/datasource/data_source.rs      | 34 +++++++++++++++++++---
 .../src/datasource/physical_plan/arrow_file.rs     |  4 ---
 .../core/src/datasource/physical_plan/avro.rs      | 13 ++++++---
 .../core/src/datasource/physical_plan/csv.rs       |  3 --
 .../datasource/physical_plan/file_scan_config.rs   | 31 +++++++-------------
 .../core/src/datasource/physical_plan/json.rs      |  4 ---
 .../src/datasource/physical_plan/parquet/source.rs |  3 --
 7 files changed, 50 insertions(+), 42 deletions(-)

diff --git a/datafusion/core/src/datasource/data_source.rs b/datafusion/core/src/datasource/data_source.rs
index d31b68019e..fcb31194ea 100644
--- a/datafusion/core/src/datasource/data_source.rs
+++ b/datafusion/core/src/datasource/data_source.rs
@@ -26,6 +26,8 @@ use crate::datasource::physical_plan::{FileOpener, FileScanConfig};
 
 use arrow::datatypes::SchemaRef;
 use datafusion_common::Statistics;
+use datafusion_datasource::file_groups::FileGroupPartitioner;
+use datafusion_physical_expr::LexOrdering;
 use datafusion_physical_plan::metrics::ExecutionPlanMetricsSet;
 use datafusion_physical_plan::DisplayFormatType;
 
@@ -62,9 +64,33 @@ pub trait FileSource: Send + Sync {
     fn fmt_extra(&self, _t: DisplayFormatType, _f: &mut Formatter) -> fmt::Result {
         Ok(())
     }
-    /// Return true if the file format supports repartition
+
+    /// If supported by the [`FileSource`], redistribute files across partitions according to their size.
+    /// Allows custom file formats to implement their own repartitioning logic.
     ///
-    /// If this returns true, the DataSourceExec may repartition the data
-    /// by breaking up the input files into multiple smaller groups.
-    fn supports_repartition(&self, config: &FileScanConfig) -> bool;
+    /// Provides a default repartitioning behavior, see comments on [`FileGroupPartitioner`] for more detail.
+    fn repartitioned(
+        &self,
+        target_partitions: usize,
+        repartition_file_min_size: usize,
+        output_ordering: Option<LexOrdering>,
+        config: &FileScanConfig,
+    ) -> datafusion_common::Result<Option<FileScanConfig>> {
+        if config.file_compression_type.is_compressed() || config.new_lines_in_values {
+            return Ok(None);
+        }
+
+        let repartitioned_file_groups_option = FileGroupPartitioner::new()
+            .with_target_partitions(target_partitions)
+            .with_repartition_file_min_size(repartition_file_min_size)
+            .with_preserve_order_within_groups(output_ordering.is_some())
+            .repartition_file_groups(&config.file_groups);
+
+        if let Some(repartitioned_file_groups) = repartitioned_file_groups_option {
+            let mut source = config.clone();
+            source.file_groups = repartitioned_file_groups;
+            return Ok(Some(source));
+        }
+        Ok(None)
+    }
 }
diff --git a/datafusion/core/src/datasource/physical_plan/arrow_file.rs b/datafusion/core/src/datasource/physical_plan/arrow_file.rs
index c6e05893a9..d0d0379248 100644
--- a/datafusion/core/src/datasource/physical_plan/arrow_file.rs
+++ b/datafusion/core/src/datasource/physical_plan/arrow_file.rs
@@ -256,10 +256,6 @@ impl FileSource for ArrowSource {
     fn file_type(&self) -> &str {
         "arrow"
     }
-
-    fn supports_repartition(&self, config: &FileScanConfig) -> bool {
-        !(config.file_compression_type.is_compressed() || config.new_lines_in_values)
-    }
 }
 
 /// The struct arrow that implements `[FileOpener]` trait
diff --git a/datafusion/core/src/datasource/physical_plan/avro.rs b/datafusion/core/src/datasource/physical_plan/avro.rs
index 1a88dc31a6..ae98c19a16 100644
--- a/datafusion/core/src/datasource/physical_plan/avro.rs
+++ b/datafusion/core/src/datasource/physical_plan/avro.rs
@@ -255,10 +255,15 @@ impl FileSource for AvroSource {
     fn file_type(&self) -> &str {
         "avro"
     }
-    fn supports_repartition(&self, config: &FileScanConfig) -> bool {
-        !(config.file_compression_type.is_compressed()
-            || config.new_lines_in_values
-            || self.as_any().downcast_ref::<AvroSource>().is_some())
+
+    fn repartitioned(
+        &self,
+        _target_partitions: usize,
+        _repartition_file_min_size: usize,
+        _output_ordering: Option<LexOrdering>,
+        _config: &FileScanConfig,
+    ) -> Result<Option<FileScanConfig>> {
+        Ok(None)
     }
 }
 
diff --git a/datafusion/core/src/datasource/physical_plan/csv.rs b/datafusion/core/src/datasource/physical_plan/csv.rs
index 412c90726a..8fcfd6b41e 100644
--- a/datafusion/core/src/datasource/physical_plan/csv.rs
+++ b/datafusion/core/src/datasource/physical_plan/csv.rs
@@ -618,9 +618,6 @@ impl FileSource for CsvSource {
     fn fmt_extra(&self, _t: DisplayFormatType, f: &mut fmt::Formatter) -> fmt::Result {
         write!(f, ", has_header={}", self.has_header)
     }
-    fn supports_repartition(&self, config: &FileScanConfig) -> bool {
-        !(config.file_compression_type.is_compressed() || config.new_lines_in_values)
-    }
 }
 
 impl FileOpener for CsvOpener {
diff --git a/datafusion/core/src/datasource/physical_plan/file_scan_config.rs b/datafusion/core/src/datasource/physical_plan/file_scan_config.rs
index 4996b6d97b..5c882ed751 100644
--- a/datafusion/core/src/datasource/physical_plan/file_scan_config.rs
+++ b/datafusion/core/src/datasource/physical_plan/file_scan_config.rs
@@ -19,8 +19,8 @@
 //! file sources.
 
 use super::{
-    get_projected_output_ordering, statistics::MinMaxStatistics, FileGroupPartitioner,
-    FileGroupsDisplay, FileStream,
+    get_projected_output_ordering, statistics::MinMaxStatistics, FileGroupsDisplay,
+    FileStream,
 };
 use crate::datasource::file_format::file_compression_type::FileCompressionType;
 use crate::datasource::{listing::PartitionedFile, object_store::ObjectStoreUrl};
@@ -203,30 +203,21 @@ impl DataSource for FileScanConfig {
         self.fmt_file_source(t, f)
     }
 
-    /// Redistribute files across partitions according to their size
-    /// See comments on [`FileGroupPartitioner`] for more detail.
+    /// If supported by the underlying [`FileSource`], redistribute files across partitions according to their size.
     fn repartitioned(
         &self,
         target_partitions: usize,
         repartition_file_min_size: usize,
         output_ordering: Option<LexOrdering>,
     ) -> Result<Option<Arc<dyn DataSource>>> {
-        if !self.source.supports_repartition(self) {
-            return Ok(None);
-        }
-
-        let repartitioned_file_groups_option = FileGroupPartitioner::new()
-            .with_target_partitions(target_partitions)
-            .with_repartition_file_min_size(repartition_file_min_size)
-            .with_preserve_order_within_groups(output_ordering.is_some())
-            .repartition_file_groups(&self.file_groups);
-
-        if let Some(repartitioned_file_groups) = repartitioned_file_groups_option {
-            let mut source = self.clone();
-            source.file_groups = repartitioned_file_groups;
-            return Ok(Some(Arc::new(source)));
-        }
-        Ok(None)
+        let source = self.source.repartitioned(
+            target_partitions,
+            repartition_file_min_size,
+            output_ordering,
+            self,
+        )?;
+
+        Ok(source.map(|s| Arc::new(s) as _))
     }
 
     fn output_partitioning(&self) -> Partitioning {
diff --git a/datafusion/core/src/datasource/physical_plan/json.rs b/datafusion/core/src/datasource/physical_plan/json.rs
index 249f50efa5..f2304ed8a3 100644
--- a/datafusion/core/src/datasource/physical_plan/json.rs
+++ b/datafusion/core/src/datasource/physical_plan/json.rs
@@ -313,10 +313,6 @@ impl FileSource for JsonSource {
     fn file_type(&self) -> &str {
         "json"
     }
-
-    fn supports_repartition(&self, config: &FileScanConfig) -> bool {
-        !(config.file_compression_type.is_compressed() || config.new_lines_in_values)
-    }
 }
 
 impl FileOpener for JsonOpener {
diff --git a/datafusion/core/src/datasource/physical_plan/parquet/source.rs b/datafusion/core/src/datasource/physical_plan/parquet/source.rs
index 810a16de41..26a5877e2d 100644
--- a/datafusion/core/src/datasource/physical_plan/parquet/source.rs
+++ b/datafusion/core/src/datasource/physical_plan/parquet/source.rs
@@ -586,7 +586,4 @@ impl FileSource for ParquetSource {
             }
         }
     }
-    fn supports_repartition(&self, _config: &FileScanConfig) -> bool {
-        true
-    }
 }


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to