This is an automated email from the ASF dual-hosted git repository.

gurwls223 pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/spark.git


The following commit(s) were added to refs/heads/master by this push:
     new 08d69ffae935 [SPARK-49344][PS] Support `json_normalize` for Pandas API 
on Spark
08d69ffae935 is described below

commit 08d69ffae935ae35df385cb2b09b0f5bb86d18bf
Author: Haejoon Lee <[email protected]>
AuthorDate: Fri Aug 23 10:40:08 2024 +0900

    [SPARK-49344][PS] Support `json_normalize` for Pandas API on Spark
    
    ### What changes were proposed in this pull request?
    
    This PR proposes to support `json_normalize` for Pandas API on Spark.
    
    ### Why are the changes needed?
    
    For Pandas feature parity: 
https://pandas.pydata.org/pandas-docs/stable/reference/api/pandas.json_normalize.html
    
    ### Does this PR introduce _any_ user-facing change?
    
    Introduce new API `ps.json_normalize`
    
    ```python
    >>> data = [
    ...     {"id": 1, "name": "Alice", "address": {"city": "NYC", "zipcode": 
"10001"}},
    ...     {"id": 2, "name": "Bob", "address": {"city": "SF", "zipcode": 
"94105"}},
    ... ]
    >>> ps.json_normalize(data)
       id   name address.city address.zipcode
    0   1  Alice          NYC           10001
    1   2    Bob           SF           94105
    ```
    
    ### How was this patch tested?
    
    Added UTs
    
    ### Was this patch authored or co-authored using generative AI tooling?
    
    No
    
    Closes #47840 from itholic/json_normalize.
    
    Lead-authored-by: Haejoon Lee <[email protected]>
    Co-authored-by: Hyukjin Kwon <[email protected]>
    Signed-off-by: Hyukjin Kwon <[email protected]>
---
 python/docs/source/reference/pyspark.pandas/io.rst |  1 +
 python/pyspark/pandas/namespace.py                 | 78 ++++++++++++++++++++++
 python/pyspark/pandas/tests/test_namespace.py      | 56 ++++++++++++++++
 3 files changed, 135 insertions(+)

diff --git a/python/docs/source/reference/pyspark.pandas/io.rst 
b/python/docs/source/reference/pyspark.pandas/io.rst
index fd41a03699ca..7f7a39e98193 100644
--- a/python/docs/source/reference/pyspark.pandas/io.rst
+++ b/python/docs/source/reference/pyspark.pandas/io.rst
@@ -105,6 +105,7 @@ JSON
 .. autosummary::
    :toctree: api/
 
+   json_normalize
    read_json
    DataFrame.to_json
 
diff --git a/python/pyspark/pandas/namespace.py 
b/python/pyspark/pandas/namespace.py
index 729f4f598407..c77cdf51a2f6 100644
--- a/python/pyspark/pandas/namespace.py
+++ b/python/pyspark/pandas/namespace.py
@@ -88,6 +88,7 @@ from pyspark.pandas.internal import (
     DEFAULT_SERIES_NAME,
     HIDDEN_COLUMNS,
     SPARK_INDEX_NAME_FORMAT,
+    NATURAL_ORDER_COLUMN_NAME,
 )
 from pyspark.pandas.series import Series, first_series
 from pyspark.pandas.spark.utils import as_nullable_spark_type, 
force_decimal_precision_scale
@@ -125,6 +126,7 @@ __all__ = [
     "to_numeric",
     "broadcast",
     "read_orc",
+    "json_normalize",
 ]
 
 
