This is an automated email from the ASF dual-hosted git repository.
dtenedor pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/spark.git
The following commit(s) were added to refs/heads/master by this push:
new 9df8644a59e9 [SPARK-55029][SS] Propagate streaming source identifying name through resolution pipeline
9df8644a59e9 is described below
commit 9df8644a59e98bf869338d6585c56834b846ef20
Author: ericm-db <[email protected]>
AuthorDate: Wed Jan 14 11:47:42 2026 -0800
[SPARK-55029][SS] Propagate streaming source identifying name through resolution pipeline
### What changes were proposed in this pull request?
This change propagates the streaming source identifying name from CatalogTable
through DataSource to StreamingRelation during table resolution.
Key changes (a simplified sketch of the propagation path follows the list):
- Add a streamingSourceIdentifyingName field to CatalogTable
- Update NameStreamingSources to unwrap only when the child is resolved
- Extend getStreamingRelation to accept and propagate sourceIdentifyingName
- Handle NamedStreamingRelation wrapping UnresolvedCatalogRelation/SubqueryAlias
- Add sourceIdentifyingName to StreamingRelation and StreamingExecutionRelation
- Add tests for streaming table resolution with source naming
- Update existing test expectations to include sourceIdentifyingName
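A minimal, self-contained sketch of the propagation path, with the real Spark types
(CatalogTable, DataSource, StreamingRelation) stubbed out so the flow reads end to end.
The helper name `relationFor` and the example name "orders_src" are illustrative only;
the actual signatures are in the DataSourceStrategy.scala and StreamingRelation.scala
hunks below.

```scala
// Stub types mirroring the fields touched by this change; not the real Spark classes.
object SourceNamePropagationSketch {
  sealed trait StreamingSourceIdentifyingName
  case object Unassigned extends StreamingSourceIdentifyingName
  case class UserProvided(name: String) extends StreamingSourceIdentifyingName

  case class CatalogTable(
      provider: Option[String],
      streamingSourceIdentifyingName: Option[StreamingSourceIdentifyingName] = None)

  case class DataSource(className: String, catalogTable: Option[CatalogTable])

  case class StreamingRelation(
      dataSource: DataSource,
      sourceIdentifyingName: StreamingSourceIdentifyingName)

  // Mirrors the new StreamingRelation.apply: read the name back off the CatalogTable
  // carried by the DataSource, defaulting to Unassigned when none was set.
  def relationFor(ds: DataSource): StreamingRelation =
    StreamingRelation(
      ds,
      ds.catalogTable.flatMap(_.streamingSourceIdentifyingName).getOrElse(Unassigned))

  // Mirrors getStreamingRelation in FindDataSourceTable: stamp the name onto a copy
  // of the CatalogTable so it survives the trip through DataSource.
  def getStreamingRelation(
      table: CatalogTable,
      name: StreamingSourceIdentifyingName): StreamingRelation = {
    val tableWithName = table.copy(streamingSourceIdentifyingName = Some(name))
    relationFor(DataSource(tableWithName.provider.get, Some(tableWithName)))
  }

  def main(args: Array[String]): Unit = {
    val table = CatalogTable(provider = Some("parquet"))
    val rel = getStreamingRelation(table, UserProvided("orders_src"))
    assert(rel.sourceIdentifyingName == UserProvided("orders_src"))
  }
}
```

The key design point is that the name rides on the CatalogTable copy, so any path that
builds a StreamingRelation from a DataSource carrying that table picks it up without
extra plumbing.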
### Why are the changes needed?
This infrastructure is needed to support stable checkpoint locations and source evolution.
### Does this PR introduce _any_ user-facing change?
No
### How was this patch tested?
Unit tests
### Was this patch authored or co-authored using generative AI tooling?
No
Closes #53792 from ericm-db/streaming-source-naming-apply-oss-diff.
Authored-by: ericm-db <[email protected]>
Signed-off-by: Daniel Tenedorio <[email protected]>
---
.../sql/kafka010/KafkaMicroBatchSourceSuite.scala | 4 +-
.../catalyst/analysis/NameStreamingSources.scala | 4 +-
.../spark/sql/catalyst/catalog/interface.scala | 5 +-
.../execution/datasources/DataSourceStrategy.scala | 64 ++++++++---
.../streaming/runtime/MicroBatchExecution.scala | 12 ++-
.../streaming/runtime/StreamingRelation.scala | 23 +++-
.../execution/streaming/StreamRelationSuite.scala | 5 +-
.../sql/streaming/FileStreamSourceSuite.scala | 6 +-
.../StreamingSourceIdentifyingNameSuite.scala | 117 +++++++++++++++++++++
9 files changed, 212 insertions(+), 28 deletions(-)
diff --git a/connector/kafka-0-10-sql/src/test/scala/org/apache/spark/sql/kafka010/KafkaMicroBatchSourceSuite.scala b/connector/kafka-0-10-sql/src/test/scala/org/apache/spark/sql/kafka010/KafkaMicroBatchSourceSuite.scala
index f4bd782617ae..0bca223d09ae 100644
--- a/connector/kafka-0-10-sql/src/test/scala/org/apache/spark/sql/kafka010/KafkaMicroBatchSourceSuite.scala
+++ b/connector/kafka-0-10-sql/src/test/scala/org/apache/spark/sql/kafka010/KafkaMicroBatchSourceSuite.scala
@@ -125,7 +125,7 @@ abstract class KafkaSourceTest extends StreamTest with SharedSparkSession with K
val sources: Seq[SparkDataStream] = {
query.get.logicalPlan.collect {
- case StreamingExecutionRelation(source: KafkaSource, _, _) => source
+ case StreamingExecutionRelation(source: KafkaSource, _, _, _) => source
case r: StreamingDataSourceV2ScanRelation
if r.stream.isInstanceOf[KafkaMicroBatchStream] ||
r.stream.isInstanceOf[KafkaContinuousStream] =>
@@ -1615,7 +1615,7 @@ abstract class KafkaMicroBatchV1SourceSuite extends KafkaMicroBatchSourceSuiteBa
makeSureGetOffsetCalled,
AssertOnQuery { query =>
query.logicalPlan.collectFirst {
- case StreamingExecutionRelation(_: KafkaSource, _, _) => true
+ case StreamingExecutionRelation(_: KafkaSource, _, _, _) => true
}.nonEmpty
}
)
diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/NameStreamingSources.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/NameStreamingSources.scala
index 106d74fcac53..d56edf1d5119 100644
--- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/NameStreamingSources.scala
+++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/NameStreamingSources.scala
@@ -127,8 +127,10 @@ object NameStreamingSources extends Rule[LogicalPlan] {
} else {
// Feature disabled - unwrap NamedStreamingRelation nodes without propagating names.
// Error if any source has an explicitly assigned name since the feature is disabled.
+ // Only unwrap when child is resolved - this allows FindDataSourceTable to resolve
+ // UnresolvedCatalogRelation to StreamingRelation before we unwrap.
plan.resolveOperatorsWithPruning(_.containsPattern(NAMED_STREAMING_RELATION)) {
- case NamedStreamingRelation(child, Unassigned) =>
+ case NamedStreamingRelation(child, Unassigned) if child.resolved =>
child
case NamedStreamingRelation(_, UserProvided(name)) =>
throw QueryCompilationErrors.streamingSourceNamingNotSupportedError(name)
diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/catalog/interface.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/catalog/interface.scala
index 01153d516e5c..7d15551afe0c 100644
--- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/catalog/interface.scala
+++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/catalog/interface.scala
@@ -41,6 +41,7 @@ import org.apache.spark.sql.catalyst.catalog.CatalogTable.VIEW_STORING_ANALYZED_
import org.apache.spark.sql.catalyst.expressions.{Attribute, AttributeMap, AttributeReference, Cast, ExprId, Literal}
import org.apache.spark.sql.catalyst.plans.logical._
import org.apache.spark.sql.catalyst.plans.logical.statsEstimation.EstimationUtils
+import org.apache.spark.sql.catalyst.streaming.StreamingSourceIdentifyingName
import org.apache.spark.sql.catalyst.types.DataTypeUtils
import org.apache.spark.sql.catalyst.util._
import org.apache.spark.sql.connector.catalog.CatalogManager
@@ -442,7 +443,9 @@ case class CatalogTable(
tracksPartitionsInCatalog: Boolean = false,
schemaPreservesCase: Boolean = true,
ignoredProperties: Map[String, String] = Map.empty,
- viewOriginalText: Option[String] = None) extends MetadataMapSupport {
+ viewOriginalText: Option[String] = None,
+ streamingSourceIdentifyingName: Option[StreamingSourceIdentifyingName] = None)
+ extends MetadataMapSupport {
import CatalogTable._
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/DataSourceStrategy.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/DataSourceStrategy.scala
index 93ad3ca3856e..436a760c5b6f 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/DataSourceStrategy.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/DataSourceStrategy.scala
@@ -32,15 +32,16 @@ import org.apache.spark.sql.{Row, SaveMode}
import org.apache.spark.sql.catalyst.{expressions, CatalystTypeConverters, InternalRow, QualifiedTableName, SQLConfHelper}
import org.apache.spark.sql.catalyst.CatalystTypeConverters.convertToScala
import org.apache.spark.sql.catalyst.analysis._
+import org.apache.spark.sql.catalyst.analysis.NamedStreamingRelation
import org.apache.spark.sql.catalyst.catalog._
import org.apache.spark.sql.catalyst.encoders.ExpressionEncoder
import org.apache.spark.sql.catalyst.expressions._
import org.apache.spark.sql.catalyst.expressions.aggregate.AggregateExpression
import org.apache.spark.sql.catalyst.planning.PhysicalOperation
import org.apache.spark.sql.catalyst.plans.{Inner, JoinType, LeftOuter, RightOuter}
-import org.apache.spark.sql.catalyst.plans.logical.{AppendData, InsertIntoDir, InsertIntoStatement, LogicalPlan, Project}
+import org.apache.spark.sql.catalyst.plans.logical.{AppendData, InsertIntoDir, InsertIntoStatement, LogicalPlan, Project, SubqueryAlias}
import org.apache.spark.sql.catalyst.rules.Rule
-import org.apache.spark.sql.catalyst.streaming.StreamingRelationV2
+import org.apache.spark.sql.catalyst.streaming.{StreamingRelationV2, StreamingSourceIdentifyingName, Unassigned}
import org.apache.spark.sql.catalyst.types.DataTypeUtils
import org.apache.spark.sql.catalyst.util.{GeneratedColumn, IdentityColumn, PushableExpression, ResolveDefaultColumns}
import org.apache.spark.sql.classic.{SparkSession, Strategy}
@@ -293,15 +294,20 @@ class FindDataSourceTable(sparkSession: SparkSession) extends Rule[LogicalPlan]
}
private def getStreamingRelation(
- table: CatalogTable,
- extraOptions: CaseInsensitiveStringMap): StreamingRelation = {
- val dsOptions = DataSourceUtils.generateDatasourceOptions(extraOptions, table)
+ table: CatalogTable,
+ extraOptions: CaseInsensitiveStringMap,
+ sourceIdentifyingName: StreamingSourceIdentifyingName
+ ): StreamingRelation = {
+ // Set the source identifying name on the CatalogTable so it propagates to StreamingRelation
+ val tableWithSourceName = table.copy(
+ streamingSourceIdentifyingName = Some(sourceIdentifyingName))
+ val dsOptions = DataSourceUtils.generateDatasourceOptions(extraOptions, tableWithSourceName)
val dataSource = DataSource(
SparkSession.active,
- className = table.provider.get,
- userSpecifiedSchema = Some(table.schema),
+ className = tableWithSourceName.provider.get,
+ userSpecifiedSchema = Some(tableWithSourceName.schema),
options = dsOptions,
- catalogTable = Some(table))
+ catalogTable = Some(tableWithSourceName))
StreamingRelation(dataSource)
}
@@ -321,7 +327,10 @@ class FindDataSourceTable(sparkSession: SparkSession) extends Rule[LogicalPlan]
table.partitionColumnNames.map(name => name -> None).toMap,
Seq.empty, append.query, false, append.isByName)
- case unresolvedCatalogRelation: UnresolvedCatalogRelation =>
+ // Skip streaming UnresolvedCatalogRelation here - they're handled by the
+ // NamedStreamingRelation case below to preserve the source identifying name.
+ case unresolvedCatalogRelation: UnresolvedCatalogRelation
+ if !unresolvedCatalogRelation.isStreaming =>
val result = resolveUnresolvedCatalogRelation(unresolvedCatalogRelation)
// We put the resolved relation into the [[AnalyzerBridgeState]] for
// it to be later reused by the single-pass [[Resolver]] to avoid resolving the
@@ -331,11 +340,39 @@ class FindDataSourceTable(sparkSession: SparkSession) extends Rule[LogicalPlan]
}
result
+ // Handle streaming UnresolvedCatalogRelation wrapped in NamedStreamingRelation
+ // to preserve the source identifying name. With resolveOperators (bottom-up), the child
+ // is processed first but doesn't match the case above due to the !isStreaming guard,
+ // so the NamedStreamingRelation case here can match.
+ // We set the sourceIdentifyingName on the CatalogTable so it propagates to StreamingRelation.
+ case NamedStreamingRelation(u: UnresolvedCatalogRelation, sourceIdentifyingName) =>
+ val tableWithSourceName = u.tableMeta.copy(
+ streamingSourceIdentifyingName = Some(sourceIdentifyingName))
+ resolveUnresolvedCatalogRelation(u.copy(tableMeta = tableWithSourceName))
+
+ // Handle NamedStreamingRelation wrapping SubqueryAlias(UnresolvedCatalogRelation)
+ // - this happens when resolving streaming tables from catalogs where the table lookup
+ // creates a SubqueryAlias wrapper around the UnresolvedCatalogRelation.
+ case NamedStreamingRelation(
+ SubqueryAlias(alias, u: UnresolvedCatalogRelation), sourceIdentifyingName) =>
+ val tableWithSourceName = u.tableMeta.copy(
+ streamingSourceIdentifyingName = Some(sourceIdentifyingName))
+ val resolved = resolveUnresolvedCatalogRelation(u.copy(tableMeta = tableWithSourceName))
+ SubqueryAlias(alias, resolved)
+
+ // Fallback for streaming UnresolvedCatalogRelation that is NOT wrapped in
+ // NamedStreamingRelation (e.g., from .readStream.table() API path).
+ // The sourceIdentifyingName defaults to Unassigned via
+ // tableMeta.streamingSourceIdentifyingName.getOrElse(Unassigned)
+ // in resolveUnresolvedCatalogRelation.
+ case u: UnresolvedCatalogRelation if u.isStreaming =>
+ resolveUnresolvedCatalogRelation(u)
+
case s @ StreamingRelationV2(
_, _, table, extraOptions, _, _, _,
- Some(UnresolvedCatalogRelation(tableMeta, _, true)), _) =>
+ Some(UnresolvedCatalogRelation(tableMeta, _, true)), name) =>
import org.apache.spark.sql.execution.datasources.v2.DataSourceV2Implicits._
- val v1Relation = getStreamingRelation(tableMeta, extraOptions)
+ val v1Relation = getStreamingRelation(tableMeta, extraOptions, name)
if (table.isInstanceOf[SupportsRead]
&& table.supportsAny(MICRO_BATCH_READ, CONTINUOUS_READ)) {
s.copy(v1Relation = Some(v1Relation))
@@ -355,8 +392,11 @@ class FindDataSourceTable(sparkSession: SparkSession) extends Rule[LogicalPlan]
case UnresolvedCatalogRelation(tableMeta, _, false) =>
DDLUtils.readHiveTable(tableMeta)
+ // For streaming, the sourceIdentifyingName is read from
+ // tableMeta.streamingSourceIdentifyingName which was set by the caller.
case UnresolvedCatalogRelation(tableMeta, extraOptions, true) =>
- getStreamingRelation(tableMeta, extraOptions)
+ val sourceIdentifyingName = tableMeta.streamingSourceIdentifyingName.getOrElse(Unassigned)
+ getStreamingRelation(tableMeta, extraOptions, sourceIdentifyingName)
}
}
}
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/runtime/MicroBatchExecution.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/runtime/MicroBatchExecution.scala
index d81b2276eab7..e11a99653614 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/runtime/MicroBatchExecution.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/runtime/MicroBatchExecution.scala
@@ -185,7 +185,8 @@ class MicroBatchExecution(
import org.apache.spark.sql.execution.datasources.v2.DataSourceV2Implicits._
val _logicalPlan = analyzedPlan.transform {
- case streamingRelation @ StreamingRelation(dataSourceV1, sourceName, output) =>
+ case streamingRelation @ StreamingRelation(
+ dataSourceV1, sourceName, output, sourceIdentifyingName) =>
toExecutionRelationMap.getOrElseUpdate(streamingRelation, {
// Materialize source to avoid creating it in every batch
val metadataPath = s"$resolvedCheckpointRoot/sources/$nextSourceId"
@@ -194,11 +195,12 @@ class MicroBatchExecution(
logInfo(log"Using Source [${MDC(LogKeys.STREAMING_SOURCE, source)}]
" +
log"from DataSourceV1 named
'${MDC(LogKeys.STREAMING_DATA_SOURCE_NAME, sourceName)}' " +
log"[${MDC(LogKeys.STREAMING_DATA_SOURCE_DESCRIPTION,
dataSourceV1)}]")
- StreamingExecutionRelation(source, output,
dataSourceV1.catalogTable)(sparkSession)
+ StreamingExecutionRelation(
+ source, output, dataSourceV1.catalogTable,
sourceIdentifyingName)(sparkSession)
})
case s @ StreamingRelationV2(src, srcName, table: SupportsRead, options, output,
- catalog, identifier, v1, _) =>
+ catalog, identifier, v1, sourceIdentifyingName) =>
val dsStr = if (src.nonEmpty) s"[${src.get}]" else ""
val v2Disabled = disabledSources.contains(src.getOrElse(None).getClass.getCanonicalName)
if (!v2Disabled && table.supports(TableCapability.MICRO_BATCH_READ)) {
@@ -241,7 +243,7 @@ class MicroBatchExecution(
log"${MDC(LogKeys.STREAMING_DATA_SOURCE_DESCRIPTION, dsStr)}")
// We don't have a catalog table but may have a table identifier. Given this is about
// v1 fallback path, we just give up and set the catalog table as None.
- StreamingExecutionRelation(source, output, None)(sparkSession)
+ StreamingExecutionRelation(source, output, None, sourceIdentifyingName)(sparkSession)
})
}
}
@@ -969,7 +971,7 @@ class MicroBatchExecution(
// Replace sources in the logical plan with data that has arrived since the last batch.
val newBatchesPlan = logicalPlan transform {
// For v1 sources.
- case StreamingExecutionRelation(source, output, catalogTable) =>
+ case StreamingExecutionRelation(source, output, catalogTable, _) =>
mutableNewData.get(source).map { dataPlan =>
val hasFileMetadata = output.exists {
case FileSourceMetadataAttribute(_) => true
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/runtime/StreamingRelation.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/runtime/StreamingRelation.scala
index 96f6340cb958..84eb9ff77c96 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/runtime/StreamingRelation.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/runtime/StreamingRelation.scala
@@ -24,6 +24,7 @@ import org.apache.spark.sql.catalyst.analysis.MultiInstanceRelation
import org.apache.spark.sql.catalyst.catalog.CatalogTable
import org.apache.spark.sql.catalyst.expressions.{Attribute, AttributeReference}
import org.apache.spark.sql.catalyst.plans.logical.{ExposesMetadataColumns, LeafNode, LogicalPlan, Statistics}
+import org.apache.spark.sql.catalyst.streaming.{StreamingSourceIdentifyingName, Unassigned}
import org.apache.spark.sql.catalyst.types.DataTypeUtils.toAttributes
import org.apache.spark.sql.connector.read.streaming.SparkDataStream
import org.apache.spark.sql.errors.QueryExecutionErrors
@@ -34,8 +35,13 @@ import org.apache.spark.sql.sources.SupportsStreamSourceMetadataColumns
object StreamingRelation {
def apply(dataSource: DataSource): StreamingRelation = {
+ // Extract source identifying name from CatalogTable for stable checkpoints
+ val sourceIdentifyingName = dataSource.catalogTable
+ .flatMap(_.streamingSourceIdentifyingName)
+ .getOrElse(Unassigned)
StreamingRelation(
- dataSource, dataSource.sourceInfo.name, toAttributes(dataSource.sourceInfo.schema))
+ dataSource, dataSource.sourceInfo.name, toAttributes(dataSource.sourceInfo.schema),
+ sourceIdentifyingName)
}
}
@@ -46,11 +52,20 @@ object StreamingRelation {
* It should be used to create [[Source]] and converted to [[StreamingExecutionRelation]] when
* passing to [[StreamExecution]] to run a query.
*/
-case class StreamingRelation(dataSource: DataSource, sourceName: String, output: Seq[Attribute])
+case class StreamingRelation(
+ dataSource: DataSource,
+ sourceName: String,
+ output: Seq[Attribute],
+ sourceIdentifyingName: StreamingSourceIdentifyingName = Unassigned)
extends LeafNode with MultiInstanceRelation with ExposesMetadataColumns {
override def isStreaming: Boolean = true
override def toString: String = sourceName
+ // Provide a concise representation for plan comparison output
+ override def simpleString(maxFields: Int): String = {
+ s"StreamingRelation $sourceName, ${output.mkString("[", ", ", "]")},
$sourceIdentifyingName"
+ }
+
// There's no sensible value here. On the execution path, this relation will be
// swapped out with microbatches. But some dataframe operations (in particular explain) do lead
// to this node surviving analysis. So we satisfy the LeafNode contract with the session default
@@ -88,7 +103,9 @@ case class StreamingRelation(dataSource: DataSource, sourceName: String, output:
case class StreamingExecutionRelation(
source: SparkDataStream,
output: Seq[Attribute],
- catalogTable: Option[CatalogTable])(session: SparkSession)
+ catalogTable: Option[CatalogTable],
+ sourceIdentifyingName: StreamingSourceIdentifyingName = Unassigned)
+ (session: SparkSession)
extends LeafNode with MultiInstanceRelation {
override def otherCopyArgs: Seq[AnyRef] = session :: Nil
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/execution/streaming/StreamRelationSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/execution/streaming/StreamRelationSuite.scala
index e13f592f7078..642aaeb48392 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/execution/streaming/StreamRelationSuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/execution/streaming/StreamRelationSuite.scala
@@ -130,6 +130,9 @@ class StreamRelationSuite extends SharedSparkSession with AnalysisTest {
val catalogTable = spark.sessionState.catalog.getTableMetadata(
TableIdentifier("t")
)
+ // During streaming resolution, the CatalogTable gets streamingSourceIdentifyingName set
+ val catalogTableWithSourceName = catalogTable.copy(
+ streamingSourceIdentifyingName = Some(Unassigned))
val idAttr = AttributeReference(name = "id", dataType = IntegerType)()
val expectedAnalyzedPlan = Project(
@@ -145,7 +148,7 @@ class StreamRelationSuite extends SharedSparkSession with AnalysisTest {
"path" -> catalogTable.location.toString
),
userSpecifiedSchema = Option(catalogTable.schema),
- catalogTable = Option(catalogTable)
+ catalogTable = Option(catalogTableWithSourceName)
),
sourceName = s"FileSource[${catalogTable.location.toString}]",
output = Seq(idAttr)
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/streaming/FileStreamSourceSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/streaming/FileStreamSourceSuite.scala
index 15a0f048dd8a..83e6772d69dc 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/streaming/FileStreamSourceSuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/streaming/FileStreamSourceSuite.scala
@@ -190,7 +190,7 @@ abstract class FileStreamSourceTest
protected def getSourceFromFileStream(df: DataFrame): FileStreamSource = {
val checkpointLocation = Utils.createTempDir(namePrefix =
"streaming.metadata").getCanonicalPath
df.queryExecution.analyzed
- .collect { case StreamingRelation(dataSource, _, _) =>
+ .collect { case StreamingRelation(dataSource, _, _, _) =>
// There is only one source in our tests so just set sourceId to 0
dataSource.createSource(s"$checkpointLocation/sources/0").asInstanceOf[FileStreamSource]
}.head
@@ -198,7 +198,7 @@ abstract class FileStreamSourceTest
protected def getSourcesFromStreamingQuery(query: StreamExecution): Seq[FileStreamSource] = {
query.logicalPlan.collect {
- case StreamingExecutionRelation(source, _, _) if source.isInstanceOf[FileStreamSource] =>
+ case StreamingExecutionRelation(source, _, _, _) if source.isInstanceOf[FileStreamSource] =>
source.asInstanceOf[FileStreamSource]
}
}
@@ -251,7 +251,7 @@ class FileStreamSourceSuite extends FileStreamSourceTest {
reader.load()
}
df.queryExecution.analyzed
- .collect { case s @ StreamingRelation(dataSource, _, _) => s.schema }.head
+ .collect { case s @ StreamingRelation(dataSource, _, _, _) => s.schema }.head
}
override def beforeAll(): Unit = {
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/streaming/StreamingSourceIdentifyingNameSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/streaming/StreamingSourceIdentifyingNameSuite.scala
new file mode 100644
index 000000000000..91c36fcb7935
--- /dev/null
+++ b/sql/core/src/test/scala/org/apache/spark/sql/streaming/StreamingSourceIdentifyingNameSuite.scala
@@ -0,0 +1,117 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.sql.streaming
+
+import org.apache.spark.sql.catalyst.plans.logical.SubqueryAlias
+import org.apache.spark.sql.catalyst.streaming.Unassigned
+import org.apache.spark.sql.execution.streaming.runtime.StreamingRelation
+import org.apache.spark.sql.test.SharedSparkSession
+
+/**
+ * Tests for streaming source identifying name propagation through the resolution pipeline.
+ * These tests verify that sourceIdentifyingName is correctly propagated from CatalogTable
+ * through DataSource to StreamingRelation during streaming table resolution.
+ */
+class StreamingSourceIdentifyingNameSuite extends SharedSparkSession {
+
+ test("STREAM table resolution propagates sourceIdentifyingName through
pipeline") {
+ withTable("stream_name_test") {
+ sql("CREATE TABLE stream_name_test (id INT) USING PARQUET")
+
+ val analyzedPlan = sql("SELECT * FROM STREAM
stream_name_test").queryExecution.analyzed
+
+ val streamingRelation = analyzedPlan.collectFirst {
+ case sr: StreamingRelation => sr
+ }
+
+ assert(streamingRelation.isDefined, "Expected StreamingRelation in analyzed plan")
+ assert(streamingRelation.get.sourceIdentifyingName == Unassigned,
+ s"Expected Unassigned but got ${streamingRelation.get.sourceIdentifyingName}")
+
+ // Verify the CatalogTable in DataSource has the name set
+ val catalogTable = streamingRelation.get.dataSource.catalogTable
+ assert(catalogTable.isDefined, "Expected CatalogTable in DataSource")
+ assert(catalogTable.get.streamingSourceIdentifyingName == Some(Unassigned),
+ s"Expected Some(Unassigned) but got ${catalogTable.get.streamingSourceIdentifyingName}")
+ }
+ }
+
+ test("STREAM with qualified name propagates sourceIdentifyingName through
SubqueryAlias") {
+ // When querying a table with a qualified name (e.g.,
spark_catalog.default.t),
+ // the catalog lookup creates a SubqueryAlias wrapper.
+ withTable("stream_alias_test") {
+ sql("CREATE TABLE stream_alias_test (id INT, data STRING) USING PARQUET")
+
+ // Query using fully qualified name to ensure SubqueryAlias is created
+ val analyzedPlan = sql(
+ "SELECT * FROM STREAM spark_catalog.default.stream_alias_test"
+ ).queryExecution.analyzed
+
+ val streamingRelation = analyzedPlan.collectFirst {
+ case sr: StreamingRelation => sr
+ }
+
+ assert(streamingRelation.isDefined, "Expected StreamingRelation in analyzed plan")
+ assert(streamingRelation.get.sourceIdentifyingName == Unassigned,
+ s"Expected Unassigned but got ${streamingRelation.get.sourceIdentifyingName}")
+
+ // Verify the plan has SubqueryAlias wrapping StreamingRelation
+ val hasSubqueryAlias = analyzedPlan.collect {
+ case SubqueryAlias(_, _: StreamingRelation) => true
+ }.nonEmpty
+ assert(hasSubqueryAlias, "Expected SubqueryAlias wrapping StreamingRelation")
+ }
+ }
+
+ test("readStream.table() API resolves streaming table correctly") {
+ // This tests the fallback case in FindDataSourceTable for streaming
+ // UnresolvedCatalogRelation that is NOT wrapped in NamedStreamingRelation.
+ withTable("api_stream_test") {
+ sql("CREATE TABLE api_stream_test (id INT) USING PARQUET")
+
+ val df = spark.readStream.table("api_stream_test")
+ val analyzedPlan = df.queryExecution.analyzed
+
+ val streamingRelation = analyzedPlan.collectFirst {
+ case sr: StreamingRelation => sr
+ }
+
+ assert(streamingRelation.isDefined, "Expected StreamingRelation in analyzed plan")
+ assert(streamingRelation.get.sourceIdentifyingName == Unassigned,
+ s"Expected Unassigned but got ${streamingRelation.get.sourceIdentifyingName}")
+ }
+ }
+
+ test("batch table query still resolves correctly (regression test)") {
+ // Verify that the !isStreaming guard in FindDataSourceTable
+ // doesn't break normal batch table resolution.
+ withTable("batch_test") {
+ sql("CREATE TABLE batch_test (id INT, value STRING) USING PARQUET")
+ sql("INSERT INTO batch_test VALUES (1, 'a'), (2, 'b')")
+
+ val result = sql("SELECT * FROM batch_test").collect()
+ assert(result.length == 2, s"Expected 2 rows but got ${result.length}")
+
+ val analyzedPlan = sql("SELECT * FROM
batch_test").queryExecution.analyzed
+ val hasStreamingRelation = analyzedPlan.collect {
+ case _: StreamingRelation => true
+ }.nonEmpty
+ assert(!hasStreamingRelation, "Batch query should not contain StreamingRelation")
+ }
+ }
+}
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]