This is an automated email from the ASF dual-hosted git repository.

wenchen pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/spark.git


The following commit(s) were added to refs/heads/master by this push:
     new 017b0ea71e03 [SPARK-49556][SQL] Add SQL pipe syntax for the SELECT 
operator
017b0ea71e03 is described below

commit 017b0ea71e03339336b5d199ecad4f50961e4948
Author: Daniel Tenedorio <[email protected]>
AuthorDate: Sat Sep 14 12:16:35 2024 +0800

    [SPARK-49556][SQL] Add SQL pipe syntax for the SELECT operator
    
    ### What changes were proposed in this pull request?
    
    This PR adds SQL pipe syntax support for the SELECT operator.
    
    For example:
    
    ```
    CREATE TABLE t(x INT, y STRING) USING CSV;
    INSERT INTO t VALUES (0, 'abc'), (1, 'def');
    
    TABLE t
    |> SELECT x, y
    
    0       abc
    1       def
    
    TABLE t
    |> SELECT x, y
    |> SELECT x + LENGTH(y) AS z
    
    3
    4
    
    (SELECT * FROM t UNION ALL SELECT * FROM t)
    |> SELECT x + LENGTH(y) AS result
    
    3
    3
    4
    4
    
    TABLE t
    |> SELECT sum(x) AS result
    
    Error: aggregate functions are not allowed in the pipe operator |> SELECT 
clause; please use the |> AGGREGATE clause instead
    ```
    
    ### Why are the changes needed?
    
    The SQL pipe operator syntax will let users compose queries in a more 
flexible fashion.
    
    ### Does this PR introduce _any_ user-facing change?
    
    Yes, see above.
    
    ### How was this patch tested?
    
    This PR adds a few unit test cases, but mostly relies on golden file test 
coverage. I did this to make sure the answers are correct as this feature is 
implemented and also so we can look at the analyzer output plans to ensure they 
look right as well.
    
    ### Was this patch authored or co-authored using generative AI tooling?
    
    No
    
    Closes #48047 from dtenedor/pipe-select.
    
    Authored-by: Daniel Tenedorio <[email protected]>
    Signed-off-by: Wenchen Fan <[email protected]>
---
 .../src/main/resources/error/error-conditions.json |   6 +
 .../spark/sql/catalyst/parser/SqlBaseLexer.g4      |   1 +
 .../spark/sql/catalyst/parser/SqlBaseParser.g4     |   5 +
 .../sql/catalyst/expressions/PipeSelect.scala      |  47 +++
 .../spark/sql/catalyst/parser/AstBuilder.scala     |  58 +++-
 .../spark/sql/catalyst/trees/TreePatterns.scala    |   1 +
 .../spark/sql/errors/QueryCompilationErrors.scala  |   8 +
 .../org/apache/spark/sql/internal/SQLConf.scala    |   9 +
 .../analyzer-results/pipe-operators.sql.out        | 318 +++++++++++++++++++++
 .../resources/sql-tests/inputs/pipe-operators.sql  | 102 +++++++
 .../sql-tests/results/pipe-operators.sql.out       | 308 ++++++++++++++++++++
 .../spark/sql/execution/SparkSqlParserSuite.scala  |  19 +-
 .../thriftserver/ThriftServerQueryTestSuite.scala  |   3 +-
 13 files changed, 876 insertions(+), 9 deletions(-)

diff --git a/common/utils/src/main/resources/error/error-conditions.json 
b/common/utils/src/main/resources/error/error-conditions.json
index 0a9dcd52ea83..a6d8550716b9 100644
--- a/common/utils/src/main/resources/error/error-conditions.json
+++ b/common/utils/src/main/resources/error/error-conditions.json
@@ -3754,6 +3754,12 @@
     ],
     "sqlState" : "42K03"
   },
