viirya commented on code in PR #8731:
URL: https://github.com/apache/arrow-datafusion/pull/8731#discussion_r1440728101


##########
datafusion/core/src/physical_optimizer/enforce_distribution.rs:
##########
@@ -1198,37 +1191,25 @@ fn ensure_distribution(
     )
     .map(
         |(mut child, requirement, required_input_ordering, would_benefit, maintains)| {
-            // Don't need to apply when the returned row count is not greater than 1:
+            // Don't need to apply when the returned row count is not greater than batch size
             let num_rows = child.plan.statistics()?.num_rows;
             let repartition_beneficial_stats = if num_rows.is_exact().unwrap_or(false) {
                 num_rows
                     .get_value()
                     .map(|value| value > &batch_size)
-                    .unwrap_or(true)
+                    .unwrap() // safe to unwrap since is_exact() is true
             } else {
                 true
             };
 
-            if enable_round_robin
-                // Operator benefits from partitioning (e.g. filter):
-                && (would_benefit && repartition_beneficial_stats)
-                // Unless partitioning doesn't increase the partition count, it is not beneficial:
-                && child.plan.output_partitioning().partition_count() < target_partitions
-            {
-                // When `repartition_file_scans` is set, attempt to increase
-                // parallelism at the source.
-                if repartition_file_scans {
-                    if let Some(new_child) =
-                        child.plan.repartitioned(target_partitions, config)?
-                    {
-                        child.plan = new_child;
-                    }
+            // When `repartition_file_scans` is set, attempt to increase
+            // parallelism at the source.
+            if repartition_file_scans {
+                if let Some(new_child) =
+                    child.plan.repartitioned(target_partitions, config)?
+                {
+                    child.plan = new_child;
                 }
-                // Increase parallelism by adding round-robin repartitioning
-                // on top of the operator. Note that we only do this if the
-                // partition count is not already equal to the desired partition
-                // count.
-                child = add_roundrobin_on_top(child, target_partitions)?;

Review Comment:
   I'm not sure, as these benchmarks don't check query plans. I may need to 
check them manually, and I will post back what I see later. However, my first 
thought is: doesn't this mean that in most cases we don't need to add these 
round-robin repartitions? I'm not sure whether adding the round robin was 
driven by seeing better performance numbers.
   
   Btw, I knew this change might be a bit controversial before I made it. I 
made it because the query plans look weird when they always have two levels of 
repartitioning (round robin + hashing), and some of them look unreasonable 
(round robin with 9000 partitions?), so I wanted to raise the discussion. If I 
cannot get consensus on the removal, I can certainly restore it and keep only 
the other changes in this PR.
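   
   For reference, the condition that this hunk removes can be sketched roughly 
as below. This is a simplified illustration that uses plain values instead of 
the real plan and statistics types: `should_add_round_robin` and 
`exact_num_rows` are hypothetical names made up for the sketch, not DataFusion 
APIs, and `None` stands in for a row count that is inexact or absent.

```rust
/// Mirrors `repartition_beneficial_stats` from the hunk: with an exact row
/// count, repartitioning only pays off when there are more rows than one
/// batch; without exact statistics we optimistically assume it does.
/// `exact_num_rows` is a hypothetical simplification of the statistics value.
fn repartition_beneficial_stats(exact_num_rows: Option<usize>, batch_size: usize) -> bool {
    match exact_num_rows {
        Some(rows) => rows > batch_size,
        None => true,
    }
}

/// Sketch of the removed gate around `add_roundrobin_on_top`: round robin is
/// added only if it is enabled, the operator benefits from partitioning, the
/// statistics do not rule it out, and the partition count would actually grow.
fn should_add_round_robin(
    enable_round_robin: bool,
    would_benefit: bool,
    exact_num_rows: Option<usize>,
    batch_size: usize,
    partition_count: usize,
    target_partitions: usize,
) -> bool {
    enable_round_robin
        && would_benefit
        && repartition_beneficial_stats(exact_num_rows, batch_size)
        && partition_count < target_partitions
}
```

   Under this sketch, a single-partition child with unknown row count and 
`target_partitions = 9000` passes the gate, which is the kind of plan I was 
referring to above.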


