This is an automated email from the ASF dual-hosted git repository.

wenchen pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/spark.git


The following commit(s) were added to refs/heads/master by this push:
     new f86c6eabee81 [SPARK-49562][SQL] Add SQL pipe syntax for aggregation
f86c6eabee81 is described below

commit f86c6eabee819e45ffba36b325404ec102a14eba
Author: Daniel Tenedorio <[email protected]>
AuthorDate: Tue Oct 29 16:25:10 2024 -0700

    [SPARK-49562][SQL] Add SQL pipe syntax for aggregation
    
    ### What changes were proposed in this pull request?
    
    This PR adds SQL pipe syntax support for aggregation.
    
    For example:
    
    ```
    SELECT 1 AS x, 2 AS y, 3 AS z
    |> AGGREGATE COUNT(*) GROUP BY x AS z, x + y AS z
    
    1       3       1
    ```
    
    ### Why are the changes needed?
    
    The SQL pipe operator syntax will let users compose queries in a more 
flexible fashion.
    
    ### Does this PR introduce _any_ user-facing change?
    
    Yes, see above.
    
    ### How was this patch tested?
    
    This PR adds a few unit test cases, but mostly relies on golden file test 
coverage. I did this to make sure the answers are correct as this feature is 
implemented and also so we can look at the analyzer output plans to ensure they 
look right as well.
    
    ### Was this patch authored or co-authored using generative AI tooling?
    
    No
    
    Closes #48529 from dtenedor/pipe-aggregate.
    
    Authored-by: Daniel Tenedorio <[email protected]>
    Signed-off-by: Wenchen Fan <[email protected]>
---
 .../src/main/resources/error/error-conditions.json |   5 +
 docs/sql-ref-ansi-compliance.md                    |   1 +
 .../spark/sql/catalyst/parser/SqlBaseLexer.g4      |   1 +
 .../spark/sql/catalyst/parser/SqlBaseParser.g4     |   5 +-
 .../spark/sql/errors/QueryParsingErrors.scala      |   9 +
 .../spark/sql/catalyst/parser/AstBuilder.scala     |  95 +++-
 .../analyzer-results/pipe-operators.sql.out        | 452 +++++++++++++++++++
 .../resources/sql-tests/inputs/pipe-operators.sql  | 147 +++++++
 .../sql-tests/results/ansi/keywords.sql.out        |   1 +
 .../resources/sql-tests/results/keywords.sql.out   |   1 +
 .../sql-tests/results/pipe-operators.sql.out       | 480 +++++++++++++++++++++
 .../spark/sql/execution/SparkSqlParserSuite.scala  |   6 +
 .../ThriftServerWithSparkContextSuite.scala        |   2 +-
 13 files changed, 1197 insertions(+), 8 deletions(-)

diff --git a/common/utils/src/main/resources/error/error-conditions.json 
b/common/utils/src/main/resources/error/error-conditions.json
index f6ed1291457f..90348441e736 100644
--- a/common/utils/src/main/resources/error/error-conditions.json
+++ b/common/utils/src/main/resources/error/error-conditions.json
@@ -5166,6 +5166,11 @@
           "Invalid partitioning: <cols> is missing or is in a map or array."
         ]
       },
