alamb commented on code in PR #7811:
URL: https://github.com/apache/arrow-datafusion/pull/7811#discussion_r1357191766


##########
datafusion/core/src/physical_optimizer/enforce_sorting.rs:
##########
@@ -2234,4 +2246,36 @@ mod tests {
         assert_optimized!(expected_input, expected_optimized, physical_plan, 
false);
         Ok(())
     }
+
+    #[tokio::test]
+    async fn test_window_multi_layer_requirement() -> Result<()> {
+        let schema = create_test_schema3()?;
+        let sort_exprs = vec![sort_expr("a", &schema), sort_expr("b", 
&schema)];
+        let source = csv_exec_sorted(&schema, vec![], false);
+        let sort = sort_exec(sort_exprs.clone(), source);
+        let repartition = repartition_exec(sort);
+        let repartition = spr_repartition_exec(repartition);
+        let spm = sort_preserving_merge_exec(sort_exprs.clone(), repartition);
+
+        let physical_plan = bounded_window_exec("a", sort_exprs, spm);
+
+        let expected_input = [
+            "BoundedWindowAggExec: wdw=[count: Ok(Field { name: \"count\", 
data_type: Int64, nullable: true, dict_id: 0, dict_is_ordered: false, metadata: 
{} }), frame: WindowFrame { units: Range, start_bound: Preceding(NULL), 
end_bound: CurrentRow }], mode=[Sorted]",
+            "  SortPreservingMergeExec: [a@0 ASC,b@1 ASC]",
+            "    SortPreservingRepartitionExec: 
partitioning=RoundRobinBatch(10), input_partitions=10",
+            "      RepartitionExec: partitioning=RoundRobinBatch(10), 
input_partitions=1",
+            "        SortExec: expr=[a@0 ASC,b@1 ASC]",
+            "          CsvExec: file_groups={1 group: [[x]]}, projection=[a, 
b, c, d, e], has_header=false",
+        ];
+        let expected_optimized = [

Review Comment:
   Without the code in this PR, I verified that this test fails. The plan
showed the same pattern as described in
https://github.com/apache/arrow-datafusion/issues/7794
   
   ```
       "BoundedWindowAggExec: wdw=[count: Ok(Field { name: \"count\", 
data_type: Int64, nullable: true, dict_id: 0, dict_is_ordered: false, metadata: 
{} }), frame: WindowFrame { units: Range, start_bound: Preceding(NULL), 
end_bound: CurrentRow }], mode=[Sorted]",
       "  SortPreservingMergeExec: []",
       "    SortExec: expr=[a@0 ASC,b@1 ASC]",
       "      SortPreservingRepartitionExec: partitioning=RoundRobinBatch(10), 
input_partitions=10",
       "        RepartitionExec: partitioning=RoundRobinBatch(10), 
input_partitions=1",
       "          CsvExec: file_groups={1 group: [[x]]}, projection=[a, b, c, 
d, e], has_header=false",
   ```



##########
datafusion/core/src/physical_optimizer/enforce_sorting.rs:
##########
@@ -1635,14 +1645,16 @@ mod tests {
         // During the removal of `SortExec`s, it should be able to remove the
         // corresponding SortExecs together. Also, the inputs of these 
`SortExec`s
         // are not necessarily the same to be able to remove them.
-        let expected_input = ["BoundedWindowAggExec: wdw=[count: Ok(Field { 
name: \"count\", data_type: Int64, nullable: true, dict_id: 0, dict_is_ordered: 
false, metadata: {} }), frame: WindowFrame { units: Range, start_bound: 
Preceding(NULL), end_bound: CurrentRow }], mode=[Sorted]",
+        let expected_input = [

Review Comment:
   The diff made it hard to see, but I am pretty sure this test did not
change; only its formatting did. Is that correct?



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]

Reply via email to