This is an automated email from the ASF dual-hosted git repository.

gengliang pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/spark.git


The following commit(s) were added to refs/heads/master by this push:
     new 18dbaa5a070c [SPARK-49560][SQL] Add SQL pipe syntax for the TABLESAMPLE operator
18dbaa5a070c is described below

commit 18dbaa5a070c74007137780e8529321b75b10b48
Author: Daniel Tenedorio <[email protected]>
AuthorDate: Wed Oct 2 13:19:03 2024 -0700

    [SPARK-49560][SQL] Add SQL pipe syntax for the TABLESAMPLE operator
    
    ### What changes were proposed in this pull request?
    
    WIP
    
    This PR adds SQL pipe syntax support for the TABLESAMPLE operator.
    
    For example:
    
    ```
    CREATE TABLE t(x INT, y STRING) USING CSV;
    INSERT INTO t VALUES (0, 'abc'), (1, 'def');
    
    TABLE t
    |> TABLESAMPLE (100 PERCENT) REPEATABLE (0)
    |> TABLESAMPLE (5 ROWS) REPEATABLE (0)
    |> TABLESAMPLE (BUCKET 1 OUT OF 1) REPEATABLE (0);
    
    0       abc
    1       def
    ```
    
    ### Why are the changes needed?
    
    The SQL pipe operator syntax will let users compose queries in a more flexible fashion.
    
    ### Does this PR introduce _any_ user-facing change?
    
    Yes, see above.
    
    ### How was this patch tested?
    
    This PR adds a few unit test cases, but mostly relies on golden file test coverage. I did this to make sure the answers are correct as this feature is implemented and also so we can look at the analyzer output plans to ensure they look right as well.
    
    ### Was this patch authored or co-authored using generative AI tooling?
    
    No
    
    Closes #48168 from dtenedor/pipe-tablesample.
    
    Authored-by: Daniel Tenedorio <[email protected]>
    Signed-off-by: Gengliang Wang <[email protected]>
---
 .../spark/sql/catalyst/parser/SqlBaseParser.g4     |   1 +
 .../spark/sql/catalyst/parser/AstBuilder.scala     |   4 +-
 .../analyzer-results/pipe-operators.sql.out        | 184 +++++++++++++++++++
 .../resources/sql-tests/inputs/pipe-operators.sql  |  49 +++++
 .../sql-tests/results/pipe-operators.sql.out       | 198 +++++++++++++++++++++
 .../spark/sql/execution/SparkSqlParserSuite.scala  |   9 +
 6 files changed, 444 insertions(+), 1 deletion(-)

diff --git a/sql/api/src/main/antlr4/org/apache/spark/sql/catalyst/parser/SqlBaseParser.g4 b/sql/api/src/main/antlr4/org/apache/spark/sql/catalyst/parser/SqlBaseParser.g4
index 33ac3249eb66..e8e2e980135a 100644
--- a/sql/api/src/main/antlr4/org/apache/spark/sql/catalyst/parser/SqlBaseParser.g4
+++ b/sql/api/src/main/antlr4/org/apache/spark/sql/catalyst/parser/SqlBaseParser.g4
@@ -1504,6 +1504,7 @@ operatorPipeRightSide
     // messages in the event that both are present (this is not allowed).
     | pivotClause unpivotClause?
     | unpivotClause pivotClause?
+    | sample
     ;
 
 // When `SQL_standard_keyword_behavior=true`, there are 2 kinds of keywords in Spark SQL.
diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/parser/AstBuilder.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/parser/AstBuilder.scala
index e2350474a870..9ce96ae652fe 100644
--- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/parser/AstBuilder.scala
+++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/parser/AstBuilder.scala
@@ -5903,7 +5903,9 @@ class AstBuilder extends DataTypeAstBuilder
         throw QueryParsingErrors.unpivotWithPivotInFromClauseNotAllowedError(ctx)
       }
       withUnpivot(c, left)
