This is an automated email from the ASF dual-hosted git repository.
ashrigondekar pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/spark.git
The following commit(s) were added to refs/heads/master by this push:
new 605305b418e1 [SPARK-55402][SS] Move streamingSourceIdentifyingName from CatalogTable to DataSource
605305b418e1 is described below
commit 605305b418e14feb86cbb782399a0c711132fd40
Author: ericm-db <[email protected]>
AuthorDate: Mon Feb 9 11:16:41 2026 -0800
[SPARK-55402][SS] Move streamingSourceIdentifyingName from CatalogTable to DataSource
### What changes were proposed in this pull request?
This PR moves the streamingSourceIdentifyingName field from CatalogTable into
DataSource, where it becomes an optional constructor parameter. The changes
include (a simplified sketch follows the list):
1. DataSource.scala: Added a userSpecifiedStreamingSourceName constructor
parameter of type Option[StreamingSourceIdentifyingName], defaulting to None
2. StreamingRelation.scala: Updated StreamingRelation.apply() to read the
name from DataSource instead of CatalogTable
3. DataSourceStrategy.scala: Modified
FindDataSourceTable.getStreamingRelation() to pass the name directly to the
DataSource constructor instead of copying it onto the CatalogTable
4. CatalogTable (interface.scala): Removed the
streamingSourceIdentifyingName field entirely
5. Test updates: Updated test files to reflect the new architecture
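
For illustration, a minimal, self-contained sketch of the new wiring. The
types below are simplified stand-ins for the real Spark classes, not the
actual definitions; only Unassigned appears in this diff, and Assigned is a
hypothetical member added for the example:

```scala
// Simplified stand-ins for the real Spark classes, for illustration only.
sealed trait StreamingSourceIdentifyingName
case object Unassigned extends StreamingSourceIdentifyingName
// Hypothetical member, not taken from the diff.
case class Assigned(name: String) extends StreamingSourceIdentifyingName

case class CatalogTable(identifier: String, provider: Option[String])

// After this change, the query-specific name lives on DataSource,
// not on CatalogTable.
case class DataSource(
    className: String,
    catalogTable: Option[CatalogTable] = None,
    userSpecifiedStreamingSourceName: Option[StreamingSourceIdentifyingName] = None)

object WiringSketch extends App {
  val table = CatalogTable("t", Some("parquet"))
  // The resolver now passes the name straight to the DataSource constructor
  // instead of copying it onto the CatalogTable first.
  val ds = DataSource(
    className = table.provider.get,
    catalogTable = Some(table),
    userSpecifiedStreamingSourceName = Some(Assigned("source_1")))
  // StreamingRelation.apply() reads the name from DataSource, falling back
  // to Unassigned when none was supplied.
  val sourceIdentifyingName = ds.userSpecifiedStreamingSourceName.getOrElse(Unassigned)
  assert(sourceIdentifyingName == Assigned("source_1"))
}
```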
### Why are the changes needed?
streamingSourceIdentifyingName represents query-specific metadata (the source
name assigned in a particular streaming query plan), not an intrinsic property
of the table itself.
By moving this field to DataSource (which is already query-specific), we
restore proper catalog table equality while maintaining the ability to track
streaming source identifying names for stable checkpoints.
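
To make the equality point concrete, here is a small sketch with simplified
stand-in types (not the real CatalogTable, which has many more fields; the
name type is reduced to a String for brevity):

```scala
// Before: the query-specific name was part of CatalogTable, so two
// references to the same table could compare unequal.
case class OldCatalogTable(
    identifier: String,
    streamingSourceIdentifyingName: Option[String] = None)

// After: CatalogTable carries only table-intrinsic metadata, so equality
// no longer depends on which streaming query plan referenced the table.
case class NewCatalogTable(identifier: String)

object EqualitySketch extends App {
  val fromCatalog = OldCatalogTable("t")
  val fromStreamingPlan = OldCatalogTable("t", Some("source_1"))
  assert(fromCatalog != fromStreamingPlan) // same table, unequal metadata

  assert(NewCatalogTable("t") == NewCatalogTable("t")) // equality restored
}
```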
### Does this PR introduce _any_ user-facing change?
No
### How was this patch tested?
Unit tests
### Was this patch authored or co-authored using generative AI tooling?
Generated-by: Claude Opus 4.6
Closes #54185 from ericm-db/source-name-ds.
Authored-by: ericm-db <[email protected]>
Signed-off-by: Anish Shrigondekar <[email protected]>
---
.../spark/sql/catalyst/catalog/interface.scala | 4 +--
.../sql/execution/datasources/DataSource.scala | 5 ++-
.../execution/datasources/DataSourceStrategy.scala | 36 +++++++++-------------
.../streaming/runtime/StreamingRelation.scala | 6 ++--
.../execution/streaming/StreamRelationSuite.scala | 6 ++--
.../StreamingSourceIdentifyingNameSuite.scala | 9 +++---
6 files changed, 27 insertions(+), 39 deletions(-)
diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/catalog/interface.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/catalog/interface.scala
index 7d15551afe0c..fcaea4709504 100644
--- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/catalog/interface.scala
+++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/catalog/interface.scala
@@ -41,7 +41,6 @@ import org.apache.spark.sql.catalyst.catalog.CatalogTable.VIEW_STORING_ANALYZED_
 import org.apache.spark.sql.catalyst.expressions.{Attribute, AttributeMap, AttributeReference, Cast, ExprId, Literal}
 import org.apache.spark.sql.catalyst.plans.logical._
 import org.apache.spark.sql.catalyst.plans.logical.statsEstimation.EstimationUtils
-import org.apache.spark.sql.catalyst.streaming.StreamingSourceIdentifyingName
 import org.apache.spark.sql.catalyst.types.DataTypeUtils
 import org.apache.spark.sql.catalyst.util._
 import org.apache.spark.sql.connector.catalog.CatalogManager
@@ -443,8 +442,7 @@ case class CatalogTable(
     tracksPartitionsInCatalog: Boolean = false,
     schemaPreservesCase: Boolean = true,
     ignoredProperties: Map[String, String] = Map.empty,
-    viewOriginalText: Option[String] = None,
-    streamingSourceIdentifyingName: Option[StreamingSourceIdentifyingName] = None)
+    viewOriginalText: Option[String] = None)
   extends MetadataMapSupport {
   import CatalogTable._
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/DataSource.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/DataSource.scala
index 35588df11bfc..a48c71ad0d36 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/DataSource.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/DataSource.scala
@@ -34,6 +34,7 @@ import org.apache.spark.sql.catalyst.DataSourceOptions
 import org.apache.spark.sql.catalyst.analysis.UnresolvedAttribute
 import org.apache.spark.sql.catalyst.catalog.{BucketSpec, CatalogStorageFormat, CatalogTable, CatalogUtils}
 import org.apache.spark.sql.catalyst.plans.logical.LogicalPlan
+import org.apache.spark.sql.catalyst.streaming.StreamingSourceIdentifyingName
 import org.apache.spark.sql.catalyst.util.{CaseInsensitiveMap, TypeUtils}
 import org.apache.spark.sql.classic.ClassicConversions.castToImpl
 import org.apache.spark.sql.classic.Dataset
@@ -101,7 +102,9 @@ case class DataSource(
     partitionColumns: Seq[String] = Seq.empty,
     bucketSpec: Option[BucketSpec] = None,
     options: Map[String, String] = Map.empty,
-    catalogTable: Option[CatalogTable] = None) extends SessionStateHelper with Logging {
+    catalogTable: Option[CatalogTable] = None,
+    userSpecifiedStreamingSourceName: Option[StreamingSourceIdentifyingName] = None)
+  extends SessionStateHelper with Logging {
   case class SourceInfo(name: String, schema: StructType, partitionColumns: Seq[String])
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/DataSourceStrategy.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/DataSourceStrategy.scala
index 436a760c5b6f..220b0b8a6301 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/DataSourceStrategy.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/DataSourceStrategy.scala
@@ -298,16 +298,14 @@ class FindDataSourceTable(sparkSession: SparkSession) extends Rule[LogicalPlan]
       extraOptions: CaseInsensitiveStringMap,
       sourceIdentifyingName: StreamingSourceIdentifyingName
   ): StreamingRelation = {
-    // Set the source identifying name on the CatalogTable so it propagates to StreamingRelation
-    val tableWithSourceName = table.copy(
-      streamingSourceIdentifyingName = Some(sourceIdentifyingName))
-    val dsOptions = DataSourceUtils.generateDatasourceOptions(extraOptions, tableWithSourceName)
+    val dsOptions = DataSourceUtils.generateDatasourceOptions(extraOptions, table)
     val dataSource = DataSource(
       SparkSession.active,
-      className = tableWithSourceName.provider.get,
-      userSpecifiedSchema = Some(tableWithSourceName.schema),
+      className = table.provider.get,
+      userSpecifiedSchema = Some(table.schema),
       options = dsOptions,
-      catalogTable = Some(tableWithSourceName))
+      catalogTable = Some(table),
+      userSpecifiedStreamingSourceName = Some(sourceIdentifyingName))
     StreamingRelation(dataSource)
   }
@@ -344,29 +342,23 @@ class FindDataSourceTable(sparkSession: SparkSession) extends Rule[LogicalPlan]
       // to preserve the source identifying name. With resolveOperators (bottom-up), the child
       // is processed first but doesn't match the case above due to the !isStreaming guard,
       // so the NamedStreamingRelation case here can match.
-      // We set the sourceIdentifyingName on the CatalogTable so it propagates to StreamingRelation.
+      // We pass the sourceIdentifyingName directly to getStreamingRelation.
       case NamedStreamingRelation(u: UnresolvedCatalogRelation, sourceIdentifyingName) =>
-        val tableWithSourceName = u.tableMeta.copy(
-          streamingSourceIdentifyingName = Some(sourceIdentifyingName))
-        resolveUnresolvedCatalogRelation(u.copy(tableMeta = tableWithSourceName))
+        getStreamingRelation(u.tableMeta, u.options, sourceIdentifyingName)
       // Handle NamedStreamingRelation wrapping SubqueryAlias(UnresolvedCatalogRelation)
       // - this happens when resolving streaming tables from catalogs where the table lookup
       // creates a SubqueryAlias wrapper around the UnresolvedCatalogRelation.
       case NamedStreamingRelation(
           SubqueryAlias(alias, u: UnresolvedCatalogRelation),
           sourceIdentifyingName) =>
-        val tableWithSourceName = u.tableMeta.copy(
-          streamingSourceIdentifyingName = Some(sourceIdentifyingName))
-        val resolved = resolveUnresolvedCatalogRelation(u.copy(tableMeta = tableWithSourceName))
+        val resolved = getStreamingRelation(u.tableMeta, u.options, sourceIdentifyingName)
         SubqueryAlias(alias, resolved)
       // Fallback for streaming UnresolvedCatalogRelation that is NOT wrapped in
       // NamedStreamingRelation (e.g., from .readStream.table() API path).
-      // The sourceIdentifyingName defaults to Unassigned via
-      // tableMeta.streamingSourceIdentifyingName.getOrElse(Unassigned)
-      // in resolveUnresolvedCatalogRelation.
+      // The sourceIdentifyingName defaults to Unassigned.
       case u: UnresolvedCatalogRelation if u.isStreaming =>
-        resolveUnresolvedCatalogRelation(u)
+        getStreamingRelation(u.tableMeta, u.options, Unassigned)
       case s @ StreamingRelationV2(
           _, _, table, extraOptions, _, _, _,
@@ -392,11 +384,11 @@ class FindDataSourceTable(sparkSession: SparkSession) extends Rule[LogicalPlan]
       case UnresolvedCatalogRelation(tableMeta, _, false) =>
         DDLUtils.readHiveTable(tableMeta)
-      // For streaming, the sourceIdentifyingName is read from
-      // tableMeta.streamingSourceIdentifyingName which was set by the caller.
+      // For streaming, the sourceIdentifyingName defaults to Unassigned.
+      // Callers that have a specific sourceIdentifyingName should call
+      // getStreamingRelation directly instead of this method.
       case UnresolvedCatalogRelation(tableMeta, extraOptions, true) =>
-        val sourceIdentifyingName = tableMeta.streamingSourceIdentifyingName.getOrElse(Unassigned)
-        getStreamingRelation(tableMeta, extraOptions, sourceIdentifyingName)
+        getStreamingRelation(tableMeta, extraOptions, Unassigned)
     }
   }
 }
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/runtime/StreamingRelation.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/runtime/StreamingRelation.scala
index 827a2f1d066d..232f3475bb1b 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/runtime/StreamingRelation.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/runtime/StreamingRelation.scala
@@ -35,10 +35,8 @@ import org.apache.spark.sql.sources.SupportsStreamSourceMetadataColumns
 object StreamingRelation {
   def apply(dataSource: DataSource): StreamingRelation = {
-    // Extract source identifying name from CatalogTable for stable checkpoints
-    val sourceIdentifyingName = dataSource.catalogTable
-      .flatMap(_.streamingSourceIdentifyingName)
-      .getOrElse(Unassigned)
+    // Extract source identifying name from DataSource for stable checkpoints
+    val sourceIdentifyingName = dataSource.userSpecifiedStreamingSourceName.getOrElse(Unassigned)
     StreamingRelation(
       dataSource, dataSource.sourceInfo.name, toAttributes(dataSource.sourceInfo.schema),
       sourceIdentifyingName)
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/execution/streaming/StreamRelationSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/execution/streaming/StreamRelationSuite.scala
index 642aaeb48392..599a48c64208 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/execution/streaming/StreamRelationSuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/execution/streaming/StreamRelationSuite.scala
@@ -130,9 +130,6 @@ class StreamRelationSuite extends SharedSparkSession with AnalysisTest {
     val catalogTable = spark.sessionState.catalog.getTableMetadata(
       TableIdentifier("t")
     )
-    // During streaming resolution, the CatalogTable gets streamingSourceIdentifyingName set
-    val catalogTableWithSourceName = catalogTable.copy(
-      streamingSourceIdentifyingName = Some(Unassigned))
     val idAttr = AttributeReference(name = "id", dataType = IntegerType)()
     val expectedAnalyzedPlan = Project(
@@ -148,7 +145,8 @@ class StreamRelationSuite extends SharedSparkSession with AnalysisTest {
             "path" -> catalogTable.location.toString
           ),
           userSpecifiedSchema = Option(catalogTable.schema),
-          catalogTable = Option(catalogTableWithSourceName)
+          catalogTable = Option(catalogTable),
+          userSpecifiedStreamingSourceName = Some(Unassigned)
         ),
         sourceName = s"FileSource[${catalogTable.location.toString}]",
         output = Seq(idAttr)
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/streaming/StreamingSourceIdentifyingNameSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/streaming/StreamingSourceIdentifyingNameSuite.scala
index 91c36fcb7935..caf6c14c8000 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/streaming/StreamingSourceIdentifyingNameSuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/streaming/StreamingSourceIdentifyingNameSuite.scala
@@ -43,11 +43,10 @@ class StreamingSourceIdentifyingNameSuite extends SharedSparkSession {
     assert(streamingRelation.get.sourceIdentifyingName == Unassigned,
       s"Expected Unassigned but got ${streamingRelation.get.sourceIdentifyingName}")
-    // Verify the CatalogTable in DataSource has the name set
-    val catalogTable = streamingRelation.get.dataSource.catalogTable
-    assert(catalogTable.isDefined, "Expected CatalogTable in DataSource")
-    assert(catalogTable.get.streamingSourceIdentifyingName == Some(Unassigned),
-      s"Expected Some(Unassigned) but got ${catalogTable.get.streamingSourceIdentifyingName}")
+    // Verify the DataSource has the sourceIdentifyingName set
+    val dsSourceName = streamingRelation.get.dataSource.userSpecifiedStreamingSourceName
+    assert(dsSourceName == Some(Unassigned),
+      s"Expected Some(Unassigned) but got $dsSourceName")
   }
 }
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]