This is an automated email from the ASF dual-hosted git repository.

wenchen pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/spark.git


The following commit(s) were added to refs/heads/master by this push:
     new 22735bedc497 [SPARK-50772][SQL] Retain table aliases after SET, 
EXTEND, DROP operators
22735bedc497 is described below

commit 22735bedc497e620d3acc9da1b8fc1e70ad02d32
Author: Daniel Tenedorio <[email protected]>
AuthorDate: Thu Jan 9 13:10:43 2025 +0800

    [SPARK-50772][SQL] Retain table aliases after SET, EXTEND, DROP operators
    
    ### What changes were proposed in this pull request?
    
    Per initial feedback from testing, users would like table aliases (such as 
those mapping to left and right side inputs to a prior join) to remain 
available after SET, DROP, and EXTEND operators. Here is an example that should 
work:
    
    ```
    values (0), (1) lhs(a)
    |> inner join values (1), (2) rhs(a) using (a)
    |> extend lhs.a + rhs.a as z1
    |> extend lhs.a - rhs.a as z2
    |> drop z1
    |> where z2 = 0
    |> order by lhs.a, rhs.a, z2
    |> set z2 = 4
    |> limit 2
    |> select lhs.a, rhs.a, z2;
    
    1    1    4
    ```
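    
    As a smaller hedged sketch (hypothetical, not taken from the patch's 
tests), the alias also survives a single SET; per the updated docs, a 
qualified reference like `tab.col` then refers to the original row values:
    
    ```
    -- Hypothetical minimal case assuming this patch: the alias `tab` remains
    -- available after SET, and `tab.col` refers to the pre-SET values.
    values (0), (1) tab(col)
    |> set col = col * 2
    |> select tab.col;
    ```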
    
    To implement this:
    * Previously, the `|> where` or `|> order by` operators added a 
`SubqueryAlias` with an auto-generated table alias to the end of the logical 
plan under construction, in order to prevent the analyzer from adding 
attributes to the previous plan later (from 
`ColumnResolutionHelper.resolveExprsAndAddMissingAttrs`).
    * This PR replaces that behavior with a new `PipeOperator` node, which 
avoids replacing the table alias while maintaining correct behavior.
    * This PR also updates docs to mention the improved table alias behavior.
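    
    As an illustration grounded in the updated golden tests, a qualified 
reference such as `t.x` keeps resolving after `|> where`, since the new 
`PipeOperator` node (unlike the former auto-generated `SubqueryAlias`) does 
not replace the alias `t`:
    
    ```
    -- From the updated golden files: `t.x` still resolves after the WHERE,
    -- because the inserted PipeOperator preserves the table alias.
    table t
    |> where t.x = 1;
    ```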
    
    ### Why are the changes needed?
    
    This makes SQL pipe syntax easier to use.
    
    ### Does this PR introduce _any_ user-facing change?
    
    Yes, see above.
    
    ### How was this patch tested?
    
    This PR adds and updates golden-file-based tests.
    
    ### Was this patch authored or co-authored using generative AI tooling?
    
    No.
    
    Closes #49420 from dtenedor/fix-table-aliases.
    
    Authored-by: Daniel Tenedorio <[email protected]>
    Signed-off-by: Wenchen Fan <[email protected]>
---
 docs/sql-pipe-syntax.md                            | 161 ++++++++++++++---
 .../catalyst/analysis/ColumnResolutionHelper.scala |   7 +-
 .../sql/catalyst/expressions/pipeOperators.scala   |  22 +++
 .../spark/sql/catalyst/optimizer/Optimizer.scala   |   1 +
 .../spark/sql/catalyst/parser/AstBuilder.scala     |  15 +-
 .../sql/catalyst/rules/RuleIdCollection.scala      |   1 +
 .../spark/sql/catalyst/trees/TreePatterns.scala    |   1 +
 .../analyzer-results/pipe-operators.sql.out        | 195 +++++++++++++--------
 .../resources/sql-tests/inputs/pipe-operators.sql  |  12 ++
 .../sql-tests/results/pipe-operators.sql.out       |  27 ++-
 10 files changed, 327 insertions(+), 115 deletions(-)

diff --git a/docs/sql-pipe-syntax.md b/docs/sql-pipe-syntax.md
index 829aa0e607bd..3d757db96623 100644
--- a/docs/sql-pipe-syntax.md
+++ b/docs/sql-pipe-syntax.md
@@ -198,12 +198,22 @@ TABLE t;
 
 Evaluates the provided expressions over each of the rows of the input table.
 
+This operator is not always required in SQL pipe syntax. It is possible to use 
it at
+or near the end of a query to evaluate expressions or specify a list of output 
columns.
+
+Since the final query result always comprises the columns returned from the 
last pipe operator,
+when this `SELECT` operator does not appear, the output includes all columns 
from the full row.
+This behavior is similar to `SELECT *` in standard SQL syntax.
+
 It is possible to use `DISTINCT` and `*` as needed.<br>
 This works like the outermost `SELECT` in a table subquery in regular Spark 
SQL.
 
 Window functions are supported in the `SELECT` list as well. To use them, the 
`OVER` clause must be
 provided. You may provide the window specification in the `WINDOW` clause.
 
+Aggregate functions are not supported in this operator. To perform 
aggregation, use the `AGGREGATE`
+operator instead.
+
 For example:
 
 ```sql
@@ -226,7 +236,12 @@ FROM t
 |> EXTEND <expr> [[AS] alias], ...
 ```
 
-Appends new columns to the input table by evaluating the specified expressions 
over each of the input rows.
+Appends new columns to the input table by evaluating the specified expressions 
over each of the
+input rows.
+
+After an `EXTEND` operation, top-level column names are updated, but table 
aliases still refer to
+the original row values. For example, after an inner join between two tables 
`lhs` and `rhs`, a
+subsequent `EXTEND` and then `SELECT lhs.col, rhs.col` still works.
 
 For example:
 
@@ -248,7 +263,17 @@ VALUES (0), (1) tab(col)
 |> SET <column> = <expression>, ...
 ```
 
-Updates columns of the input table by replacing them with the result of 
evaluating the provided expressions.
+Updates columns of the input table by replacing them with the result of 
evaluating the provided
+expressions. Each such column reference must appear in the input table exactly 
once.
+
+This is similar to `SELECT * EXCEPT (column), <expression> AS column` in 
regular Spark SQL.
+
+It is possible to perform multiple assignments in a single `SET` clause. Each 
assignment may refer
+to the result of previous assignments.
+
+After an assignment, top-level column names are updated, but table aliases 
still refer to the
+original row values. For example, after an inner join between two tables `lhs` 
and `rhs`, a
+subsequent `SET` and then `SELECT lhs.col, rhs.col` still works.
 
 For example:
 
@@ -256,6 +281,16 @@ For example:
 VALUES (0), (1) tab(col)
 |> SET col = col * 2;
 
++---+
+|col|
++---+
+|  0|
+|  2|
++---+
+
+VALUES (0), (1) tab(col)
+|> SET col = col * 2;
+
 +---+
 |col|
 +---+
@@ -270,7 +305,14 @@ VALUES (0), (1) tab(col)
 |> DROP <column>, ...
 ```
 
-Drops columns of the input table by name.
+Drops columns of the input table by name. Each such column reference must 
appear in the input table
+exactly once.
+
+This is similar to `SELECT * EXCEPT (column)` in regular Spark SQL.
+
+After a `DROP` operation, top-level column names are updated, but table 
aliases still refer to the
+original row values. For example, after an inner join between two tables `lhs` 
and `rhs`, a
+subsequent `DROP` and then `SELECT lhs.col, rhs.col` still works.
 
 For example:
 
@@ -293,18 +335,25 @@ VALUES (0, 1) tab(col1, col2)
 
 Retains the same rows and column names of the input table but with a new table 
alias.
 
+This operator is useful for introducing a new alias for the input table, which 
can then be referred
+to in subsequent operators. Any existing alias for the table is replaced by 
the new alias.
+
+It can be helpful to use this operator after adding new columns with `SELECT` 
or `EXTEND`, or after
+performing aggregation with `AGGREGATE`. This simplifies the process of 
referring to the columns
+from subsequent `JOIN` operators and allows for more readable queries.
+
 For example:
 
 ```sql
 VALUES (0, 1) tab(col1, col2)
-|> AS new_tab;
-|> SELECT * FROM new_tab;
+|> AS new_tab
+|> SELECT col1 + col2 FROM new_tab;
 
-+----+----+
-|col1|col2|
-+----+----+
-|   0|   1|
-+----+----+
++-----------+
+|col1 + col2|
++-----------+
+|          1|
++-----------+
 ```
 
 #### WHERE
@@ -357,22 +406,48 @@ VALUES (0), (0) tab(col)
 #### AGGREGATE
 
 ```sql
+-- Full-table aggregation
 |> AGGREGATE <agg_expr> [[AS] alias], ...
-```
-
-Performs full-table aggregation, returning one result row with a column for 
each aggregate expression.
 
-```sql
+-- Aggregation with grouping
 |> AGGREGATE [<agg_expr> [[AS] alias], ...] GROUP BY <grouping_expr> [AS 
alias], ...
 ```
 
-Performs aggregation with grouping, returning one row per group. The column 
list includes the
-grouping columns first and then the aggregate columns afterward. Aliases can 
be assigned directly
-on grouping expressions.
+Performs aggregation across grouped rows or across the entire input table.
+
+If no `GROUP BY` clause is present, this performs full-table aggregation, 
returning one result row
+with a column for each aggregate expression. Otherwise, this performs 
aggregation with grouping,
+returning one row per group. Aliases can be assigned directly on grouping 
expressions.
+
+The output column list of this operator includes the grouping columns first 
(if any), and then the
+aggregate columns afterward. 
+
+Each `<agg_expr>` expression can include standard aggregate function(s) like 
`COUNT`, `SUM`, `AVG`,
`MIN`, or any other aggregate function that Spark SQL supports. Additional 
expressions may appear
+within or around the aggregate function call(s), such as `MIN(FLOOR(col)) + 
1`. Each `<agg_expr>`
+expression must contain at least one aggregate function (otherwise the query 
returns an error).
+Each `<agg_expr>` expression may include a column alias with `AS <alias>`, and 
may also
+include a `DISTINCT` keyword to remove duplicate values before applying the 
aggregate function (for
+example, `COUNT(DISTINCT col)`).
+
+If present, the `GROUP BY` clause can include any number of grouping 
expressions, and each
+`<agg_expr>` expression will evaluate over each unique combination of values 
of the grouping
+expressions. The output table contains the evaluated grouping expressions 
followed by the evaluated
+aggregate functions. The `GROUP BY` expressions may include one-based 
ordinals. Unlike regular SQL
+in which such ordinals refer to the expressions in the accompanying `SELECT` 
clause, in SQL pipe
+syntax, they refer to the columns of the relation produced by the preceding 
operator instead. For
+example, in `TABLE t |> AGGREGATE COUNT(*) GROUP BY 2`, we refer to the second 
column of the input
+table `t`.
+
+There is no need to repeat entire expressions between `GROUP BY` and `SELECT`, 
since the `AGGREGATE` 
+operator automatically includes the evaluated grouping expressions in its 
output. By the same token,
+after an `AGGREGATE` operator, it is often unnecessary to issue a following 
`SELECT` operator, since
+`AGGREGATE` returns both the grouping columns and the aggregate columns in a 
single step.
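+
+To illustrate the ordinal behavior described above (a sketch, assuming the 
example table `t(x, y)`
+used elsewhere in this patch, whose second column is `y`, so the two queries 
below are equivalent):
+
+```sql
+-- The ordinal 2 refers to the second column of the input table `t`,
+-- not to an accompanying SELECT list.
+TABLE t
+|> AGGREGATE COUNT(*) GROUP BY 2;
+
+-- Equivalent form naming the column directly:
+TABLE t
+|> AGGREGATE COUNT(*) GROUP BY y;
+```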
 
 For example:
 
 ```sql
+-- Full-table aggregation
 VALUES (0), (1) tab(col)
 |> AGGREGATE COUNT(col) AS count;
 
@@ -382,6 +457,7 @@ VALUES (0), (1) tab(col)
 |    2|
 +-----+
 
+-- Aggregation with grouping
 VALUES (0, 1), (0, 2) tab(col1, col2)
 |> AGGREGATE COUNT(col2) AS count GROUP BY col1;
 
@@ -398,19 +474,45 @@ VALUES (0, 1), (0, 2) tab(col1, col2)
 |> [LEFT | RIGHT | FULL | CROSS | SEMI | ANTI | NATURAL | LATERAL] JOIN 
<table> [ON <condition> | USING(col, ...)]
 ```
 
-Joins rows from both inputs, returning a filtered cross-product of the pipe 
input table and the table expression following the JOIN keyword.
+Joins rows from both inputs, returning a filtered cross-product of the pipe 
input table and the
+table expression following the JOIN keyword. This behaves in a similar manner 
to the `JOIN` clause in
+regular SQL where the pipe operator input table becomes the left side of the 
join and the table
+argument becomes the right side of the join.
+
+Standard join modifiers like `LEFT`, `RIGHT`, and `FULL` are supported before 
the `JOIN` keyword.
+
+The join predicate may need to refer to columns from both inputs to the join. 
In this case, it may
+be necessary to use table aliases to differentiate between columns in the 
event that both inputs
+have columns with the same names. The `AS` operator can be useful here to 
introduce a new alias for
+the pipe input table that becomes the left side of the join. Use standard 
syntax to assign an alias
+to the table argument that becomes the right side of the join, if needed.
 
 For example:
 
 ```sql
-VALUES (0, 1) tab(a, b)
-|> JOIN VALUES (0, 2) tab(c, d) ON a = c;
+SELECT 0 AS a, 1 AS b
+|> AS lhs
+|> JOIN VALUES (0, 2) rhs(a, b) ON (lhs.a = rhs.a);
 
 +---+---+---+---+
 |  a|  b|  a|  b|
 +---+---+---+---+
 |  0|  1|  0|  2|
 +---+---+---+---+
+
+VALUES ('apples', 3), ('bananas', 4) t(item, sales)
+|> AS produce_sales
+|> LEFT JOIN
+     (SELECT "apples" AS item, 123 AS id) AS produce_data
+     USING (item)
+|> SELECT produce_sales.item, sales, id;
+
+/*---------+-------+------+
+ | item    | sales | id   |
+ +---------+-------+------+
+ | apples  | 3     | 123  |
+ | bananas | 4     | NULL |
+ +---------+-------+------*/
 ```
 
 #### ORDER BY
@@ -419,7 +521,8 @@ VALUES (0, 1) tab(a, b)
 |> ORDER BY <expr> [ASC | DESC], ...
 ```
 
-Returns the input rows after sorting as indicated. Standard modifiers are 
supported including NULLS FIRST/LAST.
+Returns the input rows after sorting as indicated. Standard modifiers are 
supported including NULLS
+FIRST/LAST.
 
 For example:
 
@@ -438,10 +541,10 @@ VALUES (0), (1) tab(col)
 #### UNION, INTERSECT, EXCEPT
 
 ```sql
-|> {UNION | INTERSECT | EXCEPT} {ALL | DISTINCT} (<query>), (<query>), ...
+|> {UNION | INTERSECT | EXCEPT} {ALL | DISTINCT} (<query>)
 ```
 
-Performs the union or other set operation over the combined rows from the 
input table plus one or more tables provided as input arguments.
+Performs the union or other set operation over the combined rows from the 
input table and the query argument.
 
 For example:
 
@@ -469,12 +572,22 @@ For example:
 
 ```sql
 VALUES (0), (0), (0), (0) tab(col)
-|> TABLESAMPLE BERNOULLI(1 ROWS);
+|> TABLESAMPLE (1 ROWS);
+
++---+
+|col|
++---+
+|  0|
++---+
+
+VALUES (0), (0) tab(col)
+|> TABLESAMPLE (100 PERCENT);
 
 +---+
 |col|
 +---+
 |  0|
+|  0|
 +---+
 ```
 
diff --git 
a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/ColumnResolutionHelper.scala
 
b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/ColumnResolutionHelper.scala
index 36fd4d02f8da..56b2103c555d 100644
--- 
a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/ColumnResolutionHelper.scala
+++ 
b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/ColumnResolutionHelper.scala
@@ -53,9 +53,10 @@ trait ColumnResolutionHelper extends Logging with 
DataTypeErrorsBase {
       (exprs, plan)
     } else {
       plan match {
-        // For `Distinct` and `SubqueryAlias`, we can't recursively resolve 
and add attributes
-        // via its children.
-        case u: UnaryNode if !u.isInstanceOf[Distinct] && 
!u.isInstanceOf[SubqueryAlias] =>
+        // For `Distinct` and `SubqueryAlias` and `PipeOperator`, we can't 
recursively resolve and
+        // add attributes via its children.
+        case u: UnaryNode if !u.isInstanceOf[Distinct] && 
!u.isInstanceOf[SubqueryAlias]
+          && !u.isInstanceOf[PipeOperator] =>
           val (newExprs, newChild) = {
             // Resolving expressions against current plan.
             val maybeResolvedExprs = 
exprs.map(resolveExpressionByPlanOutput(_, u))
diff --git 
a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/pipeOperators.scala
 
b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/pipeOperators.scala
index fe8f0f264e85..40d7d24263a7 100644
--- 
a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/pipeOperators.scala
+++ 
b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/pipeOperators.scala
@@ -18,6 +18,9 @@
 package org.apache.spark.sql.catalyst.expressions
 
 import org.apache.spark.sql.catalyst.expressions.aggregate.AggregateFunction
+import org.apache.spark.sql.catalyst.plans.logical.{LogicalPlan, UnaryNode}
+import org.apache.spark.sql.catalyst.rules.Rule
+import org.apache.spark.sql.catalyst.trees.TreePattern.{PIPE_OPERATOR, 
TreePattern}
 import org.apache.spark.sql.errors.QueryCompilationErrors
 
 /**
@@ -57,6 +60,25 @@ case class PipeExpression(child: Expression, isAggregate: 
Boolean, clause: Strin
   }
 }
 
+/**
+ * Represents the location within a logical plan that a SQL pipe operator 
appeared.
+ * This acts as a logical boundary that works to prevent the analyzer from 
modifying the logical
+ * operators above and below the boundary.
+ */
+case class PipeOperator(child: LogicalPlan) extends UnaryNode {
+  final override val nodePatterns: Seq[TreePattern] = Seq(PIPE_OPERATOR)
+  override def output: Seq[Attribute] = child.output
+  override def withNewChildInternal(newChild: LogicalPlan): PipeOperator = 
copy(child = newChild)
+}
+
+/** This rule removes all PipeOperator nodes from a logical plan at the end of 
analysis. */
+object EliminatePipeOperators extends Rule[LogicalPlan] {
+  def apply(plan: LogicalPlan): LogicalPlan = plan.transformWithPruning(
+    _.containsPattern(PIPE_OPERATOR), ruleId) {
+    case PipeOperator(child) => child
+  }
+}
+
 object PipeOperators {
   // These are definitions of query result clauses that can be used with the 
pipe operator.
   val aggregateClause = "AGGREGATE"
diff --git 
a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/optimizer/Optimizer.scala
 
b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/optimizer/Optimizer.scala
index b141d2be04c3..c0c76dd44ad5 100644
--- 
a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/optimizer/Optimizer.scala
+++ 
b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/optimizer/Optimizer.scala
@@ -313,6 +313,7 @@ abstract class Optimizer(catalogManager: CatalogManager)
     private val rules = Seq(
       EliminateResolvedHint,
       EliminateSubqueryAliases,
+      EliminatePipeOperators,
       EliminateView,
       ReplaceExpressions,
       RewriteNonCorrelatedExists,
diff --git 
a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/parser/AstBuilder.scala
 
b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/parser/AstBuilder.scala
index 1f9c14830364..f4f6d2b310f4 100644
--- 
a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/parser/AstBuilder.scala
+++ 
b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/parser/AstBuilder.scala
@@ -6028,17 +6028,6 @@ class AstBuilder extends DataTypeAstBuilder
     if (!SQLConf.get.getConf(SQLConf.OPERATOR_PIPE_SYNTAX_ENABLED)) {
       operationNotAllowed("Operator pipe SQL syntax using |>", ctx)
     }
-    // This helper function adds a table subquery boundary between the new 
operator to be added
-    // (such as a filter or sort) and the input plan if one does not already 
exist. This helps the
-    // analyzer behave as if we had added the corresponding SQL clause after a 
table subquery
-    // containing the input plan.
-    def withSubqueryAlias(): LogicalPlan = left match {
-      case _: SubqueryAlias | _: UnresolvedRelation | _: Join | _: Filter |
-           _: GlobalLimit | _: LocalLimit | _: Offset | _: Sort =>
-        left
-      case _ =>
-        SubqueryAlias(SubqueryAlias.generateSubqueryName(), left)
-    }
     Option(ctx.selectClause).map { c =>
       withSelectQuerySpecification(
         ctx = ctx,
@@ -6082,7 +6071,7 @@ class AstBuilder extends DataTypeAstBuilder
       if (ctx.windowClause() != null) {
         throw 
QueryParsingErrors.windowClauseInPipeOperatorWhereClauseNotAllowedError(ctx)
       }
-      withWhereClause(c, withSubqueryAlias())
+      withWhereClause(c, PipeOperator(left))
     }.getOrElse(Option(ctx.pivotClause()).map { c =>
       if (ctx.unpivotClause() != null) {
         throw 
QueryParsingErrors.unpivotWithPivotInFromClauseNotAllowedError(ctx)
@@ -6101,7 +6090,7 @@ class AstBuilder extends DataTypeAstBuilder
       val all = Option(ctx.setQuantifier()).exists(_.ALL != null)
       visitSetOperationImpl(left, plan(ctx.right), all, c.getType)
     }.getOrElse(Option(ctx.queryOrganization).map { c =>
-      withQueryResultClauses(c, withSubqueryAlias(), forPipeOperators = true)
+      withQueryResultClauses(c, PipeOperator(left), forPipeOperators = true)
     }.getOrElse(
       visitOperatorPipeAggregate(ctx, left)
     ))))))))))))
diff --git 
a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/rules/RuleIdCollection.scala
 
b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/rules/RuleIdCollection.scala
index 0918306de62e..3f79e74b18a4 100644
--- 
a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/rules/RuleIdCollection.scala
+++ 
b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/rules/RuleIdCollection.scala
@@ -108,6 +108,7 @@ object RuleIdCollection {
       "org.apache.spark.sql.catalyst.analysis.UpdateOuterReferences" ::
       "org.apache.spark.sql.catalyst.analysis.UpdateAttributeNullability" ::
       
"org.apache.spark.sql.catalyst.analysis.ResolveUpdateEventTimeWatermarkColumn" 
::
+      "org.apache.spark.sql.catalyst.expressions.EliminatePipeOperators" ::
       // Catalyst Optimizer rules
       "org.apache.spark.sql.catalyst.optimizer.BooleanSimplification" ::
       "org.apache.spark.sql.catalyst.optimizer.CollapseProject" ::
diff --git 
a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/trees/TreePatterns.scala
 
b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/trees/TreePatterns.scala
index 80531da4a0ab..25ef341b8cef 100644
--- 
a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/trees/TreePatterns.scala
+++ 
b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/trees/TreePatterns.scala
@@ -79,6 +79,7 @@ object TreePattern extends Enumeration  {
   val OUTER_REFERENCE: Value = Value
   val PARAMETER: Value = Value
   val PARAMETERIZED_QUERY: Value = Value
+  val PIPE_OPERATOR: Value = Value
   val PIVOT: Value = Value
   val PLAN_EXPRESSION: Value = Value
   val PYTHON_UDF: Value = Value
diff --git 
a/sql/core/src/test/resources/sql-tests/analyzer-results/pipe-operators.sql.out 
b/sql/core/src/test/resources/sql-tests/analyzer-results/pipe-operators.sql.out
index 70de582fb7b2..ac74fea1dbfb 100644
--- 
a/sql/core/src/test/resources/sql-tests/analyzer-results/pipe-operators.sql.out
+++ 
b/sql/core/src/test/resources/sql-tests/analyzer-results/pipe-operators.sql.out
@@ -306,7 +306,7 @@ from t as t_alias
 -- !query analysis
 Project [tx#x]
 +- Filter (ty#x = def)
-   +- SubqueryAlias __auto_generated_subquery_name
+   +- PipeOperator
       +- Project [pipeexpression(x#x, false, SELECT) AS tx#x, 
pipeexpression(y#x, false, SELECT) AS ty#x]
          +- SubqueryAlias t_alias
             +- SubqueryAlias spark_catalog.default.t
@@ -769,8 +769,9 @@ Filter exists#x [x#x]
 :     +- Project [a#x, b#x, pipeexpression(outer(x#x), false, EXTEND) AS 
pipeexpression(outer(spark_catalog.default.t.x))#x]
 :        +- SubqueryAlias spark_catalog.default.other
 :           +- Relation spark_catalog.default.other[a#x,b#x] json
-+- SubqueryAlias spark_catalog.default.t
-   +- Relation spark_catalog.default.t[x#x,y#x] csv
++- PipeOperator
+   +- SubqueryAlias spark_catalog.default.t
+      +- Relation spark_catalog.default.t[x#x,y#x] csv
 
 
 -- !query
@@ -1002,6 +1003,38 @@ Project [x#x, y#x, z#x]
                +- Relation spark_catalog.default.t[x#x,y#x] csv
 
 
+-- !query
+values (0), (1) lhs(a)
+|> inner join values (1), (2) rhs(a) using (a)
+|> extend lhs.a + rhs.a as z1
+|> extend lhs.a - rhs.a as z2
+|> drop z1
+|> where z2 = 0
+|> order by lhs.a, rhs.a, z2
+|> set z2 = 4
+|> limit 2
+|> select lhs.a, rhs.a, z2
+-- !query analysis
+Project [a#x, a#x, z2#x]
++- GlobalLimit 2
+   +- LocalLimit 2
+      +- PipeOperator
+         +- Project [a#x, pipeexpression(4, false, SET) AS z2#x, a#x]
+            +- Sort [a#x ASC NULLS FIRST, a#x ASC NULLS FIRST, z2#x ASC NULLS 
FIRST], true
+               +- PipeOperator
+                  +- Filter (z2#x = 0)
+                     +- PipeOperator
+                        +- Project [a#x, z2#x, a#x]
+                           +- Project [a#x, z1#x, pipeexpression((a#x - a#x), 
false, EXTEND) AS z2#x, a#x]
+                              +- Project [a#x, pipeexpression((a#x + a#x), 
false, EXTEND) AS z1#x, a#x]
+                                 +- Project [a#x, a#x, a#x]
+                                    +- Join Inner, (a#x = a#x)
+                                       :- SubqueryAlias lhs
+                                       :  +- LocalRelation [a#x]
+                                       +- SubqueryAlias rhs
+                                          +- LocalRelation [a#x]
+
+
 -- !query
 table t
 |> set z = 1
@@ -1241,9 +1274,10 @@ table t
 |> where u.x = 1
 -- !query analysis
 Filter (x#x = 1)
-+- SubqueryAlias u
-   +- SubqueryAlias spark_catalog.default.t
-      +- Relation spark_catalog.default.t[x#x,y#x] csv
++- PipeOperator
+   +- SubqueryAlias u
+      +- SubqueryAlias spark_catalog.default.t
+         +- Relation spark_catalog.default.t[x#x,y#x] csv
 
 
 -- !query
@@ -1325,8 +1359,9 @@ table t
 |> where true
 -- !query analysis
 Filter true
-+- SubqueryAlias spark_catalog.default.t
-   +- Relation spark_catalog.default.t[x#x,y#x] csv
++- PipeOperator
+   +- SubqueryAlias spark_catalog.default.t
+      +- Relation spark_catalog.default.t[x#x,y#x] csv
 
 
 -- !query
@@ -1334,8 +1369,9 @@ table t
 |> where x + length(y) < 4
 -- !query analysis
 Filter ((x#x + length(y#x)) < 4)
-+- SubqueryAlias spark_catalog.default.t
-   +- Relation spark_catalog.default.t[x#x,y#x] csv
++- PipeOperator
+   +- SubqueryAlias spark_catalog.default.t
+      +- Relation spark_catalog.default.t[x#x,y#x] csv
 
 
 -- !query
@@ -1344,9 +1380,11 @@ table t
 |> where x + length(y) < 3
 -- !query analysis
 Filter ((x#x + length(y#x)) < 3)
-+- Filter ((x#x + length(y#x)) < 4)
-   +- SubqueryAlias spark_catalog.default.t
-      +- Relation spark_catalog.default.t[x#x,y#x] csv
++- PipeOperator
+   +- Filter ((x#x + length(y#x)) < 4)
+      +- PipeOperator
+         +- SubqueryAlias spark_catalog.default.t
+            +- Relation spark_catalog.default.t[x#x,y#x] csv
 
 
 -- !query
@@ -1354,7 +1392,7 @@ Filter ((x#x + length(y#x)) < 3)
 |> where x = 1
 -- !query analysis
 Filter (x#x = 1)
-+- SubqueryAlias __auto_generated_subquery_name
++- PipeOperator
    +- Aggregate [x#x], [x#x, sum(length(y#x)) AS sum_len#xL]
       +- SubqueryAlias spark_catalog.default.t
          +- Relation spark_catalog.default.t[x#x,y#x] csv
@@ -1365,8 +1403,9 @@ table t
 |> where t.x = 1
 -- !query analysis
 Filter (x#x = 1)
-+- SubqueryAlias spark_catalog.default.t
-   +- Relation spark_catalog.default.t[x#x,y#x] csv
++- PipeOperator
+   +- SubqueryAlias spark_catalog.default.t
+      +- Relation spark_catalog.default.t[x#x,y#x] csv
 
 
 -- !query
@@ -1374,8 +1413,9 @@ table t
 |> where spark_catalog.default.t.x = 1
 -- !query analysis
 Filter (x#x = 1)
-+- SubqueryAlias spark_catalog.default.t
-   +- Relation spark_catalog.default.t[x#x,y#x] csv
++- PipeOperator
+   +- SubqueryAlias spark_catalog.default.t
+      +- Relation spark_catalog.default.t[x#x,y#x] csv
 
 
 -- !query
@@ -1383,7 +1423,7 @@ Filter (x#x = 1)
 |> where col.i1 = 1
 -- !query analysis
 Filter (col#x.i1 = 1)
-+- SubqueryAlias __auto_generated_subquery_name
++- PipeOperator
    +- Project [col#x]
       +- SubqueryAlias spark_catalog.default.st
          +- Relation spark_catalog.default.st[x#x,col#x] parquet
@@ -1394,8 +1434,9 @@ table st
 |> where st.col.i1 = 2
 -- !query analysis
 Filter (col#x.i1 = 2)
-+- SubqueryAlias spark_catalog.default.st
-   +- Relation spark_catalog.default.st[x#x,col#x] parquet
++- PipeOperator
+   +- SubqueryAlias spark_catalog.default.st
+      +- Relation spark_catalog.default.st[x#x,col#x] parquet
 
 
 -- !query
@@ -1409,8 +1450,9 @@ Filter exists#x [x#x]
 :           +- Filter (outer(x#x) = a#x)
 :              +- SubqueryAlias spark_catalog.default.other
 :                 +- Relation spark_catalog.default.other[a#x,b#x] json
-+- SubqueryAlias spark_catalog.default.t
-   +- Relation spark_catalog.default.t[x#x,y#x] csv
++- PipeOperator
+   +- SubqueryAlias spark_catalog.default.t
+      +- Relation spark_catalog.default.t[x#x,y#x] csv
 
 
 -- !query
@@ -1424,8 +1466,9 @@ Filter (scalar-subquery#x [x#x] = 1)
 :           +- Filter (outer(x#x) = a#x)
 :              +- SubqueryAlias spark_catalog.default.other
 :                 +- Relation spark_catalog.default.other[a#x,b#x] json
-+- SubqueryAlias spark_catalog.default.t
-   +- Relation spark_catalog.default.t[x#x,y#x] csv
++- PipeOperator
+   +- SubqueryAlias spark_catalog.default.t
+      +- Relation spark_catalog.default.t[x#x,y#x] csv
 
 
 -- !query
@@ -1527,7 +1570,7 @@ org.apache.spark.sql.catalyst.ExtendedAnalysisException
   "sqlState" : "42703",
   "messageParameters" : {
     "objectName" : "`y`",
-    "proposal" : "`x`, `z`"
+    "proposal" : "`z`, `spark_catalog`.`default`.`t`.`x`"
   },
   "queryContext" : [ {
     "objectType" : "",
@@ -1551,7 +1594,7 @@ org.apache.spark.sql.catalyst.ExtendedAnalysisException
   "sqlState" : "42703",
   "messageParameters" : {
     "objectName" : "`y`",
-    "proposal" : "`x`, `z`"
+    "proposal" : "`z`, `spark_catalog`.`default`.`t`.`x`"
   },
   "queryContext" : [ {
     "objectType" : "",
@@ -1575,7 +1618,7 @@ org.apache.spark.sql.catalyst.ExtendedAnalysisException
   "sqlState" : "42703",
   "messageParameters" : {
     "objectName" : "`y`",
-    "proposal" : "`x`, `z`"
+    "proposal" : "`z`, `spark_catalog`.`default`.`t`.`x`"
   },
   "queryContext" : [ {
     "objectType" : "",
@@ -1599,7 +1642,7 @@ org.apache.spark.sql.catalyst.ExtendedAnalysisException
   "sqlState" : "42703",
   "messageParameters" : {
     "objectName" : "`y`",
-    "proposal" : "`x`, `z`"
+    "proposal" : "`z`, `spark_catalog`.`default`.`t`.`x`"
   },
   "queryContext" : [ {
     "objectType" : "",
@@ -1621,7 +1664,7 @@ org.apache.spark.sql.catalyst.ExtendedAnalysisException
   "sqlState" : "42703",
   "messageParameters" : {
     "objectName" : "`y`",
-    "proposal" : "`x`, `sum_len`"
+    "proposal" : "`sum_len`, `spark_catalog`.`default`.`t`.`x`"
   },
   "queryContext" : [ {
     "objectType" : "",
@@ -2583,20 +2626,21 @@ table natural_join_test_t1
 |> where k = "one"
 -- !query analysis
 Filter (k#x = one)
-+- Project [k#x, v1#x, v2#x]
-   +- Join Inner, (k#x = k#x)
-      :- SubqueryAlias natural_join_test_t1
-      :  +- View (`natural_join_test_t1`, [k#x, v1#x])
-      :     +- Project [cast(k#x as string) AS k#x, cast(v1#x as int) AS v1#x]
-      :        +- Project [k#x, v1#x]
-      :           +- SubqueryAlias natural_join_test_t1
-      :              +- LocalRelation [k#x, v1#x]
-      +- SubqueryAlias natural_join_test_t2
-         +- View (`natural_join_test_t2`, [k#x, v2#x])
-            +- Project [cast(k#x as string) AS k#x, cast(v2#x as int) AS v2#x]
-               +- Project [k#x, v2#x]
-                  +- SubqueryAlias natural_join_test_t2
-                     +- LocalRelation [k#x, v2#x]
++- PipeOperator
+   +- Project [k#x, v1#x, v2#x]
+      +- Join Inner, (k#x = k#x)
+         :- SubqueryAlias natural_join_test_t1
+         :  +- View (`natural_join_test_t1`, [k#x, v1#x])
+         :     +- Project [cast(k#x as string) AS k#x, cast(v1#x as int) AS 
v1#x]
+         :        +- Project [k#x, v1#x]
+         :           +- SubqueryAlias natural_join_test_t1
+         :              +- LocalRelation [k#x, v1#x]
+         +- SubqueryAlias natural_join_test_t2
+            +- View (`natural_join_test_t2`, [k#x, v2#x])
+               +- Project [cast(k#x as string) AS k#x, cast(v2#x as int) AS 
v2#x]
+                  +- Project [k#x, v2#x]
+                     +- SubqueryAlias natural_join_test_t2
+                        +- LocalRelation [k#x, v2#x]
 
 
 -- !query
@@ -2774,7 +2818,7 @@ values (2, 'xyz') tab(x, y)
 |> where x = 0
 -- !query analysis
 Filter (x#x = 0)
-+- SubqueryAlias __auto_generated_subquery_name
++- PipeOperator
    +- Distinct
       +- Union false, false
          :- SubqueryAlias tab
@@ -2932,8 +2976,9 @@ table t
 |> order by x
 -- !query analysis
 Sort [x#x ASC NULLS FIRST], true
-+- SubqueryAlias spark_catalog.default.t
-   +- Relation spark_catalog.default.t[x#x,y#x] csv
++- PipeOperator
+   +- SubqueryAlias spark_catalog.default.t
+      +- Relation spark_catalog.default.t[x#x,y#x] csv
 
 
 -- !query
@@ -2941,7 +2986,7 @@ Sort [x#x ASC NULLS FIRST], true
 |> order by x
 -- !query analysis
 Sort [x#x ASC NULLS FIRST], true
-+- SubqueryAlias __auto_generated_subquery_name
++- PipeOperator
    +- Project [x#x, y#x]
       +- SubqueryAlias spark_catalog.default.t
          +- Relation spark_catalog.default.t[x#x,y#x] csv
@@ -2952,8 +2997,9 @@ values (0, 'abc') tab(x, y)
 |> order by x
 -- !query analysis
 Sort [x#x ASC NULLS FIRST], true
-+- SubqueryAlias tab
-   +- LocalRelation [x#x, y#x]
++- PipeOperator
+   +- SubqueryAlias tab
+      +- LocalRelation [x#x, y#x]
 
 
 -- !query
@@ -2963,9 +3009,11 @@ table t
 -- !query analysis
 GlobalLimit 1
 +- LocalLimit 1
-   +- Sort [x#x ASC NULLS FIRST], true
-      +- SubqueryAlias spark_catalog.default.t
-         +- Relation spark_catalog.default.t[x#x,y#x] csv
+   +- PipeOperator
+      +- Sort [x#x ASC NULLS FIRST], true
+         +- PipeOperator
+            +- SubqueryAlias spark_catalog.default.t
+               +- Relation spark_catalog.default.t[x#x,y#x] csv
 
 
 -- !query
@@ -2977,11 +3025,12 @@ table t
 GlobalLimit 2
 +- LocalLimit 2
    +- Offset 1
-      +- SubqueryAlias __auto_generated_subquery_name
+      +- PipeOperator
          +- Project [y#x]
             +- Filter (x#x = 1)
-               +- SubqueryAlias spark_catalog.default.t
-                  +- Relation spark_catalog.default.t[x#x,y#x] csv
+               +- PipeOperator
+                  +- SubqueryAlias spark_catalog.default.t
+                     +- Relation spark_catalog.default.t[x#x,y#x] csv
 
 
 -- !query
@@ -2991,11 +3040,12 @@ table t
 |> offset 1
 -- !query analysis
 Offset 1
-+- SubqueryAlias __auto_generated_subquery_name
++- PipeOperator
    +- Project [y#x]
       +- Filter (x#x = 1)
-         +- SubqueryAlias spark_catalog.default.t
-            +- Relation spark_catalog.default.t[x#x,y#x] csv
+         +- PipeOperator
+            +- SubqueryAlias spark_catalog.default.t
+               +- Relation spark_catalog.default.t[x#x,y#x] csv
 
 
 -- !query
@@ -3003,8 +3053,9 @@ table t
 |> limit all offset 0
 -- !query analysis
 Offset 0
-+- SubqueryAlias spark_catalog.default.t
-   +- Relation spark_catalog.default.t[x#x,y#x] csv
++- PipeOperator
+   +- SubqueryAlias spark_catalog.default.t
+      +- Relation spark_catalog.default.t[x#x,y#x] csv
 
 
 -- !query
@@ -3012,8 +3063,9 @@ table t
 |> distribute by x
 -- !query analysis
 RepartitionByExpression [x#x]
-+- SubqueryAlias spark_catalog.default.t
-   +- Relation spark_catalog.default.t[x#x,y#x] csv
++- PipeOperator
+   +- SubqueryAlias spark_catalog.default.t
+      +- Relation spark_catalog.default.t[x#x,y#x] csv
 
 
 -- !query
@@ -3022,8 +3074,9 @@ table t
 -- !query analysis
 Sort [x#x ASC NULLS FIRST], false
 +- RepartitionByExpression [x#x]
-   +- SubqueryAlias spark_catalog.default.t
-      +- Relation spark_catalog.default.t[x#x,y#x] csv
+   +- PipeOperator
+      +- SubqueryAlias spark_catalog.default.t
+         +- Relation spark_catalog.default.t[x#x,y#x] csv
 
 
 -- !query
@@ -3032,8 +3085,9 @@ table t
 -- !query analysis
 RepartitionByExpression [x#x]
 +- Sort [x#x ASC NULLS FIRST], false
-   +- SubqueryAlias spark_catalog.default.t
-      +- Relation spark_catalog.default.t[x#x,y#x] csv
+   +- PipeOperator
+      +- SubqueryAlias spark_catalog.default.t
+         +- Relation spark_catalog.default.t[x#x,y#x] csv
 
 
 -- !query
@@ -3043,8 +3097,9 @@ order by y
 -- !query analysis
 Sort [y#x ASC NULLS FIRST], true
 +- Sort [x#x DESC NULLS LAST], true
-   +- SubqueryAlias spark_catalog.default.t
-      +- Relation spark_catalog.default.t[x#x,y#x] csv
+   +- PipeOperator
+      +- SubqueryAlias spark_catalog.default.t
+         +- Relation spark_catalog.default.t[x#x,y#x] csv
 
 
 -- !query
@@ -3312,7 +3367,7 @@ table other
 |> where a = 1
 -- !query analysis
 Filter (a#x = 1)
-+- SubqueryAlias __auto_generated_subquery_name
++- PipeOperator
    +- Aggregate [a#x], [a#x]
       +- SubqueryAlias spark_catalog.default.other
          +- Relation spark_catalog.default.other[a#x,b#x] json
diff --git a/sql/core/src/test/resources/sql-tests/inputs/pipe-operators.sql b/sql/core/src/test/resources/sql-tests/inputs/pipe-operators.sql
index ec4afc6b2372..0cae29d722a8 100644
--- a/sql/core/src/test/resources/sql-tests/inputs/pipe-operators.sql
+++ b/sql/core/src/test/resources/sql-tests/inputs/pipe-operators.sql
@@ -359,6 +359,18 @@ table t
 |> extend 1 as z
 |> set z = first_value(x) over (partition by y);
 
+-- Any prior table aliases remain visible after a SET operator.
+values (0), (1) lhs(a)
+|> inner join values (1), (2) rhs(a) using (a)
+|> extend lhs.a + rhs.a as z1
+|> extend lhs.a - rhs.a as z2
+|> drop z1
+|> where z2 = 0
+|> order by lhs.a, rhs.a, z2
+|> set z2 = 4
+|> limit 2
+|> select lhs.a, rhs.a, z2;
+
 -- SET operators: negative tests.
 ---------------------------------
 
diff --git a/sql/core/src/test/resources/sql-tests/results/pipe-operators.sql.out b/sql/core/src/test/resources/sql-tests/results/pipe-operators.sql.out
index 3dd212d889f9..0d5ec57b9e47 100644
--- a/sql/core/src/test/resources/sql-tests/results/pipe-operators.sql.out
+++ b/sql/core/src/test/resources/sql-tests/results/pipe-operators.sql.out
@@ -940,6 +940,23 @@ struct<x:int,y:string,z:int>
 1      def     1
 
 
+-- !query
+values (0), (1) lhs(a)
+|> inner join values (1), (2) rhs(a) using (a)
+|> extend lhs.a + rhs.a as z1
+|> extend lhs.a - rhs.a as z2
+|> drop z1
+|> where z2 = 0
+|> order by lhs.a, rhs.a, z2
+|> set z2 = 4
+|> limit 2
+|> select lhs.a, rhs.a, z2
+-- !query schema
+struct<a:int,a:int,z2:int>
+-- !query output
+1      1       4
+
+
 -- !query
 table t
 |> set z = 1
@@ -1481,7 +1498,7 @@ org.apache.spark.sql.catalyst.ExtendedAnalysisException
   "sqlState" : "42703",
   "messageParameters" : {
     "objectName" : "`y`",
-    "proposal" : "`x`, `z`"
+    "proposal" : "`z`, `spark_catalog`.`default`.`t`.`x`"
   },
   "queryContext" : [ {
     "objectType" : "",
@@ -1507,7 +1524,7 @@ org.apache.spark.sql.catalyst.ExtendedAnalysisException
   "sqlState" : "42703",
   "messageParameters" : {
     "objectName" : "`y`",
-    "proposal" : "`x`, `z`"
+    "proposal" : "`z`, `spark_catalog`.`default`.`t`.`x`"
   },
   "queryContext" : [ {
     "objectType" : "",
@@ -1533,7 +1550,7 @@ org.apache.spark.sql.catalyst.ExtendedAnalysisException
   "sqlState" : "42703",
   "messageParameters" : {
     "objectName" : "`y`",
-    "proposal" : "`x`, `z`"
+    "proposal" : "`z`, `spark_catalog`.`default`.`t`.`x`"
   },
   "queryContext" : [ {
     "objectType" : "",
@@ -1559,7 +1576,7 @@ org.apache.spark.sql.catalyst.ExtendedAnalysisException
   "sqlState" : "42703",
   "messageParameters" : {
     "objectName" : "`y`",
-    "proposal" : "`x`, `z`"
+    "proposal" : "`z`, `spark_catalog`.`default`.`t`.`x`"
   },
   "queryContext" : [ {
     "objectType" : "",
@@ -1583,7 +1600,7 @@ org.apache.spark.sql.catalyst.ExtendedAnalysisException
   "sqlState" : "42703",
   "messageParameters" : {
     "objectName" : "`y`",
-    "proposal" : "`x`, `sum_len`"
+    "proposal" : "`sum_len`, `spark_catalog`.`default`.`t`.`x`"
   },
   "queryContext" : [ {
     "objectType" : "",