+  "PIPE_OPERATOR_SELECT_CONTAINS_AGGREGATE_FUNCTION" : {
+    "message" : [
+      "Aggregate function <expr> is not allowed when using the pipe operator 
|> SELECT clause; please use the pipe operator |> AGGREGATE clause instead"
+    ],
+    "sqlState" : "0A000"
+  },
   "PIVOT_VALUE_DATA_TYPE_MISMATCH" : {
     "message" : [
       "Invalid pivot value '<value>': value data type <valueType> does not 
match pivot column data type <pivotType>."
diff --git 
a/sql/api/src/main/antlr4/org/apache/spark/sql/catalyst/parser/SqlBaseLexer.g4 
b/sql/api/src/main/antlr4/org/apache/spark/sql/catalyst/parser/SqlBaseLexer.g4
index 9ea213f3bf4a..96a58b99debe 100644
--- 
a/sql/api/src/main/antlr4/org/apache/spark/sql/catalyst/parser/SqlBaseLexer.g4
+++ 
b/sql/api/src/main/antlr4/org/apache/spark/sql/catalyst/parser/SqlBaseLexer.g4
@@ -506,6 +506,7 @@ TILDE: '~';
 AMPERSAND: '&';
 PIPE: '|';
 CONCAT_PIPE: '||';
+OPERATOR_PIPE: '|>';
 HAT: '^';
 COLON: ':';
 DOUBLE_COLON: '::';
diff --git 
a/sql/api/src/main/antlr4/org/apache/spark/sql/catalyst/parser/SqlBaseParser.g4 
b/sql/api/src/main/antlr4/org/apache/spark/sql/catalyst/parser/SqlBaseParser.g4
index 73d5cb55295a..3ea408ca4270 100644
--- 
a/sql/api/src/main/antlr4/org/apache/spark/sql/catalyst/parser/SqlBaseParser.g4
+++ 
b/sql/api/src/main/antlr4/org/apache/spark/sql/catalyst/parser/SqlBaseParser.g4
@@ -613,6 +613,7 @@ queryTerm
         operator=INTERSECT setQuantifier? right=queryTerm                      
          #setOperation
     | left=queryTerm {!legacy_setops_precedence_enabled}?
         operator=(UNION | EXCEPT | SETMINUS) setQuantifier? right=queryTerm    
          #setOperation
+    | left=queryTerm OPERATOR_PIPE operatorPipeRightSide                       
          #operatorPipeStatement
     ;
 
 queryPrimary
@@ -1471,6 +1472,10 @@ version
     | stringLit
     ;
 
+operatorPipeRightSide
+    : selectClause
+    ;
+
 // When `SQL_standard_keyword_behavior=true`, there are 2 kinds of keywords in 
Spark SQL.
 // - Reserved keywords:
 //     Keywords that are reserved and can't be used as identifiers for table, 
view, column,
diff --git 
a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/PipeSelect.scala
 
b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/PipeSelect.scala
new file mode 100644
index 000000000000..0b5479cc8f0e
--- /dev/null
+++ 
b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/PipeSelect.scala
@@ -0,0 +1,47 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.sql.catalyst.expressions
+
+import org.apache.spark.sql.catalyst.expressions.aggregate.AggregateFunction
+import org.apache.spark.sql.catalyst.trees.TreePattern.{PIPE_OPERATOR_SELECT, 
RUNTIME_REPLACEABLE, TreePattern}
+import org.apache.spark.sql.errors.QueryCompilationErrors
+
+/**
+ * Represents a SELECT clause when used with the |> SQL pipe operator.
+ * We use this to make sure that no aggregate functions exist in the SELECT 
expressions.
+ */
+case class PipeSelect(child: Expression)
+  extends UnaryExpression with RuntimeReplaceable {
+  final override val nodePatterns: Seq[TreePattern] = 
Seq(PIPE_OPERATOR_SELECT, RUNTIME_REPLACEABLE)
+  override def withNewChildInternal(newChild: Expression): Expression = 
PipeSelect(newChild)
+  override lazy val replacement: Expression = {
+    def visit(e: Expression): Unit = e match {
+      case a: AggregateFunction =>
+        // If we used the pipe operator |> SELECT clause to specify an 
aggregate function, this is
+        // invalid; return an error message instructing the user to use the 
pipe operator
+        // |> AGGREGATE clause for this purpose instead.
+        throw 
QueryCompilationErrors.pipeOperatorSelectContainsAggregateFunction(a)
+      case _: WindowExpression =>
+        // Window functions are allowed in pipe SELECT operators, so do not 
traverse into children.
+      case _ =>
+        e.children.foreach(visit)
+    }
+    visit(child)
+    child
+  }
+}
diff --git 
a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/parser/AstBuilder.scala
 
b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/parser/AstBuilder.scala
index 7ad7d60e70c9..edcb417da123 100644
--- 
a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/parser/AstBuilder.scala
+++ 
b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/parser/AstBuilder.scala
@@ -469,7 +469,8 @@ class AstBuilder extends DataTypeAstBuilder
         ctx.aggregationClause,
         ctx.havingClause,
         ctx.windowClause,
-        plan
+        plan,
+        isPipeOperatorSelect = false
       )
     }
   }
@@ -1057,7 +1058,8 @@ class AstBuilder extends DataTypeAstBuilder
       ctx.aggregationClause,
       ctx.havingClause,
       ctx.windowClause,
-      from
+      from,
+      isPipeOperatorSelect = false
     )
   }
 
@@ -1144,7 +1146,8 @@ class AstBuilder extends DataTypeAstBuilder
       aggregationClause,
       havingClause,
       windowClause,
-      isDistinct = false)
+      isDistinct = false,
+      isPipeOperatorSelect = false)
 
     ScriptTransformation(
       string(visitStringLit(transformClause.script)),
@@ -1165,6 +1168,8 @@ class AstBuilder extends DataTypeAstBuilder
    * Add a regular (SELECT) query specification to a logical plan. The query 
specification
    * is the core of the logical plan, this is where sourcing (FROM clause), 
projection (SELECT),
    * aggregation (GROUP BY ... HAVING ...) and filtering (WHERE) takes place.
+   * If 'isPipeOperatorSelect' is true, wraps each projected expression with a 
[[PipeSelect]]
+   * expression for future validation of the expressions during analysis.
    *
    * Note that query hints are ignored (both by the parser and the builder).
    */
@@ -1176,7 +1181,8 @@ class AstBuilder extends DataTypeAstBuilder
       aggregationClause: AggregationClauseContext,
       havingClause: HavingClauseContext,
       windowClause: WindowClauseContext,
-      relation: LogicalPlan): LogicalPlan = withOrigin(ctx) {
+      relation: LogicalPlan,
+      isPipeOperatorSelect: Boolean): LogicalPlan = withOrigin(ctx) {
     val isDistinct = selectClause.setQuantifier() != null &&
       selectClause.setQuantifier().DISTINCT() != null
 
@@ -1188,7 +1194,8 @@ class AstBuilder extends DataTypeAstBuilder
       aggregationClause,
       havingClause,
       windowClause,
-      isDistinct)
+      isDistinct,
+      isPipeOperatorSelect)
 
     // Hint
     selectClause.hints.asScala.foldRight(plan)(withHints)
