This is an automated email from the ASF dual-hosted git repository.

gengliang pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/spark.git


The following commit(s) were added to refs/heads/master by this push:
     new ee8db4ef5d00 [SPARK-50343][SPARK-50344][SQL] Add SQL pipe syntax for 
the DROP and AS operators
ee8db4ef5d00 is described below

commit ee8db4ef5d00656b3281104b996e7775cd4b84d7
Author: Daniel Tenedorio <[email protected]>
AuthorDate: Thu Dec 5 10:36:24 2024 -0800

    [SPARK-50343][SPARK-50344][SQL] Add SQL pipe syntax for the DROP and AS 
operators
    
    ### What changes were proposed in this pull request?
    
    This PR adds SQL pipe syntax support for DROP and AS operators.
    
    The DROP operator removes one or more existing columns from the input table.
    The AS operator assigns a new table alias to the preceding relation.
    
    These are equivalent to `SELECT * EXCEPT(<columnNames>)` and `SELECT ... 
FROM (...) <tableAlias>`, respectively.
    
    For example:
    
    ```
    values (0, 'pqr', 2), (3, 'tuv', 5) as tab(a, b, c)
    |> as v
    |> drop v.b
    |> drop v.c
    
    0
    3
    ```
    
    This PR also fixes a small bug and adds more testing.
    
    ### Why are the changes needed?
    
    The SQL pipe operator syntax will let users compose queries in a more 
flexible fashion.
    
    ### Does this PR introduce _any_ user-facing change?
    
    Yes, see above.
    
    ### How was this patch tested?
    
    This PR adds a few unit test cases, but mostly relies on golden file test 
coverage. I did this to make sure the answers are correct as this feature is 
implemented and also so we can look at the analyzer output plans to ensure they 
look right as well.
    
    ### Was this patch authored or co-authored using generative AI tooling?
    
    No
    
    Closes #49037 from dtenedor/pipe-syntax-as-drop.
    
    Authored-by: Daniel Tenedorio <[email protected]>
    Signed-off-by: Gengliang Wang <[email protected]>
---
 .../spark/sql/catalyst/parser/SqlBaseParser.g4     |    2 +
 .../spark/sql/catalyst/parser/AstBuilder.scala     |   19 +-
 .../analyzer-results/pipe-operators.sql.out        | 1005 +++++++++++++++++++-
 .../resources/sql-tests/inputs/pipe-operators.sql  |  592 ++++++++++++
 .../sql-tests/results/pipe-operators.sql.out       |  839 +++++++++++++++-
 .../org/apache/spark/sql/SQLQueryTestSuite.scala   |   24 +-
 6 files changed, 2452 insertions(+), 29 deletions(-)

diff --git 
a/sql/api/src/main/antlr4/org/apache/spark/sql/catalyst/parser/SqlBaseParser.g4 
b/sql/api/src/main/antlr4/org/apache/spark/sql/catalyst/parser/SqlBaseParser.g4
index 93cf9974e654..26edbe15da9f 100644
--- 
a/sql/api/src/main/antlr4/org/apache/spark/sql/catalyst/parser/SqlBaseParser.g4
+++ 
b/sql/api/src/main/antlr4/org/apache/spark/sql/catalyst/parser/SqlBaseParser.g4
@@ -1510,6 +1510,8 @@ operatorPipeRightSide
     : selectClause windowClause?
     | EXTEND extendList=namedExpressionSeq
     | SET operatorPipeSetAssignmentSeq
+    | DROP identifierSeq
+    | AS errorCapturingIdentifier
     // Note that the WINDOW clause is not allowed in the WHERE pipe operator, 
but we add it here in
     // the grammar simply for purposes of catching this invalid syntax and 
throwing a specific
     // dedicated error message.
diff --git 
a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/parser/AstBuilder.scala
 
b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/parser/AstBuilder.scala
index a3fac7296dcc..882e895cc7f0 100644
--- 
a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/parser/AstBuilder.scala
+++ 
b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/parser/AstBuilder.scala
@@ -5968,10 +5968,8 @@ class AstBuilder extends DataTypeAstBuilder
     // analyzer behave as if we had added the corresponding SQL clause after a 
table subquery
     // containing the input plan.
     def withSubqueryAlias(): LogicalPlan = left match {
-      case s: SubqueryAlias =>
-        s
-      case u: UnresolvedRelation =>
-        u
+      case _: SubqueryAlias | _: UnresolvedRelation | _: Join | _: Filter =>
+        left
       case _ =>
         SubqueryAlias(SubqueryAlias.generateSubqueryName(), left)
     }
@@ -6006,6 +6004,14 @@ class AstBuilder extends DataTypeAstBuilder
       Project(projectList, left)
     }.getOrElse(Option(ctx.SET).map { _ =>
       visitOperatorPipeSet(ctx, left)
+    }.getOrElse(Option(ctx.DROP).map { _ =>
+      val ids: Seq[String] = visitIdentifierSeq(ctx.identifierSeq())
+      val projectList: Seq[NamedExpression] =
+        Seq(UnresolvedStarExceptOrReplace(
+          target = None, excepts = ids.map(s => Seq(s)), replacements = None))
+      Project(projectList, left)
+    }.getOrElse(Option(ctx.AS).map { _ =>
+      SubqueryAlias(ctx.errorCapturingIdentifier().getText, left)
     }.getOrElse(Option(ctx.whereClause).map { c =>
       if (ctx.windowClause() != null) {
         throw 
QueryParsingErrors.windowClauseInPipeOperatorWhereClauseNotAllowedError(ctx)
@@ -6032,7 +6038,7 @@ class AstBuilder extends DataTypeAstBuilder
       withQueryResultClauses(c, withSubqueryAlias(), forPipeOperators = true)
     }.getOrElse(
       visitOperatorPipeAggregate(ctx, left)
-    ))))))))))
+    ))))))))))))
   }
 
   private def visitOperatorPipeSet(
@@ -6119,7 +6125,8 @@ class AstBuilder extends DataTypeAstBuilder
                 Seq("GROUPING", "GROUPING_ID").foreach { name =>
                   if (f.nameParts.head.equalsIgnoreCase(name)) error(name)
                 }
-              case _: WindowSpec => error("window functions")
+              case _: WindowSpec => error("window functions; please update the 
query to move " +
+                "the window functions to a subsequent |> SELECT operator 
instead")
               case _ =>
             }
             e.children.foreach(visit)
diff --git 
a/sql/core/src/test/resources/sql-tests/analyzer-results/pipe-operators.sql.out 
b/sql/core/src/test/resources/sql-tests/analyzer-results/pipe-operators.sql.out
index a4cda92fab2e..1e1ad90946f8 100644
--- 
a/sql/core/src/test/resources/sql-tests/analyzer-results/pipe-operators.sql.out
+++ 
b/sql/core/src/test/resources/sql-tests/analyzer-results/pipe-operators.sql.out
@@ -934,6 +934,267 @@ org.apache.spark.sql.catalyst.parser.ParseException
 }
 
 
