NGA-TRAN commented on code in PR #21533:
URL: https://github.com/apache/datafusion/pull/21533#discussion_r3067239500
##########
datafusion/sqllogictest/test_files/preserve_file_partitioning.slt:
##########
@@ -698,6 +698,60 @@ A dev 772.4
B prod 614.4
C prod 2017.6
+##########
+# TEST 13: Partitioned Join where Number of File Groups is less than target_partitions
+# With preserve_file_partitions enabled, we should still avoid repartitioning
+##########
+
+statement ok
+set datafusion.execution.target_partitions = 4;
+
+statement ok
+set datafusion.optimizer.preserve_file_partitions = 1;
+
+query TT
+EXPLAIN SELECT f_dkey, timestamp,
+ COUNT(*), AVG(value)
+FROM fact_table
+GROUP BY f_dkey, timestamp;
+----
+logical_plan
+01)Projection: fact_table.f_dkey, fact_table.timestamp, count(Int64(1)) AS count(*), avg(fact_table.value)
+02)--Aggregate: groupBy=[[fact_table.f_dkey, fact_table.timestamp]], aggr=[[count(Int64(1)), avg(fact_table.value)]]
+03)----TableScan: fact_table projection=[timestamp, value, f_dkey]
+physical_plan
+01)ProjectionExec: expr=[f_dkey@0 as f_dkey, timestamp@1 as timestamp, count(Int64(1))@2 as count(*), avg(fact_table.value)@3 as avg(fact_table.value)]
+02)--AggregateExec: mode=SinglePartitioned, gby=[f_dkey@2 as f_dkey, timestamp@0 as timestamp], aggr=[count(Int64(1)), avg(fact_table.value)]
+03)----DataSourceExec: file_groups={3 groups: [[WORKSPACE_ROOT/datafusion/sqllogictest/test_files/scratch/preserve_file_partitioning/fact/f_dkey=A/data.parquet], [WORKSPACE_ROOT/datafusion/sqllogictest/test_files/scratch/preserve_file_partitioning/fact/f_dkey=B/data.parquet], [WORKSPACE_ROOT/datafusion/sqllogictest/test_files/scratch/preserve_file_partitioning/fact/f_dkey=C/data.parquet]]}, projection=[timestamp, value, f_dkey], file_type=parquet
Review Comment:
Nice. We want to keep these 3 partitions as-is. We do not want to repartition them up to target_partitions (4 in this test).
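The following toy model shows why keeping the three file groups is safe. It is not DataFusion code: `Row`, `partition_by_file_key`, and `group_keys_are_partition_local` are hypothetical names, and the rows are made-up sample data. `f_dkey` is part of the GROUP BY key, so every row of a given `(f_dkey, timestamp)` group lands in the same `f_dkey=...` file group. Each partition can therefore be aggregated on its own. This matches the plan above, where `AggregateExec: mode=SinglePartitioned` reads directly from `DataSourceExec` with no repartition step in between.

```rust
use std::collections::{BTreeMap, HashMap};

/// Hypothetical stand-in for a `fact_table` row, keeping only the two
/// GROUP BY columns from the test query.
#[derive(Debug, Clone, Copy)]
struct Row {
    f_dkey: &'static str,
    timestamp: i64,
}

/// Mimics file-level partitioning: one partition per distinct `f_dkey`,
/// analogous to the `f_dkey=A`, `f_dkey=B`, `f_dkey=C` file groups.
fn partition_by_file_key(rows: &[Row]) -> Vec<Vec<Row>> {
    let mut groups: BTreeMap<&str, Vec<Row>> = BTreeMap::new();
    for row in rows {
        groups.entry(row.f_dkey).or_default().push(*row);
    }
    // BTreeMap iterates in key order, so partitions come out sorted by f_dkey.
    groups.into_values().collect()
}

/// True if every `(f_dkey, timestamp)` group key is confined to a single
/// partition, so each partition can be aggregated independently.
fn group_keys_are_partition_local(partitions: &[Vec<Row>]) -> bool {
    let mut owner: HashMap<(&str, i64), usize> = HashMap::new();
    for (idx, part) in partitions.iter().enumerate() {
        for row in part {
            let prev = owner.insert((row.f_dkey, row.timestamp), idx);
            // A key already seen in a different partition would force a repartition.
            if matches!(prev, Some(p) if p != idx) {
                return false;
            }
        }
    }
    true
}

fn main() {
    // Made-up sample rows; only the key columns matter here.
    let rows = [
        Row { f_dkey: "A", timestamp: 1 },
        Row { f_dkey: "B", timestamp: 1 },
        Row { f_dkey: "A", timestamp: 2 },
        Row { f_dkey: "C", timestamp: 1 },
        Row { f_dkey: "A", timestamp: 1 },
    ];
    let partitions = partition_by_file_key(&rows);
    println!("partitions: {}", partitions.len());
    println!("partition-local: {}", group_keys_are_partition_local(&partitions));
}
```

Splitting these three groups further would only add partitions. It would not change where any group key lives, which is why matching target_partitions alone is not a reason to repartition.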
##########
datafusion/physical-optimizer/src/enforce_distribution.rs:
##########
@@ -1328,8 +1328,14 @@ pub fn ensure_distribution(
})
);
- let allow_subset_satisfy_partitioning = current_partitions
+ let allow_subset_satisfy_partitioning = (current_partitions
>= subset_satisfaction_threshold
+ // `preserve_file_partitions` exposes existing file-group
+ // partitioning to the optimizer. Respect it when the only
+ // reason to repartition would be to increase partition count
+ // beyond the preserved file-group count.
+ || (config.optimizer.preserve_file_partitions > 0
+ && current_partitions < target_partitions))
Review Comment:
👍
This condition is needed to preserve the file partitioning even when the current partition count is less than target_partitions.
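Here is a stand-alone sketch of the boolean in the diff, to make the before/after behavior concrete. `allow_subset_satisfy_partitioning_before` and the free-function form are hypothetical. In the real `ensure_distribution`, these values come from local variables and `config.optimizer.preserve_file_partitions`. That setting is assumed here to be an integer, as the `> 0` comparison suggests. The threshold value of 4 is also an assumption for illustration only.

```rust
/// Hypothetical free-function version of the new
/// `allow_subset_satisfy_partitioning` condition from the diff.
fn allow_subset_satisfy_partitioning(
    current_partitions: usize,
    subset_satisfaction_threshold: usize,
    preserve_file_partitions: usize,
    target_partitions: usize,
) -> bool {
    // Original rule: accept subset partitioning when there is already
    // enough parallelism.
    current_partitions >= subset_satisfaction_threshold
        // New rule: with preserved file partitioning, do not repartition
        // just to raise the partition count toward `target_partitions`.
        || (preserve_file_partitions > 0 && current_partitions < target_partitions)
}

/// The condition before this PR, for comparison (hypothetical name).
fn allow_subset_satisfy_partitioning_before(
    current_partitions: usize,
    subset_satisfaction_threshold: usize,
) -> bool {
    current_partitions >= subset_satisfaction_threshold
}

fn main() {
    // TEST 13 values: 3 file groups, target_partitions = 4.
    // ASSUMPTION: subset_satisfaction_threshold = 4, chosen for illustration.
    let (current, threshold, target) = (3, 4, 4);
    println!("before: {}", allow_subset_satisfy_partitioning_before(current, threshold));
    println!(
        "after, preserve_file_partitions = 1: {}",
        allow_subset_satisfy_partitioning(current, threshold, 1, target)
    );
    println!(
        "after, preserve_file_partitions = 0: {}",
        allow_subset_satisfy_partitioning(current, threshold, 0, target)
    );
}
```

With `preserve_file_partitions = 0`, the new clause is always false. The result is then identical to the old condition, so only sessions that opt in see the changed behavior.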
--
This is an automated message from the Apache Git Service.