@@ -1202,7 +1209,8 @@ class AstBuilder extends DataTypeAstBuilder
       aggregationClause: AggregationClauseContext,
       havingClause: HavingClauseContext,
       windowClause: WindowClauseContext,
-      isDistinct: Boolean): LogicalPlan = {
+      isDistinct: Boolean,
+      isPipeOperatorSelect: Boolean): LogicalPlan = {
     // Add lateral views.
     val withLateralView = lateralView.asScala.foldLeft(relation)(withGenerate)
 
@@ -1216,7 +1224,20 @@ class AstBuilder extends DataTypeAstBuilder
     }
 
     def createProject() = if (namedExpressions.nonEmpty) {
-      Project(namedExpressions, withFilter)
+      val newProjectList: Seq[NamedExpression] = if (isPipeOperatorSelect) {
+        // If this is a pipe operator |> SELECT clause, add a [[PipeSelect]] 
expression wrapping
+        // each alias in the project list, so the analyzer can check 
invariants later.
+        namedExpressions.map {
+          case a: Alias =>
+            a.withNewChildren(Seq(PipeSelect(a.child)))
+              .asInstanceOf[NamedExpression]
+          case other =>
+            other
+        }
+      } else {
+        namedExpressions
+      }
+      Project(newProjectList, withFilter)
     } else {
       withFilter
     }
@@ -5755,6 +5776,29 @@ class AstBuilder extends DataTypeAstBuilder
       visitSetVariableImpl(ctx.query(), ctx.multipartIdentifierList(), 
ctx.assignmentList())
     }
 
