This is an automated email from the ASF dual-hosted git repository.

HeartSaVioR pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/spark.git


The following commit(s) were added to refs/heads/master by this push:
     new 05b4d81f3f93 [SPARK-56975][SS] Reject user-specified schema in 
DataStreamReader.table()
05b4d81f3f93 is described below

commit 05b4d81f3f938ff140886d6f66ad66d08c66d5b2
Author: You Zhou <[email protected]>
AuthorDate: Thu May 21 11:57:52 2026 +0900

    [SPARK-56975][SS] Reject user-specified schema in DataStreamReader.table()
    
    ### What changes were proposed in this pull request?
    
    Make `DataStreamReader.table()` reject user-specified schemas by calling 
`assertNoSpecifiedSchema("table")`, mirroring `DataStreamReader.changes()`.
    
    ### Why are the changes needed?
    
    `DataStreamReader.table()` accepts a user-specified schema without 
complaint and then silently ignores it:
    
    ```scala
    spark.readStream
      .schema(new StructType().add("a", IntegerType))
      .table("some_table")     // no error; the schema has no effect
    ```
    
    User-specified schema is not a meaningful input to `.table()` — catalog 
tables declare their own schema, and `TableCatalog.loadTable(Identifier)` has 
no parameter to receive a user schema, so even if Spark wanted to forward one 
it couldn't. The user's `.schema(...)` call is therefore always a 
misconfiguration.
    
    The rest of `DataStreamReader` already surfaces this kind of 
misconfiguration as a clear error:
    
    - `.load()` goes through `DataSourceV2Utils.getTableFromProvider`, which 
throws `_LEGACY_ERROR_TEMP_2242` ("`<provider>` source does not support 
user-specified schema") when the provider does not implement 
`supportsExternalMetadata()`.
    - `.changes()` explicitly calls `assertNoSpecifiedSchema("changes")` and 
throws `_LEGACY_ERROR_TEMP_1189` ("User specified schema not supported with 
`changes`.").
    
    `.table()` is the odd one out: same invalid configuration, no error. Users 
can write `readStream.schema(s).table(name)`, see a working query, and 
reasonably assume `s` had an effect — when in fact the resulting stream uses 
the catalog schema and `s` was dropped. Surfacing this as a clear error aligns 
`.table()` with the existing behavior of `.load()` and `.changes()`.
    
    ### Does this PR introduce _any_ user-facing change?
    
    No
    
    ### How was this patch tested?
    
    Added `DataStreamTableAPISuite` test `"read: user-specified schema is not 
allowed with table API"`.
    
    ### Was this patch authored or co-authored using generative AI tooling?
    
    No
    
    Closes #56017 from PorridgeSwim/forbidSpecifySchemaForTable.
    
    Lead-authored-by: You Zhou <[email protected]>
    Co-authored-by: You Zhou <[email protected]>
    Signed-off-by: Jungtaek Lim <[email protected]>
---
 .../org/apache/spark/sql/classic/DataStreamReader.scala  |  1 +
 .../sql/streaming/test/DataStreamTableAPISuite.scala     | 16 ++++++++++++++++
 2 files changed, 17 insertions(+)

diff --git 
a/sql/core/src/main/scala/org/apache/spark/sql/classic/DataStreamReader.scala 
b/sql/core/src/main/scala/org/apache/spark/sql/classic/DataStreamReader.scala
index a3ab235372d8..ef7cebdb2a19 100644
--- 
a/sql/core/src/main/scala/org/apache/spark/sql/classic/DataStreamReader.scala
+++ 
b/sql/core/src/main/scala/org/apache/spark/sql/classic/DataStreamReader.scala
@@ -102,6 +102,7 @@ final class DataStreamReader private[sql](sparkSession: 
SparkSession)
   /** @inheritdoc */
   def table(tableName: String): DataFrame = {
     require(tableName != null, "The table name can't be null")
+    assertNoSpecifiedSchema("table")
     val identifier = 
sparkSession.sessionState.sqlParser.parseMultipartIdentifier(tableName)
     val unresolved = UnresolvedRelation(
       identifier,
diff --git 
a/sql/core/src/test/scala/org/apache/spark/sql/streaming/test/DataStreamTableAPISuite.scala
 
b/sql/core/src/test/scala/org/apache/spark/sql/streaming/test/DataStreamTableAPISuite.scala
index dab667731019..f10d1cdab0d5 100644
--- 
a/sql/core/src/test/scala/org/apache/spark/sql/streaming/test/DataStreamTableAPISuite.scala
+++ 
b/sql/core/src/test/scala/org/apache/spark/sql/streaming/test/DataStreamTableAPISuite.scala
@@ -83,6 +83,22 @@ class DataStreamTableAPISuite extends StreamTest with 
BeforeAndAfter {
     checkErrorTableNotFound(e, "`non_exist_table`")
   }
 
+  test("read: user-specified schema is not allowed with table API") {
+    val tblName = "my_table"
+    withTable(tblName) {
+      spark.range(3).write.format("parquet").saveAsTable(tblName)
+      val e = intercept[AnalysisException] {
+        spark.readStream
+          .schema(new StructType().add("a", IntegerType))
+          .table(tblName)
+      }
+      checkError(
+        exception = e,
+        condition = "_LEGACY_ERROR_TEMP_1189",
+        parameters = Map("operation" -> "table"))
+    }
+  }
+
   test("read: stream table API with temp view") {
     val tblName = "my_table"
     val stream = MemoryStream[Int]


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to