This is an automated email from the ASF dual-hosted git repository.
gurwls223 pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/spark.git
The following commit(s) were added to refs/heads/master by this push:
new 789acc92d143 [SPARK-55090][PYTHON][CONNECT] Implement `DataFrame.toJSON` in Python Client
789acc92d143 is described below
commit 789acc92d14367e3450196b01f7341e9d0745f20
Author: Ruifeng Zheng <[email protected]>
AuthorDate: Tue Jan 20 08:37:53 2026 +0900
[SPARK-55090][PYTHON][CONNECT] Implement `DataFrame.toJSON` in Python Client
### What changes were proposed in this pull request?
Implement `DataFrame.toJSON` in Python Client
### Why are the changes needed?
To provide a comparable API in the Spark Connect Python client for creating JSON strings from a DataFrame.
### Does this PR introduce _any_ user-facing change?
Yes, a new API:
```
In [2]: df = spark.range(10).withColumn("s", col("id").cast("string"))
In [3]: df.toJSON().show()
+----------------+
| value|
+----------------+
|{"id":0,"s":"0"}|
|{"id":1,"s":"1"}|
|{"id":2,"s":"2"}|
|{"id":3,"s":"3"}|
|{"id":4,"s":"4"}|
|{"id":5,"s":"5"}|
|{"id":6,"s":"6"}|
|{"id":7,"s":"7"}|
|{"id":8,"s":"8"}|
|{"id":9,"s":"9"}|
+----------------+
```
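Note that in Connect mode the result is a single-column `DataFrame` (column `value`) rather than an `RDD[str]`. Callers that need plain Python strings can collect that column, as the added test does; a minimal sketch:
```
json_strings = [row.value for row in df.toJSON().collect()]
# ['{"id":0,"s":"0"}', '{"id":1,"s":"1"}', ...]
```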
### How was this patch tested?
Added tests.
### Was this patch authored or co-authored using generative AI tooling?
No.
Closes #53849 from zhengruifeng/connect_to_json.
Authored-by: Ruifeng Zheng <[email protected]>
Signed-off-by: Hyukjin Kwon <[email protected]>
---
python/pyspark/sql/classic/dataframe.py | 5 +++
python/pyspark/sql/connect/dataframe.py | 11 +++---
python/pyspark/sql/dataframe.py | 42 ++++++++++++----------
.../sql/tests/connect/test_connect_basic.py | 8 +++++
.../sql/tests/connect/test_connect_error.py | 2 --
.../sql/tests/test_connect_compatibility.py | 2 +-
6 files changed, 41 insertions(+), 29 deletions(-)
diff --git a/python/pyspark/sql/classic/dataframe.py b/python/pyspark/sql/classic/dataframe.py
index 634006bdbf8c..b52dbda5d5f2 100644
--- a/python/pyspark/sql/classic/dataframe.py
+++ b/python/pyspark/sql/classic/dataframe.py
@@ -171,6 +171,11 @@ class DataFrame(ParentDataFrame, PandasMapOpsMixin, PandasConversionMixin):
def toJSON(self, use_unicode: bool = True) -> "RDD[str]":
from pyspark.core.rdd import RDD
+ warnings.warn(
+ "The return type of DataFrame.toJSON will be changed from RDD to
DataFrame "
+ "in future releases."
+ )
+
rdd = self._jdf.toJSON()
return RDD(rdd.toJavaRDD(), self._sc, UTF8Deserializer(use_unicode))
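In Classic mode, `toJSON` still returns an `RDD[str]`, but every call now emits the warning above about the planned return-type change. If the warning is unwanted, it can be suppressed with the standard-library `warnings` machinery; a minimal sketch, not part of this commit:
```
import warnings

with warnings.catch_warnings():
    warnings.simplefilter("ignore")
    rdd = df.toJSON()  # RDD[str]; the return-type warning is silenced
```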
diff --git a/python/pyspark/sql/connect/dataframe.py b/python/pyspark/sql/connect/dataframe.py
index 6600cd0d739f..fa3b1a842090 100644
--- a/python/pyspark/sql/connect/dataframe.py
+++ b/python/pyspark/sql/connect/dataframe.py
@@ -1731,7 +1731,7 @@ class DataFrame(ParentDataFrame):
return self.columns
def __getattr__(self, name: str) -> "Column":
- if name in ["_jseq", "_jdf", "_jmap", "_jcols", "rdd", "toJSON"]:
+ if name in ["_jseq", "_jdf", "_jmap", "_jcols", "rdd"]:
raise PySparkAttributeError(
errorClass="JVM_ATTRIBUTE_NOT_SUPPORTED",
messageParameters={"attr_name": name}
)
@@ -2262,13 +2262,10 @@ class DataFrame(ParentDataFrame):
assert isinstance(checkpointed._plan, plan.CachedRemoteRelation)
return checkpointed
- if not is_remote_only():
+ def toJSON(self) -> ParentDataFrame:
+ return self.select(F.to_json(F.struct(F.col("*"))).alias("value"))
- def toJSON(self, use_unicode: bool = True) -> "RDD[str]":
- raise PySparkNotImplementedError(
- errorClass="NOT_IMPLEMENTED",
- messageParameters={"feature": "toJSON()"},
- )
+ if not is_remote_only():
@property
def rdd(self) -> "RDD[Row]":
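The Connect implementation above is a thin wrapper over built-in SQL functions, so the same result can be produced directly; a sketch assuming `pyspark.sql.functions` is imported as `F`:
```
from pyspark.sql import functions as F

# Equivalent to df.toJSON() under Spark Connect: one JSON document per row,
# returned as a single string column named "value".
json_df = df.select(F.to_json(F.struct(F.col("*"))).alias("value"))
```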
diff --git a/python/pyspark/sql/dataframe.py b/python/pyspark/sql/dataframe.py
index 2ddfdda762d7..5dd1f0ba4894 100644
--- a/python/pyspark/sql/dataframe.py
+++ b/python/pyspark/sql/dataframe.py
@@ -245,31 +245,35 @@ class DataFrame:
"""
...
- if not is_remote_only():
+ @dispatch_df_method
+ def toJSON(self, use_unicode: bool = True) -> Union["RDD[str]", "DataFrame"]:
+ """Converts a :class:`DataFrame` into a :class:`RDD` of string or :class:`DataFrame`.
- def toJSON(self, use_unicode: bool = True) -> "RDD[str]":
- """Converts a :class:`DataFrame` into a :class:`RDD` of string.
+ Each row is turned into a JSON document as one element in the returned RDD
+ or DataFrame.
- Each row is turned into a JSON document as one element in the returned RDD.
+ .. versionadded:: 1.3.0
- .. versionadded:: 1.3.0
+ .. versionchanged:: 4.2.0
+ Supports Spark Connect.
- Parameters
- ----------
- use_unicode : bool, optional, default True
- Whether to convert to unicode or not.
+ Parameters
+ ----------
+ use_unicode : bool, optional, default True
+ Whether to convert to unicode or not. Note that this argument is disallowed
+ in Spark Connect mode.
- Returns
- -------
- :class:`RDD`
+ Returns
+ -------
+ :class:`RDD` (in Classic mode) or :class:`DataFrame` (in Connect mode)
- Examples
- --------
- >>> df = spark.createDataFrame([(2, "Alice"), (5, "Bob")], schema=["age", "name"])
- >>> df.toJSON().first()
- '{"age":2,"name":"Alice"}'
- """
- ...
+ Examples
+ --------
+ >>> df = spark.createDataFrame([(2, "Alice"), (5, "Bob")], schema=["age", "name"])
+ >>> df.toJSON().first()
+ '{"age":2,"name":"Alice"}'
+ """
+ ...
@dispatch_df_method
def registerTempTable(self, name: str) -> None:
diff --git a/python/pyspark/sql/tests/connect/test_connect_basic.py b/python/pyspark/sql/tests/connect/test_connect_basic.py
index 282fae789022..de470b8ef26f 100755
--- a/python/pyspark/sql/tests/connect/test_connect_basic.py
+++ b/python/pyspark/sql/tests/connect/test_connect_basic.py
@@ -127,6 +127,14 @@ class SparkConnectSQLTestCase(ReusedMixedTestCase, PandasOnSparkTestUtils):
class SparkConnectBasicTests(SparkConnectSQLTestCase):
+ def test_toJSON(self):
+ sdf = self.spark.range(10).withColumn("s", SF.col("id").cast("string"))
+ cdf = self.connect.range(10).withColumn("s", CF.col("id").cast("string"))
+
+ str1 = sdf.toJSON().collect()
+ str2 = [row.value for row in cdf.toJSON().collect()]
+ self.assertEqual(str1, str2)
+
def test_serialization(self):
from pyspark.cloudpickle import dumps, loads
diff --git a/python/pyspark/sql/tests/connect/test_connect_error.py b/python/pyspark/sql/tests/connect/test_connect_error.py
index 60f07cc4dffd..f5da7d945922 100644
--- a/python/pyspark/sql/tests/connect/test_connect_error.py
+++ b/python/pyspark/sql/tests/connect/test_connect_error.py
@@ -170,8 +170,6 @@ class SparkConnectErrorTests(ReusedConnectTestCase):
def test_unsupported_functions(self):
# SPARK-41225: Disable unsupported functions.
df = self.spark.range(10)
- with self.assertRaises(NotImplementedError):
- df.toJSON()
with self.assertRaises(NotImplementedError):
df.rdd
diff --git a/python/pyspark/sql/tests/test_connect_compatibility.py b/python/pyspark/sql/tests/test_connect_compatibility.py
index ec3fb4930077..56b212387fe4 100644
--- a/python/pyspark/sql/tests/test_connect_compatibility.py
+++ b/python/pyspark/sql/tests/test_connect_compatibility.py
@@ -100,7 +100,7 @@ class ConnectCompatibilityTestsMixin:
connect_signature = inspect.signature(connect_methods[method])
# Cannot support RDD arguments from Spark Connect
- has_rdd_arguments = ("createDataFrame", "xml", "json")
+ has_rdd_arguments = ("createDataFrame", "xml", "json", "toJSON")
if method not in has_rdd_arguments:
self.assertEqual(
classic_signature,