This is an automated email from the ASF dual-hosted git repository.

dtenedor pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/spark.git


The following commit(s) were added to refs/heads/master by this push:
     new 195c7dc9d2a2 [SPARK-55054][SS][SQL] Add IDENTIFIED BY support for streaming table-valued functions
195c7dc9d2a2 is described below

commit 195c7dc9d2a2407268f86b7bb79190b33c2f4e71
Author: ericm-db <[email protected]>
AuthorDate: Fri Jan 16 11:53:56 2026 -0800

    [SPARK-55054][SS][SQL] Add IDENTIFIED BY support for streaming table-valued functions
    
    ### What changes were proposed in this pull request?
    
    This PR extends the IDENTIFIED BY syntax to support streaming table-valued functions (TVFs), complementing the existing support for streaming tables.
    
    The changes include:
    - Added grammar rules for streaming TVFs with IDENTIFIED BY clause
    - Split `functionTable` into `tableFunctionCall` + `tableFunctionCallWithTrailingClauses` for consistent syntax
    - Added validation that IDENTIFIED BY is only allowed for streaming sources
    - Streaming TVFs support two syntaxes:
      1. `STREAM tvf() IDENTIFIED BY name` - clauses inside
      2. `STREAM(tvf()) IDENTIFIED BY name` - clauses outside (consistent with table syntax)
    - Added comprehensive test coverage for both syntaxes
    
    ### Why are the changes needed?
    
    Streaming TVFs (like range(), or read_files() when available) need the ability to be named just like streaming tables. This ensures:
    - Consistent source naming across all streaming source types
    - Better observability for streaming queries using TVFs
    - Proper checkpoint management for TVF-based streams
    
    ### Does this PR introduce _any_ user-facing change?
    
    Yes. Users can now use IDENTIFIED BY with streaming TVFs:
    
    ```sql
    -- Non-parenthesized form
    SELECT * FROM STREAM range(100) IDENTIFIED BY my_range_source
    
    -- Parenthesized form (clauses outside for consistency)
    SELECT * FROM STREAM(range(100)) IDENTIFIED BY my_range_source
    
    -- With watermark and alias
    SELECT * FROM STREAM range(100)
      IDENTIFIED BY my_source
      WATERMARK ts DELAY OF INTERVAL 1 MINUTE
      AS src
    ```
    
    Using IDENTIFIED BY on non-streaming TVFs produces a clear error:
    ```
    IDENTIFIED BY clause is only supported for streaming sources
    ```
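    
    For example, the following batch query (this exact case is exercised by the new parser test) is rejected at parse time:
    
    ```sql
    -- range(10) is not wrapped in STREAM, so IDENTIFIED BY is rejected
    SELECT * FROM range(10) IDENTIFIED BY my_source
    ```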
    
    ### How was this patch tested?
    
    - Added comprehensive tests in StreamRelationParserSuite covering:
      - Non-parenthesized form with all clause combinations
      - Parenthesized form with all clause combinations
      - Validation that non-streaming TVFs reject IDENTIFIED BY
    - Tests verify correct placement of clauses in both syntaxes
    - All existing tests continue to pass
    
    ### Was this patch authored or co-authored using generative AI tooling?
    
    No.
    
    Closes #53819 from ericm-db/streaming-tvf-identified-by.
    
    Authored-by: ericm-db <[email protected]>
    Signed-off-by: Daniel Tenedorio <[email protected]>
---
 .../spark/sql/catalyst/parser/SqlBaseParser.g4     |  21 +-
 .../spark/sql/catalyst/parser/AstBuilder.scala     |  89 ++++--
 .../parser/StreamRelationParserSuite.scala         | 311 ++++++++++++++++++++-
 3 files changed, 398 insertions(+), 23 deletions(-)

diff --git a/sql/api/src/main/antlr4/org/apache/spark/sql/catalyst/parser/SqlBaseParser.g4 b/sql/api/src/main/antlr4/org/apache/spark/sql/catalyst/parser/SqlBaseParser.g4
index a7b3efda2b81..b53af82a3d2f 100644
--- a/sql/api/src/main/antlr4/org/apache/spark/sql/catalyst/parser/SqlBaseParser.g4
+++ b/sql/api/src/main/antlr4/org/apache/spark/sql/catalyst/parser/SqlBaseParser.g4
@@ -429,6 +429,9 @@ streamRelationPrimary
     | STREAM LEFT_PAREN multipartIdentifier RIGHT_PAREN
       optionsClause? identifiedByClause?
      watermarkClause? tableAlias                                      #streamTableName
+    | STREAM tableFunctionCallWithTrailingClauses                      #streamTableValuedFunction
+    | STREAM LEFT_PAREN tableFunctionCall RIGHT_PAREN
+      identifiedByClause? watermarkClause? tableAlias                  #streamTableValuedFunction
     ;
 
 setResetStatement
@@ -1068,7 +1071,7 @@ relationPrimary
     | LEFT_PAREN relation RIGHT_PAREN sample?
        watermarkClause? tableAlias                          #aliasedRelation
    | inlineTable                                           #inlineTableDefault2
-    | functionTable                                         #tableValuedFunction
+    | tableFunctionCallWithTrailingClauses                  #tableValuedFunction
     ;
 
 optionsClause
@@ -1118,13 +1121,19 @@ functionTableArgument
     | functionArgument
     ;
 
-// This is only used in relationPrimary where having watermarkClause makes sense. If this becomes
-// referred by other clause, please check wheter watermarkClause makes sense to the clause.
-// If not, consider separate this rule.
-functionTable
+// A table function call including opening and closing parentheses.
+tableFunctionCall
     : funcName=functionName LEFT_PAREN
       (functionTableArgument (COMMA functionTableArgument)*)?
-      RIGHT_PAREN watermarkClause? tableAlias
+      RIGHT_PAREN
+    ;
+
+// A table function call with optional trailing clauses for streaming and aliasing.
+// The identifiedByClause is optional and only valid for streaming TVFs. For non-streaming TVFs,
+// the AST builder will reject it with an error. The clause must come before watermarkClause
+// and tableAlias to avoid ambiguity (since IDENTIFIED is a nonReserved keyword).
+tableFunctionCallWithTrailingClauses
+    : tableFunctionCall identifiedByClause? watermarkClause? tableAlias
     ;
 
 tableAlias
diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/parser/AstBuilder.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/parser/AstBuilder.scala
index 71d919420ca8..46d7171cce5a 100644
--- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/parser/AstBuilder.scala
+++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/parser/AstBuilder.scala
@@ -2431,26 +2431,30 @@ class AstBuilder extends DataTypeAstBuilder
   }
 
   /**
-   * Create a table-valued function call with arguments, e.g. range(1000)
-   */
-  override def visitTableValuedFunction(ctx: TableValuedFunctionContext)
-      : LogicalPlan = withOrigin(ctx) {
-    val func = ctx.functionTable
-    val aliases = if (func.tableAlias.identifierList != null) {
-      visitIdentifierList(func.tableAlias.identifierList)
+   * Build an UnresolvedTableValuedFunction from a tableFunctionCall context.
+   * Used by both visitTableValuedFunction and stream TVF visitors.
+   */
+  private def buildTvfFromTableFunctionCall(
+      funcCallCtx: TableFunctionCallContext,
+      tableAliasCtx: TableAliasContext,
+      watermarkClauseCtx: WatermarkClauseContext): LogicalPlan = {
+    val aliases = if (tableAliasCtx != null && tableAliasCtx.identifierList != null) {
+      visitIdentifierList(tableAliasCtx.identifierList)
     } else {
       Seq.empty
     }
-
     withFuncIdentClause(
-      func.functionName,
+      funcCallCtx.funcName,
       Nil,
       (ident, _) => {
         if (ident.length > 1) {
-          throw QueryParsingErrors.invalidTableValuedFunctionNameError(ident, ctx)
+          throw new ParseException(
+            errorClass = "INVALID_SQL_SYNTAX.INVALID_TABLE_VALUED_FUNC_NAME",
+            messageParameters = Map("funcName" -> toSQLId(ident)),
+            ctx = funcCallCtx)
         }
-        val funcName = func.functionName.getText
-        val args = func.functionTableArgument.asScala.map { e =>
+        val funcName = funcCallCtx.funcName.getText
+        val args = funcCallCtx.functionTableArgument.asScala.map { e =>
           Option(e.functionArgument).map(extractNamedArgument(_, funcName))
            .getOrElse {
              extractFunctionTableNamedArgument(e.functionTableReferenceArgument, funcName)
@@ -2460,11 +2464,34 @@ class AstBuilder extends DataTypeAstBuilder
         val tvf = UnresolvedTableValuedFunction(ident, args)
 
         val tvfAliases = if (aliases.nonEmpty) UnresolvedTVFAliases(ident, tvf, aliases) else tvf
+        val tvfWithWatermark = tvfAliases.optionalMap(watermarkClauseCtx)(withWatermark)
+        Option(tableAliasCtx).map { c =>
+          tvfWithWatermark.optionalMap(c.strictIdentifier)(aliasPlan)
+        }.getOrElse {
+          tvfWithWatermark
+        }
+      }
+    )
+  }
 
-        val watermarkClause = func.watermarkClause()
-        val tvfWithWatermark = tvfAliases.optionalMap(watermarkClause)(withWatermark)
-        tvfWithWatermark.optionalMap(func.tableAlias.strictIdentifier)(aliasPlan)
-      })
+  /**
+   * Create a table-valued function call with arguments, e.g. range(1000)
+   */
+  override def visitTableValuedFunction(ctx: TableValuedFunctionContext)
+      : LogicalPlan = withOrigin(ctx) {
+    // IDENTIFIED BY is only valid for streaming TVFs
+    if (ctx.tableFunctionCallWithTrailingClauses.identifiedByClause != null) {
+      operationNotAllowed("IDENTIFIED BY clause is only supported for streaming sources", ctx)
+    }
+    visitTableFunctionCallWithTrailingClauses(ctx.tableFunctionCallWithTrailingClauses)
+  }
+
+  /**
+   * Create a table-valued function call with optional trailing clauses.
+   */
+  override def visitTableFunctionCallWithTrailingClauses(
+      ctx: TableFunctionCallWithTrailingClausesContext): LogicalPlan = withOrigin(ctx) {
+    buildTvfFromTableFunctionCall(ctx.tableFunctionCall, ctx.tableAlias, ctx.watermarkClause)
   }
 
   /**
@@ -2493,6 +2520,36 @@ class AstBuilder extends DataTypeAstBuilder
     }
   }
 
+  /**
+   * Create a logical plan for a stream TVF.
+   * Handles two forms:
+   * 1. STREAM tableFunctionCallWithTrailingClauses - clauses are inside
+   * 2. STREAM(tableFunctionCall) clauses - clauses are outside STREAM() for consistency with
+   *    table names
+   */
+  override def visitStreamTableValuedFunction(ctx: StreamTableValuedFunctionContext): LogicalPlan =
+    withOrigin(ctx) {
+      Option(ctx.tableFunctionCallWithTrailingClauses).map { funcTable =>
+        // Form: STREAM tableFunctionCallWithTrailingClauses
+        val sourceName = extractSourceName(funcTable.identifiedByClause)
+        val tvfPlan = buildTvfFromTableFunctionCall(
+          funcTable.tableFunctionCall, funcTable.tableAlias, funcTable.watermarkClause)
+        tvfPlan.transformUp {
+          case tvf: UnresolvedTableValuedFunction =>
+            NamedStreamingRelation.withUserProvidedName(tvf.copy(isStreaming = true), sourceName)
+        }
+      }.getOrElse {
+        // Form: STREAM(tableFunctionCall) identifiedByClause? watermarkClause? tableAlias
+        val sourceName = extractSourceName(ctx.identifiedByClause)
+        val tvfPlan = buildTvfFromTableFunctionCall(
+          ctx.tableFunctionCall, ctx.tableAlias, ctx.watermarkClause)
+        tvfPlan.transformUp {
+          case tvf: UnresolvedTableValuedFunction =>
+            NamedStreamingRelation.withUserProvidedName(tvf.copy(isStreaming = true), sourceName)
+        }
+      }
+    }
+
   /**
    * Create an inline table (a virtual table in Hive parlance).
    */
diff --git a/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/parser/StreamRelationParserSuite.scala b/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/parser/StreamRelationParserSuite.scala
index 3c8e8152eca5..000104259c80 100644
--- a/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/parser/StreamRelationParserSuite.scala
+++ b/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/parser/StreamRelationParserSuite.scala
@@ -20,7 +20,7 @@ package org.apache.spark.sql.catalyst.parser
 import scala.jdk.CollectionConverters._
 
 import org.apache.spark.sql.catalyst.AliasIdentifier
-import org.apache.spark.sql.catalyst.analysis.{AnalysisTest, NamedStreamingRelation, UnresolvedRelation, UnresolvedStar}
+import org.apache.spark.sql.catalyst.analysis.{AnalysisTest, NamedStreamingRelation, UnresolvedRelation, UnresolvedStar, UnresolvedTableValuedFunction}
 import org.apache.spark.sql.catalyst.plans.logical.{Project, SubqueryAlias}
 import org.apache.spark.sql.catalyst.streaming.{Unassigned, UserProvided}
 import org.apache.spark.sql.util.CaseInsensitiveStringMap
@@ -323,4 +323,313 @@ class StreamRelationParserSuite extends AnalysisTest {
     assert(namedStreamingRelations.size == 1)
     assert(namedStreamingRelations.head.sourceIdentifyingName == UserProvided("my-source-name"))
   }
+
+  // =============================================================
+  // Comprehensive tests for all clause combinations with tables
+  // =============================================================
+
+  test("STREAM table non-parenthesized form with various clause combinations") 
{
+    // Non-parenthesized form: STREAM table clauses
+    val wm = "WATERMARK col DELAY OF INTERVAL 1 MINUTE"
+    val testCases = Seq(
+      // (query, expectedSourceName)
+      ("SELECT * FROM STREAM t AS src", Unassigned),
+      ("SELECT * FROM STREAM t IDENTIFIED BY src1", UserProvided("src1")),
+      ("SELECT * FROM STREAM t IDENTIFIED BY src1 AS tbl", 
UserProvided("src1")),
+      (s"SELECT * FROM STREAM t $wm", Unassigned),
+      (s"SELECT * FROM STREAM t $wm AS tbl", Unassigned),
+      (s"SELECT * FROM STREAM t IDENTIFIED BY src1 $wm", UserProvided("src1")),
+      (s"SELECT * FROM STREAM t IDENTIFIED BY src1 $wm AS tbl", 
UserProvided("src1"))
+    )
+
+    testCases.foreach { case (query, expectedSourceName) =>
+      val plan = parsePlan(query)
+      assert(plan.isStreaming, s"Expected streaming plan for: $query")
+      val namedStreamingRelations = plan.collect {
+        case n: NamedStreamingRelation => n
+      }
+      assert(namedStreamingRelations.size == 1,
+        s"Expected 1 NamedStreamingRelation node in: $query")
+      assert(namedStreamingRelations.head.sourceIdentifyingName == expectedSourceName,
+        s"Expected source name $expectedSourceName in: $query")
+    }
+  }
+
+  test("STREAM table parenthesized form with various clause combinations") {
+    // Parenthesized form: STREAM(table) clauses
+    val wm = "WATERMARK col DELAY OF INTERVAL 1 MINUTE"
+    val testCases = Seq(
+      // (query, expectedSourceName)
+      ("SELECT * FROM STREAM(t) AS src", Unassigned),
+      ("SELECT * FROM STREAM(t) IDENTIFIED BY src1", UserProvided("src1")),
+      ("SELECT * FROM STREAM(t) IDENTIFIED BY src1 AS tbl", 
UserProvided("src1")),
+      (s"SELECT * FROM STREAM(t) $wm", Unassigned),
+      (s"SELECT * FROM STREAM(t) $wm AS tbl", Unassigned),
+      (s"SELECT * FROM STREAM(t) IDENTIFIED BY src1 $wm", 
UserProvided("src1")),
+      (s"SELECT * FROM STREAM(t) IDENTIFIED BY src1 $wm AS tbl", 
UserProvided("src1"))
+    )
+
+    testCases.foreach { case (query, expectedSourceName) =>
+      val plan = parsePlan(query)
+      assert(plan.isStreaming, s"Expected streaming plan for: $query")
+      val namedStreamingRelations = plan.collect {
+        case n: NamedStreamingRelation => n
+      }
+      assert(namedStreamingRelations.size == 1,
+        s"Expected 1 NamedStreamingRelation node in: $query")
+      assert(namedStreamingRelations.head.sourceIdentifyingName == expectedSourceName,
+        s"Expected source name $expectedSourceName in: $query")
+    }
+  }
+
+  test("STREAM table with IDENTIFIED BY and alias verifies SubqueryAlias 
structure") {
+    // Verify that SubqueryAlias wraps NamedStreamingRelation when both are 
present
+    comparePlans(
+      parsePlan("SELECT * FROM STREAM t IDENTIFIED BY my_source AS src"),
+      Project(
+        projectList = Seq(UnresolvedStar(None)),
+        child = SubqueryAlias(
+          identifier = AliasIdentifier(
+            name = "src",
+            qualifier = Seq.empty
+          ),
+          child = NamedStreamingRelation(
+            child = UnresolvedRelation(
+              multipartIdentifier = Seq("t"),
+              isStreaming = true
+            ),
+            sourceIdentifyingName = UserProvided("my_source")
+          )
+        )
+      )
+    )
+  }
+
+  test("STREAM table with options, IDENTIFIED BY, and alias verifies plan 
structure") {
+    // Verify complete plan structure with all clauses
+    comparePlans(
+      parsePlan("SELECT * FROM STREAM t WITH ('key'='value') IDENTIFIED BY 
my_source AS src"),
+      Project(
+        projectList = Seq(UnresolvedStar(None)),
+        child = SubqueryAlias(
+          identifier = AliasIdentifier(
+            name = "src",
+            qualifier = Seq.empty
+          ),
+          child = NamedStreamingRelation(
+            child = UnresolvedRelation(
+              multipartIdentifier = Seq("t"),
+              options = new CaseInsensitiveStringMap(Map("key" -> "value").asJava),
+              isStreaming = true
+            ),
+            sourceIdentifyingName = UserProvided("my_source")
+          )
+        )
+      )
+    )
+  }
+
+  // ===================================
+  // IDENTIFIED BY syntax tests for TVFs
+  // ===================================
+
+  test("STREAM TVF non-parenthesized form with various clause combinations") {
+    // Non-parenthesized form: STREAM tvf() clauses
+    val wm = "WATERMARK col DELAY OF INTERVAL 1 MINUTE"
+    val testCases = Seq(
+      // (query, expectedSourceName)
+      ("SELECT * FROM STREAM range(10) AS src", Unassigned),
+      ("SELECT * FROM STREAM range(10) IDENTIFIED BY src1", 
UserProvided("src1")),
+      ("SELECT * FROM STREAM range(10) IDENTIFIED BY src1 AS t", 
UserProvided("src1")),
+      (s"SELECT * FROM STREAM range(10) $wm", Unassigned),
+      (s"SELECT * FROM STREAM range(10) $wm AS t", Unassigned),
+      (s"SELECT * FROM STREAM range(10) IDENTIFIED BY src1 $wm", 
UserProvided("src1")),
+      (s"SELECT * FROM STREAM range(10) IDENTIFIED BY src1 $wm AS t", 
UserProvided("src1"))
+    )
+
+    testCases.foreach { case (query, expectedSourceName) =>
+      val plan = parsePlan(query)
+      val namedStreamingRelations = plan.collect {
+        case n: NamedStreamingRelation => n
+      }
+      assert(namedStreamingRelations.size == 1,
+        s"Expected 1 NamedStreamingRelation node in: $query")
+      assert(namedStreamingRelations.head.sourceIdentifyingName == expectedSourceName,
+        s"Expected source name $expectedSourceName in: $query")
+    }
+
+    // Also verify the underlying TVF is streaming
+    val plan = parsePlan("SELECT * FROM STREAM range(10) IDENTIFIED BY src1")
+    plan.collectFirst { case n: NamedStreamingRelation => n }.get.child match {
+      case u: UnresolvedTableValuedFunction =>
+        assert(u.isStreaming, "TVF should be marked as streaming")
+      case _ => fail("Expected UnresolvedTableValuedFunction as child")
+    }
+  }
+
+  test("STREAM TVF parenthesized form with various clause combinations") {
+    // Parenthesized form: STREAM(tvf()) clauses - clauses OUTSIDE parentheses
+    // This is consistent with table syntax: STREAM(table) IDENTIFIED BY ...
+    val wm = "WATERMARK col DELAY OF INTERVAL 1 MINUTE"
+    val testCases = Seq(
+      // (query, expectedSourceName)
+      ("SELECT * FROM STREAM(range(10)) AS src", Unassigned),
+      ("SELECT * FROM STREAM(range(10)) IDENTIFIED BY src1", 
UserProvided("src1")),
+      ("SELECT * FROM STREAM(range(10)) IDENTIFIED BY src1 AS t", 
UserProvided("src1")),
+      (s"SELECT * FROM STREAM(range(10)) $wm", Unassigned),
+      (s"SELECT * FROM STREAM(range(10)) $wm AS t", Unassigned),
+      (s"SELECT * FROM STREAM(range(10)) IDENTIFIED BY src1 $wm", 
UserProvided("src1")),
+      (s"SELECT * FROM STREAM(range(10)) IDENTIFIED BY src1 $wm AS t", 
UserProvided("src1"))
+    )
+
+    testCases.foreach { case (query, expectedSourceName) =>
+      val plan = parsePlan(query)
+      val namedStreamingRelations = plan.collect {
+        case n: NamedStreamingRelation => n
+      }
+      assert(namedStreamingRelations.size == 1,
+        s"Expected 1 NamedStreamingRelation node in: $query")
+      assert(namedStreamingRelations.head.sourceIdentifyingName == expectedSourceName,
+        s"Expected source name $expectedSourceName in: $query")
+    }
+  }
+
+  test("STREAM TVF with IDENTIFIED BY and alias verifies SubqueryAlias 
structure") {
+    // Verify that SubqueryAlias wraps NamedStreamingRelation when both are 
present
+    val plan = parsePlan("SELECT * FROM STREAM range(10) IDENTIFIED BY 
my_source AS src")
+
+    // Check that we have a SubqueryAlias wrapping NamedStreamingRelation
+    plan match {
+      case Project(_, SubqueryAlias(aliasId, namedStream: NamedStreamingRelation)) =>
+        assert(aliasId.name == "src", "Expected alias name 'src'")
+        assert(namedStream.sourceIdentifyingName == UserProvided("my_source"),
+          "Expected source name 'my_source'")
+        namedStream.child match {
+          case tvf: UnresolvedTableValuedFunction =>
+            assert(tvf.isStreaming, "TVF should be marked as streaming")
+            assert(tvf.name == Seq("range"), "Expected 'range' TVF")
+          case other => fail(s"Expected UnresolvedTableValuedFunction but got: 
$other")
+        }
+      case other => fail(s"Expected 
Project(SubqueryAlias(NamedStreamingRelation)) but got: $other")
+    }
+  }
+
+  test("STREAM TVF with all clauses verifies complete plan structure") {
+    // Verify the complete plan structure with IDENTIFIED BY, WATERMARK, and alias
+    // The structure should be:
+    // Project -> SubqueryAlias -> UnresolvedEventTimeWatermark -> NamedStreamingRelation -> TVF
+    val query = """SELECT * FROM STREAM range(10)
+                  |  IDENTIFIED BY my_source
+                  |  WATERMARK col DELAY OF INTERVAL 1 MINUTE
+                  |  AS src""".stripMargin
+    val plan = parsePlan(query)
+
+    // Verify the complete structure including watermark
+    val namedStreamingRelations = plan.collect {
+      case n: NamedStreamingRelation => n
+    }
+    assert(namedStreamingRelations.size == 1,
+      "Expected 1 NamedStreamingRelation node")
+    assert(namedStreamingRelations.head.sourceIdentifyingName == UserProvided("my_source"),
+      "Expected source name 'my_source'")
+
+    // Verify SubqueryAlias is present
+    plan match {
+      case Project(_, SubqueryAlias(aliasId, _)) =>
+        assert(aliasId.name == "src", "Expected alias name 'src'")
+      case other => fail(s"Expected Project with SubqueryAlias but got: 
$other")
+    }
+  }
+
+  test("STREAM TVF parenthesized form with alias verifies SubqueryAlias 
structure") {
+    // Verify parenthesized form: STREAM(tvf()) IDENTIFIED BY ... AS ...
+    val plan = parsePlan("SELECT * FROM STREAM(range(10)) IDENTIFIED BY 
my_source AS src")
+
+    plan match {
+      case Project(_, SubqueryAlias(aliasId, namedStream: 
NamedStreamingRelation)) =>
+        assert(aliasId.name == "src", "Expected alias name 'src'")
+        assert(namedStream.sourceIdentifyingName == UserProvided("my_source"),
+          "Expected source name 'my_source'")
+        namedStream.child match {
+          case tvf: UnresolvedTableValuedFunction =>
+            assert(tvf.isStreaming, "TVF should be marked as streaming")
+          case other => fail(s"Expected UnresolvedTableValuedFunction but got: 
$other")
+        }
+      case other => fail(s"Expected 
Project(SubqueryAlias(NamedStreamingRelation)) but got: $other")
+    }
+  }
+
+  test("IDENTIFIED BY is not allowed for non-streaming TVFs") {
+    val sql = "SELECT * FROM range(10) IDENTIFIED BY my_source"
+    val e = intercept[ParseException] {
+      parsePlan(sql)
+    }
+    assert(e.getMessage.contains("IDENTIFIED BY clause is only supported for streaming sources"))
+  }
+
+  // ==========================================
+  // Negative tests for IDENTIFIED BY clause
+  // ==========================================
+
+  test("Parse Exception: IDENTIFIED BY without source name") {
+    interceptParseException(parsePlan)(
+      "SELECT * FROM STREAM t IDENTIFIED BY"
+    )(None)
+  }
+
+  test("Parse Exception: Multiple IDENTIFIED BY clauses") {
+    // The grammar should prevent multiple IDENTIFIED BY clauses
+    interceptParseException(parsePlan)(
+      "SELECT * FROM STREAM t IDENTIFIED BY src1 IDENTIFIED BY src2"
+    )(None)
+  }
+
+  test("Parse Exception: IDENTIFIED BY on non-streaming table") {
+    // Regular (non-streaming) tables should not allow IDENTIFIED BY
+    interceptParseException(parsePlan)(
+      "SELECT * FROM t IDENTIFIED BY my_source"
+    )(None)
+  }
+
+  test("Parse Exception: IDENTIFIED BY on subquery") {
+    // Subqueries should not allow IDENTIFIED BY
+    interceptParseException(parsePlan)(
+      "SELECT * FROM (SELECT * FROM t) IDENTIFIED BY my_source"
+    )(None)
+  }
+
+  test("Parse Exception: IDENTIFIED BY before WITH options") {
+    // IDENTIFIED BY should come after WITH options
+    interceptParseException(parsePlan)(
+      "SELECT * FROM STREAM t IDENTIFIED BY src WITH ('key' = 'value')"
+    )(None)
+  }
+
+  test("Parse Exception: IDENTIFIED BY after alias") {
+    // IDENTIFIED BY should come before alias
+    interceptParseException(parsePlan)(
+      "SELECT * FROM STREAM t AS src IDENTIFIED BY my_source"
+    )(None)
+  }
+
+  test("Parse Exception: IDENTIFIED BY after WATERMARK") {
+    // IDENTIFIED BY should come before WATERMARK
+    interceptParseException(parsePlan)(
+      "SELECT * FROM STREAM t WATERMARK col DELAY OF INTERVAL 1 MINUTE 
IDENTIFIED BY src"
+    )(None)
+  }
+
+  test("Parse Exception: TVF IDENTIFIED BY after alias") {
+    // IDENTIFIED BY should come before alias for TVFs too
+    interceptParseException(parsePlan)(
+      "SELECT * FROM STREAM range(10) AS t IDENTIFIED BY src"
+    )(None)
+  }
+
+  test("Parse Exception: TVF IDENTIFIED BY after WATERMARK") {
+    // IDENTIFIED BY should come before WATERMARK for TVFs too
+    interceptParseException(parsePlan)(
+      "SELECT * FROM STREAM range(10) WATERMARK col DELAY OF INTERVAL 1 MINUTE 
IDENTIFIED BY src"
+    )(None)
+  }
 }


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]
