This is an automated email from the ASF dual-hosted git repository.

gurwls223 pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/spark.git


The following commit(s) were added to refs/heads/master by this push:
     new 789acc92d143 [SPARK-55090][PYTHON][CONNECT] Implement `DataFrame.toJSON` in Python Client
789acc92d143 is described below

commit 789acc92d14367e3450196b01f7341e9d0745f20
Author: Ruifeng Zheng <[email protected]>
AuthorDate: Tue Jan 20 08:37:53 2026 +0900

    [SPARK-55090][PYTHON][CONNECT] Implement `DataFrame.toJSON` in Python Client
    
    ### What changes were proposed in this pull request?
    Implement `DataFrame.toJSON` in Python Client
    
    ### Why are the changes needed?
    To give the Spark Connect Python client a way to create JSON strings from a DataFrame, similar to `DataFrame.toJSON` in Classic mode.
    
    ### Does this PR introduce _any_ user-facing change?
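    Yes, new API: `DataFrame.toJSON` is now available in the Spark Connect Python client, where it returns a DataFrame with a single `value` column. In Classic mode, `DataFrame.toJSON` now emits a warning that its return type will change from RDD to DataFrame in future releases.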
    
    ```
    In [2]: df = spark.range(10).withColumn("s", col("id").cast("string"))
    
    In [3]: df.toJSON().show()
    +----------------+
    |           value|
    +----------------+
    |{"id":0,"s":"0"}|
    |{"id":1,"s":"1"}|
    |{"id":2,"s":"2"}|
    |{"id":3,"s":"3"}|
    |{"id":4,"s":"4"}|
    |{"id":5,"s":"5"}|
    |{"id":6,"s":"6"}|
    |{"id":7,"s":"7"}|
    |{"id":8,"s":"8"}|
    |{"id":9,"s":"9"}|
    +----------------+
    ```
    
    ### How was this patch tested?
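    Added `test_toJSON` to `test_connect_basic.py`, which checks that the Classic and Connect results match, and removed the `toJSON` not-implemented assertion from `test_connect_error.py`.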
    
    ### Was this patch authored or co-authored using generative AI tooling?
    no
    
    Closes #53849 from zhengruifeng/connect_to_json.
    
    Authored-by: Ruifeng Zheng <[email protected]>
    Signed-off-by: Hyukjin Kwon <[email protected]>
---
 python/pyspark/sql/classic/dataframe.py            |  5 +++
 python/pyspark/sql/connect/dataframe.py            | 11 +++---
 python/pyspark/sql/dataframe.py                    | 42 ++++++++++++----------
 .../sql/tests/connect/test_connect_basic.py        |  8 +++++
 .../sql/tests/connect/test_connect_error.py        |  2 --
 .../sql/tests/test_connect_compatibility.py        |  2 +-
 6 files changed, 41 insertions(+), 29 deletions(-)

diff --git a/python/pyspark/sql/classic/dataframe.py 
b/python/pyspark/sql/classic/dataframe.py
index 634006bdbf8c..b52dbda5d5f2 100644
--- a/python/pyspark/sql/classic/dataframe.py
+++ b/python/pyspark/sql/classic/dataframe.py
@@ -171,6 +171,11 @@ class DataFrame(ParentDataFrame, PandasMapOpsMixin, 
PandasConversionMixin):
     def toJSON(self, use_unicode: bool = True) -> "RDD[str]":
         from pyspark.core.rdd import RDD
 
+        warnings.warn(
+            "The return type of DataFrame.toJSON will be changed from RDD to 
DataFrame "
+            "in future releases."
+        )
+
         rdd = self._jdf.toJSON()
         return RDD(rdd.toJavaRDD(), self._sc, UTF8Deserializer(use_unicode))
 
diff --git a/python/pyspark/sql/connect/dataframe.py 
b/python/pyspark/sql/connect/dataframe.py
index 6600cd0d739f..fa3b1a842090 100644
--- a/python/pyspark/sql/connect/dataframe.py
+++ b/python/pyspark/sql/connect/dataframe.py
@@ -1731,7 +1731,7 @@ class DataFrame(ParentDataFrame):
         return self.columns
 
     def __getattr__(self, name: str) -> "Column":
-        if name in ["_jseq", "_jdf", "_jmap", "_jcols", "rdd", "toJSON"]:
+        if name in ["_jseq", "_jdf", "_jmap", "_jcols", "rdd"]:
             raise PySparkAttributeError(
                 errorClass="JVM_ATTRIBUTE_NOT_SUPPORTED", 
messageParameters={"attr_name": name}
             )
@@ -2262,13 +2262,10 @@ class DataFrame(ParentDataFrame):
         assert isinstance(checkpointed._plan, plan.CachedRemoteRelation)
         return checkpointed
 
-    if not is_remote_only():
+    def toJSON(self) -> ParentDataFrame:
+        return self.select(F.to_json(F.struct(F.col("*"))).alias("value"))
 
-        def toJSON(self, use_unicode: bool = True) -> "RDD[str]":
-            raise PySparkNotImplementedError(
-                errorClass="NOT_IMPLEMENTED",
-                messageParameters={"feature": "toJSON()"},
-            )
+    if not is_remote_only():
 
         @property
         def rdd(self) -> "RDD[Row]":
diff --git a/python/pyspark/sql/dataframe.py b/python/pyspark/sql/dataframe.py
index 2ddfdda762d7..5dd1f0ba4894 100644
--- a/python/pyspark/sql/dataframe.py
+++ b/python/pyspark/sql/dataframe.py
@@ -245,31 +245,35 @@ class DataFrame:
         """
         ...
 
-    if not is_remote_only():
+    @dispatch_df_method
+    def toJSON(self, use_unicode: bool = True) -> Union["RDD[str]", 
"DataFrame"]:
+        """Converts a :class:`DataFrame` into a :class:`RDD` of string or 
:class:`DataFrame`.
 
-        def toJSON(self, use_unicode: bool = True) -> "RDD[str]":
-            """Converts a :class:`DataFrame` into a :class:`RDD` of string.
+        Each row is turned into a JSON document as one element in the returned 
RDD
+        or DataFrame.
 
-            Each row is turned into a JSON document as one element in the 
returned RDD.
+        .. versionadded:: 1.3.0
 
-            .. versionadded:: 1.3.0
+        .. versionchanged:: 4.2.0
+            Supports Spark Connect.
 
-            Parameters
-            ----------
-            use_unicode : bool, optional, default True
-                Whether to convert to unicode or not.
+        Parameters
+        ----------
+        use_unicode : bool, optional, default True
+            Whether to convert to unicode or not. Note that this argument is 
disallowed
+            in Spark Connect mode.
 
-            Returns
-            -------
-            :class:`RDD`
+        Returns
+        -------
+        :class:`RDD` (in Classic mode) or :class:`DataFrame` (in connect mode)
 
-            Examples
-            --------
-            >>> df = spark.createDataFrame([(2, "Alice"), (5, "Bob")], 
schema=["age", "name"])
-            >>> df.toJSON().first()
-            '{"age":2,"name":"Alice"}'
-            """
-            ...
+        Examples
+        --------
+        >>> df = spark.createDataFrame([(2, "Alice"), (5, "Bob")], 
schema=["age", "name"])
+        >>> df.toJSON().first()
+        '{"age":2,"name":"Alice"}'
+        """
+        ...
 
     @dispatch_df_method
     def registerTempTable(self, name: str) -> None:
diff --git a/python/pyspark/sql/tests/connect/test_connect_basic.py 
b/python/pyspark/sql/tests/connect/test_connect_basic.py
index 282fae789022..de470b8ef26f 100755
--- a/python/pyspark/sql/tests/connect/test_connect_basic.py
+++ b/python/pyspark/sql/tests/connect/test_connect_basic.py
@@ -127,6 +127,14 @@ class SparkConnectSQLTestCase(ReusedMixedTestCase, 
PandasOnSparkTestUtils):
 
 
 class SparkConnectBasicTests(SparkConnectSQLTestCase):
+    def test_toJSON(self):
+        sdf = self.spark.range(10).withColumn("s", SF.col("id").cast("string"))
+        cdf = self.connect.range(10).withColumn("s", 
CF.col("id").cast("string"))
+
+        str1 = sdf.toJSON().collect()
+        str2 = [row.value for row in cdf.toJSON().collect()]
+        self.assertEqual(str1, str2)
+
     def test_serialization(self):
         from pyspark.cloudpickle import dumps, loads
 
diff --git a/python/pyspark/sql/tests/connect/test_connect_error.py 
b/python/pyspark/sql/tests/connect/test_connect_error.py
index 60f07cc4dffd..f5da7d945922 100644
--- a/python/pyspark/sql/tests/connect/test_connect_error.py
+++ b/python/pyspark/sql/tests/connect/test_connect_error.py
@@ -170,8 +170,6 @@ class SparkConnectErrorTests(ReusedConnectTestCase):
     def test_unsupported_functions(self):
         # SPARK-41225: Disable unsupported functions.
         df = self.spark.range(10)
-        with self.assertRaises(NotImplementedError):
-            df.toJSON()
         with self.assertRaises(NotImplementedError):
             df.rdd
 
diff --git a/python/pyspark/sql/tests/test_connect_compatibility.py 
b/python/pyspark/sql/tests/test_connect_compatibility.py
index ec3fb4930077..56b212387fe4 100644
--- a/python/pyspark/sql/tests/test_connect_compatibility.py
+++ b/python/pyspark/sql/tests/test_connect_compatibility.py
@@ -100,7 +100,7 @@ class ConnectCompatibilityTestsMixin:
             connect_signature = inspect.signature(connect_methods[method])
 
             # Cannot support RDD arguments from Spark Connect
-            has_rdd_arguments = ("createDataFrame", "xml", "json")
+            has_rdd_arguments = ("createDataFrame", "xml", "json", "toJSON")
             if method not in has_rdd_arguments:
                 self.assertEqual(
                     classic_signature,


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to