+  override def visitOperatorPipeStatement(ctx: OperatorPipeStatementContext): 
LogicalPlan = {
+    visitOperatorPipeRightSide(ctx.operatorPipeRightSide(), plan(ctx.left))
+  }
+
+  private def visitOperatorPipeRightSide(
+      ctx: OperatorPipeRightSideContext, left: LogicalPlan): LogicalPlan = {
+    if (!SQLConf.get.getConf(SQLConf.OPERATOR_PIPE_SYNTAX_ENABLED)) {
+      operationNotAllowed("Operator pipe SQL syntax using |>", ctx)
+    }
+    Option(ctx.selectClause).map { c =>
+      withSelectQuerySpecification(
+        ctx = ctx,
+        selectClause = c,
+        lateralView = new java.util.ArrayList[LateralViewContext](),
+        whereClause = null,
+        aggregationClause = null,
+        havingClause = null,
+        windowClause = null,
+        relation = left,
+        isPipeOperatorSelect = true)
+    }.get
+  }
+
   /**
    * Check plan for any parameters.
    * If it finds any throws 
UNSUPPORTED_FEATURE.PARAMETER_MARKER_IN_UNEXPECTED_STATEMENT.
diff --git 
a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/trees/TreePatterns.scala
 
b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/trees/TreePatterns.scala
index cbbfccfcab5e..826ac52c2b81 100644
--- 
a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/trees/TreePatterns.scala
+++ 
b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/trees/TreePatterns.scala
@@ -72,6 +72,7 @@ object TreePattern extends Enumeration  {
   val NOT: Value = Value
   val NULL_CHECK: Value = Value
   val NULL_LITERAL: Value = Value
+  val PIPE_OPERATOR_SELECT: Value = Value
   val SERIALIZE_FROM_OBJECT: Value = Value
   val OR: Value = Value
   val OUTER_REFERENCE: Value = Value
diff --git 
a/sql/catalyst/src/main/scala/org/apache/spark/sql/errors/QueryCompilationErrors.scala
 
b/sql/catalyst/src/main/scala/org/apache/spark/sql/errors/QueryCompilationErrors.scala
index e4c8c76e958f..f1f8be3d1575 100644
--- 
a/sql/catalyst/src/main/scala/org/apache/spark/sql/errors/QueryCompilationErrors.scala
+++ 
b/sql/catalyst/src/main/scala/org/apache/spark/sql/errors/QueryCompilationErrors.scala
@@ -4104,4 +4104,12 @@ private[sql] object QueryCompilationErrors extends 
QueryErrorsBase with Compilat
       messageParameters = Map("functionName" -> functionName)
     )
   }
+
+  def pipeOperatorSelectContainsAggregateFunction(expr: Expression): Throwable 
= {
+    new AnalysisException(
+      errorClass = "PIPE_OPERATOR_SELECT_CONTAINS_AGGREGATE_FUNCTION",
+      messageParameters = Map(
+        "expr" -> expr.toString),
+      origin = expr.origin)
+  }
 }
diff --git 
a/sql/catalyst/src/main/scala/org/apache/spark/sql/internal/SQLConf.scala 
b/sql/catalyst/src/main/scala/org/apache/spark/sql/internal/SQLConf.scala
index 5853e4b66dcc..c3a42dfd62a0 100644
--- a/sql/catalyst/src/main/scala/org/apache/spark/sql/internal/SQLConf.scala
+++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/internal/SQLConf.scala
@@ -5012,6 +5012,15 @@ object SQLConf {
       .stringConf
       .createWithDefault("versionAsOf")
 
+  val OPERATOR_PIPE_SYNTAX_ENABLED =
+    buildConf("spark.sql.operatorPipeSyntaxEnabled")
+      .doc("If true, enable operator pipe syntax for Apache Spark SQL. This 
uses the operator " +
+        "pipe marker |> to indicate separation between clauses of SQL in a 
manner that describes " +
+        "the sequence of steps that the query performs in a composable 
fashion.")
+      .version("4.0.0")
+      .booleanConf
+      .createWithDefault(Utils.isTesting)
+
   val LEGACY_PERCENTILE_DISC_CALCULATION = 
buildConf("spark.sql.legacy.percentileDiscCalculation")
     .internal()
     .doc("If true, the old bogus percentile_disc calculation is used. The old 
calculation " +
diff --git 
a/sql/core/src/test/resources/sql-tests/analyzer-results/pipe-operators.sql.out 
b/sql/core/src/test/resources/sql-tests/analyzer-results/pipe-operators.sql.out
new file mode 100644
index 000000000000..ab0635fef048
--- /dev/null
+++ 
b/sql/core/src/test/resources/sql-tests/analyzer-results/pipe-operators.sql.out
@@ -0,0 +1,318 @@
+-- Automatically generated by SQLQueryTestSuite
+-- !query
+drop table if exists t
+-- !query analysis
+DropTable true, false
++- ResolvedIdentifier V2SessionCatalog(spark_catalog), default.t
+
+
+-- !query
+create table t(x int, y string) using csv
+-- !query analysis
+CreateDataSourceTableCommand `spark_catalog`.`default`.`t`, false
+
+
+-- !query
+insert into t values (0, 'abc'), (1, 'def')
+-- !query analysis
+InsertIntoHadoopFsRelationCommand file:[not included in 
comparison]/{warehouse_dir}/t, false, CSV, [path=file:[not included in 
comparison]/{warehouse_dir}/t], Append, `spark_catalog`.`default`.`t`, 
org.apache.spark.sql.execution.datasources.InMemoryFileIndex(file:[not included 
in comparison]/{warehouse_dir}/t), [x, y]
++- Project [cast(col1#x as int) AS x#x, cast(col2#x as string) AS y#x]
+   +- LocalRelation [col1#x, col2#x]
+
+
+-- !query
+drop table if exists other
+-- !query analysis
+DropTable true, false
++- ResolvedIdentifier V2SessionCatalog(spark_catalog), default.other
+
+
+-- !query
+create table other(a int, b int) using json
+-- !query analysis
+CreateDataSourceTableCommand `spark_catalog`.`default`.`other`, false
+
+
+-- !query
+insert into other values (1, 1), (1, 2), (2, 4)
+-- !query analysis
+InsertIntoHadoopFsRelationCommand file:[not included in 
comparison]/{warehouse_dir}/other, false, JSON, [path=file:[not included in 
comparison]/{warehouse_dir}/other], Append, `spark_catalog`.`default`.`other`, 
org.apache.spark.sql.execution.datasources.InMemoryFileIndex(file:[not included 
in comparison]/{warehouse_dir}/other), [a, b]
++- Project [cast(col1#x as int) AS a#x, cast(col2#x as int) AS b#x]
+   +- LocalRelation [col1#x, col2#x]
+
+
+-- !query
+drop table if exists st
+-- !query analysis
+DropTable true, false
++- ResolvedIdentifier V2SessionCatalog(spark_catalog), default.st
+
+
+-- !query
+create table st(x int, col struct<i1:int, i2:int>) using parquet
+-- !query analysis
+CreateDataSourceTableCommand `spark_catalog`.`default`.`st`, false
+
+
+-- !query
+insert into st values (1, (2, 3))
+-- !query analysis
+InsertIntoHadoopFsRelationCommand file:[not included in 
comparison]/{warehouse_dir}/st, false, Parquet, [path=file:[not included in 
comparison]/{warehouse_dir}/st], Append, `spark_catalog`.`default`.`st`, 
org.apache.spark.sql.execution.datasources.InMemoryFileIndex(file:[not included 
in comparison]/{warehouse_dir}/st), [x, col]
++- Project [cast(col1#x as int) AS x#x, named_struct(i1, cast(col2#x.col1 as 
int), i2, cast(col2#x.col2 as int)) AS col#x]
+   +- LocalRelation [col1#x, col2#x]
+
+
+-- !query
+table t
+|> select 1 as x
+-- !query analysis
+Project [pipeselect(1) AS x#x]
++- SubqueryAlias spark_catalog.default.t
+   +- Relation spark_catalog.default.t[x#x,y#x] csv
+
+
+-- !query
+table t
+|> select x, y
+-- !query analysis
+Project [x#x, y#x]
++- SubqueryAlias spark_catalog.default.t
+   +- Relation spark_catalog.default.t[x#x,y#x] csv
+
+
+-- !query
+table t
+|> select x, y
+|> select x + length(y) as z
+-- !query analysis
+Project [pipeselect((x#x + length(y#x))) AS z#x]
++- Project [x#x, y#x]
+   +- SubqueryAlias spark_catalog.default.t
+      +- Relation spark_catalog.default.t[x#x,y#x] csv
+
+
+-- !query
+values (0), (1) tab(col)
+|> select col * 2 as result
+-- !query analysis
+Project [pipeselect((col#x * 2)) AS result#x]
++- SubqueryAlias tab
+   +- LocalRelation [col#x]
+
+
+-- !query
+(select * from t union all select * from t)
+|> select x + length(y) as result
+-- !query analysis
+Project [pipeselect((x#x + length(y#x))) AS result#x]
++- Union false, false
+   :- Project [x#x, y#x]
+   :  +- SubqueryAlias spark_catalog.default.t
+   :     +- Relation spark_catalog.default.t[x#x,y#x] csv
+   +- Project [x#x, y#x]
+      +- SubqueryAlias spark_catalog.default.t
+         +- Relation spark_catalog.default.t[x#x,y#x] csv
+
+
+-- !query
+(table t
+ |> select x, y
+ |> select x)
+union all
+select x from t where x < 1
+-- !query analysis
+Union false, false
+:- Project [x#x]
+:  +- Project [x#x, y#x]
+:     +- SubqueryAlias spark_catalog.default.t
+:        +- Relation spark_catalog.default.t[x#x,y#x] csv
++- Project [x#x]
+   +- Filter (x#x < 1)
+      +- SubqueryAlias spark_catalog.default.t
+         +- Relation spark_catalog.default.t[x#x,y#x] csv
+
+
+-- !query
+(select col from st)
+|> select col.i1
+-- !query analysis
+Project [col#x.i1 AS i1#x]
++- Project [col#x]
+   +- SubqueryAlias spark_catalog.default.st
+      +- Relation spark_catalog.default.st[x#x,col#x] parquet
+
+
+-- !query
+table st
+|> select st.col.i1
+-- !query analysis
+Project [col#x.i1 AS i1#x]
++- SubqueryAlias spark_catalog.default.st
+   +- Relation spark_catalog.default.st[x#x,col#x] parquet
+
+
+-- !query
+table t
+|> select (select a from other where x = a limit 1) as result
+-- !query analysis
+Project [pipeselect(scalar-subquery#x [x#x]) AS result#x]
+:  +- GlobalLimit 1
+:     +- LocalLimit 1
+:        +- Project [a#x]
+:           +- Filter (outer(x#x) = a#x)
+:              +- SubqueryAlias spark_catalog.default.other
+:                 +- Relation spark_catalog.default.other[a#x,b#x] json
++- SubqueryAlias spark_catalog.default.t
+   +- Relation spark_catalog.default.t[x#x,y#x] csv
+
+
+-- !query
+select (values (0) tab(col) |> select col) as result
+-- !query analysis
+Project [scalar-subquery#x [] AS result#x]
+:  +- Project [col#x]
+:     +- SubqueryAlias tab
+:        +- LocalRelation [col#x]
++- OneRowRelation
+
+
+-- !query
+table t
+|> select (select any_value(a) from other where x = a limit 1) as result
+-- !query analysis
+Project [pipeselect(scalar-subquery#x [x#x]) AS result#x]
+:  +- GlobalLimit 1
+:     +- LocalLimit 1
+:        +- Aggregate [any_value(a#x, false) AS any_value(a)#x]
+:           +- Filter (outer(x#x) = a#x)
+:              +- SubqueryAlias spark_catalog.default.other
+:                 +- Relation spark_catalog.default.other[a#x,b#x] json
++- SubqueryAlias spark_catalog.default.t
+   +- Relation spark_catalog.default.t[x#x,y#x] csv
+
+
+-- !query
+table t
+|> select x + length(x) as z, z + 1 as plus_one
+-- !query analysis
+Project [z#x, pipeselect((z#x + 1)) AS plus_one#x]
++- Project [x#x, y#x, pipeselect((x#x + length(cast(x#x as string)))) AS z#x]
+   +- SubqueryAlias spark_catalog.default.t
+      +- Relation spark_catalog.default.t[x#x,y#x] csv
+
+
+-- !query
+table t
+|> select first_value(x) over (partition by y) as result
+-- !query analysis
+Project [result#x]
++- Project [x#x, y#x, _we0#x, pipeselect(_we0#x) AS result#x]
+   +- Window [first_value(x#x, false) windowspecdefinition(y#x, 
specifiedwindowframe(RowFrame, unboundedpreceding$(), unboundedfollowing$())) 
AS _we0#x], [y#x]
+      +- Project [x#x, y#x]
+         +- SubqueryAlias spark_catalog.default.t
+            +- Relation spark_catalog.default.t[x#x,y#x] csv
+
+
+-- !query
+select 1 x, 2 y, 3 z
+|> select 1 + sum(x) over (),
+     avg(y) over (),
+     x,
+     avg(x+1) over (partition by y order by z) AS a2
+|> select a2
+-- !query analysis
+Project [a2#x]
++- Project [(1 + sum(x) OVER (ROWS BETWEEN UNBOUNDED PRECEDING AND UNBOUNDED 
FOLLOWING))#xL, avg(y) OVER (ROWS BETWEEN UNBOUNDED PRECEDING AND UNBOUNDED 
FOLLOWING)#x, x#x, a2#x]
+   +- Project [x#x, y#x, _w1#x, z#x, _we0#xL, avg(y) OVER (ROWS BETWEEN 
UNBOUNDED PRECEDING AND UNBOUNDED FOLLOWING)#x, _we2#x, (cast(1 as bigint) + 
_we0#xL) AS (1 + sum(x) OVER (ROWS BETWEEN UNBOUNDED PRECEDING AND UNBOUNDED 
FOLLOWING))#xL, avg(y) OVER (ROWS BETWEEN UNBOUNDED PRECEDING AND UNBOUNDED 
FOLLOWING)#x, pipeselect(_we2#x) AS a2#x]
+      +- Window [avg(_w1#x) windowspecdefinition(y#x, z#x ASC NULLS FIRST, 
specifiedwindowframe(RangeFrame, unboundedpreceding$(), currentrow$())) AS 
_we2#x], [y#x], [z#x ASC NULLS FIRST]
+         +- Window [sum(x#x) 
windowspecdefinition(specifiedwindowframe(RowFrame, unboundedpreceding$(), 
unboundedfollowing$())) AS _we0#xL, avg(y#x) 
windowspecdefinition(specifiedwindowframe(RowFrame, unboundedpreceding$(), 
unboundedfollowing$())) AS avg(y) OVER (ROWS BETWEEN UNBOUNDED PRECEDING AND 
UNBOUNDED FOLLOWING)#x]
+            +- Project [x#x, y#x, (x#x + 1) AS _w1#x, z#x]
+               +- Project [1 AS x#x, 2 AS y#x, 3 AS z#x]
+                  +- OneRowRelation
+
+
+-- !query
+table t
+|> select x, count(*) over ()
+|> select x
+-- !query analysis
+Project [x#x]
++- Project [x#x, count(1) OVER (ROWS BETWEEN UNBOUNDED PRECEDING AND UNBOUNDED 
FOLLOWING)#xL]
+   +- Project [x#x, count(1) OVER (ROWS BETWEEN UNBOUNDED PRECEDING AND 
UNBOUNDED FOLLOWING)#xL, count(1) OVER (ROWS BETWEEN UNBOUNDED PRECEDING AND 
UNBOUNDED FOLLOWING)#xL]
+      +- Window [count(1) windowspecdefinition(specifiedwindowframe(RowFrame, 
unboundedpreceding$(), unboundedfollowing$())) AS count(1) OVER (ROWS BETWEEN 
UNBOUNDED PRECEDING AND UNBOUNDED FOLLOWING)#xL]
+         +- Project [x#x]
+            +- SubqueryAlias spark_catalog.default.t
+               +- Relation spark_catalog.default.t[x#x,y#x] csv
+
+
+-- !query
+table t
+|> select distinct x, y
+-- !query analysis
+Distinct
++- Project [x#x, y#x]
+   +- SubqueryAlias spark_catalog.default.t
+      +- Relation spark_catalog.default.t[x#x,y#x] csv
+
+
+-- !query
+table t
+|> select sum(x) as result
+-- !query analysis
+org.apache.spark.sql.AnalysisException
+{
+  "errorClass" : "PIPE_OPERATOR_SELECT_CONTAINS_AGGREGATE_FUNCTION",
+  "sqlState" : "0A000",
+  "messageParameters" : {
+    "expr" : "sum(x#x)"
+  },
+  "queryContext" : [ {
+    "objectType" : "",
+    "objectName" : "",
+    "startIndex" : 19,
+    "stopIndex" : 24,
+    "fragment" : "sum(x)"
+  } ]
+}
+
+
+-- !query
+table t
+|> select y, length(y) + sum(x) as result
+-- !query analysis
+org.apache.spark.sql.AnalysisException
+{
+  "errorClass" : "PIPE_OPERATOR_SELECT_CONTAINS_AGGREGATE_FUNCTION",
+  "sqlState" : "0A000",
+  "messageParameters" : {
+    "expr" : "sum(x#x)"
+  },
+  "queryContext" : [ {
+    "objectType" : "",
+    "objectName" : "",
+    "startIndex" : 34,
+    "stopIndex" : 39,
+    "fragment" : "sum(x)"
+  } ]
+}
+
+
+-- !query
+drop table t
+-- !query analysis
+DropTable false, false
++- ResolvedIdentifier V2SessionCatalog(spark_catalog), default.t
+
+
+-- !query
+drop table other
+-- !query analysis
+DropTable false, false
++- ResolvedIdentifier V2SessionCatalog(spark_catalog), default.other
+
+
+-- !query
+drop table st
+-- !query analysis
+DropTable false, false
++- ResolvedIdentifier V2SessionCatalog(spark_catalog), default.st
diff --git a/sql/core/src/test/resources/sql-tests/inputs/pipe-operators.sql 
b/sql/core/src/test/resources/sql-tests/inputs/pipe-operators.sql
new file mode 100644
index 000000000000..7d0966e7f209
--- /dev/null
+++ b/sql/core/src/test/resources/sql-tests/inputs/pipe-operators.sql
@@ -0,0 +1,102 @@
+-- Prepare some test data.
+--------------------------
+drop table if exists t;
+create table t(x int, y string) using csv;
+insert into t values (0, 'abc'), (1, 'def');
+
+drop table if exists other;
+create table other(a int, b int) using json;
+insert into other values (1, 1), (1, 2), (2, 4);
+
+drop table if exists st;
+create table st(x int, col struct<i1:int, i2:int>) using parquet;
+insert into st values (1, (2, 3));
+
+-- Selection operators: positive tests.
+---------------------------------------
+
+-- Selecting a constant.
+table t
+|> select 1 as x;
+
+-- Selecting attributes.
+table t
+|> select x, y;
+
+-- Chained pipe SELECT operators.
+table t
+|> select x, y
+|> select x + length(y) as z;
+
+-- Using the VALUES list as the source relation.
+values (0), (1) tab(col)
+|> select col * 2 as result;
+
+-- Using a table subquery as the source relation.
+(select * from t union all select * from t)
+|> select x + length(y) as result;
+
+-- Enclosing the result of a pipe SELECT operation in a table subquery.
+(table t
+ |> select x, y
+ |> select x)
+union all
+select x from t where x < 1;
+
+-- Selecting struct fields.
+(select col from st)
+|> select col.i1;
+
+table st
+|> select st.col.i1;
+
+-- Expression subqueries in the pipe operator SELECT list.
+table t
+|> select (select a from other where x = a limit 1) as result;
+
+-- Pipe operator SELECT inside expression subqueries.
+select (values (0) tab(col) |> select col) as result;
+
+-- Aggregations are allowed within expression subqueries in the pipe operator 
SELECT list as long as
+-- no aggregate functions exist in the top-level select list.
+table t
+|> select (select any_value(a) from other where x = a limit 1) as result;
+
+-- Lateral column aliases in the pipe operator SELECT list.
+table t
+|> select x + length(x) as z, z + 1 as plus_one;
+
+-- Window functions are allowed in the pipe operator SELECT list.
+table t
+|> select first_value(x) over (partition by y) as result;
+
+select 1 x, 2 y, 3 z
+|> select 1 + sum(x) over (),
+     avg(y) over (),
+     x,
+     avg(x+1) over (partition by y order by z) AS a2
+|> select a2;
+
+table t
+|> select x, count(*) over ()
+|> select x;
+
+-- DISTINCT is supported.
+table t
+|> select distinct x, y;
+
+-- Selection operators: negative tests.
+---------------------------------------
+
+-- Aggregate functions are not allowed in the pipe operator SELECT list.
+table t
+|> select sum(x) as result;
+
+table t
+|> select y, length(y) + sum(x) as result;
+
+-- Cleanup.
+-----------
+drop table t;
+drop table other;
+drop table st;
diff --git 
a/sql/core/src/test/resources/sql-tests/results/pipe-operators.sql.out 
b/sql/core/src/test/resources/sql-tests/results/pipe-operators.sql.out
new file mode 100644
index 000000000000..7e0b7912105c
--- /dev/null
+++ b/sql/core/src/test/resources/sql-tests/results/pipe-operators.sql.out
@@ -0,0 +1,308 @@
+-- Automatically generated by SQLQueryTestSuite
+-- !query
+drop table if exists t
+-- !query schema
+struct<>
+-- !query output
+
+
+
+-- !query
+create table t(x int, y string) using csv
+-- !query schema
+struct<>
+-- !query output
+
+
+
+-- !query
+insert into t values (0, 'abc'), (1, 'def')
+-- !query schema
+struct<>
+-- !query output
+
+
+
+-- !query
+drop table if exists other
+-- !query schema
+struct<>
+-- !query output
+
+
+
+-- !query
+create table other(a int, b int) using json
+-- !query schema
+struct<>
+-- !query output
+
+
+
+-- !query
+insert into other values (1, 1), (1, 2), (2, 4)
+-- !query schema
+struct<>
+-- !query output
+
+
+
+-- !query
+drop table if exists st
+-- !query schema
+struct<>
+-- !query output
+
+
+
+-- !query
+create table st(x int, col struct<i1:int, i2:int>) using parquet
+-- !query schema
+struct<>
+-- !query output
+
+
+
+-- !query
+insert into st values (1, (2, 3))
+-- !query schema
+struct<>
+-- !query output
+
+
+
+-- !query
+table t
+|> select 1 as x
+-- !query schema
+struct<x:int>
+-- !query output
+1
+1
+
+
+-- !query
+table t
+|> select x, y
+-- !query schema
+struct<x:int,y:string>
+-- !query output
+0      abc
+1      def
+
+
+-- !query
+table t
+|> select x, y
+|> select x + length(y) as z
+-- !query schema
+struct<z:int>
+-- !query output
+3
+4
+
+
+-- !query
+values (0), (1) tab(col)
+|> select col * 2 as result
+-- !query schema
+struct<result:int>
+-- !query output
+0
+2
+
+
+-- !query
+(select * from t union all select * from t)
+|> select x + length(y) as result
+-- !query schema
+struct<result:int>
+-- !query output
+3
+3
+4
+4
+
+
+-- !query
+(table t
+ |> select x, y
+ |> select x)
+union all
+select x from t where x < 1
+-- !query schema
+struct<x:int>
+-- !query output
+0
+0
+1
+
+
+-- !query
+(select col from st)
+|> select col.i1
+-- !query schema
+struct<i1:int>
+-- !query output
+2
+
+
+-- !query
+table st
+|> select st.col.i1
+-- !query schema
+struct<i1:int>
+-- !query output
+2
+
+
+-- !query
+table t
+|> select (select a from other where x = a limit 1) as result
+-- !query schema
+struct<result:int>
+-- !query output
+1
+NULL
+
+
+-- !query
+select (values (0) tab(col) |> select col) as result
+-- !query schema
+struct<result:int>
+-- !query output
+0
+
+
+-- !query
+table t
+|> select (select any_value(a) from other where x = a limit 1) as result
+-- !query schema
+struct<result:int>
+-- !query output
+1
+NULL
+
+
+-- !query
+table t
+|> select x + length(x) as z, z + 1 as plus_one
+-- !query schema
+struct<z:int,plus_one:int>
+-- !query output
+1      2
+2      3
+
+
+-- !query
+table t
+|> select first_value(x) over (partition by y) as result
+-- !query schema
+struct<result:int>
+-- !query output
+0
+1
+
+
+-- !query
+select 1 x, 2 y, 3 z
+|> select 1 + sum(x) over (),
+     avg(y) over (),
+     x,
+     avg(x+1) over (partition by y order by z) AS a2
+|> select a2
+-- !query schema
+struct<a2:double>
+-- !query output
+2.0
+
+
+-- !query
+table t
+|> select x, count(*) over ()
+|> select x
+-- !query schema
+struct<x:int>
+-- !query output
+0
+1
+
+
+-- !query
+table t
+|> select distinct x, y
+-- !query schema
+struct<x:int,y:string>
+-- !query output
+0      abc
+1      def
+
+
+-- !query
+table t
+|> select sum(x) as result
+-- !query schema
+struct<>
+-- !query output
+org.apache.spark.sql.AnalysisException
+{
+  "errorClass" : "PIPE_OPERATOR_SELECT_CONTAINS_AGGREGATE_FUNCTION",
+  "sqlState" : "0A000",
+  "messageParameters" : {
+    "expr" : "sum(x#x)"
+  },
+  "queryContext" : [ {
+    "objectType" : "",
+    "objectName" : "",
+    "startIndex" : 19,
+    "stopIndex" : 24,
+    "fragment" : "sum(x)"
+  } ]
+}
+
+
+-- !query
+table t
+|> select y, length(y) + sum(x) as result
+-- !query schema
+struct<>
+-- !query output
+org.apache.spark.sql.AnalysisException
+{
+  "errorClass" : "PIPE_OPERATOR_SELECT_CONTAINS_AGGREGATE_FUNCTION",
+  "sqlState" : "0A000",
+  "messageParameters" : {
+    "expr" : "sum(x#x)"
+  },
+  "queryContext" : [ {
+    "objectType" : "",
+    "objectName" : "",
+    "startIndex" : 34,
+    "stopIndex" : 39,
+    "fragment" : "sum(x)"
+  } ]
+}
+
+
+-- !query
+drop table t
+-- !query schema
+struct<>
+-- !query output
+
+
+
+-- !query
+drop table other
+-- !query schema
+struct<>
+-- !query output
+
+
+
+-- !query
+drop table st
+-- !query schema
+struct<>
+-- !query output
+
diff --git 
a/sql/core/src/test/scala/org/apache/spark/sql/execution/SparkSqlParserSuite.scala
 
b/sql/core/src/test/scala/org/apache/spark/sql/execution/SparkSqlParserSuite.scala
index decfb5555dd8..a80444feb68a 100644
--- 
a/sql/core/src/test/scala/org/apache/spark/sql/execution/SparkSqlParserSuite.scala
+++ 
b/sql/core/src/test/scala/org/apache/spark/sql/execution/SparkSqlParserSuite.scala
@@ -26,10 +26,11 @@ import 
org.apache.spark.sql.catalyst.analysis.{AnalysisTest, UnresolvedAlias, Un
 import org.apache.spark.sql.catalyst.expressions.{Ascending, 
AttributeReference, Concat, GreaterThan, Literal, NullsFirst, SortOrder, 
UnresolvedWindowExpression, UnspecifiedFrame, WindowSpecDefinition, 
WindowSpecReference}
 import org.apache.spark.sql.catalyst.parser.ParseException
 import org.apache.spark.sql.catalyst.plans.logical._
+import org.apache.spark.sql.catalyst.trees.TreePattern.{LOCAL_RELATION, 
PROJECT, UNRESOLVED_RELATION}
 import org.apache.spark.sql.connector.catalog.TableCatalog
 import org.apache.spark.sql.execution.command._
 import org.apache.spark.sql.execution.datasources.{CreateTempViewUsing, 
RefreshResource}
-import org.apache.spark.sql.internal.StaticSQLConf
+import org.apache.spark.sql.internal.{SQLConf, StaticSQLConf}
 import org.apache.spark.sql.test.SharedSparkSession
 import org.apache.spark.sql.types.StringType
 import org.apache.spark.util.ArrayImplicits._
@@ -880,4 +881,20 @@ class SparkSqlParserSuite extends AnalysisTest with 
SharedSparkSession {
     parser.parsePlan("SELECT\u30001") // Unicode ideographic space
   }
   // scalastyle:on
+
+  test("Operator pipe SQL syntax") {
+    withSQLConf(SQLConf.OPERATOR_PIPE_SYNTAX_ENABLED.key -> "true") {
+      // Basic selection.
+      // Here we check that every parsed plan contains a projection and a 
source relation or
+      // inline table.
+      def checkPipeSelect(query: String): Unit = {
+        val plan: LogicalPlan = parser.parsePlan(query)
+        assert(plan.containsPattern(PROJECT))
+        assert(plan.containsAnyPattern(UNRESOLVED_RELATION, LOCAL_RELATION))
+      }
+      checkPipeSelect("TABLE t |> SELECT 1 AS X")
+      checkPipeSelect("TABLE t |> SELECT 1 AS X, 2 AS Y |> SELECT X + Y AS Z")
+      checkPipeSelect("VALUES (0), (1) tab(col) |> SELECT col * 2 AS result")
+    }
+  }
 }
diff --git 
a/sql/hive-thriftserver/src/test/scala/org/apache/spark/sql/hive/thriftserver/ThriftServerQueryTestSuite.scala
 
b/sql/hive-thriftserver/src/test/scala/org/apache/spark/sql/hive/thriftserver/ThriftServerQueryTestSuite.scala
index 026b2388c593..331572e62f56 100644
--- 
a/sql/hive-thriftserver/src/test/scala/org/apache/spark/sql/hive/thriftserver/ThriftServerQueryTestSuite.scala
+++ 
b/sql/hive-thriftserver/src/test/scala/org/apache/spark/sql/hive/thriftserver/ThriftServerQueryTestSuite.scala
@@ -103,7 +103,8 @@ class ThriftServerQueryTestSuite extends SQLQueryTestSuite 
with SharedThriftServ
     // SPARK-42921
     "timestampNTZ/datetime-special-ansi.sql",
     // SPARK-47264
-    "collations.sql"
+    "collations.sql",
+    "pipe-operators.sql"
   )
 
   override def runQueries(


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]


Reply via email to