This is an automated email from the ASF dual-hosted git repository.

dtenedor pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/spark.git


The following commit(s) were added to refs/heads/master by this push:
     new 9df8644a59e9 [SPARK-55029][SS] Propagate streaming source identifying name through resolution pipeline
9df8644a59e9 is described below

commit 9df8644a59e98bf869338d6585c56834b846ef20
Author: ericm-db <[email protected]>
AuthorDate: Wed Jan 14 11:47:42 2026 -0800

    [SPARK-55029][SS] Propagate streaming source identifying name through resolution pipeline
    
    ### What changes were proposed in this pull request?
    This change propagates the streaming source identifying name from CatalogTable through DataSource to StreamingRelation during table resolution; a simplified sketch of the flow follows the list below.
    Key changes:
    - Add streamingSourceIdentifyingName field to CatalogTable
    - Update NameStreamingSources to only unwrap when child is resolved
    - Extend getStreamingRelation to accept and propagate sourceIdentifyingName
    - Handle NamedStreamingRelation wrapping UnresolvedCatalogRelation/SubqueryAlias
    - Add sourceIdentifyingName to StreamingRelation and StreamingExecutionRelation
    - Add tests for streaming table resolution with source naming
    - Update existing test expectations to include sourceIdentifyingName
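
    For illustration, below is a minimal, self-contained Scala sketch of the propagation pattern described above. The types (`CatalogTableLike`, `DataSourceLike`, `StreamingRelationLike`) and the `from` helper are simplified stand-ins invented for this sketch, not the real Spark classes or APIs; they only mirror the idea that the name is stamped onto the catalog entry and read back when the relation is built.

    ```scala
    // Simplified model of the name propagation (stand-in types, not Spark's).
    sealed trait StreamingSourceIdentifyingName
    case object Unassigned extends StreamingSourceIdentifyingName
    case class UserProvided(name: String) extends StreamingSourceIdentifyingName

    // The catalog entry carries an optional identifying name (None by default).
    case class CatalogTableLike(
        identifier: String,
        streamingSourceIdentifyingName: Option[StreamingSourceIdentifyingName] = None)

    // The data source keeps a reference to the (possibly updated) catalog entry.
    case class DataSourceLike(catalogTable: Option[CatalogTableLike])

    // The relation reads the name back out of the catalog entry, defaulting to Unassigned.
    case class StreamingRelationLike(
        dataSource: DataSourceLike,
        sourceIdentifyingName: StreamingSourceIdentifyingName)

    object StreamingRelationLike {
      def from(dataSource: DataSourceLike): StreamingRelationLike = {
        val name = dataSource.catalogTable
          .flatMap(_.streamingSourceIdentifyingName)
          .getOrElse(Unassigned)
        StreamingRelationLike(dataSource, name)
      }
    }

    object PropagationExample {
      def main(args: Array[String]): Unit = {
        // Resolution sets the name on the catalog entry before building the data source,
        // so the name survives all the way to the relation.
        val table = CatalogTableLike("default.events")
        val named = table.copy(streamingSourceIdentifyingName = Some(UserProvided("events_src")))
        val relation = StreamingRelationLike.from(DataSourceLike(Some(named)))
        println(relation.sourceIdentifyingName) // UserProvided(events_src)
      }
    }
    ```

    In the actual change, the analogous hand-off happens in FindDataSourceTable/getStreamingRelation via CatalogTable.streamingSourceIdentifyingName and the StreamingRelation factory.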
    
    ### Why are the changes needed?
    
    This infrastructure is needed for stable checkpoint locations and source evolution.
    
    ### Does this PR introduce _any_ user-facing change?
    
    No
    
    ### How was this patch tested?
    
    Unit tests
    
    ### Was this patch authored or co-authored using generative AI tooling?
    
    No
    
    Closes #53792 from ericm-db/streaming-source-naming-apply-oss-diff.
    
    Authored-by: ericm-db <[email protected]>
    Signed-off-by: Daniel Tenedorio <[email protected]>
---
 .../sql/kafka010/KafkaMicroBatchSourceSuite.scala  |   4 +-
 .../catalyst/analysis/NameStreamingSources.scala   |   4 +-
 .../spark/sql/catalyst/catalog/interface.scala     |   5 +-
 .../execution/datasources/DataSourceStrategy.scala |  64 ++++++++---
 .../streaming/runtime/MicroBatchExecution.scala    |  12 ++-
 .../streaming/runtime/StreamingRelation.scala      |  23 +++-
 .../execution/streaming/StreamRelationSuite.scala  |   5 +-
 .../sql/streaming/FileStreamSourceSuite.scala      |   6 +-
 .../StreamingSourceIdentifyingNameSuite.scala      | 117 +++++++++++++++++++++
 9 files changed, 212 insertions(+), 28 deletions(-)

diff --git a/connector/kafka-0-10-sql/src/test/scala/org/apache/spark/sql/kafka010/KafkaMicroBatchSourceSuite.scala b/connector/kafka-0-10-sql/src/test/scala/org/apache/spark/sql/kafka010/KafkaMicroBatchSourceSuite.scala
index f4bd782617ae..0bca223d09ae 100644
--- a/connector/kafka-0-10-sql/src/test/scala/org/apache/spark/sql/kafka010/KafkaMicroBatchSourceSuite.scala
+++ b/connector/kafka-0-10-sql/src/test/scala/org/apache/spark/sql/kafka010/KafkaMicroBatchSourceSuite.scala
@@ -125,7 +125,7 @@ abstract class KafkaSourceTest extends StreamTest with SharedSparkSession with K
 
       val sources: Seq[SparkDataStream] = {
         query.get.logicalPlan.collect {
-          case StreamingExecutionRelation(source: KafkaSource, _, _) => source
+          case StreamingExecutionRelation(source: KafkaSource, _, _, _) => source
           case r: StreamingDataSourceV2ScanRelation
             if r.stream.isInstanceOf[KafkaMicroBatchStream] ||
               r.stream.isInstanceOf[KafkaContinuousStream] =>
@@ -1615,7 +1615,7 @@ abstract class KafkaMicroBatchV1SourceSuite extends KafkaMicroBatchSourceSuiteBa
       makeSureGetOffsetCalled,
       AssertOnQuery { query =>
         query.logicalPlan.collectFirst {
-          case StreamingExecutionRelation(_: KafkaSource, _, _) => true
+          case StreamingExecutionRelation(_: KafkaSource, _, _, _) => true
         }.nonEmpty
       }
     )
diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/NameStreamingSources.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/NameStreamingSources.scala
index 106d74fcac53..d56edf1d5119 100644
--- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/NameStreamingSources.scala
+++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/NameStreamingSources.scala
@@ -127,8 +127,10 @@ object NameStreamingSources extends Rule[LogicalPlan] {
     } else {
       // Feature disabled - unwrap NamedStreamingRelation nodes without propagating names.
       // Error if any source has an explicitly assigned name since the feature is disabled.
+      // Only unwrap when child is resolved - this allows FindDataSourceTable to resolve
+      // UnresolvedCatalogRelation to StreamingRelation before we unwrap.
       plan.resolveOperatorsWithPruning(_.containsPattern(NAMED_STREAMING_RELATION)) {
-        case NamedStreamingRelation(child, Unassigned) =>
+        case NamedStreamingRelation(child, Unassigned) if child.resolved =>
           child
         case NamedStreamingRelation(_, UserProvided(name)) =>
          throw QueryCompilationErrors.streamingSourceNamingNotSupportedError(name)
diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/catalog/interface.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/catalog/interface.scala
index 01153d516e5c..7d15551afe0c 100644
--- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/catalog/interface.scala
+++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/catalog/interface.scala
@@ -41,6 +41,7 @@ import org.apache.spark.sql.catalyst.catalog.CatalogTable.VIEW_STORING_ANALYZED_
 import org.apache.spark.sql.catalyst.expressions.{Attribute, AttributeMap, AttributeReference, Cast, ExprId, Literal}
 import org.apache.spark.sql.catalyst.plans.logical._
 import org.apache.spark.sql.catalyst.plans.logical.statsEstimation.EstimationUtils
+import org.apache.spark.sql.catalyst.streaming.StreamingSourceIdentifyingName
 import org.apache.spark.sql.catalyst.types.DataTypeUtils
 import org.apache.spark.sql.catalyst.util._
 import org.apache.spark.sql.connector.catalog.CatalogManager
@@ -442,7 +443,9 @@ case class CatalogTable(
     tracksPartitionsInCatalog: Boolean = false,
     schemaPreservesCase: Boolean = true,
     ignoredProperties: Map[String, String] = Map.empty,
-    viewOriginalText: Option[String] = None) extends MetadataMapSupport {
+    viewOriginalText: Option[String] = None,
+    streamingSourceIdentifyingName: Option[StreamingSourceIdentifyingName] = None)
+  extends MetadataMapSupport {
 
   import CatalogTable._
 
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/DataSourceStrategy.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/DataSourceStrategy.scala
index 93ad3ca3856e..436a760c5b6f 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/DataSourceStrategy.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/DataSourceStrategy.scala
@@ -32,15 +32,16 @@ import org.apache.spark.sql.{Row, SaveMode}
 import org.apache.spark.sql.catalyst.{expressions, CatalystTypeConverters, InternalRow, QualifiedTableName, SQLConfHelper}
 import org.apache.spark.sql.catalyst.CatalystTypeConverters.convertToScala
 import org.apache.spark.sql.catalyst.analysis._
+import org.apache.spark.sql.catalyst.analysis.NamedStreamingRelation
 import org.apache.spark.sql.catalyst.catalog._
 import org.apache.spark.sql.catalyst.encoders.ExpressionEncoder
 import org.apache.spark.sql.catalyst.expressions._
 import org.apache.spark.sql.catalyst.expressions.aggregate.AggregateExpression
 import org.apache.spark.sql.catalyst.planning.PhysicalOperation
 import org.apache.spark.sql.catalyst.plans.{Inner, JoinType, LeftOuter, RightOuter}
-import org.apache.spark.sql.catalyst.plans.logical.{AppendData, InsertIntoDir, InsertIntoStatement, LogicalPlan, Project}
+import org.apache.spark.sql.catalyst.plans.logical.{AppendData, InsertIntoDir, InsertIntoStatement, LogicalPlan, Project, SubqueryAlias}
 import org.apache.spark.sql.catalyst.rules.Rule
-import org.apache.spark.sql.catalyst.streaming.StreamingRelationV2
+import org.apache.spark.sql.catalyst.streaming.{StreamingRelationV2, StreamingSourceIdentifyingName, Unassigned}
 import org.apache.spark.sql.catalyst.types.DataTypeUtils
 import org.apache.spark.sql.catalyst.util.{GeneratedColumn, IdentityColumn, PushableExpression, ResolveDefaultColumns}
 import org.apache.spark.sql.classic.{SparkSession, Strategy}
@@ -293,15 +294,20 @@ class FindDataSourceTable(sparkSession: SparkSession) extends Rule[LogicalPlan]
   }
 
   private def getStreamingRelation(
-      table: CatalogTable,
-      extraOptions: CaseInsensitiveStringMap): StreamingRelation = {
-    val dsOptions = DataSourceUtils.generateDatasourceOptions(extraOptions, table)
+    table: CatalogTable,
+    extraOptions: CaseInsensitiveStringMap,
+    sourceIdentifyingName: StreamingSourceIdentifyingName
+  ): StreamingRelation = {
+    // Set the source identifying name on the CatalogTable so it propagates to StreamingRelation
+    val tableWithSourceName = table.copy(
+      streamingSourceIdentifyingName = Some(sourceIdentifyingName))
+    val dsOptions = DataSourceUtils.generateDatasourceOptions(extraOptions, tableWithSourceName)
     val dataSource = DataSource(
       SparkSession.active,
-      className = table.provider.get,
-      userSpecifiedSchema = Some(table.schema),
+      className = tableWithSourceName.provider.get,
+      userSpecifiedSchema = Some(tableWithSourceName.schema),
       options = dsOptions,
-      catalogTable = Some(table))
+      catalogTable = Some(tableWithSourceName))
     StreamingRelation(dataSource)
   }
 
@@ -321,7 +327,10 @@ class FindDataSourceTable(sparkSession: SparkSession) extends Rule[LogicalPlan]
         table.partitionColumnNames.map(name => name -> None).toMap,
         Seq.empty, append.query, false, append.isByName)
 
-    case unresolvedCatalogRelation: UnresolvedCatalogRelation =>
+    // Skip streaming UnresolvedCatalogRelation here - they're handled by the
+    // NamedStreamingRelation case below to preserve the source identifying name.
+    case unresolvedCatalogRelation: UnresolvedCatalogRelation
+        if !unresolvedCatalogRelation.isStreaming =>
       val result = resolveUnresolvedCatalogRelation(unresolvedCatalogRelation)
       // We put the resolved relation into the [[AnalyzerBridgeState]] for
      // it to be later reused by the single-pass [[Resolver]] to avoid resolving the
@@ -331,11 +340,39 @@ class FindDataSourceTable(sparkSession: SparkSession) extends Rule[LogicalPlan]
       }
       result
 
+    // Handle streaming UnresolvedCatalogRelation wrapped in NamedStreamingRelation
+    // to preserve the source identifying name. With resolveOperators (bottom-up), the child
+    // is processed first but doesn't match the case above due to the !isStreaming guard,
+    // so the NamedStreamingRelation case here can match.
+    // We set the sourceIdentifyingName on the CatalogTable so it propagates to StreamingRelation.
+    case NamedStreamingRelation(u: UnresolvedCatalogRelation, sourceIdentifyingName) =>
+      val tableWithSourceName = u.tableMeta.copy(
+        streamingSourceIdentifyingName = Some(sourceIdentifyingName))
+      resolveUnresolvedCatalogRelation(u.copy(tableMeta = tableWithSourceName))
+
+    // Handle NamedStreamingRelation wrapping SubqueryAlias(UnresolvedCatalogRelation)
+    // - this happens when resolving streaming tables from catalogs where the table lookup
+    // creates a SubqueryAlias wrapper around the UnresolvedCatalogRelation.
+    case NamedStreamingRelation(
+        SubqueryAlias(alias, u: UnresolvedCatalogRelation), sourceIdentifyingName) =>
+      val tableWithSourceName = u.tableMeta.copy(
+        streamingSourceIdentifyingName = Some(sourceIdentifyingName))
+      val resolved = resolveUnresolvedCatalogRelation(u.copy(tableMeta = tableWithSourceName))
+      SubqueryAlias(alias, resolved)
+
+    // Fallback for streaming UnresolvedCatalogRelation that is NOT wrapped in
+    // NamedStreamingRelation (e.g., from .readStream.table() API path).
+    // The sourceIdentifyingName defaults to Unassigned via
+    // tableMeta.streamingSourceIdentifyingName.getOrElse(Unassigned)
+    // in resolveUnresolvedCatalogRelation.
+    case u: UnresolvedCatalogRelation if u.isStreaming =>
+      resolveUnresolvedCatalogRelation(u)
+
     case s @ StreamingRelationV2(
         _, _, table, extraOptions, _, _, _,
-        Some(UnresolvedCatalogRelation(tableMeta, _, true)), _) =>
+        Some(UnresolvedCatalogRelation(tableMeta, _, true)), name) =>
      import org.apache.spark.sql.execution.datasources.v2.DataSourceV2Implicits._
-      val v1Relation = getStreamingRelation(tableMeta, extraOptions)
+      val v1Relation = getStreamingRelation(tableMeta, extraOptions, name)
       if (table.isInstanceOf[SupportsRead]
           && table.supportsAny(MICRO_BATCH_READ, CONTINUOUS_READ)) {
         s.copy(v1Relation = Some(v1Relation))
@@ -355,8 +392,11 @@ class FindDataSourceTable(sparkSession: SparkSession) extends Rule[LogicalPlan]
       case UnresolvedCatalogRelation(tableMeta, _, false) =>
         DDLUtils.readHiveTable(tableMeta)
 
+      // For streaming, the sourceIdentifyingName is read from
+      // tableMeta.streamingSourceIdentifyingName which was set by the caller.
       case UnresolvedCatalogRelation(tableMeta, extraOptions, true) =>
-        getStreamingRelation(tableMeta, extraOptions)
+        val sourceIdentifyingName = tableMeta.streamingSourceIdentifyingName.getOrElse(Unassigned)
+        getStreamingRelation(tableMeta, extraOptions, sourceIdentifyingName)
     }
   }
 }
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/runtime/MicroBatchExecution.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/runtime/MicroBatchExecution.scala
index d81b2276eab7..e11a99653614 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/runtime/MicroBatchExecution.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/runtime/MicroBatchExecution.scala
@@ -185,7 +185,8 @@ class MicroBatchExecution(
 
     import org.apache.spark.sql.execution.datasources.v2.DataSourceV2Implicits._
     val _logicalPlan = analyzedPlan.transform {
-      case streamingRelation @ StreamingRelation(dataSourceV1, sourceName, output) =>
+      case streamingRelation @ StreamingRelation(
+          dataSourceV1, sourceName, output, sourceIdentifyingName) =>
         toExecutionRelationMap.getOrElseUpdate(streamingRelation, {
           // Materialize source to avoid creating it in every batch
           val metadataPath = s"$resolvedCheckpointRoot/sources/$nextSourceId"
@@ -194,11 +195,12 @@ class MicroBatchExecution(
           logInfo(log"Using Source [${MDC(LogKeys.STREAMING_SOURCE, source)}] " +
             log"from DataSourceV1 named '${MDC(LogKeys.STREAMING_DATA_SOURCE_NAME, sourceName)}' " +
             log"[${MDC(LogKeys.STREAMING_DATA_SOURCE_DESCRIPTION, dataSourceV1)}]")
-          StreamingExecutionRelation(source, output, dataSourceV1.catalogTable)(sparkSession)
+          StreamingExecutionRelation(
+            source, output, dataSourceV1.catalogTable, sourceIdentifyingName)(sparkSession)
         })
 
       case s @ StreamingRelationV2(src, srcName, table: SupportsRead, options, output,
-        catalog, identifier, v1, _) =>
+        catalog, identifier, v1, sourceIdentifyingName) =>
         val dsStr = if (src.nonEmpty) s"[${src.get}]" else ""
         val v2Disabled = disabledSources.contains(src.getOrElse(None).getClass.getCanonicalName)
         if (!v2Disabled && table.supports(TableCapability.MICRO_BATCH_READ)) {
@@ -241,7 +243,7 @@ class MicroBatchExecution(
               log"${MDC(LogKeys.STREAMING_DATA_SOURCE_DESCRIPTION, dsStr)}")
             // We don't have a catalog table but may have a table identifier. Given this is about
             // v1 fallback path, we just give up and set the catalog table as None.
-            StreamingExecutionRelation(source, output, None)(sparkSession)
+            StreamingExecutionRelation(source, output, None, sourceIdentifyingName)(sparkSession)
           })
         }
     }
@@ -969,7 +971,7 @@ class MicroBatchExecution(
     // Replace sources in the logical plan with data that has arrived since the last batch.
     val newBatchesPlan = logicalPlan transform {
       // For v1 sources.
-      case StreamingExecutionRelation(source, output, catalogTable) =>
+      case StreamingExecutionRelation(source, output, catalogTable, _) =>
         mutableNewData.get(source).map { dataPlan =>
           val hasFileMetadata = output.exists {
             case FileSourceMetadataAttribute(_) => true
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/runtime/StreamingRelation.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/runtime/StreamingRelation.scala
index 96f6340cb958..84eb9ff77c96 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/runtime/StreamingRelation.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/runtime/StreamingRelation.scala
@@ -24,6 +24,7 @@ import org.apache.spark.sql.catalyst.analysis.MultiInstanceRelation
 import org.apache.spark.sql.catalyst.catalog.CatalogTable
 import org.apache.spark.sql.catalyst.expressions.{Attribute, AttributeReference}
 import org.apache.spark.sql.catalyst.plans.logical.{ExposesMetadataColumns, LeafNode, LogicalPlan, Statistics}
+import org.apache.spark.sql.catalyst.streaming.{StreamingSourceIdentifyingName, Unassigned}
 import org.apache.spark.sql.catalyst.types.DataTypeUtils.toAttributes
 import org.apache.spark.sql.connector.read.streaming.SparkDataStream
 import org.apache.spark.sql.errors.QueryExecutionErrors
@@ -34,8 +35,13 @@ import org.apache.spark.sql.sources.SupportsStreamSourceMetadataColumns
 
 object StreamingRelation {
   def apply(dataSource: DataSource): StreamingRelation = {
+    // Extract source identifying name from CatalogTable for stable checkpoints
+    val sourceIdentifyingName = dataSource.catalogTable
+      .flatMap(_.streamingSourceIdentifyingName)
+      .getOrElse(Unassigned)
     StreamingRelation(
-      dataSource, dataSource.sourceInfo.name, toAttributes(dataSource.sourceInfo.schema))
+      dataSource, dataSource.sourceInfo.name, toAttributes(dataSource.sourceInfo.schema),
+      sourceIdentifyingName)
   }
 }
 
@@ -46,11 +52,20 @@ object StreamingRelation {
  * It should be used to create [[Source]] and converted to [[StreamingExecutionRelation]] when
  * passing to [[StreamExecution]] to run a query.
  */
-case class StreamingRelation(dataSource: DataSource, sourceName: String, output: Seq[Attribute])
+case class StreamingRelation(
+    dataSource: DataSource,
+    sourceName: String,
+    output: Seq[Attribute],
+    sourceIdentifyingName: StreamingSourceIdentifyingName = Unassigned)
   extends LeafNode with MultiInstanceRelation with ExposesMetadataColumns {
   override def isStreaming: Boolean = true
   override def toString: String = sourceName
 
+  // Provide a concise representation for plan comparison output
+  override def simpleString(maxFields: Int): String = {
+    s"StreamingRelation $sourceName, ${output.mkString("[", ", ", "]")}, 
$sourceIdentifyingName"
+  }
+
   // There's no sensible value here. On the execution path, this relation will be
   // swapped out with microbatches. But some dataframe operations (in particular explain) do lead
   // to this node surviving analysis. So we satisfy the LeafNode contract with the session default
@@ -88,7 +103,9 @@ case class StreamingRelation(dataSource: DataSource, sourceName: String, output:
 case class StreamingExecutionRelation(
     source: SparkDataStream,
     output: Seq[Attribute],
-    catalogTable: Option[CatalogTable])(session: SparkSession)
+    catalogTable: Option[CatalogTable],
+    sourceIdentifyingName: StreamingSourceIdentifyingName = Unassigned)
+    (session: SparkSession)
   extends LeafNode with MultiInstanceRelation {
 
   override def otherCopyArgs: Seq[AnyRef] = session :: Nil
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/execution/streaming/StreamRelationSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/execution/streaming/StreamRelationSuite.scala
index e13f592f7078..642aaeb48392 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/execution/streaming/StreamRelationSuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/execution/streaming/StreamRelationSuite.scala
@@ -130,6 +130,9 @@ class StreamRelationSuite extends SharedSparkSession with AnalysisTest {
     val catalogTable = spark.sessionState.catalog.getTableMetadata(
       TableIdentifier("t")
     )
+    // During streaming resolution, the CatalogTable gets streamingSourceIdentifyingName set
+    val catalogTableWithSourceName = catalogTable.copy(
+      streamingSourceIdentifyingName = Some(Unassigned))
     val idAttr = AttributeReference(name = "id", dataType = IntegerType)()
 
     val expectedAnalyzedPlan = Project(
@@ -145,7 +148,7 @@ class StreamRelationSuite extends SharedSparkSession with AnalysisTest {
               "path" -> catalogTable.location.toString
             ),
             userSpecifiedSchema = Option(catalogTable.schema),
-            catalogTable = Option(catalogTable)
+            catalogTable = Option(catalogTableWithSourceName)
           ),
           sourceName = s"FileSource[${catalogTable.location.toString}]",
           output = Seq(idAttr)
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/streaming/FileStreamSourceSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/streaming/FileStreamSourceSuite.scala
index 15a0f048dd8a..83e6772d69dc 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/streaming/FileStreamSourceSuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/streaming/FileStreamSourceSuite.scala
@@ -190,7 +190,7 @@ abstract class FileStreamSourceTest
   protected def getSourceFromFileStream(df: DataFrame): FileStreamSource = {
     val checkpointLocation = Utils.createTempDir(namePrefix = "streaming.metadata").getCanonicalPath
     df.queryExecution.analyzed
-      .collect { case StreamingRelation(dataSource, _, _) =>
+      .collect { case StreamingRelation(dataSource, _, _, _) =>
         // There is only one source in our tests so just set sourceId to 0
         dataSource.createSource(s"$checkpointLocation/sources/0").asInstanceOf[FileStreamSource]
       }.head
@@ -198,7 +198,7 @@ abstract class FileStreamSourceTest
 
   protected def getSourcesFromStreamingQuery(query: StreamExecution): Seq[FileStreamSource] = {
     query.logicalPlan.collect {
-      case StreamingExecutionRelation(source, _, _) if source.isInstanceOf[FileStreamSource] =>
+      case StreamingExecutionRelation(source, _, _, _) if source.isInstanceOf[FileStreamSource] =>
         source.asInstanceOf[FileStreamSource]
     }
   }
@@ -251,7 +251,7 @@ class FileStreamSourceSuite extends FileStreamSourceTest {
         reader.load()
       }
     df.queryExecution.analyzed
-      .collect { case s @ StreamingRelation(dataSource, _, _) => s.schema }.head
+      .collect { case s @ StreamingRelation(dataSource, _, _, _) => s.schema }.head
   }
 
   override def beforeAll(): Unit = {
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/streaming/StreamingSourceIdentifyingNameSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/streaming/StreamingSourceIdentifyingNameSuite.scala
new file mode 100644
index 000000000000..91c36fcb7935
--- /dev/null
+++ b/sql/core/src/test/scala/org/apache/spark/sql/streaming/StreamingSourceIdentifyingNameSuite.scala
@@ -0,0 +1,117 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.sql.streaming
+
+import org.apache.spark.sql.catalyst.plans.logical.SubqueryAlias
+import org.apache.spark.sql.catalyst.streaming.Unassigned
+import org.apache.spark.sql.execution.streaming.runtime.StreamingRelation
+import org.apache.spark.sql.test.SharedSparkSession
+
+/**
+ * Tests for streaming source identifying name propagation through the resolution pipeline.
+ * These tests verify that sourceIdentifyingName is correctly propagated from CatalogTable
+ * through DataSource to StreamingRelation during streaming table resolution.
+ */
+class StreamingSourceIdentifyingNameSuite extends SharedSparkSession {
+
+  test("STREAM table resolution propagates sourceIdentifyingName through 
pipeline") {
+    withTable("stream_name_test") {
+      sql("CREATE TABLE stream_name_test (id INT) USING PARQUET")
+
+      val analyzedPlan = sql("SELECT * FROM STREAM 
stream_name_test").queryExecution.analyzed
+
+      val streamingRelation = analyzedPlan.collectFirst {
+        case sr: StreamingRelation => sr
+      }
+
+      assert(streamingRelation.isDefined, "Expected StreamingRelation in analyzed plan")
+      assert(streamingRelation.get.sourceIdentifyingName == Unassigned,
+        s"Expected Unassigned but got ${streamingRelation.get.sourceIdentifyingName}")
+
+      // Verify the CatalogTable in DataSource has the name set
+      val catalogTable = streamingRelation.get.dataSource.catalogTable
+      assert(catalogTable.isDefined, "Expected CatalogTable in DataSource")
+      assert(catalogTable.get.streamingSourceIdentifyingName == Some(Unassigned),
+        s"Expected Some(Unassigned) but got ${catalogTable.get.streamingSourceIdentifyingName}")
+    }
+  }
+
+  test("STREAM with qualified name propagates sourceIdentifyingName through 
SubqueryAlias") {
+    // When querying a table with a qualified name (e.g., 
spark_catalog.default.t),
+    // the catalog lookup creates a SubqueryAlias wrapper.
+    withTable("stream_alias_test") {
+      sql("CREATE TABLE stream_alias_test (id INT, data STRING) USING PARQUET")
+
+      // Query using fully qualified name to ensure SubqueryAlias is created
+      val analyzedPlan = sql(
+        "SELECT * FROM STREAM spark_catalog.default.stream_alias_test"
+      ).queryExecution.analyzed
+
+      val streamingRelation = analyzedPlan.collectFirst {
+        case sr: StreamingRelation => sr
+      }
+
+      assert(streamingRelation.isDefined, "Expected StreamingRelation in analyzed plan")
+      assert(streamingRelation.get.sourceIdentifyingName == Unassigned,
+        s"Expected Unassigned but got ${streamingRelation.get.sourceIdentifyingName}")
+
+      // Verify the plan has SubqueryAlias wrapping StreamingRelation
+      val hasSubqueryAlias = analyzedPlan.collect {
+        case SubqueryAlias(_, _: StreamingRelation) => true
+      }.nonEmpty
+      assert(hasSubqueryAlias, "Expected SubqueryAlias wrapping StreamingRelation")
+    }
+  }
+
+  test("readStream.table() API resolves streaming table correctly") {
+    // This tests the fallback case in FindDataSourceTable for streaming
+    // UnresolvedCatalogRelation that is NOT wrapped in NamedStreamingRelation.
+    withTable("api_stream_test") {
+      sql("CREATE TABLE api_stream_test (id INT) USING PARQUET")
+
+      val df = spark.readStream.table("api_stream_test")
+      val analyzedPlan = df.queryExecution.analyzed
+
+      val streamingRelation = analyzedPlan.collectFirst {
+        case sr: StreamingRelation => sr
+      }
+
+      assert(streamingRelation.isDefined, "Expected StreamingRelation in analyzed plan")
+      assert(streamingRelation.get.sourceIdentifyingName == Unassigned,
+        s"Expected Unassigned but got ${streamingRelation.get.sourceIdentifyingName}")
+    }
+  }
+
+  test("batch table query still resolves correctly (regression test)") {
+    // Verify that the !isStreaming guard in FindDataSourceTable
+    // doesn't break normal batch table resolution.
+    withTable("batch_test") {
+      sql("CREATE TABLE batch_test (id INT, value STRING) USING PARQUET")
+      sql("INSERT INTO batch_test VALUES (1, 'a'), (2, 'b')")
+
+      val result = sql("SELECT * FROM batch_test").collect()
+      assert(result.length == 2, s"Expected 2 rows but got ${result.length}")
+
+      val analyzedPlan = sql("SELECT * FROM 
batch_test").queryExecution.analyzed
+      val hasStreamingRelation = analyzedPlan.collect {
+        case _: StreamingRelation => true
+      }.nonEmpty
+      assert(!hasStreamingRelation, "Batch query should not contain StreamingRelation")
+    }
+  }
+}


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]
