This is an automated email from the ASF dual-hosted git repository.
ashrigondekar pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/spark.git
The following commit(s) were added to refs/heads/master by this push:
new 0a02d71adbe7 [SPARK-55601][SS] Hook StreamingSourceIdentifyingName into MicroBatchExecution for source naming
0a02d71adbe7 is described below
commit 0a02d71adbe71a8af3c196249c7a486975a909b2
Author: ericm-db <[email protected]>
AuthorDate: Wed Feb 25 09:36:40 2026 -0800
[SPARK-55601][SS] Hook StreamingSourceIdentifyingName into MicroBatchExecution for source naming
### What changes were proposed in this pull request?
Move streamingSourceIdentifyingName from being passed explicitly through
StreamingRelation constructors to being derived from the DataSource itself via a
lazy val, removing the overloaded StreamingRelation.apply and eliminating the
scattered Unassigned defaults at call sites.

Pull the names from StreamingRelation and StreamingRelationV2 in
MicroBatchExecution and use them to key the OffsetMap during query execution.
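
In rough shape, the change looks like the condensed sketch below (other
constructor fields and imports elided; this is lifted from the diff that
follows rather than being a complete compilable unit):

```scala
// DataSource now owns the streaming source name, defaulting to Unassigned
// when the user did not provide one explicitly.
case class DataSource(
    // ...other constructor fields elided...
    userSpecifiedStreamingSourceName: Option[UserProvided] = None) {
  lazy val streamingSourceIdentifyingName: StreamingSourceIdentifyingName =
    userSpecifiedStreamingSourceName.getOrElse(Unassigned)
}

// The single StreamingRelation.apply reads the name off the DataSource, so
// call sites no longer thread it through (or default it to Unassigned).
object StreamingRelation {
  def apply(dataSource: DataSource): StreamingRelation = {
    StreamingRelation(
      dataSource, dataSource.sourceInfo.name,
      toAttributes(dataSource.sourceInfo.schema),
      dataSource.streamingSourceIdentifyingName)
  }
}
```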
### Why are the changes needed?
This is the final piece needed to enable source naming.
### Does this PR introduce _any_ user-facing change?
No
### How was this patch tested?
Unit tests
### Was this patch authored or co-authored using generative AI tooling?
No
Closes #54373 from ericm-db/streaming-source-identifying-name.
Authored-by: ericm-db <[email protected]>
Signed-off-by: Anish Shrigondekar <[email protected]>
---
.../streaming/StreamingSourceIdentifyingName.scala | 27 ++
.../sql/catalyst/analysis/ResolveDataSource.scala | 17 +-
.../sql/execution/datasources/DataSource.scala | 7 +-
.../execution/datasources/DataSourceStrategy.scala | 2 +-
.../streaming/checkpointing/OffsetSeq.scala | 6 +-
.../streaming/runtime/MicroBatchExecution.scala | 67 ++-
.../streaming/runtime/StreamingRelation.scala | 11 +-
.../execution/streaming/utils/StreamingUtils.scala | 46 ++
.../execution/streaming/StreamRelationSuite.scala | 2 +-
.../StreamingSourceIdentifyingNameSuite.scala | 6 +-
.../test/StreamingQueryEvolutionSuite.scala | 176 -------
.../test/StreamingSourceEvolutionSuite.scala | 520 +++++++++++++++++++++
12 files changed, 667 insertions(+), 220 deletions(-)
diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/streaming/StreamingSourceIdentifyingName.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/streaming/StreamingSourceIdentifyingName.scala
index b89aaf017b1d..8ac72e817811 100644
--- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/streaming/StreamingSourceIdentifyingName.scala
+++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/streaming/StreamingSourceIdentifyingName.scala
@@ -47,6 +47,33 @@ sealed trait StreamingSourceIdentifyingName {
case FlowAssigned(name) => s"""name="$name""""
case Unassigned => "name=<Unassigned>"
}
+
+ /**
+ * Extracts only user-provided names, filtering out flow-assigned or
unassigned sources.
+ * Used when narrowing to explicitly user-specified names (e.g., when
passing to DataSource).
+ *
+ * @return Option value set to the user-provided name if available, None
otherwise
+ */
+ def toUserProvided: Option[UserProvided] = this match {
+ case up: UserProvided => Some(up)
+ case _ => None
+ }
+
+ /**
+ * Extracts the name string from named sources (UserProvided or
FlowAssigned).
+ * Returns None for Unassigned sources.
+ *
+ * Useful for pattern matching when both UserProvided and FlowAssigned
should be
+ * treated identically (e.g., when computing metadata paths or building
sourceIdMap).
+ *
+ * @return Option value set to the source name (user-provided or
flow-assigned) if available,
+ * None otherwise
+ */
+ def nameOpt: Option[String] = this match {
+ case UserProvided(name) => Some(name)
+ case FlowAssigned(name) => Some(name)
+ case Unassigned => None
+ }
}
/**
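
As a quick illustration of the two new accessors added above (assuming
UserProvided and FlowAssigned are the simple case classes matched in this
file; the names "ratings" and "source_1" are made up):

```scala
import org.apache.spark.sql.catalyst.streaming.{FlowAssigned, Unassigned, UserProvided}

val user = UserProvided("ratings")    // hypothetical user-specified name
val flow = FlowAssigned("source_1")   // hypothetical flow-assigned name

// toUserProvided keeps only explicitly user-specified names.
assert(user.toUserProvided == Some(UserProvided("ratings")))
assert(flow.toUserProvided.isEmpty)
assert(Unassigned.toUserProvided.isEmpty)

// nameOpt treats user-provided and flow-assigned names the same way.
assert(user.nameOpt == Some("ratings"))
assert(flow.nameOpt == Some("source_1"))
assert(Unassigned.nameOpt.isEmpty)
```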
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/catalyst/analysis/ResolveDataSource.scala b/sql/core/src/main/scala/org/apache/spark/sql/catalyst/analysis/ResolveDataSource.scala
index 35ff39ce8268..2f139393ade3 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/catalyst/analysis/ResolveDataSource.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/catalyst/analysis/ResolveDataSource.scala
@@ -24,7 +24,7 @@ import scala.jdk.CollectionConverters._
import org.apache.spark.sql.catalyst.analysis.NamedStreamingRelation
import org.apache.spark.sql.catalyst.plans.logical.{LogicalPlan,
UnresolvedDataSource}
import org.apache.spark.sql.catalyst.rules.Rule
-import org.apache.spark.sql.catalyst.streaming.{StreamingRelationV2,
Unassigned}
+import org.apache.spark.sql.catalyst.streaming.StreamingRelationV2
import org.apache.spark.sql.catalyst.types.DataTypeUtils.toAttributes
import org.apache.spark.sql.catalyst.util.CaseInsensitiveMap
import org.apache.spark.sql.classic.SparkSession
@@ -84,10 +84,11 @@ class ResolveDataSource(sparkSession: SparkSession) extends
Rule[LogicalPlan] {
sparkSession,
userSpecifiedSchema = userSpecifiedSchema,
className = source,
- options = optionsWithPath.originalMap)
+ options = optionsWithPath.originalMap,
+ userSpecifiedStreamingSourceName =
sourceIdentifyingName.toUserProvided)
val v1Relation = ds match {
case _: StreamSourceProvider =>
- Some(StreamingRelation(v1DataSource, sourceIdentifyingName))
+ Some(StreamingRelation(v1DataSource))
case _ => None
}
ds match {
@@ -112,16 +113,16 @@ class ResolveDataSource(sparkSession: SparkSession)
extends Rule[LogicalPlan] {
StreamingRelationV2(
Some(provider), source, table, dsOptions,
toAttributes(table.columns.asSchema), None, None, v1Relation,
- sourceIdentifyingName)
+ v1DataSource.streamingSourceIdentifyingName)
// fallback to v1
// TODO (SPARK-27483): we should move this fallback logic to an
analyzer rule.
- case _ => StreamingRelation(v1DataSource, sourceIdentifyingName)
+ case _ => StreamingRelation(v1DataSource)
}
case _ =>
// Code path for data source v1.
- StreamingRelation(v1DataSource, sourceIdentifyingName)
+ StreamingRelation(v1DataSource)
}
case UnresolvedDataSource(source, userSpecifiedSchema, extraOptions, true,
paths) =>
@@ -148,7 +149,7 @@ class ResolveDataSource(sparkSession: SparkSession) extends
Rule[LogicalPlan] {
options = optionsWithPath.originalMap)
val v1Relation = ds match {
case _: StreamSourceProvider =>
- Some(StreamingRelation(v1DataSource, Unassigned))
+ Some(StreamingRelation(v1DataSource))
case _ => None
}
ds match {
@@ -173,7 +174,7 @@ class ResolveDataSource(sparkSession: SparkSession) extends
Rule[LogicalPlan] {
StreamingRelationV2(
Some(provider), source, table, dsOptions,
toAttributes(table.columns.asSchema), None, None, v1Relation,
- Unassigned)
+ v1DataSource.streamingSourceIdentifyingName)
// fallback to v1
// TODO (SPARK-27483): we should move this fallback logic to an
analyzer rule.
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/DataSource.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/DataSource.scala
index 0c234e3fcf18..d2ec3f7ff486 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/DataSource.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/DataSource.scala
@@ -34,7 +34,7 @@ import org.apache.spark.sql.catalyst.DataSourceOptions
import org.apache.spark.sql.catalyst.analysis.UnresolvedAttribute
import org.apache.spark.sql.catalyst.catalog.{BucketSpec,
CatalogStorageFormat, CatalogTable, CatalogUtils}
import org.apache.spark.sql.catalyst.plans.logical.LogicalPlan
-import org.apache.spark.sql.catalyst.streaming.StreamingSourceIdentifyingName
+import
org.apache.spark.sql.catalyst.streaming.{StreamingSourceIdentifyingName,
Unassigned, UserProvided}
import org.apache.spark.sql.catalyst.util.{CaseInsensitiveMap, TypeUtils}
import org.apache.spark.sql.classic.ClassicConversions.castToImpl
import org.apache.spark.sql.classic.Dataset
@@ -103,13 +103,16 @@ case class DataSource(
bucketSpec: Option[BucketSpec] = None,
options: Map[String, String] = Map.empty,
catalogTable: Option[CatalogTable] = None,
- userSpecifiedStreamingSourceName: Option[StreamingSourceIdentifyingName] =
None)
+ userSpecifiedStreamingSourceName: Option[UserProvided] = None)
extends SessionStateHelper with Logging {
case class SourceInfo(name: String, schema: StructType, partitionColumns:
Seq[String])
private val conf: SQLConf = getSqlConf(sparkSession)
+ lazy val streamingSourceIdentifyingName: StreamingSourceIdentifyingName =
+ userSpecifiedStreamingSourceName.getOrElse(Unassigned)
+
lazy val providingClass: Class[_] = {
val cls = DataSource.lookupDataSource(className, conf)
// `providingClass` is used for resolving data source relation for catalog
tables.
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/DataSourceStrategy.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/DataSourceStrategy.scala
index 220b0b8a6301..63782c753696 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/DataSourceStrategy.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/DataSourceStrategy.scala
@@ -305,7 +305,7 @@ class FindDataSourceTable(sparkSession: SparkSession)
extends Rule[LogicalPlan]
userSpecifiedSchema = Some(table.schema),
options = dsOptions,
catalogTable = Some(table),
- userSpecifiedStreamingSourceName = Some(sourceIdentifyingName))
+ userSpecifiedStreamingSourceName = sourceIdentifyingName.toUserProvided)
StreamingRelation(dataSource)
}
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/checkpointing/OffsetSeq.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/checkpointing/OffsetSeq.scala
index dff09c736a83..bf2278b81492 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/checkpointing/OffsetSeq.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/checkpointing/OffsetSeq.scala
@@ -203,7 +203,8 @@ object OffsetSeqMetadata extends Logging {
STATE_STORE_ROCKSDB_FORMAT_VERSION,
STATE_STORE_ROCKSDB_MERGE_OPERATOR_VERSION,
STATEFUL_OPERATOR_USE_STRICT_DISTRIBUTION,
PRUNE_FILTERS_CAN_PRUNE_STREAMING_SUBPLAN,
STREAMING_STATE_STORE_ENCODING_FORMAT,
- STATE_STORE_ROW_CHECKSUM_ENABLED, PROTOBUF_EXTENSIONS_SUPPORT_ENABLED
+ STATE_STORE_ROW_CHECKSUM_ENABLED, PROTOBUF_EXTENSIONS_SUPPORT_ENABLED,
+ ENABLE_STREAMING_SOURCE_EVOLUTION
)
/**
@@ -252,7 +253,8 @@ object OffsetSeqMetadata extends Logging {
STREAMING_STATE_STORE_ENCODING_FORMAT.key -> "unsaferow",
STATE_STORE_ROW_CHECKSUM_ENABLED.key -> "false",
STATE_STORE_ROCKSDB_MERGE_OPERATOR_VERSION.key -> "1",
- PROTOBUF_EXTENSIONS_SUPPORT_ENABLED.key -> "false"
+ PROTOBUF_EXTENSIONS_SUPPORT_ENABLED.key -> "false",
+ ENABLE_STREAMING_SOURCE_EVOLUTION.key -> "false"
)
def readValue[T](metadataLog: OffsetSeqMetadataBase, confKey:
ConfigEntry[T]): String = {
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/runtime/MicroBatchExecution.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/runtime/MicroBatchExecution.scala
index a4f50993f64b..35a658d350ab 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/runtime/MicroBatchExecution.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/runtime/MicroBatchExecution.scala
@@ -17,6 +17,8 @@
package org.apache.spark.sql.execution.streaming.runtime
+import java.util.concurrent.atomic.AtomicLong
+
import scala.collection.mutable.{Map => MutableMap}
import scala.collection.mutable
import scala.jdk.CollectionConverters._
@@ -30,7 +32,7 @@ import org.apache.spark.internal.LogKeys._
import org.apache.spark.sql.catalyst.encoders.ExpressionEncoder
import org.apache.spark.sql.catalyst.expressions.{Alias, Attribute,
CurrentBatchTimestamp, CurrentDate, CurrentTimestamp,
FileSourceMetadataAttribute, LocalTimestamp}
import org.apache.spark.sql.catalyst.plans.logical.{Aggregate, Deduplicate,
DeduplicateWithinWatermark, Distinct, FlatMapGroupsInPandasWithState,
FlatMapGroupsWithState, GlobalLimit, Join, LeafNode, LocalRelation,
LogicalPlan, Project, StreamSourceAwareLogicalPlan, TransformWithState,
TransformWithStateInPySpark}
-import org.apache.spark.sql.catalyst.streaming.{StreamingRelationV2,
WriteToStream}
+import org.apache.spark.sql.catalyst.streaming.{StreamingRelationV2,
Unassigned, WriteToStream}
import org.apache.spark.sql.catalyst.trees.TreePattern.CURRENT_LIKE
import org.apache.spark.sql.catalyst.util.truncatedString
import org.apache.spark.sql.classic.{Dataset, SparkSession}
@@ -48,6 +50,7 @@ import
org.apache.spark.sql.execution.streaming.runtime.AcceptsLatestSeenOffsetH
import
org.apache.spark.sql.execution.streaming.runtime.StreamingCheckpointConstants.{DIR_NAME_COMMITS,
DIR_NAME_OFFSETS, DIR_NAME_STATE}
import org.apache.spark.sql.execution.streaming.sources.{ForeachBatchSink,
WriteToMicroBatchDataSource, WriteToMicroBatchDataSourceV1}
import
org.apache.spark.sql.execution.streaming.state.{OfflineStateRepartitionUtils,
StateSchemaBroadcast, StateStoreErrors}
+import org.apache.spark.sql.execution.streaming.utils.StreamingUtils
import org.apache.spark.sql.internal.SQLConf
import org.apache.spark.sql.internal.connector.PartitionOffsetWithIndex
import org.apache.spark.sql.streaming.Trigger
@@ -168,7 +171,7 @@ class MicroBatchExecution(
assert(queryExecutionThread eq Thread.currentThread,
"logicalPlan must be initialized in QueryExecutionThread " +
s"but the current thread was ${Thread.currentThread}")
- var nextSourceId = 0L
+ val nextSourceId = new AtomicLong(0L)
val toExecutionRelationMap = MutableMap[StreamingRelation,
StreamingExecutionRelation]()
val v2ToExecutionRelationMap = MutableMap[StreamingRelationV2,
StreamingExecutionRelation]()
val v2ToRelationMap = MutableMap[StreamingRelationV2,
StreamingDataSourceV2ScanRelation]()
@@ -183,15 +186,24 @@ class MicroBatchExecution(
val disabledSources =
Utils.stringToSeq(sparkSession.sessionState.conf.disabledV2StreamingMicroBatchReaders)
+ // Read the source evolution enforcement from the last written offset log
entry. If no entries
+ // are found, use the session config value.
+ val enforceNamed = offsetLog.getLatest().flatMap { case (_, offsetSeq) =>
+ offsetSeq.metadataOpt.flatMap { metadata =>
+ OffsetSeqMetadata.readValueOpt(metadata,
SQLConf.ENABLE_STREAMING_SOURCE_EVOLUTION)
+ .map(_.toBoolean)
+ }
+
}.getOrElse(sparkSessionForStream.sessionState.conf.enableStreamingSourceEvolution)
+
import
org.apache.spark.sql.execution.datasources.v2.DataSourceV2Implicits._
val _logicalPlan = analyzedPlan.transform {
case streamingRelation @ StreamingRelation(
dataSourceV1, sourceName, output, sourceIdentifyingName) =>
toExecutionRelationMap.getOrElseUpdate(streamingRelation, {
// Materialize source to avoid creating it in every batch
- val metadataPath = s"$resolvedCheckpointRoot/sources/$nextSourceId"
+ val metadataPath = StreamingUtils.getMetadataPath(
+ sourceIdentifyingName, nextSourceId, resolvedCheckpointRoot)
val source = dataSourceV1.createSource(metadataPath)
- nextSourceId += 1
logInfo(log"Using Source [${MDC(LogKeys.STREAMING_SOURCE, source)}]
" +
log"from DataSourceV1 named
'${MDC(LogKeys.STREAMING_DATA_SOURCE_NAME, sourceName)}' " +
log"[${MDC(LogKeys.STREAMING_DATA_SOURCE_DESCRIPTION,
dataSourceV1)}]")
@@ -206,8 +218,8 @@ class MicroBatchExecution(
if (!v2Disabled && table.supports(TableCapability.MICRO_BATCH_READ)) {
v2ToRelationMap.getOrElseUpdate(s, {
// Materialize source to avoid creating it in every batch
- val metadataPath = s"$resolvedCheckpointRoot/sources/$nextSourceId"
- nextSourceId += 1
+ val metadataPath = StreamingUtils.getMetadataPath(
+ sourceIdentifyingName, nextSourceId, resolvedCheckpointRoot)
logInfo(log"Reading table [${MDC(LogKeys.STREAMING_TABLE, table)}]
" +
log"from DataSourceV2 named
'${MDC(LogKeys.STREAMING_DATA_SOURCE_NAME, srcName)}' " +
log"${MDC(LogKeys.STREAMING_DATA_SOURCE_DESCRIPTION, dsStr)}")
@@ -224,7 +236,8 @@ class MicroBatchExecution(
trigger match {
case RealTimeTrigger(duration) => Some(duration)
case _ => None
- }
+ },
+ sourceIdentifyingName
)
StreamingDataSourceV2ScanRelation(relation, scan, output, stream)
})
@@ -234,10 +247,10 @@ class MicroBatchExecution(
} else {
v2ToExecutionRelationMap.getOrElseUpdate(s, {
// Materialize source to avoid creating it in every batch
- val metadataPath = s"$resolvedCheckpointRoot/sources/$nextSourceId"
+ val metadataPath = StreamingUtils.getMetadataPath(
+ sourceIdentifyingName, nextSourceId, resolvedCheckpointRoot)
val source =
v1.get.asInstanceOf[StreamingRelation].dataSource.createSource(metadataPath)
- nextSourceId += 1
logInfo(log"Using Source [${MDC(LogKeys.STREAMING_SOURCE,
source)}] from " +
log"DataSourceV2 named
'${MDC(LogKeys.STREAMING_DATA_SOURCE_NAME, srcName)}' " +
log"${MDC(LogKeys.STREAMING_DATA_SOURCE_DESCRIPTION, dsStr)}")
@@ -247,17 +260,37 @@ class MicroBatchExecution(
})
}
}
- sources = _logicalPlan.collect {
+
+ // Extract sources and their sourceIdentifyingName for sourceIdMap mapping
+ val sourcesWithNames = _logicalPlan.collect {
// v1 source
- case s: StreamingExecutionRelation => s.source
+ case s: StreamingExecutionRelation => (s.source, s.sourceIdentifyingName)
// v2 source
- case r: StreamingDataSourceV2ScanRelation => r.stream
+ case r: StreamingDataSourceV2ScanRelation => (r.stream,
r.relation.sourceIdentifyingName)
+ }
+ sources = sourcesWithNames.map(_._1)
+
+ if (enforceNamed) {
+ // When enforcement is enabled, all sources should be named after
validation in analysis.
+ // This assertion ensures that the validation in NameStreamingSources
worked correctly.
+ assert(sourcesWithNames.forall(s => s._2 != Unassigned),
+ "All sources should be named at this point - validation should have
happened in analysis")
+
+ // Create source ID mapping using names (for OffsetMap format)
+ sourceIdMap = sourcesWithNames.map { case (source,
sourceIdentifyingName) =>
+ sourceIdentifyingName.nameOpt match {
+ case Some(name) => name -> source
+ case None =>
+ throw new IllegalStateException(
+ "Unassigned sources should not exist when enforcement is
enabled")
+ }
+ }.toMap
+ } else {
+ // When enforcement is disabled, use positional indices (backward
compatibility)
+ sourceIdMap = sources.zipWithIndex.map {
+ case (source, index) => index.toString -> source
+ }.toMap
}
-
- // Create source ID mapping for OffsetMap support
- sourceIdMap = sources.zipWithIndex.map {
- case (source, index) => index.toString -> source
- }.toMap
// Inform the source if it is in real time mode
if (trigger.isInstanceOf[RealTimeTrigger]) {
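
To make the two keying schemes above concrete, here is a self-contained
stand-in (FakeSource, Named, and Anon are invented for illustration; only the
keying logic mirrors the snippet in this hunk):

```scala
// Minimal stand-ins for Source and StreamingSourceIdentifyingName.
case class FakeSource(id: String)
sealed trait Name { def nameOpt: Option[String] }
case class Named(n: String) extends Name { def nameOpt: Option[String] = Some(n) }
case object Anon extends Name { def nameOpt: Option[String] = None }

def buildSourceIdMap(
    sourcesWithNames: Seq[(FakeSource, Name)],
    enforceNamed: Boolean): Map[String, FakeSource] = {
  if (enforceNamed) {
    // Keyed by stable names: reordering sources keeps the same offset keys.
    sourcesWithNames.map { case (s, n) => n.nameOpt.get -> s }.toMap
  } else {
    // Legacy behavior: keyed by position in the plan.
    sourcesWithNames.map(_._1).zipWithIndex
      .map { case (s, i) => i.toString -> s }.toMap
  }
}

// buildSourceIdMap(Seq(FakeSource("a") -> Named("clicks"),
//                      FakeSource("b") -> Named("orders")), enforceNamed = true)
//   == Map("clicks" -> FakeSource("a"), "orders" -> FakeSource("b"))
```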
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/runtime/StreamingRelation.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/runtime/StreamingRelation.scala
index 232f3475bb1b..efa65bc5ead4 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/runtime/StreamingRelation.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/runtime/StreamingRelation.scala
@@ -36,18 +36,9 @@ import
org.apache.spark.sql.sources.SupportsStreamSourceMetadataColumns
object StreamingRelation {
def apply(dataSource: DataSource): StreamingRelation = {
// Extract source identifying name from DataSource for stable checkpoints
- val sourceIdentifyingName =
dataSource.userSpecifiedStreamingSourceName.getOrElse(Unassigned)
StreamingRelation(
dataSource, dataSource.sourceInfo.name,
toAttributes(dataSource.sourceInfo.schema),
- sourceIdentifyingName)
- }
-
- def apply(
- dataSource: DataSource,
- sourceIdentifyingName: StreamingSourceIdentifyingName):
StreamingRelation = {
- StreamingRelation(
- dataSource, dataSource.sourceInfo.name,
toAttributes(dataSource.sourceInfo.schema),
- sourceIdentifyingName)
+ dataSource.streamingSourceIdentifyingName)
}
}
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/utils/StreamingUtils.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/utils/StreamingUtils.scala
index d2654ac943b2..8dfc1fd23b57 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/utils/StreamingUtils.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/utils/StreamingUtils.scala
@@ -16,13 +16,59 @@
*/
package org.apache.spark.sql.execution.streaming.utils
+import java.util.concurrent.atomic.AtomicLong
+
import org.apache.hadoop.conf.Configuration
import org.apache.hadoop.fs.Path
+import org.apache.spark.sql.catalyst.streaming.StreamingSourceIdentifyingName
+
object StreamingUtils {
+ /**
+ * Resolves a checkpoint location to a fully qualified path.
+ *
+ * Converts relative or unqualified paths to fully qualified paths using the
+ * file system's URI and working directory.
+ *
+ * @param hadoopConf Hadoop configuration to access the file system
+ * @param checkpointLocation The checkpoint location path (may be relative
or unqualified)
+ * @return Fully qualified checkpoint location URI as a string
+ */
def resolvedCheckpointLocation(hadoopConf: Configuration,
checkpointLocation: String): String = {
val checkpointPath = new Path(checkpointLocation)
val fs = checkpointPath.getFileSystem(hadoopConf)
checkpointPath.makeQualified(fs.getUri,
fs.getWorkingDirectory).toUri.toString
}
+
+ /**
+ * Computes the metadata path for a streaming source based on its
identifying name.
+ *
+ * Named sources (UserProvided/FlowAssigned) use stable name-based paths,
enabling
+ * source evolution (reordering, adding, or removing sources without
breaking state).
+ * Unassigned sources use sequential IDs for backward compatibility.
+ *
+ * Examples:
+ * - UserProvided("mySource") => "$checkpointRoot/sources/mySource"
+ * - FlowAssigned("source_1") => "$checkpointRoot/sources/source_1"
+ * - Unassigned => "$checkpointRoot/sources/0" (increments nextSourceId)
+ *
+ * @param sourceIdentifyingName The source's identifying name
+ * (UserProvided, FlowAssigned, or Unassigned)
+ * @param nextSourceId AtomicLong tracking the next positional source ID for
Unassigned sources
+ * @param resolvedCheckpointRoot The resolved checkpoint root path
+ * @return The computed metadata path string
+ */
+ def getMetadataPath(
+ sourceIdentifyingName: StreamingSourceIdentifyingName,
+ nextSourceId: AtomicLong,
+ resolvedCheckpointRoot: String): String = {
+ sourceIdentifyingName.nameOpt match {
+ case Some(name) =>
+ // User-provided and flow-assigned names use named paths
+ s"$resolvedCheckpointRoot/sources/$name"
+ case None =>
+ // Unassigned sources get sequential IDs assigned here
+ s"$resolvedCheckpointRoot/sources/${nextSourceId.getAndIncrement()}"
+ }
+ }
}
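
A quick check of the path scheme documented above (the checkpoint root below
is hypothetical; note the counter is only consumed for Unassigned sources):

```scala
import java.util.concurrent.atomic.AtomicLong

import org.apache.spark.sql.catalyst.streaming.{FlowAssigned, Unassigned, UserProvided}
import org.apache.spark.sql.execution.streaming.utils.StreamingUtils

val nextSourceId = new AtomicLong(0L)
val root = "/tmp/ckpt"  // hypothetical resolved checkpoint root

// Named sources get stable, name-based paths; the counter is untouched.
assert(StreamingUtils.getMetadataPath(UserProvided("mySource"), nextSourceId, root)
  == "/tmp/ckpt/sources/mySource")
assert(StreamingUtils.getMetadataPath(FlowAssigned("source_1"), nextSourceId, root)
  == "/tmp/ckpt/sources/source_1")

// Unassigned sources fall back to sequential positional IDs.
assert(StreamingUtils.getMetadataPath(Unassigned, nextSourceId, root) == "/tmp/ckpt/sources/0")
assert(StreamingUtils.getMetadataPath(Unassigned, nextSourceId, root) == "/tmp/ckpt/sources/1")
```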
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/execution/streaming/StreamRelationSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/execution/streaming/StreamRelationSuite.scala
index 599a48c64208..dc8da2d5a628 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/execution/streaming/StreamRelationSuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/execution/streaming/StreamRelationSuite.scala
@@ -146,7 +146,7 @@ class StreamRelationSuite extends SharedSparkSession with
AnalysisTest {
),
userSpecifiedSchema = Option(catalogTable.schema),
catalogTable = Option(catalogTable),
- userSpecifiedStreamingSourceName = Some(Unassigned)
+ userSpecifiedStreamingSourceName = None
),
sourceName = s"FileSource[${catalogTable.location.toString}]",
output = Seq(idAttr)
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/streaming/StreamingSourceIdentifyingNameSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/streaming/StreamingSourceIdentifyingNameSuite.scala
index caf6c14c8000..c89895035c91 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/streaming/StreamingSourceIdentifyingNameSuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/streaming/StreamingSourceIdentifyingNameSuite.scala
@@ -43,10 +43,10 @@ class StreamingSourceIdentifyingNameSuite extends
SharedSparkSession {
assert(streamingRelation.get.sourceIdentifyingName == Unassigned,
s"Expected Unassigned but got
${streamingRelation.get.sourceIdentifyingName}")
- // Verify the DataSource has the sourceIdentifyingName set
+ // Verify the DataSource has no user-specified source name (None means
no user-provided name)
val dsSourceName =
streamingRelation.get.dataSource.userSpecifiedStreamingSourceName
- assert(dsSourceName == Some(Unassigned),
- s"Expected Some(Unassigned) but got $dsSourceName")
+ assert(dsSourceName.isEmpty,
+ s"Expected None but got $dsSourceName")
}
}
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/streaming/test/StreamingQueryEvolutionSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/streaming/test/StreamingQueryEvolutionSuite.scala
deleted file mode 100644
index 973e2f75439e..000000000000
--- a/sql/core/src/test/scala/org/apache/spark/sql/streaming/test/StreamingQueryEvolutionSuite.scala
+++ /dev/null
@@ -1,176 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.spark.sql.streaming.test
-
-import org.scalatest.Tag
-
-import org.apache.spark.sql._
-import org.apache.spark.sql.internal.SQLConf
-import org.apache.spark.sql.streaming.StreamTest
-
-/**
- * Test suite for streaming source naming and validation.
- * Tests cover the naming API, validation rules, and resolution pipeline.
- */
-class StreamingQueryEvolutionSuite extends StreamTest {
-
- // ====================
- // Name Validation Tests
- // ====================
-
- testWithSourceEvolution("invalid source name - contains hyphen") {
- checkError(
- exception = intercept[AnalysisException] {
- spark.readStream
- .format("org.apache.spark.sql.streaming.test")
- .name("my-source")
- .load()
- },
- condition = "STREAMING_QUERY_EVOLUTION_ERROR.INVALID_SOURCE_NAME",
- parameters = Map("sourceName" -> "my-source"))
- }
-
- testWithSourceEvolution("invalid source name - contains space") {
- checkError(
- exception = intercept[AnalysisException] {
- spark.readStream
- .format("org.apache.spark.sql.streaming.test")
- .name("my source")
- .load()
- },
- condition = "STREAMING_QUERY_EVOLUTION_ERROR.INVALID_SOURCE_NAME",
- parameters = Map("sourceName" -> "my source"))
- }
-
- testWithSourceEvolution("invalid source name - contains dot") {
- checkError(
- exception = intercept[AnalysisException] {
- spark.readStream
- .format("org.apache.spark.sql.streaming.test")
- .name("my.source")
- .load()
- },
- condition = "STREAMING_QUERY_EVOLUTION_ERROR.INVALID_SOURCE_NAME",
- parameters = Map("sourceName" -> "my.source"))
- }
-
- testWithSourceEvolution("invalid source name - contains special characters")
{
- checkError(
- exception = intercept[AnalysisException] {
- spark.readStream
- .format("org.apache.spark.sql.streaming.test")
- .name("my.source@123")
- .load()
- },
- condition = "STREAMING_QUERY_EVOLUTION_ERROR.INVALID_SOURCE_NAME",
- parameters = Map("sourceName" -> "my.source@123"))
- }
-
- testWithSourceEvolution("valid source names - various patterns") {
- // Test that valid names work correctly
- Seq("mySource", "my_source", "MySource123", "_private", "source_123_test",
"123source")
- .foreach { name =>
- val df = spark.readStream
- .format("org.apache.spark.sql.streaming.test")
- .name(name)
- .load()
- assert(df.isStreaming, s"DataFrame should be streaming for name:
$name")
- }
- }
-
- testWithSourceEvolution("method chaining - name() returns reader for
chaining") {
- val df = spark.readStream
- .format("org.apache.spark.sql.streaming.test")
- .name("my_source")
- .option("opt1", "value1")
- .load()
-
- assert(df.isStreaming, "DataFrame should be streaming")
- }
-
- // ==========================
- // Duplicate Detection Tests
- // ==========================
-
- testWithSourceEvolution("duplicate source names - rejected when starting
stream") {
- withTempDir { checkpointDir =>
- val df1 = spark.readStream
- .format("org.apache.spark.sql.streaming.test")
- .name("duplicate_name")
- .load()
-
- val df2 = spark.readStream
- .format("org.apache.spark.sql.streaming.test")
- .name("duplicate_name") // Same name - should fail
- .load()
-
- checkError(
- exception = intercept[AnalysisException] {
- df1.union(df2).writeStream
- .format("org.apache.spark.sql.streaming.test")
- .option("checkpointLocation", checkpointDir.getCanonicalPath)
- .start()
- },
- condition = "STREAMING_QUERY_EVOLUTION_ERROR.DUPLICATE_SOURCE_NAMES",
- parameters = Map("names" -> "'duplicate_name'"))
- }
- }
-
- testWithSourceEvolution("enforcement enabled - unnamed source rejected") {
- checkError(
- exception = intercept[AnalysisException] {
- spark.readStream
- .format("org.apache.spark.sql.streaming.test")
- .load() // Unnamed - throws error at load() time
- },
- condition =
"STREAMING_QUERY_EVOLUTION_ERROR.UNNAMED_STREAMING_SOURCES_WITH_ENFORCEMENT",
- parameters = Map("sourceInfo" -> ".*"),
- matchPVals = true)
- }
-
- testWithSourceEvolution("enforcement enabled - all sources named succeeds") {
- val df1 = spark.readStream
- .format("org.apache.spark.sql.streaming.test")
- .name("alpha")
- .load()
-
- val df2 = spark.readStream
- .format("org.apache.spark.sql.streaming.test")
- .name("beta")
- .load()
-
- // Should not throw - all sources are named
- val union = df1.union(df2)
- assert(union.isStreaming, "Union should be streaming")
- }
-
- // ==============
- // Helper Methods
- // ==============
-
- /**
- * Helper method to run tests with source evolution enabled.
- */
- def testWithSourceEvolution(testName: String, testTags: Tag*)(testBody: =>
Any): Unit = {
- test(testName, testTags: _*) {
- withSQLConf(SQLConf.ENABLE_STREAMING_SOURCE_EVOLUTION.key -> "true") {
- testBody
- }
- }
- }
-}
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/streaming/test/StreamingSourceEvolutionSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/streaming/test/StreamingSourceEvolutionSuite.scala
new file mode 100644
index 000000000000..ec919f1f1242
--- /dev/null
+++ b/sql/core/src/test/scala/org/apache/spark/sql/streaming/test/StreamingSourceEvolutionSuite.scala
@@ -0,0 +1,520 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.sql.streaming.test
+
+import scala.concurrent.duration._
+
+import org.apache.hadoop.fs.Path
+import org.mockito.ArgumentMatchers.{any, eq => meq}
+import org.mockito.Mockito._
+import org.scalatest.{BeforeAndAfterEach, Tag}
+
+import org.apache.spark.sql._
+import org.apache.spark.sql.internal.SQLConf
+import org.apache.spark.sql.streaming.StreamTest
+import org.apache.spark.sql.streaming.Trigger._
+import org.apache.spark.util.Utils
+
+/**
+ * Test suite for streaming source naming and validation.
+ * Tests cover the naming API, validation rules, and resolution pipeline.
+ */
+class StreamingSourceEvolutionSuite extends StreamTest with BeforeAndAfterEach
{
+
+ private def newMetadataDir =
+ Utils.createTempDir(namePrefix = "streaming.metadata").getCanonicalPath
+
+ override def afterEach(): Unit = {
+ spark.streams.active.foreach(_.stop())
+ super.afterEach()
+ }
+
+ /**
+ * Helper to verify that a source was created with the expected metadata
path.
+ * @param checkpointLocation the checkpoint location path
+ * @param sourcePath the expected source path (e.g., "source1" or "0")
+ * @param mode mockito verification mode (default: times(1))
+ */
+ private def verifySourcePath(
+ checkpointLocation: Path,
+ sourcePath: String,
+ mode: org.mockito.verification.VerificationMode = times(1)): Unit = {
+ verify(LastOptions.mockStreamSourceProvider, mode).createSource(
+ any(),
+ meq(s"${new Path(makeQualifiedPath(
+ checkpointLocation.toString)).toString}/sources/$sourcePath"),
+ meq(None),
+ meq("org.apache.spark.sql.streaming.test"),
+ meq(Map.empty))
+ }
+
+ // ====================
+ // Name Validation Tests
+ // ====================
+
+ testWithSourceEvolution("invalid source name - contains hyphen") {
+ checkError(
+ exception = intercept[AnalysisException] {
+ spark.readStream
+ .format("org.apache.spark.sql.streaming.test")
+ .name("my-source")
+ .load()
+ },
+ condition = "STREAMING_QUERY_EVOLUTION_ERROR.INVALID_SOURCE_NAME",
+ parameters = Map("sourceName" -> "my-source"))
+ }
+
+ testWithSourceEvolution("invalid source name - contains space") {
+ checkError(
+ exception = intercept[AnalysisException] {
+ spark.readStream
+ .format("org.apache.spark.sql.streaming.test")
+ .name("my source")
+ .load()
+ },
+ condition = "STREAMING_QUERY_EVOLUTION_ERROR.INVALID_SOURCE_NAME",
+ parameters = Map("sourceName" -> "my source"))
+ }
+
+ testWithSourceEvolution("invalid source name - contains dot") {
+ checkError(
+ exception = intercept[AnalysisException] {
+ spark.readStream
+ .format("org.apache.spark.sql.streaming.test")
+ .name("my.source")
+ .load()
+ },
+ condition = "STREAMING_QUERY_EVOLUTION_ERROR.INVALID_SOURCE_NAME",
+ parameters = Map("sourceName" -> "my.source"))
+ }
+
+ testWithSourceEvolution("invalid source name - contains special characters")
{
+ checkError(
+ exception = intercept[AnalysisException] {
+ spark.readStream
+ .format("org.apache.spark.sql.streaming.test")
+ .name("my.source@123")
+ .load()
+ },
+ condition = "STREAMING_QUERY_EVOLUTION_ERROR.INVALID_SOURCE_NAME",
+ parameters = Map("sourceName" -> "my.source@123"))
+ }
+
+ testWithSourceEvolution("valid source names - various patterns") {
+ // Test that valid names work correctly
+ Seq("mySource", "my_source", "MySource123", "_private", "source_123_test",
"123source")
+ .foreach { name =>
+ val df = spark.readStream
+ .format("org.apache.spark.sql.streaming.test")
+ .name(name)
+ .load()
+ assert(df.isStreaming, s"DataFrame should be streaming for name:
$name")
+ }
+ }
+
+ testWithSourceEvolution("method chaining - name() returns reader for
chaining") {
+ val df = spark.readStream
+ .format("org.apache.spark.sql.streaming.test")
+ .name("my_source")
+ .option("opt1", "value1")
+ .load()
+
+ assert(df.isStreaming, "DataFrame should be streaming")
+ }
+
+ // ==========================
+ // Duplicate Detection Tests
+ // ==========================
+
+ testWithSourceEvolution("duplicate source names - rejected when starting
stream") {
+ withTempDir { checkpointDir =>
+ val df1 = spark.readStream
+ .format("org.apache.spark.sql.streaming.test")
+ .name("duplicate_name")
+ .load()
+
+ val df2 = spark.readStream
+ .format("org.apache.spark.sql.streaming.test")
+ .name("duplicate_name") // Same name - should fail
+ .load()
+
+ checkError(
+ exception = intercept[AnalysisException] {
+ df1.union(df2).writeStream
+ .format("org.apache.spark.sql.streaming.test")
+ .option("checkpointLocation", checkpointDir.getCanonicalPath)
+ .start()
+ },
+ condition = "STREAMING_QUERY_EVOLUTION_ERROR.DUPLICATE_SOURCE_NAMES",
+ parameters = Map("names" -> "'duplicate_name'"))
+ }
+ }
+
+ testWithSourceEvolution("enforcement enabled - unnamed source rejected") {
+ checkError(
+ exception = intercept[AnalysisException] {
+ spark.readStream
+ .format("org.apache.spark.sql.streaming.test")
+ .load() // Unnamed - throws error at load() time
+ },
+ condition =
"STREAMING_QUERY_EVOLUTION_ERROR.UNNAMED_STREAMING_SOURCES_WITH_ENFORCEMENT",
+ parameters = Map("sourceInfo" -> ".*"),
+ matchPVals = true)
+ }
+
+ testWithSourceEvolution("enforcement enabled - all sources named succeeds") {
+ val df1 = spark.readStream
+ .format("org.apache.spark.sql.streaming.test")
+ .name("alpha")
+ .load()
+
+ val df2 = spark.readStream
+ .format("org.apache.spark.sql.streaming.test")
+ .name("beta")
+ .load()
+
+ // Should not throw - all sources are named
+ val union = df1.union(df2)
+ assert(union.isStreaming, "Union should be streaming")
+ }
+
+ test("without enforcement - naming sources throws error") {
+ withSQLConf(SQLConf.ENABLE_STREAMING_SOURCE_EVOLUTION.key -> "false") {
+ checkError(
+ exception = intercept[AnalysisException] {
+ spark.readStream
+ .format("org.apache.spark.sql.streaming.test")
+ .name("mySource")
+ .load()
+ },
+ condition =
"STREAMING_QUERY_EVOLUTION_ERROR.SOURCE_NAMING_NOT_SUPPORTED",
+ parameters = Map("name" -> "mySource"))
+ }
+ }
+
+ // =======================
+ // Metadata Path Tests
+ // =======================
+
+ testWithSourceEvolution("named sources - metadata path uses source name") {
+ LastOptions.clear()
+
+ val checkpointLocation = new Path(newMetadataDir)
+
+ val df1 = spark.readStream
+ .format("org.apache.spark.sql.streaming.test")
+ .name("source1")
+ .load()
+
+ val df2 = spark.readStream
+ .format("org.apache.spark.sql.streaming.test")
+ .name("source2")
+ .load()
+
+ val q = df1.union(df2).writeStream
+ .format("org.apache.spark.sql.streaming.test")
+ .option("checkpointLocation", checkpointLocation.toString)
+ .trigger(ProcessingTime(10.seconds))
+ .start()
+ q.processAllAvailable()
+ q.stop()
+
+ verifySourcePath(checkpointLocation, "source1")
+ verifySourcePath(checkpointLocation, "source2")
+ }
+
+ test("unnamed sources use positional IDs for metadata path") {
+ withSQLConf(SQLConf.ENABLE_STREAMING_SOURCE_EVOLUTION.key -> "false") {
+ LastOptions.clear()
+
+ val checkpointLocation = new Path(newMetadataDir)
+
+ val df1 = spark.readStream
+ .format("org.apache.spark.sql.streaming.test")
+ .load()
+
+ val df2 = spark.readStream
+ .format("org.apache.spark.sql.streaming.test")
+ .load()
+
+ val q = df1.union(df2).writeStream
+ .format("org.apache.spark.sql.streaming.test")
+ .option("checkpointLocation", checkpointLocation.toString)
+ .trigger(ProcessingTime(10.seconds))
+ .start()
+ q.processAllAvailable()
+ q.stop()
+
+ // Without naming, sources get sequential IDs (Unassigned -> 0, 1, ...)
+ verifySourcePath(checkpointLocation, "0")
+ verifySourcePath(checkpointLocation, "1")
+ }
+ }
+
+ // ========================
+ // Source Evolution Tests
+ // ========================
+
+ testWithSourceEvolution("source evolution - reorder sources with named
sources") {
+ LastOptions.clear()
+
+ val checkpointLocation = new Path(newMetadataDir)
+
+ // First query: source1 then source2
+ val df1a = spark.readStream
+ .format("org.apache.spark.sql.streaming.test")
+ .name("source1")
+ .load()
+
+ val df2a = spark.readStream
+ .format("org.apache.spark.sql.streaming.test")
+ .name("source2")
+ .load()
+
+ val q1 = df1a.union(df2a).writeStream
+ .format("org.apache.spark.sql.streaming.test")
+ .option("checkpointLocation", checkpointLocation.toString)
+ .trigger(ProcessingTime(10.seconds))
+ .start()
+ q1.processAllAvailable()
+ q1.stop()
+
+ LastOptions.clear()
+
+ // Second query: source2 then source1 (reordered) - should still work
+ val df1b = spark.readStream
+ .format("org.apache.spark.sql.streaming.test")
+ .name("source1")
+ .load()
+
+ val df2b = spark.readStream
+ .format("org.apache.spark.sql.streaming.test")
+ .name("source2")
+ .load()
+
+ val q2 = df2b.union(df1b).writeStream // Note: reversed order
+ .format("org.apache.spark.sql.streaming.test")
+ .option("checkpointLocation", checkpointLocation.toString)
+ .trigger(ProcessingTime(10.seconds))
+ .start()
+ q2.processAllAvailable()
+ q2.stop()
+
+ // Both sources should still use their named paths
+ verifySourcePath(checkpointLocation, "source1", atLeastOnce())
+ verifySourcePath(checkpointLocation, "source2", atLeastOnce())
+ }
+
+ testWithSourceEvolution("source evolution - add new source with named
sources") {
+ LastOptions.clear()
+
+ val checkpointLocation = new Path(newMetadataDir)
+
+ // First query: only source1
+ val df1 = spark.readStream
+ .format("org.apache.spark.sql.streaming.test")
+ .name("source1")
+ .load()
+
+ val q1 = df1.writeStream
+ .format("org.apache.spark.sql.streaming.test")
+ .option("checkpointLocation", checkpointLocation.toString)
+ .trigger(ProcessingTime(10.seconds))
+ .start()
+ q1.processAllAvailable()
+ q1.stop()
+
+ LastOptions.clear()
+
+ // Second query: add source2
+ val df1b = spark.readStream
+ .format("org.apache.spark.sql.streaming.test")
+ .name("source1")
+ .load()
+
+ val df2 = spark.readStream
+ .format("org.apache.spark.sql.streaming.test")
+ .name("source2")
+ .load()
+
+ val q2 = df1b.union(df2).writeStream
+ .format("org.apache.spark.sql.streaming.test")
+ .option("checkpointLocation", checkpointLocation.toString)
+ .trigger(ProcessingTime(10.seconds))
+ .start()
+ q2.processAllAvailable()
+ q2.stop()
+
+ // Both sources should have been created
+ verifySourcePath(checkpointLocation, "source1", atLeastOnce())
+ verifySourcePath(checkpointLocation, "source2")
+ }
+
+ testWithSourceEvolution("named sources enforcement uses V2 offset log
format") {
+ LastOptions.clear()
+
+ val checkpointLocation = new Path(newMetadataDir)
+
+ val df1 = spark.readStream
+ .format("org.apache.spark.sql.streaming.test")
+ .name("source1")
+ .load()
+
+ val df2 = spark.readStream
+ .format("org.apache.spark.sql.streaming.test")
+ .name("source2")
+ .load()
+
+ val q = df1.union(df2).writeStream
+ .format("org.apache.spark.sql.streaming.test")
+ .option("checkpointLocation", checkpointLocation.toString)
+ .trigger(ProcessingTime(10.seconds))
+ .start()
+ q.processAllAvailable()
+ q.stop()
+
+ import org.apache.spark.sql.execution.streaming.checkpointing.{OffsetMap,
OffsetSeqLog}
+ val offsetLog = new OffsetSeqLog(spark,
+ makeQualifiedPath(checkpointLocation.toString).toString + "/offsets")
+ val offsetSeq = offsetLog.get(0)
+ assert(offsetSeq.isDefined, "Offset log should have batch 0")
+ assert(offsetSeq.get.isInstanceOf[OffsetMap],
+ s"Expected OffsetMap but got ${offsetSeq.get.getClass.getSimpleName}")
+ }
+
+ testWithSourceEvolution("names preserved through union operations") {
+ LastOptions.clear()
+
+ val checkpointLocation = new Path(newMetadataDir)
+
+ val df1 = spark.readStream
+ .format("org.apache.spark.sql.streaming.test")
+ .name("alpha")
+ .load()
+
+ val df2 = spark.readStream
+ .format("org.apache.spark.sql.streaming.test")
+ .name("beta")
+ .load()
+
+ val df3 = spark.readStream
+ .format("org.apache.spark.sql.streaming.test")
+ .name("gamma")
+ .load()
+
+ // Complex union: (alpha union beta) union gamma
+ val q = df1.union(df2).union(df3).writeStream
+ .format("org.apache.spark.sql.streaming.test")
+ .option("checkpointLocation", checkpointLocation.toString)
+ .trigger(ProcessingTime(10.seconds))
+ .start()
+ q.processAllAvailable()
+ q.stop()
+
+ // All three sources should use their named paths
+ verifySourcePath(checkpointLocation, "alpha")
+ verifySourcePath(checkpointLocation, "beta")
+ verifySourcePath(checkpointLocation, "gamma")
+ }
+
+ testWithSourceEvolution("enforcement config is persisted in offset
metadata") {
+ LastOptions.clear()
+
+ val checkpointLocation = new Path(newMetadataDir)
+
+ // Start query with enforcement enabled
+ val df1 = spark.readStream
+ .format("org.apache.spark.sql.streaming.test")
+ .name("source1")
+ .load()
+
+ val q1 = df1.writeStream
+ .format("org.apache.spark.sql.streaming.test")
+ .option("checkpointLocation", checkpointLocation.toString)
+ .trigger(ProcessingTime(10.seconds))
+ .start()
+ q1.processAllAvailable()
+ q1.stop()
+
+ // Verify config was persisted in offset metadata
+ import org.apache.spark.sql.execution.streaming.checkpointing.OffsetSeqLog
+ val offsetLog = new OffsetSeqLog(spark,
+ makeQualifiedPath(checkpointLocation.toString).toString + "/offsets")
+ val offsetSeq = offsetLog.get(0)
+ assert(offsetSeq.isDefined, "Offset log should have batch 0")
+ assert(offsetSeq.get.metadataOpt.isDefined, "Offset metadata should be
present")
+ assert(offsetSeq.get.metadataOpt.get.conf.contains(
+ SQLConf.ENABLE_STREAMING_SOURCE_EVOLUTION.key),
+ "ENABLE_STREAMING_SOURCE_EVOLUTION should be in offset metadata")
+ assert(offsetSeq.get.metadataOpt.get.conf(
+ SQLConf.ENABLE_STREAMING_SOURCE_EVOLUTION.key) == "true",
+ "ENABLE_STREAMING_SOURCE_EVOLUTION should be true in offset metadata")
+ }
+
+ test("upgrade from old checkpoint without enforcement config uses default
value") {
+ import
org.apache.spark.sql.execution.streaming.checkpointing.{OffsetSeqMetadata,
OffsetSeqMetadataV2}
+
+ // Simulate old checkpoint metadata without
ENABLE_STREAMING_SOURCE_EVOLUTION
+ val oldMetadata = OffsetSeqMetadata(
+ batchWatermarkMs = 0,
+ batchTimestampMs = 0,
+ conf = Map(
+ // Old checkpoint has other configs but not
ENABLE_STREAMING_SOURCE_EVOLUTION
+ SQLConf.STREAMING_MULTIPLE_WATERMARK_POLICY.key -> "max"
+ )
+ )
+
+ // Verify that reading the config returns the default value (false)
+ val value = OffsetSeqMetadata.readValueOpt(
+ oldMetadata, SQLConf.ENABLE_STREAMING_SOURCE_EVOLUTION)
+ assert(value.contains("false"),
+ s"Expected default value 'false' for missing config, but got: $value")
+
+ // Also test with V2 metadata
+ val oldMetadataV2 = OffsetSeqMetadataV2(
+ batchWatermarkMs = 0,
+ batchTimestampMs = 0,
+ conf = Map(
+ SQLConf.STREAMING_MULTIPLE_WATERMARK_POLICY.key -> "max"
+ )
+ )
+
+ val valueV2 = OffsetSeqMetadata.readValueOpt(
+ oldMetadataV2, SQLConf.ENABLE_STREAMING_SOURCE_EVOLUTION)
+ assert(valueV2.contains("false"),
+ s"Expected default value 'false' for missing config in V2, but got:
$valueV2")
+ }
+
+ // ==============
+ // Helper Methods
+ // ==============
+
+ /**
+ * Helper method to run tests with source evolution enabled.
+ * Sets offset log format to V2 (OffsetMap) since named sources require it.
+ */
+ def testWithSourceEvolution(testName: String, testTags: Tag*)(testBody: =>
Any): Unit = {
+ test(testName, testTags: _*) {
+ withSQLConf(
+ SQLConf.ENABLE_STREAMING_SOURCE_EVOLUTION.key -> "true",
+ SQLConf.STREAMING_OFFSET_LOG_FORMAT_VERSION.key -> "2") {
+ testBody
+ }
+ }
+ }
+}
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]