This is an automated email from the ASF dual-hosted git repository.

gurwls223 pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/spark.git


The following commit(s) were added to refs/heads/master by this push:
     new dd2c62f790a8 [MINOR][PYTHON][TESTS] Consolidate DataStreamReader.name() tests into test_streaming.py
dd2c62f790a8 is described below

commit dd2c62f790a821440baa75792a98c6ed5a37e519
Author: ericm-db <[email protected]>
AuthorDate: Thu Jan 29 12:47:58 2026 +0900

    [MINOR][PYTHON][TESTS] Consolidate DataStreamReader.name() tests into test_streaming.py
    
    ### What changes were proposed in this pull request?
    Move DataStreamReader.name() test cases from the standalone test_streaming_reader_name.py file into the existing test_streaming.py test suite to reduce test file proliferation.
    
    ### Why are the changes needed?
    Having a dedicated test file for a small number of related tests increases CI overhead and makes the test suite harder to navigate. These tests belong in the main streaming test suite.
    
    ### Does this PR introduce any user-facing change?
    No, test-only changes.
    
    ### How was this patch tested?
    
      - Existing tests were moved to their new location
      - All tests use the sql_conf() context manager for proper config isolation (see the sketch below)
      - Tests can be run with: `python/run-tests --testnames 'pyspark.sql.tests.streaming.test_streaming'`
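    
    For reference, the moved tests follow roughly this config-isolation pattern; the sketch below is illustrative only (the rate source and the test name are placeholders), mirroring the two config keys the relocated tests enable:
    
    ```python
    # Minimal sketch of the sql_conf() isolation pattern used by the moved tests.
    # It runs inside a ReusedSQLTestCase-based suite, which provides self.spark
    # and the self.sql_conf() context manager; the format and name are placeholders.
    def test_name_example(self):
        with self.sql_conf(
            {
                "spark.sql.streaming.queryEvolution.enableSourceEvolution": "true",
                "spark.sql.streaming.offsetLog.formatVersion": "2",
            }
        ):
            df = self.spark.readStream.format("rate").name("my_source").load()
            self.assertTrue(df.isStreaming)
    ```
    
    Scoping the configs with sql_conf() per test, rather than setting them once in setUpClass as the deleted file did, keeps them from leaking into other tests in the shared suite.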
    
    ### Was this patch authored or co-authored using generative AI tooling?
    
    No.
    
    Closes #53988 from ericm-db/consolidate-streaming-reader-name-tests.
    
    Authored-by: ericm-db <[email protected]>
    Signed-off-by: Hyukjin Kwon <[email protected]>
---
 dev/sparktestsupport/modules.py                    |   1 -
 .../pyspark/sql/tests/streaming/test_streaming.py  | 145 ++++++++++++++++-
 .../tests/streaming/test_streaming_reader_name.py  | 179 ---------------------
 3 files changed, 144 insertions(+), 181 deletions(-)

diff --git a/dev/sparktestsupport/modules.py b/dev/sparktestsupport/modules.py
index 79ec8da96b93..44977d79d89c 100644
--- a/dev/sparktestsupport/modules.py
+++ b/dev/sparktestsupport/modules.py
@@ -670,7 +670,6 @@ pyspark_structured_streaming = Module(
         "pyspark.sql.tests.streaming.test_streaming_foreach_batch",
         "pyspark.sql.tests.streaming.test_streaming_listener",
         "pyspark.sql.tests.streaming.test_streaming_offline_state_repartition",
-        "pyspark.sql.tests.streaming.test_streaming_reader_name",
         "pyspark.sql.tests.pandas.test_pandas_grouped_map_with_state",
         "pyspark.sql.tests.pandas.streaming.test_pandas_transform_with_state",
         "pyspark.sql.tests.pandas.streaming.test_pandas_transform_with_state_checkpoint_v2",
diff --git a/python/pyspark/sql/tests/streaming/test_streaming.py b/python/pyspark/sql/tests/streaming/test_streaming.py
index 0b5884efb717..ba39ca513610 100644
--- a/python/pyspark/sql/tests/streaming/test_streaming.py
+++ b/python/pyspark/sql/tests/streaming/test_streaming.py
@@ -24,7 +24,7 @@ from pyspark.sql import Row
 from pyspark.sql.functions import lit
 from pyspark.sql.types import StructType, StructField, IntegerType, StringType, TimestampType
 from pyspark.testing.sqlutils import ReusedSQLTestCase
-from pyspark.errors import PySparkValueError
+from pyspark.errors import PySparkTypeError, PySparkValueError
 
 
 class StreamingTestsMixin:
@@ -500,6 +500,149 @@ class StreamingTestsMixin:
                 set([Row(value="view_a"), Row(value="view_b"), Row(value="view_c")]), set(result)
             )
 
+    def test_name_with_valid_names(self):
+        """Test that various valid source name patterns work correctly."""
+        with self.sql_conf(
+            {
+                "spark.sql.streaming.queryEvolution.enableSourceEvolution": "true",
+                "spark.sql.streaming.offsetLog.formatVersion": "2",
+            }
+        ):
+            valid_names = [
+                "mySource",
+                "my_source",
+                "MySource123",
+                "_private",
+                "source_123_test",
+                "123source",
+            ]
+
+            for name in valid_names:
+                with tempfile.TemporaryDirectory(prefix=f"test_{name}_") as tmpdir:
+                    self.spark.range(10).write.mode("overwrite").parquet(tmpdir)
+                    df = (
+                        self.spark.readStream.format("parquet")
+                        .schema("id LONG")
+                        .name(name)
+                        .load(tmpdir)
+                    )
+                    self.assertTrue(
+                        df.isStreaming, f"DataFrame should be streaming for name: {name}"
+                    )
+
+    def test_name_method_chaining(self):
+        """Test that name() returns the reader for method chaining."""
+        with self.sql_conf(
+            {
+                "spark.sql.streaming.queryEvolution.enableSourceEvolution": "true",
+                "spark.sql.streaming.offsetLog.formatVersion": "2",
+            }
+        ):
+            with tempfile.TemporaryDirectory(prefix="test_chaining_") as tmpdir:
+                self.spark.range(10).write.mode("overwrite").parquet(tmpdir)
+                df = (
+                    self.spark.readStream.format("parquet")
+                    .schema("id LONG")
+                    .name("my_source")
+                    .option("maxFilesPerTrigger", "1")
+                    .load(tmpdir)
+                )
+
+                self.assertTrue(df.isStreaming, "DataFrame should be streaming")
+
+    def test_name_before_format(self):
+        """Test that order doesn't matter - name can be set before format."""
+        with self.sql_conf(
+            {
+                "spark.sql.streaming.queryEvolution.enableSourceEvolution": "true",
+                "spark.sql.streaming.offsetLog.formatVersion": "2",
+            }
+        ):
+            with tempfile.TemporaryDirectory(prefix="test_before_format_") as tmpdir:
+                self.spark.range(10).write.mode("overwrite").parquet(tmpdir)
+                df = (
+                    self.spark.readStream.name("my_source")
+                    .format("parquet")
+                    .schema("id LONG")
+                    .load(tmpdir)
+                )
+
+                self.assertTrue(df.isStreaming, "DataFrame should be streaming")
+
+    def test_invalid_names(self):
+        """Test that various invalid source names are rejected."""
+        with self.sql_conf(
+            {
+                "spark.sql.streaming.queryEvolution.enableSourceEvolution": "true",
+                "spark.sql.streaming.offsetLog.formatVersion": "2",
+            }
+        ):
+            invalid_names = [
+                "",  # empty string
+                "  ",  # whitespace only
+                "my-source",  # hyphen
+                "my source",  # space
+                "my.source",  # dot
+                "my@source",  # special char
+                "my$source",  # dollar sign
+                "my#source",  # hash
+                "my!source",  # exclamation
+            ]
+
+            for invalid_name in invalid_names:
+                with self.subTest(name=invalid_name):
+                    with tempfile.TemporaryDirectory(prefix="test_invalid_") as tmpdir:
+                        self.spark.range(10).write.mode("overwrite").parquet(tmpdir)
+                        with self.assertRaises(PySparkValueError) as context:
+                            self.spark.readStream.format("parquet").schema("id LONG").name(
+                                invalid_name
+                            ).load(tmpdir)
+
+                        # The error message should contain information about invalid name
+                        self.assertIn("source", str(context.exception).lower())
+
+    def test_invalid_name_wrong_type(self):
+        """Test that None and non-string types are rejected."""
+        invalid_types = [None, 123, 45.67, [], {}]
+
+        for invalid_value in invalid_types:
+            with self.subTest(value=invalid_value):
+                with self.assertRaises(PySparkTypeError):
+                    self.spark.readStream.format("rate").name(invalid_value).load()
+
+    def test_name_with_different_formats(self):
+        """Test that name() works with different streaming data sources."""
+        with self.sql_conf(
+            {
+                "spark.sql.streaming.queryEvolution.enableSourceEvolution": "true",
+                "spark.sql.streaming.offsetLog.formatVersion": "2",
+            }
+        ):
+            with tempfile.TemporaryDirectory(prefix="test_name_formats_") as tmpdir:
+                # Create test data
+                self.spark.range(10).write.mode("overwrite").parquet(tmpdir + "/parquet_data")
+                self.spark.range(10).selectExpr("id", "CAST(id AS STRING) as value").write.mode(
+                    "overwrite"
+                ).json(tmpdir + "/json_data")
+
+                # Test with parquet
+                parquet_df = (
+                    self.spark.readStream.format("parquet")
+                    .name("parquet_source")
+                    .schema("id LONG")
+                    .load(tmpdir + "/parquet_data")
+                )
+                self.assertTrue(parquet_df.isStreaming, "Parquet DataFrame should be streaming")
+
+                # Test with json - specify schema
+                json_df = (
+                    self.spark.readStream.format("json")
+                    .name("json_source")
+                    .schema("id LONG, value STRING")
+                    .load(tmpdir + "/json_data")
+                )
+                self.assertTrue(json_df.isStreaming, "JSON DataFrame should be streaming")
+
     def test_streaming_drop_duplicate_within_watermark(self):
         """
         This verifies dropDuplicatesWithinWatermark works with a streaming dataframe.
diff --git a/python/pyspark/sql/tests/streaming/test_streaming_reader_name.py b/python/pyspark/sql/tests/streaming/test_streaming_reader_name.py
deleted file mode 100644
index 8b3af847df7e..000000000000
--- a/python/pyspark/sql/tests/streaming/test_streaming_reader_name.py
+++ /dev/null
@@ -1,179 +0,0 @@
-#
-# Licensed to the Apache Software Foundation (ASF) under one or more
-# contributor license agreements.  See the NOTICE file distributed with
-# this work for additional information regarding copyright ownership.
-# The ASF licenses this file to You under the Apache License, Version 2.0
-# (the "License"); you may not use this file except in compliance with
-# the License.  You may obtain a copy of the License at
-#
-#    http://www.apache.org/licenses/LICENSE-2.0
-#
-# Unless required by applicable law or agreed to in writing, software
-# distributed under the License is distributed on an "AS IS" BASIS,
-# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
-# See the License for the specific language governing permissions and
-# limitations under the License.
-#
-
-import tempfile
-import time
-
-from pyspark.errors import PySparkTypeError, PySparkValueError
-from pyspark.testing.sqlutils import ReusedSQLTestCase
-
-
-class DataStreamReaderNameTests(ReusedSQLTestCase):
-    """Test suite for DataStreamReader.name() functionality in PySpark."""
-
-    @classmethod
-    def setUpClass(cls):
-        super(DataStreamReaderNameTests, cls).setUpClass()
-        # Enable streaming source evolution feature
-        cls.spark.conf.set("spark.sql.streaming.queryEvolution.enableSourceEvolution", "true")
-        cls.spark.conf.set("spark.sql.streaming.offsetLog.formatVersion", "2")
-
-    def test_name_with_valid_names(self):
-        """Test that various valid source name patterns work correctly."""
-        valid_names = [
-            "mySource",
-            "my_source",
-            "MySource123",
-            "_private",
-            "source_123_test",
-            "123source",
-        ]
-
-        for name in valid_names:
-            with tempfile.TemporaryDirectory(prefix=f"test_{name}_") as tmpdir:
-                self.spark.range(10).write.mode("overwrite").parquet(tmpdir)
-                df = (
-                    self.spark.readStream.format("parquet")
-                    .schema("id LONG")
-                    .name(name)
-                    .load(tmpdir)
-                )
-                self.assertTrue(df.isStreaming, f"DataFrame should be streaming for name: {name}")
-
-    def test_name_method_chaining(self):
-        """Test that name() returns the reader for method chaining."""
-        with tempfile.TemporaryDirectory(prefix="test_chaining_") as tmpdir:
-            self.spark.range(10).write.mode("overwrite").parquet(tmpdir)
-            df = (
-                self.spark.readStream.format("parquet")
-                .schema("id LONG")
-                .name("my_source")
-                .option("maxFilesPerTrigger", "1")
-                .load(tmpdir)
-            )
-
-            self.assertTrue(df.isStreaming, "DataFrame should be streaming")
-
-    def test_name_before_format(self):
-        """Test that order doesn't matter - name can be set before format."""
-        with tempfile.TemporaryDirectory(prefix="test_before_format_") as tmpdir:
-            self.spark.range(10).write.mode("overwrite").parquet(tmpdir)
-            df = (
-                self.spark.readStream.name("my_source")
-                .format("parquet")
-                .schema("id LONG")
-                .load(tmpdir)
-            )
-
-            self.assertTrue(df.isStreaming, "DataFrame should be streaming")
-
-    def test_invalid_names(self):
-        """Test that various invalid source names are rejected."""
-        invalid_names = [
-            "",  # empty string
-            "  ",  # whitespace only
-            "my-source",  # hyphen
-            "my source",  # space
-            "my.source",  # dot
-            "my@source",  # special char
-            "my$source",  # dollar sign
-            "my#source",  # hash
-            "my!source",  # exclamation
-        ]
-
-        for invalid_name in invalid_names:
-            with self.subTest(name=invalid_name):
-                with tempfile.TemporaryDirectory(prefix="test_invalid_") as tmpdir:
-                    self.spark.range(10).write.mode("overwrite").parquet(tmpdir)
-                    with self.assertRaises(PySparkValueError) as context:
-                        self.spark.readStream.format("parquet").schema("id LONG").name(
-                            invalid_name
-                        ).load(tmpdir)
-
-                    # The error message should contain information about invalid name
-                    self.assertIn("source", str(context.exception).lower())
-
-    def test_invalid_name_wrong_type(self):
-        """Test that None and non-string types are rejected."""
-        invalid_types = [None, 123, 45.67, [], {}]
-
-        for invalid_value in invalid_types:
-            with self.subTest(value=invalid_value):
-                with self.assertRaises(PySparkTypeError):
-                    self.spark.readStream.format("rate").name(invalid_value).load()
-
-    def test_name_with_different_formats(self):
-        """Test that name() works with different streaming data sources."""
-        with tempfile.TemporaryDirectory(prefix="test_name_formats_") as tmpdir:
-            # Create test data
-            self.spark.range(10).write.mode("overwrite").parquet(tmpdir + "/parquet_data")
-            self.spark.range(10).selectExpr("id", "CAST(id AS STRING) as value").write.mode(
-                "overwrite"
-            ).json(tmpdir + "/json_data")
-
-            # Test with parquet
-            parquet_df = (
-                self.spark.readStream.format("parquet")
-                .name("parquet_source")
-                .schema("id LONG")
-                .load(tmpdir + "/parquet_data")
-            )
-            self.assertTrue(parquet_df.isStreaming, "Parquet DataFrame should be streaming")
-
-            # Test with json - specify schema
-            json_df = (
-                self.spark.readStream.format("json")
-                .name("json_source")
-                .schema("id LONG, value STRING")
-                .load(tmpdir + "/json_data")
-            )
-            self.assertTrue(json_df.isStreaming, "JSON DataFrame should be streaming")
-
-    def test_name_persists_through_query(self):
-        """Test that the name persists when starting a streaming query."""
-        with tempfile.TemporaryDirectory(prefix="test_name_query_") as tmpdir:
-            data_dir = tmpdir + "/data"
-            checkpoint_dir = tmpdir + "/checkpoint"
-
-            # Create test data
-            self.spark.range(10).write.mode("overwrite").parquet(data_dir)
-
-            df = (
-                self.spark.readStream.format("parquet")
-                .schema("id LONG")
-                .name("parquet_source_test")
-                .load(data_dir)
-            )
-
-            query = (
-                df.writeStream.format("noop").option("checkpointLocation", checkpoint_dir).start()
-            )
-
-            try:
-                # Let it run briefly
-                time.sleep(1)
-
-                # Verify query is running
-                self.assertTrue(query.isActive, "Query should be active")
-            finally:
-                query.stop()
-
-
-if __name__ == "__main__":
-    from pyspark.testing import main
-
-    main()


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]
