This is an automated email from the ASF dual-hosted git repository.
gurwls223 pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/spark.git
The following commit(s) were added to refs/heads/master by this push:
new dd2c62f790a8 [MINOR][PYTHON][TESTS] Consolidate DataStreamReader.name() tests into test_streaming.py
dd2c62f790a8 is described below
commit dd2c62f790a821440baa75792a98c6ed5a37e519
Author: ericm-db <[email protected]>
AuthorDate: Thu Jan 29 12:47:58 2026 +0900
[MINOR][PYTHON][TESTS] Consolidate DataStreamReader.name() tests into test_streaming.py
### What changes were proposed in this pull request?
Move DataStreamReader.name() test cases from the standalone
test_streaming_reader_name.py file into the existing test_streaming.py test
suite to reduce test file proliferation.
### Why are the changes needed?
Having a dedicated test file for a small number of related tests increases
CI overhead and makes the test suite harder to navigate. These tests belong in
the main streaming test suite.
### Does this PR introduce any user-facing change?
No, test-only changes.
### How was this patch tested?
- Existing tests were moved to the new location
- All tests use the sql_conf() context manager for proper config isolation (see the sketch after this list)
- Tests can be run with: `python/run-tests --testnames 'pyspark.sql.tests.streaming.test_streaming'`
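For reference, a minimal sketch (not part of this patch) of the `sql_conf()` pattern used throughout the consolidated tests, assuming the `ReusedSQLTestCase` base class that `test_streaming.py` already extends:

```python
# Minimal sketch: sql_conf() applies the given configs on entry and restores the
# previous values on exit, so each test's config changes stay isolated.
with self.sql_conf(
    {
        "spark.sql.streaming.queryEvolution.enableSourceEvolution": "true",
        "spark.sql.streaming.offsetLog.formatVersion": "2",
    }
):
    df = self.spark.readStream.format("rate").name("my_source").load()
    self.assertTrue(df.isStreaming)
```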
### Was this patch authored or co-authored using generative AI tooling?
No.
Closes #53988 from ericm-db/consolidate-streaming-reader-name-tests.
Authored-by: ericm-db <[email protected]>
Signed-off-by: Hyukjin Kwon <[email protected]>
---
dev/sparktestsupport/modules.py | 1 -
.../pyspark/sql/tests/streaming/test_streaming.py | 145 ++++++++++++++++-
.../tests/streaming/test_streaming_reader_name.py | 179 ---------------------
3 files changed, 144 insertions(+), 181 deletions(-)
diff --git a/dev/sparktestsupport/modules.py b/dev/sparktestsupport/modules.py
index 79ec8da96b93..44977d79d89c 100644
--- a/dev/sparktestsupport/modules.py
+++ b/dev/sparktestsupport/modules.py
@@ -670,7 +670,6 @@ pyspark_structured_streaming = Module(
"pyspark.sql.tests.streaming.test_streaming_foreach_batch",
"pyspark.sql.tests.streaming.test_streaming_listener",
"pyspark.sql.tests.streaming.test_streaming_offline_state_repartition",
- "pyspark.sql.tests.streaming.test_streaming_reader_name",
"pyspark.sql.tests.pandas.test_pandas_grouped_map_with_state",
"pyspark.sql.tests.pandas.streaming.test_pandas_transform_with_state",
"pyspark.sql.tests.pandas.streaming.test_pandas_transform_with_state_checkpoint_v2",
diff --git a/python/pyspark/sql/tests/streaming/test_streaming.py b/python/pyspark/sql/tests/streaming/test_streaming.py
index 0b5884efb717..ba39ca513610 100644
--- a/python/pyspark/sql/tests/streaming/test_streaming.py
+++ b/python/pyspark/sql/tests/streaming/test_streaming.py
@@ -24,7 +24,7 @@ from pyspark.sql import Row
from pyspark.sql.functions import lit
from pyspark.sql.types import StructType, StructField, IntegerType, StringType, TimestampType
from pyspark.testing.sqlutils import ReusedSQLTestCase
-from pyspark.errors import PySparkValueError
+from pyspark.errors import PySparkTypeError, PySparkValueError
class StreamingTestsMixin:
@@ -500,6 +500,149 @@ class StreamingTestsMixin:
set([Row(value="view_a"), Row(value="view_b"), Row(value="view_c")]), set(result)
)
+ def test_name_with_valid_names(self):
+ """Test that various valid source name patterns work correctly."""
+ with self.sql_conf(
+ {
+ "spark.sql.streaming.queryEvolution.enableSourceEvolution":
"true",
+ "spark.sql.streaming.offsetLog.formatVersion": "2",
+ }
+ ):
+ valid_names = [
+ "mySource",
+ "my_source",
+ "MySource123",
+ "_private",
+ "source_123_test",
+ "123source",
+ ]
+
+ for name in valid_names:
+ with tempfile.TemporaryDirectory(prefix=f"test_{name}_") as tmpdir:
+ self.spark.range(10).write.mode("overwrite").parquet(tmpdir)
+ df = (
+ self.spark.readStream.format("parquet")
+ .schema("id LONG")
+ .name(name)
+ .load(tmpdir)
+ )
+ self.assertTrue(
+ df.isStreaming, f"DataFrame should be streaming for name: {name}"
+ )
+
+ def test_name_method_chaining(self):
+ """Test that name() returns the reader for method chaining."""
+ with self.sql_conf(
+ {
+ "spark.sql.streaming.queryEvolution.enableSourceEvolution":
"true",
+ "spark.sql.streaming.offsetLog.formatVersion": "2",
+ }
+ ):
+ with tempfile.TemporaryDirectory(prefix="test_chaining_") as tmpdir:
+ self.spark.range(10).write.mode("overwrite").parquet(tmpdir)
+ df = (
+ self.spark.readStream.format("parquet")
+ .schema("id LONG")
+ .name("my_source")
+ .option("maxFilesPerTrigger", "1")
+ .load(tmpdir)
+ )
+
+ self.assertTrue(df.isStreaming, "DataFrame should be streaming")
+
+ def test_name_before_format(self):
+ """Test that order doesn't matter - name can be set before format."""
+ with self.sql_conf(
+ {
+ "spark.sql.streaming.queryEvolution.enableSourceEvolution":
"true",
+ "spark.sql.streaming.offsetLog.formatVersion": "2",
+ }
+ ):
+ with tempfile.TemporaryDirectory(prefix="test_before_format_") as tmpdir:
+ self.spark.range(10).write.mode("overwrite").parquet(tmpdir)
+ df = (
+ self.spark.readStream.name("my_source")
+ .format("parquet")
+ .schema("id LONG")
+ .load(tmpdir)
+ )
+
+ self.assertTrue(df.isStreaming, "DataFrame should be streaming")
+
+ def test_invalid_names(self):
+ """Test that various invalid source names are rejected."""
+ with self.sql_conf(
+ {
+ "spark.sql.streaming.queryEvolution.enableSourceEvolution":
"true",
+ "spark.sql.streaming.offsetLog.formatVersion": "2",
+ }
+ ):
+ invalid_names = [
+ "", # empty string
+ " ", # whitespace only
+ "my-source", # hyphen
+ "my source", # space
+ "my.source", # dot
+ "my@source", # special char
+ "my$source", # dollar sign
+ "my#source", # hash
+ "my!source", # exclamation
+ ]
+
+ for invalid_name in invalid_names:
+ with self.subTest(name=invalid_name):
+ with tempfile.TemporaryDirectory(prefix="test_invalid_") as tmpdir:
+ self.spark.range(10).write.mode("overwrite").parquet(tmpdir)
+ with self.assertRaises(PySparkValueError) as context:
+ self.spark.readStream.format("parquet").schema("id LONG").name(
+ invalid_name
+ ).load(tmpdir)
+
+ # The error message should contain information about invalid name
+ self.assertIn("source", str(context.exception).lower())
+
+ def test_invalid_name_wrong_type(self):
+ """Test that None and non-string types are rejected."""
+ invalid_types = [None, 123, 45.67, [], {}]
+
+ for invalid_value in invalid_types:
+ with self.subTest(value=invalid_value):
+ with self.assertRaises(PySparkTypeError):
+ self.spark.readStream.format("rate").name(invalid_value).load()
+
+ def test_name_with_different_formats(self):
+ """Test that name() works with different streaming data sources."""
+ with self.sql_conf(
+ {
+ "spark.sql.streaming.queryEvolution.enableSourceEvolution":
"true",
+ "spark.sql.streaming.offsetLog.formatVersion": "2",
+ }
+ ):
+ with tempfile.TemporaryDirectory(prefix="test_name_formats_") as tmpdir:
+ # Create test data
+ self.spark.range(10).write.mode("overwrite").parquet(tmpdir + "/parquet_data")
+ self.spark.range(10).selectExpr("id", "CAST(id AS STRING) as value").write.mode(
+ "overwrite"
+ ).json(tmpdir + "/json_data")
+
+ # Test with parquet
+ parquet_df = (
+ self.spark.readStream.format("parquet")
+ .name("parquet_source")
+ .schema("id LONG")
+ .load(tmpdir + "/parquet_data")
+ )
+ self.assertTrue(parquet_df.isStreaming, "Parquet DataFrame should be streaming")
+
+ # Test with json - specify schema
+ json_df = (
+ self.spark.readStream.format("json")
+ .name("json_source")
+ .schema("id LONG, value STRING")
+ .load(tmpdir + "/json_data")
+ )
+ self.assertTrue(json_df.isStreaming, "JSON DataFrame should be streaming")
+
def test_streaming_drop_duplicate_within_watermark(self):
"""
This verifies dropDuplicatesWithinWatermark works with a streaming dataframe.
diff --git a/python/pyspark/sql/tests/streaming/test_streaming_reader_name.py b/python/pyspark/sql/tests/streaming/test_streaming_reader_name.py
deleted file mode 100644
index 8b3af847df7e..000000000000
--- a/python/pyspark/sql/tests/streaming/test_streaming_reader_name.py
+++ /dev/null
@@ -1,179 +0,0 @@
-#
-# Licensed to the Apache Software Foundation (ASF) under one or more
-# contributor license agreements. See the NOTICE file distributed with
-# this work for additional information regarding copyright ownership.
-# The ASF licenses this file to You under the Apache License, Version 2.0
-# (the "License"); you may not use this file except in compliance with
-# the License. You may obtain a copy of the License at
-#
-# http://www.apache.org/licenses/LICENSE-2.0
-#
-# Unless required by applicable law or agreed to in writing, software
-# distributed under the License is distributed on an "AS IS" BASIS,
-# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
-# See the License for the specific language governing permissions and
-# limitations under the License.
-#
-
-import tempfile
-import time
-
-from pyspark.errors import PySparkTypeError, PySparkValueError
-from pyspark.testing.sqlutils import ReusedSQLTestCase
-
-
-class DataStreamReaderNameTests(ReusedSQLTestCase):
- """Test suite for DataStreamReader.name() functionality in PySpark."""
-
- @classmethod
- def setUpClass(cls):
- super(DataStreamReaderNameTests, cls).setUpClass()
- # Enable streaming source evolution feature
- cls.spark.conf.set("spark.sql.streaming.queryEvolution.enableSourceEvolution", "true")
- cls.spark.conf.set("spark.sql.streaming.offsetLog.formatVersion", "2")
-
- def test_name_with_valid_names(self):
- """Test that various valid source name patterns work correctly."""
- valid_names = [
- "mySource",
- "my_source",
- "MySource123",
- "_private",
- "source_123_test",
- "123source",
- ]
-
- for name in valid_names:
- with tempfile.TemporaryDirectory(prefix=f"test_{name}_") as tmpdir:
- self.spark.range(10).write.mode("overwrite").parquet(tmpdir)
- df = (
- self.spark.readStream.format("parquet")
- .schema("id LONG")
- .name(name)
- .load(tmpdir)
- )
- self.assertTrue(df.isStreaming, f"DataFrame should be streaming for name: {name}")
-
- def test_name_method_chaining(self):
- """Test that name() returns the reader for method chaining."""
- with tempfile.TemporaryDirectory(prefix="test_chaining_") as tmpdir:
- self.spark.range(10).write.mode("overwrite").parquet(tmpdir)
- df = (
- self.spark.readStream.format("parquet")
- .schema("id LONG")
- .name("my_source")
- .option("maxFilesPerTrigger", "1")
- .load(tmpdir)
- )
-
- self.assertTrue(df.isStreaming, "DataFrame should be streaming")
-
- def test_name_before_format(self):
- """Test that order doesn't matter - name can be set before format."""
- with tempfile.TemporaryDirectory(prefix="test_before_format_") as tmpdir:
- self.spark.range(10).write.mode("overwrite").parquet(tmpdir)
- df = (
- self.spark.readStream.name("my_source")
- .format("parquet")
- .schema("id LONG")
- .load(tmpdir)
- )
-
- self.assertTrue(df.isStreaming, "DataFrame should be streaming")
-
- def test_invalid_names(self):
- """Test that various invalid source names are rejected."""
- invalid_names = [
- "", # empty string
- " ", # whitespace only
- "my-source", # hyphen
- "my source", # space
- "my.source", # dot
- "my@source", # special char
- "my$source", # dollar sign
- "my#source", # hash
- "my!source", # exclamation
- ]
-
- for invalid_name in invalid_names:
- with self.subTest(name=invalid_name):
- with tempfile.TemporaryDirectory(prefix="test_invalid_") as tmpdir:
- self.spark.range(10).write.mode("overwrite").parquet(tmpdir)
- with self.assertRaises(PySparkValueError) as context:
- self.spark.readStream.format("parquet").schema("id LONG").name(
- invalid_name
- ).load(tmpdir)
-
- # The error message should contain information about invalid name
- self.assertIn("source", str(context.exception).lower())
-
- def test_invalid_name_wrong_type(self):
- """Test that None and non-string types are rejected."""
- invalid_types = [None, 123, 45.67, [], {}]
-
- for invalid_value in invalid_types:
- with self.subTest(value=invalid_value):
- with self.assertRaises(PySparkTypeError):
- self.spark.readStream.format("rate").name(invalid_value).load()
-
- def test_name_with_different_formats(self):
- """Test that name() works with different streaming data sources."""
- with tempfile.TemporaryDirectory(prefix="test_name_formats_") as tmpdir:
- # Create test data
- self.spark.range(10).write.mode("overwrite").parquet(tmpdir + "/parquet_data")
- self.spark.range(10).selectExpr("id", "CAST(id AS STRING) as value").write.mode(
- "overwrite"
- ).json(tmpdir + "/json_data")
-
- # Test with parquet
- parquet_df = (
- self.spark.readStream.format("parquet")
- .name("parquet_source")
- .schema("id LONG")
- .load(tmpdir + "/parquet_data")
- )
- self.assertTrue(parquet_df.isStreaming, "Parquet DataFrame should be streaming")
-
- # Test with json - specify schema
- json_df = (
- self.spark.readStream.format("json")
- .name("json_source")
- .schema("id LONG, value STRING")
- .load(tmpdir + "/json_data")
- )
- self.assertTrue(json_df.isStreaming, "JSON DataFrame should be streaming")
-
- def test_name_persists_through_query(self):
- """Test that the name persists when starting a streaming query."""
- with tempfile.TemporaryDirectory(prefix="test_name_query_") as tmpdir:
- data_dir = tmpdir + "/data"
- checkpoint_dir = tmpdir + "/checkpoint"
-
- # Create test data
- self.spark.range(10).write.mode("overwrite").parquet(data_dir)
-
- df = (
- self.spark.readStream.format("parquet")
- .schema("id LONG")
- .name("parquet_source_test")
- .load(data_dir)
- )
-
- query = (
- df.writeStream.format("noop").option("checkpointLocation", checkpoint_dir).start()
- )
-
- try:
- # Let it run briefly
- time.sleep(1)
-
- # Verify query is running
- self.assertTrue(query.isActive, "Query should be active")
- finally:
- query.stop()
-
-
-if __name__ == "__main__":
- from pyspark.testing import main
-
- main()
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]