+-- !query
+table t
+|> drop y
+-- !query analysis
+Project [x#x]
++- SubqueryAlias spark_catalog.default.t
+   +- Relation spark_catalog.default.t[x#x,y#x] csv
+
+
+-- !query
+select 1 as x, 2 as y, 3 as z
+|> drop z, y
+-- !query analysis
+Project [x#x]
++- Project [1 AS x#x, 2 AS y#x, 3 AS z#x]
+   +- OneRowRelation
+
+
+-- !query
+select 1 as x, 2 as y, 3 as z
+|> drop z
+|> drop y
+-- !query analysis
+Project [x#x]
++- Project [x#x, y#x]
+   +- Project [1 AS x#x, 2 AS y#x, 3 AS z#x]
+      +- OneRowRelation
+
+
+-- !query
+select x from t
+|> drop x
+-- !query analysis
+Project
++- Project [x#x]
+   +- SubqueryAlias spark_catalog.default.t
+      +- Relation spark_catalog.default.t[x#x,y#x] csv
+
+
+-- !query
+table t
+|> extend 1 as `x.y.z`
+|> drop `x.y.z`
+-- !query analysis
+Project [x#x, y#x]
++- Project [x#x, y#x, pipeexpression(1, false, EXTEND) AS x.y.z#x]
+   +- SubqueryAlias spark_catalog.default.t
+      +- Relation spark_catalog.default.t[x#x,y#x] csv
+
+
+-- !query
+table t
+|> drop z
+-- !query analysis
+org.apache.spark.sql.AnalysisException
+{
+  "errorClass" : "UNRESOLVED_COLUMN.WITH_SUGGESTION",
+  "sqlState" : "42703",
+  "messageParameters" : {
+    "objectName" : "`z`",
+    "proposal" : "`x`, `y`"
+  },
+  "queryContext" : [ {
+    "objectType" : "",
+    "objectName" : "",
+    "startIndex" : 1,
+    "stopIndex" : 17,
+    "fragment" : "table t\n|> drop z"
+  } ]
+}
+
+
+-- !query
+table st
+|> drop col.i1
+-- !query analysis
+org.apache.spark.sql.catalyst.parser.ParseException
+{
+  "errorClass" : "PARSE_SYNTAX_ERROR",
+  "sqlState" : "42601",
+  "messageParameters" : {
+    "error" : "'.'",
+    "hint" : ""
+  }
+}
+
+
+-- !query
+table st
+|> drop `col.i1`
+-- !query analysis
+org.apache.spark.sql.AnalysisException
+{
+  "errorClass" : "UNRESOLVED_COLUMN.WITH_SUGGESTION",
+  "sqlState" : "42703",
+  "messageParameters" : {
+    "objectName" : "`col.i1`",
+    "proposal" : "`col`, `x`"
+  },
+  "queryContext" : [ {
+    "objectType" : "",
+    "objectName" : "",
+    "startIndex" : 1,
+    "stopIndex" : 25,
+    "fragment" : "table st\n|> drop `col.i1`"
+  } ]
+}
+
+
+-- !query
+select 1 as x, 2 as y, 3 as z
+|> drop z, y, z
+-- !query analysis
+org.apache.spark.sql.AnalysisException
+{
+  "errorClass" : "EXCEPT_OVERLAPPING_COLUMNS",
+  "sqlState" : "42702",
+  "messageParameters" : {
+    "columns" : "z, y, z"
+  },
+  "queryContext" : [ {
+    "objectType" : "",
+    "objectName" : "",
+    "startIndex" : 1,
+    "stopIndex" : 45,
+    "fragment" : "select 1 as x, 2 as y, 3 as z\n|> drop z, y, z"
+  } ]
+}
+
+
+-- !query
+table t
+|> as u
+|> select u.x, u.y
+-- !query analysis
+Project [x#x, y#x]
++- SubqueryAlias u
+   +- SubqueryAlias spark_catalog.default.t
+      +- Relation spark_catalog.default.t[x#x,y#x] csv
+
+
+-- !query
+select 1 as x, 2 as y
+|> as u
+|> select u.x, u.y
+-- !query analysis
+Project [x#x, y#x]
++- SubqueryAlias u
+   +- Project [1 AS x#x, 2 AS y#x]
+      +- OneRowRelation
+
+
+-- !query
+table t
+|> as `u.v`
+|> select `u.v`.x, `u.v`.y
+-- !query analysis
+Project [x#x, y#x]
++- SubqueryAlias `u.v`
+   +- SubqueryAlias spark_catalog.default.t
+      +- Relation spark_catalog.default.t[x#x,y#x] csv
+
+
+-- !query
+table t
+|> as u
+|> as v
+|> select v.x, v.y
+-- !query analysis
+Project [x#x, y#x]
++- SubqueryAlias v
+   +- SubqueryAlias u
+      +- SubqueryAlias spark_catalog.default.t
+         +- Relation spark_catalog.default.t[x#x,y#x] csv
+
+
+-- !query
+table t
+|> as u
+|> where u.x = 1
+-- !query analysis
+Filter (x#x = 1)
++- SubqueryAlias u
+   +- SubqueryAlias spark_catalog.default.t
+      +- Relation spark_catalog.default.t[x#x,y#x] csv
+
+
+-- !query
+table t
+|> as u, v
+-- !query analysis
+org.apache.spark.sql.catalyst.parser.ParseException
+{
+  "errorClass" : "PARSE_SYNTAX_ERROR",
+  "sqlState" : "42601",
+  "messageParameters" : {
+    "error" : "','",
+    "hint" : ""
+  }
+}
+
+
+-- !query
+table t
+|> as 1 + 2
+-- !query analysis
+org.apache.spark.sql.catalyst.parser.ParseException
+{
+  "errorClass" : "PARSE_SYNTAX_ERROR",
+  "sqlState" : "42601",
+  "messageParameters" : {
+    "error" : "'1'",
+    "hint" : ""
+  }
+}
+
+
+-- !query
+table t
+|> as u-v
+-- !query analysis
+org.apache.spark.sql.catalyst.parser.ParseException
+{
+  "errorClass" : "INVALID_IDENTIFIER",
+  "sqlState" : "42602",
+  "messageParameters" : {
+    "ident" : "u-v"
+  }
+}
+
+
+-- !query
+table t
+|> as u@v
+-- !query analysis
+org.apache.spark.sql.catalyst.parser.ParseException
+{
+  "errorClass" : "PARSE_SYNTAX_ERROR",
+  "sqlState" : "42601",
+  "messageParameters" : {
+    "error" : "'@'",
+    "hint" : ""
+  }
+}
+
+
+-- !query
+table t
+|> as u#######v
+-- !query analysis
+org.apache.spark.sql.catalyst.parser.ParseException
+{
+  "errorClass" : "PARSE_SYNTAX_ERROR",
+  "sqlState" : "42601",
+  "messageParameters" : {
+    "error" : "'#'",
+    "hint" : ""
+  }
+}
+
+
 -- !query
 table t
 |> where true
@@ -958,10 +1219,9 @@ table t
 |> where x + length(y) < 3
 -- !query analysis
 Filter ((x#x + length(y#x)) < 3)
-+- SubqueryAlias __auto_generated_subquery_name
-   +- Filter ((x#x + length(y#x)) < 4)
-      +- SubqueryAlias spark_catalog.default.t
-         +- Relation spark_catalog.default.t[x#x,y#x] csv
++- Filter ((x#x + length(y#x)) < 4)
+   +- SubqueryAlias spark_catalog.default.t
+      +- Relation spark_catalog.default.t[x#x,y#x] csv
 
 
 -- !query
@@ -2126,21 +2386,20 @@ table natural_join_test_t1
 |> where k = "one"
 -- !query analysis
 Filter (k#x = one)
-+- SubqueryAlias __auto_generated_subquery_name
-   +- Project [k#x, v1#x, v2#x]
-      +- Join Inner, (k#x = k#x)
-         :- SubqueryAlias natural_join_test_t1
-         :  +- View (`natural_join_test_t1`, [k#x, v1#x])
-         :     +- Project [cast(k#x as string) AS k#x, cast(v1#x as int) AS 
v1#x]
-         :        +- Project [k#x, v1#x]
-         :           +- SubqueryAlias natural_join_test_t1
-         :              +- LocalRelation [k#x, v1#x]
-         +- SubqueryAlias natural_join_test_t2
-            +- View (`natural_join_test_t2`, [k#x, v2#x])
-               +- Project [cast(k#x as string) AS k#x, cast(v2#x as int) AS 
v2#x]
-                  +- Project [k#x, v2#x]
-                     +- SubqueryAlias natural_join_test_t2
-                        +- LocalRelation [k#x, v2#x]
++- Project [k#x, v1#x, v2#x]
+   +- Join Inner, (k#x = k#x)
+      :- SubqueryAlias natural_join_test_t1
+      :  +- View (`natural_join_test_t1`, [k#x, v1#x])
+      :     +- Project [cast(k#x as string) AS k#x, cast(v1#x as int) AS v1#x]
+      :        +- Project [k#x, v1#x]
+      :           +- SubqueryAlias natural_join_test_t1
+      :              +- LocalRelation [k#x, v1#x]
+      +- SubqueryAlias natural_join_test_t2
+         +- View (`natural_join_test_t2`, [k#x, v2#x])
+            +- Project [cast(k#x as string) AS k#x, cast(v2#x as int) AS v2#x]
+               +- Project [k#x, v2#x]
+                  +- SubqueryAlias natural_join_test_t2
+                     +- LocalRelation [k#x, v2#x]
 
 
 -- !query
@@ -3098,7 +3357,7 @@ org.apache.spark.sql.catalyst.parser.ParseException
   "errorClass" : 
"UNSUPPORTED_FEATURE.PIPE_OPERATOR_AGGREGATE_UNSUPPORTED_CASE",
   "sqlState" : "0A000",
   "messageParameters" : {
-    "case" : "window functions"
+    "case" : "window functions; please update the query to move the window 
functions to a subsequent |> SELECT operator instead"
   },
   "queryContext" : [ {
     "objectType" : "",
@@ -3362,6 +3621,712 @@ org.apache.spark.sql.catalyst.ExtendedAnalysisException
 }
 
 
+-- !query
+with customer_total_return as
+(select
+    sr_customer_sk as ctr_customer_sk,
+    sr_store_sk as ctr_store_sk,
+    sum(sr_return_amt) as ctr_total_return
+  from store_returns, date_dim
+  where sr_returned_date_sk = d_date_sk and d_year = 2000
+  group by sr_customer_sk, sr_store_sk)
+select c_customer_id
+from customer_total_return ctr1, store, customer
+where ctr1.ctr_total_return >
+  (select avg(ctr_total_return) * 1.2
+  from customer_total_return ctr2
+  where ctr1.ctr_store_sk = ctr2.ctr_store_sk)
+  and s_store_sk = ctr1.ctr_store_sk
+  and s_state = 'tn'
+  and ctr1.ctr_customer_sk = c_customer_sk
+order by c_customer_id
+limit 100
+-- !query analysis
+org.apache.spark.sql.catalyst.ExtendedAnalysisException
+{
+  "errorClass" : "TABLE_OR_VIEW_NOT_FOUND",
+  "sqlState" : "42P01",
+  "messageParameters" : {
+    "relationName" : "`store_returns`"
+  },
+  "queryContext" : [ {
+    "objectType" : "",
+    "objectName" : "",
+    "startIndex" : 161,
+    "stopIndex" : 173,
+    "fragment" : "store_returns"
+  } ]
+}
+
+
+-- !query
+with customer_total_return as
+  (table store_returns
+  |> join date_dim
+  |> where sr_returned_date_sk = d_date_sk and d_year = 2000
+  |> aggregate sum(sr_return_amt) as ctr_total_return
+       group by sr_customer_sk as ctr_customer_sk, sr_store_sk as ctr_store_sk)
+table customer_total_return
+|> as ctr1
+|> join store
+|> join customer
+|> where ctr1.ctr_total_return >
+     (table customer_total_return
+      |> as ctr2
+      |> where ctr1.ctr_store_sk = ctr2.ctr_store_sk
+      |> aggregate avg(ctr_total_return) * 1.2)
+     and s_store_sk = ctr1.ctr_store_sk
+     and s_state = 'tn'
+     and ctr1.ctr_customer_sk = c_customer_sk
+|> order by c_customer_id
+|> limit 100
+|> select c_customer_id
+-- !query analysis
+org.apache.spark.sql.catalyst.ExtendedAnalysisException
+{
+  "errorClass" : "TABLE_OR_VIEW_NOT_FOUND",
+  "sqlState" : "42P01",
+  "messageParameters" : {
+    "relationName" : "`store_returns`"
+  },
+  "queryContext" : [ {
+    "objectType" : "",
+    "objectName" : "",
+    "startIndex" : 40,
+    "stopIndex" : 52,
+    "fragment" : "store_returns"
+  } ]
+}
+
+
+-- !query
+with wscs as
+( select
+    sold_date_sk,
+    sales_price
+  from (select
+    ws_sold_date_sk sold_date_sk,
+    ws_ext_sales_price sales_price
+  from web_sales) x
+  union all
+  (select
+    cs_sold_date_sk sold_date_sk,
+    cs_ext_sales_price sales_price
+  from catalog_sales)),
+    wswscs as
+  ( select
+    d_week_seq,
+    sum(case when (d_day_name = 'sunday')
+      then sales_price
+        else null end)
+    sun_sales,
+    sum(case when (d_day_name = 'monday')
+      then sales_price
+        else null end)
+    mon_sales,
+    sum(case when (d_day_name = 'tuesday')
+      then sales_price
+        else null end)
+    tue_sales,
+    sum(case when (d_day_name = 'wednesday')
+      then sales_price
+        else null end)
+    wed_sales,
+    sum(case when (d_day_name = 'thursday')
+      then sales_price
+        else null end)
+    thu_sales,
+    sum(case when (d_day_name = 'friday')
+      then sales_price
+        else null end)
+    fri_sales,
+    sum(case when (d_day_name = 'saturday')
+      then sales_price
+        else null end)
+    sat_sales
+  from wscs, date_dim
+  where d_date_sk = sold_date_sk
+  group by d_week_seq)
+select
+  d_week_seq1,
+  round(sun_sales1 / sun_sales2, 2),
+  round(mon_sales1 / mon_sales2, 2),
+  round(tue_sales1 / tue_sales2, 2),
+  round(wed_sales1 / wed_sales2, 2),
+  round(thu_sales1 / thu_sales2, 2),
+  round(fri_sales1 / fri_sales2, 2),
+  round(sat_sales1 / sat_sales2, 2)
+from
+  (select
+    wswscs.d_week_seq d_week_seq1,
+    sun_sales sun_sales1,
+    mon_sales mon_sales1,
+    tue_sales tue_sales1,
+    wed_sales wed_sales1,
+    thu_sales thu_sales1,
+    fri_sales fri_sales1,
+    sat_sales sat_sales1
+  from wswscs, date_dim
+  where date_dim.d_week_seq = wswscs.d_week_seq and d_year = 2001) y,
+  (select
+    wswscs.d_week_seq d_week_seq2,
+    sun_sales sun_sales2,
+    mon_sales mon_sales2,
+    tue_sales tue_sales2,
+    wed_sales wed_sales2,
+    thu_sales thu_sales2,
+    fri_sales fri_sales2,
+    sat_sales sat_sales2
+  from wswscs, date_dim
+  where date_dim.d_week_seq = wswscs.d_week_seq and d_year = 2001 + 1) z
+where d_week_seq1 = d_week_seq2 - 53
+order by d_week_seq1
+-- !query analysis
+org.apache.spark.sql.catalyst.ExtendedAnalysisException
+{
+  "errorClass" : "TABLE_OR_VIEW_NOT_FOUND",
+  "sqlState" : "42P01",
+  "messageParameters" : {
+    "relationName" : "`web_sales`"
+  },
+  "queryContext" : [ {
+    "objectType" : "",
+    "objectName" : "",
+    "startIndex" : 148,
+    "stopIndex" : 156,
+    "fragment" : "web_sales"
+  } ]
+}
+
+
+-- !query
+with wscs as
+  (table web_sales
+  |> select
+       ws_sold_date_sk sold_date_sk,
+       ws_ext_sales_price sales_price
+  |> as x
+  |> union all (
+       table catalog_sales
+       |> select
+            cs_sold_date_sk sold_date_sk,
+            cs_ext_sales_price sales_price)
+  |> select
+       sold_date_sk,
+       sales_price),
+wswscs as
+  (table wscs
+  |> join date_dim
+  |> where d_date_sk = sold_date_sk
+  |> aggregate
+      sum(case when (d_day_name = 'sunday')
+        then sales_price
+          else null end)
+      sun_sales,
+      sum(case when (d_day_name = 'monday')
+        then sales_price
+          else null end)
+      mon_sales,
+      sum(case when (d_day_name = 'tuesday')
+        then sales_price
+          else null end)
+      tue_sales,
+      sum(case when (d_day_name = 'wednesday')
+        then sales_price
+          else null end)
+      wed_sales,
+      sum(case when (d_day_name = 'thursday')
+        then sales_price
+          else null end)
+      thu_sales,
+      sum(case when (d_day_name = 'friday')
+        then sales_price
+          else null end)
+      fri_sales,
+      sum(case when (d_day_name = 'saturday')
+        then sales_price
+          else null end)
+      sat_sales
+      group by d_week_seq)
+table wswscs
+|> join date_dim
+|> where date_dim.d_week_seq = wswscs.d_week_seq AND d_year = 2001
+|> select
+     wswscs.d_week_seq d_week_seq1,
+     sun_sales sun_sales1,
+     mon_sales mon_sales1,
+     tue_sales tue_sales1,
+     wed_sales wed_sales1,
+     thu_sales thu_sales1,
+     fri_sales fri_sales1,
+     sat_sales sat_sales1
+|> as y
+|> join (
+     table wswscs
+     |> join date_dim
+     |> where date_dim.d_week_seq = wswscs.d_week_seq AND d_year = 2001 + 1
+     |> select
+          wswscs.d_week_seq d_week_seq2,
+          sun_sales sun_sales2,
+          mon_sales mon_sales2,
+          tue_sales tue_sales2,
+          wed_sales wed_sales2,
+          thu_sales thu_sales2,
+          fri_sales fri_sales2,
+          sat_sales sat_sales2
+     |> as z)
+|> where d_week_seq1 = d_week_seq2 - 53
+|> order by d_week_seq1
+|> select
+     d_week_seq1,
+     round(sun_sales1 / sun_sales2, 2),
+     round(mon_sales1 / mon_sales2, 2),
+     round(tue_sales1 / tue_sales2, 2),
+     round(wed_sales1 / wed_sales2, 2),
+     round(thu_sales1 / thu_sales2, 2),
+     round(fri_sales1 / fri_sales2, 2),
+     round(sat_sales1 / sat_sales2, 2)
+-- !query analysis
+org.apache.spark.sql.catalyst.ExtendedAnalysisException
+{
+  "errorClass" : "TABLE_OR_VIEW_NOT_FOUND",
+  "sqlState" : "42P01",
+  "messageParameters" : {
+    "relationName" : "`web_sales`"
+  },
+  "queryContext" : [ {
+    "objectType" : "",
+    "objectName" : "",
+    "startIndex" : 23,
+    "stopIndex" : 31,
+    "fragment" : "web_sales"
+  } ]
+}
+
+
+-- !query
+select
+  dt.d_year,
+  item.i_brand_id brand_id,
+  item.i_brand brand,
+  sum(ss_ext_sales_price) sum_agg
+from date_dim dt, store_sales, item
+where dt.d_date_sk = store_sales.ss_sold_date_sk
+  and store_sales.ss_item_sk = item.i_item_sk
+  and item.i_manufact_id = 128
+  and dt.d_moy = 11
+group by dt.d_year, item.i_brand, item.i_brand_id
+order by dt.d_year, sum_agg desc, brand_id
+limit 100
+-- !query analysis
+org.apache.spark.sql.catalyst.ExtendedAnalysisException
+{
+  "errorClass" : "TABLE_OR_VIEW_NOT_FOUND",
+  "sqlState" : "42P01",
+  "messageParameters" : {
+    "relationName" : "`date_dim`"
+  },
+  "queryContext" : [ {
+    "objectType" : "",
+    "objectName" : "",
+    "startIndex" : 110,
+    "stopIndex" : 117,
+    "fragment" : "date_dim"
+  } ]
+}
+
+
+-- !query
+table date_dim
+|> as dt
+|> join store_sales
+|> join item
+|> where dt.d_date_sk = store_sales.ss_sold_date_sk
+     and store_sales.ss_item_sk = item.i_item_sk
+     and item.i_manufact_id = 128
+     and dt.d_moy = 11
+|> aggregate sum(ss_ext_sales_price) sum_agg
+     group by dt.d_year d_year, item.i_brand_id brand_id, item.i_brand brand
+|> order by d_year, sum_agg desc, brand_id
+|> limit 100
+-- !query analysis
+org.apache.spark.sql.catalyst.ExtendedAnalysisException
+{
+  "errorClass" : "TABLE_OR_VIEW_NOT_FOUND",
+  "sqlState" : "42P01",
+  "messageParameters" : {
+    "relationName" : "`date_dim`"
+  },
+  "queryContext" : [ {
+    "objectType" : "",
+    "objectName" : "",
+    "startIndex" : 7,
+    "stopIndex" : 14,
+    "fragment" : "date_dim"
+  } ]
+}
+
+
+-- !query
+select
+  i_item_desc,
+  i_category,
+  i_class,
+  i_current_price,
+  sum(ws_ext_sales_price) as itemrevenue,
+  sum(ws_ext_sales_price) * 100 / sum(sum(ws_ext_sales_price))
+  over
+  (partition by i_class) as revenueratio
+from
+  web_sales, item, date_dim
+where
+  ws_item_sk = i_item_sk
+    and i_category in ('sports', 'books', 'home')
+    and ws_sold_date_sk = d_date_sk
+    and d_date between cast('1999-02-22' as date)
+  and (cast('1999-02-22' as date) + interval 30 days)
+group by
+  i_item_id, i_item_desc, i_category, i_class, i_current_price
+order by
+  i_category, i_class, i_item_id, i_item_desc, revenueratio
+limit 100
+-- !query analysis
+org.apache.spark.sql.catalyst.ExtendedAnalysisException
+{
+  "errorClass" : "TABLE_OR_VIEW_NOT_FOUND",
+  "sqlState" : "42P01",
+  "messageParameters" : {
+    "relationName" : "`web_sales`"
+  },
+  "queryContext" : [ {
+    "objectType" : "",
+    "objectName" : "",
+    "startIndex" : 227,
+    "stopIndex" : 235,
+    "fragment" : "web_sales"
+  } ]
+}
+
+
+-- !query
+table web_sales
+|> join item
+|> join date_dim
+|> where ws_item_sk = i_item_sk
+     and i_category in ('sports', 'books', 'home')
+     and ws_sold_date_sk = d_date_sk
+     and d_date between cast('1999-02-22' as date)
+     and (cast('1999-02-22' as date) + interval 30 days)
+|> aggregate sum(ws_ext_sales_price) AS itemrevenue
+     group by i_item_id, i_item_desc, i_category, i_class, i_current_price
+|> extend
+     itemrevenue * 100 / sum(itemrevenue)
+       over (partition by i_class) as revenueratio
+|> order by i_category, i_class, i_item_id, i_item_desc, revenueratio
+|> select i_item_desc, i_category, i_class, i_current_price, itemrevenue, 
revenueratio
+|> limit 100
+-- !query analysis
+org.apache.spark.sql.catalyst.ExtendedAnalysisException
+{
+  "errorClass" : "TABLE_OR_VIEW_NOT_FOUND",
+  "sqlState" : "42P01",
+  "messageParameters" : {
+    "relationName" : "`web_sales`"
+  },
+  "queryContext" : [ {
+    "objectType" : "",
+    "objectName" : "",
+    "startIndex" : 7,
+    "stopIndex" : 15,
+    "fragment" : "web_sales"
+  } ]
+}
+
+
+-- !query
+select
+  asceding.rnk,
+  i1.i_product_name best_performing,
+  i2.i_product_name worst_performing
+from (select *
+from (select
+  item_sk,
+  rank()
+  over (
+    order by rank_col asc) rnk
+from (select
+  ss_item_sk item_sk,
+  avg(ss_net_profit) rank_col
+from store_sales ss1
+where ss_store_sk = 4
+group by ss_item_sk
+having avg(ss_net_profit) > 0.9 * (select avg(ss_net_profit) rank_col
+from store_sales
+where ss_store_sk = 4
+  and ss_addr_sk is null
+group by ss_store_sk)) v1) v11
+where rnk < 11) asceding,
+  (select *
+  from (select
+    item_sk,
+    rank()
+    over (
+      order by rank_col desc) rnk
+  from (select
+    ss_item_sk item_sk,
+    avg(ss_net_profit) rank_col
+  from store_sales ss1
+  where ss_store_sk = 4
+  group by ss_item_sk
+  having avg(ss_net_profit) > 0.9 * (select avg(ss_net_profit) rank_col
+  from store_sales
+  where ss_store_sk = 4
+    and ss_addr_sk is null
+  group by ss_store_sk)) v2) v21
+  where rnk < 11) descending,
+  item i1, item i2
+where asceding.rnk = descending.rnk
+  and i1.i_item_sk = asceding.item_sk
+  and i2.i_item_sk = descending.item_sk
+order by asceding.rnk
+limit 100
+-- !query analysis
+org.apache.spark.sql.catalyst.ExtendedAnalysisException
+{
+  "errorClass" : "TABLE_OR_VIEW_NOT_FOUND",
+  "sqlState" : "42P01",
+  "messageParameters" : {
+    "relationName" : "`store_sales`"
+  },
+  "queryContext" : [ {
+    "objectType" : "",
+    "objectName" : "",
+    "startIndex" : 256,
+    "stopIndex" : 266,
+    "fragment" : "store_sales"
+  } ]
+}
+
+
+-- !query
+table store_sales
+|> as ss1
+|> where ss_store_sk = 4
+|> aggregate avg(ss_net_profit) rank_col
+     group by ss_item_sk as item_sk
+|> where rank_col > 0.9 * (
+     table store_sales
+     |> where ss_store_sk = 4
+          and ss_addr_sk is null
+     |> aggregate avg(ss_net_profit) rank_col
+          group by ss_store_sk
+     |> select rank_col)
+|> as v1
+|> select
+     item_sk,
+     rank() over (
+       order by rank_col asc) rnk
+|> as v11
+|> where rnk < 11
+|> as asceding
+|> join (
+     table store_sales
+     |> as ss1
+     |> where ss_store_sk = 4
+     |> aggregate avg(ss_net_profit) rank_col
+          group by ss_item_sk as item_sk
+     |> where rank_col > 0.9 * (
+          table store_sales
+          |> where ss_store_sk = 4
+               and ss_addr_sk is null
+          |> aggregate avg(ss_net_profit) rank_col
+               group by ss_store_sk
+          |> select rank_col)
+     |> as v2
+     |> select
+          item_sk,
+          rank() over (
+            order by rank_col asc) rnk
+     |> as v21
+     |> where rnk < 11) descending
+|> join item i1
+|> join item i2
+|> where asceding.rnk = descending.rnk
+     and i1.i_item_sk = asceding.item_sk
+     and i2.i_item_sk = descending.item_sk
+|> order by asceding.rnk
+|> select
+     asceding.rnk,
+     i1.i_product_name best_performing,
+     i2.i_product_name worst_performing
+-- !query analysis
+org.apache.spark.sql.catalyst.ExtendedAnalysisException
+{
+  "errorClass" : "TABLE_OR_VIEW_NOT_FOUND",
+  "sqlState" : "42P01",
+  "messageParameters" : {
+    "relationName" : "`store_sales`"
+  },
+  "queryContext" : [ {
+    "objectType" : "",
+    "objectName" : "",
+    "startIndex" : 7,
+    "stopIndex" : 17,
+    "fragment" : "store_sales"
+  } ]
+}
+
+
+-- !query
+with web_v1 as (
+  select
+    ws_item_sk item_sk,
+    d_date,
+    sum(sum(ws_sales_price))
+    over (partition by ws_item_sk
+      order by d_date
+      rows between unbounded preceding and current row) cume_sales
+  from web_sales, date_dim
+  where ws_sold_date_sk = d_date_sk
+    and d_month_seq between 1200 and 1200 + 11
+    and ws_item_sk is not null
+  group by ws_item_sk, d_date),
+    store_v1 as (
+    select
+      ss_item_sk item_sk,
+      d_date,
+      sum(sum(ss_sales_price))
+      over (partition by ss_item_sk
+        order by d_date
+        rows between unbounded preceding and current row) cume_sales
+    from store_sales, date_dim
+    where ss_sold_date_sk = d_date_sk
+      and d_month_seq between 1200 and 1200 + 11
+      and ss_item_sk is not null
+    group by ss_item_sk, d_date)
+select *
+from (select
+  item_sk,
+  d_date,
+  web_sales,
+  store_sales,
+  max(web_sales)
+  over (partition by item_sk
+    order by d_date
+    rows between unbounded preceding and current row) web_cumulative,
+  max(store_sales)
+  over (partition by item_sk
+    order by d_date
+    rows between unbounded preceding and current row) store_cumulative
+from (select
+  case when web.item_sk is not null
+    then web.item_sk
+  else store.item_sk end item_sk,
+  case when web.d_date is not null
+    then web.d_date
+  else store.d_date end d_date,
+  web.cume_sales web_sales,
+  store.cume_sales store_sales
+from web_v1 web full outer join store_v1 store on (web.item_sk = store.item_sk
+  and web.d_date = store.d_date)
+     ) x) y
+where web_cumulative > store_cumulative
+order by item_sk, d_date
+limit 100
+-- !query analysis
+org.apache.spark.sql.catalyst.ExtendedAnalysisException
+{
+  "errorClass" : "TABLE_OR_VIEW_NOT_FOUND",
+  "sqlState" : "42P01",
+  "messageParameters" : {
+    "relationName" : "`web_sales`"
+  },
+  "queryContext" : [ {
+    "objectType" : "",
+    "objectName" : "",
+    "startIndex" : 222,
+    "stopIndex" : 230,
+    "fragment" : "web_sales"
+  } ]
+}
+
+
+-- !query
+with web_v1 as (
+  table web_sales
+  |> join date_dim
+  |> where ws_sold_date_sk = d_date_sk
+       and d_month_seq between 1200 and 1200 + 11
+       and ws_item_sk is not null
+  |> aggregate sum(ws_sales_price) as sum_ws_sales_price
+       group by ws_item_sk as item_sk, d_date
+  |> extend sum(sum_ws_sales_price)
+       over (partition by item_sk
+         order by d_date
+         rows between unbounded preceding and current row)
+       as cume_sales),
+store_v1 as (
+  table store_sales
+  |> join date_dim
+  |> where ss_sold_date_sk = d_date_sk
+       and d_month_seq between 1200 and 1200 + 11
+       and ss_item_sk is not null
+  |> aggregate sum(ss_sales_price) as sum_ss_sales_price
+       group by ss_item_sk as item_sk, d_date
+  |> extend sum(sum_ss_sales_price)
+       over (partition by item_sk
+           order by d_date
+           rows between unbounded preceding and current row)
+       as cume_sales)
+table web_v1
+|> as web
+|> full outer join store_v1 store
+     on (web.item_sk = store.item_sk and web.d_date = store.d_date)
+|> select
+     case when web.item_sk is not null
+       then web.item_sk
+       else store.item_sk end item_sk,
+     case when web.d_date is not null
+       then web.d_date
+       else store.d_date end d_date,
+     web.cume_sales web_sales,
+     store.cume_sales store_sales
+|> as x
+|> select
+     item_sk,
+     d_date,
+     web_sales,
+     store_sales,
+     max(web_sales)
+       over (partition by item_sk
+         order by d_date
+         rows between unbounded preceding and current row) web_cumulative,
+     max(store_sales)
+       over (partition by item_sk
+         order by d_date
+         rows between unbounded preceding and current row) store_cumulative
+|> as y
+|> where web_cumulative > store_cumulative
+|> order by item_sk, d_date
+|> limit 100
+-- !query analysis
+org.apache.spark.sql.catalyst.ExtendedAnalysisException
+{
+  "errorClass" : "TABLE_OR_VIEW_NOT_FOUND",
+  "sqlState" : "42P01",
+  "messageParameters" : {
+    "relationName" : "`web_sales`"
+  },
+  "queryContext" : [ {
+    "objectType" : "",
+    "objectName" : "",
+    "startIndex" : 26,
+    "stopIndex" : 34,
+    "fragment" : "web_sales"
+  } ]
+}
+
+
 -- !query
 drop table t
 -- !query analysis
diff --git a/sql/core/src/test/resources/sql-tests/inputs/pipe-operators.sql 
b/sql/core/src/test/resources/sql-tests/inputs/pipe-operators.sql
index 6d0e490649d6..924b42d9d305 100644
--- a/sql/core/src/test/resources/sql-tests/inputs/pipe-operators.sql
+++ b/sql/core/src/test/resources/sql-tests/inputs/pipe-operators.sql
@@ -320,6 +320,99 @@ table t
 select col from st
 |> set col.i1 = 42;
 
+-- DROP operators: positive tests.
+------------------------------------
+
+-- Dropping a column.
+table t
+|> drop y;
+
+-- Dropping two times.
+select 1 as x, 2 as y, 3 as z
+|> drop z, y;
+
+-- Dropping two times in sequence.
+select 1 as x, 2 as y, 3 as z
+|> drop z
+|> drop y;
+
+-- Dropping all columns in the input relation.
+select x from t
+|> drop x;
+
+-- Dropping a backquoted column name with a dot inside.
+table t
+|> extend 1 as `x.y.z`
+|> drop `x.y.z`;
+
+-- DROP operators: negative tests.
+----------------------------------
+
+-- Dropping a column that is not present in the input relation.
+table t
+|> drop z;
+
+-- Attempting to drop a struct field.
+table st
+|> drop col.i1;
+
+table st
+|> drop `col.i1`;
+
+-- Duplicate fields in the drop list.
+select 1 as x, 2 as y, 3 as z
+|> drop z, y, z;
+
+-- AS operators: positive tests.
+--------------------------------
+
+-- Renaming a table.
+table t
+|> as u
+|> select u.x, u.y;
+
+-- Renaming an input relation that is not a table.
+select 1 as x, 2 as y
+|> as u
+|> select u.x, u.y;
+
+-- Renaming as a backquoted name including a period.
+table t
+|> as `u.v`
+|> select `u.v`.x, `u.v`.y;
+
+-- Renaming two times.
+table t
+|> as u
+|> as v
+|> select v.x, v.y;
+
+-- Filtering by referring to the table or table subquery alias.
+table t
+|> as u
+|> where u.x = 1;
+
+-- AS operators: negative tests.
+--------------------------------
+
+-- Multiple aliases are not supported.
+table t
+|> as u, v;
+
+-- Expressions are not supported.
+table t
+|> as 1 + 2;
+
+-- Renaming as an invalid name.
+table t
+|> as u-v;
+
+table t
+|> as u@v;
+
+table t
+|> as u#######v;
+
 -- WHERE operators: positive tests.
 -----------------------------------
 
@@ -1040,6 +1133,505 @@ table windowTestData
 |> select cate, val, sum(val) over w as sum_val
    window w as (order by val);
 
+-- Exercise SQL compilation using a subset of TPC-DS table schemas.
+-------------------------------------------------------------------
+
+-- Q1
+with customer_total_return as
+(select
+    sr_customer_sk as ctr_customer_sk,
+    sr_store_sk as ctr_store_sk,
+    sum(sr_return_amt) as ctr_total_return
+  from store_returns, date_dim
+  where sr_returned_date_sk = d_date_sk and d_year = 2000
+  group by sr_customer_sk, sr_store_sk)
+select c_customer_id
+from customer_total_return ctr1, store, customer
+where ctr1.ctr_total_return >
+  (select avg(ctr_total_return) * 1.2
+  from customer_total_return ctr2
+  where ctr1.ctr_store_sk = ctr2.ctr_store_sk)
+  and s_store_sk = ctr1.ctr_store_sk
+  and s_state = 'tn'
+  and ctr1.ctr_customer_sk = c_customer_sk
+order by c_customer_id
+limit 100;
+
+with customer_total_return as
+  (table store_returns
+  |> join date_dim
+  |> where sr_returned_date_sk = d_date_sk and d_year = 2000
+  |> aggregate sum(sr_return_amt) as ctr_total_return
+       group by sr_customer_sk as ctr_customer_sk, sr_store_sk as ctr_store_sk)
+table customer_total_return
+|> as ctr1
+|> join store
+|> join customer
+|> where ctr1.ctr_total_return >
+     (table customer_total_return
+      |> as ctr2
+      |> where ctr1.ctr_store_sk = ctr2.ctr_store_sk
+      |> aggregate avg(ctr_total_return) * 1.2)
+     and s_store_sk = ctr1.ctr_store_sk
+     and s_state = 'tn'
+     and ctr1.ctr_customer_sk = c_customer_sk
+|> order by c_customer_id
+|> limit 100
+|> select c_customer_id;
+
+-- Q2
+with wscs as
+( select
+    sold_date_sk,
+    sales_price
+  from (select
+    ws_sold_date_sk sold_date_sk,
+    ws_ext_sales_price sales_price
+  from web_sales) x
+  union all
+  (select
+    cs_sold_date_sk sold_date_sk,
+    cs_ext_sales_price sales_price
+  from catalog_sales)),
+    wswscs as
+  ( select
+    d_week_seq,
+    sum(case when (d_day_name = 'sunday')
+      then sales_price
+        else null end)
+    sun_sales,
+    sum(case when (d_day_name = 'monday')
+      then sales_price
+        else null end)
+    mon_sales,
+    sum(case when (d_day_name = 'tuesday')
+      then sales_price
+        else null end)
+    tue_sales,
+    sum(case when (d_day_name = 'wednesday')
+      then sales_price
+        else null end)
+    wed_sales,
+    sum(case when (d_day_name = 'thursday')
+      then sales_price
+        else null end)
+    thu_sales,
+    sum(case when (d_day_name = 'friday')
+      then sales_price
+        else null end)
+    fri_sales,
+    sum(case when (d_day_name = 'saturday')
+      then sales_price
+        else null end)
+    sat_sales
+  from wscs, date_dim
+  where d_date_sk = sold_date_sk
+  group by d_week_seq)
+select
+  d_week_seq1,
+  round(sun_sales1 / sun_sales2, 2),
+  round(mon_sales1 / mon_sales2, 2),
+  round(tue_sales1 / tue_sales2, 2),
+  round(wed_sales1 / wed_sales2, 2),
+  round(thu_sales1 / thu_sales2, 2),
+  round(fri_sales1 / fri_sales2, 2),
+  round(sat_sales1 / sat_sales2, 2)
+from
+  (select
+    wswscs.d_week_seq d_week_seq1,
+    sun_sales sun_sales1,
+    mon_sales mon_sales1,
+    tue_sales tue_sales1,
+    wed_sales wed_sales1,
+    thu_sales thu_sales1,
+    fri_sales fri_sales1,
+    sat_sales sat_sales1
+  from wswscs, date_dim
+  where date_dim.d_week_seq = wswscs.d_week_seq and d_year = 2001) y,
+  (select
+    wswscs.d_week_seq d_week_seq2,
+    sun_sales sun_sales2,
+    mon_sales mon_sales2,
+    tue_sales tue_sales2,
+    wed_sales wed_sales2,
+    thu_sales thu_sales2,
+    fri_sales fri_sales2,
+    sat_sales sat_sales2
+  from wswscs, date_dim
+  where date_dim.d_week_seq = wswscs.d_week_seq and d_year = 2001 + 1) z
+where d_week_seq1 = d_week_seq2 - 53
+order by d_week_seq1;
+
+with wscs as
+  (table web_sales
+  |> select
+       ws_sold_date_sk sold_date_sk,
+       ws_ext_sales_price sales_price
+  |> as x
+  |> union all (
+       table catalog_sales
+       |> select
+            cs_sold_date_sk sold_date_sk,
+            cs_ext_sales_price sales_price)
+  |> select
+       sold_date_sk,
+       sales_price),
+wswscs as
+  (table wscs
+  |> join date_dim
+  |> where d_date_sk = sold_date_sk
+  |> aggregate
+      sum(case when (d_day_name = 'sunday')
+        then sales_price
+          else null end)
+      sun_sales,
+      sum(case when (d_day_name = 'monday')
+        then sales_price
+          else null end)
+      mon_sales,
+      sum(case when (d_day_name = 'tuesday')
+        then sales_price
+          else null end)
+      tue_sales,
+      sum(case when (d_day_name = 'wednesday')
+        then sales_price
+          else null end)
+      wed_sales,
+      sum(case when (d_day_name = 'thursday')
+        then sales_price
+          else null end)
+      thu_sales,
+      sum(case when (d_day_name = 'friday')
+        then sales_price
+          else null end)
+      fri_sales,
+      sum(case when (d_day_name = 'saturday')
+        then sales_price
+          else null end)
+      sat_sales
+      group by d_week_seq)
+table wswscs
+|> join date_dim
+|> where date_dim.d_week_seq = wswscs.d_week_seq AND d_year = 2001
+|> select
+     wswscs.d_week_seq d_week_seq1,
+     sun_sales sun_sales1,
+     mon_sales mon_sales1,
+     tue_sales tue_sales1,
+     wed_sales wed_sales1,
+     thu_sales thu_sales1,
+     fri_sales fri_sales1,
+     sat_sales sat_sales1
+|> as y
+|> join (
+     table wswscs
+     |> join date_dim
+     |> where date_dim.d_week_seq = wswscs.d_week_seq AND d_year = 2001 + 1
+     |> select
+          wswscs.d_week_seq d_week_seq2,
+          sun_sales sun_sales2,
+          mon_sales mon_sales2,
+          tue_sales tue_sales2,
+          wed_sales wed_sales2,
+          thu_sales thu_sales2,
+          fri_sales fri_sales2,
+          sat_sales sat_sales2
+     |> as z)
+|> where d_week_seq1 = d_week_seq2 - 53
+|> order by d_week_seq1
+|> select
+     d_week_seq1,
+     round(sun_sales1 / sun_sales2, 2),
+     round(mon_sales1 / mon_sales2, 2),
+     round(tue_sales1 / tue_sales2, 2),
+     round(wed_sales1 / wed_sales2, 2),
+     round(thu_sales1 / thu_sales2, 2),
+     round(fri_sales1 / fri_sales2, 2),
+     round(sat_sales1 / sat_sales2, 2);
+
+-- Q3
+select
+  dt.d_year,
+  item.i_brand_id brand_id,
+  item.i_brand brand,
+  sum(ss_ext_sales_price) sum_agg
+from date_dim dt, store_sales, item
+where dt.d_date_sk = store_sales.ss_sold_date_sk
+  and store_sales.ss_item_sk = item.i_item_sk
+  and item.i_manufact_id = 128
+  and dt.d_moy = 11
+group by dt.d_year, item.i_brand, item.i_brand_id
+order by dt.d_year, sum_agg desc, brand_id
+limit 100;
+
+table date_dim
+|> as dt
+|> join store_sales
+|> join item
+|> where dt.d_date_sk = store_sales.ss_sold_date_sk
+     and store_sales.ss_item_sk = item.i_item_sk
+     and item.i_manufact_id = 128
+     and dt.d_moy = 11
+|> aggregate sum(ss_ext_sales_price) sum_agg
+     group by dt.d_year d_year, item.i_brand_id brand_id, item.i_brand brand
+|> order by d_year, sum_agg desc, brand_id
+|> limit 100;
+
+-- Q12
+select
+  i_item_desc,
+  i_category,
+  i_class,
+  i_current_price,
+  sum(ws_ext_sales_price) as itemrevenue,
+  sum(ws_ext_sales_price) * 100 / sum(sum(ws_ext_sales_price))
+  over
+  (partition by i_class) as revenueratio
+from
+  web_sales, item, date_dim
+where
+  ws_item_sk = i_item_sk
+    and i_category in ('sports', 'books', 'home')
+    and ws_sold_date_sk = d_date_sk
+    and d_date between cast('1999-02-22' as date)
+  and (cast('1999-02-22' as date) + interval 30 days)
+group by
+  i_item_id, i_item_desc, i_category, i_class, i_current_price
+order by
+  i_category, i_class, i_item_id, i_item_desc, revenueratio
+limit 100;
+
+table web_sales
+|> join item
+|> join date_dim
+|> where ws_item_sk = i_item_sk
+     and i_category in ('sports', 'books', 'home')
+     and ws_sold_date_sk = d_date_sk
+     and d_date between cast('1999-02-22' as date)
+     and (cast('1999-02-22' as date) + interval 30 days)
+|> aggregate sum(ws_ext_sales_price) AS itemrevenue
+     group by i_item_id, i_item_desc, i_category, i_class, i_current_price
+|> extend
+     itemrevenue * 100 / sum(itemrevenue)
+       over (partition by i_class) as revenueratio
+|> order by i_category, i_class, i_item_id, i_item_desc, revenueratio
+|> select i_item_desc, i_category, i_class, i_current_price, itemrevenue, 
revenueratio
+|> limit 100;
+
+-- Q44
+select
+  asceding.rnk,
+  i1.i_product_name best_performing,
+  i2.i_product_name worst_performing
+from (select *
+from (select
+  item_sk,
+  rank()
+  over (
+    order by rank_col asc) rnk
+from (select
+  ss_item_sk item_sk,
+  avg(ss_net_profit) rank_col
+from store_sales ss1
+where ss_store_sk = 4
+group by ss_item_sk
+having avg(ss_net_profit) > 0.9 * (select avg(ss_net_profit) rank_col
+from store_sales
+where ss_store_sk = 4
+  and ss_addr_sk is null
+group by ss_store_sk)) v1) v11
+where rnk < 11) asceding,
+  (select *
+  from (select
+    item_sk,
+    rank()
+    over (
+      order by rank_col desc) rnk
+  from (select
+    ss_item_sk item_sk,
+    avg(ss_net_profit) rank_col
+  from store_sales ss1
+  where ss_store_sk = 4
+  group by ss_item_sk
+  having avg(ss_net_profit) > 0.9 * (select avg(ss_net_profit) rank_col
+  from store_sales
+  where ss_store_sk = 4
+    and ss_addr_sk is null
+  group by ss_store_sk)) v2) v21
+  where rnk < 11) descending,
+  item i1, item i2
+where asceding.rnk = descending.rnk
+  and i1.i_item_sk = asceding.item_sk
+  and i2.i_item_sk = descending.item_sk
+order by asceding.rnk
+limit 100;
+
+table store_sales
+|> as ss1
+|> where ss_store_sk = 4
+|> aggregate avg(ss_net_profit) rank_col
+     group by ss_item_sk as item_sk
+|> where rank_col > 0.9 * (
+     table store_sales
+     |> where ss_store_sk = 4
+          and ss_addr_sk is null
+     |> aggregate avg(ss_net_profit) rank_col
+          group by ss_store_sk
+     |> select rank_col)
+|> as v1
+|> select
+     item_sk,
+     rank() over (
+       order by rank_col asc) rnk
+|> as v11
+|> where rnk < 11
+|> as asceding
+|> join (
+     table store_sales
+     |> as ss1
+     |> where ss_store_sk = 4
+     |> aggregate avg(ss_net_profit) rank_col
+          group by ss_item_sk as item_sk
+     |> where rank_col > 0.9 * (
+          table store_sales
+          |> where ss_store_sk = 4
+               and ss_addr_sk is null
+          |> aggregate avg(ss_net_profit) rank_col
+               group by ss_store_sk
+          |> select rank_col)
+     |> as v2
+     |> select
+          item_sk,
+          rank() over (
+            order by rank_col asc) rnk
+     |> as v21
+     |> where rnk < 11) descending
+|> join item i1
+|> join item i2
+|> where asceding.rnk = descending.rnk
+     and i1.i_item_sk = asceding.item_sk
+     and i2.i_item_sk = descending.item_sk
+|> order by asceding.rnk
+|> select
+     asceding.rnk,
+     i1.i_product_name best_performing,
+     i2.i_product_name worst_performing;
+
+-- Q51
+with web_v1 as (
+  select
+    ws_item_sk item_sk,
+    d_date,
+    sum(sum(ws_sales_price))
+    over (partition by ws_item_sk
+      order by d_date
+      rows between unbounded preceding and current row) cume_sales
+  from web_sales, date_dim
+  where ws_sold_date_sk = d_date_sk
+    and d_month_seq between 1200 and 1200 + 11
+    and ws_item_sk is not null
+  group by ws_item_sk, d_date),
+    store_v1 as (
+    select
+      ss_item_sk item_sk,
+      d_date,
+      sum(sum(ss_sales_price))
+      over (partition by ss_item_sk
+        order by d_date
+        rows between unbounded preceding and current row) cume_sales
+    from store_sales, date_dim
+    where ss_sold_date_sk = d_date_sk
+      and d_month_seq between 1200 and 1200 + 11
+      and ss_item_sk is not null
+    group by ss_item_sk, d_date)
+select *
+from (select
+  item_sk,
+  d_date,
+  web_sales,
+  store_sales,
+  max(web_sales)
+  over (partition by item_sk
+    order by d_date
+    rows between unbounded preceding and current row) web_cumulative,
+  max(store_sales)
+  over (partition by item_sk
+    order by d_date
+    rows between unbounded preceding and current row) store_cumulative
+from (select
+  case when web.item_sk is not null
+    then web.item_sk
+  else store.item_sk end item_sk,
+  case when web.d_date is not null
+    then web.d_date
+  else store.d_date end d_date,
+  web.cume_sales web_sales,
+  store.cume_sales store_sales
+from web_v1 web full outer join store_v1 store on (web.item_sk = store.item_sk
+  and web.d_date = store.d_date)
+     ) x) y
+where web_cumulative > store_cumulative
+order by item_sk, d_date
+limit 100;
+
+with web_v1 as (
+  table web_sales
+  |> join date_dim
+  |> where ws_sold_date_sk = d_date_sk
+       and d_month_seq between 1200 and 1200 + 11
+       and ws_item_sk is not null
+  |> aggregate sum(ws_sales_price) as sum_ws_sales_price
+       group by ws_item_sk as item_sk, d_date
+  |> extend sum(sum_ws_sales_price)
+       over (partition by item_sk
+         order by d_date
+         rows between unbounded preceding and current row)
+       as cume_sales),
+store_v1 as (
+  table store_sales
+  |> join date_dim
+  |> where ss_sold_date_sk = d_date_sk
+       and d_month_seq between 1200 and 1200 + 11
+       and ss_item_sk is not null
+  |> aggregate sum(ss_sales_price) as sum_ss_sales_price
+       group by ss_item_sk as item_sk, d_date
+  |> extend sum(sum_ss_sales_price)
+       over (partition by item_sk
+           order by d_date
+           rows between unbounded preceding and current row)
+       as cume_sales)
+table web_v1
+|> as web
+|> full outer join store_v1 store
+     on (web.item_sk = store.item_sk and web.d_date = store.d_date)
+|> select
+     case when web.item_sk is not null
+       then web.item_sk
+       else store.item_sk end item_sk,
+     case when web.d_date is not null
+       then web.d_date
+       else store.d_date end d_date,
+     web.cume_sales web_sales,
+     store.cume_sales store_sales
+|> as x
+|> select
+     item_sk,
+     d_date,
+     web_sales,
+     store_sales,
+     max(web_sales)
+       over (partition by item_sk
+         order by d_date
+         rows between unbounded preceding and current row) web_cumulative,
+     max(store_sales)
+       over (partition by item_sk
+         order by d_date
+         rows between unbounded preceding and current row) store_cumulative
+|> as y
+|> where web_cumulative > store_cumulative
+|> order by item_sk, d_date
+|> limit 100;
+
 -- Cleanup.
 -----------
 drop table t;
diff --git 
a/sql/core/src/test/resources/sql-tests/results/pipe-operators.sql.out 
b/sql/core/src/test/resources/sql-tests/results/pipe-operators.sql.out
index 85a411f60fe2..570b61f388ea 100644
--- a/sql/core/src/test/resources/sql-tests/results/pipe-operators.sql.out
+++ b/sql/core/src/test/resources/sql-tests/results/pipe-operators.sql.out
@@ -873,6 +873,281 @@ org.apache.spark.sql.catalyst.parser.ParseException
 }
 
 
+-- !query
+table t
+|> drop y
+-- !query schema
+struct<x:int>
+-- !query output
+0
+1
+
+
+-- !query
+select 1 as x, 2 as y, 3 as z
+|> drop z, y
+-- !query schema
+struct<x:int>
+-- !query output
+1
+
+
+-- !query
+select 1 as x, 2 as y, 3 as z
+|> drop z
+|> drop y
+-- !query schema
+struct<x:int>
+-- !query output
+1
+
+
+-- !query
+select x from t
+|> drop x
+-- !query schema
+struct<>
+-- !query output
+
+
+
+-- !query
+table t
+|> extend 1 as `x.y.z`
+|> drop `x.y.z`
+-- !query schema
+struct<x:int,y:string>
+-- !query output
+0      abc
+1      def
+
+
+-- !query
+table t
+|> drop z
+-- !query schema
+struct<>
+-- !query output
+org.apache.spark.sql.AnalysisException
+{
+  "errorClass" : "UNRESOLVED_COLUMN.WITH_SUGGESTION",
+  "sqlState" : "42703",
+  "messageParameters" : {
+    "objectName" : "`z`",
+    "proposal" : "`x`, `y`"
+  },
+  "queryContext" : [ {
+    "objectType" : "",
+    "objectName" : "",
+    "startIndex" : 1,
+    "stopIndex" : 17,
+    "fragment" : "table t\n|> drop z"
+  } ]
+}
+
+
+-- !query
+table st
+|> drop col.i1
+-- !query schema
+struct<>
+-- !query output
+org.apache.spark.sql.catalyst.parser.ParseException
+{
+  "errorClass" : "PARSE_SYNTAX_ERROR",
+  "sqlState" : "42601",
+  "messageParameters" : {
+    "error" : "'.'",
+    "hint" : ""
+  }
+}
+
+
+-- !query
+table st
+|> drop `col.i1`
+-- !query schema
+struct<>
+-- !query output
+org.apache.spark.sql.AnalysisException
+{
+  "errorClass" : "UNRESOLVED_COLUMN.WITH_SUGGESTION",
+  "sqlState" : "42703",
+  "messageParameters" : {
+    "objectName" : "`col.i1`",
+    "proposal" : "`col`, `x`"
+  },
+  "queryContext" : [ {
+    "objectType" : "",
+    "objectName" : "",
+    "startIndex" : 1,
+    "stopIndex" : 25,
+    "fragment" : "table st\n|> drop `col.i1`"
+  } ]
+}
+
+
+-- !query
+select 1 as x, 2 as y, 3 as z
+|> drop z, y, z
+-- !query schema
+struct<>
+-- !query output
+org.apache.spark.sql.AnalysisException
+{
+  "errorClass" : "EXCEPT_OVERLAPPING_COLUMNS",
+  "sqlState" : "42702",
+  "messageParameters" : {
+    "columns" : "z, y, z"
+  },
+  "queryContext" : [ {
+    "objectType" : "",
+    "objectName" : "",
+    "startIndex" : 1,
+    "stopIndex" : 45,
+    "fragment" : "select 1 as x, 2 as y, 3 as z\n|> drop z, y, z"
+  } ]
+}
+
+
+-- !query
+table t
+|> as u
+|> select u.x, u.y
+-- !query schema
+struct<x:int,y:string>
+-- !query output
+0      abc
+1      def
+
+
+-- !query
+select 1 as x, 2 as y
+|> as u
+|> select u.x, u.y
+-- !query schema
+struct<x:int,y:int>
+-- !query output
+1      2
+
+
+-- !query
+table t
+|> as `u.v`
+|> select `u.v`.x, `u.v`.y
+-- !query schema
+struct<x:int,y:string>
+-- !query output
+0      abc
+1      def
+
+
+-- !query
+table t
+|> as u
+|> as v
+|> select v.x, v.y
+-- !query schema
+struct<x:int,y:string>
+-- !query output
+0      abc
+1      def
+
+
+-- !query
+table t
+|> as u
+|> where u.x = 1
+-- !query schema
+struct<x:int,y:string>
+-- !query output
+1      def
+
+
+-- !query
+table t
+|> as u, v
+-- !query schema
+struct<>
+-- !query output
+org.apache.spark.sql.catalyst.parser.ParseException
+{
+  "errorClass" : "PARSE_SYNTAX_ERROR",
+  "sqlState" : "42601",
+  "messageParameters" : {
+    "error" : "','",
+    "hint" : ""
+  }
+}
+
+
+-- !query
+table t
+|> as 1 + 2
+-- !query schema
+struct<>
+-- !query output
+org.apache.spark.sql.catalyst.parser.ParseException
+{
+  "errorClass" : "PARSE_SYNTAX_ERROR",
+  "sqlState" : "42601",
+  "messageParameters" : {
+    "error" : "'1'",
+    "hint" : ""
+  }
+}
+
+
+-- !query
+table t
+|> as u-v
+-- !query schema
+struct<>
+-- !query output
+org.apache.spark.sql.catalyst.parser.ParseException
+{
+  "errorClass" : "INVALID_IDENTIFIER",
+  "sqlState" : "42602",
+  "messageParameters" : {
+    "ident" : "u-v"
+  }
+}
+
+
+-- !query
+table t
+|> as u@v
+-- !query schema
+struct<>
+-- !query output
+org.apache.spark.sql.catalyst.parser.ParseException
+{
+  "errorClass" : "PARSE_SYNTAX_ERROR",
+  "sqlState" : "42601",
+  "messageParameters" : {
+    "error" : "'@'",
+    "hint" : ""
+  }
+}
+
+
+-- !query
+table t
+|> as u#######v
+-- !query schema
+struct<>
+-- !query output
+org.apache.spark.sql.catalyst.parser.ParseException
+{
+  "errorClass" : "PARSE_SYNTAX_ERROR",
+  "sqlState" : "42601",
+  "messageParameters" : {
+    "error" : "'#'",
+    "hint" : ""
+  }
+}
+
+
 -- !query
 table t
 |> where true
@@ -2771,7 +3046,7 @@ org.apache.spark.sql.catalyst.parser.ParseException
   "errorClass" : 
"UNSUPPORTED_FEATURE.PIPE_OPERATOR_AGGREGATE_UNSUPPORTED_CASE",
   "sqlState" : "0A000",
   "messageParameters" : {
-    "case" : "window functions"
+    "case" : "window functions; please update the query to move the window 
functions to a subsequent |> SELECT operator instead"
   },
   "queryContext" : [ {
     "objectType" : "",
@@ -3041,6 +3316,568 @@ org.apache.spark.sql.catalyst.ExtendedAnalysisException
 }
 
 
+-- !query
+with customer_total_return as
+(select
+    sr_customer_sk as ctr_customer_sk,
+    sr_store_sk as ctr_store_sk,
+    sum(sr_return_amt) as ctr_total_return
+  from store_returns, date_dim
+  where sr_returned_date_sk = d_date_sk and d_year = 2000
+  group by sr_customer_sk, sr_store_sk)
+select c_customer_id
+from customer_total_return ctr1, store, customer
+where ctr1.ctr_total_return >
+  (select avg(ctr_total_return) * 1.2
+  from customer_total_return ctr2
+  where ctr1.ctr_store_sk = ctr2.ctr_store_sk)
+  and s_store_sk = ctr1.ctr_store_sk
+  and s_state = 'tn'
+  and ctr1.ctr_customer_sk = c_customer_sk
+order by c_customer_id
+limit 100
+-- !query schema
+struct<c_customer_id:string>
+-- !query output
+
+
+
+-- !query
+with customer_total_return as
+  (table store_returns
+  |> join date_dim
+  |> where sr_returned_date_sk = d_date_sk and d_year = 2000
+  |> aggregate sum(sr_return_amt) as ctr_total_return
+       group by sr_customer_sk as ctr_customer_sk, sr_store_sk as ctr_store_sk)
+table customer_total_return
+|> as ctr1
+|> join store
+|> join customer
+|> where ctr1.ctr_total_return >
+     (table customer_total_return
+      |> as ctr2
+      |> where ctr1.ctr_store_sk = ctr2.ctr_store_sk
+      |> aggregate avg(ctr_total_return) * 1.2)
+     and s_store_sk = ctr1.ctr_store_sk
+     and s_state = 'tn'
+     and ctr1.ctr_customer_sk = c_customer_sk
+|> order by c_customer_id
+|> limit 100
+|> select c_customer_id
+-- !query schema
+struct<c_customer_id:string>
+-- !query output
+
+
+
+-- !query
+with wscs as
+( select
+    sold_date_sk,
+    sales_price
+  from (select
+    ws_sold_date_sk sold_date_sk,
+    ws_ext_sales_price sales_price
+  from web_sales) x
+  union all
+  (select
+    cs_sold_date_sk sold_date_sk,
+    cs_ext_sales_price sales_price
+  from catalog_sales)),
+    wswscs as
+  ( select
+    d_week_seq,
+    sum(case when (d_day_name = 'sunday')
+      then sales_price
+        else null end)
+    sun_sales,
+    sum(case when (d_day_name = 'monday')
+      then sales_price
+        else null end)
+    mon_sales,
+    sum(case when (d_day_name = 'tuesday')
+      then sales_price
+        else null end)
+    tue_sales,
+    sum(case when (d_day_name = 'wednesday')
+      then sales_price
+        else null end)
+    wed_sales,
+    sum(case when (d_day_name = 'thursday')
+      then sales_price
+        else null end)
+    thu_sales,
+    sum(case when (d_day_name = 'friday')
+      then sales_price
+        else null end)
+    fri_sales,
+    sum(case when (d_day_name = 'saturday')
+      then sales_price
+        else null end)
+    sat_sales
+  from wscs, date_dim
+  where d_date_sk = sold_date_sk
+  group by d_week_seq)
+select
+  d_week_seq1,
+  round(sun_sales1 / sun_sales2, 2),
+  round(mon_sales1 / mon_sales2, 2),
+  round(tue_sales1 / tue_sales2, 2),
+  round(wed_sales1 / wed_sales2, 2),
+  round(thu_sales1 / thu_sales2, 2),
+  round(fri_sales1 / fri_sales2, 2),
+  round(sat_sales1 / sat_sales2, 2)
+from
+  (select
+    wswscs.d_week_seq d_week_seq1,
+    sun_sales sun_sales1,
+    mon_sales mon_sales1,
+    tue_sales tue_sales1,
+    wed_sales wed_sales1,
+    thu_sales thu_sales1,
+    fri_sales fri_sales1,
+    sat_sales sat_sales1
+  from wswscs, date_dim
+  where date_dim.d_week_seq = wswscs.d_week_seq and d_year = 2001) y,
+  (select
+    wswscs.d_week_seq d_week_seq2,
+    sun_sales sun_sales2,
+    mon_sales mon_sales2,
+    tue_sales tue_sales2,
+    wed_sales wed_sales2,
+    thu_sales thu_sales2,
+    fri_sales fri_sales2,
+    sat_sales sat_sales2
+  from wswscs, date_dim
+  where date_dim.d_week_seq = wswscs.d_week_seq and d_year = 2001 + 1) z
+where d_week_seq1 = d_week_seq2 - 53
+order by d_week_seq1
+-- !query schema
+struct<d_week_seq1:int,round((sun_sales1 / sun_sales2), 
2):decimal(20,2),round((mon_sales1 / mon_sales2), 
2):decimal(20,2),round((tue_sales1 / tue_sales2), 
2):decimal(20,2),round((wed_sales1 / wed_sales2), 
2):decimal(20,2),round((thu_sales1 / thu_sales2), 
2):decimal(20,2),round((fri_sales1 / fri_sales2), 
2):decimal(20,2),round((sat_sales1 / sat_sales2), 2):decimal(20,2)>
+-- !query output
+
+
+
+-- !query
+with wscs as
+  (table web_sales
+  |> select
+       ws_sold_date_sk sold_date_sk,
+       ws_ext_sales_price sales_price
+  |> as x
+  |> union all (
+       table catalog_sales
+       |> select
+            cs_sold_date_sk sold_date_sk,
+            cs_ext_sales_price sales_price)
+  |> select
+       sold_date_sk,
+       sales_price),
+wswscs as
+  (table wscs
+  |> join date_dim
+  |> where d_date_sk = sold_date_sk
+  |> aggregate
+      sum(case when (d_day_name = 'sunday')
+        then sales_price
+          else null end)
+      sun_sales,
+      sum(case when (d_day_name = 'monday')
+        then sales_price
+          else null end)
+      mon_sales,
+      sum(case when (d_day_name = 'tuesday')
+        then sales_price
+          else null end)
+      tue_sales,
+      sum(case when (d_day_name = 'wednesday')
+        then sales_price
+          else null end)
+      wed_sales,
+      sum(case when (d_day_name = 'thursday')
+        then sales_price
+          else null end)
+      thu_sales,
+      sum(case when (d_day_name = 'friday')
+        then sales_price
+          else null end)
+      fri_sales,
+      sum(case when (d_day_name = 'saturday')
+        then sales_price
+          else null end)
+      sat_sales
+      group by d_week_seq)
+table wswscs
+|> join date_dim
+|> where date_dim.d_week_seq = wswscs.d_week_seq AND d_year = 2001
+|> select
+     wswscs.d_week_seq d_week_seq1,
+     sun_sales sun_sales1,
+     mon_sales mon_sales1,
+     tue_sales tue_sales1,
+     wed_sales wed_sales1,
+     thu_sales thu_sales1,
+     fri_sales fri_sales1,
+     sat_sales sat_sales1
+|> as y
+|> join (
+     table wswscs
+     |> join date_dim
+     |> where date_dim.d_week_seq = wswscs.d_week_seq AND d_year = 2001 + 1
+     |> select
+          wswscs.d_week_seq d_week_seq2,
+          sun_sales sun_sales2,
+          mon_sales mon_sales2,
+          tue_sales tue_sales2,
+          wed_sales wed_sales2,
+          thu_sales thu_sales2,
+          fri_sales fri_sales2,
+          sat_sales sat_sales2
+     |> as z)
+|> where d_week_seq1 = d_week_seq2 - 53
+|> order by d_week_seq1
+|> select
+     d_week_seq1,
+     round(sun_sales1 / sun_sales2, 2),
+     round(mon_sales1 / mon_sales2, 2),
+     round(tue_sales1 / tue_sales2, 2),
+     round(wed_sales1 / wed_sales2, 2),
+     round(thu_sales1 / thu_sales2, 2),
+     round(fri_sales1 / fri_sales2, 2),
+     round(sat_sales1 / sat_sales2, 2)
+-- !query schema
+struct<d_week_seq1:int,round((sun_sales1 / sun_sales2), 
2):decimal(20,2),round((mon_sales1 / mon_sales2), 
2):decimal(20,2),round((tue_sales1 / tue_sales2), 
2):decimal(20,2),round((wed_sales1 / wed_sales2), 
2):decimal(20,2),round((thu_sales1 / thu_sales2), 
2):decimal(20,2),round((fri_sales1 / fri_sales2), 
2):decimal(20,2),round((sat_sales1 / sat_sales2), 2):decimal(20,2)>
+-- !query output
+
+
+
+-- !query
+select
+  dt.d_year,
+  item.i_brand_id brand_id,
+  item.i_brand brand,
+  sum(ss_ext_sales_price) sum_agg
+from date_dim dt, store_sales, item
+where dt.d_date_sk = store_sales.ss_sold_date_sk
+  and store_sales.ss_item_sk = item.i_item_sk
+  and item.i_manufact_id = 128
+  and dt.d_moy = 11
+group by dt.d_year, item.i_brand, item.i_brand_id
+order by dt.d_year, sum_agg desc, brand_id
+limit 100
+-- !query schema
+struct<d_year:int,brand_id:int,brand:string,sum_agg:decimal(17,2)>
+-- !query output
+
+
+
+-- !query
+table date_dim
+|> as dt
+|> join store_sales
+|> join item
+|> where dt.d_date_sk = store_sales.ss_sold_date_sk
+     and store_sales.ss_item_sk = item.i_item_sk
+     and item.i_manufact_id = 128
+     and dt.d_moy = 11
+|> aggregate sum(ss_ext_sales_price) sum_agg
+     group by dt.d_year d_year, item.i_brand_id brand_id, item.i_brand brand
+|> order by d_year, sum_agg desc, brand_id
+|> limit 100
+-- !query schema
+struct<d_year:int,brand_id:int,brand:string,sum_agg:decimal(17,2)>
+-- !query output
+
+
+
+-- !query
+select
+  i_item_desc,
+  i_category,
+  i_class,
+  i_current_price,
+  sum(ws_ext_sales_price) as itemrevenue,
+  sum(ws_ext_sales_price) * 100 / sum(sum(ws_ext_sales_price))
+  over
+  (partition by i_class) as revenueratio
+from
+  web_sales, item, date_dim
+where
+  ws_item_sk = i_item_sk
+    and i_category in ('sports', 'books', 'home')
+    and ws_sold_date_sk = d_date_sk
+    and d_date between cast('1999-02-22' as date)
+  and (cast('1999-02-22' as date) + interval 30 days)
+group by
+  i_item_id, i_item_desc, i_category, i_class, i_current_price
+order by
+  i_category, i_class, i_item_id, i_item_desc, revenueratio
+limit 100
+-- !query schema
+struct<i_item_desc:string,i_category:string,i_class:string,i_current_price:decimal(7,2),itemrevenue:decimal(17,2),revenueratio:decimal(38,17)>
+-- !query output
+
+
+
+-- !query
+table web_sales
+|> join item
+|> join date_dim
+|> where ws_item_sk = i_item_sk
+     and i_category in ('sports', 'books', 'home')
+     and ws_sold_date_sk = d_date_sk
+     and d_date between cast('1999-02-22' as date)
+     and (cast('1999-02-22' as date) + interval 30 days)
+|> aggregate sum(ws_ext_sales_price) AS itemrevenue
+     group by i_item_id, i_item_desc, i_category, i_class, i_current_price
+|> extend
+     itemrevenue * 100 / sum(itemrevenue)
+       over (partition by i_class) as revenueratio
+|> order by i_category, i_class, i_item_id, i_item_desc, revenueratio
+|> select i_item_desc, i_category, i_class, i_current_price, itemrevenue, 
revenueratio
+|> limit 100
+-- !query schema
+struct<i_item_desc:string,i_category:string,i_class:string,i_current_price:decimal(7,2),itemrevenue:decimal(17,2),revenueratio:decimal(38,17)>
+-- !query output
+
+
+
+-- !query
+select
+  asceding.rnk,
+  i1.i_product_name best_performing,
+  i2.i_product_name worst_performing
+from (select *
+from (select
+  item_sk,
+  rank()
+  over (
+    order by rank_col asc) rnk
+from (select
+  ss_item_sk item_sk,
+  avg(ss_net_profit) rank_col
+from store_sales ss1
+where ss_store_sk = 4
+group by ss_item_sk
+having avg(ss_net_profit) > 0.9 * (select avg(ss_net_profit) rank_col
+from store_sales
+where ss_store_sk = 4
+  and ss_addr_sk is null
+group by ss_store_sk)) v1) v11
+where rnk < 11) asceding,
+  (select *
+  from (select
+    item_sk,
+    rank()
+    over (
+      order by rank_col desc) rnk
+  from (select
+    ss_item_sk item_sk,
+    avg(ss_net_profit) rank_col
+  from store_sales ss1
+  where ss_store_sk = 4
+  group by ss_item_sk
+  having avg(ss_net_profit) > 0.9 * (select avg(ss_net_profit) rank_col
+  from store_sales
+  where ss_store_sk = 4
+    and ss_addr_sk is null
+  group by ss_store_sk)) v2) v21
+  where rnk < 11) descending,
+  item i1, item i2
+where asceding.rnk = descending.rnk
+  and i1.i_item_sk = asceding.item_sk
+  and i2.i_item_sk = descending.item_sk
+order by asceding.rnk
+limit 100
+-- !query schema
+struct<rnk:int,best_performing:string,worst_performing:string>
+-- !query output
+
+
+
+-- !query
+table store_sales
+|> as ss1
+|> where ss_store_sk = 4
+|> aggregate avg(ss_net_profit) rank_col
+     group by ss_item_sk as item_sk
+|> where rank_col > 0.9 * (
+     table store_sales
+     |> where ss_store_sk = 4
+          and ss_addr_sk is null
+     |> aggregate avg(ss_net_profit) rank_col
+          group by ss_store_sk
+     |> select rank_col)
+|> as v1
+|> select
+     item_sk,
+     rank() over (
+       order by rank_col asc) rnk
+|> as v11
+|> where rnk < 11
+|> as asceding
+|> join (
+     table store_sales
+     |> as ss1
+     |> where ss_store_sk = 4
+     |> aggregate avg(ss_net_profit) rank_col
+          group by ss_item_sk as item_sk
+     |> where rank_col > 0.9 * (
+          table store_sales
+          |> where ss_store_sk = 4
+               and ss_addr_sk is null
+          |> aggregate avg(ss_net_profit) rank_col
+               group by ss_store_sk
+          |> select rank_col)
+     |> as v2
+     |> select
+          item_sk,
+          rank() over (
+            order by rank_col asc) rnk
+     |> as v21
+     |> where rnk < 11) descending
+|> join item i1
+|> join item i2
+|> where asceding.rnk = descending.rnk
+     and i1.i_item_sk = asceding.item_sk
+     and i2.i_item_sk = descending.item_sk
+|> order by asceding.rnk
+|> select
+     asceding.rnk,
+     i1.i_product_name best_performing,
+     i2.i_product_name worst_performing
+-- !query schema
+struct<rnk:int,best_performing:string,worst_performing:string>
+-- !query output
+
+
+
+-- !query
+with web_v1 as (
+  select
+    ws_item_sk item_sk,
+    d_date,
+    sum(sum(ws_sales_price))
+    over (partition by ws_item_sk
+      order by d_date
+      rows between unbounded preceding and current row) cume_sales
+  from web_sales, date_dim
+  where ws_sold_date_sk = d_date_sk
+    and d_month_seq between 1200 and 1200 + 11
+    and ws_item_sk is not null
+  group by ws_item_sk, d_date),
+    store_v1 as (
+    select
+      ss_item_sk item_sk,
+      d_date,
+      sum(sum(ss_sales_price))
+      over (partition by ss_item_sk
+        order by d_date
+        rows between unbounded preceding and current row) cume_sales
+    from store_sales, date_dim
+    where ss_sold_date_sk = d_date_sk
+      and d_month_seq between 1200 and 1200 + 11
+      and ss_item_sk is not null
+    group by ss_item_sk, d_date)
+select *
+from (select
+  item_sk,
+  d_date,
+  web_sales,
+  store_sales,
+  max(web_sales)
+  over (partition by item_sk
+    order by d_date
+    rows between unbounded preceding and current row) web_cumulative,
+  max(store_sales)
+  over (partition by item_sk
+    order by d_date
+    rows between unbounded preceding and current row) store_cumulative
+from (select
+  case when web.item_sk is not null
+    then web.item_sk
+  else store.item_sk end item_sk,
+  case when web.d_date is not null
+    then web.d_date
+  else store.d_date end d_date,
+  web.cume_sales web_sales,
+  store.cume_sales store_sales
+from web_v1 web full outer join store_v1 store on (web.item_sk = store.item_sk
+  and web.d_date = store.d_date)
+     ) x) y
+where web_cumulative > store_cumulative
+order by item_sk, d_date
+limit 100
+-- !query schema
+struct<item_sk:int,d_date:date,web_sales:decimal(27,2),store_sales:decimal(27,2),web_cumulative:decimal(27,2),store_cumulative:decimal(27,2)>
+-- !query output
+
+
+
+-- !query
+with web_v1 as (
+  table web_sales
+  |> join date_dim
+  |> where ws_sold_date_sk = d_date_sk
+       and d_month_seq between 1200 and 1200 + 11
+       and ws_item_sk is not null
+  |> aggregate sum(ws_sales_price) as sum_ws_sales_price
+       group by ws_item_sk as item_sk, d_date
+  |> extend sum(sum_ws_sales_price)
+       over (partition by item_sk
+         order by d_date
+         rows between unbounded preceding and current row)
+       as cume_sales),
+store_v1 as (
+  table store_sales
+  |> join date_dim
+  |> where ss_sold_date_sk = d_date_sk
+       and d_month_seq between 1200 and 1200 + 11
+       and ss_item_sk is not null
+  |> aggregate sum(ss_sales_price) as sum_ss_sales_price
+       group by ss_item_sk as item_sk, d_date
+  |> extend sum(sum_ss_sales_price)
+       over (partition by item_sk
+           order by d_date
+           rows between unbounded preceding and current row)
+       as cume_sales)
+table web_v1
+|> as web
+|> full outer join store_v1 store
+     on (web.item_sk = store.item_sk and web.d_date = store.d_date)
+|> select
+     case when web.item_sk is not null
+       then web.item_sk
+       else store.item_sk end item_sk,
+     case when web.d_date is not null
+       then web.d_date
+       else store.d_date end d_date,
+     web.cume_sales web_sales,
+     store.cume_sales store_sales
+|> as x
+|> select
+     item_sk,
+     d_date,
+     web_sales,
+     store_sales,
+     max(web_sales)
+       over (partition by item_sk
+         order by d_date
+         rows between unbounded preceding and current row) web_cumulative,
+     max(store_sales)
+       over (partition by item_sk
+         order by d_date
+         rows between unbounded preceding and current row) store_cumulative
+|> as y
+|> where web_cumulative > store_cumulative
+|> order by item_sk, d_date
+|> limit 100
+-- !query schema
+struct<item_sk:int,d_date:date,web_sales:decimal(27,2),store_sales:decimal(27,2),web_cumulative:decimal(27,2),store_cumulative:decimal(27,2)>
+-- !query output
+
+
+
 -- !query
 drop table t
 -- !query schema
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/SQLQueryTestSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/SQLQueryTestSuite.scala
index 5c56377f21c2..575a4ae69d1a 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/SQLQueryTestSuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/SQLQueryTestSuite.scala
@@ -132,7 +132,7 @@ import org.apache.spark.util.Utils
 // scalastyle:on line.size.limit
 @ExtendedSQLTest
 class SQLQueryTestSuite extends QueryTest with SharedSparkSession with SQLHelper
-    with SQLQueryTestHelper {
+    with SQLQueryTestHelper with TPCDSSchema {
 
   import IntegratedUDFTestUtils._
 
@@ -165,13 +165,17 @@ class SQLQueryTestSuite extends QueryTest with SharedSparkSession with SQLHelper
   protected def ignoreList: Set[String] = Set(
     "ignored.sql" // Do NOT remove this one. It is here to test the ignore functionality.
   ) ++ otherIgnoreList
+  /** List of test cases that require TPCDS table schemas to be loaded. */
+  private def requireTPCDSCases: Seq[String] = Seq("pipe-operators.sql")
+  /** List of TPCDS table names and schemas to load from the [[TPCDSSchema]] base class. */
+  private val tpcDSTableNamesToSchemas: Map[String, String] = tableColumns
 
   // Create all the test cases.
   listTestCases.foreach(createScalaTestCase)
 
   protected def createScalaTestCase(testCase: TestCase): Unit = {
     if (ignoreList.exists(t =>
-        testCase.name.toLowerCase(Locale.ROOT).contains(t.toLowerCase(Locale.ROOT)))) {
+      testCase.name.toLowerCase(Locale.ROOT).contains(t.toLowerCase(Locale.ROOT)))) {
       // Create a test case to ignore this case.
       ignore(testCase.name) { /* Do nothing */ }
     } else testCase match {
@@ -322,6 +326,15 @@ class SQLQueryTestSuite extends QueryTest with SharedSparkSession with SQLHelper
       setOperations.foreach(localSparkSession.sql)
     }
 
+    // Load TPCDS table schemas for the test case if required.
+    val lowercaseTestCase = testCase.name.toLowerCase(Locale.ROOT)
+    if (requireTPCDSCases.contains(lowercaseTestCase)) {
+      tpcDSTableNamesToSchemas.foreach { case (name: String, schema: String) =>
+        localSparkSession.sql(s"DROP TABLE IF EXISTS $name")
+        localSparkSession.sql(s"CREATE TABLE `$name` ($schema) USING parquet")
+      }
+    }
+
     // Run the SQL queries preparing them for comparison.
     val outputs: Seq[QueryTestOutput] = queries.map { sql =>
       testCase match {
@@ -348,6 +361,13 @@ class SQLQueryTestSuite extends QueryTest with SharedSparkSession with SQLHelper
       }
     }
 
+    // Drop TPCDS tables after the test case if required.
+    if (requireTPCDSCases.contains(lowercaseTestCase)) {
+      tpcDSTableNamesToSchemas.foreach { case (name: String, schema: String) =>
+        localSparkSession.sql(s"DROP TABLE IF EXISTS $name")
+      }
+    }
+
     if (regenerateGoldenFiles) {
      // Again, we are explicitly not using multi-line string due to stripMargin removing "|".
       val goldenOutput = {


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to