NGA-TRAN commented on code in PR #18352:
URL: https://github.com/apache/datafusion/pull/18352#discussion_r2478407869


##########
datafusion/core/tests/dataframe/mod.rs:
##########
@@ -2996,6 +2997,152 @@ async fn test_count_wildcard_on_window() -> Result<()> {
     Ok(())
 }
 
+#[tokio::test]
+async fn 
union_with_mix_of_presorted_and_explicitly_resorted_inputs_with_repartition_sorts_false()
 -> Result<()> {

Review Comment:
   ```suggestion
   // Test with `repartition_sorts` disabled, causing a full resort of the data
   async fn 
union_with_mix_of_presorted_and_explicitly_resorted_inputs_with_repartition_sorts_false()
 -> Result<()> {
   ```



##########
datafusion/core/tests/dataframe/mod.rs:
##########
@@ -2996,6 +2997,152 @@ async fn test_count_wildcard_on_window() -> Result<()> {
     Ok(())
 }
 
+#[tokio::test]
+async fn 
union_with_mix_of_presorted_and_explicitly_resorted_inputs_with_repartition_sorts_false()
 -> Result<()> {
+    assert_snapshot!(
+        
union_with_mix_of_presorted_and_explicitly_resorted_inputs_impl(false).await?,
+        @r#"
+    
+---------------+-------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------+
+    | plan_type     | plan                                                     
                                                                                
                                                                         |
+    
+---------------+-------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------+
+    | logical_plan  | Aggregate: groupBy=[[id]], aggr=[[]]                     
                                                                                
                                                                         |
+    |               |   Union                                                  
                                                                                
                                                                         |
+    |               |     TableScan: sorted projection=[id]                    
                                                                                
                                                                         |
+    |               |     Sort: unsorted.id ASC NULLS LAST                     
                                                                                
                                                                         |
+    |               |       TableScan: unsorted projection=[id]                
                                                                                
                                                                         |
+    | physical_plan | AggregateExec: mode=Final, gby=[id@0 as id], aggr=[], 
ordering_mode=Sorted                                                            
                                                                            |
+    |               |   SortExec: expr=[id@0 ASC NULLS LAST], 
preserve_partitioning=[false]                                                   
                                                                                
          |
+    |               |     CoalescePartitionsExec                               
                                                                                
                                                                         |
+    |               |       AggregateExec: mode=Partial, gby=[id@0 as id], 
aggr=[]                                                                         
                                                                             |
+    |               |         UnionExec                                        
                                                                                
                                                                         |
+    |               |           DataSourceExec: file_groups={1 group: 
[[{testdata}/alltypes_tiny_pages.parquet]]}, projection=[id], 
output_ordering=[id@0 ASC NULLS LAST], file_type=parquet |
+    |               |           DataSourceExec: file_groups={1 group: 
[[{testdata}/alltypes_tiny_pages.parquet]]}, projection=[id], file_type=parquet 
                                       |
+    |               |                                                          
                                                                                
                                                                         |
+    
+---------------+-------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------+
+    "#);
+    Ok(())
+}
+
+#[ignore] // See https://github.com/apache/datafusion/issues/18380
+#[tokio::test]
+async fn 
union_with_mix_of_presorted_and_explicitly_resorted_inputs_with_repartition_sorts_true()
 -> Result<()> {

Review Comment:
   ```suggestion
   // Test with `repartition_sorts` enabled to preserve pre-sorted partitions 
and avoid resorting
   async fn 
union_with_mix_of_presorted_and_explicitly_resorted_inputs_with_repartition_sorts_true()
 -> Result<()> {
   ```



##########
datafusion/core/tests/physical_optimizer/enforce_sorting.rs:
##########
@@ -359,6 +359,87 @@ async fn test_union_inputs_different_sorted2() -> 
Result<()> {
     Ok(())
 }
 
+#[tokio::test]
+async fn 
union_with_mix_of_presorted_and_explicitly_resorted_inputs_with_repartition_sorts_true()
 -> Result<()> {

Review Comment:
   ```suggestion
   // Test with `repartition_sorts` enabled to preserve pre-sorted partitions 
and avoid resorting
   async fn 
union_with_mix_of_presorted_and_explicitly_resorted_inputs_with_repartition_sorts_true()
 -> Result<()> {
   ```



##########
datafusion/core/tests/physical_optimizer/enforce_sorting.rs:
##########
@@ -359,6 +359,87 @@ async fn test_union_inputs_different_sorted2() -> 
Result<()> {
     Ok(())
 }
 
+#[tokio::test]
+async fn 
union_with_mix_of_presorted_and_explicitly_resorted_inputs_with_repartition_sorts_true()
 -> Result<()> {
+    assert_snapshot!(
+        
union_with_mix_of_presorted_and_explicitly_resorted_inputs_impl(true).await?,
+        @r"
+    Input Plan:
+    OutputRequirementExec: order_by=[(nullable_col@0, asc)], 
dist_by=SinglePartition
+      CoalescePartitionsExec
+        UnionExec
+          SortExec: expr=[nullable_col@0 ASC], preserve_partitioning=[false]
+            DataSourceExec: file_groups={1 group: [[x]]}, 
projection=[nullable_col, non_nullable_col], file_type=parquet
+          DataSourceExec: file_groups={1 group: [[x]]}, 
projection=[nullable_col, non_nullable_col], output_ordering=[nullable_col@0 
ASC], file_type=parquet
+
+    Optimized Plan:
+    OutputRequirementExec: order_by=[(nullable_col@0, asc)], 
dist_by=SinglePartition
+      SortPreservingMergeExec: [nullable_col@0 ASC]
+        UnionExec
+          SortExec: expr=[nullable_col@0 ASC], preserve_partitioning=[false]
+            DataSourceExec: file_groups={1 group: [[x]]}, 
projection=[nullable_col, non_nullable_col], file_type=parquet
+          DataSourceExec: file_groups={1 group: [[x]]}, 
projection=[nullable_col, non_nullable_col], output_ordering=[nullable_col@0 
ASC], file_type=parquet
+    ");
+    Ok(())
+}
+
+#[tokio::test]
+async fn 
union_with_mix_of_presorted_and_explicitly_resorted_inputs_with_repartition_sorts_false()
 -> Result<()> {

Review Comment:
   ```suggestion
   // Test with `repartition_sorts` disabled, causing a full resort of the data
   async fn 
union_with_mix_of_presorted_and_explicitly_resorted_inputs_with_repartition_sorts_false()
 -> Result<()> {
   ```



##########
datafusion/core/tests/dataframe/mod.rs:
##########
@@ -2996,6 +2997,152 @@ async fn test_count_wildcard_on_window() -> Result<()> {
     Ok(())
 }
 
+#[tokio::test]
+async fn 
union_with_mix_of_presorted_and_explicitly_resorted_inputs_with_repartition_sorts_false()
 -> Result<()> {
+    assert_snapshot!(
+        
union_with_mix_of_presorted_and_explicitly_resorted_inputs_impl(false).await?,
+        @r#"
+    
+---------------+-------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------+
+    | plan_type     | plan                                                     
                                                                                
                                                                         |
+    
+---------------+-------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------+
+    | logical_plan  | Aggregate: groupBy=[[id]], aggr=[[]]                     
                                                                                
                                                                         |
+    |               |   Union                                                  
                                                                                
                                                                         |
+    |               |     TableScan: sorted projection=[id]                    
                                                                                
                                                                         |
+    |               |     Sort: unsorted.id ASC NULLS LAST                     
                                                                                
                                                                         |
+    |               |       TableScan: unsorted projection=[id]                
                                                                                
                                                                         |
+    | physical_plan | AggregateExec: mode=Final, gby=[id@0 as id], aggr=[], 
ordering_mode=Sorted                                                            
                                                                            |
+    |               |   SortExec: expr=[id@0 ASC NULLS LAST], 
preserve_partitioning=[false]                                                   
                                                                                
          |
+    |               |     CoalescePartitionsExec                               
                                                                                
                                                                         |
+    |               |       AggregateExec: mode=Partial, gby=[id@0 as id], 
aggr=[]                                                                         
                                                                             |
+    |               |         UnionExec                                        
                                                                                
                                                                         |
+    |               |           DataSourceExec: file_groups={1 group: 
[[{testdata}/alltypes_tiny_pages.parquet]]}, projection=[id], 
output_ordering=[id@0 ASC NULLS LAST], file_type=parquet |
+    |               |           DataSourceExec: file_groups={1 group: 
[[{testdata}/alltypes_tiny_pages.parquet]]}, projection=[id], file_type=parquet 
                                       |
+    |               |                                                          
                                                                                
                                                                         |
+    
+---------------+-------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------+
+    "#);
+    Ok(())
+}
+
+#[ignore] // See https://github.com/apache/datafusion/issues/18380
+#[tokio::test]
+async fn 
union_with_mix_of_presorted_and_explicitly_resorted_inputs_with_repartition_sorts_true()
 -> Result<()> {
+    assert_snapshot!(
+        
union_with_mix_of_presorted_and_explicitly_resorted_inputs_impl(true).await?,
+        @r#"
+    
+---------------+-----------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------+
+    | plan_type     | plan                                                     
                                                                                
                                                                       |
+    
+---------------+-----------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------+
+    | logical_plan  | Aggregate: groupBy=[[id]], aggr=[[]]                     
                                                                                
                                                                       |
+    |               |   Union                                                  
                                                                                
                                                                       |
+    |               |     TableScan: sorted projection=[id]                    
                                                                                
                                                                       |
+    |               |     Sort: unsorted.id ASC NULLS LAST                     
                                                                                
                                                                       |
+    |               |       TableScan: unsorted projection=[id]                
                                                                                
                                                                       |
+    | physical_plan | AggregateExec: mode=Final, gby=[id@0 as id], aggr=[], 
ordering_mode=Sorted                                                            
                                                                          |
+    |               |   SortPreservingMergeExec: [id@0 ASC NULLS LAST]         
                                                                                
                                                                       |
+    |               |     AggregateExec: mode=Partial, gby=[id@0 as id], 
aggr=[], ordering_mode=Sorted                                                   
                                                                             |
+    |               |       UnionExec                                          
                                                                                
                                                                       |
+    |               |         DataSourceExec: file_groups={1 group: 
[[{testdata}/alltypes_tiny_pages.parquet]]}, projection=[id], 
output_ordering=[id@0 ASC NULLS LAST], file_type=parquet |
+    |               |         SortExec: expr=[id@0 ASC NULLS LAST], 
preserve_partitioning=[false]                                                   
                                                                                
  |
+    |               |           DataSourceExec: file_groups={1 group: 
[[{testdata}/alltypes_tiny_pages.parquet]]}, projection=[id], file_type=parquet 
                                     |
+    |               |                                                          
                                                                                
                                                                       |
+    
+---------------+-----------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------+
+    "#);
+
+    // 💥 Doesn't pass, and generates this plan:
+    //
+    // AggregateExec: mode=Final, gby=[id@0 as id], aggr=[], 
ordering_mode=Sorted
+    //   SortPreservingMergeExec: [id@0 ASC NULLS LAST]
+    //     SortExec: expr=[id@0 ASC NULLS LAST], preserve_partitioning=[true]
+    //       AggregateExec: mode=Partial, gby=[id@0 as id], aggr=[]
+    //         UnionExec
+    //           DataSourceExec: file_groups={1 group: 
[[{testdata}/alltypes_tiny_pages.parquet]]}, projection=[id], 
output_ordering=[id@0 ASC NULLS LAST], file_type=parquet
+    //           DataSourceExec: file_groups={1 group: 
[[{testdata}/alltypes_tiny_pages.parquet]]}, projection=[id], file_type=parquet
+    //
+    //
+    // === Excerpt from the verbose explain ===
+    //
+    // Physical_plan after EnforceDistribution:
+    //
+    // OutputRequirementExec: order_by=[], dist_by=Unspecified
+    //   AggregateExec: mode=Final, gby=[id@0 as id], aggr=[], 
ordering_mode=Sorted
+    //     SortExec: expr=[id@0 ASC NULLS LAST], preserve_partitioning=[false]
+    //       CoalescePartitionsExec
+    //         AggregateExec: mode=Partial, gby=[id@0 as id], aggr=[], 
ordering_mode=Sorted
+    //           UnionExec
+    //             DataSourceExec: file_groups={1 group: 
[[{testdata}/alltypes_tiny_pages.parquet]]}, projection=[id], 
output_ordering=[id@0 ASC NULLS LAST], file_type=parquet
+    //             SortExec: expr=[id@0 ASC NULLS LAST], 
preserve_partitioning=[false]
+    //               DataSourceExec: file_groups={1 group: 
[[{testdata}/alltypes_tiny_pages.parquet]]}, projection=[id], file_type=parquet
+    //
+    // Physical_plan after EnforceSorting:

Review Comment:
   You might want to keep the exact output, including the no-op optimizer step 
in between—it helps others see exactly what’s happening.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to