-    }.get)))
+    }.getOrElse(Option(ctx.sample).map { c =>
+      withSample(c, left)
+    }.get))))
   }
 
   /**
diff --git a/sql/core/src/test/resources/sql-tests/analyzer-results/pipe-operators.sql.out b/sql/core/src/test/resources/sql-tests/analyzer-results/pipe-operators.sql.out
index 8cd062aeb01a..aee8da46aafb 100644
--- a/sql/core/src/test/resources/sql-tests/analyzer-results/pipe-operators.sql.out
+++ b/sql/core/src/test/resources/sql-tests/analyzer-results/pipe-operators.sql.out
@@ -921,6 +921,190 @@ org.apache.spark.sql.catalyst.parser.ParseException
 }
 
 
+-- !query
+table t
+|> tablesample (100 percent) repeatable (0)
+-- !query analysis
+Sample 0.0, 1.0, false, 0
++- SubqueryAlias spark_catalog.default.t
+   +- Relation spark_catalog.default.t[x#x,y#x] csv
+
+
+-- !query
+table t
+|> tablesample (2 rows) repeatable (0)
+-- !query analysis
+GlobalLimit 2
++- LocalLimit 2
+   +- SubqueryAlias spark_catalog.default.t
+      +- Relation spark_catalog.default.t[x#x,y#x] csv
+
+
+-- !query
+table t
+|> tablesample (bucket 1 out of 1) repeatable (0)
+-- !query analysis
+Sample 0.0, 1.0, false, 0
++- SubqueryAlias spark_catalog.default.t
+   +- Relation spark_catalog.default.t[x#x,y#x] csv
+
+
+-- !query
+table t
+|> tablesample (100 percent) repeatable (0)
+|> tablesample (5 rows) repeatable (0)
+|> tablesample (bucket 1 out of 1) repeatable (0)
+-- !query analysis
+Sample 0.0, 1.0, false, 0
++- GlobalLimit 5
+   +- LocalLimit 5
+      +- Sample 0.0, 1.0, false, 0
+         +- SubqueryAlias spark_catalog.default.t
+            +- Relation spark_catalog.default.t[x#x,y#x] csv
+
+
+-- !query
+table t
+|> tablesample ()
+-- !query analysis
+org.apache.spark.sql.catalyst.parser.ParseException
+{
+  "errorClass" : "_LEGACY_ERROR_TEMP_0014",
+  "queryContext" : [ {
+    "objectType" : "",
+    "objectName" : "",
+    "startIndex" : 12,
+    "stopIndex" : 25,
+    "fragment" : "tablesample ()"
+  } ]
+}
+
+
+-- !query
+table t
+|> tablesample (-100 percent)
+-- !query analysis
+org.apache.spark.sql.catalyst.parser.ParseException
+{
+  "errorClass" : "_LEGACY_ERROR_TEMP_0064",
+  "messageParameters" : {
+    "msg" : "Sampling fraction (-1.0) must be on interval [0, 1]"
+  },
+  "queryContext" : [ {
+    "objectType" : "",
+    "objectName" : "",
+    "startIndex" : 12,
+    "stopIndex" : 37,
+    "fragment" : "tablesample (-100 percent)"
+  } ]
+}
+
+
+-- !query
+table t
+|> tablesample (-5 rows)
+-- !query analysis
+org.apache.spark.sql.catalyst.ExtendedAnalysisException
+{
+  "errorClass" : "INVALID_LIMIT_LIKE_EXPRESSION.IS_NEGATIVE",
+  "sqlState" : "42K0E",
+  "messageParameters" : {
+    "expr" : "\"-5\"",
+    "name" : "limit",
+    "v" : "-5"
+  },
+  "queryContext" : [ {
+    "objectType" : "",
+    "objectName" : "",
+    "startIndex" : 25,
+    "stopIndex" : 26,
+    "fragment" : "-5"
+  } ]
+}
+
+
+-- !query
+table t
+|> tablesample (x rows)
+-- !query analysis
+org.apache.spark.sql.catalyst.ExtendedAnalysisException
+{
+  "errorClass" : "INVALID_LIMIT_LIKE_EXPRESSION.IS_UNFOLDABLE",
+  "sqlState" : "42K0E",
+  "messageParameters" : {
+    "expr" : "\"x\"",
+    "name" : "limit"
+  },
+  "queryContext" : [ {
+    "objectType" : "",
+    "objectName" : "",
+    "startIndex" : 25,
+    "stopIndex" : 25,
+    "fragment" : "x"
+  } ]
+}
+
+
+-- !query
+table t
+|> tablesample (bucket 2 out of 1)
+-- !query analysis
+org.apache.spark.sql.catalyst.parser.ParseException
+{
+  "errorClass" : "_LEGACY_ERROR_TEMP_0064",
+  "messageParameters" : {
+    "msg" : "Sampling fraction (2.0) must be on interval [0, 1]"
+  },
+  "queryContext" : [ {
+    "objectType" : "",
+    "objectName" : "",
+    "startIndex" : 12,
+    "stopIndex" : 42,
+    "fragment" : "tablesample (bucket 2 out of 1)"
+  } ]
+}
+
+
+-- !query
+table t
+|> tablesample (200b) repeatable (0)
+-- !query analysis
+org.apache.spark.sql.catalyst.parser.ParseException
+{
+  "errorClass" : "_LEGACY_ERROR_TEMP_0015",
+  "messageParameters" : {
+    "msg" : "byteLengthLiteral"
+  },
+  "queryContext" : [ {
+    "objectType" : "",
+    "objectName" : "",
+    "startIndex" : 12,
+    "stopIndex" : 44,
+    "fragment" : "tablesample (200b) repeatable (0)"
+  } ]
+}
+
+
+-- !query
+table t
+|> tablesample (200) repeatable (0)
+-- !query analysis
+org.apache.spark.sql.catalyst.parser.ParseException
+{
+  "errorClass" : "_LEGACY_ERROR_TEMP_0016",
+  "messageParameters" : {
+    "bytesStr" : "200"
+  },
+  "queryContext" : [ {
+    "objectType" : "",
+    "objectName" : "",
+    "startIndex" : 12,
+    "stopIndex" : 43,
+    "fragment" : "tablesample (200) repeatable (0)"
+  } ]
+}
+
+
 -- !query
 drop table t
 -- !query analysis
diff --git a/sql/core/src/test/resources/sql-tests/inputs/pipe-operators.sql b/sql/core/src/test/resources/sql-tests/inputs/pipe-operators.sql
index 3aa01d472e83..31748fe1125a 100644
--- a/sql/core/src/test/resources/sql-tests/inputs/pipe-operators.sql
+++ b/sql/core/src/test/resources/sql-tests/inputs/pipe-operators.sql
@@ -326,6 +326,55 @@ table courseSales
      for `year` in (2012, 2013)
    );
 
+-- Sampling operators: positive tests.
+--------------------------------------
+
+-- We will use the REPEATABLE clause and/or adjust the sampling options to either remove no rows or
+-- all rows to help keep the tests deterministic.
+table t
+|> tablesample (100 percent) repeatable (0);
+
+table t
+|> tablesample (2 rows) repeatable (0);
+
+table t
+|> tablesample (bucket 1 out of 1) repeatable (0);
+
+table t
+|> tablesample (100 percent) repeatable (0)
+|> tablesample (5 rows) repeatable (0)
+|> tablesample (bucket 1 out of 1) repeatable (0);
+
+-- Sampling operators: negative tests.
+--------------------------------------
+
+-- The sampling method is required.
+table t
+|> tablesample ();
+
+-- Negative sampling options are not supported.
+table t
+|> tablesample (-100 percent);
+
+table t
+|> tablesample (-5 rows);
+
+-- The sampling method may not refer to attribute names from the input relation.
+table t
+|> tablesample (x rows);
+
+-- The bucket number is invalid.
+table t
+|> tablesample (bucket 2 out of 1);
+
+-- Byte literals are not supported.
+table t
+|> tablesample (200b) repeatable (0);
+
+-- Invalid byte literal syntax.
+table t
+|> tablesample (200) repeatable (0);
+
 -- Cleanup.
 -----------
 drop table t;
diff --git a/sql/core/src/test/resources/sql-tests/results/pipe-operators.sql.out b/sql/core/src/test/resources/sql-tests/results/pipe-operators.sql.out
index 2c6abe2a277a..78b610b0d97c 100644
--- a/sql/core/src/test/resources/sql-tests/results/pipe-operators.sql.out
+++ b/sql/core/src/test/resources/sql-tests/results/pipe-operators.sql.out
@@ -861,6 +861,204 @@ org.apache.spark.sql.catalyst.parser.ParseException
 }
 
 
+-- !query
+table t
+|> tablesample (100 percent) repeatable (0)
+-- !query schema
+struct<x:int,y:string>
+-- !query output
+0      abc
+1      def
+
+
+-- !query
+table t
+|> tablesample (2 rows) repeatable (0)
+-- !query schema
+struct<x:int,y:string>
+-- !query output
+0      abc
+1      def
+
+
+-- !query
+table t
+|> tablesample (bucket 1 out of 1) repeatable (0)
+-- !query schema
+struct<x:int,y:string>
+-- !query output
+0      abc
+1      def
+
+
+-- !query
+table t
+|> tablesample (100 percent) repeatable (0)
+|> tablesample (5 rows) repeatable (0)
+|> tablesample (bucket 1 out of 1) repeatable (0)
+-- !query schema
+struct<x:int,y:string>
+-- !query output
+0      abc
+1      def
+
+
+-- !query
+table t
+|> tablesample ()
+-- !query schema
+struct<>
+-- !query output
+org.apache.spark.sql.catalyst.parser.ParseException
+{
+  "errorClass" : "_LEGACY_ERROR_TEMP_0014",
+  "queryContext" : [ {
+    "objectType" : "",
+    "objectName" : "",
+    "startIndex" : 12,
+    "stopIndex" : 25,
+    "fragment" : "tablesample ()"
+  } ]
+}
+
+
+-- !query
+table t
+|> tablesample (-100 percent)
+-- !query schema
+struct<>
+-- !query output
+org.apache.spark.sql.catalyst.parser.ParseException
+{
+  "errorClass" : "_LEGACY_ERROR_TEMP_0064",
+  "messageParameters" : {
+    "msg" : "Sampling fraction (-1.0) must be on interval [0, 1]"
+  },
+  "queryContext" : [ {
+    "objectType" : "",
+    "objectName" : "",
+    "startIndex" : 12,
+    "stopIndex" : 37,
+    "fragment" : "tablesample (-100 percent)"
+  } ]
+}
+
+
+-- !query
+table t
+|> tablesample (-5 rows)
+-- !query schema
+struct<>
+-- !query output
+org.apache.spark.sql.catalyst.ExtendedAnalysisException
+{
+  "errorClass" : "INVALID_LIMIT_LIKE_EXPRESSION.IS_NEGATIVE",
+  "sqlState" : "42K0E",
+  "messageParameters" : {
+    "expr" : "\"-5\"",
+    "name" : "limit",
+    "v" : "-5"
+  },
+  "queryContext" : [ {
+    "objectType" : "",
+    "objectName" : "",
+    "startIndex" : 25,
+    "stopIndex" : 26,
+    "fragment" : "-5"
+  } ]
+}
+
+
+-- !query
+table t
+|> tablesample (x rows)
+-- !query schema
+struct<>
+-- !query output
+org.apache.spark.sql.catalyst.ExtendedAnalysisException
+{
+  "errorClass" : "INVALID_LIMIT_LIKE_EXPRESSION.IS_UNFOLDABLE",
+  "sqlState" : "42K0E",
+  "messageParameters" : {
+    "expr" : "\"x\"",
+    "name" : "limit"
+  },
+  "queryContext" : [ {
+    "objectType" : "",
+    "objectName" : "",
+    "startIndex" : 25,
+    "stopIndex" : 25,
+    "fragment" : "x"
+  } ]
+}
+
+
+-- !query
+table t
+|> tablesample (bucket 2 out of 1)
+-- !query schema
+struct<>
+-- !query output
+org.apache.spark.sql.catalyst.parser.ParseException
+{
+  "errorClass" : "_LEGACY_ERROR_TEMP_0064",
+  "messageParameters" : {
+    "msg" : "Sampling fraction (2.0) must be on interval [0, 1]"
+  },
+  "queryContext" : [ {
+    "objectType" : "",
+    "objectName" : "",
+    "startIndex" : 12,
+    "stopIndex" : 42,
+    "fragment" : "tablesample (bucket 2 out of 1)"
+  } ]
+}
+
+
+-- !query
+table t
+|> tablesample (200b) repeatable (0)
+-- !query schema
+struct<>
+-- !query output
+org.apache.spark.sql.catalyst.parser.ParseException
+{
+  "errorClass" : "_LEGACY_ERROR_TEMP_0015",
+  "messageParameters" : {
+    "msg" : "byteLengthLiteral"
+  },
+  "queryContext" : [ {
+    "objectType" : "",
+    "objectName" : "",
+    "startIndex" : 12,
+    "stopIndex" : 44,
+    "fragment" : "tablesample (200b) repeatable (0)"
+  } ]
+}
+
+
+-- !query
+table t
+|> tablesample (200) repeatable (0)
+-- !query schema
+struct<>
+-- !query output
+org.apache.spark.sql.catalyst.parser.ParseException
+{
+  "errorClass" : "_LEGACY_ERROR_TEMP_0016",
+  "messageParameters" : {
+    "bytesStr" : "200"
+  },
+  "queryContext" : [ {
+    "objectType" : "",
+    "objectName" : "",
+    "startIndex" : 12,
+    "stopIndex" : 43,
+    "fragment" : "tablesample (200) repeatable (0)"
+  } ]
+}
+
+
 -- !query
 drop table t
 -- !query schema
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/execution/SparkSqlParserSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/execution/SparkSqlParserSuite.scala
index 1111a65c6a52..c76d44a1b82c 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/execution/SparkSqlParserSuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/execution/SparkSqlParserSuite.scala
@@ -928,6 +928,15 @@ class SparkSqlParserSuite extends AnalysisTest with SharedSparkSession {
           |  earningsYear FOR year IN (`2012`, `2013`, `2014`)
           |)
           |""".stripMargin)
+      // Sampling operations
+      /**
+       * Parses `query` and checks that the pipe TABLESAMPLE operator produced a sampling node
+       * somewhere in the plan and that the plan still reads from a relation.
+       *
+       * @param query SQL text that applies `|> TABLESAMPLE (...)` to a table.
+       */
+      def checkSample(query: String): Unit = {
+        // Local import so this helper does not rely on the suite's top-level imports.
+        import org.apache.spark.sql.catalyst.plans.logical.GlobalLimit
+        val plan: LogicalPlan = parser.parsePlan(query)
+        // `collectFirst` takes a PartialFunction. The previous `_.isInstanceOf[Sample]` lambda was
+        // defined for every node, so it returned Some(true/false) for the root and `.nonEmpty`
+        // could never fail. Match on the node type instead. The ROWS form shows up as
+        // GlobalLimit/LocalLimit rather than Sample in the golden analyzer output, so accept both.
+        assert(plan.collectFirst { case n @ (_: Sample | _: GlobalLimit) => n }.nonEmpty)
+        assert(plan.containsAnyPattern(UNRESOLVED_RELATION, LOCAL_RELATION))
+      }
+      checkSample("TABLE t |> TABLESAMPLE (50 PERCENT)")
+      checkSample("TABLE t |> TABLESAMPLE (5 ROWS)")
+      checkSample("TABLE t |> TABLESAMPLE (BUCKET 4 OUT OF 10)")
     }
   }
 }


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to