This is an automated email from the ASF dual-hosted git repository.
alamb pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/arrow-datafusion.git
The following commit(s) were added to refs/heads/main by this push:
new fdb5454930 Move source repartitioning into
`ExecutionPlan::repartition` (#7936)
fdb5454930 is described below
commit fdb54549301ccbede0b48788ff0fa057dd799d63
Author: Andrew Lamb <[email protected]>
AuthorDate: Mon Oct 30 13:17:04 2023 -0400
Move source repartitioning into `ExecutionPlan::repartition` (#7936)
* Move source repartitioning into ExecutionPlan::repartition
* cleanup
* update test
* update test
* refine docs
* fix merge
---
.../core/src/datasource/physical_plan/csv.rs | 58 ++++++-------
.../core/src/datasource/physical_plan/mod.rs | 96 ++++++++++------------
.../core/src/datasource/physical_plan/parquet.rs | 41 ++++-----
.../src/physical_optimizer/enforce_distribution.rs | 28 ++-----
datafusion/physical-plan/src/lib.rs | 30 ++++++-
5 files changed, 130 insertions(+), 123 deletions(-)
diff --git a/datafusion/core/src/datasource/physical_plan/csv.rs
b/datafusion/core/src/datasource/physical_plan/csv.rs
index 8117e101ea..82163da64a 100644
--- a/datafusion/core/src/datasource/physical_plan/csv.rs
+++ b/datafusion/core/src/datasource/physical_plan/csv.rs
@@ -46,6 +46,7 @@ use datafusion_physical_expr::{
};
use bytes::{Buf, Bytes};
+use datafusion_common::config::ConfigOptions;
use futures::{ready, StreamExt, TryStreamExt};
use object_store::{GetOptions, GetResultPayload, ObjectStore};
use tokio::io::AsyncWriteExt;
@@ -117,34 +118,6 @@ impl CsvExec {
pub fn escape(&self) -> Option<u8> {
self.escape
}
-
- /// Redistribute files across partitions according to their size
- /// See comments on `repartition_file_groups()` for more detail.
- ///
- /// Return `None` if can't get repartitioned(empty/compressed file).
- pub fn get_repartitioned(
- &self,
- target_partitions: usize,
- repartition_file_min_size: usize,
- ) -> Option<Self> {
- // Parallel execution on compressed CSV file is not supported yet.
- if self.file_compression_type.is_compressed() {
- return None;
- }
-
- let repartitioned_file_groups_option =
FileScanConfig::repartition_file_groups(
- self.base_config.file_groups.clone(),
- target_partitions,
- repartition_file_min_size,
- );
-
- if let Some(repartitioned_file_groups) =
repartitioned_file_groups_option {
- let mut new_plan = self.clone();
- new_plan.base_config.file_groups = repartitioned_file_groups;
- return Some(new_plan);
- }
- None
- }
}
impl DisplayAs for CsvExec {
@@ -205,6 +178,35 @@ impl ExecutionPlan for CsvExec {
Ok(self)
}
+ /// Redistribute files across partitions according to their size
+ /// See comments on `repartition_file_groups()` for more detail.
+ ///
+ /// Return `None` if it can't be repartitioned (empty/compressed file).
+ fn repartitioned(
+ &self,
+ target_partitions: usize,
+ config: &ConfigOptions,
+ ) -> Result<Option<Arc<dyn ExecutionPlan>>> {
+ let repartition_file_min_size =
config.optimizer.repartition_file_min_size;
+ // Parallel execution on compressed CSV file is not supported yet.
+ if self.file_compression_type.is_compressed() {
+ return Ok(None);
+ }
+
+ let repartitioned_file_groups_option =
FileScanConfig::repartition_file_groups(
+ self.base_config.file_groups.clone(),
+ target_partitions,
+ repartition_file_min_size,
+ );
+
+ if let Some(repartitioned_file_groups) =
repartitioned_file_groups_option {
+ let mut new_plan = self.clone();
+ new_plan.base_config.file_groups = repartitioned_file_groups;
+ return Ok(Some(Arc::new(new_plan)));
+ }
+ Ok(None)
+ }
+
fn execute(
&self,
partition: usize,
diff --git a/datafusion/core/src/datasource/physical_plan/mod.rs
b/datafusion/core/src/datasource/physical_plan/mod.rs
index 6643e4127d..ea0a9698ff 100644
--- a/datafusion/core/src/datasource/physical_plan/mod.rs
+++ b/datafusion/core/src/datasource/physical_plan/mod.rs
@@ -527,6 +527,7 @@ mod tests {
};
use arrow_schema::Field;
use chrono::Utc;
+ use datafusion_common::config::ConfigOptions;
use crate::physical_plan::{DefaultDisplay, VerboseDisplay};
@@ -828,11 +829,7 @@ mod tests {
None,
);
- let partitioned_file = parquet_exec
- .get_repartitioned(4, 0)
- .base_config()
- .file_groups
- .clone();
+ let partitioned_file = repartition_with_size(&parquet_exec, 4, 0);
assert!(partitioned_file[0][0].range.is_none());
}
@@ -893,13 +890,8 @@ mod tests {
None,
);
- let actual = file_groups_to_vec(
- parquet_exec
- .get_repartitioned(n_partition, 10)
- .base_config()
- .file_groups
- .clone(),
- );
+ let actual =
+ repartition_with_size_to_vec(&parquet_exec,
n_partition, 10);
assert_eq!(expected, &actual);
}
@@ -927,13 +919,7 @@ mod tests {
None,
);
- let actual = file_groups_to_vec(
- parquet_exec
- .get_repartitioned(4, 10)
- .base_config()
- .file_groups
- .clone(),
- );
+ let actual = repartition_with_size_to_vec(&parquet_exec, 4, 10);
let expected = vec![
(0, "a".to_string(), 0, 31),
(1, "a".to_string(), 31, 62),
@@ -964,13 +950,7 @@ mod tests {
None,
);
- let actual = file_groups_to_vec(
- parquet_exec
- .get_repartitioned(96, 5)
- .base_config()
- .file_groups
- .clone(),
- );
+ let actual = repartition_with_size_to_vec(&parquet_exec, 96, 5);
let expected = vec![
(0, "a".to_string(), 0, 1),
(1, "a".to_string(), 1, 2),
@@ -1007,13 +987,7 @@ mod tests {
None,
);
- let actual = file_groups_to_vec(
- parquet_exec
- .get_repartitioned(3, 10)
- .base_config()
- .file_groups
- .clone(),
- );
+ let actual = repartition_with_size_to_vec(&parquet_exec, 3, 10);
let expected = vec![
(0, "a".to_string(), 0, 34),
(1, "a".to_string(), 34, 40),
@@ -1046,13 +1020,7 @@ mod tests {
None,
);
- let actual = file_groups_to_vec(
- parquet_exec
- .get_repartitioned(2, 10)
- .base_config()
- .file_groups
- .clone(),
- );
+ let actual = repartition_with_size_to_vec(&parquet_exec, 2, 10);
let expected = vec![
(0, "a".to_string(), 0, 40),
(0, "b".to_string(), 0, 10),
@@ -1086,11 +1054,7 @@ mod tests {
None,
);
- let actual = parquet_exec
- .get_repartitioned(65, 10)
- .base_config()
- .file_groups
- .clone();
+ let actual = repartition_with_size(&parquet_exec, 65, 10);
assert_eq!(2, actual.len());
}
@@ -1115,17 +1079,47 @@ mod tests {
None,
);
- let actual = parquet_exec
- .get_repartitioned(65, 500)
+ let actual = repartition_with_size(&parquet_exec, 65, 500);
+ assert_eq!(1, actual.len());
+ }
+
+ /// Calls `ParquetExec.repartitioned` with the specified
+ /// `target_partitions` and `repartition_file_min_size`, returning the
+ /// resulting `PartitionedFile`s
+ fn repartition_with_size(
+ parquet_exec: &ParquetExec,
+ target_partitions: usize,
+ repartition_file_min_size: usize,
+ ) -> Vec<Vec<PartitionedFile>> {
+ let mut config = ConfigOptions::new();
+ config.optimizer.repartition_file_min_size =
repartition_file_min_size;
+
+ parquet_exec
+ .repartitioned(target_partitions, &config)
+ .unwrap() // unwrap Result
+ .unwrap() // unwrap Option
+ .as_any()
+ .downcast_ref::<ParquetExec>()
+ .unwrap()
.base_config()
.file_groups
- .clone();
- assert_eq!(1, actual.len());
+ .clone()
}
- fn file_groups_to_vec(
- file_groups: Vec<Vec<PartitionedFile>>,
+ /// Calls `repartition_with_size` and returns a tuple for each output
`PartitionedFile`:
+ ///
+ /// `(partition index, file path, start, end)`
+ fn repartition_with_size_to_vec(
+ parquet_exec: &ParquetExec,
+ target_partitions: usize,
+ repartition_file_min_size: usize,
) -> Vec<(usize, String, i64, i64)> {
+ let file_groups = repartition_with_size(
+ parquet_exec,
+ target_partitions,
+ repartition_file_min_size,
+ );
+
file_groups
.iter()
.enumerate()
diff --git a/datafusion/core/src/datasource/physical_plan/parquet.rs
b/datafusion/core/src/datasource/physical_plan/parquet.rs
index 3a2459bec8..f6e999f602 100644
--- a/datafusion/core/src/datasource/physical_plan/parquet.rs
+++ b/datafusion/core/src/datasource/physical_plan/parquet.rs
@@ -259,26 +259,6 @@ impl ParquetExec {
self.enable_bloom_filter
.unwrap_or(config_options.execution.parquet.bloom_filter_enabled)
}
-
- /// Redistribute files across partitions according to their size
- /// See comments on `get_file_groups_repartitioned()` for more detail.
- pub fn get_repartitioned(
- &self,
- target_partitions: usize,
- repartition_file_min_size: usize,
- ) -> Self {
- let repartitioned_file_groups_option =
FileScanConfig::repartition_file_groups(
- self.base_config.file_groups.clone(),
- target_partitions,
- repartition_file_min_size,
- );
-
- let mut new_plan = self.clone();
- if let Some(repartitioned_file_groups) =
repartitioned_file_groups_option {
- new_plan.base_config.file_groups = repartitioned_file_groups;
- }
- new_plan
- }
}
impl DisplayAs for ParquetExec {
@@ -349,6 +329,27 @@ impl ExecutionPlan for ParquetExec {
Ok(self)
}
+ /// Redistribute files across partitions according to their size
+ /// See comments on `repartition_file_groups()` for more detail.
+ fn repartitioned(
+ &self,
+ target_partitions: usize,
+ config: &ConfigOptions,
+ ) -> Result<Option<Arc<dyn ExecutionPlan>>> {
+ let repartition_file_min_size =
config.optimizer.repartition_file_min_size;
+ let repartitioned_file_groups_option =
FileScanConfig::repartition_file_groups(
+ self.base_config.file_groups.clone(),
+ target_partitions,
+ repartition_file_min_size,
+ );
+
+ let mut new_plan = self.clone();
+ if let Some(repartitioned_file_groups) =
repartitioned_file_groups_option {
+ new_plan.base_config.file_groups = repartitioned_file_groups;
+ }
+ Ok(Some(Arc::new(new_plan)))
+ }
+
fn execute(
&self,
partition_index: usize,
diff --git a/datafusion/core/src/physical_optimizer/enforce_distribution.rs
b/datafusion/core/src/physical_optimizer/enforce_distribution.rs
index 12df9efbbc..6de39db7d5 100644
--- a/datafusion/core/src/physical_optimizer/enforce_distribution.rs
+++ b/datafusion/core/src/physical_optimizer/enforce_distribution.rs
@@ -26,9 +26,6 @@ use std::fmt::Formatter;
use std::sync::Arc;
use crate::config::ConfigOptions;
-use crate::datasource::physical_plan::CsvExec;
-#[cfg(feature = "parquet")]
-use crate::datasource::physical_plan::ParquetExec;
use crate::error::Result;
use crate::physical_optimizer::utils::{
add_sort_above, get_children_exectrees, get_plan_string,
is_coalesce_partitions,
@@ -1188,7 +1185,6 @@ fn ensure_distribution(
// When `false`, round robin repartition will not be added to increase
parallelism
let enable_round_robin = config.optimizer.enable_round_robin_repartition;
let repartition_file_scans = config.optimizer.repartition_file_scans;
- let repartition_file_min_size = config.optimizer.repartition_file_min_size;
let batch_size = config.execution.batch_size;
let is_unbounded = unbounded_output(&dist_context.plan);
// Use order preserving variants either of the conditions true
@@ -1265,25 +1261,13 @@ fn ensure_distribution(
// Unless partitioning doesn't increase the partition count,
it is not beneficial:
&& child.output_partitioning().partition_count() <
target_partitions
{
- // When `repartition_file_scans` is set, leverage source
operators
- // (`ParquetExec`, `CsvExec` etc.) to increase parallelism at
the source.
+ // When `repartition_file_scans` is set, attempt to increase
+ // parallelism at the source.
if repartition_file_scans {
- #[cfg(feature = "parquet")]
- if let Some(parquet_exec) =
- child.as_any().downcast_ref::<ParquetExec>()
+ if let Some(new_child) =
+ child.repartitioned(target_partitions, config)?
{
- child = Arc::new(parquet_exec.get_repartitioned(
- target_partitions,
- repartition_file_min_size,
- ));
- }
- if let Some(csv_exec) =
child.as_any().downcast_ref::<CsvExec>() {
- if let Some(csv_exec) = csv_exec.get_repartitioned(
- target_partitions,
- repartition_file_min_size,
- ) {
- child = Arc::new(csv_exec);
- }
+ child = new_child;
}
}
// Increase parallelism by adding round-robin repartitioning
@@ -1644,8 +1628,8 @@ mod tests {
use
crate::datasource::file_format::file_compression_type::FileCompressionType;
use crate::datasource::listing::PartitionedFile;
use crate::datasource::object_store::ObjectStoreUrl;
- use crate::datasource::physical_plan::FileScanConfig;
use crate::datasource::physical_plan::ParquetExec;
+ use crate::datasource::physical_plan::{CsvExec, FileScanConfig};
use crate::physical_optimizer::enforce_sorting::EnforceSorting;
use crate::physical_optimizer::output_requirements::OutputRequirements;
use crate::physical_plan::aggregates::{
diff --git a/datafusion/physical-plan/src/lib.rs
b/datafusion/physical-plan/src/lib.rs
index b2f81579f8..3ada2fa163 100644
--- a/datafusion/physical-plan/src/lib.rs
+++ b/datafusion/physical-plan/src/lib.rs
@@ -76,6 +76,7 @@ pub use crate::metrics::Metric;
pub use crate::topk::TopK;
pub use crate::visitor::{accept, visit_execution_plan, ExecutionPlanVisitor};
+use datafusion_common::config::ConfigOptions;
pub use datafusion_common::hash_utils;
pub use datafusion_common::utils::project_schema;
pub use datafusion_common::{internal_err, ColumnStatistics, Statistics};
@@ -209,7 +210,32 @@ pub trait ExecutionPlan: Debug + DisplayAs + Send + Sync {
children: Vec<Arc<dyn ExecutionPlan>>,
) -> Result<Arc<dyn ExecutionPlan>>;
- /// creates an iterator
+ /// If supported, attempt to increase the partitioning of this
`ExecutionPlan` to
+ /// produce `target_partitions` partitions.
+ ///
+ /// If the `ExecutionPlan` does not support changing its partitioning,
+ /// returns `Ok(None)` (the default).
+ ///
+ /// If the `ExecutionPlan` can increase its partitioning, but not to the
+ /// `target_partitions`, it may return an ExecutionPlan with fewer
+ /// partitions. This might happen, for example, if each new partition would
+ /// be too small to be efficiently processed individually.
+ ///
+ /// The DataFusion optimizer attempts to use as many threads as possible by
+ /// repartitioning its inputs to match the target number of threads
+ /// available (`target_partitions`). Some data sources, such as the built
in
+ /// CSV and Parquet readers, implement this method as they are able to read
+ /// from their input files in parallel, regardless of how the source data
is
+ /// split amongst files.
+ fn repartitioned(
+ &self,
+ _target_partitions: usize,
+ _config: &ConfigOptions,
+ ) -> Result<Option<Arc<dyn ExecutionPlan>>> {
+ Ok(None)
+ }
+
+ /// Begin execution of `partition`, returning a stream of
[`RecordBatch`]es.
fn execute(
&self,
partition: usize,
@@ -217,7 +243,7 @@ pub trait ExecutionPlan: Debug + DisplayAs + Send + Sync {
) -> Result<SendableRecordBatchStream>;
/// Return a snapshot of the set of [`Metric`]s for this
- /// [`ExecutionPlan`].
+ /// [`ExecutionPlan`]. If no `Metric`s are available, return None.
///
/// While the values of the metrics in the returned
/// [`MetricsSet`]s may change as execution progresses, the