Dandandan commented on code in PR #8731:
URL: https://github.com/apache/arrow-datafusion/pull/8731#discussion_r1440854228


##########
datafusion/core/src/physical_optimizer/enforce_distribution.rs:
##########
@@ -1198,37 +1191,25 @@ fn ensure_distribution(
     )
     .map(
         |(mut child, requirement, required_input_ordering, would_benefit, maintains)| {
-            // Don't need to apply when the returned row count is not greater than 1:
+            // Don't need to apply when the returned row count is not greater than batch size
             let num_rows = child.plan.statistics()?.num_rows;
             let repartition_beneficial_stats = if num_rows.is_exact().unwrap_or(false) {
                 num_rows
                     .get_value()
                     .map(|value| value > &batch_size)
-                    .unwrap_or(true)
+                    .unwrap() // safe to unwrap since is_exact() is true
             } else {
                 true
             };
 
-            if enable_round_robin
-                // Operator benefits from partitioning (e.g. filter):
-                && (would_benefit && repartition_beneficial_stats)
-                // Unless partitioning doesn't increase the partition count, it is not beneficial:
-                && child.plan.output_partitioning().partition_count() < target_partitions
-            {
-                // When `repartition_file_scans` is set, attempt to increase
-                // parallelism at the source.
-                if repartition_file_scans {
-                    if let Some(new_child) =
-                        child.plan.repartitioned(target_partitions, config)?
-                    {
-                        child.plan = new_child;
-                    }
+            // When `repartition_file_scans` is set, attempt to increase
+            // parallelism at the source.
+            if repartition_file_scans {
+                if let Some(new_child) =
+                    child.plan.repartitioned(target_partitions, config)?
+                {
+                    child.plan = new_child;
                 }
-                // Increase parallelism by adding round-robin repartitioning
-                // on top of the operator. Note that we only do this if the
-                // partition count is not already equal to the desired partition
-                // count.
-                child = add_roundrobin_on_top(child, target_partitions)?;

Review Comment:
   If I remember correctly, before partitioning was pushed down to scans, adding
round-robin repartitioning on top of e.g. single-file scans performed better
than relying only on hash repartitioning. I think if you disable file
repartitioning, or test with formats that don't support it, you should be able
to measure the effect.
   The additional cost of round-robin is *extremely small* (invisible in
profiles), especially compared to hash repartitioning.
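
To illustrate why round-robin is so cheap next to hash repartitioning, here is a toy sketch in plain Rust. It is not DataFusion's implementation: `Batch`, `round_robin`, and `hash_partition` are hypothetical names, and a batch is modeled as a plain vector of row keys. Round-robin only forwards whole batches to the next partition, while hash partitioning has to hash and copy every row.

```rust
use std::collections::hash_map::DefaultHasher;
use std::hash::{Hash, Hasher};

/// Toy stand-in for a record batch: just a vector of row keys (assumption).
type Batch = Vec<u64>;

/// Round-robin: each whole batch is handed to the next partition in turn.
/// No row is inspected, so the work is proportional to the number of batches.
fn round_robin(batches: Vec<Batch>, partitions: usize) -> Vec<Vec<Batch>> {
    let mut out: Vec<Vec<Batch>> = vec![Vec::new(); partitions];
    for (i, batch) in batches.into_iter().enumerate() {
        out[i % partitions].push(batch);
    }
    out
}

/// Hash partitioning: every row is hashed and copied into the partition
/// selected by its key, so the work is proportional to the number of rows.
fn hash_partition(batches: Vec<Batch>, partitions: usize) -> Vec<Batch> {
    let mut out: Vec<Batch> = vec![Vec::new(); partitions];
    for batch in batches {
        for key in batch {
            let mut hasher = DefaultHasher::new();
            key.hash(&mut hasher);
            let idx = (hasher.finish() % partitions as u64) as usize;
            out[idx].push(key);
        }
    }
    out
}
```

In this model, round-robin does one move per batch regardless of batch size, which is consistent with it not showing up in profiles; whether it is a net win on a given plan still has to be measured.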
   