@@ -3687,6 +3689,82 @@ def read_orc(
     return psdf
 
 
+def json_normalize(
+    data: Union[Dict, List[Dict]],
+    sep: str = ".",
+) -> DataFrame:
+    """
+    Normalize semi-structured JSON data into a flat table.
+
+    .. versionadded:: 4.0.0
+
+    Parameters
+    ----------
+    data : dict or list of dicts
+        Unserialized JSON objects.
+    sep : str, default '.'
+        Nested records will generate names separated by sep.
+
+    Returns
+    -------
+    DataFrame
+
+    See Also
+    --------
+    DataFrame.to_json : Convert the pandas-on-Spark DataFrame to a JSON string.
+
+    Examples
+    --------
+    >>> data = [
+    ...     {"id": 1, "name": "Alice", "address": {"city": "NYC", "zipcode": 
"10001"}},
+    ...     {"id": 2, "name": "Bob", "address": {"city": "SF", "zipcode": 
"94105"}},
+    ... ]
+    >>> ps.json_normalize(data)
+       id   name address.city address.zipcode
+    0   1  Alice          NYC           10001
+    1   2    Bob           SF           94105
+    """
+    # Convert the input JSON data to a Pandas-on-Spark DataFrame.
+    psdf: DataFrame = ps.DataFrame(data)
+    internal = psdf._internal
+    sdf = internal.spark_frame
+
+    index_spark_column_names = internal.index_spark_column_names
+
+    def flatten_schema(schema: StructType, prefix: str = "") -> 
Tuple[List[str], List[str]]:
+        """
+        Recursively flattens a nested schema and returns a list of columns and 
aliases.
+        """
+        fields = []
+        aliases = []
+        for field in schema.fields:
+            field_name = field.name
+            if field_name not in index_spark_column_names + 
[NATURAL_ORDER_COLUMN_NAME]:
+                name = f"{prefix}.{field_name}" if prefix else field_name
+                alias = f"{prefix}{sep}{field_name}" if prefix else field_name
+                if isinstance(field.dataType, StructType):
+                    subfields, subaliases = flatten_schema(field.dataType, 
prefix=name)
+                    fields += subfields
+                    aliases += subaliases
+                else:
+                    fields.append(name)
+                    aliases.append(alias)
+        return fields, aliases
+
+    fields, aliases = flatten_schema(sdf.schema)
+
+    # Create columns using fields and aliases
+    selected_columns = [F.col(field).alias(alias) for field, alias in 
zip(fields, aliases)]
+
+    # Update internal frame with new columns
+    internal = internal.with_new_columns(
+        selected_columns, column_labels=[(column_label,) for column_label in 
aliases]
+    )
+
+    # Convert back to Pandas-on-Spark DataFrame
+    return ps.DataFrame(internal)
+
+
 def _get_index_map(
     sdf: PySparkDataFrame, index_col: Optional[Union[str, List[str]]] = None
 ) -> Tuple[Optional[List[PySparkColumn]], Optional[List[Label]]]:
diff --git a/python/pyspark/pandas/tests/test_namespace.py 
b/python/pyspark/pandas/tests/test_namespace.py
index 7024ef2a977c..6e559937008d 100644
--- a/python/pyspark/pandas/tests/test_namespace.py
+++ b/python/pyspark/pandas/tests/test_namespace.py
@@ -28,6 +28,7 @@ from pyspark.pandas.utils import spark_column_equals
 from pyspark.pandas.missing.general_functions import 
MissingPandasLikeGeneralFunctions
 from pyspark.testing.pandasutils import PandasOnSparkTestCase
 from pyspark.testing.sqlutils import SQLTestUtils
+from pyspark.pandas.testing import assert_frame_equal
 
 
 class NamespaceTestsMixin:
@@ -606,6 +607,61 @@ class NamespaceTestsMixin:
             lambda: ps.to_numeric(psser, errors="ignore"),
         )
 
+    def test_json_normalize(self):
+        # Basic test case with a simple JSON structure
+        data = [
+            {"id": 1, "name": "Alice", "address": {"city": "NYC", "zipcode": 
"10001"}},
+            {"id": 2, "name": "Bob", "address": {"city": "SF", "zipcode": 
"94105"}},
+        ]
+        assert_frame_equal(pd.json_normalize(data), ps.json_normalize(data))
+
+        # Test case with nested JSON structure
+        data = [
+            {"id": 1, "name": "Alice", "address": {"city": {"name": "NYC"}, 
"zipcode": "10001"}},
+            {"id": 2, "name": "Bob", "address": {"city": {"name": "SF"}, 
"zipcode": "94105"}},
+        ]
+        assert_frame_equal(pd.json_normalize(data), ps.json_normalize(data))
+
+        # Test case with lists included in the JSON structure
+        data = [
+            {
+                "id": 1,
+                "name": "Alice",
+                "hobbies": ["reading", "swimming"],
+                "address": {"city": "NYC", "zipcode": "10001"},
+            },
+            {
+                "id": 2,
+                "name": "Bob",
+                "hobbies": ["biking"],
+                "address": {"city": "SF", "zipcode": "94105"},
+            },
+        ]
+        assert_frame_equal(pd.json_normalize(data), ps.json_normalize(data))
+
+        # Test case with various data types
+        data = [
+            {
+                "id": 1,
+                "name": "Alice",
+                "age": 25,
+                "is_student": True,
+                "address": {"city": "NYC", "zipcode": "10001"},
+            },
+            {
+                "id": 2,
+                "name": "Bob",
+                "age": 30,
+                "is_student": False,
+                "address": {"city": "SF", "zipcode": "94105"},
+            },
+        ]
+        assert_frame_equal(pd.json_normalize(data), ps.json_normalize(data))
+
+        # Test case handling empty input data
+        data = []
+        self.assert_eq(pd.json_normalize(data), ps.json_normalize(data))
+
     def test_missing(self):
         missing_functions = inspect.getmembers(
             MissingPandasLikeGeneralFunctions, inspect.isfunction


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to