This is an automated email from the ASF dual-hosted git repository.

wenchen pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/spark.git


The following commit(s) were added to refs/heads/master by this push:
     new d069f733e95d [SPARK-55104][CONNECT][PYTHON][SS] Add Spark Connect support for DataStreamReader.name()
d069f733e95d is described below

commit d069f733e95d8c1f5a5534b3ca3159af6a839bfb
Author: ericm-db <[email protected]>
AuthorDate: Wed Jan 28 10:26:19 2026 +0800

    [SPARK-55104][CONNECT][PYTHON][SS] Add Spark Connect support for DataStreamReader.name()
    
    ### What changes were proposed in this pull request?
    
    This PR adds Spark Connect support for the `DataStreamReader.name()` method, which allows users to specify a name for streaming sources. The implementation includes:
    
    1. **Protobuf changes**: Added a `source_name` field to the `DataSource` message
    2. **Scala Connect**: Added a `name()` method with validation to the Connect `DataStreamReader`
    3. **Python Connect**: Added a `name()` method to the Python Connect `DataStreamReader` (a short sketch follows this list)
    4. **Server-side planner**: Updated `SparkConnectPlanner` to extract source names and pass them to the underlying reader
    5. **Tests**: Added test coverage for both Scala and Python Connect
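
    As a rough client-side sketch (not part of the commit itself, it simply mirrors the plumbing in the diff below), the Python Connect `DataSource` plan now accepts the name and writes it into the proto:

    ```python
    # Illustrative only: DataStreamReader.name() stores the name, and the
    # DataSource logical plan copies it into the Read.DataSource proto
    # field `source_name` when the plan is serialized.
    from pyspark.sql.connect.plan import DataSource

    plan = DataSource(
        format="parquet",
        schema="id LONG",
        is_streaming=True,
        source_name="my_source",  # populated by DataStreamReader.name("my_source")
    )
    # plan.plan(client) sets relation.read.data_source.source_name = "my_source"
    ```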
    
    ### Why are the changes needed?
    
    The classic Spark `DataStreamReader` has a `name()` method that lets users assign names to streaming sources; these names are recorded in checkpoint metadata to support source evolution. This functionality was not available in Spark Connect, creating an API gap between classic and Connect modes.
    
    This PR enables API parity by providing full Connect support for streaming source naming.
    
    ### Does this PR introduce _any_ user-facing change?
    
    Yes. Users can now call `name()` on `DataStreamReader` in Spark Connect:
    
    **Scala:**
    ```scala
    spark.readStream.format("parquet").schema("id LONG").name("my_source").load(path)
    ```
    
    **Python:**
    ```python
    spark.readStream.format("parquet").schema("id LONG").name("my_source").load(path)
    ```
    
    Names must contain only ASCII letters, digits, and underscores.
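
    For example (a hedged sketch; `spark` here is assumed to be a Connect `SparkSession`), an invalid name is rejected on the client before anything is sent to the server:

    ```python
    # Valid names pass client-side validation; invalid ones raise
    # PySparkValueError with error class INVALID_STREAMING_SOURCE_NAME.
    from pyspark.errors import PySparkValueError

    reader = spark.readStream.format("parquet").schema("id LONG")
    reader.name("my_source_1")    # OK: ASCII letters, digits, underscores

    try:
        reader.name("my-source")  # hyphen is not allowed
    except PySparkValueError as e:
        print(e)
    ```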
    
    ### How was this patch tested?
    
    Added comprehensive test coverage:
    
    1. **Scala Connect tests** (`ClientStreamingQuerySuite`): valid/invalid name validation, method chaining, different data sources, query execution
    2. **Python Connect tests** (`test_connect_readwriter.py`): valid/invalid name validation, type checking, method chaining, different formats, query execution
    
    All tests enable required streaming source evolution configs.
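
    For reference, a small sketch of the setup the tests use before exercising `name()` (the data path is a placeholder):

    ```python
    # Both configs are taken from the added tests; they gate the source
    # evolution feature that consumes the source name.
    spark.conf.set("spark.sql.streaming.queryEvolution.enableSourceEvolution", "true")
    spark.conf.set("spark.sql.streaming.offsetLog.formatVersion", "2")

    df = (
        spark.readStream.format("parquet")
        .schema("id LONG")
        .name("my_source")
        .load("/tmp/parquet_data")  # placeholder path
    )
    assert df.isStreaming
    ```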
    
    ### Was this patch authored or co-authored using generative AI tooling?
    
    No.
    
    Closes #53872 from ericm-db/connect-datastream-reader-name.
    
    Authored-by: ericm-db <[email protected]>
    Signed-off-by: Wenchen Fan <[email protected]>
---
 python/pyspark/sql/connect/plan.py                 |   6 +
 python/pyspark/sql/connect/proto/relations_pb2.py  | 274 ++++++++++-----------
 python/pyspark/sql/connect/proto/relations_pb2.pyi |  19 ++
 python/pyspark/sql/connect/streaming/readwriter.py |  25 ++
 .../sql/tests/connect/test_connect_readwriter.py   | 183 ++++++++++++++
 .../sql/tests/test_connect_compatibility.py        |   2 +-
 .../spark/sql/streaming/DataStreamReader.scala     |  23 +-
 .../streaming/ClientStreamingQuerySuite.scala      |  95 ++++++-
 .../main/protobuf/spark/connect/relations.proto    |   5 +
 .../spark/sql/connect/DataStreamReader.scala       |  17 +-
 .../sql/connect/planner/SparkConnectPlanner.scala  |   3 +
 .../spark/sql/classic/DataStreamReader.scala       |  26 +-
 12 files changed, 512 insertions(+), 166 deletions(-)

diff --git a/python/pyspark/sql/connect/plan.py 
b/python/pyspark/sql/connect/plan.py
index 80b6b562369e..ca470598de76 100644
--- a/python/pyspark/sql/connect/plan.py
+++ b/python/pyspark/sql/connect/plan.py
@@ -334,6 +334,7 @@ class DataSource(LogicalPlan):
         paths: Optional[List[str]] = None,
         predicates: Optional[List[str]] = None,
         is_streaming: Optional[bool] = None,
+        source_name: Optional[str] = None,
     ) -> None:
         super().__init__(None)
 
@@ -357,12 +358,15 @@ class DataSource(LogicalPlan):
             assert isinstance(predicates, list)
             assert all(isinstance(predicate, str) for predicate in predicates)
 
+        assert source_name is None or isinstance(source_name, str)
+
         self._format = format
         self._schema = schema
         self._options = options
         self._paths = paths
         self._predicates = predicates
         self._is_streaming = is_streaming
+        self._source_name = source_name
 
     def plan(self, session: "SparkConnectClient") -> proto.Relation:
         plan = self._create_proto_relation()
@@ -377,6 +381,8 @@ class DataSource(LogicalPlan):
             plan.read.data_source.paths.extend(self._paths)
         if self._predicates is not None and len(self._predicates) > 0:
             plan.read.data_source.predicates.extend(self._predicates)
+        if self._source_name is not None:
+            plan.read.data_source.source_name = self._source_name
         if self._is_streaming is not None:
             plan.read.is_streaming = self._is_streaming
         return plan
diff --git a/python/pyspark/sql/connect/proto/relations_pb2.py 
b/python/pyspark/sql/connect/proto/relations_pb2.py
index 9e630b6ba5e4..79d834033634 100644
--- a/python/pyspark/sql/connect/proto/relations_pb2.py
+++ b/python/pyspark/sql/connect/proto/relations_pb2.py
@@ -43,7 +43,7 @@ from pyspark.sql.connect.proto import ml_common_pb2 as 
spark_dot_connect_dot_ml_
 
 
 DESCRIPTOR = _descriptor_pool.Default().AddSerializedFile(
-    
b'\n\x1dspark/connect/relations.proto\x12\rspark.connect\x1a\x19google/protobuf/any.proto\x1a\x1fspark/connect/expressions.proto\x1a\x19spark/connect/types.proto\x1a\x1bspark/connect/catalog.proto\x1a\x1aspark/connect/common.proto\x1a\x1dspark/connect/ml_common.proto"\x8c\x1e\n\x08Relation\x12\x35\n\x06\x63ommon\x18\x01
 
\x01(\x0b\x32\x1d.spark.connect.RelationCommonR\x06\x63ommon\x12)\n\x04read\x18\x02
 
\x01(\x0b\x32\x13.spark.connect.ReadH\x00R\x04read\x12\x32\n\x07project\x18\x03 
\x [...]
+    
b'\n\x1dspark/connect/relations.proto\x12\rspark.connect\x1a\x19google/protobuf/any.proto\x1a\x1fspark/connect/expressions.proto\x1a\x19spark/connect/types.proto\x1a\x1bspark/connect/catalog.proto\x1a\x1aspark/connect/common.proto\x1a\x1dspark/connect/ml_common.proto"\x8c\x1e\n\x08Relation\x12\x35\n\x06\x63ommon\x18\x01
 
\x01(\x0b\x32\x1d.spark.connect.RelationCommonR\x06\x63ommon\x12)\n\x04read\x18\x02
 
\x01(\x0b\x32\x13.spark.connect.ReadH\x00R\x04read\x12\x32\n\x07project\x18\x03 
\x [...]
 )
 
 _globals = globals()
@@ -103,149 +103,149 @@ if not _descriptor._USE_C_DESCRIPTORS:
     _globals["_WITHRELATIONS"]._serialized_start = 5536
     _globals["_WITHRELATIONS"]._serialized_end = 5653
     _globals["_READ"]._serialized_start = 5656
-    _globals["_READ"]._serialized_end = 6319
+    _globals["_READ"]._serialized_end = 6373
     _globals["_READ_NAMEDTABLE"]._serialized_start = 5834
     _globals["_READ_NAMEDTABLE"]._serialized_end = 6026
     _globals["_READ_NAMEDTABLE_OPTIONSENTRY"]._serialized_start = 5968
     _globals["_READ_NAMEDTABLE_OPTIONSENTRY"]._serialized_end = 6026
     _globals["_READ_DATASOURCE"]._serialized_start = 6029
-    _globals["_READ_DATASOURCE"]._serialized_end = 6306
+    _globals["_READ_DATASOURCE"]._serialized_end = 6360
     _globals["_READ_DATASOURCE_OPTIONSENTRY"]._serialized_start = 5968
     _globals["_READ_DATASOURCE_OPTIONSENTRY"]._serialized_end = 6026
-    _globals["_PROJECT"]._serialized_start = 6321
-    _globals["_PROJECT"]._serialized_end = 6438
-    _globals["_FILTER"]._serialized_start = 6440
-    _globals["_FILTER"]._serialized_end = 6552
-    _globals["_JOIN"]._serialized_start = 6555
-    _globals["_JOIN"]._serialized_end = 7216
-    _globals["_JOIN_JOINDATATYPE"]._serialized_start = 6894
-    _globals["_JOIN_JOINDATATYPE"]._serialized_end = 6986
-    _globals["_JOIN_JOINTYPE"]._serialized_start = 6989
-    _globals["_JOIN_JOINTYPE"]._serialized_end = 7197
-    _globals["_SETOPERATION"]._serialized_start = 7219
-    _globals["_SETOPERATION"]._serialized_end = 7698
-    _globals["_SETOPERATION_SETOPTYPE"]._serialized_start = 7535
-    _globals["_SETOPERATION_SETOPTYPE"]._serialized_end = 7649
-    _globals["_LIMIT"]._serialized_start = 7700
-    _globals["_LIMIT"]._serialized_end = 7776
-    _globals["_OFFSET"]._serialized_start = 7778
-    _globals["_OFFSET"]._serialized_end = 7857
-    _globals["_TAIL"]._serialized_start = 7859
-    _globals["_TAIL"]._serialized_end = 7934
-    _globals["_AGGREGATE"]._serialized_start = 7937
-    _globals["_AGGREGATE"]._serialized_end = 8703
-    _globals["_AGGREGATE_PIVOT"]._serialized_start = 8352
-    _globals["_AGGREGATE_PIVOT"]._serialized_end = 8463
-    _globals["_AGGREGATE_GROUPINGSETS"]._serialized_start = 8465
-    _globals["_AGGREGATE_GROUPINGSETS"]._serialized_end = 8541
-    _globals["_AGGREGATE_GROUPTYPE"]._serialized_start = 8544
-    _globals["_AGGREGATE_GROUPTYPE"]._serialized_end = 8703
-    _globals["_SORT"]._serialized_start = 8706
-    _globals["_SORT"]._serialized_end = 8866
-    _globals["_DROP"]._serialized_start = 8869
-    _globals["_DROP"]._serialized_end = 9010
-    _globals["_DEDUPLICATE"]._serialized_start = 9013
-    _globals["_DEDUPLICATE"]._serialized_end = 9253
-    _globals["_LOCALRELATION"]._serialized_start = 9255
-    _globals["_LOCALRELATION"]._serialized_end = 9344
-    _globals["_CACHEDLOCALRELATION"]._serialized_start = 9346
-    _globals["_CACHEDLOCALRELATION"]._serialized_end = 9418
-    _globals["_CHUNKEDCACHEDLOCALRELATION"]._serialized_start = 9420
-    _globals["_CHUNKEDCACHEDLOCALRELATION"]._serialized_end = 9532
-    _globals["_CACHEDREMOTERELATION"]._serialized_start = 9534
-    _globals["_CACHEDREMOTERELATION"]._serialized_end = 9589
-    _globals["_SAMPLE"]._serialized_start = 9592
-    _globals["_SAMPLE"]._serialized_end = 9865
-    _globals["_RANGE"]._serialized_start = 9868
-    _globals["_RANGE"]._serialized_end = 10013
-    _globals["_SUBQUERYALIAS"]._serialized_start = 10015
-    _globals["_SUBQUERYALIAS"]._serialized_end = 10129
-    _globals["_REPARTITION"]._serialized_start = 10132
-    _globals["_REPARTITION"]._serialized_end = 10274
-    _globals["_SHOWSTRING"]._serialized_start = 10277
-    _globals["_SHOWSTRING"]._serialized_end = 10419
-    _globals["_HTMLSTRING"]._serialized_start = 10421
-    _globals["_HTMLSTRING"]._serialized_end = 10535
-    _globals["_STATSUMMARY"]._serialized_start = 10537
-    _globals["_STATSUMMARY"]._serialized_end = 10629
-    _globals["_STATDESCRIBE"]._serialized_start = 10631
-    _globals["_STATDESCRIBE"]._serialized_end = 10712
-    _globals["_STATCROSSTAB"]._serialized_start = 10714
-    _globals["_STATCROSSTAB"]._serialized_end = 10815
-    _globals["_STATCOV"]._serialized_start = 10817
-    _globals["_STATCOV"]._serialized_end = 10913
-    _globals["_STATCORR"]._serialized_start = 10916
-    _globals["_STATCORR"]._serialized_end = 11053
-    _globals["_STATAPPROXQUANTILE"]._serialized_start = 11056
-    _globals["_STATAPPROXQUANTILE"]._serialized_end = 11220
-    _globals["_STATFREQITEMS"]._serialized_start = 11222
-    _globals["_STATFREQITEMS"]._serialized_end = 11347
-    _globals["_STATSAMPLEBY"]._serialized_start = 11350
-    _globals["_STATSAMPLEBY"]._serialized_end = 11659
-    _globals["_STATSAMPLEBY_FRACTION"]._serialized_start = 11551
-    _globals["_STATSAMPLEBY_FRACTION"]._serialized_end = 11650
-    _globals["_NAFILL"]._serialized_start = 11662
-    _globals["_NAFILL"]._serialized_end = 11796
-    _globals["_NADROP"]._serialized_start = 11799
-    _globals["_NADROP"]._serialized_end = 11933
-    _globals["_NAREPLACE"]._serialized_start = 11936
-    _globals["_NAREPLACE"]._serialized_end = 12232
-    _globals["_NAREPLACE_REPLACEMENT"]._serialized_start = 12091
-    _globals["_NAREPLACE_REPLACEMENT"]._serialized_end = 12232
-    _globals["_TODF"]._serialized_start = 12234
-    _globals["_TODF"]._serialized_end = 12322
-    _globals["_WITHCOLUMNSRENAMED"]._serialized_start = 12325
-    _globals["_WITHCOLUMNSRENAMED"]._serialized_end = 12707
-    _globals["_WITHCOLUMNSRENAMED_RENAMECOLUMNSMAPENTRY"]._serialized_start = 
12569
-    _globals["_WITHCOLUMNSRENAMED_RENAMECOLUMNSMAPENTRY"]._serialized_end = 
12636
-    _globals["_WITHCOLUMNSRENAMED_RENAME"]._serialized_start = 12638
-    _globals["_WITHCOLUMNSRENAMED_RENAME"]._serialized_end = 12707
-    _globals["_WITHCOLUMNS"]._serialized_start = 12709
-    _globals["_WITHCOLUMNS"]._serialized_end = 12828
-    _globals["_WITHWATERMARK"]._serialized_start = 12831
-    _globals["_WITHWATERMARK"]._serialized_end = 12965
-    _globals["_HINT"]._serialized_start = 12968
-    _globals["_HINT"]._serialized_end = 13100
-    _globals["_UNPIVOT"]._serialized_start = 13103
-    _globals["_UNPIVOT"]._serialized_end = 13430
-    _globals["_UNPIVOT_VALUES"]._serialized_start = 13360
-    _globals["_UNPIVOT_VALUES"]._serialized_end = 13419
-    _globals["_TRANSPOSE"]._serialized_start = 13432
-    _globals["_TRANSPOSE"]._serialized_end = 13554
-    _globals["_UNRESOLVEDTABLEVALUEDFUNCTION"]._serialized_start = 13556
-    _globals["_UNRESOLVEDTABLEVALUEDFUNCTION"]._serialized_end = 13681
-    _globals["_TOSCHEMA"]._serialized_start = 13683
-    _globals["_TOSCHEMA"]._serialized_end = 13789
-    _globals["_REPARTITIONBYEXPRESSION"]._serialized_start = 13792
-    _globals["_REPARTITIONBYEXPRESSION"]._serialized_end = 13995
-    _globals["_MAPPARTITIONS"]._serialized_start = 13998
-    _globals["_MAPPARTITIONS"]._serialized_end = 14230
-    _globals["_GROUPMAP"]._serialized_start = 14233
-    _globals["_GROUPMAP"]._serialized_end = 15083
-    _globals["_TRANSFORMWITHSTATEINFO"]._serialized_start = 15086
-    _globals["_TRANSFORMWITHSTATEINFO"]._serialized_end = 15309
-    _globals["_COGROUPMAP"]._serialized_start = 15312
-    _globals["_COGROUPMAP"]._serialized_end = 15838
-    _globals["_APPLYINPANDASWITHSTATE"]._serialized_start = 15841
-    _globals["_APPLYINPANDASWITHSTATE"]._serialized_end = 16198
-    _globals["_COMMONINLINEUSERDEFINEDTABLEFUNCTION"]._serialized_start = 16201
-    _globals["_COMMONINLINEUSERDEFINEDTABLEFUNCTION"]._serialized_end = 16445
-    _globals["_PYTHONUDTF"]._serialized_start = 16448
-    _globals["_PYTHONUDTF"]._serialized_end = 16625
-    _globals["_COMMONINLINEUSERDEFINEDDATASOURCE"]._serialized_start = 16628
-    _globals["_COMMONINLINEUSERDEFINEDDATASOURCE"]._serialized_end = 16779
-    _globals["_PYTHONDATASOURCE"]._serialized_start = 16781
-    _globals["_PYTHONDATASOURCE"]._serialized_end = 16856
-    _globals["_COLLECTMETRICS"]._serialized_start = 16859
-    _globals["_COLLECTMETRICS"]._serialized_end = 16995
-    _globals["_PARSE"]._serialized_start = 16998
-    _globals["_PARSE"]._serialized_end = 17386
+    _globals["_PROJECT"]._serialized_start = 6375
+    _globals["_PROJECT"]._serialized_end = 6492
+    _globals["_FILTER"]._serialized_start = 6494
+    _globals["_FILTER"]._serialized_end = 6606
+    _globals["_JOIN"]._serialized_start = 6609
+    _globals["_JOIN"]._serialized_end = 7270
+    _globals["_JOIN_JOINDATATYPE"]._serialized_start = 6948
+    _globals["_JOIN_JOINDATATYPE"]._serialized_end = 7040
+    _globals["_JOIN_JOINTYPE"]._serialized_start = 7043
+    _globals["_JOIN_JOINTYPE"]._serialized_end = 7251
+    _globals["_SETOPERATION"]._serialized_start = 7273
+    _globals["_SETOPERATION"]._serialized_end = 7752
+    _globals["_SETOPERATION_SETOPTYPE"]._serialized_start = 7589
+    _globals["_SETOPERATION_SETOPTYPE"]._serialized_end = 7703
+    _globals["_LIMIT"]._serialized_start = 7754
+    _globals["_LIMIT"]._serialized_end = 7830
+    _globals["_OFFSET"]._serialized_start = 7832
+    _globals["_OFFSET"]._serialized_end = 7911
+    _globals["_TAIL"]._serialized_start = 7913
+    _globals["_TAIL"]._serialized_end = 7988
+    _globals["_AGGREGATE"]._serialized_start = 7991
+    _globals["_AGGREGATE"]._serialized_end = 8757
+    _globals["_AGGREGATE_PIVOT"]._serialized_start = 8406
+    _globals["_AGGREGATE_PIVOT"]._serialized_end = 8517
+    _globals["_AGGREGATE_GROUPINGSETS"]._serialized_start = 8519
+    _globals["_AGGREGATE_GROUPINGSETS"]._serialized_end = 8595
+    _globals["_AGGREGATE_GROUPTYPE"]._serialized_start = 8598
+    _globals["_AGGREGATE_GROUPTYPE"]._serialized_end = 8757
+    _globals["_SORT"]._serialized_start = 8760
+    _globals["_SORT"]._serialized_end = 8920
+    _globals["_DROP"]._serialized_start = 8923
+    _globals["_DROP"]._serialized_end = 9064
+    _globals["_DEDUPLICATE"]._serialized_start = 9067
+    _globals["_DEDUPLICATE"]._serialized_end = 9307
+    _globals["_LOCALRELATION"]._serialized_start = 9309
+    _globals["_LOCALRELATION"]._serialized_end = 9398
+    _globals["_CACHEDLOCALRELATION"]._serialized_start = 9400
+    _globals["_CACHEDLOCALRELATION"]._serialized_end = 9472
+    _globals["_CHUNKEDCACHEDLOCALRELATION"]._serialized_start = 9474
+    _globals["_CHUNKEDCACHEDLOCALRELATION"]._serialized_end = 9586
+    _globals["_CACHEDREMOTERELATION"]._serialized_start = 9588
+    _globals["_CACHEDREMOTERELATION"]._serialized_end = 9643
+    _globals["_SAMPLE"]._serialized_start = 9646
+    _globals["_SAMPLE"]._serialized_end = 9919
+    _globals["_RANGE"]._serialized_start = 9922
+    _globals["_RANGE"]._serialized_end = 10067
+    _globals["_SUBQUERYALIAS"]._serialized_start = 10069
+    _globals["_SUBQUERYALIAS"]._serialized_end = 10183
+    _globals["_REPARTITION"]._serialized_start = 10186
+    _globals["_REPARTITION"]._serialized_end = 10328
+    _globals["_SHOWSTRING"]._serialized_start = 10331
+    _globals["_SHOWSTRING"]._serialized_end = 10473
+    _globals["_HTMLSTRING"]._serialized_start = 10475
+    _globals["_HTMLSTRING"]._serialized_end = 10589
+    _globals["_STATSUMMARY"]._serialized_start = 10591
+    _globals["_STATSUMMARY"]._serialized_end = 10683
+    _globals["_STATDESCRIBE"]._serialized_start = 10685
+    _globals["_STATDESCRIBE"]._serialized_end = 10766
+    _globals["_STATCROSSTAB"]._serialized_start = 10768
+    _globals["_STATCROSSTAB"]._serialized_end = 10869
+    _globals["_STATCOV"]._serialized_start = 10871
+    _globals["_STATCOV"]._serialized_end = 10967
+    _globals["_STATCORR"]._serialized_start = 10970
+    _globals["_STATCORR"]._serialized_end = 11107
+    _globals["_STATAPPROXQUANTILE"]._serialized_start = 11110
+    _globals["_STATAPPROXQUANTILE"]._serialized_end = 11274
+    _globals["_STATFREQITEMS"]._serialized_start = 11276
+    _globals["_STATFREQITEMS"]._serialized_end = 11401
+    _globals["_STATSAMPLEBY"]._serialized_start = 11404
+    _globals["_STATSAMPLEBY"]._serialized_end = 11713
+    _globals["_STATSAMPLEBY_FRACTION"]._serialized_start = 11605
+    _globals["_STATSAMPLEBY_FRACTION"]._serialized_end = 11704
+    _globals["_NAFILL"]._serialized_start = 11716
+    _globals["_NAFILL"]._serialized_end = 11850
+    _globals["_NADROP"]._serialized_start = 11853
+    _globals["_NADROP"]._serialized_end = 11987
+    _globals["_NAREPLACE"]._serialized_start = 11990
+    _globals["_NAREPLACE"]._serialized_end = 12286
+    _globals["_NAREPLACE_REPLACEMENT"]._serialized_start = 12145
+    _globals["_NAREPLACE_REPLACEMENT"]._serialized_end = 12286
+    _globals["_TODF"]._serialized_start = 12288
+    _globals["_TODF"]._serialized_end = 12376
+    _globals["_WITHCOLUMNSRENAMED"]._serialized_start = 12379
+    _globals["_WITHCOLUMNSRENAMED"]._serialized_end = 12761
+    _globals["_WITHCOLUMNSRENAMED_RENAMECOLUMNSMAPENTRY"]._serialized_start = 
12623
+    _globals["_WITHCOLUMNSRENAMED_RENAMECOLUMNSMAPENTRY"]._serialized_end = 
12690
+    _globals["_WITHCOLUMNSRENAMED_RENAME"]._serialized_start = 12692
+    _globals["_WITHCOLUMNSRENAMED_RENAME"]._serialized_end = 12761
+    _globals["_WITHCOLUMNS"]._serialized_start = 12763
+    _globals["_WITHCOLUMNS"]._serialized_end = 12882
+    _globals["_WITHWATERMARK"]._serialized_start = 12885
+    _globals["_WITHWATERMARK"]._serialized_end = 13019
+    _globals["_HINT"]._serialized_start = 13022
+    _globals["_HINT"]._serialized_end = 13154
+    _globals["_UNPIVOT"]._serialized_start = 13157
+    _globals["_UNPIVOT"]._serialized_end = 13484
+    _globals["_UNPIVOT_VALUES"]._serialized_start = 13414
+    _globals["_UNPIVOT_VALUES"]._serialized_end = 13473
+    _globals["_TRANSPOSE"]._serialized_start = 13486
+    _globals["_TRANSPOSE"]._serialized_end = 13608
+    _globals["_UNRESOLVEDTABLEVALUEDFUNCTION"]._serialized_start = 13610
+    _globals["_UNRESOLVEDTABLEVALUEDFUNCTION"]._serialized_end = 13735
+    _globals["_TOSCHEMA"]._serialized_start = 13737
+    _globals["_TOSCHEMA"]._serialized_end = 13843
+    _globals["_REPARTITIONBYEXPRESSION"]._serialized_start = 13846
+    _globals["_REPARTITIONBYEXPRESSION"]._serialized_end = 14049
+    _globals["_MAPPARTITIONS"]._serialized_start = 14052
+    _globals["_MAPPARTITIONS"]._serialized_end = 14284
+    _globals["_GROUPMAP"]._serialized_start = 14287
+    _globals["_GROUPMAP"]._serialized_end = 15137
+    _globals["_TRANSFORMWITHSTATEINFO"]._serialized_start = 15140
+    _globals["_TRANSFORMWITHSTATEINFO"]._serialized_end = 15363
+    _globals["_COGROUPMAP"]._serialized_start = 15366
+    _globals["_COGROUPMAP"]._serialized_end = 15892
+    _globals["_APPLYINPANDASWITHSTATE"]._serialized_start = 15895
+    _globals["_APPLYINPANDASWITHSTATE"]._serialized_end = 16252
+    _globals["_COMMONINLINEUSERDEFINEDTABLEFUNCTION"]._serialized_start = 16255
+    _globals["_COMMONINLINEUSERDEFINEDTABLEFUNCTION"]._serialized_end = 16499
+    _globals["_PYTHONUDTF"]._serialized_start = 16502
+    _globals["_PYTHONUDTF"]._serialized_end = 16679
+    _globals["_COMMONINLINEUSERDEFINEDDATASOURCE"]._serialized_start = 16682
+    _globals["_COMMONINLINEUSERDEFINEDDATASOURCE"]._serialized_end = 16833
+    _globals["_PYTHONDATASOURCE"]._serialized_start = 16835
+    _globals["_PYTHONDATASOURCE"]._serialized_end = 16910
+    _globals["_COLLECTMETRICS"]._serialized_start = 16913
+    _globals["_COLLECTMETRICS"]._serialized_end = 17049
+    _globals["_PARSE"]._serialized_start = 17052
+    _globals["_PARSE"]._serialized_end = 17440
     _globals["_PARSE_OPTIONSENTRY"]._serialized_start = 5968
     _globals["_PARSE_OPTIONSENTRY"]._serialized_end = 6026
-    _globals["_PARSE_PARSEFORMAT"]._serialized_start = 17287
-    _globals["_PARSE_PARSEFORMAT"]._serialized_end = 17375
-    _globals["_ASOFJOIN"]._serialized_start = 17389
-    _globals["_ASOFJOIN"]._serialized_end = 17864
-    _globals["_LATERALJOIN"]._serialized_start = 17867
-    _globals["_LATERALJOIN"]._serialized_end = 18097
+    _globals["_PARSE_PARSEFORMAT"]._serialized_start = 17341
+    _globals["_PARSE_PARSEFORMAT"]._serialized_end = 17429
+    _globals["_ASOFJOIN"]._serialized_start = 17443
+    _globals["_ASOFJOIN"]._serialized_end = 17918
+    _globals["_LATERALJOIN"]._serialized_start = 17921
+    _globals["_LATERALJOIN"]._serialized_end = 18151
 # @@protoc_insertion_point(module_scope)
diff --git a/python/pyspark/sql/connect/proto/relations_pb2.pyi 
b/python/pyspark/sql/connect/proto/relations_pb2.pyi
index c6f20c158a6c..974ab9bc422d 100644
--- a/python/pyspark/sql/connect/proto/relations_pb2.pyi
+++ b/python/pyspark/sql/connect/proto/relations_pb2.pyi
@@ -1161,6 +1161,7 @@ class Read(google.protobuf.message.Message):
         OPTIONS_FIELD_NUMBER: builtins.int
         PATHS_FIELD_NUMBER: builtins.int
         PREDICATES_FIELD_NUMBER: builtins.int
+        SOURCE_NAME_FIELD_NUMBER: builtins.int
         format: builtins.str
         """(Optional) Supported formats include: parquet, orc, text, json, 
parquet, csv, avro.
 
@@ -1192,6 +1193,11 @@ class Read(google.protobuf.message.Message):
 
             This is only supported by the JDBC data source.
             """
+        source_name: builtins.str
+        """(Optional) A user-provided name for the streaming source.
+        This name is used in checkpoint metadata and enables stable checkpoint 
locations
+        for source evolution.
+        """
         def __init__(
             self,
             *,
@@ -1200,6 +1206,7 @@ class Read(google.protobuf.message.Message):
             options: collections.abc.Mapping[builtins.str, builtins.str] | 
None = ...,
             paths: collections.abc.Iterable[builtins.str] | None = ...,
             predicates: collections.abc.Iterable[builtins.str] | None = ...,
+            source_name: builtins.str | None = ...,
         ) -> None: ...
         def HasField(
             self,
@@ -1208,10 +1215,14 @@ class Read(google.protobuf.message.Message):
                 b"_format",
                 "_schema",
                 b"_schema",
+                "_source_name",
+                b"_source_name",
                 "format",
                 b"format",
                 "schema",
                 b"schema",
+                "source_name",
+                b"source_name",
             ],
         ) -> builtins.bool: ...
         def ClearField(
@@ -1221,6 +1232,8 @@ class Read(google.protobuf.message.Message):
                 b"_format",
                 "_schema",
                 b"_schema",
+                "_source_name",
+                b"_source_name",
                 "format",
                 b"format",
                 "options",
@@ -1231,6 +1244,8 @@ class Read(google.protobuf.message.Message):
                 b"predicates",
                 "schema",
                 b"schema",
+                "source_name",
+                b"source_name",
             ],
         ) -> None: ...
         @typing.overload
@@ -1241,6 +1256,10 @@ class Read(google.protobuf.message.Message):
         def WhichOneof(
             self, oneof_group: typing_extensions.Literal["_schema", b"_schema"]
         ) -> typing_extensions.Literal["schema"] | None: ...
+        @typing.overload
+        def WhichOneof(
+            self, oneof_group: typing_extensions.Literal["_source_name", 
b"_source_name"]
+        ) -> typing_extensions.Literal["source_name"] | None: ...
 
     NAMED_TABLE_FIELD_NUMBER: builtins.int
     DATA_SOURCE_FIELD_NUMBER: builtins.int
diff --git a/python/pyspark/sql/connect/streaming/readwriter.py 
b/python/pyspark/sql/connect/streaming/readwriter.py
index b7ca7dd1cbc0..b2813db8a805 100644
--- a/python/pyspark/sql/connect/streaming/readwriter.py
+++ b/python/pyspark/sql/connect/streaming/readwriter.py
@@ -19,6 +19,7 @@ from pyspark.sql.connect.utils import check_dependencies
 check_dependencies(__name__)
 
 import json
+import re
 import sys
 import pickle
 from typing import cast, overload, Callable, Dict, List, Optional, 
TYPE_CHECKING, Union
@@ -50,6 +51,7 @@ class DataStreamReader(OptionUtils):
         self._schema = ""
         self._client = client
         self._options: Dict[str, str] = {}
+        self._source_name: Optional[str] = None
 
     def _df(self, plan: LogicalPlan) -> "DataFrame":
         from pyspark.sql.connect.dataframe import DataFrame
@@ -89,6 +91,28 @@ class DataStreamReader(OptionUtils):
 
     options.__doc__ = PySparkDataStreamReader.options.__doc__
 
+    def name(self, source_name: str) -> "DataStreamReader":
+        if not isinstance(source_name, str):
+            raise PySparkTypeError(
+                errorClass="NOT_STR",
+                messageParameters={
+                    "arg_name": "source_name",
+                    "arg_type": type(source_name).__name__,
+                },
+            )
+
+        # Validate that source_name contains only ASCII letters, digits, and 
underscores
+        if not re.match(r"^[a-zA-Z0-9_]+$", source_name):
+            raise PySparkValueError(
+                errorClass="INVALID_STREAMING_SOURCE_NAME",
+                messageParameters={"source_name": source_name},
+            )
+
+        self._source_name = source_name
+        return self
+
+    name.__doc__ = PySparkDataStreamReader.name.__doc__
+
     def load(
         self,
         path: Optional[str] = None,
@@ -113,6 +137,7 @@ class DataStreamReader(OptionUtils):
             options=self._options,
             paths=[path] if path else None,
             is_streaming=True,
+            source_name=self._source_name,
         )
 
         return self._df(plan)
diff --git a/python/pyspark/sql/tests/connect/test_connect_readwriter.py 
b/python/pyspark/sql/tests/connect/test_connect_readwriter.py
index d688ba80e915..fc27771fff74 100644
--- a/python/pyspark/sql/tests/connect/test_connect_readwriter.py
+++ b/python/pyspark/sql/tests/connect/test_connect_readwriter.py
@@ -18,7 +18,9 @@
 import os
 import shutil
 import tempfile
+import time
 
+from pyspark.errors import PySparkTypeError, PySparkValueError
 from pyspark.sql.types import (
     StructType,
     StructField,
@@ -343,6 +345,187 @@ class 
SparkConnectReadWriterTests(SparkConnectSQLTestCase):
                     self.spark.read.parquet(path).schema,
                 )
 
+    # DataStreamReader.name() tests - require source evolution configs
+    def test_stream_reader_name_valid_names(self):
+        """Test that various valid source name patterns work correctly."""
+        with self.connect_conf(
+            {
+                "spark.sql.streaming.queryEvolution.enableSourceEvolution": 
"true",
+                "spark.sql.streaming.offsetLog.formatVersion": "2",
+            }
+        ):
+            valid_names = [
+                "mySource",
+                "my_source",
+                "MySource123",
+                "_private",
+                "source_123_test",
+                "123source",
+            ]
+
+            for name in valid_names:
+                with tempfile.TemporaryDirectory(prefix=f"test_{name}_") as 
tmpdir:
+                    
self.connect.range(10).write.mode("overwrite").parquet(tmpdir)
+                    df = (
+                        self.connect.readStream.format("parquet")
+                        .schema("id LONG")
+                        .name(name)
+                        .load(tmpdir)
+                    )
+                    self.assertTrue(
+                        df.isStreaming, f"DataFrame should be streaming for 
name: {name}"
+                    )
+
+    def test_stream_reader_name_method_chaining(self):
+        """Test that name() returns the reader for method chaining."""
+        with self.connect_conf(
+            {
+                "spark.sql.streaming.queryEvolution.enableSourceEvolution": 
"true",
+                "spark.sql.streaming.offsetLog.formatVersion": "2",
+            }
+        ):
+            with tempfile.TemporaryDirectory(prefix="test_chaining_") as 
tmpdir:
+                self.connect.range(10).write.mode("overwrite").parquet(tmpdir)
+                df = (
+                    self.connect.readStream.format("parquet")
+                    .schema("id LONG")
+                    .name("my_source")
+                    .option("maxFilesPerTrigger", "1")
+                    .load(tmpdir)
+                )
+
+                self.assertTrue(df.isStreaming, "DataFrame should be 
streaming")
+
+    def test_stream_reader_name_before_format(self):
+        """Test that order doesn't matter - name can be set before format."""
+        with self.connect_conf(
+            {
+                "spark.sql.streaming.queryEvolution.enableSourceEvolution": 
"true",
+                "spark.sql.streaming.offsetLog.formatVersion": "2",
+            }
+        ):
+            with tempfile.TemporaryDirectory(prefix="test_before_format_") as 
tmpdir:
+                self.connect.range(10).write.mode("overwrite").parquet(tmpdir)
+                df = (
+                    self.connect.readStream.name("my_source")
+                    .format("parquet")
+                    .schema("id LONG")
+                    .load(tmpdir)
+                )
+
+                self.assertTrue(df.isStreaming, "DataFrame should be 
streaming")
+
+    def test_stream_reader_invalid_names(self):
+        """Test that various invalid source names are rejected."""
+        with self.connect_conf(
+            {
+                "spark.sql.streaming.queryEvolution.enableSourceEvolution": 
"true",
+                "spark.sql.streaming.offsetLog.formatVersion": "2",
+            }
+        ):
+            invalid_names = [
+                "",  # empty string
+                "  ",  # whitespace only
+                "my-source",  # hyphen
+                "my source",  # space
+                "my.source",  # dot
+                "my@source",  # special char
+                "my$source",  # dollar sign
+                "my#source",  # hash
+                "my!source",  # exclamation
+            ]
+
+            for invalid_name in invalid_names:
+                with self.subTest(name=invalid_name):
+                    with tempfile.TemporaryDirectory(prefix="test_invalid_") 
as tmpdir:
+                        
self.connect.range(10).write.mode("overwrite").parquet(tmpdir)
+                        with self.assertRaises(PySparkValueError) as context:
+                            
self.connect.readStream.format("parquet").schema("id LONG").name(
+                                invalid_name
+                            ).load(tmpdir)
+
+                        # The error message should contain information about 
invalid name
+                        self.assertIn("source", str(context.exception).lower())
+
+    def test_stream_reader_invalid_name_wrong_type(self):
+        """Test that None and non-string types are rejected."""
+        invalid_types = [None, 123, 45.67, [], {}]
+
+        for invalid_value in invalid_types:
+            with self.subTest(value=invalid_value):
+                with self.assertRaises(PySparkTypeError):
+                    
self.connect.readStream.format("rate").name(invalid_value).load()
+
+    def test_stream_reader_name_with_different_formats(self):
+        """Test that name() works with different streaming data sources."""
+        with self.connect_conf(
+            {
+                "spark.sql.streaming.queryEvolution.enableSourceEvolution": 
"true",
+                "spark.sql.streaming.offsetLog.formatVersion": "2",
+            }
+        ):
+            with tempfile.TemporaryDirectory(prefix="test_name_formats_") as 
tmpdir:
+                # Create test data
+                self.connect.range(10).write.mode("overwrite").parquet(tmpdir 
+ "/parquet_data")
+                self.connect.range(10).selectExpr("id", "CAST(id AS STRING) as 
value").write.mode(
+                    "overwrite"
+                ).json(tmpdir + "/json_data")
+
+                # Test with parquet
+                parquet_df = (
+                    self.connect.readStream.format("parquet")
+                    .name("parquet_source")
+                    .schema("id LONG")
+                    .load(tmpdir + "/parquet_data")
+                )
+                self.assertTrue(parquet_df.isStreaming, "Parquet DataFrame 
should be streaming")
+
+                # Test with json - specify schema
+                json_df = (
+                    self.connect.readStream.format("json")
+                    .name("json_source")
+                    .schema("id LONG, value STRING")
+                    .load(tmpdir + "/json_data")
+                )
+                self.assertTrue(json_df.isStreaming, "JSON DataFrame should be 
streaming")
+
+    def test_stream_reader_name_persists_through_query(self):
+        """Test that the name persists when starting a streaming query."""
+        with self.connect_conf(
+            {
+                "spark.sql.streaming.queryEvolution.enableSourceEvolution": 
"true",
+                "spark.sql.streaming.offsetLog.formatVersion": "2",
+            }
+        ):
+            with tempfile.TemporaryDirectory(prefix="test_name_query_") as 
tmpdir:
+                data_dir = tmpdir + "/data"
+                checkpoint_dir = tmpdir + "/checkpoint"
+
+                # Create test data
+                
self.connect.range(10).write.mode("overwrite").parquet(data_dir)
+
+                df = (
+                    self.connect.readStream.format("parquet")
+                    .schema("id LONG")
+                    .name("parquet_source_test")
+                    .load(data_dir)
+                )
+
+                query = (
+                    df.writeStream.format("noop")
+                    .option("checkpointLocation", checkpoint_dir)
+                    .start()
+                )
+
+                try:
+                    # Let it run briefly
+                    time.sleep(1)
+
+                    # Verify query is running
+                    self.assertTrue(query.isActive, "Query should be active")
+                finally:
+                    query.stop()
+
 
 if __name__ == "__main__":
     from pyspark.testing import main
diff --git a/python/pyspark/sql/tests/test_connect_compatibility.py 
b/python/pyspark/sql/tests/test_connect_compatibility.py
index e0645586deac..56b212387fe4 100644
--- a/python/pyspark/sql/tests/test_connect_compatibility.py
+++ b/python/pyspark/sql/tests/test_connect_compatibility.py
@@ -487,7 +487,7 @@ class ConnectCompatibilityTestsMixin:
         """Test Data Stream Reader compatibility between classic and 
connect."""
         expected_missing_connect_properties = set()
         expected_missing_classic_properties = set()
-        expected_missing_connect_methods = {"name"}
+        expected_missing_connect_methods = set()
         expected_missing_classic_methods = set()
         self.check_compatibility(
             ClassicDataStreamReader,
diff --git 
a/sql/api/src/main/scala/org/apache/spark/sql/streaming/DataStreamReader.scala 
b/sql/api/src/main/scala/org/apache/spark/sql/streaming/DataStreamReader.scala
index 9098c1af74e5..242b2a0cee45 100644
--- 
a/sql/api/src/main/scala/org/apache/spark/sql/streaming/DataStreamReader.scala
+++ 
b/sql/api/src/main/scala/org/apache/spark/sql/streaming/DataStreamReader.scala
@@ -17,9 +17,10 @@
 package org.apache.spark.sql.streaming
 
 import scala.jdk.CollectionConverters._
+import scala.util.matching.Regex
 
 import org.apache.spark.annotation.Evolving
-import org.apache.spark.sql.{DataFrame, Dataset, Encoders}
+import org.apache.spark.sql.{AnalysisException, DataFrame, Dataset, Encoders}
 import org.apache.spark.sql.types.StructType
 
 /**
@@ -292,4 +293,24 @@ abstract class DataStreamReader {
 
   protected def validateXmlSchema(): Unit = ()
 
+  /**
+   * Validates that a streaming source name only contains alphanumeric 
characters and underscores.
+   *
+   * @param sourceName
+   *   the source name to validate
+   * @throws IllegalArgumentException
+   *   if the source name is null, empty, or contains invalid characters
+   */
+  private[sql] def validateSourceName(sourceName: String): Unit = {
+    require(sourceName != null, "Source name cannot be null")
+    require(sourceName.nonEmpty, "Source name cannot be empty")
+
+    val validNamePattern: Regex = "^[a-zA-Z0-9_]+$".r
+    if (!validNamePattern.pattern.matcher(sourceName).matches()) {
+      throw new AnalysisException(
+        errorClass = "STREAMING_QUERY_EVOLUTION_ERROR.INVALID_SOURCE_NAME",
+        messageParameters = Map("sourceName" -> sourceName))
+    }
+  }
+
 }
diff --git 
a/sql/connect/client/jvm/src/test/scala/org/apache/spark/sql/connect/streaming/ClientStreamingQuerySuite.scala
 
b/sql/connect/client/jvm/src/test/scala/org/apache/spark/sql/connect/streaming/ClientStreamingQuerySuite.scala
index c148a65a16be..ac6a35a9db58 100644
--- 
a/sql/connect/client/jvm/src/test/scala/org/apache/spark/sql/connect/streaming/ClientStreamingQuerySuite.scala
+++ 
b/sql/connect/client/jvm/src/test/scala/org/apache/spark/sql/connect/streaming/ClientStreamingQuerySuite.scala
@@ -29,7 +29,7 @@ import org.scalatest.time.SpanSugar._
 
 import org.apache.spark.SparkException
 import org.apache.spark.internal.Logging
-import org.apache.spark.sql.{DataFrame, Dataset, ForeachWriter, Row}
+import org.apache.spark.sql.{AnalysisException, DataFrame, Dataset, 
ForeachWriter, Row}
 import org.apache.spark.sql.connect.SparkSession
 import org.apache.spark.sql.connect.test.{IntegrationTestUtils, QueryTest, 
RemoteSparkSession}
 import org.apache.spark.sql.functions.{col, lit, udf, window}
@@ -53,6 +53,19 @@ class ClientStreamingQuerySuite extends QueryTest with 
RemoteSparkSession with L
       "test-data",
       "streaming")
 
+  /**
+   * Helper method to run tests with source evolution configs enabled.
+   */
+  private def testWithSourceEvolution(testName: String)(testFun: => Unit): 
Unit = {
+    test(testName) {
+      withSQLConf(
+        "spark.sql.streaming.queryEvolution.enableSourceEvolution" -> "true",
+        "spark.sql.streaming.offsetLog.formatVersion" -> "2") {
+        testFun
+      }
+    }
+  }
+
   test("Streaming API with windowed aggregate query") {
     // This verifies standard streaming API by starting a streaming query with 
windowed count.
     withSQLConf(
@@ -753,6 +766,86 @@ class ClientStreamingQuerySuite extends QueryTest with 
RemoteSparkSession with L
       terminate = terminate :+ event.json
     }
   }
+
+  // Tests for DataStreamReader.name() method
+  testWithSourceEvolution("stream reader name() with valid source names") {
+    Seq("mySource", "my_source", "MySource123", "_private", "source_123_test", 
"123source")
+      .foreach { name =>
+        withTempPath { dir =>
+          val path = dir.getCanonicalPath
+          spark.range(10).write.parquet(path)
+
+          val df = spark.readStream
+            .format("parquet")
+            .schema("id LONG")
+            .name(name)
+            .load(path)
+
+          assert(df.isStreaming, s"DataFrame should be streaming for name: 
$name")
+        }
+      }
+  }
+
+  testWithSourceEvolution("stream reader name() method chaining") {
+    withTempPath { dir =>
+      val path = dir.getCanonicalPath
+      spark.range(10).write.parquet(path)
+
+      val df = spark.readStream
+        .format("parquet")
+        .schema("id LONG")
+        .name("my_source")
+        .option("maxFilesPerTrigger", "1")
+        .load(path)
+
+      assert(df.isStreaming, "DataFrame should be streaming")
+    }
+  }
+
+  // Seq of (sourceName, expectedExceptionClass, expectedConditionOpt)
+  val invalidSourceNames = Seq(
+    (
+      "my-source",
+      classOf[AnalysisException],
+      Some("STREAMING_QUERY_EVOLUTION_ERROR.INVALID_SOURCE_NAME")),
+    (
+      "my space",
+      classOf[AnalysisException],
+      Some("STREAMING_QUERY_EVOLUTION_ERROR.INVALID_SOURCE_NAME")),
+    (
+      "my.source",
+      classOf[AnalysisException],
+      Some("STREAMING_QUERY_EVOLUTION_ERROR.INVALID_SOURCE_NAME")),
+    ("", classOf[IllegalArgumentException], None) // empty string case
+  )
+
+  invalidSourceNames.foreach { case (sourceName, exceptionClass, conditionOpt) 
=>
+    test(s"stream reader invalid source name - '$sourceName'") {
+      withTempPath { dir =>
+        val path = dir.getCanonicalPath
+        spark.range(10).write.parquet(path)
+
+        val thrown = intercept[Exception] {
+          spark.readStream
+            .format("parquet")
+            .schema("id LONG")
+            .name(sourceName)
+            .load(path)
+        }
+
+        // Verify exception type
+        assert(exceptionClass.isInstance(thrown))
+
+        // Verify error condition only for AnalysisException cases
+        conditionOpt.foreach { condition =>
+          checkError(
+            exception = thrown.asInstanceOf[AnalysisException],
+            condition = condition,
+            parameters = Map("sourceName" -> sourceName))
+        }
+      }
+    }
+  }
 }
 
 class TestForeachWriter[T] extends ForeachWriter[T] {
diff --git a/sql/connect/common/src/main/protobuf/spark/connect/relations.proto 
b/sql/connect/common/src/main/protobuf/spark/connect/relations.proto
index 1583785e69fb..bf3eb3214e5d 100644
--- a/sql/connect/common/src/main/protobuf/spark/connect/relations.proto
+++ b/sql/connect/common/src/main/protobuf/spark/connect/relations.proto
@@ -256,6 +256,11 @@ message Read {
     //
     // This is only supported by the JDBC data source.
     repeated string predicates = 5;
+
+    // (Optional) A user-provided name for the streaming source.
+    // This name is used in checkpoint metadata and enables stable checkpoint 
locations
+    // for source evolution.
+    optional string source_name = 6;
   }
 }
 
diff --git 
a/sql/connect/common/src/main/scala/org/apache/spark/sql/connect/DataStreamReader.scala
 
b/sql/connect/common/src/main/scala/org/apache/spark/sql/connect/DataStreamReader.scala
index 808df593b775..529abc0ec0d1 100644
--- 
a/sql/connect/common/src/main/scala/org/apache/spark/sql/connect/DataStreamReader.scala
+++ 
b/sql/connect/common/src/main/scala/org/apache/spark/sql/connect/DataStreamReader.scala
@@ -19,7 +19,7 @@ package org.apache.spark.sql.connect
 
 import scala.jdk.CollectionConverters._
 
-import org.apache.spark.annotation.Evolving
+import org.apache.spark.annotation.{Evolving, Experimental}
 import org.apache.spark.connect.proto.Read.DataSource
 import org.apache.spark.sql.connect.ConnectConversions._
 import org.apache.spark.sql.errors.DataTypeErrors
@@ -75,6 +75,21 @@ final class DataStreamReader private[sql] (sparkSession: 
SparkSession)
     this
   }
 
+  /**
+   * Specifies a name for the streaming source. This name is used to identify 
the source in
+   * checkpoint metadata and enables stable checkpoint locations for source 
evolution.
+   *
+   * @param sourceName
+   *   the name to assign to this streaming source
+   * @since 4.2.0
+   */
+  @Experimental
+  private[sql] def name(sourceName: String): this.type = {
+    validateSourceName(sourceName)
+    sourceBuilder.setSourceName(sourceName)
+    this
+  }
+
   /** @inheritdoc */
   def load(): DataFrame = {
     sparkSession.newDataFrame { relationBuilder =>
diff --git 
a/sql/connect/server/src/main/scala/org/apache/spark/sql/connect/planner/SparkConnectPlanner.scala
 
b/sql/connect/server/src/main/scala/org/apache/spark/sql/connect/planner/SparkConnectPlanner.scala
index f55d391eab54..d47409d7e681 100644
--- 
a/sql/connect/server/src/main/scala/org/apache/spark/sql/connect/planner/SparkConnectPlanner.scala
+++ 
b/sql/connect/server/src/main/scala/org/apache/spark/sql/connect/planner/SparkConnectPlanner.scala
@@ -1697,6 +1697,9 @@ class SparkConnectPlanner(
         if (streamSource.getSchema.nonEmpty) {
           reader.schema(parseSchema(streamSource.getSchema))
         }
+        if (streamSource.hasSourceName) {
+          reader.name(streamSource.getSourceName)
+        }
         val streamDF = streamSource.getPathsCount match {
           case 0 => reader.load()
           case 1 => reader.load(streamSource.getPaths(0))
diff --git 
a/sql/core/src/main/scala/org/apache/spark/sql/classic/DataStreamReader.scala 
b/sql/core/src/main/scala/org/apache/spark/sql/classic/DataStreamReader.scala
index f1441bd5c736..0469d77e2b15 100644
--- 
a/sql/core/src/main/scala/org/apache/spark/sql/classic/DataStreamReader.scala
+++ 
b/sql/core/src/main/scala/org/apache/spark/sql/classic/DataStreamReader.scala
@@ -18,7 +18,6 @@
 package org.apache.spark.sql.classic
 
 import scala.jdk.CollectionConverters._
-import scala.util.matching.Regex
 
 import org.apache.spark.annotation.{Evolving, Experimental}
 import org.apache.spark.sql.catalyst.analysis.{NamedStreamingRelation, 
UnresolvedRelation}
@@ -32,29 +31,6 @@ import org.apache.spark.sql.streaming
 import org.apache.spark.sql.types.StructType
 import org.apache.spark.sql.util.CaseInsensitiveStringMap
 
-/**
- * Companion object for DataStreamReader with validation utilities.
- */
-private[sql] object DataStreamReader {
-  /**
-   * Pattern for valid source and sink names.
-   * Names must only contain ASCII letters, digits, and underscores.
-   */
-  private val VALID_NAME_PATTERN: Regex = "^[a-zA-Z0-9_]+$".r
-
-  /**
-   * Validates that a streaming source name only contains alphanumeric 
characters and underscores.
-   *
-   * @param sourceName the source name to validate
-   * @throws AnalysisException if the source name contains invalid characters
-   */
-  def validateSourceName(sourceName: String): Unit = {
-    if (!VALID_NAME_PATTERN.pattern.matcher(sourceName).matches()) {
-      throw QueryCompilationErrors.invalidStreamingSourceNameError(sourceName)
-    }
-  }
-}
-
 /**
  * Interface used to load a streaming `Dataset` from external storage systems 
(e.g. file systems,
  * key-value stores, etc). Use `SparkSession.readStream` to access this.
@@ -100,7 +76,7 @@ final class DataStreamReader private[sql](sparkSession: 
SparkSession)
    */
   @Experimental
   private[sql] def name(sourceName: String): this.type = {
-    DataStreamReader.validateSourceName(sourceName)
+    validateSourceName(sourceName)
     this.userProvidedSourceName = Option(sourceName)
     this
   }


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]
