Jefffrey commented on code in PR #17367:
URL: https://github.com/apache/datafusion/pull/17367#discussion_r2366180090


##########
datafusion/sql/src/unparser/dialect.rs:
##########
@@ -267,6 +274,14 @@ impl Dialect for DefaultDialect {
 pub struct PostgreSqlDialect {}
 
 impl Dialect for PostgreSqlDialect {
+    fn supports_qualify(&self) -> bool {
+        false
+    }
+
+    fn requires_derived_table_alias(&self) -> bool {
+        true
+    }

Review Comment:
   How does this change relate to the fix?



##########
datafusion/sql/src/unparser/rewrite.rs:
##########
@@ -100,6 +100,79 @@ fn rewrite_sort_expr_for_union(exprs: Vec<SortExpr>) -> 
Result<Vec<SortExpr>> {
     Ok(sort_exprs)
 }
 
+/// Rewrite Filter plans that have a Window as their input by inserting a 
SubqueryAlias.
+///
+/// When a Filter directly operates on a Window plan, it can cause issues 
during SQL unparsing
+/// because window functions in a WHERE clause are not valid SQL. The solution 
is to wrap
+/// the Window plan in a SubqueryAlias, effectively creating a derived table.
+///
+/// Example transformation:
+///
+/// Filter: condition
+///   Window: window_function
+///     TableScan: table
+///
+/// becomes:
+///
+/// Filter: condition
+///   SubqueryAlias: __qualify_subquery
+///     Projection: table.column1, table.column2
+///       Window: window_function
+///         TableScan: table
+///
+pub(super) fn rewrite_qualify(plan: &LogicalPlan) -> Result<LogicalPlan> {
+    let plan = plan.clone();
+
+    let transformed_plan = plan.transform_up(|plan| match plan {
+        LogicalPlan::Filter(mut filter) => {
+            // Check if the filter's input is a Window plan
+            if matches!(&*filter.input, LogicalPlan::Window(_)) {

Review Comment:
   ```suggestion
           // Check if the filter's input is a Window plan
           LogicalPlan::Filter(mut filter)
               if matches!(&*filter.input, LogicalPlan::Window(_)) =>
           {
   ```
   
   FYI, the `if` can be collapsed into a match guard.



##########
datafusion/sql/src/unparser/rewrite.rs:
##########
@@ -100,6 +100,79 @@ fn rewrite_sort_expr_for_union(exprs: Vec<SortExpr>) -> 
Result<Vec<SortExpr>> {
     Ok(sort_exprs)
 }
 
+/// Rewrite Filter plans that have a Window as their input by inserting a 
SubqueryAlias.
+///
+/// When a Filter directly operates on a Window plan, it can cause issues 
during SQL unparsing
+/// because window functions in a WHERE clause are not valid SQL. The solution 
is to wrap
+/// the Window plan in a SubqueryAlias, effectively creating a derived table.
+///
+/// Example transformation:
+///
+/// Filter: condition
+///   Window: window_function
+///     TableScan: table
+///
+/// becomes:
+///
+/// Filter: condition
+///   SubqueryAlias: __qualify_subquery
+///     Projection: table.column1, table.column2
+///       Window: window_function
+///         TableScan: table
+///
+pub(super) fn rewrite_qualify(plan: &LogicalPlan) -> Result<LogicalPlan> {
+    let plan = plan.clone();

Review Comment:
   ```suggestion
   pub(super) fn rewrite_qualify(plan: LogicalPlan) -> Result<LogicalPlan> {
   ```
   
   If we end up cloning the reference anyway, it's better to make it explicit in the signature 
that we need ownership (and clone at the call site if required).



##########
datafusion/sql/tests/cases/plan_to_sql.rs:
##########
@@ -2521,6 +2523,69 @@ fn 
test_unparse_left_semi_join_with_table_scan_projection() -> Result<()> {
     Ok(())
 }
 
+#[test]
+fn test_unparse_window() -> Result<()> {
+    // SubqueryAlias: t
+    // Projection: t.k, t.v, rank() PARTITION BY [t.k] ORDER BY [t.v ASC NULLS 
LAST] RANGE BETWEEN UNBOUNDED PRECEDING AND CURRENT ROW AS r
+    //     Filter: rank() PARTITION BY [t.k] ORDER BY [t.v ASC NULLS LAST] 
RANGE BETWEEN UNBOUNDED PRECEDING AND CURRENT ROW = UInt64(1)
+    //     WindowAggr: windowExpr=[[rank() PARTITION BY [t.k] ORDER BY [t.v 
ASC NULLS LAST] RANGE BETWEEN UNBOUNDED PRECEDING AND CURRENT ROW]]
+    //         TableScan: t projection=[k, v]
+
+    let schema = Schema::new(vec![
+        Field::new("k", DataType::Int32, false),
+        Field::new("v", DataType::Int32, false),
+    ]);
+    let window_expr = Expr::WindowFunction(Box::new(WindowFunction {
+        fun: WindowFunctionDefinition::WindowUDF(rank_udwf()),
+        params: WindowFunctionParams {
+            args: vec![],
+            partition_by: vec![col("k")],
+            order_by: vec![col("v").sort(true, true)],
+            window_frame: WindowFrame::new(None),
+            null_treatment: None,
+            distinct: false,
+            filter: None,
+        },
+    }));
+    let table_scan = table_scan(Some("test"), &schema, Some(vec![0, 
1]))?.build()?;
+    let plan = LogicalPlanBuilder::window_plan(table_scan, vec![window_expr])?;
+
+    let name = plan.schema().fields().last().unwrap().name().clone();
+    let plan = LogicalPlanBuilder::from(plan)
+        .filter(col(name.clone()).eq(lit(1i64)))?
+        .project(vec![col("k"), col("v"), col(name)])?
+        .build()?;
+
+    let unparser = Unparser::new(&UnparserPostgreSqlDialect {});
+    let sql = unparser.plan_to_sql(&plan)?;
+    assert_snapshot!(
+        sql,
+        @r#"SELECT "test"."k", "test"."v", "rank() PARTITION BY [test.k] ORDER 
BY [test.v ASC NULLS FIRST] ROWS BETWEEN UNBOUNDED PRECEDING AND UNBOUNDED 
FOLLOWING" FROM (SELECT "test"."k" AS "k", "test"."v" AS "v", rank() OVER 
(PARTITION BY "test"."k" ORDER BY "test"."v" ASC NULLS FIRST ROWS BETWEEN 
UNBOUNDED PRECEDING AND UNBOUNDED FOLLOWING) AS "rank() PARTITION BY [test.k] 
ORDER BY [test.v ASC NULLS FIRST] ROWS BETWEEN UNBOUNDED PRECEDING AND 
UNBOUNDED FOLLOWING" FROM "test") AS "test" WHERE ("rank() PARTITION BY 
[test.k] ORDER BY [test.v ASC NULLS FIRST] ROWS BETWEEN UNBOUNDED PRECEDING AND 
UNBOUNDED FOLLOWING" = 1)"#

Review Comment:
   These column names seem really chunky 🤔 
   
   Is it possible to also use the cases provided in the original issue, e.g.
   
   ```sql
   select k, v, r
   from (
       select *, rank() over(partition by k order by v) as r
       from t
   ) t
   where r = 1
   ```
   
   and
   
   ```sql
   select *, rank() over(partition by k order by v) as r
   from t
   qualify r = 1;
   ```
   
   And check whether the `where r = 1` is preserved?



##########
datafusion/sql/src/unparser/rewrite.rs:
##########
@@ -100,6 +100,79 @@ fn rewrite_sort_expr_for_union(exprs: Vec<SortExpr>) -> 
Result<Vec<SortExpr>> {
     Ok(sort_exprs)
 }
 
+/// Rewrite Filter plans that have a Window as their input by inserting a 
SubqueryAlias.
+///
+/// When a Filter directly operates on a Window plan, it can cause issues 
during SQL unparsing
+/// because window functions in a WHERE clause are not valid SQL. The solution 
is to wrap
+/// the Window plan in a SubqueryAlias, effectively creating a derived table.
+///
+/// Example transformation:
+///
+/// Filter: condition
+///   Window: window_function
+///     TableScan: table
+///
+/// becomes:
+///
+/// Filter: condition
+///   SubqueryAlias: __qualify_subquery
+///     Projection: table.column1, table.column2
+///       Window: window_function
+///         TableScan: table
+///
+pub(super) fn rewrite_qualify(plan: &LogicalPlan) -> Result<LogicalPlan> {
+    let plan = plan.clone();
+
+    let transformed_plan = plan.transform_up(|plan| match plan {
+        LogicalPlan::Filter(mut filter) => {
+            // Check if the filter's input is a Window plan
+            if matches!(&*filter.input, LogicalPlan::Window(_)) {
+                // Create a SubqueryAlias around the Window plan
+                let qualifiers = filter
+                    .input
+                    .schema()
+                    .iter()
+                    .filter(|(q, _)| q.is_some())
+                    .flat_map(|(q, _)| q)
+                    .collect::<Vec<_>>();
+
+                let qualifier = if qualifiers.is_empty() {
+                    "__qualify_subquery".to_string()
+                } else {
+                    qualifiers[0].to_string()
+                };

Review Comment:
   What's the significance of this hardcoded `"__qualify_subquery"`? Can it be 
demonstrated in a test?
   
   By the way, you can avoid collecting into a `Vec` by doing something like this:
   
   ```rust
   let qualifier = filter
       .input
       .schema()
       .iter()
       .find_map(|(q, _)| q)
       .map(|q| q.to_string())
       .unwrap_or_else(|| "__qualify_subquery".to_string());
   ```



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to