This is an automated email from the ASF dual-hosted git repository.
HyukjinKwon pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/spark.git
The following commit(s) were added to refs/heads/master by this push:
new f1cdb6d9a6dd [SPARK-56854][PYTHON] Filter None values in
DataFrame[Stream]Reader/Writer .option(s)
f1cdb6d9a6dd is described below
commit f1cdb6d9a6dd854305ff5eed4678d9ac8e39fc02
Author: Lavan Vivekanandasarma <[email protected]>
AuthorDate: Fri May 15 06:50:22 2026 +0900
[SPARK-56854][PYTHON] Filter None values in DataFrame[Stream]Reader/Writer
.option(s)
### What changes were proposed in this pull request?
Filter None values in Classic PySpark's DataFrameReader, DataFrameWriter,
DataFrameWriterV2, DataStreamReader, and DataStreamWriter `.option(key, value)`
and `.options(**kwargs)` methods. After this change, `option(key, None)` is a
no-op and `options(**{key: None, ...})` drops the None entries before
forwarding to the JVM. The loop-style methods mirror the shape of
`OptionUtils._set_opts` at `python/pyspark/sql/readwriter.py:41-53`: `for k, v
in options.items(): if v is not None: ...`.
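As a rough illustration of the filtering shape described above, the sketch below uses a hypothetical `SketchReader` that records options in a dict instead of calling the JVM. The class name, the `forwarded` attribute, and the simplified `to_str` helper are assumptions made for this example and are not part of the patch.

```python
from typing import Dict, Optional, Union

OptionValue = Optional[Union[bool, int, float, str]]


def to_str(value: OptionValue) -> Optional[str]:
    """Simplified stand-in for PySpark's to_str helper (assumption)."""
    if isinstance(value, bool):
        # Booleans are lowercased so they read like JVM-style "true"/"false".
        return str(value).lower()
    if value is None:
        return None
    return str(value)


class SketchReader:
    """Hypothetical reader: stores forwarded options in a dict, not the JVM."""

    def __init__(self) -> None:
        self.forwarded: Dict[str, Optional[str]] = {}

    def option(self, key: str, value: OptionValue) -> "SketchReader":
        # None means "unset": skip forwarding and keep the chain intact.
        if value is None:
            return self
        self.forwarded[key] = to_str(value)
        return self

    def options(self, **options: OptionValue) -> "SketchReader":
        # Loop-style variant: drop None entries before forwarding.
        for k, v in options.items():
            if v is not None:
                self.forwarded[k] = to_str(v)
        return self


reader = SketchReader().option("nullValue", None).options(header=True, sep=None)
```

In this sketch only `header` ends up forwarded; both `None` entries are dropped while the chained calls still return the reader.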
### Why are the changes needed?
Classic and Spark Connect Python currently disagree on what `option(key,
None)` means. Classic forwards Python None to the JVM as Java null, which
several data sources interpret differently from "unset". For example, with
`spark.read.options(nullValue=None).schema("a STRING, b STRING").csv(path)` and
a row `"",val`, Classic produces `[Row(a='', b='val')]` while Connect produces
`[Row(a=None, b='val')]` because Connect drops the None, the default
`nullValue` of `""` stays in effect, an [...]
### Does this PR introduce _any_ user-facing change?
Yes. `option(k, None)` and `options(**{k: None})` were previously forwarded
to the JVM as null; they are now no-ops. A migration-guide entry under
"Upgrading from PySpark 4.1 to 4.2" documents the change. To set an option to
its default, omit it or pass None; to set it to an empty string, pass `""`
explicitly.
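The difference between the old and new forwarding can be modeled with two small functions. This is only a sketch: `forward_legacy` and `forward_filtered` are hypothetical names, and plain dicts stand in for whatever reaches the JVM.

```python
from typing import Dict, Optional


def forward_legacy(options: Dict[str, Optional[str]]) -> Dict[str, Optional[str]]:
    """Model of the previous Classic behavior: None is forwarded as-is."""
    return dict(options)


def forward_filtered(options: Dict[str, Optional[str]]) -> Dict[str, str]:
    """Model of the new behavior: None entries are treated as unset."""
    return {k: v for k, v in options.items() if v is not None}


requested = {"nullValue": None, "emptyValue": "", "sep": ","}
legacy = forward_legacy(requested)
current = forward_filtered(requested)
```

Here `legacy` still carries `nullValue` with a `None` value, while `current` omits the key entirely. The empty string passed for `emptyValue` survives in both, which is why `""` has to be passed explicitly when an empty string is intended.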
### How was this patch tested?
New parity test `test_option_none_is_filtered` in `ReadwriterTestsMixin`
pins the CSV `nullValue=None` case to `[Row(a=None, b="val")]` for both
`.option` and `.options`. Because `ReadwriterParityTests` inherits the mixin,
the regression test runs on Classic and on Spark Connect, giving cross-backend
coverage automatically.
Additional defensive smoke tests guard the writer / V2 writer / streaming
reader / streaming writer API contracts:
- `test_writer_option_none_chains_safely`
- `test_v2_writer_option_none_chains_safely`
- `test_stream_reader_option_none_chains_safely`
- `test_stream_writer_option_none_chains_safely`
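The mixin arrangement that gives cross-backend coverage can be sketched with plain `unittest`. The two backend classes below are hypothetical stand-ins (no Spark session is involved), and `make_backend` is an illustrative hook invented for this example, not something taken from the PySpark test suite.

```python
import unittest
from typing import Dict, Optional


class ClassicLikeBackend:
    """Hypothetical stand-in for one backend; records forwarded options."""

    def __init__(self) -> None:
        self.forwarded: Dict[str, str] = {}

    def option(self, key: str, value: Optional[str]) -> "ClassicLikeBackend":
        if value is not None:
            self.forwarded[key] = value
        return self


class ConnectLikeBackend:
    """Second hypothetical backend with the same contract."""

    def __init__(self) -> None:
        self.forwarded: Dict[str, str] = {}

    def option(self, key: str, value: Optional[str]) -> "ConnectLikeBackend":
        if value is None:
            return self
        self.forwarded[key] = value
        return self


class OptionNoneTestsMixin:
    """Shared tests; each concrete test case supplies make_backend()."""

    def test_option_none_is_filtered(self):
        backend = self.make_backend().option("nullValue", None).option("sep", ",")
        self.assertEqual(backend.forwarded, {"sep": ","})


class ClassicLikeTests(OptionNoneTestsMixin, unittest.TestCase):
    def make_backend(self):
        return ClassicLikeBackend()


class ConnectLikeTests(OptionNoneTestsMixin, unittest.TestCase):
    def make_backend(self):
        return ConnectLikeBackend()


def run_suite() -> unittest.TestResult:
    """Run the shared test once per backend and return the result."""
    loader = unittest.TestLoader()
    suite = unittest.TestSuite()
    for case in (ClassicLikeTests, ConnectLikeTests):
        suite.addTests(loader.loadTestsFromTestCase(case))
    result = unittest.TestResult()
    suite.run(result)
    return result
```

Because the test body lives only in the mixin, adding a backend means adding one small subclass, and the same assertion then runs against it.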
### Was this patch authored or co-authored using generative AI tooling?
Partially Generated-by: Claude Opus 4.7
Closes #55867 from lavanv11/pyspark_reader_inconsistencies.
Authored-by: Lavan Vivekanandasarma <[email protected]>
Signed-off-by: Hyukjin Kwon <[email protected]>
---
.../source/migration_guide/pyspark_upgrade.rst | 1 +
python/pyspark/sql/connect/readwriter.py | 8 +++++--
python/pyspark/sql/connect/streaming/readwriter.py | 4 ++++
python/pyspark/sql/readwriter.py | 18 ++++++++++-----
python/pyspark/sql/streaming/readwriter.py | 14 ++++++++----
.../pyspark/sql/tests/streaming/test_streaming.py | 20 +++++++++++++++++
python/pyspark/sql/tests/test_readwriter.py | 26 ++++++++++++++++++++++
7 files changed, 80 insertions(+), 11 deletions(-)
diff --git a/python/docs/source/migration_guide/pyspark_upgrade.rst b/python/docs/source/migration_guide/pyspark_upgrade.rst
index dba2a6266939..041640b2eb7f 100644
--- a/python/docs/source/migration_guide/pyspark_upgrade.rst
+++ b/python/docs/source/migration_guide/pyspark_upgrade.rst
@@ -23,6 +23,7 @@ Upgrading from PySpark 4.1 to 4.2
---------------------------------
* In Spark 4.2, the minimum supported version for PyArrow has been raised from
15.0.0 to 18.0.0 in PySpark.
* In Spark 4.2, ``DataFrame.__getattr__`` on Spark Connect Python Client no
longer eagerly validate the column name. To restore the legacy behavior, set
``PYSPARK_VALIDATE_COLUMN_NAME_LEGACY`` environment variable to ``1``.
+* In Spark 4.2, ``DataFrame[Stream]Reader/Writer.option`` and ``.options`` now filter out ``None`` values (treating them as "unset") instead of forwarding ``None`` to the JVM as Java ``null``, matching the Spark Connect Python client (SPARK-49263) and ``OptionUtils._set_opts``. To set an option to its default, omit it or pass ``None``; to set it to an empty string, pass ``""`` explicitly.
* In Spark 4.2, columnar data exchange between PySpark and the JVM uses Apache
Arrow by default. The configuration
``spark.sql.execution.arrow.pyspark.enabled`` now defaults to true. To restore
the legacy (non-Arrow) row-based data exchange, set
``spark.sql.execution.arrow.pyspark.enabled`` to ``false``.
* In Spark 4.2, regular Python UDFs are Arrow-optimized by default. The
configuration ``spark.sql.execution.pythonUDF.arrow.enabled`` now defaults to
true. To restore the legacy behavior for Python UDF execution, set
``spark.sql.execution.pythonUDF.arrow.enabled`` to ``false``.
* In Spark 4.2, regular Python UDTFs are Arrow-optimized by default. The
configuration ``spark.sql.execution.pythonUDTF.arrow.enabled`` now defaults to
true. To restore the legacy behavior for Python UDTF execution, set
``spark.sql.execution.pythonUDTF.arrow.enabled`` to ``false``.
diff --git a/python/pyspark/sql/connect/readwriter.py b/python/pyspark/sql/connect/readwriter.py
index b4a5a06993e5..5c2c0c80ccdf 100644
--- a/python/pyspark/sql/connect/readwriter.py
+++ b/python/pyspark/sql/connect/readwriter.py
@@ -602,6 +602,8 @@ class DataFrameWriter(OptionUtils):
format.__doc__ = PySparkDataFrameWriter.format.__doc__
def option(self, key: str, value: "OptionalPrimitiveType") -> "DataFrameWriter":
+ if value is None:
+ return self
self._write.options[key] = to_str(value)
return self
@@ -609,7 +611,7 @@ class DataFrameWriter(OptionUtils):
def options(self, **options: "OptionalPrimitiveType") -> "DataFrameWriter":
for k in options:
- self._write.options[k] = to_str(options[k])
+ self.option(k, options[k])
return self
options.__doc__ = PySparkDataFrameWriter.options.__doc__
@@ -978,6 +980,8 @@ class DataFrameWriterV2(OptionUtils):
using.__doc__ = PySparkDataFrameWriterV2.using.__doc__
def option(self, key: str, value: "OptionalPrimitiveType") -> "DataFrameWriterV2":
+ if value is None:
+ return self
self._write.options[key] = to_str(value)
return self
@@ -985,7 +989,7 @@ class DataFrameWriterV2(OptionUtils):
def options(self, **options: "OptionalPrimitiveType") -> "DataFrameWriterV2":
for k in options:
- self._write.options[k] = to_str(options[k])
+ self.option(k, options[k])
return self
options.__doc__ = PySparkDataFrameWriterV2.options.__doc__
diff --git a/python/pyspark/sql/connect/streaming/readwriter.py b/python/pyspark/sql/connect/streaming/readwriter.py
index f4e12144bf40..969fdc322a0b 100644
--- a/python/pyspark/sql/connect/streaming/readwriter.py
+++ b/python/pyspark/sql/connect/streaming/readwriter.py
@@ -90,6 +90,8 @@ class DataStreamReader(OptionUtils):
schema.__doc__ = PySparkDataStreamReader.schema.__doc__
def option(self, key: str, value: "OptionalPrimitiveType") -> "DataStreamReader":
+ if value is None:
+ return self
self._options[key] = str(value)
return self
@@ -488,6 +490,8 @@ class DataStreamWriter:
format.__doc__ = PySparkDataStreamWriter.format.__doc__
def option(self, key: str, value: "OptionalPrimitiveType") -> "DataStreamWriter":
+ if value is None:
+ return self
self._write_proto.options[key] = cast(str, to_str(value))
return self
diff --git a/python/pyspark/sql/readwriter.py b/python/pyspark/sql/readwriter.py
index 3e91a25771f4..e289faf89997 100644
--- a/python/pyspark/sql/readwriter.py
+++ b/python/pyspark/sql/readwriter.py
@@ -201,6 +201,8 @@ class DataFrameReader(OptionUtils):
|100|NULL|
+---+----+
"""
+ if value is None:
+ return self
self._jreader = self._jreader.option(key, to_str(value))
return self
@@ -248,8 +250,9 @@ class DataFrameReader(OptionUtils):
|100|NULL|
+---+----+
"""
- for k in options:
- self._jreader = self._jreader.option(k, to_str(options[k]))
+ for k, v in options.items():
+ if v is not None:
+ self._jreader = self._jreader.option(k, to_str(v))
return self
def load(
@@ -1433,6 +1436,8 @@ class DataFrameWriter(OptionUtils):
+---+------------+
"""
+ if value is None:
+ return self
self._jwrite = self._jwrite.option(key, to_str(value))
return self
@@ -1483,8 +1488,9 @@ class DataFrameWriter(OptionUtils):
|100|Hyukjin Kwon|
+---+------------+
"""
- for k in options:
- self._jwrite = self._jwrite.option(k, to_str(options[k]))
+ for k, v in options.items():
+ if v is not None:
+ self._jwrite = self._jwrite.option(k, to_str(v))
return self
@overload
@@ -2469,6 +2475,8 @@ class DataFrameWriterV2:
.. versionadded: 3.1.0
"""
+ if value is None:
+ return self
self._jwriter.option(key, to_str(value))
return self
@@ -2478,7 +2486,7 @@ class DataFrameWriterV2:
.. versionadded: 3.1.0
"""
- options = {k: to_str(v) for k, v in options.items()}
+ options = {k: to_str(v) for k, v in options.items() if v is not None}
self._jwriter.options(options)
return self
diff --git a/python/pyspark/sql/streaming/readwriter.py b/python/pyspark/sql/streaming/readwriter.py
index 585dc05424c9..5a99f8fce297 100644
--- a/python/pyspark/sql/streaming/readwriter.py
+++ b/python/pyspark/sql/streaming/readwriter.py
@@ -207,6 +207,8 @@ class DataStreamReader(OptionUtils):
>>> time.sleep(3)
>>> q.stop()
"""
+ if value is None:
+ return self
self._jreader = self._jreader.option(key, to_str(value))
return self
@@ -242,8 +244,9 @@ class DataStreamReader(OptionUtils):
>>> time.sleep(3)
>>> q.stop()
"""
- for k in options:
- self._jreader = self._jreader.option(k, to_str(options[k]))
+ for k, v in options.items():
+ if v is not None:
+ self._jreader = self._jreader.option(k, to_str(v))
return self
def name(self, source_name: str) -> "DataStreamReader":
@@ -1143,6 +1146,8 @@ class DataStreamWriter:
>>> time.sleep(3)
>>> q.stop()
"""
+ if value is None:
+ return self
self._jwrite = self._jwrite.option(key, to_str(value))
return self
@@ -1179,8 +1184,9 @@ class DataStreamWriter:
>>> time.sleep(3)
>>> q.stop()
"""
- for k in options:
- self._jwrite = self._jwrite.option(k, to_str(options[k]))
+ for k, v in options.items():
+ if v is not None:
+ self._jwrite = self._jwrite.option(k, to_str(v))
return self
@overload
diff --git a/python/pyspark/sql/tests/streaming/test_streaming.py b/python/pyspark/sql/tests/streaming/test_streaming.py
index ba39ca513610..c90943896809 100644
--- a/python/pyspark/sql/tests/streaming/test_streaming.py
+++ b/python/pyspark/sql/tests/streaming/test_streaming.py
@@ -667,6 +667,26 @@ class StreamingTestsMixin:
result = self.spark.sql("SELECT * FROM test_streaming_drop_duplicates_within_wm").collect()
self.assertTrue(len(result) >= 6 and len(result) <= 9)
+ def test_stream_reader_option_none_chains_safely(self):
+ df = (
+ self.spark.readStream.format("rate")
+ .option("rowsPerSecond", None)
+ .options(numPartitions=None)
+ .option("rowsPerSecond", "5")
+ .load()
+ )
+ self.assertIsNotNone(df.schema)
+
+ def test_stream_writer_option_none_chains_safely(self):
+ df = self.spark.readStream.format("rate").option("rowsPerSecond", "5").load()
+ writer = (
+ df.writeStream.format("memory")
+ .queryName("opt_none_test")
+ .option("checkpointLocation", None)
+ .options(checkpointLocation=None)
+ )
+ self.assertIsNotNone(writer)
+
class StreamingTests(StreamingTestsMixin, ReusedSQLTestCase):
def _assert_exception_tree_contains_msg(self, exception, msg):
diff --git a/python/pyspark/sql/tests/test_readwriter.py b/python/pyspark/sql/tests/test_readwriter.py
index 059535be166d..5dd5b1ebff63 100644
--- a/python/pyspark/sql/tests/test_readwriter.py
+++ b/python/pyspark/sql/tests/test_readwriter.py
@@ -313,6 +313,27 @@ class ReadwriterTestsMixin:
).changes("nonexistent_table")
self.assertIn("changes", str(ctx.exception))
+ def test_option_none_is_filtered(self):
+ with tempfile.TemporaryDirectory() as d:
+ path = os.path.join(d, "data.csv")
+ with open(path, "w") as f:
+ f.write('"",val\n')
+ schema = "a STRING, b STRING"
+ expected = [Row(a=None, b="val")]
+ self.assertEqual(
+ self.spark.read.schema(schema).option("nullValue", None).csv(path).collect(),
+ expected,
+ )
+ self.assertEqual(
+ self.spark.read.schema(schema).options(nullValue=None).csv(path).collect(),
+ expected,
+ )
+
+ def test_writer_option_none_chains_safely(self):
+ df = self.spark.createDataFrame([(1,)], "x INT")
+ self.assertIsNotNone(df.write.option("foo", None).option("bar", "baz"))
+ self.assertIsNotNone(df.write.options(foo=None, bar="baz"))
+
class ReadwriterV2TestsMixin:
def test_api(self):
@@ -419,6 +440,11 @@ class ReadwriterV2TestsMixin:
self.assertEqual(get_cluster_by_cols(), ["x"])
self.assertSetEqual(set(data), set(self.spark.table(table_name).collect()))
+ def test_v2_writer_option_none_chains_safely(self):
+ df = self.spark.createDataFrame([(1,)], "x INT")
+ self.assertIsNotNone(df.writeTo("notexist").option("foo", None).option("bar", "baz"))
+ self.assertIsNotNone(df.writeTo("notexist").options(foo=None, bar="baz"))
+
class ReadwriterTests(ReadwriterTestsMixin, ReusedSQLTestCase):
pass