This is an automated email from the ASF dual-hosted git repository.
ueshin pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/spark.git
The following commit(s) were added to refs/heads/master by this push:
new 58e5d86cc07 [SPARK-44503][SQL] Add SQL grammar for PARTITION BY and
ORDER BY clause after TABLE arguments for TVF calls
58e5d86cc07 is described below
commit 58e5d86cc076d4546dac5e1f594977d615ec1e7a
Author: Daniel Tenedorio <[email protected]>
AuthorDate: Mon Jul 24 16:08:46 2023 -0700
[SPARK-44503][SQL] Add SQL grammar for PARTITION BY and ORDER BY clause
after TABLE arguments for TVF calls
### What changes were proposed in this pull request?
This PR adds SQL grammar for PARTITION BY and ORDER BY clause after TABLE
arguments for TVF calls.
Note that this PR just includes the SQL grammar and parsing part, no
analysis support is implemented yet, that will come next.
Examples:
```
select * from tvf(arg1 => table(t1) partition by col1);
select * from tvf(arg1 => table(t1) partition by col1 order by col2 asc);
select * from tvf(arg1 => table(t1) partition by col1, col2 order by col2
asc, col3 desc);
select * from tvf(arg1 => table(select col1, col2, col3 from v2)
partition by col1, col2 order by col2 asc, col3 desc);
```
### Why are the changes needed?
This will provide a way for the TVF caller to indicate desired semantics
for dividing up the rows of the input table into partitions for consumption by
the underlying algorithm.
### Does this PR introduce _any_ user-facing change?
Yes, it adds new SQL grammar.
### How was this patch tested?
This PR adds new parser unit tests. Currently the parser returns "not
implemented yet" error for these cases, and we will implement analysis for them
next.
Closes #42100 from dtenedor/partition-by-clause.
Authored-by: Daniel Tenedorio <[email protected]>
Signed-off-by: Takuya UESHIN <[email protected]>
---
.../spark/sql/catalyst/parser/SqlBaseParser.g4 | 11 +++--
.../spark/sql/catalyst/parser/AstBuilder.scala | 7 +++
.../sql/catalyst/parser/PlanParserSuite.scala | 51 ++++++++++++++++++++++
3 files changed, 66 insertions(+), 3 deletions(-)
diff --git
a/sql/api/src/main/antlr4/org/apache/spark/sql/catalyst/parser/SqlBaseParser.g4
b/sql/api/src/main/antlr4/org/apache/spark/sql/catalyst/parser/SqlBaseParser.g4
index 85dbc499fbd..372e0b54732 100644
---
a/sql/api/src/main/antlr4/org/apache/spark/sql/catalyst/parser/SqlBaseParser.g4
+++
b/sql/api/src/main/antlr4/org/apache/spark/sql/catalyst/parser/SqlBaseParser.g4
@@ -789,9 +789,14 @@ inlineTable
;
functionTableSubqueryArgument
- : TABLE identifierReference
- | TABLE LEFT_PAREN identifierReference RIGHT_PAREN
- | TABLE LEFT_PAREN query RIGHT_PAREN
+ : TABLE identifierReference tableArgumentPartitioning?
+ | TABLE LEFT_PAREN identifierReference RIGHT_PAREN tableArgumentPartitioning?
+ | TABLE LEFT_PAREN query RIGHT_PAREN tableArgumentPartitioning?
+ ;
+
+tableArgumentPartitioning
+ : (PARTITION | DISTRIBUTE) BY expressionSeq
+ ((ORDER | SORT) BY sortItem (COMMA sortItem)*)?
;
functionTableNamedArgumentExpression
diff --git
a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/parser/AstBuilder.scala
b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/parser/AstBuilder.scala
index 7a28efa3e42..ccfcd13440c 100644
---
a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/parser/AstBuilder.scala
+++
b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/parser/AstBuilder.scala
@@ -1564,6 +1564,13 @@ class AstBuilder extends DataTypeAstBuilder with
SQLConfHelper with Logging {
}.getOrElse {
plan(ctx.query)
}
+ val partitioning = Option(ctx.tableArgumentPartitioning)
+ if (partitioning.isDefined) {
+ // The PARTITION BY clause is not implemented yet for TABLE arguments to table valued function
+ // calls.
+ operationNotAllowed(
+ "Specifying the PARTITION BY clause for TABLE arguments is not implemented yet", ctx)
+ }
FunctionTableSubqueryArgumentExpression(p)
}
diff --git
a/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/parser/PlanParserSuite.scala
b/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/parser/PlanParserSuite.scala
index 4a5d0a0ae29..1d50e3bb479 100644
---
a/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/parser/PlanParserSuite.scala
+++
b/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/parser/PlanParserSuite.scala
@@ -1492,6 +1492,57 @@ class PlanParserSuite extends AnalysisTest {
stop = sql1.length - 2))
}
+ test("SPARK-44503: Support PARTITION BY and ORDER BY clause for TVF TABLE arguments") {
+ Seq("partition", "distribute").foreach { partition =>
+ Seq("order", "sort").foreach { order =>
+ val sql1suffix = s"table(v1) $partition by col1"
+ val sql1 = s"select * from my_tvf(arg1 => $sql1suffix)"
+ val startIndex = 29
+ val message =
+ "Specifying the PARTITION BY clause for TABLE arguments is not implemented yet"
+ checkError(
+ exception = parseException(sql1),
+ errorClass = "_LEGACY_ERROR_TEMP_0035",
+ parameters = Map("message" -> message),
+ context = ExpectedContext(
+ fragment = sql1suffix,
+ start = startIndex,
+ stop = sql1.length - 2))
+ val sql2suffix = s"table(v1) $partition by col1 $order by col2 asc"
+ val sql2 = s"select * from my_tvf(arg1 => $sql2suffix)"
+ checkError(
+ exception = parseException(sql2),
+ errorClass = "_LEGACY_ERROR_TEMP_0035",
+ parameters = Map("message" -> message),
+ context = ExpectedContext(
+ fragment = sql2suffix,
+ start = startIndex,
+ stop = sql2.length - 2))
+ val sql3suffix = s"table(v1) $partition by col1, col2 $order by col2 asc, col3 desc"
+ val sql3 = s"select * from my_tvf(arg1 => $sql3suffix)"
+ checkError(
+ exception = parseException(sql3),
+ errorClass = "_LEGACY_ERROR_TEMP_0035",
+ parameters = Map("message" -> message),
+ context = ExpectedContext(
+ fragment = sql3suffix,
+ start = startIndex,
+ stop = sql3.length - 2))
+ val sql4Suffix = s"table(select col1, col2, col3 from v2) $partition by col1, col2 " +
+ s"$order by col2 asc, col3 desc"
+ val sql4 = s"select * from my_tvf(arg1 => $sql4Suffix)"
+ checkError(
+ exception = parseException(sql4),
+ errorClass = "_LEGACY_ERROR_TEMP_0035",
+ parameters = Map("message" -> message),
+ context = ExpectedContext(
+ fragment = sql4Suffix,
+ start = startIndex,
+ stop = sql4.length - 2))
+ }
+ }
+ }
+
test("SPARK-32106: TRANSFORM plan") {
// verify schema less
assertEqual(
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]