This is an automated email from the ASF dual-hosted git repository.

ashrigondekar pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/spark.git


The following commit(s) were added to refs/heads/master by this push:
     new 605305b418e1 [SPARK-55402][SS] Move streamingSourceIdentifyingName from CatalogTable to DataSource
605305b418e1 is described below

commit 605305b418e14feb86cbb782399a0c711132fd40
Author: ericm-db <[email protected]>
AuthorDate: Mon Feb 9 11:16:41 2026 -0800

    [SPARK-55402][SS] Move streamingSourceIdentifyingName from CatalogTable to DataSource
    
    ### What changes were proposed in this pull request?
    This PR moves the streamingSourceIdentifyingName field from CatalogTable into DataSource, where it becomes an optional constructor parameter. The changes include (a sketch of the new shape follows the list):
    1. DataSource.scala: Added userSpecifiedStreamingSourceName (an Option[StreamingSourceIdentifyingName]) as an optional constructor parameter
    2. StreamingRelation.scala: Updated StreamingRelation.apply() to read the name from DataSource instead of CatalogTable
    3. DataSourceStrategy.scala: Modified FindDataSourceTable.getStreamingRelation() to pass the name directly to the DataSource constructor instead of copying it onto the CatalogTable
    4. CatalogTable (interface.scala): Removed the streamingSourceIdentifyingName field entirely
    5. Test updates: Updated test files to reflect the new architecture
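
    A minimal sketch of the new shape (paraphrased from the diff below; unrelated DataSource parameters elided):

        // DataSource now carries the query-scoped source name itself.
        case class DataSource(
            sparkSession: SparkSession,
            className: String,
            userSpecifiedSchema: Option[StructType] = None,
            options: Map[String, String] = Map.empty,
            catalogTable: Option[CatalogTable] = None,
            userSpecifiedStreamingSourceName: Option[StreamingSourceIdentifyingName] = None)

        // Callers such as FindDataSourceTable.getStreamingRelation() pass the
        // name at construction time instead of copying it onto the table:
        val dataSource = DataSource(
          SparkSession.active,
          className = table.provider.get,
          userSpecifiedSchema = Some(table.schema),
          options = dsOptions,
          catalogTable = Some(table),
          userSpecifiedStreamingSourceName = Some(sourceIdentifyingName))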
    
    ### Why are the changes needed?
    streamingSourceIdentifyingName represents query-specific metadata (which source name was assigned in a particular streaming query plan), not an intrinsic property of the table itself.
    By moving this field to DataSource (which is already query-specific), we restore proper CatalogTable equality while retaining the ability to track streaming source identifying names for stable checkpoints.
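
    Concretely (an illustrative before/after, reusing names from the diff below rather than code from this patch): resolution used to produce a query-specific copy of the table,

        val tableWithSourceName = table.copy(
          streamingSourceIdentifyingName = Some(sourceIdentifyingName))
        // tableWithSourceName != table: two references to the same catalog
        // table stop comparing equal once query-local state is mixed in.

    whereas now the CatalogTable passes through untouched, so dataSource.catalogTable still compares equal to the catalog's own metadata.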
    
    ### Does this PR introduce _any_ user-facing change?
    
    No
    
    ### How was this patch tested?
    
    Unit tests
    
    ### Was this patch authored or co-authored using generative AI tooling?
    
    Generated-by: Claude Opus 4.6
    
    Closes #54185 from ericm-db/source-name-ds.
    
    Authored-by: ericm-db <[email protected]>
    Signed-off-by: Anish Shrigondekar <[email protected]>
---
 .../spark/sql/catalyst/catalog/interface.scala     |  4 +--
 .../sql/execution/datasources/DataSource.scala     |  5 ++-
 .../execution/datasources/DataSourceStrategy.scala | 36 +++++++++-------------
 .../streaming/runtime/StreamingRelation.scala      |  6 ++--
 .../execution/streaming/StreamRelationSuite.scala  |  6 ++--
 .../StreamingSourceIdentifyingNameSuite.scala      |  9 +++---
 6 files changed, 27 insertions(+), 39 deletions(-)

diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/catalog/interface.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/catalog/interface.scala
index 7d15551afe0c..fcaea4709504 100644
--- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/catalog/interface.scala
+++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/catalog/interface.scala
@@ -41,7 +41,6 @@ import org.apache.spark.sql.catalyst.catalog.CatalogTable.VIEW_STORING_ANALYZED_
 import org.apache.spark.sql.catalyst.expressions.{Attribute, AttributeMap, AttributeReference, Cast, ExprId, Literal}
 import org.apache.spark.sql.catalyst.plans.logical._
 import org.apache.spark.sql.catalyst.plans.logical.statsEstimation.EstimationUtils
-import org.apache.spark.sql.catalyst.streaming.StreamingSourceIdentifyingName
 import org.apache.spark.sql.catalyst.types.DataTypeUtils
 import org.apache.spark.sql.catalyst.util._
 import org.apache.spark.sql.connector.catalog.CatalogManager
@@ -443,8 +442,7 @@ case class CatalogTable(
     tracksPartitionsInCatalog: Boolean = false,
     schemaPreservesCase: Boolean = true,
     ignoredProperties: Map[String, String] = Map.empty,
-    viewOriginalText: Option[String] = None,
-    streamingSourceIdentifyingName: Option[StreamingSourceIdentifyingName] = None)
+    viewOriginalText: Option[String] = None)
   extends MetadataMapSupport {
 
   import CatalogTable._
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/DataSource.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/DataSource.scala
index 35588df11bfc..a48c71ad0d36 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/DataSource.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/DataSource.scala
@@ -34,6 +34,7 @@ import org.apache.spark.sql.catalyst.DataSourceOptions
 import org.apache.spark.sql.catalyst.analysis.UnresolvedAttribute
 import org.apache.spark.sql.catalyst.catalog.{BucketSpec, CatalogStorageFormat, CatalogTable, CatalogUtils}
 import org.apache.spark.sql.catalyst.plans.logical.LogicalPlan
+import org.apache.spark.sql.catalyst.streaming.StreamingSourceIdentifyingName
 import org.apache.spark.sql.catalyst.util.{CaseInsensitiveMap, TypeUtils}
 import org.apache.spark.sql.classic.ClassicConversions.castToImpl
 import org.apache.spark.sql.classic.Dataset
@@ -101,7 +102,9 @@ case class DataSource(
     partitionColumns: Seq[String] = Seq.empty,
     bucketSpec: Option[BucketSpec] = None,
     options: Map[String, String] = Map.empty,
-    catalogTable: Option[CatalogTable] = None) extends SessionStateHelper with Logging {
+    catalogTable: Option[CatalogTable] = None,
+    userSpecifiedStreamingSourceName: Option[StreamingSourceIdentifyingName] = None)
+  extends SessionStateHelper with Logging {
 
   case class SourceInfo(name: String, schema: StructType, partitionColumns: Seq[String])
 
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/DataSourceStrategy.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/DataSourceStrategy.scala
index 436a760c5b6f..220b0b8a6301 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/DataSourceStrategy.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/DataSourceStrategy.scala
@@ -298,16 +298,14 @@ class FindDataSourceTable(sparkSession: SparkSession) extends Rule[LogicalPlan]
     extraOptions: CaseInsensitiveStringMap,
     sourceIdentifyingName: StreamingSourceIdentifyingName
   ): StreamingRelation = {
-    // Set the source identifying name on the CatalogTable so it propagates to StreamingRelation
-    val tableWithSourceName = table.copy(
-      streamingSourceIdentifyingName = Some(sourceIdentifyingName))
-    val dsOptions = DataSourceUtils.generateDatasourceOptions(extraOptions, tableWithSourceName)
+    val dsOptions = DataSourceUtils.generateDatasourceOptions(extraOptions, table)
     val dataSource = DataSource(
       SparkSession.active,
-      className = tableWithSourceName.provider.get,
-      userSpecifiedSchema = Some(tableWithSourceName.schema),
+      className = table.provider.get,
+      userSpecifiedSchema = Some(table.schema),
       options = dsOptions,
-      catalogTable = Some(tableWithSourceName))
+      catalogTable = Some(table),
+      userSpecifiedStreamingSourceName = Some(sourceIdentifyingName))
     StreamingRelation(dataSource)
   }
 
@@ -344,29 +342,23 @@ class FindDataSourceTable(sparkSession: SparkSession) extends Rule[LogicalPlan]
     // to preserve the source identifying name. With resolveOperators (bottom-up), the child
     // is processed first but doesn't match the case above due to the !isStreaming guard,
     // so the NamedStreamingRelation case here can match.
-    // We set the sourceIdentifyingName on the CatalogTable so it propagates to StreamingRelation.
+    // We pass the sourceIdentifyingName directly to getStreamingRelation.
     case NamedStreamingRelation(u: UnresolvedCatalogRelation, sourceIdentifyingName) =>
-      val tableWithSourceName = u.tableMeta.copy(
-        streamingSourceIdentifyingName = Some(sourceIdentifyingName))
-      resolveUnresolvedCatalogRelation(u.copy(tableMeta = tableWithSourceName))
+      getStreamingRelation(u.tableMeta, u.options, sourceIdentifyingName)
 
     // Handle NamedStreamingRelation wrapping SubqueryAlias(UnresolvedCatalogRelation)
     // - this happens when resolving streaming tables from catalogs where the table lookup
     // creates a SubqueryAlias wrapper around the UnresolvedCatalogRelation.
     case NamedStreamingRelation(
         SubqueryAlias(alias, u: UnresolvedCatalogRelation), sourceIdentifyingName) =>
-      val tableWithSourceName = u.tableMeta.copy(
-        streamingSourceIdentifyingName = Some(sourceIdentifyingName))
-      val resolved = resolveUnresolvedCatalogRelation(u.copy(tableMeta = tableWithSourceName))
+      val resolved = getStreamingRelation(u.tableMeta, u.options, sourceIdentifyingName)
       SubqueryAlias(alias, resolved)
 
     // Fallback for streaming UnresolvedCatalogRelation that is NOT wrapped in
     // NamedStreamingRelation (e.g., from .readStream.table() API path).
-    // The sourceIdentifyingName defaults to Unassigned via
-    // tableMeta.streamingSourceIdentifyingName.getOrElse(Unassigned)
-    // in resolveUnresolvedCatalogRelation.
+    // The sourceIdentifyingName defaults to Unassigned.
     case u: UnresolvedCatalogRelation if u.isStreaming =>
-      resolveUnresolvedCatalogRelation(u)
+      getStreamingRelation(u.tableMeta, u.options, Unassigned)
 
     case s @ StreamingRelationV2(
         _, _, table, extraOptions, _, _, _,
@@ -392,11 +384,11 @@ class FindDataSourceTable(sparkSession: SparkSession) extends Rule[LogicalPlan]
       case UnresolvedCatalogRelation(tableMeta, _, false) =>
         DDLUtils.readHiveTable(tableMeta)
 
-      // For streaming, the sourceIdentifyingName is read from
-      // tableMeta.streamingSourceIdentifyingName which was set by the caller.
+      // For streaming, the sourceIdentifyingName defaults to Unassigned.
+      // Callers that have a specific sourceIdentifyingName should call
+      // getStreamingRelation directly instead of this method.
       case UnresolvedCatalogRelation(tableMeta, extraOptions, true) =>
-        val sourceIdentifyingName = tableMeta.streamingSourceIdentifyingName.getOrElse(Unassigned)
-        getStreamingRelation(tableMeta, extraOptions, sourceIdentifyingName)
+        getStreamingRelation(tableMeta, extraOptions, Unassigned)
     }
   }
 }
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/runtime/StreamingRelation.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/runtime/StreamingRelation.scala
index 827a2f1d066d..232f3475bb1b 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/runtime/StreamingRelation.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/runtime/StreamingRelation.scala
@@ -35,10 +35,8 @@ import org.apache.spark.sql.sources.SupportsStreamSourceMetadataColumns
 
 object StreamingRelation {
   def apply(dataSource: DataSource): StreamingRelation = {
-    // Extract source identifying name from CatalogTable for stable checkpoints
-    val sourceIdentifyingName = dataSource.catalogTable
-      .flatMap(_.streamingSourceIdentifyingName)
-      .getOrElse(Unassigned)
+    // Extract source identifying name from DataSource for stable checkpoints
+    val sourceIdentifyingName = dataSource.userSpecifiedStreamingSourceName.getOrElse(Unassigned)
     StreamingRelation(
       dataSource, dataSource.sourceInfo.name, toAttributes(dataSource.sourceInfo.schema),
       sourceIdentifyingName)
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/execution/streaming/StreamRelationSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/execution/streaming/StreamRelationSuite.scala
index 642aaeb48392..599a48c64208 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/execution/streaming/StreamRelationSuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/execution/streaming/StreamRelationSuite.scala
@@ -130,9 +130,6 @@ class StreamRelationSuite extends SharedSparkSession with AnalysisTest {
     val catalogTable = spark.sessionState.catalog.getTableMetadata(
       TableIdentifier("t")
     )
-    // During streaming resolution, the CatalogTable gets streamingSourceIdentifyingName set
-    val catalogTableWithSourceName = catalogTable.copy(
-      streamingSourceIdentifyingName = Some(Unassigned))
     val idAttr = AttributeReference(name = "id", dataType = IntegerType)()
 
     val expectedAnalyzedPlan = Project(
@@ -148,7 +145,8 @@ class StreamRelationSuite extends SharedSparkSession with AnalysisTest {
               "path" -> catalogTable.location.toString
             ),
             userSpecifiedSchema = Option(catalogTable.schema),
-            catalogTable = Option(catalogTableWithSourceName)
+            catalogTable = Option(catalogTable),
+            userSpecifiedStreamingSourceName = Some(Unassigned)
           ),
           sourceName = s"FileSource[${catalogTable.location.toString}]",
           output = Seq(idAttr)
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/streaming/StreamingSourceIdentifyingNameSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/streaming/StreamingSourceIdentifyingNameSuite.scala
index 91c36fcb7935..caf6c14c8000 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/streaming/StreamingSourceIdentifyingNameSuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/streaming/StreamingSourceIdentifyingNameSuite.scala
@@ -43,11 +43,10 @@ class StreamingSourceIdentifyingNameSuite extends SharedSparkSession {
       assert(streamingRelation.get.sourceIdentifyingName == Unassigned,
         s"Expected Unassigned but got ${streamingRelation.get.sourceIdentifyingName}")
 
-      // Verify the CatalogTable in DataSource has the name set
-      val catalogTable = streamingRelation.get.dataSource.catalogTable
-      assert(catalogTable.isDefined, "Expected CatalogTable in DataSource")
-      assert(catalogTable.get.streamingSourceIdentifyingName == Some(Unassigned),
-        s"Expected Some(Unassigned) but got ${catalogTable.get.streamingSourceIdentifyingName}")
+      // Verify the DataSource has the sourceIdentifyingName set
+      val dsSourceName = streamingRelation.get.dataSource.userSpecifiedStreamingSourceName
+      assert(dsSourceName == Some(Unassigned),
+        s"Expected Some(Unassigned) but got $dsSourceName")
     }
   }
 


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]
