This is an automated email from the ASF dual-hosted git repository.
dtenedor pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/spark.git
The following commit(s) were added to refs/heads/master by this push:
new 195c7dc9d2a2 [SPARK-55054][SS][SQL] Add IDENTIFIED BY support for
streaming table-valued functions
195c7dc9d2a2 is described below
commit 195c7dc9d2a2407268f86b7bb79190b33c2f4e71
Author: ericm-db <[email protected]>
AuthorDate: Fri Jan 16 11:53:56 2026 -0800
[SPARK-55054][SS][SQL] Add IDENTIFIED BY support for streaming table-valued
functions
### What changes were proposed in this pull request?
This PR extends the IDENTIFIED BY syntax to support streaming table-valued
functions (TVFs), complementing the existing support for streaming tables.
The changes include:
- Added grammar rules for streaming TVFs with IDENTIFIED BY clause
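- Split `functionTable` into `tableFunctionCall` (the bare function call with its
arguments) and `tableFunctionCallWithTrailingClauses` (the call followed by optional
IDENTIFIED BY and WATERMARK clauses and a table alias), so the same call rule is
shared by the streaming and non-streaming syntaxes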
- Added validation that IDENTIFIED BY is only allowed for streaming sources
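- Streaming TVFs support two syntaxes:
1. `STREAM tvf() IDENTIFIED BY name` - the clauses follow the function call directly
2. `STREAM(tvf()) IDENTIFIED BY name` - the call is wrapped in parentheses and the
clauses come after the closing parenthesis (consistent with the `STREAM(table)` syntax)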
- Added comprehensive test coverage for both syntaxes
### Why are the changes needed?
Streaming TVFs (such as `range()`, and `read_files()` once it is available) need to
be nameable in the same way as streaming tables. This ensures:
- Consistent source naming across all streaming source types
- Better observability for streaming queries using TVFs
- Proper checkpoint management for TVF-based streams
### Does this PR introduce _any_ user-facing change?
Yes. Users can now use IDENTIFIED BY with streaming TVFs:
```sql
-- Non-parenthesized form
SELECT * FROM STREAM range(100) IDENTIFIED BY my_range_source
-- Parenthesized form (clauses outside for consistency)
SELECT * FROM STREAM(range(100)) IDENTIFIED BY my_range_source
-- With watermark and alias
SELECT * FROM STREAM range(100)
IDENTIFIED BY my_source
WATERMARK ts DELAY OF INTERVAL 1 MINUTE
AS src
```
Using IDENTIFIED BY on non-streaming TVFs produces a clear error:
```
IDENTIFIED BY clause is only supported for streaming sources
```
### How was this patch tested?
- Added comprehensive tests in StreamRelationParserSuite covering:
- Non-parenthesized form with all clause combinations
- Parenthesized form with all clause combinations
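- Validation that non-streaming TVFs reject IDENTIFIED BY
- Parse errors for a missing, duplicated, or misplaced IDENTIFIED BY clause
(including on non-streaming tables and subqueries)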
- Tests verify correct placement of clauses in both syntaxes
- All existing tests continue to pass
### Was this patch authored or co-authored using generative AI tooling?
No.
Closes #53819 from ericm-db/streaming-tvf-identified-by.
Authored-by: ericm-db <[email protected]>
Signed-off-by: Daniel Tenedorio <[email protected]>
---
.../spark/sql/catalyst/parser/SqlBaseParser.g4 | 21 +-
.../spark/sql/catalyst/parser/AstBuilder.scala | 89 ++++--
.../parser/StreamRelationParserSuite.scala | 311 ++++++++++++++++++++-
3 files changed, 398 insertions(+), 23 deletions(-)
diff --git
a/sql/api/src/main/antlr4/org/apache/spark/sql/catalyst/parser/SqlBaseParser.g4
b/sql/api/src/main/antlr4/org/apache/spark/sql/catalyst/parser/SqlBaseParser.g4
index a7b3efda2b81..b53af82a3d2f 100644
---
a/sql/api/src/main/antlr4/org/apache/spark/sql/catalyst/parser/SqlBaseParser.g4
+++
b/sql/api/src/main/antlr4/org/apache/spark/sql/catalyst/parser/SqlBaseParser.g4
@@ -429,6 +429,9 @@ streamRelationPrimary
| STREAM LEFT_PAREN multipartIdentifier RIGHT_PAREN
optionsClause? identifiedByClause?
watermarkClause? tableAlias
#streamTableName
+ | STREAM tableFunctionCallWithTrailingClauses
#streamTableValuedFunction
+ | STREAM LEFT_PAREN tableFunctionCall RIGHT_PAREN
+ identifiedByClause? watermarkClause? tableAlias
#streamTableValuedFunction
;
setResetStatement
@@ -1068,7 +1071,7 @@ relationPrimary
| LEFT_PAREN relation RIGHT_PAREN sample?
watermarkClause? tableAlias #aliasedRelation
| inlineTable
#inlineTableDefault2
- | functionTable
#tableValuedFunction
+ | tableFunctionCallWithTrailingClauses
#tableValuedFunction
;
optionsClause
@@ -1118,13 +1121,19 @@ functionTableArgument
| functionArgument
;
-// This is only used in relationPrimary where having watermarkClause makes
sense. If this becomes
-// referred by other clause, please check wheter watermarkClause makes sense
to the clause.
-// If not, consider separate this rule.
-functionTable
+// A table function call including opening and closing parentheses.
+tableFunctionCall
: funcName=functionName LEFT_PAREN
(functionTableArgument (COMMA functionTableArgument)*)?
- RIGHT_PAREN watermarkClause? tableAlias
+ RIGHT_PAREN
+ ;
+
+// A table function call with optional trailing clauses for streaming and
aliasing.
+// The identifiedByClause is optional and only valid for streaming TVFs. For
non-streaming TVFs,
+// the AST builder will reject it with an error. The clause must come before
watermarkClause
+// and tableAlias to avoid ambiguity (since IDENTIFIED is a nonReserved
keyword).
+tableFunctionCallWithTrailingClauses
+ : tableFunctionCall identifiedByClause? watermarkClause? tableAlias
;
tableAlias
diff --git
a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/parser/AstBuilder.scala
b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/parser/AstBuilder.scala
index 71d919420ca8..46d7171cce5a 100644
---
a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/parser/AstBuilder.scala
+++
b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/parser/AstBuilder.scala
@@ -2431,26 +2431,30 @@ class AstBuilder extends DataTypeAstBuilder
}
/**
- * Create a table-valued function call with arguments, e.g. range(1000)
- */
- override def visitTableValuedFunction(ctx: TableValuedFunctionContext)
- : LogicalPlan = withOrigin(ctx) {
- val func = ctx.functionTable
- val aliases = if (func.tableAlias.identifierList != null) {
- visitIdentifierList(func.tableAlias.identifierList)
+ * Build an UnresolvedTableValuedFunction from a tableFunctionCall context.
+ * Used by both visitTableValuedFunction and stream TVF visitors.
+ */
+ private def buildTvfFromTableFunctionCall(
+ funcCallCtx: TableFunctionCallContext,
+ tableAliasCtx: TableAliasContext,
+ watermarkClauseCtx: WatermarkClauseContext): LogicalPlan = {
+ val aliases = if (tableAliasCtx != null && tableAliasCtx.identifierList !=
null) {
+ visitIdentifierList(tableAliasCtx.identifierList)
} else {
Seq.empty
}
-
withFuncIdentClause(
- func.functionName,
+ funcCallCtx.funcName,
Nil,
(ident, _) => {
if (ident.length > 1) {
- throw QueryParsingErrors.invalidTableValuedFunctionNameError(ident,
ctx)
+ throw new ParseException(
+ errorClass = "INVALID_SQL_SYNTAX.INVALID_TABLE_VALUED_FUNC_NAME",
+ messageParameters = Map("funcName" -> toSQLId(ident)),
+ ctx = funcCallCtx)
}
- val funcName = func.functionName.getText
- val args = func.functionTableArgument.asScala.map { e =>
+ val funcName = funcCallCtx.funcName.getText
+ val args = funcCallCtx.functionTableArgument.asScala.map { e =>
Option(e.functionArgument).map(extractNamedArgument(_, funcName))
.getOrElse {
extractFunctionTableNamedArgument(e.functionTableReferenceArgument, funcName)
@@ -2460,11 +2464,34 @@ class AstBuilder extends DataTypeAstBuilder
val tvf = UnresolvedTableValuedFunction(ident, args)
val tvfAliases = if (aliases.nonEmpty) UnresolvedTVFAliases(ident,
tvf, aliases) else tvf
+ val tvfWithWatermark =
tvfAliases.optionalMap(watermarkClauseCtx)(withWatermark)
+ Option(tableAliasCtx).map { c =>
+ tvfWithWatermark.optionalMap(c.strictIdentifier)(aliasPlan)
+ }.getOrElse {
+ tvfWithWatermark
+ }
+ }
+ )
+ }
- val watermarkClause = func.watermarkClause()
- val tvfWithWatermark =
tvfAliases.optionalMap(watermarkClause)(withWatermark)
-
tvfWithWatermark.optionalMap(func.tableAlias.strictIdentifier)(aliasPlan)
- })
+ /**
+ * Create a table-valued function call with arguments, e.g. range(1000)
+ */
+ override def visitTableValuedFunction(ctx: TableValuedFunctionContext)
+ : LogicalPlan = withOrigin(ctx) {
+ // IDENTIFIED BY is only valid for streaming TVFs
+ if (ctx.tableFunctionCallWithTrailingClauses.identifiedByClause != null) {
+ operationNotAllowed("IDENTIFIED BY clause is only supported for
streaming sources", ctx)
+ }
+
visitTableFunctionCallWithTrailingClauses(ctx.tableFunctionCallWithTrailingClauses)
+ }
+
+ /**
+ * Create a table-valued function call with optional trailing clauses.
+ */
+ override def visitTableFunctionCallWithTrailingClauses(
+ ctx: TableFunctionCallWithTrailingClausesContext): LogicalPlan =
withOrigin(ctx) {
+ buildTvfFromTableFunctionCall(ctx.tableFunctionCall, ctx.tableAlias,
ctx.watermarkClause)
}
/**
@@ -2493,6 +2520,36 @@ class AstBuilder extends DataTypeAstBuilder
}
}
+ /**
+ * Create a logical plan for a stream TVF.
+ * Handles two forms:
+ * 1. STREAM tableFunctionCallWithTrailingClauses - clauses are inside
+ * 2. STREAM(tableFunctionCall) clauses - clauses are outside STREAM() for
consistency with
+ * table names
+ */
+ override def visitStreamTableValuedFunction(ctx:
StreamTableValuedFunctionContext): LogicalPlan =
+ withOrigin(ctx) {
+ Option(ctx.tableFunctionCallWithTrailingClauses).map { funcTable =>
+ // Form: STREAM tableFunctionCallWithTrailingClauses
+ val sourceName = extractSourceName(funcTable.identifiedByClause)
+ val tvfPlan = buildTvfFromTableFunctionCall(
+ funcTable.tableFunctionCall, funcTable.tableAlias,
funcTable.watermarkClause)
+ tvfPlan.transformUp {
+ case tvf: UnresolvedTableValuedFunction =>
+ NamedStreamingRelation.withUserProvidedName(tvf.copy(isStreaming =
true), sourceName)
+ }
+ }.getOrElse {
+ // Form: STREAM(tableFunctionCall) identifiedByClause?
watermarkClause? tableAlias
+ val sourceName = extractSourceName(ctx.identifiedByClause)
+ val tvfPlan = buildTvfFromTableFunctionCall(
+ ctx.tableFunctionCall, ctx.tableAlias, ctx.watermarkClause)
+ tvfPlan.transformUp {
+ case tvf: UnresolvedTableValuedFunction =>
+ NamedStreamingRelation.withUserProvidedName(tvf.copy(isStreaming =
true), sourceName)
+ }
+ }
+ }
+
/**
* Create an inline table (a virtual table in Hive parlance).
*/
diff --git
a/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/parser/StreamRelationParserSuite.scala
b/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/parser/StreamRelationParserSuite.scala
index 3c8e8152eca5..000104259c80 100644
---
a/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/parser/StreamRelationParserSuite.scala
+++
b/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/parser/StreamRelationParserSuite.scala
@@ -20,7 +20,7 @@ package org.apache.spark.sql.catalyst.parser
import scala.jdk.CollectionConverters._
import org.apache.spark.sql.catalyst.AliasIdentifier
-import org.apache.spark.sql.catalyst.analysis.{AnalysisTest,
NamedStreamingRelation, UnresolvedRelation, UnresolvedStar}
+import org.apache.spark.sql.catalyst.analysis.{AnalysisTest,
NamedStreamingRelation, UnresolvedRelation, UnresolvedStar,
UnresolvedTableValuedFunction}
import org.apache.spark.sql.catalyst.plans.logical.{Project, SubqueryAlias}
import org.apache.spark.sql.catalyst.streaming.{Unassigned, UserProvided}
import org.apache.spark.sql.util.CaseInsensitiveStringMap
@@ -323,4 +323,313 @@ class StreamRelationParserSuite extends AnalysisTest {
assert(namedStreamingRelations.size == 1)
assert(namedStreamingRelations.head.sourceIdentifyingName ==
UserProvided("my-source-name"))
}
+
+ // =============================================================
+ // Comprehensive tests for all clause combinations with tables
+ // =============================================================
+
+ test("STREAM table non-parenthesized form with various clause combinations")
{
+ // Non-parenthesized form: STREAM table clauses
+ val wm = "WATERMARK col DELAY OF INTERVAL 1 MINUTE"
+ val testCases = Seq(
+ // (query, expectedSourceName)
+ ("SELECT * FROM STREAM t AS src", Unassigned),
+ ("SELECT * FROM STREAM t IDENTIFIED BY src1", UserProvided("src1")),
+ ("SELECT * FROM STREAM t IDENTIFIED BY src1 AS tbl",
UserProvided("src1")),
+ (s"SELECT * FROM STREAM t $wm", Unassigned),
+ (s"SELECT * FROM STREAM t $wm AS tbl", Unassigned),
+ (s"SELECT * FROM STREAM t IDENTIFIED BY src1 $wm", UserProvided("src1")),
+ (s"SELECT * FROM STREAM t IDENTIFIED BY src1 $wm AS tbl",
UserProvided("src1"))
+ )
+
+ testCases.foreach { case (query, expectedSourceName) =>
+ val plan = parsePlan(query)
+ assert(plan.isStreaming, s"Expected streaming plan for: $query")
+ val namedStreamingRelations = plan.collect {
+ case n: NamedStreamingRelation => n
+ }
+ assert(namedStreamingRelations.size == 1,
+ s"Expected 1 NamedStreamingRelation node in: $query")
+ assert(namedStreamingRelations.head.sourceIdentifyingName ==
expectedSourceName,
+ s"Expected source name $expectedSourceName in: $query")
+ }
+ }
+
+ test("STREAM table parenthesized form with various clause combinations") {
+ // Parenthesized form: STREAM(table) clauses
+ val wm = "WATERMARK col DELAY OF INTERVAL 1 MINUTE"
+ val testCases = Seq(
+ // (query, expectedSourceName)
+ ("SELECT * FROM STREAM(t) AS src", Unassigned),
+ ("SELECT * FROM STREAM(t) IDENTIFIED BY src1", UserProvided("src1")),
+ ("SELECT * FROM STREAM(t) IDENTIFIED BY src1 AS tbl",
UserProvided("src1")),
+ (s"SELECT * FROM STREAM(t) $wm", Unassigned),
+ (s"SELECT * FROM STREAM(t) $wm AS tbl", Unassigned),
+ (s"SELECT * FROM STREAM(t) IDENTIFIED BY src1 $wm",
UserProvided("src1")),
+ (s"SELECT * FROM STREAM(t) IDENTIFIED BY src1 $wm AS tbl",
UserProvided("src1"))
+ )
+
+ testCases.foreach { case (query, expectedSourceName) =>
+ val plan = parsePlan(query)
+ assert(plan.isStreaming, s"Expected streaming plan for: $query")
+ val namedStreamingRelations = plan.collect {
+ case n: NamedStreamingRelation => n
+ }
+ assert(namedStreamingRelations.size == 1,
+ s"Expected 1 NamedStreamingRelation node in: $query")
+ assert(namedStreamingRelations.head.sourceIdentifyingName ==
expectedSourceName,
+ s"Expected source name $expectedSourceName in: $query")
+ }
+ }
+
+ test("STREAM table with IDENTIFIED BY and alias verifies SubqueryAlias
structure") {
+ // Verify that SubqueryAlias wraps NamedStreamingRelation when both are
present
+ comparePlans(
+ parsePlan("SELECT * FROM STREAM t IDENTIFIED BY my_source AS src"),
+ Project(
+ projectList = Seq(UnresolvedStar(None)),
+ child = SubqueryAlias(
+ identifier = AliasIdentifier(
+ name = "src",
+ qualifier = Seq.empty
+ ),
+ child = NamedStreamingRelation(
+ child = UnresolvedRelation(
+ multipartIdentifier = Seq("t"),
+ isStreaming = true
+ ),
+ sourceIdentifyingName = UserProvided("my_source")
+ )
+ )
+ )
+ )
+ }
+
+ test("STREAM table with options, IDENTIFIED BY, and alias verifies plan
structure") {
+ // Verify complete plan structure with all clauses
+ comparePlans(
+ parsePlan("SELECT * FROM STREAM t WITH ('key'='value') IDENTIFIED BY
my_source AS src"),
+ Project(
+ projectList = Seq(UnresolvedStar(None)),
+ child = SubqueryAlias(
+ identifier = AliasIdentifier(
+ name = "src",
+ qualifier = Seq.empty
+ ),
+ child = NamedStreamingRelation(
+ child = UnresolvedRelation(
+ multipartIdentifier = Seq("t"),
+ options = new CaseInsensitiveStringMap(Map("key" ->
"value").asJava),
+ isStreaming = true
+ ),
+ sourceIdentifyingName = UserProvided("my_source")
+ )
+ )
+ )
+ )
+ }
+
+ // ===================================
+ // IDENTIFIED BY syntax tests for TVFs
+ // ===================================
+
+ test("STREAM TVF non-parenthesized form with various clause combinations") {
+ // Non-parenthesized form: STREAM tvf() clauses
+ val wm = "WATERMARK col DELAY OF INTERVAL 1 MINUTE"
+ val testCases = Seq(
+ // (query, expectedSourceName)
+ ("SELECT * FROM STREAM range(10) AS src", Unassigned),
+ ("SELECT * FROM STREAM range(10) IDENTIFIED BY src1",
UserProvided("src1")),
+ ("SELECT * FROM STREAM range(10) IDENTIFIED BY src1 AS t",
UserProvided("src1")),
+ (s"SELECT * FROM STREAM range(10) $wm", Unassigned),
+ (s"SELECT * FROM STREAM range(10) $wm AS t", Unassigned),
+ (s"SELECT * FROM STREAM range(10) IDENTIFIED BY src1 $wm",
UserProvided("src1")),
+ (s"SELECT * FROM STREAM range(10) IDENTIFIED BY src1 $wm AS t",
UserProvided("src1"))
+ )
+
+ testCases.foreach { case (query, expectedSourceName) =>
+ val plan = parsePlan(query)
+ val namedStreamingRelations = plan.collect {
+ case n: NamedStreamingRelation => n
+ }
+ assert(namedStreamingRelations.size == 1,
+ s"Expected 1 NamedStreamingRelation node in: $query")
+ assert(namedStreamingRelations.head.sourceIdentifyingName ==
expectedSourceName,
+ s"Expected source name $expectedSourceName in: $query")
+ }
+
+ // Also verify the underlying TVF is streaming
+ val plan = parsePlan("SELECT * FROM STREAM range(10) IDENTIFIED BY src1")
+ plan.collectFirst { case n: NamedStreamingRelation => n }.get.child match {
+ case u: UnresolvedTableValuedFunction =>
+ assert(u.isStreaming, "TVF should be marked as streaming")
+ case _ => fail("Expected UnresolvedTableValuedFunction as child")
+ }
+ }
+
+ test("STREAM TVF parenthesized form with various clause combinations") {
+ // Parenthesized form: STREAM(tvf()) clauses - clauses OUTSIDE parentheses
+ // This is consistent with table syntax: STREAM(table) IDENTIFIED BY ...
+ val wm = "WATERMARK col DELAY OF INTERVAL 1 MINUTE"
+ val testCases = Seq(
+ // (query, expectedSourceName)
+ ("SELECT * FROM STREAM(range(10)) AS src", Unassigned),
+ ("SELECT * FROM STREAM(range(10)) IDENTIFIED BY src1",
UserProvided("src1")),
+ ("SELECT * FROM STREAM(range(10)) IDENTIFIED BY src1 AS t",
UserProvided("src1")),
+ (s"SELECT * FROM STREAM(range(10)) $wm", Unassigned),
+ (s"SELECT * FROM STREAM(range(10)) $wm AS t", Unassigned),
+ (s"SELECT * FROM STREAM(range(10)) IDENTIFIED BY src1 $wm",
UserProvided("src1")),
+ (s"SELECT * FROM STREAM(range(10)) IDENTIFIED BY src1 $wm AS t",
UserProvided("src1"))
+ )
+
+ testCases.foreach { case (query, expectedSourceName) =>
+ val plan = parsePlan(query)
+ val namedStreamingRelations = plan.collect {
+ case n: NamedStreamingRelation => n
+ }
+ assert(namedStreamingRelations.size == 1,
+ s"Expected 1 NamedStreamingRelation node in: $query")
+ assert(namedStreamingRelations.head.sourceIdentifyingName ==
expectedSourceName,
+ s"Expected source name $expectedSourceName in: $query")
+ }
+ }
+
+ test("STREAM TVF with IDENTIFIED BY and alias verifies SubqueryAlias
structure") {
+ // Verify that SubqueryAlias wraps NamedStreamingRelation when both are
present
+ val plan = parsePlan("SELECT * FROM STREAM range(10) IDENTIFIED BY
my_source AS src")
+
+ // Check that we have a SubqueryAlias wrapping NamedStreamingRelation
+ plan match {
+ case Project(_, SubqueryAlias(aliasId, namedStream:
NamedStreamingRelation)) =>
+ assert(aliasId.name == "src", "Expected alias name 'src'")
+ assert(namedStream.sourceIdentifyingName == UserProvided("my_source"),
+ "Expected source name 'my_source'")
+ namedStream.child match {
+ case tvf: UnresolvedTableValuedFunction =>
+ assert(tvf.isStreaming, "TVF should be marked as streaming")
+ assert(tvf.name == Seq("range"), "Expected 'range' TVF")
+ case other => fail(s"Expected UnresolvedTableValuedFunction but got:
$other")
+ }
+ case other => fail(s"Expected
Project(SubqueryAlias(NamedStreamingRelation)) but got: $other")
+ }
+ }
+
+ test("STREAM TVF with all clauses verifies complete plan structure") {
+ // Verify the complete plan structure with IDENTIFIED BY, WATERMARK, and
alias
+ // The structure should be:
+ // Project -> SubqueryAlias -> UnresolvedEventTimeWatermark ->
NamedStreamingRelation -> TVF
+ val query = """SELECT * FROM STREAM range(10)
+ | IDENTIFIED BY my_source
+ | WATERMARK col DELAY OF INTERVAL 1 MINUTE
+ | AS src""".stripMargin
+ val plan = parsePlan(query)
+
+ // Verify the complete structure including watermark
+ val namedStreamingRelations = plan.collect {
+ case n: NamedStreamingRelation => n
+ }
+ assert(namedStreamingRelations.size == 1,
+ "Expected 1 NamedStreamingRelation node")
+ assert(namedStreamingRelations.head.sourceIdentifyingName ==
UserProvided("my_source"),
+ "Expected source name 'my_source'")
+
+ // Verify SubqueryAlias is present
+ plan match {
+ case Project(_, SubqueryAlias(aliasId, _)) =>
+ assert(aliasId.name == "src", "Expected alias name 'src'")
+ case other => fail(s"Expected Project with SubqueryAlias but got:
$other")
+ }
+ }
+
+ test("STREAM TVF parenthesized form with alias verifies SubqueryAlias
structure") {
+ // Verify parenthesized form: STREAM(tvf()) IDENTIFIED BY ... AS ...
+ val plan = parsePlan("SELECT * FROM STREAM(range(10)) IDENTIFIED BY
my_source AS src")
+
+ plan match {
+ case Project(_, SubqueryAlias(aliasId, namedStream:
NamedStreamingRelation)) =>
+ assert(aliasId.name == "src", "Expected alias name 'src'")
+ assert(namedStream.sourceIdentifyingName == UserProvided("my_source"),
+ "Expected source name 'my_source'")
+ namedStream.child match {
+ case tvf: UnresolvedTableValuedFunction =>
+ assert(tvf.isStreaming, "TVF should be marked as streaming")
+ case other => fail(s"Expected UnresolvedTableValuedFunction but got:
$other")
+ }
+ case other => fail(s"Expected
Project(SubqueryAlias(NamedStreamingRelation)) but got: $other")
+ }
+ }
+
+ test("IDENTIFIED BY is not allowed for non-streaming TVFs") {
+ val sql = "SELECT * FROM range(10) IDENTIFIED BY my_source"
+ val e = intercept[ParseException] {
+ parsePlan(sql)
+ }
+ assert(e.getMessage.contains("IDENTIFIED BY clause is only supported for
streaming sources"))
+ }
+
+ // ==========================================
+ // Negative tests for IDENTIFIED BY clause
+ // ==========================================
+
+ test("Parse Exception: IDENTIFIED BY without source name") {
+ interceptParseException(parsePlan)(
+ "SELECT * FROM STREAM t IDENTIFIED BY"
+ )(None)
+ }
+
+ test("Parse Exception: Multiple IDENTIFIED BY clauses") {
+ // The grammar should prevent multiple IDENTIFIED BY clauses
+ interceptParseException(parsePlan)(
+ "SELECT * FROM STREAM t IDENTIFIED BY src1 IDENTIFIED BY src2"
+ )(None)
+ }
+
+ test("Parse Exception: IDENTIFIED BY on non-streaming table") {
+ // Regular (non-streaming) tables should not allow IDENTIFIED BY
+ interceptParseException(parsePlan)(
+ "SELECT * FROM t IDENTIFIED BY my_source"
+ )(None)
+ }
+
+ test("Parse Exception: IDENTIFIED BY on subquery") {
+ // Subqueries should not allow IDENTIFIED BY
+ interceptParseException(parsePlan)(
+ "SELECT * FROM (SELECT * FROM t) IDENTIFIED BY my_source"
+ )(None)
+ }
+
+ test("Parse Exception: IDENTIFIED BY before WITH options") {
+ // IDENTIFIED BY should come after WITH options
+ interceptParseException(parsePlan)(
+ "SELECT * FROM STREAM t IDENTIFIED BY src WITH ('key' = 'value')"
+ )(None)
+ }
+
+ test("Parse Exception: IDENTIFIED BY after alias") {
+ // IDENTIFIED BY should come before alias
+ interceptParseException(parsePlan)(
+ "SELECT * FROM STREAM t AS src IDENTIFIED BY my_source"
+ )(None)
+ }
+
+ test("Parse Exception: IDENTIFIED BY after WATERMARK") {
+ // IDENTIFIED BY should come before WATERMARK
+ interceptParseException(parsePlan)(
+ "SELECT * FROM STREAM t WATERMARK col DELAY OF INTERVAL 1 MINUTE
IDENTIFIED BY src"
+ )(None)
+ }
+
+ test("Parse Exception: TVF IDENTIFIED BY after alias") {
+ // IDENTIFIED BY should come before alias for TVFs too
+ interceptParseException(parsePlan)(
+ "SELECT * FROM STREAM range(10) AS t IDENTIFIED BY src"
+ )(None)
+ }
+
+ test("Parse Exception: TVF IDENTIFIED BY after WATERMARK") {
+ // IDENTIFIED BY should come before WATERMARK for TVFs too
+ interceptParseException(parsePlan)(
+ "SELECT * FROM STREAM range(10) WATERMARK col DELAY OF INTERVAL 1 MINUTE
IDENTIFIED BY src"
+ )(None)
+ }
}
---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@spark.apache.org
For additional commands, e-mail: commits-help@spark.apache.org