This is an automated email from the ASF dual-hosted git repository.
gurwls223 pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/spark.git
The following commit(s) were added to refs/heads/master by this push:
new 08d69ffae935 [SPARK-49344][PS] Support `json_normalize` for Pandas API
on Spark
08d69ffae935 is described below
commit 08d69ffae935ae35df385cb2b09b0f5bb86d18bf
Author: Haejoon Lee <[email protected]>
AuthorDate: Fri Aug 23 10:40:08 2024 +0900
[SPARK-49344][PS] Support `json_normalize` for Pandas API on Spark
### What changes were proposed in this pull request?
This PR proposes to support `json_normalize` for Pandas API on Spark.
### Why are the changes needed?
For Pandas feature parity:
https://pandas.pydata.org/pandas-docs/stable/reference/api/pandas.json_normalize.html
### Does this PR introduce _any_ user-facing change?
Introduce new API `ps.json_normalize`
```python
>>> data = [
...     {"id": 1, "name": "Alice", "address": {"city": "NYC", "zipcode": "10001"}},
...     {"id": 2, "name": "Bob", "address": {"city": "SF", "zipcode": "94105"}},
... ]
>>> ps.json_normalize(data)
id name address.city address.zipcode
0 1 Alice NYC 10001
1 2 Bob SF 94105
```
### How was this patch tested?
Added UTs
### Was this patch authored or co-authored using generative AI tooling?
No
Closes #47840 from itholic/json_normalize.
Lead-authored-by: Haejoon Lee <[email protected]>
Co-authored-by: Hyukjin Kwon <[email protected]>
Signed-off-by: Hyukjin Kwon <[email protected]>
---
python/docs/source/reference/pyspark.pandas/io.rst | 1 +
python/pyspark/pandas/namespace.py | 78 ++++++++++++++++++++++
python/pyspark/pandas/tests/test_namespace.py | 56 ++++++++++++++++
3 files changed, 135 insertions(+)
diff --git a/python/docs/source/reference/pyspark.pandas/io.rst
b/python/docs/source/reference/pyspark.pandas/io.rst
index fd41a03699ca..7f7a39e98193 100644
--- a/python/docs/source/reference/pyspark.pandas/io.rst
+++ b/python/docs/source/reference/pyspark.pandas/io.rst
@@ -105,6 +105,7 @@ JSON
.. autosummary::
:toctree: api/
+ json_normalize
read_json
DataFrame.to_json
diff --git a/python/pyspark/pandas/namespace.py
b/python/pyspark/pandas/namespace.py
index 729f4f598407..c77cdf51a2f6 100644
--- a/python/pyspark/pandas/namespace.py
+++ b/python/pyspark/pandas/namespace.py
@@ -88,6 +88,7 @@ from pyspark.pandas.internal import (
DEFAULT_SERIES_NAME,
HIDDEN_COLUMNS,
SPARK_INDEX_NAME_FORMAT,
+ NATURAL_ORDER_COLUMN_NAME,
)
from pyspark.pandas.series import Series, first_series
from pyspark.pandas.spark.utils import as_nullable_spark_type, force_decimal_precision_scale
@@ -125,6 +126,7 @@ __all__ = [
"to_numeric",
"broadcast",
"read_orc",
+ "json_normalize",
]
@@ -3687,6 +3689,82 @@ def read_orc(
return psdf
def json_normalize(
    data: Union[Dict, List[Dict]],
    sep: str = ".",
) -> DataFrame:
    """
    Normalize semi-structured JSON data into a flat table.

    .. versionadded:: 4.0.0

    Parameters
    ----------
    data : dict or list of dicts
        Unserialized JSON objects.
    sep : str, default '.'
        Nested records will generate names separated by sep.

    Returns
    -------
    DataFrame

    See Also
    --------
    DataFrame.to_json : Convert the pandas-on-Spark DataFrame to a JSON string.

    Examples
    --------
    >>> data = [
    ...     {"id": 1, "name": "Alice", "address": {"city": "NYC", "zipcode": "10001"}},
    ...     {"id": 2, "name": "Bob", "address": {"city": "SF", "zipcode": "94105"}},
    ... ]
    >>> ps.json_normalize(data)
       id   name address.city address.zipcode
    0   1  Alice          NYC           10001
    1   2    Bob           SF           94105
    """
    # Materialize the input as a pandas-on-Spark DataFrame and grab its
    # underlying Spark frame, whose schema carries the nested structure.
    psdf: DataFrame = ps.DataFrame(data)
    internal = psdf._internal
    sdf = internal.spark_frame

    # Internal bookkeeping columns (index columns, natural-order column)
    # must not leak into the flattened result.
    excluded = internal.index_spark_column_names + [NATURAL_ORDER_COLUMN_NAME]

    def _flatten(schema: StructType, prefix: str = "") -> Tuple[List[str], List[str]]:
        """Walk a (possibly nested) schema; return leaf column paths and output aliases."""
        paths: List[str] = []
        labels: List[str] = []
        for field in schema.fields:
            field_name = field.name
            if field_name in excluded:
                continue
            # Spark struct access always uses "."; only the visible alias uses `sep`.
            name = f"{prefix}.{field_name}" if prefix else field_name
            alias = f"{prefix}{sep}{field_name}" if prefix else field_name
            if isinstance(field.dataType, StructType):
                sub_paths, sub_labels = _flatten(field.dataType, prefix=name)
                paths.extend(sub_paths)
                labels.extend(sub_labels)
            else:
                paths.append(name)
                labels.append(alias)
        return paths, labels

    paths, labels = _flatten(sdf.schema)

    # Select every leaf field under its flattened alias.
    flat_columns = [F.col(path).alias(label) for path, label in zip(paths, labels)]

    # Rebuild the internal frame around the flattened columns and wrap it back up.
    internal = internal.with_new_columns(
        flat_columns, column_labels=[(label,) for label in labels]
    )
    return ps.DataFrame(internal)
+
+
def _get_index_map(
sdf: PySparkDataFrame, index_col: Optional[Union[str, List[str]]] = None
) -> Tuple[Optional[List[PySparkColumn]], Optional[List[Label]]]:
diff --git a/python/pyspark/pandas/tests/test_namespace.py
b/python/pyspark/pandas/tests/test_namespace.py
index 7024ef2a977c..6e559937008d 100644
--- a/python/pyspark/pandas/tests/test_namespace.py
+++ b/python/pyspark/pandas/tests/test_namespace.py
@@ -28,6 +28,7 @@ from pyspark.pandas.utils import spark_column_equals
from pyspark.pandas.missing.general_functions import
MissingPandasLikeGeneralFunctions
from pyspark.testing.pandasutils import PandasOnSparkTestCase
from pyspark.testing.sqlutils import SQLTestUtils
+from pyspark.pandas.testing import assert_frame_equal
class NamespaceTestsMixin:
@@ -606,6 +607,61 @@ class NamespaceTestsMixin:
lambda: ps.to_numeric(psser, errors="ignore"),
)
def test_json_normalize(self):
    """Check ps.json_normalize against pandas.json_normalize across JSON shapes."""
    cases = [
        # Simple one-level nesting.
        [
            {"id": 1, "name": "Alice", "address": {"city": "NYC", "zipcode": "10001"}},
            {"id": 2, "name": "Bob", "address": {"city": "SF", "zipcode": "94105"}},
        ],
        # Two levels of nesting.
        [
            {"id": 1, "name": "Alice", "address": {"city": {"name": "NYC"}, "zipcode": "10001"}},
            {"id": 2, "name": "Bob", "address": {"city": {"name": "SF"}, "zipcode": "94105"}},
        ],
        # Lists inside records.
        [
            {
                "id": 1,
                "name": "Alice",
                "hobbies": ["reading", "swimming"],
                "address": {"city": "NYC", "zipcode": "10001"},
            },
            {
                "id": 2,
                "name": "Bob",
                "hobbies": ["biking"],
                "address": {"city": "SF", "zipcode": "94105"},
            },
        ],
        # Mixed scalar types (int, bool, str).
        [
            {
                "id": 1,
                "name": "Alice",
                "age": 25,
                "is_student": True,
                "address": {"city": "NYC", "zipcode": "10001"},
            },
            {
                "id": 2,
                "name": "Bob",
                "age": 30,
                "is_student": False,
                "address": {"city": "SF", "zipcode": "94105"},
            },
        ],
    ]
    for data in cases:
        assert_frame_equal(pd.json_normalize(data), ps.json_normalize(data))

    # Empty input yields an empty frame on both sides.
    data = []
    self.assert_eq(pd.json_normalize(data), ps.json_normalize(data))
+
def test_missing(self):
missing_functions = inspect.getmembers(
MissingPandasLikeGeneralFunctions, inspect.isfunction
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]