This is an automated email from the ASF dual-hosted git repository.

wenchen pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/spark.git


The following commit(s) were added to refs/heads/master by this push:
     new a8bec11af941 [SPARK-49565][SQL] Improve auto-generated expression 
aliases with pipe SQL operators
a8bec11af941 is described below

commit a8bec11af9417956f8552d2dfb72c9afc80a7671
Author: Daniel Tenedorio <[email protected]>
AuthorDate: Fri Jan 10 11:21:50 2025 +0800

    [SPARK-49565][SQL] Improve auto-generated expression aliases with pipe SQL 
operators
    
    ### What changes were proposed in this pull request?
    
    This RP improves auto-generated expression aliases with pipe SQL operators.
    
    For example, consider the pipe SQL syntax query:
    
    ```
    table t
    |> extend 1
    ```
    
    Previously, the analyzed plan was:
    
    ```
    Project [x#x, y#x, pipeexpression(1, false, EXTEND) AS pipeexpression(1)#x]
    +- SubqueryAlias spark_catalog.default.t
       +- Relation spark_catalog.default.t[x#x,y#x] csv
    ```
    
    After this PR, it is:
    
    ```
    Project [x#x, y#x, pipeexpression(1, false, EXTEND) AS 1#x]
    +- SubqueryAlias spark_catalog.default.t
       +- Relation spark_catalog.default.t[x#x,y#x] csv
    ```
    
    Note that the output aliases visible in the resulting DataFrame for the 
query derive from the `AS <alias>` part of the analyzed plans shown.
    
    ### Why are the changes needed?
    
    This improves the user experience with pipe SQL syntax.
    
    ### Does this PR introduce _any_ user-facing change?
    
    Yes, see above.
    
    ### How was this patch tested?
    
    Existing golden file tests update to show the improved aliases.
    
    ### Was this patch authored or co-authored using generative AI tooling?
    
    No.
    
    Closes #49245 from dtenedor/fix-pipe-output-aliases.
    
    Authored-by: Daniel Tenedorio <[email protected]>
    Signed-off-by: Wenchen Fan <[email protected]>
---
 .../spark/sql/catalyst/analysis/Analyzer.scala     |   1 +
 .../sql/catalyst/expressions/pipeOperators.scala   |  67 ++++---
 .../sql/catalyst/rules/RuleIdCollection.scala      |   1 +
 .../spark/sql/catalyst/trees/TreePatterns.scala    |   1 +
 .../analyzer-results/pipe-operators.sql.out        | 203 +++++++++++----------
 .../sql-tests/results/pipe-operators.sql.out       |  26 +--
 6 files changed, 161 insertions(+), 138 deletions(-)

diff --git 
a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/Analyzer.scala
 
b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/Analyzer.scala
index e8839148f51b..9282e0554a2d 100644
--- 
a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/Analyzer.scala
+++ 
b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/Analyzer.scala
@@ -373,6 +373,7 @@ class Analyzer(override val catalogManager: CatalogManager) 
extends RuleExecutor
       ResolveProcedures ::
       BindProcedures ::
       ResolveTableSpec ::
+      ValidateAndStripPipeExpressions ::
       ResolveAliases ::
       ResolveSubquery ::
       ResolveSubqueryColumnAliases ::
diff --git 
a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/pipeOperators.scala
 
b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/pipeOperators.scala
index 40d7d24263a7..2ee68663ad2f 100644
--- 
a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/pipeOperators.scala
+++ 
b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/pipeOperators.scala
@@ -20,8 +20,9 @@ package org.apache.spark.sql.catalyst.expressions
 import org.apache.spark.sql.catalyst.expressions.aggregate.AggregateFunction
 import org.apache.spark.sql.catalyst.plans.logical.{LogicalPlan, UnaryNode}
 import org.apache.spark.sql.catalyst.rules.Rule
-import org.apache.spark.sql.catalyst.trees.TreePattern.{PIPE_OPERATOR, 
TreePattern}
+import org.apache.spark.sql.catalyst.trees.TreePattern.{PIPE_EXPRESSION, 
PIPE_OPERATOR, TreePattern}
 import org.apache.spark.sql.errors.QueryCompilationErrors
