alamb commented on code in PR #5057:
URL: https://github.com/apache/arrow-datafusion/pull/5057#discussion_r1089724193
##########
datafusion/common/src/config.rs:
##########
@@ -261,10 +261,17 @@ config_namespace! {
/// in parallel using the provided `target_partitions` level"
pub repartition_aggregations: bool, default = true
+ /// Minimum total files size in bytes to perform file scan
repartitioning.
+ pub repartition_file_min_size: usize, default = 10 * 1024 * 1024
+
/// Should DataFusion repartition data using the join keys to execute
joins in parallel
/// using the provided `target_partitions` level"
pub repartition_joins: bool, default = true
+ /// When set to true, file groups will be repartitioned to achieve
maximum parallelism.
+ /// Currently supported only for Parquet format
Review Comment:
```suggestion
/// When set to true, file groups will be repartitioned to achieve
maximum parallelism.
/// Currently supported only for Parquet format in which case
/// multiple row groups from the same file may be read concurrently.
If false then each
/// row group is read serially, though different files may be read
in parallel.
```
##########
datafusion/core/src/physical_optimizer/repartition.rs:
##########
@@ -846,6 +897,182 @@ mod tests {
Ok(())
}
+ #[test]
+ fn parallelization_single_partition() -> Result<()> {
+ let plan = aggregate(parquet_exec());
+
+ let expected = [
+ "AggregateExec: mode=Final, gby=[], aggr=[]",
+ "CoalescePartitionsExec",
+ "AggregateExec: mode=Partial, gby=[], aggr=[]",
+ "ParquetExec: limit=None, partitions={2 groups: [[x:0..50],
[x:50..100]]}, projection=[c1]",
+ ];
+
+ assert_optimized!(expected, plan, 2, true, 10);
+ Ok(())
+ }
+
+ #[test]
+ fn parallelization_two_partitions() -> Result<()> {
+ let plan = aggregate(parquet_exec_two_partitions());
+
+ let expected = [
+ "AggregateExec: mode=Final, gby=[], aggr=[]",
+ "CoalescePartitionsExec",
+ "AggregateExec: mode=Partial, gby=[], aggr=[]",
+ // Plan already has two partitions
+ "ParquetExec: limit=None, partitions={2 groups: [[x], [y]]},
projection=[c1]",
+ ];
+
+ assert_optimized!(expected, plan, 2, true, 10);
+ Ok(())
+ }
Review Comment:
I suggest adding one more case that shows a plan being repartitioned even if it has
multiple files.
For example, `parquet_exec_two_partitions` with `target_partitions = 4`.
##########
datafusion/core/src/physical_optimizer/repartition.rs:
##########
@@ -448,10 +493,16 @@ mod tests {
/// Runs the repartition optimizer and asserts the plan against the
expected
macro_rules! assert_optimized {
($EXPECTED_LINES: expr, $PLAN: expr) => {
+ assert_optimized!($EXPECTED_LINES, $PLAN, 10, false, 1024);
+ };
+
+ ($EXPECTED_LINES: expr, $PLAN: expr, $TAGRET_PARTITIONS: expr,
$REPARTITION_FILE_SCANS: expr, $REPARTITION_FILE_MIN_SIZE: expr) => {
let expected_lines: Vec<&str> = $EXPECTED_LINES.iter().map(|s|
*s).collect();
let mut config = ConfigOptions::new();
- config.execution.target_partitions = 10;
+ config.execution.target_partitions = $TAGRET_PARTITIONS;
Review Comment:
```suggestion
config.execution.target_partitions = $TARGET_PARTITIONS;
```
##########
datafusion/core/src/physical_optimizer/repartition.rs:
##########
@@ -448,10 +493,16 @@ mod tests {
/// Runs the repartition optimizer and asserts the plan against the
expected
macro_rules! assert_optimized {
($EXPECTED_LINES: expr, $PLAN: expr) => {
+ assert_optimized!($EXPECTED_LINES, $PLAN, 10, false, 1024);
+ };
+
+ ($EXPECTED_LINES: expr, $PLAN: expr, $TAGRET_PARTITIONS: expr,
$REPARTITION_FILE_SCANS: expr, $REPARTITION_FILE_MIN_SIZE: expr) => {
Review Comment:
```suggestion
($EXPECTED_LINES: expr, $PLAN: expr, $TARGET_PARTITIONS: expr,
$REPARTITION_FILE_SCANS: expr, $REPARTITION_FILE_MIN_SIZE: expr) => {
```
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]