+      "PIPE_OPERATOR_AGGREGATE_UNSUPPORTED_CASE" : {
+        "message" : [
+          "The SQL pipe operator syntax with aggregation (using |> AGGREGATE) 
does not support <case>."
+        ]
+      },
       "PIVOT_AFTER_GROUP_BY" : {
         "message" : [
           "PIVOT clause following a GROUP BY clause. Consider pushing the 
GROUP BY into a subquery."
diff --git a/docs/sql-ref-ansi-compliance.md b/docs/sql-ref-ansi-compliance.md
index 500b41f7569a..268f5b970d30 100644
--- a/docs/sql-ref-ansi-compliance.md
+++ b/docs/sql-ref-ansi-compliance.md
@@ -405,6 +405,7 @@ Below is a list of all the keywords in Spark SQL.
 |--|----------------------|-------------------------|--------|
 |ADD|non-reserved|non-reserved|non-reserved|
 |AFTER|non-reserved|non-reserved|non-reserved|
+|AGGREGATE|non-reserved|non-reserved|non-reserved|
 |ALL|reserved|non-reserved|reserved|
 |ALTER|non-reserved|non-reserved|reserved|
 |ALWAYS|non-reserved|non-reserved|non-reserved|
diff --git 
a/sql/api/src/main/antlr4/org/apache/spark/sql/catalyst/parser/SqlBaseLexer.g4 
b/sql/api/src/main/antlr4/org/apache/spark/sql/catalyst/parser/SqlBaseLexer.g4
index 7391e8c353de..085e723d02bc 100644
--- 
a/sql/api/src/main/antlr4/org/apache/spark/sql/catalyst/parser/SqlBaseLexer.g4
+++ 
b/sql/api/src/main/antlr4/org/apache/spark/sql/catalyst/parser/SqlBaseLexer.g4
@@ -120,6 +120,7 @@ BANG: '!';
 //--SPARK-KEYWORD-LIST-START
 ADD: 'ADD';
 AFTER: 'AFTER';
+AGGREGATE: 'AGGREGATE';
 ALL: 'ALL';
 ALTER: 'ALTER';
 ALWAYS: 'ALWAYS';
diff --git 
a/sql/api/src/main/antlr4/org/apache/spark/sql/catalyst/parser/SqlBaseParser.g4 
b/sql/api/src/main/antlr4/org/apache/spark/sql/catalyst/parser/SqlBaseParser.g4
index 5eb4c276f39b..4900c971966c 100644
--- 
a/sql/api/src/main/antlr4/org/apache/spark/sql/catalyst/parser/SqlBaseParser.g4
+++ 
b/sql/api/src/main/antlr4/org/apache/spark/sql/catalyst/parser/SqlBaseParser.g4
@@ -765,7 +765,7 @@ temporalClause
 aggregationClause
     : GROUP BY groupingExpressionsWithGroupingAnalytics+=groupByClause
         (COMMA groupingExpressionsWithGroupingAnalytics+=groupByClause)*
-    | GROUP BY groupingExpressions+=expression (COMMA 
groupingExpressions+=expression)* (
+    | GROUP BY groupingExpressions+=namedExpression (COMMA 
groupingExpressions+=namedExpression)* (
       WITH kind=ROLLUP
     | WITH kind=CUBE
     | kind=GROUPING SETS LEFT_PAREN groupingSet (COMMA groupingSet)* 
RIGHT_PAREN)?
@@ -1513,6 +1513,7 @@ operatorPipeRightSide
     | joinRelation
     | operator=(UNION | EXCEPT | SETMINUS | INTERSECT) setQuantifier? 
right=queryTerm
     | queryOrganization
+    | AGGREGATE namedExpressionSeq? aggregationClause?
     ;
 
 // When `SQL_standard_keyword_behavior=true`, there are 2 kinds of keywords in 
Spark SQL.
@@ -1529,6 +1530,7 @@ ansiNonReserved
 //--ANSI-NON-RESERVED-START
     : ADD
     | AFTER
+    | AGGREGATE
     | ALTER
     | ALWAYS
     | ANALYZE
@@ -1851,6 +1853,7 @@ nonReserved
 //--DEFAULT-NON-RESERVED-START
     : ADD
     | AFTER
+    | AGGREGATE
     | ALL
     | ALTER
     | ALWAYS
diff --git 
a/sql/api/src/main/scala/org/apache/spark/sql/errors/QueryParsingErrors.scala 
b/sql/api/src/main/scala/org/apache/spark/sql/errors/QueryParsingErrors.scala
index 0fa6eb0434ab..953eacc7ff81 100644
--- 
a/sql/api/src/main/scala/org/apache/spark/sql/errors/QueryParsingErrors.scala
+++ 
b/sql/api/src/main/scala/org/apache/spark/sql/errors/QueryParsingErrors.scala
@@ -105,6 +105,15 @@ private[sql] object QueryParsingErrors extends 
DataTypeErrorsBase {
     new ParseException(errorClass = 
"UNSUPPORTED_FEATURE.COMBINATION_QUERY_RESULT_CLAUSES", ctx)
   }
 
+  def pipeOperatorAggregateUnsupportedCaseError(
+      caseArgument: String,
+      ctx: ParserRuleContext): Throwable = {
+    new ParseException(
+      errorClass = 
"UNSUPPORTED_FEATURE.PIPE_OPERATOR_AGGREGATE_UNSUPPORTED_CASE",
+      messageParameters = Map("case" -> caseArgument),
+      ctx)
+  }
+
   def distributeByUnsupportedError(ctx: QueryOrganizationContext): Throwable = 
{
     new ParseException(errorClass = "_LEGACY_ERROR_TEMP_0012", ctx)
   }
diff --git 
a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/parser/AstBuilder.scala
 
b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/parser/AstBuilder.scala
index cedeca14a10d..caeb78d20e6a 100644
--- 
a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/parser/AstBuilder.scala
+++ 
b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/parser/AstBuilder.scala
@@ -40,10 +40,10 @@ import 
org.apache.spark.sql.catalyst.expressions.aggregate.{AnyValue, First, Las
 import org.apache.spark.sql.catalyst.parser.SqlBaseParser._
 import org.apache.spark.sql.catalyst.plans._
 import org.apache.spark.sql.catalyst.plans.logical._
-import org.apache.spark.sql.catalyst.trees.CurrentOrigin
+import org.apache.spark.sql.catalyst.trees.{CurrentOrigin, Origin}
 import org.apache.spark.sql.catalyst.trees.TreePattern.PARAMETER
 import org.apache.spark.sql.catalyst.types.DataTypeUtils
-import org.apache.spark.sql.catalyst.util.{CharVarcharUtils, DateTimeUtils, 
IntervalUtils}
+import org.apache.spark.sql.catalyst.util.{CharVarcharUtils, DateTimeUtils, 
IntervalUtils, SparkParserUtils}
 import org.apache.spark.sql.catalyst.util.DateTimeUtils.{convertSpecialDate, 
convertSpecialTimestamp, convertSpecialTimestampNTZ, getZoneId, stringToDate, 
stringToTimestamp, stringToTimestampWithoutTimeZone}
 import org.apache.spark.sql.connector.catalog.{CatalogV2Util, 
SupportsNamespaces, TableCatalog, TableWritePrivilege}
 import org.apache.spark.sql.connector.catalog.TableChange.ColumnPosition
@@ -1290,7 +1290,8 @@ class AstBuilder extends DataTypeAstBuilder
         withHavingClause(havingClause, Aggregate(Nil, namedExpressions, 
withFilter))
       }
     } else if (aggregationClause != null) {
-      val aggregate = withAggregationClause(aggregationClause, 
namedExpressions, withFilter)
+      val aggregate = withAggregationClause(
+        aggregationClause, namedExpressions, withFilter, 
allowNamedGroupingExpressions = false)
       aggregate.optionalMap(havingClause)(withHavingClause)
     } else {
       // When hitting this branch, `having` must be null.
@@ -1503,9 +1504,27 @@ class AstBuilder extends DataTypeAstBuilder
   private def withAggregationClause(
       ctx: AggregationClauseContext,
       selectExpressions: Seq[NamedExpression],
-      query: LogicalPlan): LogicalPlan = withOrigin(ctx) {
+      query: LogicalPlan,
+      allowNamedGroupingExpressions: Boolean): LogicalPlan = withOrigin(ctx) {
     if (ctx.groupingExpressionsWithGroupingAnalytics.isEmpty) {
-      val groupByExpressions = expressionList(ctx.groupingExpressions)
+      val groupByExpressions: Seq[Expression] =
+        ctx.groupingExpressions.asScala.map { n: NamedExpressionContext =>
+          if (!allowNamedGroupingExpressions && (n.name != null || 
n.identifierList != null)) {
+            // If we do not allow grouping expressions to have aliases in this 
context, we throw a
+            // syntax error here accordingly.
+            val error: String = (if (n.name != null) n.name else 
n.identifierList).getText
+            throw new ParseException(
+              command = Some(SparkParserUtils.command(n)),
+              start = Origin(),
+              stop = Origin(),
+              errorClass = "PARSE_SYNTAX_ERROR",
+              messageParameters = Map(
+                "error" -> s"'$error'",
+                "hint" -> s": extra input '$error'"),
+              queryContext = Array.empty)
+          }
+          visitNamedExpression(n)
+        }.toSeq
       if (ctx.GROUPING != null) {
         // GROUP BY ... GROUPING SETS (...)
         // `groupByExpressions` can be non-empty for Hive compatibility. It 
may add extra grouping
@@ -5899,7 +5918,71 @@ class AstBuilder extends DataTypeAstBuilder
       visitSetOperationImpl(left, plan(ctx.right), all, c.getType)
     }.getOrElse(Option(ctx.queryOrganization).map { c =>
       withQueryResultClauses(c, withSubqueryAlias(), forPipeOperators = true)
-    }.get)))))))
+    }.getOrElse(
+      visitOperatorPipeAggregate(ctx, left)
+    ))))))))
+  }
+
+  private def visitOperatorPipeAggregate(
+      ctx: OperatorPipeRightSideContext, left: LogicalPlan): LogicalPlan = {
+    assert(ctx.AGGREGATE != null)
+    if (ctx.namedExpressionSeq() == null && ctx.aggregationClause() == null) {
+      operationNotAllowed(
+        "The AGGREGATE clause requires a list of aggregate expressions " +
+          "or a list of grouping expressions, or both", ctx)
+    }
+    val aggregateExpressions: Seq[NamedExpression] =
+      Option(ctx.namedExpressionSeq()).map { n: NamedExpressionSeqContext =>
+        visitNamedExpressionSeq(n).map {
+          case (e: NamedExpression, _) => e
+          case (e: Expression, aliasFunc) => UnresolvedAlias(e, aliasFunc)
+        }
+      }.getOrElse(Seq.empty)
+    Option(ctx.aggregationClause()).map { c: AggregationClauseContext =>
+      withAggregationClause(c, aggregateExpressions, left, 
allowNamedGroupingExpressions = true)
+      match {
+        case a: Aggregate =>
+          // GROUP BY ALL, GROUP BY CUBE, GROUPING_ID, GROUPING SETS, and 
GROUP BY ROLLUP are not
+          // supported yet.
+          def error(s: String): Unit =
+            throw 
QueryParsingErrors.pipeOperatorAggregateUnsupportedCaseError(s, c)
+          a.groupingExpressions match {
+            case Seq(key: UnresolvedAttribute) if key.equalsIgnoreCase("ALL") 
=>
+              error("GROUP BY ALL")
+            case _ =>
+          }
+          def visit(e: Expression): Unit = {
+            e match {
+              case _: Cube => error("GROUP BY CUBE")
+              case _: GroupingSets => error("GROUPING SETS")
+              case _: Rollup => error("GROUP BY ROLLUP")
+              case f: UnresolvedFunction if f.arguments.length == 1 && 
f.nameParts.length == 1 =>
+                Seq("GROUPING", "GROUPING_ID").foreach { name =>
+                  if (f.nameParts.head.equalsIgnoreCase(name)) error(name)
+                }
+              case _: WindowSpec => error("window functions")
+              case _ =>
+            }
+            e.children.foreach(visit)
+          }
+          a.groupingExpressions.foreach(visit)
+          a.aggregateExpressions.foreach(visit)
+          // Prepend grouping keys to the list of aggregate functions, since 
operator pipe AGGREGATE
+          // clause returns the GROUP BY expressions followed by the list of 
aggregate functions.
+          val namedGroupingExpressions: Seq[NamedExpression] =
+            a.groupingExpressions.map {
+              case n: NamedExpression => n
+              case e: Expression => UnresolvedAlias(e, None)
+            }
+          a.copy(aggregateExpressions = namedGroupingExpressions ++ 
a.aggregateExpressions)
+      }
+    }.getOrElse {
+      // This is a table aggregation with no grouping expressions.
+      Aggregate(
+        groupingExpressions = Seq.empty,
+        aggregateExpressions = aggregateExpressions,
+        child = left)
+    }
   }
 
   /**
diff --git 
a/sql/core/src/test/resources/sql-tests/analyzer-results/pipe-operators.sql.out 
b/sql/core/src/test/resources/sql-tests/analyzer-results/pipe-operators.sql.out
index 30f340ca834e..6af64b116f7d 100644
--- 
a/sql/core/src/test/resources/sql-tests/analyzer-results/pipe-operators.sql.out
+++ 
b/sql/core/src/test/resources/sql-tests/analyzer-results/pipe-operators.sql.out
@@ -2317,6 +2317,458 @@ org.apache.spark.sql.catalyst.parser.ParseException
 }
 
 
+-- !query
+table other
+|> aggregate sum(b) as result group by a
+-- !query analysis
+Aggregate [a#x], [a#x, sum(b#x) AS result#xL]
++- SubqueryAlias spark_catalog.default.other
+   +- Relation spark_catalog.default.other[a#x,b#x] json
+
+
+-- !query
+table other
+|> aggregate sum(b) as result group by a
+|> select result
+-- !query analysis
+Project [result#xL]
++- Aggregate [a#x], [a#x, sum(b#x) AS result#xL]
+   +- SubqueryAlias spark_catalog.default.other
+      +- Relation spark_catalog.default.other[a#x,b#x] json
+
+
+-- !query
+table other
+|> aggregate sum(b) group by a + 1 as gkey
+|> select gkey
+-- !query analysis
+Project [gkey#x]
++- Aggregate [(a#x + 1)], [(a#x + 1) AS gkey#x, sum(b#x) AS sum(b)#xL]
+   +- SubqueryAlias spark_catalog.default.other
+      +- Relation spark_catalog.default.other[a#x,b#x] json
+
+
+-- !query
+select 1 as x, 2 as y
+|> aggregate group by x, y
+-- !query analysis
+Aggregate [x#x, y#x], [x#x, y#x]
++- Project [1 AS x#x, 2 AS y#x]
+   +- OneRowRelation
+
+
+-- !query
+select 3 as x, 4 as y
+|> aggregate group by 1, 2
+-- !query analysis
+Aggregate [1, 2], [1 AS 1#x, 2 AS 2#x]
++- Project [3 AS x#x, 4 AS y#x]
+   +- OneRowRelation
+
+
+-- !query
+table t
+|> aggregate sum(x)
+-- !query analysis
+Aggregate [sum(x#x) AS sum(x)#xL]
++- SubqueryAlias spark_catalog.default.t
+   +- Relation spark_catalog.default.t[x#x,y#x] csv
+
+
+-- !query
+table t
+|> aggregate sum(x) + 1 as result_plus_one
+-- !query analysis
+Aggregate [(sum(x#x) + cast(1 as bigint)) AS result_plus_one#xL]
++- SubqueryAlias spark_catalog.default.t
+   +- Relation spark_catalog.default.t[x#x,y#x] csv
+
+
+-- !query
+table other
+|> aggregate group by a
+|> where a = 1
+-- !query analysis
+Filter (a#x = 1)
++- SubqueryAlias __auto_generated_subquery_name
+   +- Aggregate [a#x], [a#x]
+      +- SubqueryAlias spark_catalog.default.other
+         +- Relation spark_catalog.default.other[a#x,b#x] json
+
+
+-- !query
+select 1 as x, 2 as y, 3 as z
+|> aggregate group by x, y, x + y as z
+-- !query analysis
+Aggregate [x#x, y#x, (x#x + y#x)], [x#x, y#x, (x#x + y#x) AS z#x]
++- Project [1 AS x#x, 2 AS y#x, 3 AS z#x]
+   +- OneRowRelation
+
+
+-- !query
+select 1 as x, 2 as y, 3 as z
+|> aggregate group by x as z, x + y as z
+-- !query analysis
+Aggregate [x#x, (x#x + y#x)], [x#x AS z#x, (x#x + y#x) AS z#x]
++- Project [1 AS x#x, 2 AS y#x, 3 AS z#x]
+   +- OneRowRelation
+
+
+-- !query
+select 1 as x, 2 as y, named_struct('z', 3) as st
+|> aggregate group by x, y, x, x, st.z, (st).z, 1 + x, 2 + x
+-- !query analysis
+Aggregate [x#x, y#x, x#x, x#x, st#x.z, st#x.z, (1 + x#x), (2 + x#x)], [x#x, 
y#x, x#x, x#x, st#x.z AS z#x, st#x.z AS z#x, (1 + x#x) AS (1 + x)#x, (2 + x#x) 
AS (2 + x)#x]
++- Project [1 AS x#x, 2 AS y#x, named_struct(z, 3) AS st#x]
+   +- OneRowRelation
+
+
+-- !query
+select 1 x, 2 y, 3 z
+|> aggregate sum(z) z group by x, y
+|> aggregate avg(z) z group by x
+|> aggregate count(distinct z) c
+-- !query analysis
+Aggregate [count(distinct z#x) AS c#xL]
++- Aggregate [x#x], [x#x, avg(z#xL) AS z#x]
+   +- Aggregate [x#x, y#x], [x#x, y#x, sum(z#x) AS z#xL]
+      +- Project [1 AS x#x, 2 AS y#x, 3 AS z#x]
+         +- OneRowRelation
+
+
+-- !query
+select 1 x, 3 z
+|> aggregate count(*) group by x, z, x
+|> select x
+-- !query analysis
+Project [x#x]
++- Aggregate [x#x, z#x, x#x], [x#x, z#x, x#x, count(1) AS count(1)#xL]
+   +- Project [1 AS x#x, 3 AS z#x]
+      +- OneRowRelation
+
+
+-- !query
+table other
+|> aggregate a group by a
+-- !query analysis
+Aggregate [a#x], [a#x, a#x]
++- SubqueryAlias spark_catalog.default.other
+   +- Relation spark_catalog.default.other[a#x,b#x] json
+
+
+-- !query
+table other
+|> aggregate a + count(b) group by a
+-- !query analysis
+Aggregate [a#x], [a#x, (cast(a#x as bigint) + count(b#x)) AS (a + count(b))#xL]
++- SubqueryAlias spark_catalog.default.other
+   +- Relation spark_catalog.default.other[a#x,b#x] json
+
+
+-- !query
+select 3 as x, 4 as y
+|> aggregate group by all
+-- !query analysis
+org.apache.spark.sql.catalyst.parser.ParseException
+{
+  "errorClass" : 
"UNSUPPORTED_FEATURE.PIPE_OPERATOR_AGGREGATE_UNSUPPORTED_CASE",
+  "sqlState" : "0A000",
+  "messageParameters" : {
+    "case" : "GROUP BY ALL"
+  },
+  "queryContext" : [ {
+    "objectType" : "",
+    "objectName" : "",
+    "startIndex" : 1,
+    "stopIndex" : 47,
+    "fragment" : "select 3 as x, 4 as y\n|> aggregate group by all"
+  } ]
+}
+
+
+-- !query
+table courseSales
+|> aggregate sum(earnings) group by rollup(course, `year`)
+|> where course = 'dotNET' and `year` = '2013'
+-- !query analysis
+org.apache.spark.sql.catalyst.parser.ParseException
+{
+  "errorClass" : 
"UNSUPPORTED_FEATURE.PIPE_OPERATOR_AGGREGATE_UNSUPPORTED_CASE",
+  "sqlState" : "0A000",
+  "messageParameters" : {
+    "case" : "GROUP BY ROLLUP"
+  },
+  "queryContext" : [ {
+    "objectType" : "",
+    "objectName" : "",
+    "startIndex" : 1,
+    "stopIndex" : 123,
+    "fragment" : "table courseSales\n|> aggregate sum(earnings) group by 
rollup(course, `year`)\n|> where course = 'dotNET' and `year` = '2013'"
+  } ]
+}
+
+
+-- !query
+table courseSales
+|> aggregate sum(earnings) group by cube(course, `year`)
+|> where course = 'dotNET' and `year` = '2013'
+-- !query analysis
+org.apache.spark.sql.catalyst.parser.ParseException
+{
+  "errorClass" : 
"UNSUPPORTED_FEATURE.PIPE_OPERATOR_AGGREGATE_UNSUPPORTED_CASE",
+  "sqlState" : "0A000",
+  "messageParameters" : {
+    "case" : "GROUP BY CUBE"
+  },
+  "queryContext" : [ {
+    "objectType" : "",
+    "objectName" : "",
+    "startIndex" : 1,
+    "stopIndex" : 121,
+    "fragment" : "table courseSales\n|> aggregate sum(earnings) group by 
cube(course, `year`)\n|> where course = 'dotNET' and `year` = '2013'"
+  } ]
+}
+
+
+-- !query
+table courseSales
+|> aggregate sum(earnings) group by course, `year` grouping sets(course, 
`year`)
+|> where course = 'dotNET' and `year` = '2013'
+-- !query analysis
+org.apache.spark.sql.catalyst.parser.ParseException
+{
+  "errorClass" : 
"UNSUPPORTED_FEATURE.PIPE_OPERATOR_AGGREGATE_UNSUPPORTED_CASE",
+  "sqlState" : "0A000",
+  "messageParameters" : {
+    "case" : "GROUPING SETS"
+  },
+  "queryContext" : [ {
+    "objectType" : "",
+    "objectName" : "",
+    "startIndex" : 1,
+    "stopIndex" : 145,
+    "fragment" : "table courseSales\n|> aggregate sum(earnings) group by 
course, `year` grouping sets(course, `year`)\n|> where course = 'dotNET' and 
`year` = '2013'"
+  } ]
+}
+
+
+-- !query
+table courseSales
+|> aggregate sum(earnings), grouping(course) + 1
+   group by course
+|> where course = 'dotNET' and `year` = '2013'
+-- !query analysis
+org.apache.spark.sql.catalyst.parser.ParseException
+{
+  "errorClass" : 
"UNSUPPORTED_FEATURE.PIPE_OPERATOR_AGGREGATE_UNSUPPORTED_CASE",
+  "sqlState" : "0A000",
+  "messageParameters" : {
+    "case" : "GROUPING"
+  },
+  "queryContext" : [ {
+    "objectType" : "",
+    "objectName" : "",
+    "startIndex" : 1,
+    "stopIndex" : 132,
+    "fragment" : "table courseSales\n|> aggregate sum(earnings), 
grouping(course) + 1\n   group by course\n|> where course = 'dotNET' and `year` 
= '2013'"
+  } ]
+}
+
+
+-- !query
+table courseSales
+|> aggregate sum(earnings), grouping_id(course)
+   group by course
+|> where course = 'dotNET' and `year` = '2013'
+-- !query analysis
+org.apache.spark.sql.catalyst.parser.ParseException
+{
+  "errorClass" : 
"UNSUPPORTED_FEATURE.PIPE_OPERATOR_AGGREGATE_UNSUPPORTED_CASE",
+  "sqlState" : "0A000",
+  "messageParameters" : {
+    "case" : "GROUPING_ID"
+  },
+  "queryContext" : [ {
+    "objectType" : "",
+    "objectName" : "",
+    "startIndex" : 1,
+    "stopIndex" : 131,
+    "fragment" : "table courseSales\n|> aggregate sum(earnings), 
grouping_id(course)\n   group by course\n|> where course = 'dotNET' and `year` 
= '2013'"
+  } ]
+}
+
+
+-- !query
+select 1 as x, 2 as y
+|> aggregate group by ()
+-- !query analysis
+org.apache.spark.sql.catalyst.parser.ParseException
+{
+  "errorClass" : "PARSE_SYNTAX_ERROR",
+  "sqlState" : "42601",
+  "messageParameters" : {
+    "error" : "')'",
+    "hint" : ""
+  }
+}
+
+
+-- !query
+table other
+|> aggregate a
+-- !query analysis
+org.apache.spark.sql.catalyst.ExtendedAnalysisException
+{
+  "errorClass" : "MISSING_GROUP_BY",
+  "sqlState" : "42803",
+  "queryContext" : [ {
+    "objectType" : "",
+    "objectName" : "",
+    "startIndex" : 1,
+    "stopIndex" : 26,
+    "fragment" : "table other\n|> aggregate a"
+  } ]
+}
+
+
+-- !query
+table other
+|> select sum(a) as result
+-- !query analysis
+org.apache.spark.sql.AnalysisException
+{
+  "errorClass" : "PIPE_OPERATOR_SELECT_CONTAINS_AGGREGATE_FUNCTION",
+  "sqlState" : "0A000",
+  "messageParameters" : {
+    "expr" : "sum(a#x)"
+  },
+  "queryContext" : [ {
+    "objectType" : "",
+    "objectName" : "",
+    "startIndex" : 23,
+    "stopIndex" : 28,
+    "fragment" : "sum(a)"
+  } ]
+}
+
+
+-- !query
+table other
+|> aggregate
+-- !query analysis
+org.apache.spark.sql.catalyst.parser.ParseException
+{
+  "errorClass" : "_LEGACY_ERROR_TEMP_0035",
+  "messageParameters" : {
+    "message" : "The AGGREGATE clause requires a list of aggregate expressions 
or a list of grouping expressions, or both"
+  },
+  "queryContext" : [ {
+    "objectType" : "",
+    "objectName" : "",
+    "startIndex" : 1,
+    "stopIndex" : 24,
+    "fragment" : "table other\n|> aggregate"
+  } ]
+}
+
+
+-- !query
+table other
+|> aggregate group by
+-- !query analysis
+org.apache.spark.sql.catalyst.ExtendedAnalysisException
+{
+  "errorClass" : "UNRESOLVED_COLUMN.WITH_SUGGESTION",
+  "sqlState" : "42703",
+  "messageParameters" : {
+    "objectName" : "`group`",
+    "proposal" : "`a`, `b`"
+  },
+  "queryContext" : [ {
+    "objectType" : "",
+    "objectName" : "",
+    "startIndex" : 26,
+    "stopIndex" : 30,
+    "fragment" : "group"
+  } ]
+}
+
+
+-- !query
+table other
+|> group by a
+-- !query analysis
+org.apache.spark.sql.catalyst.parser.ParseException
+{
+  "errorClass" : "PARSE_SYNTAX_ERROR",
+  "sqlState" : "42601",
+  "messageParameters" : {
+    "error" : "'group'",
+    "hint" : ""
+  }
+}
+
+
+-- !query
+table other
+|> aggregate sum(a) over () group by b
+-- !query analysis
+org.apache.spark.sql.catalyst.parser.ParseException
+{
+  "errorClass" : 
"UNSUPPORTED_FEATURE.PIPE_OPERATOR_AGGREGATE_UNSUPPORTED_CASE",
+  "sqlState" : "0A000",
+  "messageParameters" : {
+    "case" : "window functions"
+  },
+  "queryContext" : [ {
+    "objectType" : "",
+    "objectName" : "",
+    "startIndex" : 1,
+    "stopIndex" : 50,
+    "fragment" : "table other\n|> aggregate sum(a) over () group by b"
+  } ]
+}
+
+
+-- !query
+select 1 x, 2 y, 3 z
+|> aggregate count(*) AS c, sum(x) AS x group by x
+|> where c = 1
+|> where x = 1
+-- !query analysis
+org.apache.spark.sql.AnalysisException
+{
+  "errorClass" : "AMBIGUOUS_REFERENCE",
+  "sqlState" : "42704",
+  "messageParameters" : {
+    "name" : "`x`",
+    "referenceNames" : "[`x`, `x`]"
+  },
+  "queryContext" : [ {
+    "objectType" : "",
+    "objectName" : "",
+    "startIndex" : 97,
+    "stopIndex" : 97,
+    "fragment" : "x"
+  } ]
+}
+
+
+-- !query
+table other
+|> aggregate b group by a
+-- !query analysis
+org.apache.spark.sql.catalyst.ExtendedAnalysisException
+{
+  "errorClass" : "MISSING_AGGREGATION",
+  "sqlState" : "42803",
+  "messageParameters" : {
+    "expression" : "\"b\"",
+    "expressionAnyValue" : "\"any_value(b)\""
+  }
+}
+
+
 -- !query
 drop table t
 -- !query analysis
diff --git a/sql/core/src/test/resources/sql-tests/inputs/pipe-operators.sql 
b/sql/core/src/test/resources/sql-tests/inputs/pipe-operators.sql
index 5e0c502a77e8..8de22e65f0fb 100644
--- a/sql/core/src/test/resources/sql-tests/inputs/pipe-operators.sql
+++ b/sql/core/src/test/resources/sql-tests/inputs/pipe-operators.sql
@@ -674,6 +674,153 @@ table windowTestData
 table windowTestData
 |> window w as (partition by cate order by val) limit 5;
 
+-- Aggregation operators: positive tests.
+-----------------------------------------
+
+-- Basic aggregation with a GROUP BY clause. The resulting table contains all 
the attributes from
+-- the grouping keys followed by all the attributes from the aggregate 
functions, in order.
+table other
+|> aggregate sum(b) as result group by a;
+
+-- Basic aggregation with a GROUP BY clause, followed by a SELECT of just the 
aggregate function.
+-- This restricts the output attributes to just the aggregate function.
+table other
+|> aggregate sum(b) as result group by a
+|> select result;
+
+-- Basic aggregation with a GROUP BY clause, followed by a SELECT of just the 
grouping expression.
+-- This restricts the output attributes to just the grouping expression. Note 
that we must use an
+-- alias for the grouping expression to refer to it in the SELECT clause.
+table other
+|> aggregate sum(b) group by a + 1 as gkey
+|> select gkey;
+
+-- Basic aggregation on a constant table.
+select 1 as x, 2 as y
+|> aggregate group by x, y;
+
+-- Basic aggregation with group by ordinals.
+select 3 as x, 4 as y
+|> aggregate group by 1, 2;
+
+-- Basic table aggregation.
+table t
+|> aggregate sum(x);
+
+-- Basic table aggregation with an alias.
+table t
+|> aggregate sum(x) + 1 as result_plus_one;
+
+-- Grouping with no aggregate functions.
+table other
+|> aggregate group by a
+|> where a = 1;
+
+-- Group by an expression on columns, all of which are already grouped.
+select 1 as x, 2 as y, 3 as z
+|> aggregate group by x, y, x + y as z;
+
+-- Group by an expression on columns, some of which (y) aren't already grouped.
+select 1 as x, 2 as y, 3 as z
+|> aggregate group by x as z, x + y as z;
+
+-- We get an output column for each item in GROUP BY, even when they are 
duplicate expressions.
+select 1 as x, 2 as y, named_struct('z', 3) as st
+|> aggregate group by x, y, x, x, st.z, (st).z, 1 + x, 2 + x;
+
+-- Chained aggregates.
+select 1 x, 2 y, 3 z
+|> aggregate sum(z) z group by x, y
+|> aggregate avg(z) z group by x
+|> aggregate count(distinct z) c;
+
+-- Ambiguous name from duplicate GROUP BY item. This is generally allowed.
+select 1 x, 3 z
+|> aggregate count(*) group by x, z, x
+|> select x;
+
+-- Grouping expressions are allowed in the aggregate functions list if they 
appear separately in the
+-- GROUP BY clause.
+table other
+|> aggregate a group by a;
+
+-- Aggregate expressions may contain a mix of aggregate functions and grouping 
expressions.
+table other
+|> aggregate a + count(b) group by a;
+
+-- Aggregation operators: negative tests.
+-----------------------------------------
+
+-- GROUP BY ALL is not currently supported.
+select 3 as x, 4 as y
+|> aggregate group by all;
+
+-- GROUP BY ROLLUP is not supported yet.
+table courseSales
+|> aggregate sum(earnings) group by rollup(course, `year`)
+|> where course = 'dotNET' and `year` = '2013';
+
+-- GROUP BY CUBE is not supported yet.
+table courseSales
+|> aggregate sum(earnings) group by cube(course, `year`)
+|> where course = 'dotNET' and `year` = '2013';
+
+-- GROUPING SETS is not supported yet.
+table courseSales
+|> aggregate sum(earnings) group by course, `year` grouping sets(course, 
`year`)
+|> where course = 'dotNET' and `year` = '2013';
+
+-- GROUPING/GROUPING_ID is not supported yet.
+table courseSales
+|> aggregate sum(earnings), grouping(course) + 1
+   group by course
+|> where course = 'dotNET' and `year` = '2013';
+
+-- GROUPING/GROUPING_ID is not supported yet.
+table courseSales
+|> aggregate sum(earnings), grouping_id(course)
+   group by course
+|> where course = 'dotNET' and `year` = '2013';
+
+-- GROUP BY () is not valid syntax.
+select 1 as x, 2 as y
+|> aggregate group by ();
+
+-- Non-aggregate expressions are not allowed in place of aggregate functions.
+table other
+|> aggregate a;
+
+-- Using aggregate functions without the AGGREGATE keyword is not allowed.
+table other
+|> select sum(a) as result;
+
+-- The AGGREGATE keyword requires a GROUP BY clause and/or aggregation 
function(s).
+table other
+|> aggregate;
+
+-- The AGGREGATE GROUP BY list cannot be empty.
+table other
+|> aggregate group by;
+
+-- The AGGREGATE keyword is required to perform grouping.
+table other
+|> group by a;
+
+-- Window functions are not allowed in the AGGREGATE expression list.
+table other
+|> aggregate sum(a) over () group by b;
+
+-- Ambiguous name from AGGREGATE list vs GROUP BY.
+select 1 x, 2 y, 3 z
+|> aggregate count(*) AS c, sum(x) AS x group by x
+|> where c = 1
+|> where x = 1;
+
+-- Aggregate expressions may not contain references to columns or expressions 
not otherwise listed
+-- in the GROUP BY clause.
+table other
+|> aggregate b group by a;
+
 -- Cleanup.
 -----------
 drop table t;
diff --git 
a/sql/core/src/test/resources/sql-tests/results/ansi/keywords.sql.out 
b/sql/core/src/test/resources/sql-tests/results/ansi/keywords.sql.out
index d9d266e8a674..b2331ec4ab80 100644
--- a/sql/core/src/test/resources/sql-tests/results/ansi/keywords.sql.out
+++ b/sql/core/src/test/resources/sql-tests/results/ansi/keywords.sql.out
@@ -6,6 +6,7 @@ struct<keyword:string,reserved:boolean>
 -- !query output
 ADD    false
 AFTER  false
+AGGREGATE      false
 ALL    true
 ALTER  false
 ALWAYS false
diff --git a/sql/core/src/test/resources/sql-tests/results/keywords.sql.out 
b/sql/core/src/test/resources/sql-tests/results/keywords.sql.out
index cd93a811d64f..a88552502862 100644
--- a/sql/core/src/test/resources/sql-tests/results/keywords.sql.out
+++ b/sql/core/src/test/resources/sql-tests/results/keywords.sql.out
@@ -6,6 +6,7 @@ struct<keyword:string,reserved:boolean>
 -- !query output
 ADD    false
 AFTER  false
+AGGREGATE      false
 ALL    false
 ALTER  false
 ALWAYS false
diff --git 
a/sql/core/src/test/resources/sql-tests/results/pipe-operators.sql.out 
b/sql/core/src/test/resources/sql-tests/results/pipe-operators.sql.out
index 64d9d38b3630..8ad2def84082 100644
--- a/sql/core/src/test/resources/sql-tests/results/pipe-operators.sql.out
+++ b/sql/core/src/test/resources/sql-tests/results/pipe-operators.sql.out
@@ -1962,6 +1962,486 @@ org.apache.spark.sql.catalyst.parser.ParseException
 }
 
 
+-- !query
+table other
+|> aggregate sum(b) as result group by a
+-- !query schema
+struct<a:int,result:bigint>
+-- !query output
+1      3
+2      4
+
+
+-- !query
+table other
+|> aggregate sum(b) as result group by a
+|> select result
+-- !query schema
+struct<result:bigint>
+-- !query output
+3
+4
+
+
+-- !query
+table other
+|> aggregate sum(b) group by a + 1 as gkey
+|> select gkey
+-- !query schema
+struct<gkey:int>
+-- !query output
+2
+3
+
+
+-- !query
+select 1 as x, 2 as y
+|> aggregate group by x, y
+-- !query schema
+struct<x:int,y:int>
+-- !query output
+1      2
+
+
+-- !query
+select 3 as x, 4 as y
+|> aggregate group by 1, 2
+-- !query schema
+struct<1:int,2:int>
+-- !query output
+1      2
+
+
+-- !query
+table t
+|> aggregate sum(x)
+-- !query schema
+struct<sum(x):bigint>
+-- !query output
+1
+
+
+-- !query
+table t
+|> aggregate sum(x) + 1 as result_plus_one
+-- !query schema
+struct<result_plus_one:bigint>
+-- !query output
+2
+
+
+-- !query
+table other
+|> aggregate group by a
+|> where a = 1
+-- !query schema
+struct<a:int>
+-- !query output
+1
+
+
+-- !query
+select 1 as x, 2 as y, 3 as z
+|> aggregate group by x, y, x + y as z
+-- !query schema
+struct<x:int,y:int,z:int>
+-- !query output
+1      2       3
+
+
+-- !query
+select 1 as x, 2 as y, 3 as z
+|> aggregate group by x as z, x + y as z
+-- !query schema
+struct<z:int,z:int>
+-- !query output
+1      3
+
+
+-- !query
+select 1 as x, 2 as y, named_struct('z', 3) as st
+|> aggregate group by x, y, x, x, st.z, (st).z, 1 + x, 2 + x
+-- !query schema
+struct<x:int,y:int,x:int,x:int,z:int,z:int,(1 + x):int,(2 + x):int>
+-- !query output
+1      2       1       1       3       3       2       3
+
+
+-- !query
+select 1 x, 2 y, 3 z
+|> aggregate sum(z) z group by x, y
+|> aggregate avg(z) z group by x
+|> aggregate count(distinct z) c
+-- !query schema
+struct<c:bigint>
+-- !query output
+1
+
+
+-- !query
+select 1 x, 3 z
+|> aggregate count(*) group by x, z, x
+|> select x
+-- !query schema
+struct<x:int>
+-- !query output
+1
+
+
+-- !query
+table other
+|> aggregate a group by a
+-- !query schema
+struct<a:int,a:int>
+-- !query output
+1      1
+2      2
+
+
+-- !query
+table other
+|> aggregate a + count(b) group by a
+-- !query schema
+struct<a:int,(a + count(b)):bigint>
+-- !query output
+1      3
+2      3
+
+
+-- !query
+select 3 as x, 4 as y
+|> aggregate group by all
+-- !query schema
+struct<>
+-- !query output
+org.apache.spark.sql.catalyst.parser.ParseException
+{
+  "errorClass" : 
"UNSUPPORTED_FEATURE.PIPE_OPERATOR_AGGREGATE_UNSUPPORTED_CASE",
+  "sqlState" : "0A000",
+  "messageParameters" : {
+    "case" : "GROUP BY ALL"
+  },
+  "queryContext" : [ {
+    "objectType" : "",
+    "objectName" : "",
+    "startIndex" : 1,
+    "stopIndex" : 47,
+    "fragment" : "select 3 as x, 4 as y\n|> aggregate group by all"
+  } ]
+}
+
+
+-- !query
+table courseSales
+|> aggregate sum(earnings) group by rollup(course, `year`)
+|> where course = 'dotNET' and `year` = '2013'
+-- !query schema
+struct<>
+-- !query output
+org.apache.spark.sql.catalyst.parser.ParseException
+{
+  "errorClass" : 
"UNSUPPORTED_FEATURE.PIPE_OPERATOR_AGGREGATE_UNSUPPORTED_CASE",
+  "sqlState" : "0A000",
+  "messageParameters" : {
+    "case" : "GROUP BY ROLLUP"
+  },
+  "queryContext" : [ {
+    "objectType" : "",
+    "objectName" : "",
+    "startIndex" : 1,
+    "stopIndex" : 123,
+    "fragment" : "table courseSales\n|> aggregate sum(earnings) group by 
rollup(course, `year`)\n|> where course = 'dotNET' and `year` = '2013'"
+  } ]
+}
+
+
+-- !query
+table courseSales
+|> aggregate sum(earnings) group by cube(course, `year`)
+|> where course = 'dotNET' and `year` = '2013'
+-- !query schema
+struct<>
+-- !query output
+org.apache.spark.sql.catalyst.parser.ParseException
+{
+  "errorClass" : 
"UNSUPPORTED_FEATURE.PIPE_OPERATOR_AGGREGATE_UNSUPPORTED_CASE",
+  "sqlState" : "0A000",
+  "messageParameters" : {
+    "case" : "GROUP BY CUBE"
+  },
+  "queryContext" : [ {
+    "objectType" : "",
+    "objectName" : "",
+    "startIndex" : 1,
+    "stopIndex" : 121,
+    "fragment" : "table courseSales\n|> aggregate sum(earnings) group by 
cube(course, `year`)\n|> where course = 'dotNET' and `year` = '2013'"
+  } ]
+}
+
+
+-- !query
+table courseSales
+|> aggregate sum(earnings) group by course, `year` grouping sets(course, 
`year`)
+|> where course = 'dotNET' and `year` = '2013'
+-- !query schema
+struct<>
+-- !query output
+org.apache.spark.sql.catalyst.parser.ParseException
+{
+  "errorClass" : 
"UNSUPPORTED_FEATURE.PIPE_OPERATOR_AGGREGATE_UNSUPPORTED_CASE",
+  "sqlState" : "0A000",
+  "messageParameters" : {
+    "case" : "GROUPING SETS"
+  },
+  "queryContext" : [ {
+    "objectType" : "",
+    "objectName" : "",
+    "startIndex" : 1,
+    "stopIndex" : 145,
+    "fragment" : "table courseSales\n|> aggregate sum(earnings) group by 
course, `year` grouping sets(course, `year`)\n|> where course = 'dotNET' and 
`year` = '2013'"
+  } ]
+}
+
+
+-- !query
+table courseSales
+|> aggregate sum(earnings), grouping(course) + 1
+   group by course
+|> where course = 'dotNET' and `year` = '2013'
+-- !query schema
+struct<>
+-- !query output
+org.apache.spark.sql.catalyst.parser.ParseException
+{
+  "errorClass" : 
"UNSUPPORTED_FEATURE.PIPE_OPERATOR_AGGREGATE_UNSUPPORTED_CASE",
+  "sqlState" : "0A000",
+  "messageParameters" : {
+    "case" : "GROUPING"
+  },
+  "queryContext" : [ {
+    "objectType" : "",
+    "objectName" : "",
+    "startIndex" : 1,
+    "stopIndex" : 132,
+    "fragment" : "table courseSales\n|> aggregate sum(earnings), 
grouping(course) + 1\n   group by course\n|> where course = 'dotNET' and `year` 
= '2013'"
+  } ]
+}
+
+
+-- !query
+table courseSales
+|> aggregate sum(earnings), grouping_id(course)
+   group by course
+|> where course = 'dotNET' and `year` = '2013'
+-- !query schema
+struct<>
+-- !query output
+org.apache.spark.sql.catalyst.parser.ParseException
+{
+  "errorClass" : 
"UNSUPPORTED_FEATURE.PIPE_OPERATOR_AGGREGATE_UNSUPPORTED_CASE",
+  "sqlState" : "0A000",
+  "messageParameters" : {
+    "case" : "GROUPING_ID"
+  },
+  "queryContext" : [ {
+    "objectType" : "",
+    "objectName" : "",
+    "startIndex" : 1,
+    "stopIndex" : 131,
+    "fragment" : "table courseSales\n|> aggregate sum(earnings), 
grouping_id(course)\n   group by course\n|> where course = 'dotNET' and `year` 
= '2013'"
+  } ]
+}
+
+
+-- !query
+select 1 as x, 2 as y
+|> aggregate group by ()
+-- !query schema
+struct<>
+-- !query output
+org.apache.spark.sql.catalyst.parser.ParseException
+{
+  "errorClass" : "PARSE_SYNTAX_ERROR",
+  "sqlState" : "42601",
+  "messageParameters" : {
+    "error" : "')'",
+    "hint" : ""
+  }
+}
+
+
+-- !query
+table other
+|> aggregate a
+-- !query schema
+struct<>
+-- !query output
+org.apache.spark.sql.catalyst.ExtendedAnalysisException
+{
+  "errorClass" : "MISSING_GROUP_BY",
+  "sqlState" : "42803",
+  "queryContext" : [ {
+    "objectType" : "",
+    "objectName" : "",
+    "startIndex" : 1,
+    "stopIndex" : 26,
+    "fragment" : "table other\n|> aggregate a"
+  } ]
+}
+
+
+-- !query
+table other
+|> select sum(a) as result
+-- !query schema
+struct<>
+-- !query output
+org.apache.spark.sql.AnalysisException
+{
+  "errorClass" : "PIPE_OPERATOR_SELECT_CONTAINS_AGGREGATE_FUNCTION",
+  "sqlState" : "0A000",
+  "messageParameters" : {
+    "expr" : "sum(a#x)"
+  },
+  "queryContext" : [ {
+    "objectType" : "",
+    "objectName" : "",
+    "startIndex" : 23,
+    "stopIndex" : 28,
+    "fragment" : "sum(a)"
+  } ]
+}
+
+
+-- !query
+table other
+|> aggregate
+-- !query schema
+struct<>
+-- !query output
+org.apache.spark.sql.catalyst.parser.ParseException
+{
+  "errorClass" : "_LEGACY_ERROR_TEMP_0035",
+  "messageParameters" : {
+    "message" : "The AGGREGATE clause requires a list of aggregate expressions 
or a list of grouping expressions, or both"
+  },
+  "queryContext" : [ {
+    "objectType" : "",
+    "objectName" : "",
+    "startIndex" : 1,
+    "stopIndex" : 24,
+    "fragment" : "table other\n|> aggregate"
+  } ]
+}
+
+
+-- !query
+table other
+|> aggregate group by
+-- !query schema
+struct<>
+-- !query output
+org.apache.spark.sql.catalyst.ExtendedAnalysisException
+{
+  "errorClass" : "UNRESOLVED_COLUMN.WITH_SUGGESTION",
+  "sqlState" : "42703",
+  "messageParameters" : {
+    "objectName" : "`group`",
+    "proposal" : "`a`, `b`"
+  },
+  "queryContext" : [ {
+    "objectType" : "",
+    "objectName" : "",
+    "startIndex" : 26,
+    "stopIndex" : 30,
+    "fragment" : "group"
+  } ]
+}
+
+
+-- !query
+table other
+|> group by a
+-- !query schema
+struct<>
+-- !query output
+org.apache.spark.sql.catalyst.parser.ParseException
+{
+  "errorClass" : "PARSE_SYNTAX_ERROR",
+  "sqlState" : "42601",
+  "messageParameters" : {
+    "error" : "'group'",
+    "hint" : ""
+  }
+}
+
+
+-- !query
+table other
+|> aggregate sum(a) over () group by b
+-- !query schema
+struct<>
+-- !query output
+org.apache.spark.sql.catalyst.parser.ParseException
+{
+  "errorClass" : 
"UNSUPPORTED_FEATURE.PIPE_OPERATOR_AGGREGATE_UNSUPPORTED_CASE",
+  "sqlState" : "0A000",
+  "messageParameters" : {
+    "case" : "window functions"
+  },
+  "queryContext" : [ {
+    "objectType" : "",
+    "objectName" : "",
+    "startIndex" : 1,
+    "stopIndex" : 50,
+    "fragment" : "table other\n|> aggregate sum(a) over () group by b"
+  } ]
+}
+
+
+-- !query
+select 1 x, 2 y, 3 z
+|> aggregate count(*) AS c, sum(x) AS x group by x
+|> where c = 1
+|> where x = 1
+-- !query schema
+struct<>
+-- !query output
+org.apache.spark.sql.AnalysisException
+{
+  "errorClass" : "AMBIGUOUS_REFERENCE",
+  "sqlState" : "42704",
+  "messageParameters" : {
+    "name" : "`x`",
+    "referenceNames" : "[`x`, `x`]"
+  },
+  "queryContext" : [ {
+    "objectType" : "",
+    "objectName" : "",
+    "startIndex" : 97,
+    "stopIndex" : 97,
+    "fragment" : "x"
+  } ]
+}
+
+
+-- !query
+table other
+|> aggregate b group by a
+-- !query schema
+struct<>
+-- !query output
+org.apache.spark.sql.catalyst.ExtendedAnalysisException
+{
+  "errorClass" : "MISSING_AGGREGATION",
+  "sqlState" : "42803",
+  "messageParameters" : {
+    "expression" : "\"b\"",
+    "expressionAnyValue" : "\"any_value(b)\""
+  }
+}
+
+
 -- !query
 drop table t
 -- !query schema
diff --git 
a/sql/core/src/test/scala/org/apache/spark/sql/execution/SparkSqlParserSuite.scala
 
b/sql/core/src/test/scala/org/apache/spark/sql/execution/SparkSqlParserSuite.scala
index 644d73bca5b6..44da25678b5e 100644
--- 
a/sql/core/src/test/scala/org/apache/spark/sql/execution/SparkSqlParserSuite.scala
+++ 
b/sql/core/src/test/scala/org/apache/spark/sql/execution/SparkSqlParserSuite.scala
@@ -966,6 +966,12 @@ class SparkSqlParserSuite extends AnalysisTest with 
SharedSparkSession {
       checkRepartition("TABLE t |> DISTRIBUTE BY x |> WHERE x = 1")
       checkRepartition("TABLE t |> CLUSTER BY x |> TABLESAMPLE (100 PERCENT)")
       checkRepartition("TABLE t |> SORT BY x DISTRIBUTE BY x")
+      // Aggregation
+      def checkAggregate(query: String): Unit = check(query, Seq(AGGREGATE))
+      checkAggregate("SELECT a, b FROM t |> AGGREGATE SUM(a)")
+      checkAggregate("SELECT a, b FROM t |> AGGREGATE SUM(a) AS result GROUP 
BY b")
+      checkAggregate("SELECT a, b FROM t |> AGGREGATE GROUP BY b")
+      checkAggregate("SELECT a, b FROM t |> AGGREGATE COUNT(*) AS result GROUP 
BY b")
     }
   }
 }
diff --git 
a/sql/hive-thriftserver/src/test/scala/org/apache/spark/sql/hive/thriftserver/ThriftServerWithSparkContextSuite.scala
 
b/sql/hive-thriftserver/src/test/scala/org/apache/spark/sql/hive/thriftserver/ThriftServerWithSparkContextSuite.scala
index 60c49619552e..71d81b06463f 100644
--- 
a/sql/hive-thriftserver/src/test/scala/org/apache/spark/sql/hive/thriftserver/ThriftServerWithSparkContextSuite.scala
+++ 
b/sql/hive-thriftserver/src/test/scala/org/apache/spark/sql/hive/thriftserver/ThriftServerWithSparkContextSuite.scala
@@ -214,7 +214,7 @@ trait ThriftServerWithSparkContextSuite extends 
SharedThriftServer {
       val sessionHandle = client.openSession(user, "")
       val infoValue = client.getInfo(sessionHandle, 
GetInfoType.CLI_ODBC_KEYWORDS)
       // scalastyle:off line.size.limit
-      assert(infoValue.getStringValue == 
"ADD,AFTER,ALL,ALTER,ALWAYS,ANALYZE,AND,ANTI,ANY,ANY_VALUE,ARCHIVE,ARRAY,AS,ASC,AT,AUTHORIZATION,BEGIN,BETWEEN,BIGINT,BINARY,BINDING,BOOLEAN,BOTH,BUCKET,BUCKETS,BY,BYTE,CACHE,CALL,CALLED,CASCADE,CASE,CAST,CATALOG,CATALOGS,CHANGE,CHAR,CHARACTER,CHECK,CLEAR,CLUSTER,CLUSTERED,CODEGEN,COLLATE,COLLATION,COLLECTION,COLUMN,COLUMNS,COMMENT,COMMIT,COMPACT,COMPACTIONS,COMPENSATION,COMPUTE,CONCATENATE,CONSTRAINT,CONTAINS,COST,CREATE,CROSS,CUBE,CURRENT,CURREN
 [...]
+      assert(infoValue.getStringValue == 
"ADD,AFTER,AGGREGATE,ALL,ALTER,ALWAYS,ANALYZE,AND,ANTI,ANY,ANY_VALUE,ARCHIVE,ARRAY,AS,ASC,AT,AUTHORIZATION,BEGIN,BETWEEN,BIGINT,BINARY,BINDING,BOOLEAN,BOTH,BUCKET,BUCKETS,BY,BYTE,CACHE,CALL,CALLED,CASCADE,CASE,CAST,CATALOG,CATALOGS,CHANGE,CHAR,CHARACTER,CHECK,CLEAR,CLUSTER,CLUSTERED,CODEGEN,COLLATE,COLLATION,COLLECTION,COLUMN,COLUMNS,COMMENT,COMMIT,COMPACT,COMPACTIONS,COMPENSATION,COMPUTE,CONCATENATE,CONSTRAINT,CONTAINS,COST,CREATE,CROSS,CUBE,CURR
 [...]
       // scalastyle:on line.size.limit
     }
   }


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]


Reply via email to