+import org.apache.spark.sql.types.DataType
 
 /**
  * Represents an expression when used with a SQL pipe operator.
@@ -33,31 +34,12 @@ import org.apache.spark.sql.errors.QueryCompilationErrors
  * @param clause The clause of the pipe operator. This is used to generate 
error messages.
  */
 case class PipeExpression(child: Expression, isAggregate: Boolean, clause: 
String)
-  extends UnaryExpression with RuntimeReplaceable {
+  extends UnaryExpression with Unevaluable {
+  final override val nodePatterns = Seq(PIPE_EXPRESSION)
+  final override lazy val resolved = false
   override def withNewChildInternal(newChild: Expression): Expression =
     PipeExpression(newChild, isAggregate, clause)
-  override lazy val replacement: Expression = {
-    val firstAggregateFunction: Option[AggregateFunction] = 
findFirstAggregate(child)
-    if (isAggregate && firstAggregateFunction.isEmpty) {
-      throw 
QueryCompilationErrors.pipeOperatorAggregateExpressionContainsNoAggregateFunction(child)
-    } else if (!isAggregate) {
-      firstAggregateFunction.foreach { a =>
-        throw QueryCompilationErrors.pipeOperatorContainsAggregateFunction(a, 
clause)
-      }
-    }
-    child
-  }
-
-  /** Returns the first aggregate function in the given expression, or None if 
not found. */
-  private def findFirstAggregate(e: Expression): Option[AggregateFunction] = e 
match {
-    case a: AggregateFunction =>
-      Some(a)
-    case _: WindowExpression =>
-      // Window functions are allowed in these pipe operators, so do not 
traverse into children.
-      None
-    case _ =>
-      e.children.flatMap(findFirstAggregate).headOption
-  }
+  override def dataType: DataType = child.dataType
 }
 
 /**
@@ -79,6 +61,43 @@ object EliminatePipeOperators extends Rule[LogicalPlan] {
   }
 }
 
+/**
+ * Validates and strips PipeExpression nodes from a logical plan once the 
child expressions are
+ * resolved.
+ */
+object ValidateAndStripPipeExpressions extends Rule[LogicalPlan] {
+  def apply(plan: LogicalPlan): LogicalPlan = 
plan.resolveOperatorsUpWithPruning(
+    _.containsPattern(PIPE_EXPRESSION), ruleId) {
+    case node: LogicalPlan =>
+      node.resolveExpressions {
+        case p: PipeExpression if p.child.resolved =>
+          // Once the child expression is resolved, we can perform the 
necessary invariant checks
+          // and then remove this expression, replacing it with the child 
expression instead.
+          val firstAggregateFunction: Option[AggregateFunction] = 
findFirstAggregate(p.child)
+          if (p.isAggregate && firstAggregateFunction.isEmpty) {
+            throw QueryCompilationErrors
+              
.pipeOperatorAggregateExpressionContainsNoAggregateFunction(p.child)
+          } else if (!p.isAggregate) {
+            firstAggregateFunction.foreach { a =>
+              throw 
QueryCompilationErrors.pipeOperatorContainsAggregateFunction(a, p.clause)
+            }
+          }
+          p.child
+      }
+  }
+
+  /** Returns the first aggregate function in the given expression, or None if 
not found. */
+  private def findFirstAggregate(e: Expression): Option[AggregateFunction] = e 
match {
+    case a: AggregateFunction =>
+      Some(a)
+    case _: WindowExpression =>
+      // Window functions are allowed in these pipe operators, so do not 
traverse into children.
+      None
+    case _ =>
+      e.children.flatMap(findFirstAggregate).headOption
+  }
+}
+
 object PipeOperators {
   // These are definitions of query result clauses that can be used with the 
pipe operator.
   val aggregateClause = "AGGREGATE"
diff --git 
a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/rules/RuleIdCollection.scala
 
b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/rules/RuleIdCollection.scala
index 3f79e74b18a4..ee5245054bcc 100644
--- 
a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/rules/RuleIdCollection.scala
+++ 
b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/rules/RuleIdCollection.scala
@@ -109,6 +109,7 @@ object RuleIdCollection {
       "org.apache.spark.sql.catalyst.analysis.UpdateAttributeNullability" ::
       
"org.apache.spark.sql.catalyst.analysis.ResolveUpdateEventTimeWatermarkColumn" 
::
       "org.apache.spark.sql.catalyst.expressions.EliminatePipeOperators" ::
+      
"org.apache.spark.sql.catalyst.expressions.ValidateAndStripPipeExpressions" ::
       // Catalyst Optimizer rules
       "org.apache.spark.sql.catalyst.optimizer.BooleanSimplification" ::
       "org.apache.spark.sql.catalyst.optimizer.CollapseProject" ::
diff --git 
a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/trees/TreePatterns.scala
 
b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/trees/TreePatterns.scala
index 25ef341b8cef..b56085ecae8d 100644
--- 
a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/trees/TreePatterns.scala
+++ 
b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/trees/TreePatterns.scala
@@ -79,6 +79,7 @@ object TreePattern extends Enumeration  {
   val OUTER_REFERENCE: Value = Value
   val PARAMETER: Value = Value
   val PARAMETERIZED_QUERY: Value = Value
+  val PIPE_EXPRESSION: Value = Value
   val PIPE_OPERATOR: Value = Value
   val PIVOT: Value = Value
   val PLAN_EXPRESSION: Value = Value
diff --git 
a/sql/core/src/test/resources/sql-tests/analyzer-results/pipe-operators.sql.out 
b/sql/core/src/test/resources/sql-tests/analyzer-results/pipe-operators.sql.out
index ac74fea1dbfb..b231199cc473 100644
--- 
a/sql/core/src/test/resources/sql-tests/analyzer-results/pipe-operators.sql.out
+++ 
b/sql/core/src/test/resources/sql-tests/analyzer-results/pipe-operators.sql.out
@@ -283,7 +283,7 @@ SubqueryAlias spark_catalog.default.t
 from t
 |> select 1 as x
 -- !query analysis
-Project [pipeexpression(1, false, SELECT) AS x#x]
+Project [1 AS x#x]
 +- SubqueryAlias spark_catalog.default.t
    +- Relation spark_catalog.default.t[x#x,y#x] csv
 
@@ -307,7 +307,7 @@ from t as t_alias
 Project [tx#x]
 +- Filter (ty#x = def)
    +- PipeOperator
-      +- Project [pipeexpression(x#x, false, SELECT) AS tx#x, 
pipeexpression(y#x, false, SELECT) AS ty#x]
+      +- Project [x#x AS tx#x, y#x AS ty#x]
          +- SubqueryAlias t_alias
             +- SubqueryAlias spark_catalog.default.t
                +- Relation spark_catalog.default.t[x#x,y#x] csv
@@ -317,7 +317,7 @@ Project [tx#x]
 from t, other
 |> select t.x + other.a as z
 -- !query analysis
-Project [pipeexpression((x#x + a#x), false, SELECT) AS z#x]
+Project [(x#x + a#x) AS z#x]
 +- Join Inner
    :- SubqueryAlias spark_catalog.default.t
    :  +- Relation spark_catalog.default.t[x#x,y#x] csv
@@ -329,7 +329,7 @@ Project [pipeexpression((x#x + a#x), false, SELECT) AS z#x]
 from t join other on (t.x = other.a)
 |> select t.x + other.a as z
 -- !query analysis
-Project [pipeexpression((x#x + a#x), false, SELECT) AS z#x]
+Project [(x#x + a#x) AS z#x]
 +- Join Inner, (x#x = a#x)
    :- SubqueryAlias spark_catalog.default.t
    :  +- Relation spark_catalog.default.t[x#x,y#x] csv
@@ -341,7 +341,7 @@ Project [pipeexpression((x#x + a#x), false, SELECT) AS z#x]
 from t lateral view explode(array(100, 101)) as ly
 |> select t.x + ly as z
 -- !query analysis
-Project [pipeexpression((x#x + ly#x), false, SELECT) AS z#x]
+Project [(x#x + ly#x) AS z#x]
 +- Generate explode(array(100, 101)), false, as, [ly#x]
    +- SubqueryAlias spark_catalog.default.t
       +- Relation spark_catalog.default.t[x#x,y#x] csv
@@ -370,7 +370,7 @@ Project [col#x.i1 AS i1#x]
 from values (0), (1) tab(col)
 |> select col as x
 -- !query analysis
-Project [pipeexpression(col#x, false, SELECT) AS x#x]
+Project [col#x AS x#x]
 +- SubqueryAlias tab
    +- LocalRelation [col#x]
 
@@ -394,7 +394,7 @@ org.apache.spark.sql.catalyst.parser.ParseException
 table t
 |> select 1 as x
 -- !query analysis
-Project [pipeexpression(1, false, SELECT) AS x#x]
+Project [1 AS x#x]
 +- SubqueryAlias spark_catalog.default.t
    +- Relation spark_catalog.default.t[x#x,y#x] csv
 
@@ -413,7 +413,7 @@ table t
 |> select x, y
 |> select x + length(y) as z
 -- !query analysis
-Project [pipeexpression((x#x + length(y#x)), false, SELECT) AS z#x]
+Project [(x#x + length(y#x)) AS z#x]
 +- Project [x#x, y#x]
    +- SubqueryAlias spark_catalog.default.t
       +- Relation spark_catalog.default.t[x#x,y#x] csv
@@ -423,7 +423,7 @@ Project [pipeexpression((x#x + length(y#x)), false, SELECT) 
AS z#x]
 values (0), (1) tab(col)
 |> select col * 2 as result
 -- !query analysis
-Project [pipeexpression((col#x * 2), false, SELECT) AS result#x]
+Project [(col#x * 2) AS result#x]
 +- SubqueryAlias tab
    +- LocalRelation [col#x]
 
@@ -432,7 +432,7 @@ Project [pipeexpression((col#x * 2), false, SELECT) AS 
result#x]
 (select * from t union all select * from t)
 |> select x + length(y) as result
 -- !query analysis
-Project [pipeexpression((x#x + length(y#x)), false, SELECT) AS result#x]
+Project [(x#x + length(y#x)) AS result#x]
 +- Union false, false
    :- Project [x#x, y#x]
    :  +- SubqueryAlias spark_catalog.default.t
@@ -483,7 +483,7 @@ Project [col#x.i1 AS i1#x]
 table t
 |> select (select a from other where x = a limit 1) as result
 -- !query analysis
-Project [pipeexpression(scalar-subquery#x [x#x], false, SELECT) AS result#x]
+Project [scalar-subquery#x [x#x] AS result#x]
 :  +- GlobalLimit 1
 :     +- LocalLimit 1
 :        +- Project [a#x]
@@ -508,7 +508,7 @@ Project [scalar-subquery#x [] AS result#x]
 table t
 |> select (select any_value(a) from other where x = a limit 1) as result
 -- !query analysis
-Project [pipeexpression(scalar-subquery#x [x#x], false, SELECT) AS result#x]
+Project [scalar-subquery#x [x#x] AS result#x]
 :  +- GlobalLimit 1
 :     +- LocalLimit 1
 :        +- Aggregate [any_value(a#x, false) AS any_value(a)#x]
@@ -523,8 +523,8 @@ Project [pipeexpression(scalar-subquery#x [x#x], false, 
SELECT) AS result#x]
 table t
 |> select x + length(x) as z, z + 1 as plus_one
 -- !query analysis
-Project [z#x, pipeexpression((z#x + 1), false, SELECT) AS plus_one#x]
-+- Project [x#x, y#x, pipeexpression((x#x + length(cast(x#x as string))), 
false, SELECT) AS z#x]
+Project [z#x, (z#x + 1) AS plus_one#x]
++- Project [x#x, y#x, (x#x + length(cast(x#x as string))) AS z#x]
    +- SubqueryAlias spark_catalog.default.t
       +- Relation spark_catalog.default.t[x#x,y#x] csv
 
@@ -534,8 +534,8 @@ table t
 |> select first_value(x) over (partition by y) as result
 -- !query analysis
 Project [result#x]
-+- Project [x#x, y#x, _we0#x, pipeexpression(_we0#x, false, SELECT) AS 
result#x]
-   +- Window [first_value(x#x, false) windowspecdefinition(y#x, 
specifiedwindowframe(RowFrame, unboundedpreceding$(), unboundedfollowing$())) 
AS _we0#x], [y#x]
++- Project [x#x, y#x, result#x, result#x]
+   +- Window [first_value(x#x, false) windowspecdefinition(y#x, 
specifiedwindowframe(RowFrame, unboundedpreceding$(), unboundedfollowing$())) 
AS result#x], [y#x]
       +- Project [x#x, y#x]
          +- SubqueryAlias spark_catalog.default.t
             +- Relation spark_catalog.default.t[x#x,y#x] csv
@@ -551,8 +551,8 @@ select 1 x, 2 y, 3 z
 -- !query analysis
 Project [a2#x]
 +- Project [(1 + sum(x) OVER (ROWS BETWEEN UNBOUNDED PRECEDING AND UNBOUNDED 
FOLLOWING))#xL, avg(y) OVER (ROWS BETWEEN UNBOUNDED PRECEDING AND UNBOUNDED 
FOLLOWING)#x, x#x, a2#x]
-   +- Project [x#x, y#x, _w1#x, z#x, _we0#xL, avg(y) OVER (ROWS BETWEEN 
UNBOUNDED PRECEDING AND UNBOUNDED FOLLOWING)#x, _we2#x, (cast(1 as bigint) + 
_we0#xL) AS (1 + sum(x) OVER (ROWS BETWEEN UNBOUNDED PRECEDING AND UNBOUNDED 
FOLLOWING))#xL, avg(y) OVER (ROWS BETWEEN UNBOUNDED PRECEDING AND UNBOUNDED 
FOLLOWING)#x, pipeexpression(_we2#x, false, SELECT) AS a2#x]
-      +- Window [avg(_w1#x) windowspecdefinition(y#x, z#x ASC NULLS FIRST, 
specifiedwindowframe(RangeFrame, unboundedpreceding$(), currentrow$())) AS 
_we2#x], [y#x], [z#x ASC NULLS FIRST]
+   +- Project [x#x, y#x, _w1#x, z#x, _we0#xL, avg(y) OVER (ROWS BETWEEN 
UNBOUNDED PRECEDING AND UNBOUNDED FOLLOWING)#x, a2#x, (cast(1 as bigint) + 
_we0#xL) AS (1 + sum(x) OVER (ROWS BETWEEN UNBOUNDED PRECEDING AND UNBOUNDED 
FOLLOWING))#xL, avg(y) OVER (ROWS BETWEEN UNBOUNDED PRECEDING AND UNBOUNDED 
FOLLOWING)#x, a2#x]
+      +- Window [avg(_w1#x) windowspecdefinition(y#x, z#x ASC NULLS FIRST, 
specifiedwindowframe(RangeFrame, unboundedpreceding$(), currentrow$())) AS 
a2#x], [y#x], [z#x ASC NULLS FIRST]
          +- Window [sum(x#x) 
windowspecdefinition(specifiedwindowframe(RowFrame, unboundedpreceding$(), 
unboundedfollowing$())) AS _we0#xL, avg(y#x) 
windowspecdefinition(specifiedwindowframe(RowFrame, unboundedpreceding$(), 
unboundedfollowing$())) AS avg(y) OVER (ROWS BETWEEN UNBOUNDED PRECEDING AND 
UNBOUNDED FOLLOWING)#x]
             +- Project [x#x, y#x, (x#x + 1) AS _w1#x, z#x]
                +- Project [1 AS x#x, 2 AS y#x, 3 AS z#x]
@@ -680,7 +680,7 @@ org.apache.spark.sql.AnalysisException
 table t
 |> extend 1 as z
 -- !query analysis
-Project [x#x, y#x, pipeexpression(1, false, EXTEND) AS z#x]
+Project [x#x, y#x, 1 AS z#x]
 +- SubqueryAlias spark_catalog.default.t
    +- Relation spark_catalog.default.t[x#x,y#x] csv
 
@@ -689,7 +689,7 @@ Project [x#x, y#x, pipeexpression(1, false, EXTEND) AS z#x]
 table t
 |> extend 1
 -- !query analysis
-Project [x#x, y#x, pipeexpression(1, false, EXTEND) AS pipeexpression(1)#x]
+Project [x#x, y#x, 1 AS 1#x]
 +- SubqueryAlias spark_catalog.default.t
    +- Relation spark_catalog.default.t[x#x,y#x] csv
 
@@ -698,7 +698,7 @@ Project [x#x, y#x, pipeexpression(1, false, EXTEND) AS 
pipeexpression(1)#x]
 table t
 |> extend x as z
 -- !query analysis
-Project [x#x, y#x, pipeexpression(x#x, false, EXTEND) AS z#x]
+Project [x#x, y#x, x#x AS z#x]
 +- SubqueryAlias spark_catalog.default.t
    +- Relation spark_catalog.default.t[x#x,y#x] csv
 
@@ -707,7 +707,7 @@ Project [x#x, y#x, pipeexpression(x#x, false, EXTEND) AS 
z#x]
 table t
 |> extend x + length(y) as z
 -- !query analysis
-Project [x#x, y#x, pipeexpression((x#x + length(y#x)), false, EXTEND) AS z#x]
+Project [x#x, y#x, (x#x + length(y#x)) AS z#x]
 +- SubqueryAlias spark_catalog.default.t
    +- Relation spark_catalog.default.t[x#x,y#x] csv
 
@@ -716,7 +716,7 @@ Project [x#x, y#x, pipeexpression((x#x + length(y#x)), 
false, EXTEND) AS z#x]
 table t
 |> extend x + length(y) as z, x + 1 as zz
 -- !query analysis
-Project [x#x, y#x, pipeexpression((x#x + length(y#x)), false, EXTEND) AS z#x, 
pipeexpression((x#x + 1), false, EXTEND) AS zz#x]
+Project [x#x, y#x, (x#x + length(y#x)) AS z#x, (x#x + 1) AS zz#x]
 +- SubqueryAlias spark_catalog.default.t
    +- Relation spark_catalog.default.t[x#x,y#x] csv
 
@@ -726,8 +726,8 @@ table t
 |> extend x + length(y) as z
 |> extend z + 1 as zz
 -- !query analysis
-Project [x#x, y#x, z#x, pipeexpression((z#x + 1), false, EXTEND) AS zz#x]
-+- Project [x#x, y#x, pipeexpression((x#x + length(y#x)), false, EXTEND) AS 
z#x]
+Project [x#x, y#x, z#x, (z#x + 1) AS zz#x]
++- Project [x#x, y#x, (x#x + length(y#x)) AS z#x]
    +- SubqueryAlias spark_catalog.default.t
       +- Relation spark_catalog.default.t[x#x,y#x] csv
 
@@ -736,7 +736,7 @@ Project [x#x, y#x, z#x, pipeexpression((z#x + 1), false, 
EXTEND) AS zz#x]
 select col from st
 |> extend col.i1 as z
 -- !query analysis
-Project [col#x, pipeexpression(col#x.i1, false, EXTEND) AS z#x]
+Project [col#x, col#x.i1 AS z#x]
 +- Project [col#x]
    +- SubqueryAlias spark_catalog.default.st
       +- Relation spark_catalog.default.st[x#x,col#x] parquet
@@ -746,7 +746,7 @@ Project [col#x, pipeexpression(col#x.i1, false, EXTEND) AS 
z#x]
 table t
 |> extend (select a from other where x = a limit 1) as z
 -- !query analysis
-Project [x#x, y#x, pipeexpression(scalar-subquery#x [x#x], false, EXTEND) AS 
z#x]
+Project [x#x, y#x, scalar-subquery#x [x#x] AS z#x]
 :  +- GlobalLimit 1
 :     +- LocalLimit 1
 :        +- Project [a#x]
@@ -765,8 +765,8 @@ table t
     |> select * except (a, b))
 -- !query analysis
 Filter exists#x [x#x]
-:  +- Project [pipeexpression(outer(spark_catalog.default.t.x))#x]
-:     +- Project [a#x, b#x, pipeexpression(outer(x#x), false, EXTEND) AS 
pipeexpression(outer(spark_catalog.default.t.x))#x]
+:  +- Project [x#x]
+:     +- Project [a#x, b#x, outer(x#x)]
 :        +- SubqueryAlias spark_catalog.default.other
 :           +- Relation spark_catalog.default.other[a#x,b#x] json
 +- PipeOperator
@@ -778,7 +778,7 @@ Filter exists#x [x#x]
 table t
 |> extend 1 as x
 -- !query analysis
-Project [x#x, y#x, pipeexpression(1, false, EXTEND) AS x#x]
+Project [x#x, y#x, 1 AS x#x]
 +- SubqueryAlias spark_catalog.default.t
    +- Relation spark_catalog.default.t[x#x,y#x] csv
 
@@ -788,8 +788,8 @@ table t
 |> extend first_value(x) over (partition by y) as result
 -- !query analysis
 Project [x#x, y#x, result#x]
-+- Project [x#x, y#x, _we0#x, pipeexpression(_we0#x, false, EXTEND) AS 
result#x]
-   +- Window [first_value(x#x, false) windowspecdefinition(y#x, 
specifiedwindowframe(RowFrame, unboundedpreceding$(), unboundedfollowing$())) 
AS _we0#x], [y#x]
++- Project [x#x, y#x, result#x, result#x]
+   +- Window [first_value(x#x, false) windowspecdefinition(y#x, 
specifiedwindowframe(RowFrame, unboundedpreceding$(), unboundedfollowing$())) 
AS result#x], [y#x]
       +- Project [x#x, y#x]
          +- SubqueryAlias spark_catalog.default.t
             +- Relation spark_catalog.default.t[x#x,y#x] csv
@@ -799,8 +799,8 @@ Project [x#x, y#x, result#x]
 table t
 |> extend x + length(y) as z, z + 1 as plus_one
 -- !query analysis
-Project [x#x, y#x, z#x, pipeexpression((z#x + 1), false, EXTEND) AS plus_one#x]
-+- Project [x#x, y#x, pipeexpression((x#x + length(y#x)), false, EXTEND) AS 
z#x]
+Project [x#x, y#x, z#x, (z#x + 1) AS plus_one#x]
++- Project [x#x, y#x, (x#x + length(y#x)) AS z#x]
    +- SubqueryAlias spark_catalog.default.t
       +- Relation spark_catalog.default.t[x#x,y#x] csv
 
@@ -861,7 +861,7 @@ org.apache.spark.sql.AnalysisException
 table t
 |> set x = 1
 -- !query analysis
-Project [pipeexpression(1, false, SET) AS x#x, y#x]
+Project [1 AS x#x, y#x]
 +- SubqueryAlias spark_catalog.default.t
    +- Relation spark_catalog.default.t[x#x,y#x] csv
 
@@ -870,7 +870,7 @@ Project [pipeexpression(1, false, SET) AS x#x, y#x]
 table t
 |> set y = x
 -- !query analysis
-Project [x#x, pipeexpression(x#x, false, SET) AS y#x]
+Project [x#x, x#x AS y#x]
 +- SubqueryAlias spark_catalog.default.t
    +- Relation spark_catalog.default.t[x#x,y#x] csv
 
@@ -880,8 +880,8 @@ table t
 |> extend 1 as z
 |> set z = x + length(y)
 -- !query analysis
-Project [x#x, y#x, pipeexpression((x#x + length(y#x)), false, SET) AS z#x]
-+- Project [x#x, y#x, pipeexpression(1, false, EXTEND) AS z#x]
+Project [x#x, y#x, (x#x + length(y#x)) AS z#x]
++- Project [x#x, y#x, 1 AS z#x]
    +- SubqueryAlias spark_catalog.default.t
       +- Relation spark_catalog.default.t[x#x,y#x] csv
 
@@ -892,10 +892,10 @@ table t
 |> extend 2 as zz
 |> set z = x + length(y), zz = x + 1
 -- !query analysis
-Project [x#x, y#x, z#x, pipeexpression((x#x + 1), false, SET) AS zz#x]
-+- Project [x#x, y#x, pipeexpression((x#x + length(y#x)), false, SET) AS z#x, 
zz#x]
-   +- Project [x#x, y#x, z#x, pipeexpression(2, false, EXTEND) AS zz#x]
-      +- Project [x#x, y#x, pipeexpression(1, false, EXTEND) AS z#x]
+Project [x#x, y#x, z#x, (x#x + 1) AS zz#x]
++- Project [x#x, y#x, (x#x + length(y#x)) AS z#x, zz#x]
+   +- Project [x#x, y#x, z#x, 2 AS zz#x]
+      +- Project [x#x, y#x, 1 AS z#x]
          +- SubqueryAlias spark_catalog.default.t
             +- Relation spark_catalog.default.t[x#x,y#x] csv
 
@@ -905,9 +905,9 @@ table other
 |> extend 3 as c
 |> set a = b, b = c
 -- !query analysis
-Project [a#x, pipeexpression(c#x, false, SET) AS b#x, c#x]
-+- Project [pipeexpression(b#x, false, SET) AS a#x, b#x, c#x]
-   +- Project [a#x, b#x, pipeexpression(3, false, EXTEND) AS c#x]
+Project [a#x, c#x AS b#x, c#x]
++- Project [b#x AS a#x, b#x, c#x]
+   +- Project [a#x, b#x, 3 AS c#x]
       +- SubqueryAlias spark_catalog.default.other
          +- Relation spark_catalog.default.other[a#x,b#x] json
 
@@ -918,10 +918,10 @@ table t
 |> extend 2 as zz
 |> set z = x + length(y), zz = z + 1
 -- !query analysis
-Project [x#x, y#x, z#x, pipeexpression((z#x + 1), false, SET) AS zz#x]
-+- Project [x#x, y#x, pipeexpression((x#x + length(y#x)), false, SET) AS z#x, 
zz#x]
-   +- Project [x#x, y#x, z#x, pipeexpression(2, false, EXTEND) AS zz#x]
-      +- Project [x#x, y#x, pipeexpression(1, false, EXTEND) AS z#x]
+Project [x#x, y#x, z#x, (z#x + 1) AS zz#x]
++- Project [x#x, y#x, (x#x + length(y#x)) AS z#x, zz#x]
+   +- Project [x#x, y#x, z#x, 2 AS zz#x]
+      +- Project [x#x, y#x, 1 AS z#x]
          +- SubqueryAlias spark_catalog.default.t
             +- Relation spark_catalog.default.t[x#x,y#x] csv
 
@@ -932,9 +932,9 @@ table t
 |> set z = x + length(y)
 |> set z = z + 1
 -- !query analysis
-Project [x#x, y#x, pipeexpression((z#x + 1), false, SET) AS z#x]
-+- Project [x#x, y#x, pipeexpression((x#x + length(y#x)), false, SET) AS z#x]
-   +- Project [x#x, y#x, pipeexpression(1, false, EXTEND) AS z#x]
+Project [x#x, y#x, (z#x + 1) AS z#x]
++- Project [x#x, y#x, (x#x + length(y#x)) AS z#x]
+   +- Project [x#x, y#x, 1 AS z#x]
       +- SubqueryAlias spark_catalog.default.t
          +- Relation spark_catalog.default.t[x#x,y#x] csv
 
@@ -944,9 +944,9 @@ table t
 |> extend 1 as z
 |> set z = x + length(y), z = z + 1
 -- !query analysis
-Project [x#x, y#x, pipeexpression((z#x + 1), false, SET) AS z#x]
-+- Project [x#x, y#x, pipeexpression((x#x + length(y#x)), false, SET) AS z#x]
-   +- Project [x#x, y#x, pipeexpression(1, false, EXTEND) AS z#x]
+Project [x#x, y#x, (z#x + 1) AS z#x]
++- Project [x#x, y#x, (x#x + length(y#x)) AS z#x]
+   +- Project [x#x, y#x, 1 AS z#x]
       +- SubqueryAlias spark_catalog.default.t
          +- Relation spark_catalog.default.t[x#x,y#x] csv
 
@@ -956,8 +956,8 @@ select col from st
 |> extend 1 as z
 |> set z = col.i1
 -- !query analysis
-Project [col#x, pipeexpression(col#x.i1, false, SET) AS z#x]
-+- Project [col#x, pipeexpression(1, false, EXTEND) AS z#x]
+Project [col#x, col#x.i1 AS z#x]
++- Project [col#x, 1 AS z#x]
    +- Project [col#x]
       +- SubqueryAlias spark_catalog.default.st
          +- Relation spark_catalog.default.st[x#x,col#x] parquet
@@ -967,7 +967,7 @@ Project [col#x, pipeexpression(col#x.i1, false, SET) AS z#x]
 table t
 |> set y = (select a from other where x = a limit 1)
 -- !query analysis
-Project [x#x, pipeexpression(scalar-subquery#x [x#x], false, SET) AS y#x]
+Project [x#x, scalar-subquery#x [x#x] AS y#x]
 :  +- GlobalLimit 1
 :     +- LocalLimit 1
 :        +- Project [a#x]
@@ -983,8 +983,8 @@ table t
 |> extend 1 as `x.y.z`
 |> set `x.y.z` = x + length(y)
 -- !query analysis
-Project [x#x, y#x, pipeexpression((x#x + length(y#x)), false, SET) AS x.y.z#x]
-+- Project [x#x, y#x, pipeexpression(1, false, EXTEND) AS x.y.z#x]
+Project [x#x, y#x, (x#x + length(y#x)) AS x.y.z#x]
++- Project [x#x, y#x, 1 AS x.y.z#x]
    +- SubqueryAlias spark_catalog.default.t
       +- Relation spark_catalog.default.t[x#x,y#x] csv
 
@@ -995,10 +995,10 @@ table t
 |> set z = first_value(x) over (partition by y)
 -- !query analysis
 Project [x#x, y#x, z#x]
-+- Project [x#x, y#x, _we0#x, pipeexpression(_we0#x, false, SET) AS z#x]
-   +- Window [first_value(x#x, false) windowspecdefinition(y#x, 
specifiedwindowframe(RowFrame, unboundedpreceding$(), unboundedfollowing$())) 
AS _we0#x], [y#x]
++- Project [x#x, y#x, z#x, z#x]
+   +- Window [first_value(x#x, false) windowspecdefinition(y#x, 
specifiedwindowframe(RowFrame, unboundedpreceding$(), unboundedfollowing$())) 
AS z#x], [y#x]
       +- Project [x#x, y#x]
-         +- Project [x#x, y#x, pipeexpression(1, false, EXTEND) AS z#x]
+         +- Project [x#x, y#x, 1 AS z#x]
             +- SubqueryAlias spark_catalog.default.t
                +- Relation spark_catalog.default.t[x#x,y#x] csv
 
@@ -1019,20 +1019,21 @@ Project [a#x, a#x, z2#x]
 +- GlobalLimit 2
    +- LocalLimit 2
       +- PipeOperator
-         +- Project [a#x, pipeexpression(4, false, SET) AS z2#x, a#x]
-            +- Sort [a#x ASC NULLS FIRST, a#x ASC NULLS FIRST, z2#x ASC NULLS 
FIRST], true
-               +- PipeOperator
-                  +- Filter (z2#x = 0)
-                     +- PipeOperator
-                        +- Project [a#x, z2#x, a#x]
-                           +- Project [a#x, z1#x, pipeexpression((a#x - a#x), 
false, EXTEND) AS z2#x, a#x]
-                              +- Project [a#x, pipeexpression((a#x + a#x), 
false, EXTEND) AS z1#x, a#x]
-                                 +- Project [a#x, a#x, a#x]
-                                    +- Join Inner, (a#x = a#x)
-                                       :- SubqueryAlias lhs
-                                       :  +- LocalRelation [a#x]
-                                       +- SubqueryAlias rhs
-                                          +- LocalRelation [a#x]
+         +- Project [a#x, 4 AS z2#x, a#x]
+            +- Project [a#x, z2#x, a#x]
+               +- Sort [a#x ASC NULLS FIRST, a#x ASC NULLS FIRST, z2#x ASC 
NULLS FIRST], true
+                  +- PipeOperator
+                     +- Filter (z2#x = 0)
+                        +- PipeOperator
+                           +- Project [a#x, z2#x, a#x, a#x]
+                              +- Project [a#x, z1#x, (a#x - a#x) AS z2#x, a#x, 
a#x]
+                                 +- Project [a#x, (a#x + a#x) AS z1#x, a#x, 
a#x, a#x]
+                                    +- Project [a#x, a#x, a#x, a#x, a#x]
+                                       +- Join Inner, (a#x = a#x)
+                                          :- SubqueryAlias lhs
+                                          :  +- LocalRelation [a#x]
+                                          +- SubqueryAlias rhs
+                                             +- LocalRelation [a#x]
 
 
 -- !query
@@ -1137,7 +1138,7 @@ table t
 |> drop `x.y.z`
 -- !query analysis
 Project [x#x, y#x]
-+- Project [x#x, y#x, pipeexpression(1, false, EXTEND) AS x.y.z#x]
++- Project [x#x, y#x, 1 AS x.y.z#x]
    +- SubqueryAlias spark_catalog.default.t
       +- Relation spark_catalog.default.t[x#x,y#x] csv
 
@@ -1707,7 +1708,7 @@ table courseSales
 Project [c#x, __pivot_sum(e) AS s AS `sum(e) AS s`#x[0] AS firstYear_s#xL, 
__pivot_avg(e) AS a AS `avg(e) AS a`#x[0] AS firstYear_a#x, __pivot_sum(e) AS s 
AS `sum(e) AS s`#x[1] AS secondYear_s#xL, __pivot_avg(e) AS a AS `avg(e) AS 
a`#x[1] AS secondYear_a#x]
 +- Aggregate [c#x], [c#x, pivotfirst(y#x, sum(e) AS s#xL, 2012, 2013, 0, 0) AS 
__pivot_sum(e) AS s AS `sum(e) AS s`#x, pivotfirst(y#x, avg(e) AS a#x, 2012, 
2013, 0, 0) AS __pivot_avg(e) AS a AS `avg(e) AS a`#x]
    +- Aggregate [c#x, y#x], [c#x, y#x, sum(e#x) AS sum(e) AS s#xL, avg(e#x) AS 
avg(e) AS a#x]
-      +- Project [pipeexpression(year#x, false, SELECT) AS y#x, 
pipeexpression(course#x, false, SELECT) AS c#x, pipeexpression(earnings#x, 
false, SELECT) AS e#x]
+      +- Project [year#x AS y#x, course#x AS c#x, earnings#x AS e#x]
          +- SubqueryAlias coursesales
             +- View (`courseSales`, [course#x, year#x, earnings#x])
                +- Project [cast(course#x as string) AS course#x, cast(year#x 
as int) AS year#x, cast(earnings#x as int) AS earnings#x]
@@ -3208,7 +3209,7 @@ org.apache.spark.sql.catalyst.parser.ParseException
 table other
 |> aggregate sum(b) as result group by a
 -- !query analysis
-Aggregate [a#x], [a#x, pipeexpression(sum(b#x), true, AGGREGATE) AS result#xL]
+Aggregate [a#x], [a#x, sum(b#x) AS result#xL]
 +- SubqueryAlias spark_catalog.default.other
    +- Relation spark_catalog.default.other[a#x,b#x] json
 
@@ -3219,7 +3220,7 @@ table other
 |> select result
 -- !query analysis
 Project [result#xL]
-+- Aggregate [a#x], [a#x, pipeexpression(sum(b#x), true, AGGREGATE) AS 
result#xL]
++- Aggregate [a#x], [a#x, sum(b#x) AS result#xL]
    +- SubqueryAlias spark_catalog.default.other
       +- Relation spark_catalog.default.other[a#x,b#x] json
 
@@ -3230,7 +3231,7 @@ table other
 |> select gkey
 -- !query analysis
 Project [gkey#x]
-+- Aggregate [(a#x + 1)], [(a#x + 1) AS gkey#x, pipeexpression(sum(b#x), true, 
AGGREGATE) AS pipeexpression(sum(b))#xL]
++- Aggregate [(a#x + 1)], [(a#x + 1) AS gkey#x, sum(b#x) AS sum(b)#xL]
    +- SubqueryAlias spark_catalog.default.other
       +- Relation spark_catalog.default.other[a#x,b#x] json
 
@@ -3257,7 +3258,7 @@ Aggregate [x#x, y#x], [x#x, y#x]
 values (3, 4) as tab(x, y)
 |> aggregate sum(y) group by 1
 -- !query analysis
-Aggregate [x#x], [x#x, pipeexpression(sum(y#x), true, AGGREGATE) AS 
pipeexpression(sum(y))#xL]
+Aggregate [x#x], [x#x, sum(y#x) AS sum(y)#xL]
 +- SubqueryAlias tab
    +- LocalRelation [x#x, y#x]
 
@@ -3266,7 +3267,7 @@ Aggregate [x#x], [x#x, pipeexpression(sum(y#x), true, 
AGGREGATE) AS pipeexpressi
 values (3, 4), (5, 4) as tab(x, y)
 |> aggregate sum(y) group by 1
 -- !query analysis
-Aggregate [x#x], [x#x, pipeexpression(sum(y#x), true, AGGREGATE) AS 
pipeexpression(sum(y))#xL]
+Aggregate [x#x], [x#x, sum(y#x) AS sum(y)#xL]
 +- SubqueryAlias tab
    +- LocalRelation [x#x, y#x]
 
@@ -3275,7 +3276,7 @@ Aggregate [x#x], [x#x, pipeexpression(sum(y#x), true, 
AGGREGATE) AS pipeexpressi
 select 3 as x, 4 as y
 |> aggregate sum(y) group by 1, 1
 -- !query analysis
-Aggregate [x#x, x#x], [x#x, x#x, pipeexpression(sum(y#x), true, AGGREGATE) AS 
pipeexpression(sum(y))#xL]
+Aggregate [x#x, x#x], [x#x, x#x, sum(y#x) AS sum(y)#xL]
 +- Project [3 AS x#x, 4 AS y#x]
    +- OneRowRelation
 
@@ -3284,7 +3285,7 @@ Aggregate [x#x, x#x], [x#x, x#x, pipeexpression(sum(y#x), 
true, AGGREGATE) AS pi
 select 1 as `1`, 2 as `2`
 |> aggregate sum(`2`) group by `1`
 -- !query analysis
-Aggregate [1#x], [1#x, pipeexpression(sum(2#x), true, AGGREGATE) AS 
pipeexpression(sum(2))#xL]
+Aggregate [1#x], [1#x, sum(2#x) AS sum(2)#xL]
 +- Project [1 AS 1#x, 2 AS 2#x]
    +- OneRowRelation
 
@@ -3293,7 +3294,7 @@ Aggregate [1#x], [1#x, pipeexpression(sum(2#x), true, 
AGGREGATE) AS pipeexpressi
 select 3 as x, 4 as y
 |> aggregate sum(y) group by 2
 -- !query analysis
-Aggregate [y#x], [y#x, pipeexpression(sum(y#x), true, AGGREGATE) AS 
pipeexpression(sum(y))#xL]
+Aggregate [y#x], [y#x, sum(y#x) AS sum(y)#xL]
 +- Project [3 AS x#x, 4 AS y#x]
    +- OneRowRelation
 
@@ -3302,7 +3303,7 @@ Aggregate [y#x], [y#x, pipeexpression(sum(y#x), true, 
AGGREGATE) AS pipeexpressi
 select 3 as x, 4 as y, 5 as z
 |> aggregate sum(y) group by 2
 -- !query analysis
-Aggregate [y#x], [y#x, pipeexpression(sum(y#x), true, AGGREGATE) AS 
pipeexpression(sum(y))#xL]
+Aggregate [y#x], [y#x, sum(y#x) AS sum(y)#xL]
 +- Project [3 AS x#x, 4 AS y#x, 5 AS z#x]
    +- OneRowRelation
 
@@ -3311,7 +3312,7 @@ Aggregate [y#x], [y#x, pipeexpression(sum(y#x), true, 
AGGREGATE) AS pipeexpressi
 select 3 as x, 4 as y, 5 as z
 |> aggregate sum(y) group by 3
 -- !query analysis
-Aggregate [z#x], [z#x, pipeexpression(sum(y#x), true, AGGREGATE) AS 
pipeexpression(sum(y))#xL]
+Aggregate [z#x], [z#x, sum(y#x) AS sum(y)#xL]
 +- Project [3 AS x#x, 4 AS y#x, 5 AS z#x]
    +- OneRowRelation
 
@@ -3320,7 +3321,7 @@ Aggregate [z#x], [z#x, pipeexpression(sum(y#x), true, 
AGGREGATE) AS pipeexpressi
 select 3 as x, 4 as y, 5 as z
 |> aggregate sum(y) group by 2, 3
 -- !query analysis
-Aggregate [y#x, z#x], [y#x, z#x, pipeexpression(sum(y#x), true, AGGREGATE) AS 
pipeexpression(sum(y))#xL]
+Aggregate [y#x, z#x], [y#x, z#x, sum(y#x) AS sum(y)#xL]
 +- Project [3 AS x#x, 4 AS y#x, 5 AS z#x]
    +- OneRowRelation
 
@@ -3329,7 +3330,7 @@ Aggregate [y#x, z#x], [y#x, z#x, pipeexpression(sum(y#x), 
true, AGGREGATE) AS pi
 select 3 as x, 4 as y, 5 as z
 |> aggregate sum(y) group by 1, 2, 3
 -- !query analysis
-Aggregate [x#x, y#x, z#x], [x#x, y#x, z#x, pipeexpression(sum(y#x), true, 
AGGREGATE) AS pipeexpression(sum(y))#xL]
+Aggregate [x#x, y#x, z#x], [x#x, y#x, z#x, sum(y#x) AS sum(y)#xL]
 +- Project [3 AS x#x, 4 AS y#x, 5 AS z#x]
    +- OneRowRelation
 
@@ -3338,7 +3339,7 @@ Aggregate [x#x, y#x, z#x], [x#x, y#x, z#x, 
pipeexpression(sum(y#x), true, AGGREG
 select 3 as x, 4 as y, 5 as z
 |> aggregate sum(y) group by x, 2, 3
 -- !query analysis
-Aggregate [x#x, y#x, z#x], [x#x, y#x, z#x, pipeexpression(sum(y#x), true, 
AGGREGATE) AS pipeexpression(sum(y))#xL]
+Aggregate [x#x, y#x, z#x], [x#x, y#x, z#x, sum(y#x) AS sum(y)#xL]
 +- Project [3 AS x#x, 4 AS y#x, 5 AS z#x]
    +- OneRowRelation
 
@@ -3347,7 +3348,7 @@ Aggregate [x#x, y#x, z#x], [x#x, y#x, z#x, 
pipeexpression(sum(y#x), true, AGGREG
 table t
 |> aggregate sum(x)
 -- !query analysis
-Aggregate [pipeexpression(sum(x#x), true, AGGREGATE) AS 
pipeexpression(sum(x))#xL]
+Aggregate [sum(x#x) AS sum(x)#xL]
 +- SubqueryAlias spark_catalog.default.t
    +- Relation spark_catalog.default.t[x#x,y#x] csv
 
@@ -3356,7 +3357,7 @@ Aggregate [pipeexpression(sum(x#x), true, AGGREGATE) AS 
pipeexpression(sum(x))#x
 table t
 |> aggregate sum(x) + 1 as result_plus_one
 -- !query analysis
-Aggregate [pipeexpression((sum(x#x) + cast(1 as bigint)), true, AGGREGATE) AS 
result_plus_one#xL]
+Aggregate [(sum(x#x) + cast(1 as bigint)) AS result_plus_one#xL]
 +- SubqueryAlias spark_catalog.default.t
    +- Relation spark_catalog.default.t[x#x,y#x] csv
 
@@ -3406,9 +3407,9 @@ select 1 x, 2 y, 3 z
 |> aggregate avg(z) z group by x
 |> aggregate count(distinct z) c
 -- !query analysis
-Aggregate [pipeexpression(count(distinct z#x), true, AGGREGATE) AS c#xL]
-+- Aggregate [x#x], [x#x, pipeexpression(avg(z#xL), true, AGGREGATE) AS z#x]
-   +- Aggregate [x#x, y#x], [x#x, y#x, pipeexpression(sum(z#x), true, 
AGGREGATE) AS z#xL]
+Aggregate [count(distinct z#x) AS c#xL]
++- Aggregate [x#x], [x#x, avg(z#xL) AS z#x]
+   +- Aggregate [x#x, y#x], [x#x, y#x, sum(z#x) AS z#xL]
       +- Project [1 AS x#x, 2 AS y#x, 3 AS z#x]
          +- OneRowRelation
 
@@ -3419,7 +3420,7 @@ select 1 x, 3 z
 |> select x
 -- !query analysis
 Project [x#x]
-+- Aggregate [x#x, z#x, x#x], [x#x, z#x, x#x, pipeexpression(count(1), true, 
AGGREGATE) AS pipeexpression(count(1))#xL]
++- Aggregate [x#x, z#x, x#x], [x#x, z#x, x#x, count(1) AS count(1)#xL]
    +- Project [1 AS x#x, 3 AS z#x]
       +- OneRowRelation
 
@@ -3428,7 +3429,7 @@ Project [x#x]
 table other
 |> aggregate a + count(b) group by a
 -- !query analysis
-Aggregate [a#x], [a#x, pipeexpression((cast(a#x as bigint) + count(b#x)), 
true, AGGREGATE) AS pipeexpression((a + count(b)))#xL]
+Aggregate [a#x], [a#x, (cast(a#x as bigint) + count(b#x)) AS (a + count(b))#xL]
 +- SubqueryAlias spark_catalog.default.other
    +- Relation spark_catalog.default.other[a#x,b#x] json
 
@@ -3807,8 +3808,8 @@ Project [cate#x, val#x, sum_val#xL, first_value(cate) 
OVER (ORDER BY val ASC NUL
    +- Window [first_value(cate#x, false) windowspecdefinition(val#x ASC NULLS 
FIRST, specifiedwindowframe(RangeFrame, unboundedpreceding$(), currentrow$())) 
AS first_value(cate) OVER (ORDER BY val ASC NULLS FIRST RANGE BETWEEN UNBOUNDED 
PRECEDING AND CURRENT ROW)#x], [val#x ASC NULLS FIRST]
       +- Project [cate#x, val#x, sum_val#xL]
          +- Project [cate#x, val#x, sum_val#xL]
-            +- Project [cate#x, val#x, _we0#xL, pipeexpression(_we0#xL, false, 
SELECT) AS sum_val#xL]
-               +- Window [sum(val#x) windowspecdefinition(cate#x, 
specifiedwindowframe(RowFrame, unboundedpreceding$(), unboundedfollowing$())) 
AS _we0#xL], [cate#x]
+            +- Project [cate#x, val#x, sum_val#xL, sum_val#xL]
+               +- Window [sum(val#x) windowspecdefinition(cate#x, 
specifiedwindowframe(RowFrame, unboundedpreceding$(), unboundedfollowing$())) 
AS sum_val#xL], [cate#x]
                   +- Project [cate#x, val#x]
                      +- SubqueryAlias windowtestdata
                         +- View (`windowTestData`, [val#x, val_long#xL, 
val_double#x, val_date#x, val_timestamp#x, cate#x])
diff --git 
a/sql/core/src/test/resources/sql-tests/results/pipe-operators.sql.out 
b/sql/core/src/test/resources/sql-tests/results/pipe-operators.sql.out
index 0d5ec57b9e47..fe1a263c0644 100644
--- a/sql/core/src/test/resources/sql-tests/results/pipe-operators.sql.out
+++ b/sql/core/src/test/resources/sql-tests/results/pipe-operators.sql.out
@@ -641,7 +641,7 @@ struct<x:int,y:string,z:int>
 table t
 |> extend 1
 -- !query schema
-struct<x:int,y:string,pipeexpression(1):int>
+struct<x:int,y:string,1:int>
 -- !query output
 0      abc     1
 1      def     1
@@ -2879,7 +2879,7 @@ struct<x:int,y:int>
 values (3, 4) as tab(x, y)
 |> aggregate sum(y) group by 1
 -- !query schema
-struct<x:int,pipeexpression(sum(y)):bigint>
+struct<x:int,sum(y):bigint>
 -- !query output
 3      4
 
@@ -2888,7 +2888,7 @@ struct<x:int,pipeexpression(sum(y)):bigint>
 values (3, 4), (5, 4) as tab(x, y)
 |> aggregate sum(y) group by 1
 -- !query schema
-struct<x:int,pipeexpression(sum(y)):bigint>
+struct<x:int,sum(y):bigint>
 -- !query output
 3      4
 5      4
@@ -2898,7 +2898,7 @@ struct<x:int,pipeexpression(sum(y)):bigint>
 select 3 as x, 4 as y
 |> aggregate sum(y) group by 1, 1
 -- !query schema
-struct<x:int,x:int,pipeexpression(sum(y)):bigint>
+struct<x:int,x:int,sum(y):bigint>
 -- !query output
 3      3       4
 
@@ -2907,7 +2907,7 @@ struct<x:int,x:int,pipeexpression(sum(y)):bigint>
 select 1 as `1`, 2 as `2`
 |> aggregate sum(`2`) group by `1`
 -- !query schema
-struct<1:int,pipeexpression(sum(2)):bigint>
+struct<1:int,sum(2):bigint>
 -- !query output
 1      2
 
@@ -2916,7 +2916,7 @@ struct<1:int,pipeexpression(sum(2)):bigint>
 select 3 as x, 4 as y
 |> aggregate sum(y) group by 2
 -- !query schema
-struct<y:int,pipeexpression(sum(y)):bigint>
+struct<y:int,sum(y):bigint>
 -- !query output
 4      4
 
@@ -2925,7 +2925,7 @@ struct<y:int,pipeexpression(sum(y)):bigint>
 select 3 as x, 4 as y, 5 as z
 |> aggregate sum(y) group by 2
 -- !query schema
-struct<y:int,pipeexpression(sum(y)):bigint>
+struct<y:int,sum(y):bigint>
 -- !query output
 4      4
 
@@ -2934,7 +2934,7 @@ struct<y:int,pipeexpression(sum(y)):bigint>
 select 3 as x, 4 as y, 5 as z
 |> aggregate sum(y) group by 3
 -- !query schema
-struct<z:int,pipeexpression(sum(y)):bigint>
+struct<z:int,sum(y):bigint>
 -- !query output
 5      4
 
@@ -2943,7 +2943,7 @@ struct<z:int,pipeexpression(sum(y)):bigint>
 select 3 as x, 4 as y, 5 as z
 |> aggregate sum(y) group by 2, 3
 -- !query schema
-struct<y:int,z:int,pipeexpression(sum(y)):bigint>
+struct<y:int,z:int,sum(y):bigint>
 -- !query output
 4      5       4
 
@@ -2952,7 +2952,7 @@ struct<y:int,z:int,pipeexpression(sum(y)):bigint>
 select 3 as x, 4 as y, 5 as z
 |> aggregate sum(y) group by 1, 2, 3
 -- !query schema
-struct<x:int,y:int,z:int,pipeexpression(sum(y)):bigint>
+struct<x:int,y:int,z:int,sum(y):bigint>
 -- !query output
 3      4       5       4
 
@@ -2961,7 +2961,7 @@ struct<x:int,y:int,z:int,pipeexpression(sum(y)):bigint>
 select 3 as x, 4 as y, 5 as z
 |> aggregate sum(y) group by x, 2, 3
 -- !query schema
-struct<x:int,y:int,z:int,pipeexpression(sum(y)):bigint>
+struct<x:int,y:int,z:int,sum(y):bigint>
 -- !query output
 3      4       5       4
 
@@ -2970,7 +2970,7 @@ struct<x:int,y:int,z:int,pipeexpression(sum(y)):bigint>
 table t
 |> aggregate sum(x)
 -- !query schema
-struct<pipeexpression(sum(x)):bigint>
+struct<sum(x):bigint>
 -- !query output
 1
 
@@ -3046,7 +3046,7 @@ struct<x:int>
 table other
 |> aggregate a + count(b) group by a
 -- !query schema
-struct<a:int,pipeexpression((a + count(b))):bigint>
+struct<a:int,(a + count(b)):bigint>
 -- !query output
 1      3
 2      3


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]


Reply via email to