This is an automated email from the ASF dual-hosted git repository.
alamb pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/datafusion.git
The following commit(s) were added to refs/heads/main by this push:
new 9ca09cf8f7 Allow `FileSource`-specific repartitioning (#14754)
9ca09cf8f7 is described below
commit 9ca09cf8f769a3f0a64dbc87ec84eb6fe08b36f6
Author: Adam Gutglick <[email protected]>
AuthorDate: Fri Feb 21 12:23:44 2025 +0000
Allow `FileSource`-specific repartitioning (#14754)
* `FileSource`-specific repartitioning
* fix doc typo
* remove
* Avro doesn't support repartitioning
---
datafusion/core/src/datasource/data_source.rs | 34 +++++++++++++++++++---
.../src/datasource/physical_plan/arrow_file.rs | 4 ---
.../core/src/datasource/physical_plan/avro.rs | 13 ++++++---
.../core/src/datasource/physical_plan/csv.rs | 3 --
.../datasource/physical_plan/file_scan_config.rs | 31 +++++++-------------
.../core/src/datasource/physical_plan/json.rs | 4 ---
.../src/datasource/physical_plan/parquet/source.rs | 3 --
7 files changed, 50 insertions(+), 42 deletions(-)
diff --git a/datafusion/core/src/datasource/data_source.rs
b/datafusion/core/src/datasource/data_source.rs
index d31b68019e..fcb31194ea 100644
--- a/datafusion/core/src/datasource/data_source.rs
+++ b/datafusion/core/src/datasource/data_source.rs
@@ -26,6 +26,8 @@ use crate::datasource::physical_plan::{FileOpener,
FileScanConfig};
use arrow::datatypes::SchemaRef;
use datafusion_common::Statistics;
+use datafusion_datasource::file_groups::FileGroupPartitioner;
+use datafusion_physical_expr::LexOrdering;
use datafusion_physical_plan::metrics::ExecutionPlanMetricsSet;
use datafusion_physical_plan::DisplayFormatType;
@@ -62,9 +64,33 @@ pub trait FileSource: Send + Sync {
fn fmt_extra(&self, _t: DisplayFormatType, _f: &mut Formatter) ->
fmt::Result {
Ok(())
}
- /// Return true if the file format supports repartition
+
+ /// If supported by the [`FileSource`], redistribute files across partitions according to their size.
+ /// Allows custom file formats to implement their own repartitioning logic.
///
- /// If this returns true, the DataSourceExec may repartition the data
- /// by breaking up the input files into multiple smaller groups.
- fn supports_repartition(&self, config: &FileScanConfig) -> bool;
+ /// Provides a default repartitioning behavior, see comments on [`FileGroupPartitioner`] for more detail.
+ fn repartitioned(
+ &self,
+ target_partitions: usize,
+ repartition_file_min_size: usize,
+ output_ordering: Option<LexOrdering>,
+ config: &FileScanConfig,
+ ) -> datafusion_common::Result<Option<FileScanConfig>> {
+ if config.file_compression_type.is_compressed() ||
config.new_lines_in_values {
+ return Ok(None);
+ }
+
+ let repartitioned_file_groups_option = FileGroupPartitioner::new()
+ .with_target_partitions(target_partitions)
+ .with_repartition_file_min_size(repartition_file_min_size)
+ .with_preserve_order_within_groups(output_ordering.is_some())
+ .repartition_file_groups(&config.file_groups);
+
+ if let Some(repartitioned_file_groups) =
repartitioned_file_groups_option {
+ let mut source = config.clone();
+ source.file_groups = repartitioned_file_groups;
+ return Ok(Some(source));
+ }
+ Ok(None)
+ }
}
diff --git a/datafusion/core/src/datasource/physical_plan/arrow_file.rs
b/datafusion/core/src/datasource/physical_plan/arrow_file.rs
index c6e05893a9..d0d0379248 100644
--- a/datafusion/core/src/datasource/physical_plan/arrow_file.rs
+++ b/datafusion/core/src/datasource/physical_plan/arrow_file.rs
@@ -256,10 +256,6 @@ impl FileSource for ArrowSource {
fn file_type(&self) -> &str {
"arrow"
}
-
- fn supports_repartition(&self, config: &FileScanConfig) -> bool {
- !(config.file_compression_type.is_compressed() ||
config.new_lines_in_values)
- }
}
/// The struct arrow that implements `[FileOpener]` trait
diff --git a/datafusion/core/src/datasource/physical_plan/avro.rs
b/datafusion/core/src/datasource/physical_plan/avro.rs
index 1a88dc31a6..ae98c19a16 100644
--- a/datafusion/core/src/datasource/physical_plan/avro.rs
+++ b/datafusion/core/src/datasource/physical_plan/avro.rs
@@ -255,10 +255,15 @@ impl FileSource for AvroSource {
fn file_type(&self) -> &str {
"avro"
}
- fn supports_repartition(&self, config: &FileScanConfig) -> bool {
- !(config.file_compression_type.is_compressed()
- || config.new_lines_in_values
- || self.as_any().downcast_ref::<AvroSource>().is_some())
+
+ fn repartitioned(
+ &self,
+ _target_partitions: usize,
+ _repartition_file_min_size: usize,
+ _output_ordering: Option<LexOrdering>,
+ _config: &FileScanConfig,
+ ) -> Result<Option<FileScanConfig>> {
+ Ok(None)
}
}
diff --git a/datafusion/core/src/datasource/physical_plan/csv.rs
b/datafusion/core/src/datasource/physical_plan/csv.rs
index 412c90726a..8fcfd6b41e 100644
--- a/datafusion/core/src/datasource/physical_plan/csv.rs
+++ b/datafusion/core/src/datasource/physical_plan/csv.rs
@@ -618,9 +618,6 @@ impl FileSource for CsvSource {
fn fmt_extra(&self, _t: DisplayFormatType, f: &mut fmt::Formatter) ->
fmt::Result {
write!(f, ", has_header={}", self.has_header)
}
- fn supports_repartition(&self, config: &FileScanConfig) -> bool {
- !(config.file_compression_type.is_compressed() ||
config.new_lines_in_values)
- }
}
impl FileOpener for CsvOpener {
diff --git a/datafusion/core/src/datasource/physical_plan/file_scan_config.rs
b/datafusion/core/src/datasource/physical_plan/file_scan_config.rs
index 4996b6d97b..5c882ed751 100644
--- a/datafusion/core/src/datasource/physical_plan/file_scan_config.rs
+++ b/datafusion/core/src/datasource/physical_plan/file_scan_config.rs
@@ -19,8 +19,8 @@
//! file sources.
use super::{
- get_projected_output_ordering, statistics::MinMaxStatistics,
FileGroupPartitioner,
- FileGroupsDisplay, FileStream,
+ get_projected_output_ordering, statistics::MinMaxStatistics,
FileGroupsDisplay,
+ FileStream,
};
use crate::datasource::file_format::file_compression_type::FileCompressionType;
use crate::datasource::{listing::PartitionedFile,
object_store::ObjectStoreUrl};
@@ -203,30 +203,21 @@ impl DataSource for FileScanConfig {
self.fmt_file_source(t, f)
}
- /// Redistribute files across partitions according to their size
- /// See comments on [`FileGroupPartitioner`] for more detail.
+ /// If supported by the underlying [`FileSource`], redistribute files across partitions according to their size.
fn repartitioned(
&self,
target_partitions: usize,
repartition_file_min_size: usize,
output_ordering: Option<LexOrdering>,
) -> Result<Option<Arc<dyn DataSource>>> {
- if !self.source.supports_repartition(self) {
- return Ok(None);
- }
-
- let repartitioned_file_groups_option = FileGroupPartitioner::new()
- .with_target_partitions(target_partitions)
- .with_repartition_file_min_size(repartition_file_min_size)
- .with_preserve_order_within_groups(output_ordering.is_some())
- .repartition_file_groups(&self.file_groups);
-
- if let Some(repartitioned_file_groups) =
repartitioned_file_groups_option {
- let mut source = self.clone();
- source.file_groups = repartitioned_file_groups;
- return Ok(Some(Arc::new(source)));
- }
- Ok(None)
+ let source = self.source.repartitioned(
+ target_partitions,
+ repartition_file_min_size,
+ output_ordering,
+ self,
+ )?;
+
+ Ok(source.map(|s| Arc::new(s) as _))
}
fn output_partitioning(&self) -> Partitioning {
diff --git a/datafusion/core/src/datasource/physical_plan/json.rs
b/datafusion/core/src/datasource/physical_plan/json.rs
index 249f50efa5..f2304ed8a3 100644
--- a/datafusion/core/src/datasource/physical_plan/json.rs
+++ b/datafusion/core/src/datasource/physical_plan/json.rs
@@ -313,10 +313,6 @@ impl FileSource for JsonSource {
fn file_type(&self) -> &str {
"json"
}
-
- fn supports_repartition(&self, config: &FileScanConfig) -> bool {
- !(config.file_compression_type.is_compressed() ||
config.new_lines_in_values)
- }
}
impl FileOpener for JsonOpener {
diff --git a/datafusion/core/src/datasource/physical_plan/parquet/source.rs
b/datafusion/core/src/datasource/physical_plan/parquet/source.rs
index 810a16de41..26a5877e2d 100644
--- a/datafusion/core/src/datasource/physical_plan/parquet/source.rs
+++ b/datafusion/core/src/datasource/physical_plan/parquet/source.rs
@@ -586,7 +586,4 @@ impl FileSource for ParquetSource {
}
}
}
- fn supports_repartition(&self, _config: &FileScanConfig) -> bool {
- true
- }
}
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]