alamb commented on code in PR #4714:
URL: https://github.com/apache/arrow-datafusion/pull/4714#discussion_r1057241314
##########
datafusion/core/src/config.rs:
##########
@@ -133,6 +133,10 @@
pub const OPT_PREFER_HASH_JOIN: &str = "datafusion.optimizer.prefer_hash_join";
pub const OPT_HASH_JOIN_SINGLE_PARTITION_THRESHOLD: &str =
    "datafusion.optimizer.hash_join_single_partition_threshold";
+/// Configuration option "datafusion.execution.round_robin_repartition"
Review Comment:
👍
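As a usage note, the new boolean option should be togglable like the other session-config flags; a minimal sketch (the setter name follows the existing `set_bool` pattern, which I'm assuming applies here):

```rust
use datafusion::prelude::{SessionConfig, SessionContext};

// Hypothetical usage sketch: disable round-robin repartitioning for a
// session via the option named in the diff above.
fn context_without_round_robin() -> SessionContext {
    let config = SessionConfig::new()
        .set_bool("datafusion.execution.round_robin_repartition", false);
    SessionContext::with_config(config)
}
```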
##########
datafusion/core/src/physical_optimizer/coalesce_batches.rs:
##########
@@ -57,7 +57,16 @@ impl PhysicalOptimizerRule for CoalesceBatches {
        // See https://github.com/apache/arrow-datafusion/issues/139
        let wrap_in_coalesce =
            plan_any.downcast_ref::<FilterExec>().is_some()
                || plan_any.downcast_ref::<HashJoinExec>().is_some()
-                || plan_any.downcast_ref::<RepartitionExec>().is_some();
+                // Don't need to add CoalesceBatchesExec after a round robin RepartitionExec
Review Comment:
👍 makes sense to me -- to confirm, the rationale for this change is that RoundRobinRepartition simply shuffles batches around to different partitions, but doesn't actually change (grow or shrink) the RecordBatches themselves?
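For the record, my mental model of the new check is roughly the following sketch (the `partitioning()` accessor and the exact shape of the expression are my assumptions, not the PR's literal code):

```rust
// Sketch: only wrap a RepartitionExec in CoalesceBatchesExec when it is
// NOT round-robin, since round-robin routing moves whole batches to
// output partitions without growing or shrinking them.
let wrap_in_coalesce = plan_any.downcast_ref::<FilterExec>().is_some()
    || plan_any.downcast_ref::<HashJoinExec>().is_some()
    || plan_any
        .downcast_ref::<RepartitionExec>()
        .map(|r| !matches!(r.partitioning(), Partitioning::RoundRobinBatch(_)))
        .unwrap_or(false);
```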
##########
datafusion/core/src/physical_optimizer/repartition.rs:
##########
@@ -161,34 +161,14 @@ fn optimize_partitions(
        // leaf node - don't replace children
        plan
    } else {
-        let can_reorder_children =
-            match (plan.relies_on_input_order(), plan.maintains_input_order()) {
Review Comment:
I believe this is the last use of `maintains_input_order` -- so after this PR is merged, I will try to create a PR that removes it from the trait -- it is redundant with `plan.output_order`, I think.
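Roughly speaking, the information should be derivable on demand, something like this hypothetical helper (not existing API, and it assumes `PhysicalSortExpr` supports equality comparison):

```rust
use std::sync::Arc;
use datafusion::physical_plan::ExecutionPlan;

// Hypothetical helper: a single-child node "maintains input order"
// exactly when its declared output ordering equals its child's, so a
// dedicated trait method carries no extra information.
fn maintains_input_order(plan: &Arc<dyn ExecutionPlan>) -> bool {
    match plan.children().as_slice() {
        [child] => plan.output_ordering() == child.output_ordering(),
        _ => false,
    }
}
```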
##########
datafusion/core/tests/sql/window.rs:
##########
@@ -1633,7 +1633,10 @@ async fn test_window_frame_nth_value_aggregate() -> Result<()> {
#[tokio::test]
async fn test_window_agg_sort() -> Result<()> {
-    let ctx = SessionContext::new();
+    // We need to specify the target partition number.
+    // Otherwise, the default value may vary across environments with
+    // different CPU core counts, which could cause the test to fail.
Review Comment:
👍
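For anyone reading along, the fix presumably replaces `SessionContext::new()` with something like this sketch:

```rust
use datafusion::prelude::{SessionConfig, SessionContext};

// Sketch: pin target_partitions so the physical plan (and therefore the
// expected plan text in the test) is identical regardless of how many
// CPU cores the machine running the test has.
fn deterministic_test_context() -> SessionContext {
    let config = SessionConfig::new().with_target_partitions(2);
    SessionContext::with_config(config)
}
```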
##########
datafusion/core/src/physical_optimizer/global_sort_selection.rs:
##########
@@ -0,0 +1,89 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements. See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership. The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License. You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied. See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+//! Select the efficient global sort implementation based on sort details.
+
+use std::sync::Arc;
+
+use crate::error::Result;
+use crate::physical_optimizer::PhysicalOptimizerRule;
+use crate::physical_plan::rewrite::TreeNodeRewritable;
+use crate::physical_plan::sorts::sort::SortExec;
+use crate::physical_plan::sorts::sort_preserving_merge::SortPreservingMergeExec;
+use crate::physical_plan::ExecutionPlan;
+use crate::prelude::SessionConfig;
+
+/// Currently, for a sort operator, if
+/// - there is more than one input partition
+/// - and there is some limit which can be pushed down to each of its input partitions
+/// then [SortPreservingMergeExec] on top of local sorts with the limit pushed down will be preferred;
+/// otherwise, the normal global sort [SortExec] will be used.
+/// Later, more intelligent statistics-based decisions can also be introduced.
Review Comment:
See also https://github.com/apache/arrow-datafusion/pull/4691
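To restate the rule in code form, my reading is roughly the sketch below (constructor signatures are approximations from memory, using the types imported in the diff above, not the PR's exact code):

```rust
// Sketch: when the sort carries a fetch (limit) and has multiple input
// partitions, sort each partition locally with the limit pushed down,
// then merge the sorted streams; otherwise keep the global SortExec.
fn select_global_sort(sort: &SortExec) -> Option<Arc<dyn ExecutionPlan>> {
    let input = sort.input().clone();
    if sort.fetch().is_some()
        && input.output_partitioning().partition_count() > 1
    {
        let local_sort = Arc::new(SortExec::new_with_partitioning(
            sort.expr().to_vec(),
            input,
            true, // preserve the input partitioning
            sort.fetch(),
        ));
        Some(Arc::new(SortPreservingMergeExec::new(
            sort.expr().to_vec(),
            local_sort,
        )))
    } else {
        None
    }
}
```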
##########
datafusion/core/tests/sql/joins.rs:
##########
@@ -2201,8 +2201,8 @@ async fn right_semi_join() -> Result<()> {
"SortExec: [t1_id@0 ASC NULLS LAST]",
" CoalescePartitionsExec",
" ProjectionExec: expr=[t1_id@0 as t1_id, t1_name@1 as
t1_name, t1_int@2 as t1_int]",
- " CoalesceBatchesExec: target_batch_size=4096",
- " RepartitionExec: partitioning=RoundRobinBatch(2)",
+ " RepartitionExec: partitioning=RoundRobinBatch(2)",
Review Comment:
These changes seem OK to me (no better or worse).
##########
datafusion/core/src/physical_optimizer/repartition.rs:
##########
@@ -546,12 +538,12 @@ mod tests {
}
#[test]
- fn repartition_ignores_sort_preserving_merge() -> Result<()> {
+ fn repartition_with_preserving_merge() -> Result<()> {
let plan = sort_preserving_merge_exec(parquet_exec());
let expected = &[
"SortPreservingMergeExec: [c1@0 ASC]",
- // Expect no repartition of SortPreservingMergeExec
+ "RepartitionExec: partitioning=RoundRobinBatch(10)",
Review Comment:
I am pretty sure these tests were added explicitly to avoid the case @Dandandan is referring to.
Given that these tests are explicitly for "not putting the repartition exec" before a SortPreservingMerge, I don't think we should simply change the tests.
If the issue is that the unit test doesn't accurately reflect the output of the whole optimizer process with all the rules, perhaps we can create a test that runs the whole physical optimizer and move the "no repartition before the sort preserving merge" test there?
Here is an example of such a test for the logical optimizer:
https://github.com/apache/arrow-datafusion/blob/master/datafusion/optimizer/tests/integration-test.rs
I can try to help with this test framework, if it would help.
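For concreteness, such a test might look roughly like the sketch below (table name, data path, query, and the exact assertion string are all placeholders I made up for illustration):

```rust
use datafusion::error::Result;
use datafusion::physical_plan::displayable;
use datafusion::prelude::{CsvReadOptions, SessionConfig, SessionContext};

// Hypothetical end-to-end test: plan a query through ALL the physical
// optimizer rules, then assert on the final plan text, instead of
// invoking a single rule in isolation.
#[tokio::test]
async fn no_repartition_under_sort_preserving_merge() -> Result<()> {
    let config = SessionConfig::new().with_target_partitions(10);
    let ctx = SessionContext::with_config(config);
    // Placeholder fixture; any sorted query over a multi-partition
    // source exercises the SortPreservingMerge path.
    ctx.register_csv("t", "tests/data/example.csv", CsvReadOptions::new())
        .await?;
    let df = ctx.sql("SELECT c1 FROM t ORDER BY c1").await?;
    let plan = df.create_physical_plan().await?;
    let displayed = displayable(plan.as_ref()).indent().to_string();
    // Illustrative assertion (exact text depends on the plan): no
    // RepartitionExec directly under the SortPreservingMergeExec.
    assert!(!displayed.contains("SortPreservingMergeExec: [c1@0 ASC]\n  RepartitionExec"));
    Ok(())
}
```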
##########
datafusion/core/tests/sql/window.rs:
##########
@@ -1649,9 +1652,10 @@ async fn test_window_agg_sort() -> Result<()> {
let expected = {
vec![
"ProjectionExec: expr=[c9@3 as c9, SUM(aggregate_test_100.c9)
ORDER BY [aggregate_test_100.c9 ASC NULLS LAST] RANGE BETWEEN UNBOUNDED
PRECEDING AND CURRENT ROW@0 as sum1, SUM(aggregate_test_100.c9) ORDER BY
[aggregate_test_100.c9 ASC NULLS LAST, aggregate_test_100.c8 ASC NULLS LAST]
RANGE BETWEEN UNBOUNDED PRECEDING AND CURRENT ROW@1 as sum2]",
- " WindowAggExec: wdw=[SUM(aggregate_test_100.c9): Ok(Field {
name: \"SUM(aggregate_test_100.c9)\", data_type: UInt64, nullable: true,
dict_id: 0, dict_is_ordered: false, metadata: {} }), frame: WindowFrame {
units: Range, start_bound: Preceding(UInt32(NULL)), end_bound: CurrentRow }]",
+ " RepartitionExec: partitioning=RoundRobinBatch(2)",
Review Comment:
These Repartitions don't seem particularly useful to me (they repartition right before a projection), but I also don't think they hurt either.