This is an automated email from the ASF dual-hosted git repository.

ashrigondekar pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/spark.git


The following commit(s) were added to refs/heads/master by this push:
     new 0a02d71adbe7 [SPARK-55601][SS] Hook StreamingSourceIdentifyingName 
into MicrobatchExecution for source naming
0a02d71adbe7 is described below

commit 0a02d71adbe71a8af3c196249c7a486975a909b2
Author: ericm-db <[email protected]>
AuthorDate: Wed Feb 25 09:36:40 2026 -0800

    [SPARK-55601][SS] Hook StreamingSourceIdentifyingName into 
MicrobatchExecution for source naming
    
    ### What changes were proposed in this pull request?
    
    Move streamingSourceIdentifyingName from being passed explicitly through 
StreamingRelation constructors to being derived from DataSource itself via a 
lazy val, removing the overloaded StreamingRelation.apply and eliminating 
scattered Unassigned defaults at call sites.
    Pull the names from StreamingRelation and StreamingRelationV2 in 
MicroBatchExecution to key the OffsetMap by the source names during query 
execution.
    
    ### Why are the changes needed?
    
    This is the final piece needed to enable source naming.
    
    ### Does this PR introduce _any_ user-facing change?
    
    No
    
    ### How was this patch tested?
    
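    Unit tests: added StreamingSourceEvolutionSuite (replacing 
StreamingQueryEvolutionSuite) and updated StreamRelationSuite and 
StreamingSourceIdentifyingNameSuite.
    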
    ### Was this patch authored or co-authored using generative AI tooling?
    
    No
    
    Closes #54373 from ericm-db/streaming-source-identifying-name.
    
    Authored-by: ericm-db <[email protected]>
    Signed-off-by: Anish Shrigondekar <[email protected]>
---
 .../streaming/StreamingSourceIdentifyingName.scala |  27 ++
 .../sql/catalyst/analysis/ResolveDataSource.scala  |  17 +-
 .../sql/execution/datasources/DataSource.scala     |   7 +-
 .../execution/datasources/DataSourceStrategy.scala |   2 +-
 .../streaming/checkpointing/OffsetSeq.scala        |   6 +-
 .../streaming/runtime/MicroBatchExecution.scala    |  67 ++-
 .../streaming/runtime/StreamingRelation.scala      |  11 +-
 .../execution/streaming/utils/StreamingUtils.scala |  46 ++
 .../execution/streaming/StreamRelationSuite.scala  |   2 +-
 .../StreamingSourceIdentifyingNameSuite.scala      |   6 +-
 .../test/StreamingQueryEvolutionSuite.scala        | 176 -------
 .../test/StreamingSourceEvolutionSuite.scala       | 520 +++++++++++++++++++++
 12 files changed, 667 insertions(+), 220 deletions(-)

diff --git 
a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/streaming/StreamingSourceIdentifyingName.scala
 
b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/streaming/StreamingSourceIdentifyingName.scala
index b89aaf017b1d..8ac72e817811 100644
--- 
a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/streaming/StreamingSourceIdentifyingName.scala
+++ 
b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/streaming/StreamingSourceIdentifyingName.scala
@@ -47,6 +47,33 @@ sealed trait StreamingSourceIdentifyingName {
     case FlowAssigned(name) => s"""name="$name""""
     case Unassigned => "name=<Unassigned>"
   }
+
+  /**
+   * Extracts only user-provided names, filtering out flow-assigned or 
unassigned sources.
+   * Used when narrowing to explicitly user-specified names (e.g., when 
passing to DataSource).
+   *
+   * @return Option value set to the user-provided name if available, None 
otherwise
+   */
+  def toUserProvided: Option[UserProvided] = this match {
+    case up: UserProvided => Some(up)
+    case _ => None
+  }
+
+  /**
+   * Extracts the name string from named sources (UserProvided or 
FlowAssigned).
+   * Returns None for Unassigned sources.
+   *
+   * Useful for pattern matching when both UserProvided and FlowAssigned 
should be
+   * treated identically (e.g., when computing metadata paths or building 
sourceIdMap).
+   *
+   * @return Option value set to the source name (user-provided or 
flow-assigned) if available,
+   *         None otherwise
+   */
+  def nameOpt: Option[String] = this match {
+    case UserProvided(name) => Some(name)
+    case FlowAssigned(name) => Some(name)
+    case Unassigned => None
+  }
 }
 
 /**
diff --git 
a/sql/core/src/main/scala/org/apache/spark/sql/catalyst/analysis/ResolveDataSource.scala
 
b/sql/core/src/main/scala/org/apache/spark/sql/catalyst/analysis/ResolveDataSource.scala
index 35ff39ce8268..2f139393ade3 100644
--- 
a/sql/core/src/main/scala/org/apache/spark/sql/catalyst/analysis/ResolveDataSource.scala
+++ 
b/sql/core/src/main/scala/org/apache/spark/sql/catalyst/analysis/ResolveDataSource.scala
@@ -24,7 +24,7 @@ import scala.jdk.CollectionConverters._
 import org.apache.spark.sql.catalyst.analysis.NamedStreamingRelation
 import org.apache.spark.sql.catalyst.plans.logical.{LogicalPlan, 
UnresolvedDataSource}
 import org.apache.spark.sql.catalyst.rules.Rule
-import org.apache.spark.sql.catalyst.streaming.{StreamingRelationV2, 
Unassigned}
+import org.apache.spark.sql.catalyst.streaming.StreamingRelationV2
 import org.apache.spark.sql.catalyst.types.DataTypeUtils.toAttributes
 import org.apache.spark.sql.catalyst.util.CaseInsensitiveMap
 import org.apache.spark.sql.classic.SparkSession
@@ -84,10 +84,11 @@ class ResolveDataSource(sparkSession: SparkSession) extends 
Rule[LogicalPlan] {
         sparkSession,
         userSpecifiedSchema = userSpecifiedSchema,
         className = source,
-        options = optionsWithPath.originalMap)
+        options = optionsWithPath.originalMap,
+        userSpecifiedStreamingSourceName = 
sourceIdentifyingName.toUserProvided)
       val v1Relation = ds match {
         case _: StreamSourceProvider =>
-          Some(StreamingRelation(v1DataSource, sourceIdentifyingName))
+          Some(StreamingRelation(v1DataSource))
         case _ => None
       }
       ds match {
@@ -112,16 +113,16 @@ class ResolveDataSource(sparkSession: SparkSession) 
extends Rule[LogicalPlan] {
               StreamingRelationV2(
                   Some(provider), source, table, dsOptions,
                   toAttributes(table.columns.asSchema), None, None, v1Relation,
-                  sourceIdentifyingName)
+                  v1DataSource.streamingSourceIdentifyingName)
 
             // fallback to v1
             // TODO (SPARK-27483): we should move this fallback logic to an 
analyzer rule.
-            case _ => StreamingRelation(v1DataSource, sourceIdentifyingName)
+            case _ => StreamingRelation(v1DataSource)
           }
 
         case _ =>
           // Code path for data source v1.
-          StreamingRelation(v1DataSource, sourceIdentifyingName)
+          StreamingRelation(v1DataSource)
       }
 
     case UnresolvedDataSource(source, userSpecifiedSchema, extraOptions, true, 
paths) =>
@@ -148,7 +149,7 @@ class ResolveDataSource(sparkSession: SparkSession) extends 
Rule[LogicalPlan] {
         options = optionsWithPath.originalMap)
       val v1Relation = ds match {
         case _: StreamSourceProvider =>
-          Some(StreamingRelation(v1DataSource, Unassigned))
+          Some(StreamingRelation(v1DataSource))
         case _ => None
       }
       ds match {
@@ -173,7 +174,7 @@ class ResolveDataSource(sparkSession: SparkSession) extends 
Rule[LogicalPlan] {
               StreamingRelationV2(
                   Some(provider), source, table, dsOptions,
                   toAttributes(table.columns.asSchema), None, None, v1Relation,
-                  Unassigned)
+                  v1DataSource.streamingSourceIdentifyingName)
 
             // fallback to v1
             // TODO (SPARK-27483): we should move this fallback logic to an 
analyzer rule.
diff --git 
a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/DataSource.scala
 
b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/DataSource.scala
index 0c234e3fcf18..d2ec3f7ff486 100644
--- 
a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/DataSource.scala
+++ 
b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/DataSource.scala
@@ -34,7 +34,7 @@ import org.apache.spark.sql.catalyst.DataSourceOptions
 import org.apache.spark.sql.catalyst.analysis.UnresolvedAttribute
 import org.apache.spark.sql.catalyst.catalog.{BucketSpec, 
CatalogStorageFormat, CatalogTable, CatalogUtils}
 import org.apache.spark.sql.catalyst.plans.logical.LogicalPlan
-import org.apache.spark.sql.catalyst.streaming.StreamingSourceIdentifyingName
+import 
org.apache.spark.sql.catalyst.streaming.{StreamingSourceIdentifyingName, 
Unassigned, UserProvided}
 import org.apache.spark.sql.catalyst.util.{CaseInsensitiveMap, TypeUtils}
 import org.apache.spark.sql.classic.ClassicConversions.castToImpl
 import org.apache.spark.sql.classic.Dataset
@@ -103,13 +103,16 @@ case class DataSource(
     bucketSpec: Option[BucketSpec] = None,
     options: Map[String, String] = Map.empty,
     catalogTable: Option[CatalogTable] = None,
-    userSpecifiedStreamingSourceName: Option[StreamingSourceIdentifyingName] = 
None)
+    userSpecifiedStreamingSourceName: Option[UserProvided] = None)
   extends SessionStateHelper with Logging {
 
   case class SourceInfo(name: String, schema: StructType, partitionColumns: 
Seq[String])
 
   private val conf: SQLConf = getSqlConf(sparkSession)
 
+  lazy val streamingSourceIdentifyingName: StreamingSourceIdentifyingName =
+    userSpecifiedStreamingSourceName.getOrElse(Unassigned)
+
   lazy val providingClass: Class[_] = {
     val cls = DataSource.lookupDataSource(className, conf)
     // `providingClass` is used for resolving data source relation for catalog 
tables.
diff --git 
a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/DataSourceStrategy.scala
 
b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/DataSourceStrategy.scala
index 220b0b8a6301..63782c753696 100644
--- 
a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/DataSourceStrategy.scala
+++ 
b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/DataSourceStrategy.scala
@@ -305,7 +305,7 @@ class FindDataSourceTable(sparkSession: SparkSession) 
extends Rule[LogicalPlan]
       userSpecifiedSchema = Some(table.schema),
       options = dsOptions,
       catalogTable = Some(table),
-      userSpecifiedStreamingSourceName = Some(sourceIdentifyingName))
+      userSpecifiedStreamingSourceName = sourceIdentifyingName.toUserProvided)
     StreamingRelation(dataSource)
   }
 
diff --git 
a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/checkpointing/OffsetSeq.scala
 
b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/checkpointing/OffsetSeq.scala
index dff09c736a83..bf2278b81492 100644
--- 
a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/checkpointing/OffsetSeq.scala
+++ 
b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/checkpointing/OffsetSeq.scala
@@ -203,7 +203,8 @@ object OffsetSeqMetadata extends Logging {
     STATE_STORE_ROCKSDB_FORMAT_VERSION, 
STATE_STORE_ROCKSDB_MERGE_OPERATOR_VERSION,
     STATEFUL_OPERATOR_USE_STRICT_DISTRIBUTION,
     PRUNE_FILTERS_CAN_PRUNE_STREAMING_SUBPLAN, 
STREAMING_STATE_STORE_ENCODING_FORMAT,
-    STATE_STORE_ROW_CHECKSUM_ENABLED, PROTOBUF_EXTENSIONS_SUPPORT_ENABLED
+    STATE_STORE_ROW_CHECKSUM_ENABLED, PROTOBUF_EXTENSIONS_SUPPORT_ENABLED,
+    ENABLE_STREAMING_SOURCE_EVOLUTION
   )
 
   /**
@@ -252,7 +253,8 @@ object OffsetSeqMetadata extends Logging {
     STREAMING_STATE_STORE_ENCODING_FORMAT.key -> "unsaferow",
     STATE_STORE_ROW_CHECKSUM_ENABLED.key -> "false",
     STATE_STORE_ROCKSDB_MERGE_OPERATOR_VERSION.key -> "1",
-    PROTOBUF_EXTENSIONS_SUPPORT_ENABLED.key -> "false"
+    PROTOBUF_EXTENSIONS_SUPPORT_ENABLED.key -> "false",
+    ENABLE_STREAMING_SOURCE_EVOLUTION.key -> "false"
   )
 
   def readValue[T](metadataLog: OffsetSeqMetadataBase, confKey: 
ConfigEntry[T]): String = {
diff --git 
a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/runtime/MicroBatchExecution.scala
 
b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/runtime/MicroBatchExecution.scala
index a4f50993f64b..35a658d350ab 100644
--- 
a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/runtime/MicroBatchExecution.scala
+++ 
b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/runtime/MicroBatchExecution.scala
@@ -17,6 +17,8 @@
 
 package org.apache.spark.sql.execution.streaming.runtime
 
+import java.util.concurrent.atomic.AtomicLong
+
 import scala.collection.mutable.{Map => MutableMap}
 import scala.collection.mutable
 import scala.jdk.CollectionConverters._
@@ -30,7 +32,7 @@ import org.apache.spark.internal.LogKeys._
 import org.apache.spark.sql.catalyst.encoders.ExpressionEncoder
 import org.apache.spark.sql.catalyst.expressions.{Alias, Attribute, 
CurrentBatchTimestamp, CurrentDate, CurrentTimestamp, 
FileSourceMetadataAttribute, LocalTimestamp}
 import org.apache.spark.sql.catalyst.plans.logical.{Aggregate, Deduplicate, 
DeduplicateWithinWatermark, Distinct, FlatMapGroupsInPandasWithState, 
FlatMapGroupsWithState, GlobalLimit, Join, LeafNode, LocalRelation, 
LogicalPlan, Project, StreamSourceAwareLogicalPlan, TransformWithState, 
TransformWithStateInPySpark}
-import org.apache.spark.sql.catalyst.streaming.{StreamingRelationV2, 
WriteToStream}
+import org.apache.spark.sql.catalyst.streaming.{StreamingRelationV2, 
Unassigned, WriteToStream}
 import org.apache.spark.sql.catalyst.trees.TreePattern.CURRENT_LIKE
 import org.apache.spark.sql.catalyst.util.truncatedString
 import org.apache.spark.sql.classic.{Dataset, SparkSession}
@@ -48,6 +50,7 @@ import 
org.apache.spark.sql.execution.streaming.runtime.AcceptsLatestSeenOffsetH
 import 
org.apache.spark.sql.execution.streaming.runtime.StreamingCheckpointConstants.{DIR_NAME_COMMITS,
 DIR_NAME_OFFSETS, DIR_NAME_STATE}
 import org.apache.spark.sql.execution.streaming.sources.{ForeachBatchSink, 
WriteToMicroBatchDataSource, WriteToMicroBatchDataSourceV1}
 import 
org.apache.spark.sql.execution.streaming.state.{OfflineStateRepartitionUtils, 
StateSchemaBroadcast, StateStoreErrors}
+import org.apache.spark.sql.execution.streaming.utils.StreamingUtils
 import org.apache.spark.sql.internal.SQLConf
 import org.apache.spark.sql.internal.connector.PartitionOffsetWithIndex
 import org.apache.spark.sql.streaming.Trigger
@@ -168,7 +171,7 @@ class MicroBatchExecution(
     assert(queryExecutionThread eq Thread.currentThread,
       "logicalPlan must be initialized in QueryExecutionThread " +
         s"but the current thread was ${Thread.currentThread}")
-    var nextSourceId = 0L
+    val nextSourceId = new AtomicLong(0L)
     val toExecutionRelationMap = MutableMap[StreamingRelation, 
StreamingExecutionRelation]()
     val v2ToExecutionRelationMap = MutableMap[StreamingRelationV2, 
StreamingExecutionRelation]()
     val v2ToRelationMap = MutableMap[StreamingRelationV2, 
StreamingDataSourceV2ScanRelation]()
@@ -183,15 +186,24 @@ class MicroBatchExecution(
     val disabledSources =
       
Utils.stringToSeq(sparkSession.sessionState.conf.disabledV2StreamingMicroBatchReaders)
 
+    // Read the source evolution enforcement from the last written offset log 
entry. If no entries
+    // are found, use the session config value.
+    val enforceNamed = offsetLog.getLatest().flatMap { case (_, offsetSeq) =>
+      offsetSeq.metadataOpt.flatMap { metadata =>
+        OffsetSeqMetadata.readValueOpt(metadata, 
SQLConf.ENABLE_STREAMING_SOURCE_EVOLUTION)
+          .map(_.toBoolean)
+      }
+    
}.getOrElse(sparkSessionForStream.sessionState.conf.enableStreamingSourceEvolution)
+
     import 
org.apache.spark.sql.execution.datasources.v2.DataSourceV2Implicits._
     val _logicalPlan = analyzedPlan.transform {
       case streamingRelation @ StreamingRelation(
           dataSourceV1, sourceName, output, sourceIdentifyingName) =>
         toExecutionRelationMap.getOrElseUpdate(streamingRelation, {
           // Materialize source to avoid creating it in every batch
-          val metadataPath = s"$resolvedCheckpointRoot/sources/$nextSourceId"
+          val metadataPath = StreamingUtils.getMetadataPath(
+            sourceIdentifyingName, nextSourceId, resolvedCheckpointRoot)
           val source = dataSourceV1.createSource(metadataPath)
-          nextSourceId += 1
           logInfo(log"Using Source [${MDC(LogKeys.STREAMING_SOURCE, source)}] 
" +
             log"from DataSourceV1 named 
'${MDC(LogKeys.STREAMING_DATA_SOURCE_NAME, sourceName)}' " +
             log"[${MDC(LogKeys.STREAMING_DATA_SOURCE_DESCRIPTION, 
dataSourceV1)}]")
@@ -206,8 +218,8 @@ class MicroBatchExecution(
         if (!v2Disabled && table.supports(TableCapability.MICRO_BATCH_READ)) {
           v2ToRelationMap.getOrElseUpdate(s, {
             // Materialize source to avoid creating it in every batch
-            val metadataPath = s"$resolvedCheckpointRoot/sources/$nextSourceId"
-            nextSourceId += 1
+            val metadataPath = StreamingUtils.getMetadataPath(
+              sourceIdentifyingName, nextSourceId, resolvedCheckpointRoot)
             logInfo(log"Reading table [${MDC(LogKeys.STREAMING_TABLE, table)}] 
" +
               log"from DataSourceV2 named 
'${MDC(LogKeys.STREAMING_DATA_SOURCE_NAME, srcName)}' " +
               log"${MDC(LogKeys.STREAMING_DATA_SOURCE_DESCRIPTION, dsStr)}")
@@ -224,7 +236,8 @@ class MicroBatchExecution(
                 trigger match {
                   case RealTimeTrigger(duration) => Some(duration)
                   case _ => None
-                }
+                },
+                sourceIdentifyingName
               )
             StreamingDataSourceV2ScanRelation(relation, scan, output, stream)
           })
@@ -234,10 +247,10 @@ class MicroBatchExecution(
         } else {
           v2ToExecutionRelationMap.getOrElseUpdate(s, {
             // Materialize source to avoid creating it in every batch
-            val metadataPath = s"$resolvedCheckpointRoot/sources/$nextSourceId"
+            val metadataPath = StreamingUtils.getMetadataPath(
+              sourceIdentifyingName, nextSourceId, resolvedCheckpointRoot)
             val source =
               
v1.get.asInstanceOf[StreamingRelation].dataSource.createSource(metadataPath)
-            nextSourceId += 1
             logInfo(log"Using Source [${MDC(LogKeys.STREAMING_SOURCE, 
source)}] from " +
               log"DataSourceV2 named 
'${MDC(LogKeys.STREAMING_DATA_SOURCE_NAME, srcName)}' " +
               log"${MDC(LogKeys.STREAMING_DATA_SOURCE_DESCRIPTION, dsStr)}")
@@ -247,17 +260,37 @@ class MicroBatchExecution(
           })
         }
     }
-    sources = _logicalPlan.collect {
+
+    // Extract sources and their sourceIdentifyingName for sourceIdMap mapping
+    val sourcesWithNames = _logicalPlan.collect {
       // v1 source
-      case s: StreamingExecutionRelation => s.source
+      case s: StreamingExecutionRelation => (s.source, s.sourceIdentifyingName)
       // v2 source
-      case r: StreamingDataSourceV2ScanRelation => r.stream
+      case r: StreamingDataSourceV2ScanRelation => (r.stream, 
r.relation.sourceIdentifyingName)
+    }
+    sources = sourcesWithNames.map(_._1)
+
+    if (enforceNamed) {
+      // When enforcement is enabled, all sources should be named after 
validation in analysis.
+      // This assertion ensures that the validation in NameStreamingSources 
worked correctly.
+      assert(sourcesWithNames.forall(s => s._2 != Unassigned),
+        "All sources should be named at this point - validation should have 
happened in analysis")
+
+      // Create source ID mapping using names (for OffsetMap format)
+      sourceIdMap = sourcesWithNames.map { case (source, 
sourceIdentifyingName) =>
+        sourceIdentifyingName.nameOpt match {
+          case Some(name) => name -> source
+          case None =>
+            throw new IllegalStateException(
+              "Unassigned sources should not exist when enforcement is 
enabled")
+        }
+      }.toMap
+    } else {
+      // When enforcement is disabled, use positional indices (backward 
compatibility)
+      sourceIdMap = sources.zipWithIndex.map {
+        case (source, index) => index.toString -> source
+      }.toMap
     }
-
-    // Create source ID mapping for OffsetMap support
-    sourceIdMap = sources.zipWithIndex.map {
-      case (source, index) => index.toString -> source
-    }.toMap
 
     // Inform the source if it is in real time mode
     if (trigger.isInstanceOf[RealTimeTrigger]) {
diff --git 
a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/runtime/StreamingRelation.scala
 
b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/runtime/StreamingRelation.scala
index 232f3475bb1b..efa65bc5ead4 100644
--- 
a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/runtime/StreamingRelation.scala
+++ 
b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/runtime/StreamingRelation.scala
@@ -36,18 +36,9 @@ import 
org.apache.spark.sql.sources.SupportsStreamSourceMetadataColumns
 object StreamingRelation {
   def apply(dataSource: DataSource): StreamingRelation = {
     // Extract source identifying name from DataSource for stable checkpoints
-    val sourceIdentifyingName = 
dataSource.userSpecifiedStreamingSourceName.getOrElse(Unassigned)
     StreamingRelation(
       dataSource, dataSource.sourceInfo.name, 
toAttributes(dataSource.sourceInfo.schema),
-      sourceIdentifyingName)
-  }
-
-  def apply(
-      dataSource: DataSource,
-      sourceIdentifyingName: StreamingSourceIdentifyingName): 
StreamingRelation = {
-    StreamingRelation(
-      dataSource, dataSource.sourceInfo.name, 
toAttributes(dataSource.sourceInfo.schema),
-      sourceIdentifyingName)
+      dataSource.streamingSourceIdentifyingName)
   }
 }
 
diff --git 
a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/utils/StreamingUtils.scala
 
b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/utils/StreamingUtils.scala
index d2654ac943b2..8dfc1fd23b57 100644
--- 
a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/utils/StreamingUtils.scala
+++ 
b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/utils/StreamingUtils.scala
@@ -16,13 +16,59 @@
  */
 package org.apache.spark.sql.execution.streaming.utils
 
+import java.util.concurrent.atomic.AtomicLong
+
 import org.apache.hadoop.conf.Configuration
 import org.apache.hadoop.fs.Path
 
+import org.apache.spark.sql.catalyst.streaming.StreamingSourceIdentifyingName
+
 object StreamingUtils {
+  /**
+   * Resolves a checkpoint location to a fully qualified path.
+   *
+   * Converts relative or unqualified paths to fully qualified paths using the
+   * file system's URI and working directory.
+   *
+   * @param hadoopConf Hadoop configuration to access the file system
+   * @param checkpointLocation The checkpoint location path (may be relative 
or unqualified)
+   * @return Fully qualified checkpoint location URI as a string
+   */
   def resolvedCheckpointLocation(hadoopConf: Configuration, 
checkpointLocation: String): String = {
     val checkpointPath = new Path(checkpointLocation)
     val fs = checkpointPath.getFileSystem(hadoopConf)
     checkpointPath.makeQualified(fs.getUri, 
fs.getWorkingDirectory).toUri.toString
   }
+
+  /**
+   * Computes the metadata path for a streaming source based on its 
identifying name.
+   *
+   * Named sources (UserProvided/FlowAssigned) use stable name-based paths, 
enabling
+   * source evolution (reordering, adding, or removing sources without 
breaking state).
+   * Unassigned sources use sequential IDs for backward compatibility.
+   *
+   * Examples:
+   *   - UserProvided("mySource") => "$checkpointRoot/sources/mySource"
+   *   - FlowAssigned("source_1") => "$checkpointRoot/sources/source_1"
+   *   - Unassigned => "$checkpointRoot/sources/0" (increments nextSourceId)
+   *
+   * @param sourceIdentifyingName The source's identifying name
+   *                              (UserProvided, FlowAssigned, or Unassigned)
+   * @param nextSourceId AtomicLong tracking the next positional source ID for 
Unassigned sources
+   * @param resolvedCheckpointRoot The resolved checkpoint root path
+   * @return The computed metadata path string
+   */
+  def getMetadataPath(
+      sourceIdentifyingName: StreamingSourceIdentifyingName,
+      nextSourceId: AtomicLong,
+      resolvedCheckpointRoot: String): String = {
+    sourceIdentifyingName.nameOpt match {
+      case Some(name) =>
+        // User-provided and flow-assigned names use named paths
+        s"$resolvedCheckpointRoot/sources/$name"
+      case None =>
+        // Unassigned sources get sequential IDs assigned here
+        s"$resolvedCheckpointRoot/sources/${nextSourceId.getAndIncrement()}"
+    }
+  }
 }
diff --git 
a/sql/core/src/test/scala/org/apache/spark/sql/execution/streaming/StreamRelationSuite.scala
 
b/sql/core/src/test/scala/org/apache/spark/sql/execution/streaming/StreamRelationSuite.scala
index 599a48c64208..dc8da2d5a628 100644
--- 
a/sql/core/src/test/scala/org/apache/spark/sql/execution/streaming/StreamRelationSuite.scala
+++ 
b/sql/core/src/test/scala/org/apache/spark/sql/execution/streaming/StreamRelationSuite.scala
@@ -146,7 +146,7 @@ class StreamRelationSuite extends SharedSparkSession with 
AnalysisTest {
             ),
             userSpecifiedSchema = Option(catalogTable.schema),
             catalogTable = Option(catalogTable),
-            userSpecifiedStreamingSourceName = Some(Unassigned)
+            userSpecifiedStreamingSourceName = None
           ),
           sourceName = s"FileSource[${catalogTable.location.toString}]",
           output = Seq(idAttr)
diff --git 
a/sql/core/src/test/scala/org/apache/spark/sql/streaming/StreamingSourceIdentifyingNameSuite.scala
 
b/sql/core/src/test/scala/org/apache/spark/sql/streaming/StreamingSourceIdentifyingNameSuite.scala
index caf6c14c8000..c89895035c91 100644
--- 
a/sql/core/src/test/scala/org/apache/spark/sql/streaming/StreamingSourceIdentifyingNameSuite.scala
+++ 
b/sql/core/src/test/scala/org/apache/spark/sql/streaming/StreamingSourceIdentifyingNameSuite.scala
@@ -43,10 +43,10 @@ class StreamingSourceIdentifyingNameSuite extends 
SharedSparkSession {
       assert(streamingRelation.get.sourceIdentifyingName == Unassigned,
         s"Expected Unassigned but got 
${streamingRelation.get.sourceIdentifyingName}")
 
-      // Verify the DataSource has the sourceIdentifyingName set
+      // Verify the DataSource has no user-specified source name (None means 
no user-provided name)
       val dsSourceName = 
streamingRelation.get.dataSource.userSpecifiedStreamingSourceName
-      assert(dsSourceName == Some(Unassigned),
-        s"Expected Some(Unassigned) but got $dsSourceName")
+      assert(dsSourceName.isEmpty,
+        s"Expected None but got $dsSourceName")
     }
   }
 
diff --git 
a/sql/core/src/test/scala/org/apache/spark/sql/streaming/test/StreamingQueryEvolutionSuite.scala
 
b/sql/core/src/test/scala/org/apache/spark/sql/streaming/test/StreamingQueryEvolutionSuite.scala
deleted file mode 100644
index 973e2f75439e..000000000000
--- 
a/sql/core/src/test/scala/org/apache/spark/sql/streaming/test/StreamingQueryEvolutionSuite.scala
+++ /dev/null
@@ -1,176 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *    http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.spark.sql.streaming.test
-
-import org.scalatest.Tag
-
-import org.apache.spark.sql._
-import org.apache.spark.sql.internal.SQLConf
-import org.apache.spark.sql.streaming.StreamTest
-
-/**
- * Test suite for streaming source naming and validation.
- * Tests cover the naming API, validation rules, and resolution pipeline.
- */
-class StreamingQueryEvolutionSuite extends StreamTest {
-
-  // ====================
-  // Name Validation Tests
-  // ====================
-
-  testWithSourceEvolution("invalid source name - contains hyphen") {
-    checkError(
-      exception = intercept[AnalysisException] {
-        spark.readStream
-          .format("org.apache.spark.sql.streaming.test")
-          .name("my-source")
-          .load()
-      },
-      condition = "STREAMING_QUERY_EVOLUTION_ERROR.INVALID_SOURCE_NAME",
-      parameters = Map("sourceName" -> "my-source"))
-  }
-
-  testWithSourceEvolution("invalid source name - contains space") {
-    checkError(
-      exception = intercept[AnalysisException] {
-        spark.readStream
-          .format("org.apache.spark.sql.streaming.test")
-          .name("my source")
-          .load()
-      },
-      condition = "STREAMING_QUERY_EVOLUTION_ERROR.INVALID_SOURCE_NAME",
-      parameters = Map("sourceName" -> "my source"))
-  }
-
-  testWithSourceEvolution("invalid source name - contains dot") {
-    checkError(
-      exception = intercept[AnalysisException] {
-        spark.readStream
-          .format("org.apache.spark.sql.streaming.test")
-          .name("my.source")
-          .load()
-      },
-      condition = "STREAMING_QUERY_EVOLUTION_ERROR.INVALID_SOURCE_NAME",
-      parameters = Map("sourceName" -> "my.source"))
-  }
-
-  testWithSourceEvolution("invalid source name - contains special characters") 
{
-    checkError(
-      exception = intercept[AnalysisException] {
-        spark.readStream
-          .format("org.apache.spark.sql.streaming.test")
-          .name("my.source@123")
-          .load()
-      },
-      condition = "STREAMING_QUERY_EVOLUTION_ERROR.INVALID_SOURCE_NAME",
-      parameters = Map("sourceName" -> "my.source@123"))
-  }
-
-  testWithSourceEvolution("valid source names - various patterns") {
-    // Test that valid names work correctly
-    Seq("mySource", "my_source", "MySource123", "_private", "source_123_test", 
"123source")
-      .foreach { name =>
-        val df = spark.readStream
-          .format("org.apache.spark.sql.streaming.test")
-          .name(name)
-          .load()
-        assert(df.isStreaming, s"DataFrame should be streaming for name: 
$name")
-      }
-  }
-
-  testWithSourceEvolution("method chaining - name() returns reader for 
chaining") {
-    val df = spark.readStream
-      .format("org.apache.spark.sql.streaming.test")
-      .name("my_source")
-      .option("opt1", "value1")
-      .load()
-
-    assert(df.isStreaming, "DataFrame should be streaming")
-  }
-
-  // ==========================
-  // Duplicate Detection Tests
-  // ==========================
-
-  testWithSourceEvolution("duplicate source names - rejected when starting 
stream") {
-    withTempDir { checkpointDir =>
-      val df1 = spark.readStream
-        .format("org.apache.spark.sql.streaming.test")
-        .name("duplicate_name")
-        .load()
-
-      val df2 = spark.readStream
-        .format("org.apache.spark.sql.streaming.test")
-        .name("duplicate_name")  // Same name - should fail
-        .load()
-
-      checkError(
-        exception = intercept[AnalysisException] {
-          df1.union(df2).writeStream
-            .format("org.apache.spark.sql.streaming.test")
-            .option("checkpointLocation", checkpointDir.getCanonicalPath)
-            .start()
-        },
-        condition = "STREAMING_QUERY_EVOLUTION_ERROR.DUPLICATE_SOURCE_NAMES",
-        parameters = Map("names" -> "'duplicate_name'"))
-    }
-  }
-
-  testWithSourceEvolution("enforcement enabled - unnamed source rejected") {
-    checkError(
-      exception = intercept[AnalysisException] {
-        spark.readStream
-          .format("org.apache.spark.sql.streaming.test")
-          .load() // Unnamed - throws error at load() time
-      },
-      condition = 
"STREAMING_QUERY_EVOLUTION_ERROR.UNNAMED_STREAMING_SOURCES_WITH_ENFORCEMENT",
-      parameters = Map("sourceInfo" -> ".*"),
-      matchPVals = true)
-  }
-
-  testWithSourceEvolution("enforcement enabled - all sources named succeeds") {
-    val df1 = spark.readStream
-      .format("org.apache.spark.sql.streaming.test")
-      .name("alpha")
-      .load()
-
-    val df2 = spark.readStream
-      .format("org.apache.spark.sql.streaming.test")
-      .name("beta")
-      .load()
-
-    // Should not throw - all sources are named
-    val union = df1.union(df2)
-    assert(union.isStreaming, "Union should be streaming")
-  }
-
-  // ==============
-  // Helper Methods
-  // ==============
-
-  /**
-   * Helper method to run tests with source evolution enabled.
-   */
-  def testWithSourceEvolution(testName: String, testTags: Tag*)(testBody: => 
Any): Unit = {
-    test(testName, testTags: _*) {
-      withSQLConf(SQLConf.ENABLE_STREAMING_SOURCE_EVOLUTION.key -> "true") {
-        testBody
-      }
-    }
-  }
-}
diff --git 
a/sql/core/src/test/scala/org/apache/spark/sql/streaming/test/StreamingSourceEvolutionSuite.scala
 
b/sql/core/src/test/scala/org/apache/spark/sql/streaming/test/StreamingSourceEvolutionSuite.scala
new file mode 100644
index 000000000000..ec919f1f1242
--- /dev/null
+++ 
b/sql/core/src/test/scala/org/apache/spark/sql/streaming/test/StreamingSourceEvolutionSuite.scala
@@ -0,0 +1,520 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.sql.streaming.test
+
+import scala.concurrent.duration._
+
+import org.apache.hadoop.fs.Path
+import org.mockito.ArgumentMatchers.{any, eq => meq}
+import org.mockito.Mockito._
+import org.scalatest.{BeforeAndAfterEach, Tag}
+
+import org.apache.spark.sql._
+import org.apache.spark.sql.internal.SQLConf
+import org.apache.spark.sql.streaming.StreamTest
+import org.apache.spark.sql.streaming.Trigger._
+import org.apache.spark.util.Utils
+
+/**
+ * Test suite for streaming source naming and validation.
+ * Tests cover the naming API, validation rules, and resolution pipeline.
+ */
+class StreamingSourceEvolutionSuite extends StreamTest with BeforeAndAfterEach {
+
+  private def newMetadataDir =
+    Utils.createTempDir(namePrefix = "streaming.metadata").getCanonicalPath
+
+  override def afterEach(): Unit = {
+    spark.streams.active.foreach(_.stop())
+    super.afterEach()
+  }
+
+  /**
+   * Helper to verify that a source was created with the expected metadata path.
+   * @param checkpointLocation the checkpoint location path
+   * @param sourcePath the expected source path (e.g., "source1" or "0")
+   * @param mode mockito verification mode (default: times(1))
+   */
+  private def verifySourcePath(
+      checkpointLocation: Path,
+      sourcePath: String,
+      mode: org.mockito.verification.VerificationMode = times(1)): Unit = {
+    verify(LastOptions.mockStreamSourceProvider, mode).createSource(
+      any(),
+      meq(s"${new Path(makeQualifiedPath(
+        checkpointLocation.toString)).toString}/sources/$sourcePath"),
+      meq(None),
+      meq("org.apache.spark.sql.streaming.test"),
+      meq(Map.empty))
+  }
+
+  // ====================
+  // Name Validation Tests
+  // ====================
+
+  testWithSourceEvolution("invalid source name - contains hyphen") {
+    checkError(
+      exception = intercept[AnalysisException] {
+        spark.readStream
+          .format("org.apache.spark.sql.streaming.test")
+          .name("my-source")
+          .load()
+      },
+      condition = "STREAMING_QUERY_EVOLUTION_ERROR.INVALID_SOURCE_NAME",
+      parameters = Map("sourceName" -> "my-source"))
+  }
+
+  testWithSourceEvolution("invalid source name - contains space") {
+    checkError(
+      exception = intercept[AnalysisException] {
+        spark.readStream
+          .format("org.apache.spark.sql.streaming.test")
+          .name("my source")
+          .load()
+      },
+      condition = "STREAMING_QUERY_EVOLUTION_ERROR.INVALID_SOURCE_NAME",
+      parameters = Map("sourceName" -> "my source"))
+  }
+
+  testWithSourceEvolution("invalid source name - contains dot") {
+    checkError(
+      exception = intercept[AnalysisException] {
+        spark.readStream
+          .format("org.apache.spark.sql.streaming.test")
+          .name("my.source")
+          .load()
+      },
+      condition = "STREAMING_QUERY_EVOLUTION_ERROR.INVALID_SOURCE_NAME",
+      parameters = Map("sourceName" -> "my.source"))
+  }
+
+  testWithSourceEvolution("invalid source name - contains special characters") {
+    checkError(
+      exception = intercept[AnalysisException] {
+        spark.readStream
+          .format("org.apache.spark.sql.streaming.test")
+          .name("my.source@123")
+          .load()
+      },
+      condition = "STREAMING_QUERY_EVOLUTION_ERROR.INVALID_SOURCE_NAME",
+      parameters = Map("sourceName" -> "my.source@123"))
+  }
+
+  testWithSourceEvolution("valid source names - various patterns") {
+    // Test that valid names work correctly
+    Seq("mySource", "my_source", "MySource123", "_private", "source_123_test", "123source")
+      .foreach { name =>
+        val df = spark.readStream
+          .format("org.apache.spark.sql.streaming.test")
+          .name(name)
+          .load()
+        assert(df.isStreaming, s"DataFrame should be streaming for name: $name")
+      }
+  }
+
+  testWithSourceEvolution("method chaining - name() returns reader for chaining") {
+    val df = spark.readStream
+      .format("org.apache.spark.sql.streaming.test")
+      .name("my_source")
+      .option("opt1", "value1")
+      .load()
+
+    assert(df.isStreaming, "DataFrame should be streaming")
+  }
+
+  // ==========================
+  // Duplicate Detection Tests
+  // ==========================
+
+  testWithSourceEvolution("duplicate source names - rejected when starting stream") {
+    withTempDir { checkpointDir =>
+      val df1 = spark.readStream
+        .format("org.apache.spark.sql.streaming.test")
+        .name("duplicate_name")
+        .load()
+
+      val df2 = spark.readStream
+        .format("org.apache.spark.sql.streaming.test")
+        .name("duplicate_name")  // Same name - should fail
+        .load()
+
+      checkError(
+        exception = intercept[AnalysisException] {
+          df1.union(df2).writeStream
+            .format("org.apache.spark.sql.streaming.test")
+            .option("checkpointLocation", checkpointDir.getCanonicalPath)
+            .start()
+        },
+        condition = "STREAMING_QUERY_EVOLUTION_ERROR.DUPLICATE_SOURCE_NAMES",
+        parameters = Map("names" -> "'duplicate_name'"))
+    }
+  }
+
+  testWithSourceEvolution("enforcement enabled - unnamed source rejected") {
+    checkError(
+      exception = intercept[AnalysisException] {
+        spark.readStream
+          .format("org.apache.spark.sql.streaming.test")
+          .load() // Unnamed - throws error at load() time
+      },
+      condition = "STREAMING_QUERY_EVOLUTION_ERROR.UNNAMED_STREAMING_SOURCES_WITH_ENFORCEMENT",
+      parameters = Map("sourceInfo" -> ".*"),
+      matchPVals = true)
+  }
+
+  testWithSourceEvolution("enforcement enabled - all sources named succeeds") {
+    val df1 = spark.readStream
+      .format("org.apache.spark.sql.streaming.test")
+      .name("alpha")
+      .load()
+
+    val df2 = spark.readStream
+      .format("org.apache.spark.sql.streaming.test")
+      .name("beta")
+      .load()
+
+    // Should not throw - all sources are named
+    val union = df1.union(df2)
+    assert(union.isStreaming, "Union should be streaming")
+  }
+
+  test("without enforcement - naming sources throws error") {
+    withSQLConf(SQLConf.ENABLE_STREAMING_SOURCE_EVOLUTION.key -> "false") {
+      checkError(
+        exception = intercept[AnalysisException] {
+          spark.readStream
+            .format("org.apache.spark.sql.streaming.test")
+            .name("mySource")
+            .load()
+        },
+        condition = "STREAMING_QUERY_EVOLUTION_ERROR.SOURCE_NAMING_NOT_SUPPORTED",
+        parameters = Map("name" -> "mySource"))
+    }
+  }
+
+  // =======================
+  // Metadata Path Tests
+  // =======================
+
+  testWithSourceEvolution("named sources - metadata path uses source name") {
+    LastOptions.clear()
+
+    val checkpointLocation = new Path(newMetadataDir)
+
+    val df1 = spark.readStream
+      .format("org.apache.spark.sql.streaming.test")
+      .name("source1")
+      .load()
+
+    val df2 = spark.readStream
+      .format("org.apache.spark.sql.streaming.test")
+      .name("source2")
+      .load()
+
+    val q = df1.union(df2).writeStream
+      .format("org.apache.spark.sql.streaming.test")
+      .option("checkpointLocation", checkpointLocation.toString)
+      .trigger(ProcessingTime(10.seconds))
+      .start()
+    q.processAllAvailable()
+    q.stop()
+
+    verifySourcePath(checkpointLocation, "source1")
+    verifySourcePath(checkpointLocation, "source2")
+  }
+
+  test("unnamed sources use positional IDs for metadata path") {
+    withSQLConf(SQLConf.ENABLE_STREAMING_SOURCE_EVOLUTION.key -> "false") {
+      LastOptions.clear()
+
+      val checkpointLocation = new Path(newMetadataDir)
+
+      val df1 = spark.readStream
+        .format("org.apache.spark.sql.streaming.test")
+        .load()
+
+      val df2 = spark.readStream
+        .format("org.apache.spark.sql.streaming.test")
+        .load()
+
+      val q = df1.union(df2).writeStream
+        .format("org.apache.spark.sql.streaming.test")
+        .option("checkpointLocation", checkpointLocation.toString)
+        .trigger(ProcessingTime(10.seconds))
+        .start()
+      q.processAllAvailable()
+      q.stop()
+
+      // Without naming, sources get sequential IDs (Unassigned -> 0, 1, ...)
+      verifySourcePath(checkpointLocation, "0")
+      verifySourcePath(checkpointLocation, "1")
+    }
+  }
+
+  // ========================
+  // Source Evolution Tests
+  // ========================
+
+  testWithSourceEvolution("source evolution - reorder sources with named sources") {
+    LastOptions.clear()
+
+    val checkpointLocation = new Path(newMetadataDir)
+
+    // First query: source1 then source2
+    val df1a = spark.readStream
+      .format("org.apache.spark.sql.streaming.test")
+      .name("source1")
+      .load()
+
+    val df2a = spark.readStream
+      .format("org.apache.spark.sql.streaming.test")
+      .name("source2")
+      .load()
+
+    val q1 = df1a.union(df2a).writeStream
+      .format("org.apache.spark.sql.streaming.test")
+      .option("checkpointLocation", checkpointLocation.toString)
+      .trigger(ProcessingTime(10.seconds))
+      .start()
+    q1.processAllAvailable()
+    q1.stop()
+
+    LastOptions.clear()
+
+    // Second query: source2 then source1 (reordered) - should still work
+    val df1b = spark.readStream
+      .format("org.apache.spark.sql.streaming.test")
+      .name("source1")
+      .load()
+
+    val df2b = spark.readStream
+      .format("org.apache.spark.sql.streaming.test")
+      .name("source2")
+      .load()
+
+    val q2 = df2b.union(df1b).writeStream  // Note: reversed order
+      .format("org.apache.spark.sql.streaming.test")
+      .option("checkpointLocation", checkpointLocation.toString)
+      .trigger(ProcessingTime(10.seconds))
+      .start()
+    q2.processAllAvailable()
+    q2.stop()
+
+    // Both sources should still use their named paths
+    verifySourcePath(checkpointLocation, "source1", atLeastOnce())
+    verifySourcePath(checkpointLocation, "source2", atLeastOnce())
+  }
+
+  testWithSourceEvolution("source evolution - add new source with named sources") {
+    LastOptions.clear()
+
+    val checkpointLocation = new Path(newMetadataDir)
+
+    // First query: only source1
+    val df1 = spark.readStream
+      .format("org.apache.spark.sql.streaming.test")
+      .name("source1")
+      .load()
+
+    val q1 = df1.writeStream
+      .format("org.apache.spark.sql.streaming.test")
+      .option("checkpointLocation", checkpointLocation.toString)
+      .trigger(ProcessingTime(10.seconds))
+      .start()
+    q1.processAllAvailable()
+    q1.stop()
+
+    LastOptions.clear()
+
+    // Second query: add source2
+    val df1b = spark.readStream
+      .format("org.apache.spark.sql.streaming.test")
+      .name("source1")
+      .load()
+
+    val df2 = spark.readStream
+      .format("org.apache.spark.sql.streaming.test")
+      .name("source2")
+      .load()
+
+    val q2 = df1b.union(df2).writeStream
+      .format("org.apache.spark.sql.streaming.test")
+      .option("checkpointLocation", checkpointLocation.toString)
+      .trigger(ProcessingTime(10.seconds))
+      .start()
+    q2.processAllAvailable()
+    q2.stop()
+
+    // Both sources should have been created
+    verifySourcePath(checkpointLocation, "source1", atLeastOnce())
+    verifySourcePath(checkpointLocation, "source2")
+  }
+
+  testWithSourceEvolution("named sources enforcement uses V2 offset log format") {
+    LastOptions.clear()
+
+    val checkpointLocation = new Path(newMetadataDir)
+
+    val df1 = spark.readStream
+      .format("org.apache.spark.sql.streaming.test")
+      .name("source1")
+      .load()
+
+    val df2 = spark.readStream
+      .format("org.apache.spark.sql.streaming.test")
+      .name("source2")
+      .load()
+
+    val q = df1.union(df2).writeStream
+      .format("org.apache.spark.sql.streaming.test")
+      .option("checkpointLocation", checkpointLocation.toString)
+      .trigger(ProcessingTime(10.seconds))
+      .start()
+    q.processAllAvailable()
+    q.stop()
+
+    import org.apache.spark.sql.execution.streaming.checkpointing.{OffsetMap, OffsetSeqLog}
+    val offsetLog = new OffsetSeqLog(spark,
+      makeQualifiedPath(checkpointLocation.toString).toString + "/offsets")
+    val offsetSeq = offsetLog.get(0)
+    assert(offsetSeq.isDefined, "Offset log should have batch 0")
+    assert(offsetSeq.get.isInstanceOf[OffsetMap],
+      s"Expected OffsetMap but got ${offsetSeq.get.getClass.getSimpleName}")
+  }
+
+  testWithSourceEvolution("names preserved through union operations") {
+    LastOptions.clear()
+
+    val checkpointLocation = new Path(newMetadataDir)
+
+    val df1 = spark.readStream
+      .format("org.apache.spark.sql.streaming.test")
+      .name("alpha")
+      .load()
+
+    val df2 = spark.readStream
+      .format("org.apache.spark.sql.streaming.test")
+      .name("beta")
+      .load()
+
+    val df3 = spark.readStream
+      .format("org.apache.spark.sql.streaming.test")
+      .name("gamma")
+      .load()
+
+    // Complex union: (alpha union beta) union gamma
+    val q = df1.union(df2).union(df3).writeStream
+      .format("org.apache.spark.sql.streaming.test")
+      .option("checkpointLocation", checkpointLocation.toString)
+      .trigger(ProcessingTime(10.seconds))
+      .start()
+    q.processAllAvailable()
+    q.stop()
+
+    // All three sources should use their named paths
+    verifySourcePath(checkpointLocation, "alpha")
+    verifySourcePath(checkpointLocation, "beta")
+    verifySourcePath(checkpointLocation, "gamma")
+  }
+
+  testWithSourceEvolution("enforcement config is persisted in offset metadata") {
+    LastOptions.clear()
+
+    val checkpointLocation = new Path(newMetadataDir)
+
+    // Start query with enforcement enabled
+    val df1 = spark.readStream
+      .format("org.apache.spark.sql.streaming.test")
+      .name("source1")
+      .load()
+
+    val q1 = df1.writeStream
+      .format("org.apache.spark.sql.streaming.test")
+      .option("checkpointLocation", checkpointLocation.toString)
+      .trigger(ProcessingTime(10.seconds))
+      .start()
+    q1.processAllAvailable()
+    q1.stop()
+
+    // Verify config was persisted in offset metadata
+    import org.apache.spark.sql.execution.streaming.checkpointing.OffsetSeqLog
+    val offsetLog = new OffsetSeqLog(spark,
+      makeQualifiedPath(checkpointLocation.toString).toString + "/offsets")
+    val offsetSeq = offsetLog.get(0)
+    assert(offsetSeq.isDefined, "Offset log should have batch 0")
+    assert(offsetSeq.get.metadataOpt.isDefined, "Offset metadata should be present")
+    assert(offsetSeq.get.metadataOpt.get.conf.contains(
+      SQLConf.ENABLE_STREAMING_SOURCE_EVOLUTION.key),
+      "ENABLE_STREAMING_SOURCE_EVOLUTION should be in offset metadata")
+    assert(offsetSeq.get.metadataOpt.get.conf(
+      SQLConf.ENABLE_STREAMING_SOURCE_EVOLUTION.key) == "true",
+      "ENABLE_STREAMING_SOURCE_EVOLUTION should be true in offset metadata")
+  }
+
+  test("upgrade from old checkpoint without enforcement config uses default value") {
+    import org.apache.spark.sql.execution.streaming.checkpointing.{OffsetSeqMetadata, OffsetSeqMetadataV2}
+
+    // Simulate old checkpoint metadata without ENABLE_STREAMING_SOURCE_EVOLUTION
+    val oldMetadata = OffsetSeqMetadata(
+      batchWatermarkMs = 0,
+      batchTimestampMs = 0,
+      conf = Map(
+        // Old checkpoint has other configs but not ENABLE_STREAMING_SOURCE_EVOLUTION
+        SQLConf.STREAMING_MULTIPLE_WATERMARK_POLICY.key -> "max"
+      )
+    )
+
+    // Verify that reading the config returns the default value (false)
+    val value = OffsetSeqMetadata.readValueOpt(
+      oldMetadata, SQLConf.ENABLE_STREAMING_SOURCE_EVOLUTION)
+    assert(value.contains("false"),
+      s"Expected default value 'false' for missing config, but got: $value")
+
+    // Also test with V2 metadata
+    val oldMetadataV2 = OffsetSeqMetadataV2(
+      batchWatermarkMs = 0,
+      batchTimestampMs = 0,
+      conf = Map(
+        SQLConf.STREAMING_MULTIPLE_WATERMARK_POLICY.key -> "max"
+      )
+    )
+
+    val valueV2 = OffsetSeqMetadata.readValueOpt(
+      oldMetadataV2, SQLConf.ENABLE_STREAMING_SOURCE_EVOLUTION)
+    assert(valueV2.contains("false"),
+      s"Expected default value 'false' for missing config in V2, but got: $valueV2")
+  }
+
+  // ==============
+  // Helper Methods
+  // ==============
+
+  /**
+   * Helper method to run tests with source evolution enabled.
+   * Sets offset log format to V2 (OffsetMap) since named sources require it.
+   */
+  def testWithSourceEvolution(testName: String, testTags: Tag*)(testBody: => Any): Unit = {
+    test(testName, testTags: _*) {
+      withSQLConf(
+        SQLConf.ENABLE_STREAMING_SOURCE_EVOLUTION.key -> "true",
+        SQLConf.STREAMING_OFFSET_LOG_FORMAT_VERSION.key -> "2") {
+        testBody
+      }
+    }
+  }
+}


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to