adriangb commented on code in PR #19446:
URL: https://github.com/apache/datafusion/pull/19446#discussion_r2648264542


##########
datafusion/core/tests/physical_optimizer/pushdown_sort.rs:
##########
@@ -670,3 +669,272 @@ fn test_pushdown_through_blocking_node() {
     "
     );
 }
+
+// ============================================================================
+// PROJECTION TESTS
+// ============================================================================
+
+#[test]
+fn test_sort_pushdown_through_simple_projection() {
+    // Sort pushes through projection with simple column references
+    let schema = schema();
+
+    // Source has [a ASC] ordering
+    let a = sort_expr("a", &schema);
+    let source_ordering = LexOrdering::new(vec![a.clone()]).unwrap();
+    let source = parquet_exec_with_sort(schema.clone(), vec![source_ordering]);
+
+    // Projection: SELECT a, b (simple column references)
+    let projection = simple_projection_exec(source, vec![0, 1]); // columns a, 
b
+
+    // Request [a DESC] - should push through projection to source
+    let desc_ordering = LexOrdering::new(vec![a.reverse()]).unwrap();
+    let plan = sort_exec(desc_ordering, projection);
+
+    insta::assert_snapshot!(
+        OptimizationTest::new(plan, PushdownSort::new(), true),
+        @r"
+    OptimizationTest:
+      input:
+        - SortExec: expr=[a@0 DESC NULLS LAST], preserve_partitioning=[false]
+        -   ProjectionExec: expr=[a@0 as a, b@1 as b]
+        -     DataSourceExec: file_groups={1 group: [[x]]}, projection=[a, b, 
c, d, e], output_ordering=[a@0 ASC], file_type=parquet
+      output:
+        Ok:
+          - SortExec: expr=[a@0 DESC NULLS LAST], preserve_partitioning=[false]
+          -   ProjectionExec: expr=[a@0 as a, b@1 as b]
+          -     DataSourceExec: file_groups={1 group: [[x]]}, projection=[a, 
b, c, d, e], file_type=parquet, reverse_row_groups=true
+    "
+    );
+}
+
+#[test]
+fn test_sort_pushdown_through_projection_with_alias() {
+    // Sort pushes through projection with column aliases
+    let schema = schema();
+
+    // Source has [a ASC] ordering
+    let a = sort_expr("a", &schema);
+    let source_ordering = LexOrdering::new(vec![a.clone()]).unwrap();
+    let source = parquet_exec_with_sort(schema.clone(), vec![source_ordering]);
+
+    // Projection: SELECT a AS id, b AS value
+    let projection = projection_exec_with_alias(source, vec![(0, "id"), (1, 
"value")]);
+
+    // Request [id DESC] - should map to [a DESC] and push down
+    let id_expr = sort_expr_named("id", 0);
+    let desc_ordering = LexOrdering::new(vec![id_expr.reverse()]).unwrap();
+    let plan = sort_exec(desc_ordering, projection);
+
+    insta::assert_snapshot!(
+        OptimizationTest::new(plan, PushdownSort::new(), true),
+        @r"
+    OptimizationTest:
+      input:
+        - SortExec: expr=[id@0 DESC NULLS LAST], preserve_partitioning=[false]
+        -   ProjectionExec: expr=[a@0 as id, b@1 as value]
+        -     DataSourceExec: file_groups={1 group: [[x]]}, projection=[a, b, 
c, d, e], output_ordering=[a@0 ASC], file_type=parquet
+      output:
+        Ok:
+          - SortExec: expr=[id@0 DESC NULLS LAST], 
preserve_partitioning=[false]
+          -   ProjectionExec: expr=[a@0 as id, b@1 as value]
+          -     DataSourceExec: file_groups={1 group: [[x]]}, projection=[a, 
b, c, d, e], file_type=parquet, reverse_row_groups=true
+    "
+    );
+}
+
+#[test]
+fn test_no_sort_pushdown_through_computed_projection() {
+    use datafusion_expr::Operator;
+
+    // Sort should NOT push through projection with computed columns
+    let schema = schema();
+
+    // Source has [a ASC] ordering
+    let a = sort_expr("a", &schema);
+    let source_ordering = LexOrdering::new(vec![a.clone()]).unwrap();
+    let source = parquet_exec_with_sort(schema.clone(), vec![source_ordering]);
+
+    // Projection: SELECT a+b as sum, c
+    let projection = projection_exec(
+        vec![
+            (
+                Arc::new(expressions::BinaryExpr::new(
+                    Arc::new(expressions::Column::new("a", 0)),
+                    Operator::Plus,
+                    Arc::new(expressions::Column::new("b", 1)),
+                )) as Arc<dyn PhysicalExpr>,
+                "sum".to_string(),
+            ),
+            (
+                Arc::new(expressions::Column::new("c", 2)) as Arc<dyn 
PhysicalExpr>,
+                "c".to_string(),
+            ),
+        ],
+        source,
+    )
+        .unwrap();
+
+    // Request [sum DESC] - should NOT push down (sum is computed)
+    let sum_expr = sort_expr_named("sum", 0);
+    let desc_ordering = LexOrdering::new(vec![sum_expr.reverse()]).unwrap();
+    let plan = sort_exec(desc_ordering, projection);
+
+    insta::assert_snapshot!(
+        OptimizationTest::new(plan, PushdownSort::new(), true),
+        @r"
+    OptimizationTest:
+      input:
+        - SortExec: expr=[sum@0 DESC NULLS LAST], preserve_partitioning=[false]
+        -   ProjectionExec: expr=[a@0 + b@1 as sum, c@2 as c]
+        -     DataSourceExec: file_groups={1 group: [[x]]}, projection=[a, b, 
c, d, e], output_ordering=[a@0 ASC], file_type=parquet
+      output:
+        Ok:
+          - SortExec: expr=[sum@0 DESC NULLS LAST], 
preserve_partitioning=[false]
+          -   ProjectionExec: expr=[a@0 + b@1 as sum, c@2 as c]
+          -     DataSourceExec: file_groups={1 group: [[x]]}, projection=[a, 
b, c, d, e], output_ordering=[a@0 ASC], file_type=parquet
+    "
+    );
+}
+
+#[test]
+fn test_sort_pushdown_projection_reordered_columns() {
+    // Sort pushes through projection that reorders columns
+    let schema = schema();
+
+    // Source has [a ASC] ordering
+    let a = sort_expr("a", &schema);
+    let source_ordering = LexOrdering::new(vec![a.clone()]).unwrap();
+    let source = parquet_exec_with_sort(schema.clone(), vec![source_ordering]);
+
+    // Projection: SELECT c, b, a (columns reordered)
+    let projection = simple_projection_exec(source, vec![2, 1, 0]); // c, b, a
+
+    // Request [a DESC] where a is now at index 2 in projection output
+    let a_expr_at_2 = sort_expr_named("a", 2);
+    let desc_ordering = LexOrdering::new(vec![a_expr_at_2.reverse()]).unwrap();
+    let plan = sort_exec(desc_ordering, projection);
+
+    insta::assert_snapshot!(
+        OptimizationTest::new(plan, PushdownSort::new(), true),
+        @r"
+    OptimizationTest:
+      input:
+        - SortExec: expr=[a@2 DESC NULLS LAST], preserve_partitioning=[false]
+        -   ProjectionExec: expr=[c@2 as c, b@1 as b, a@0 as a]
+        -     DataSourceExec: file_groups={1 group: [[x]]}, projection=[a, b, 
c, d, e], output_ordering=[a@0 ASC], file_type=parquet
+      output:
+        Ok:
+          - SortExec: expr=[a@2 DESC NULLS LAST], preserve_partitioning=[false]
+          -   ProjectionExec: expr=[c@2 as c, b@1 as b, a@0 as a]
+          -     DataSourceExec: file_groups={1 group: [[x]]}, projection=[a, 
b, c, d, e], file_type=parquet, reverse_row_groups=true

Review Comment:
   Nit: for these tests it would be nice to have a `TestScan`-style source that
accepts any requested ordering, so we can see the sort pushed down all the way
to the scan.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to