This is an automated email from the ASF dual-hosted git repository.
gurwls223 pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/spark.git
The following commit(s) were added to refs/heads/master by this push:
     new 59fb88733433 [MINOR][PYTHON] Leverage functools.cached_property in `DataFrame`
59fb88733433 is described below
commit 59fb88733433ef76eb33004a6b82e0472da5485f
Author: Hyukjin Kwon <[email protected]>
AuthorDate: Fri Dec 27 14:14:13 2024 +0900
[MINOR][PYTHON] Leverage functools.cached_property in `DataFrame`
### What changes were proposed in this pull request?
This PR proposes to replace the manually cached properties with `functools.cached_property` in `DataFrame`.
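For context, `functools.cached_property` (in the standard library since Python 3.8) runs the decorated method once on first access and caches the result in the instance's `__dict__`, which is exactly what the manual None-sentinel pattern removed here did by hand. A minimal sketch of the two styles, using a hypothetical `Example` class rather than the actual `DataFrame` code:

    from functools import cached_property
    from typing import Optional

    class Example:
        def __init__(self) -> None:
            # Manual pattern (what this PR removes): a None sentinel
            # initialized eagerly in __init__.
            self._value: Optional[int] = None

        @property
        def value_manual(self) -> int:
            # Compute on first access, then reuse the stored result.
            if self._value is None:
                self._value = 42  # stands in for an expensive computation
            return self._value

        @cached_property
        def value(self) -> int:
            # Runs once; the result is stored in the instance __dict__
            # and returned directly on later accesses.
            return 42  # stands in for an expensive computation

One nuance: a `cached_property` value lives in the instance `__dict__`, so it can be invalidated with `del obj.value` to force recomputation, whereas the manual pattern resets by reassigning the sentinel.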
### Why are the changes needed?
To reduce boilerplate code.
### Does this PR introduce _any_ user-facing change?
No.
### How was this patch tested?
Existing tests.
### Was this patch authored or co-authored using generative AI tooling?
No.
Closes #49308 from HyukjinKwon/dataframe-cached-property.
Authored-by: Hyukjin Kwon <[email protected]>
Signed-off-by: Hyukjin Kwon <[email protected]>
---
python/pyspark/sql/classic/dataframe.py | 47 +++++++++++----------------------
1 file changed, 16 insertions(+), 31 deletions(-)
diff --git a/python/pyspark/sql/classic/dataframe.py b/python/pyspark/sql/classic/dataframe.py
index 05c19913adf3..cc44d2c9942d 100644
--- a/python/pyspark/sql/classic/dataframe.py
+++ b/python/pyspark/sql/classic/dataframe.py
@@ -21,7 +21,7 @@ import sys
 import random
 import warnings
 from collections.abc import Iterable
-from functools import reduce
+from functools import reduce, cached_property
 from typing import (
     Any,
     Callable,
@@ -118,8 +118,6 @@ class DataFrame(ParentDataFrame, PandasMapOpsMixin, PandasConversionMixin):
     ):
         from pyspark.sql.context import SQLContext

-        self._sql_ctx: Optional["SQLContext"] = None
-
         if isinstance(sql_ctx, SQLContext):
             assert not os.environ.get("SPARK_TESTING")  # Sanity check for our internal usage.
             assert isinstance(sql_ctx, SQLContext)
@@ -136,14 +134,11 @@ class DataFrame(ParentDataFrame, PandasMapOpsMixin, PandasConversionMixin):
             self._sc: "SparkContext" = sql_ctx._sc
         self._jdf: "JavaObject" = jdf
         self.is_cached = False
-        # initialized lazily
-        self._schema: Optional[StructType] = None
-        self._lazy_rdd: Optional["RDD[Row]"] = None
         # Check whether _repr_html is supported or not, we use it to avoid calling _jdf twice
         # by __repr__ and _repr_html_ while eager evaluation opens.
         self._support_repr_html = False

-    @property
+    @cached_property
     def sql_ctx(self) -> "SQLContext":
         from pyspark.sql.context import SQLContext

@@ -151,24 +146,18 @@ class DataFrame(ParentDataFrame, PandasMapOpsMixin, PandasConversionMixin):
             "DataFrame.sql_ctx is an internal property, and will be removed "
             "in future releases. Use DataFrame.sparkSession instead."
         )
-        if self._sql_ctx is None:
-            self._sql_ctx = SQLContext._get_or_create(self._sc)
-        return self._sql_ctx
+        return SQLContext._get_or_create(self._sc)

     @property
     def sparkSession(self) -> "SparkSession":
         return self._session

-    @property
+    @cached_property
     def rdd(self) -> "RDD[Row]":
         from pyspark.core.rdd import RDD

-        if self._lazy_rdd is None:
-            jrdd = self._jdf.javaToPython()
-            self._lazy_rdd = RDD(
-                jrdd, self.sparkSession._sc, BatchedSerializer(CPickleSerializer())
-            )
-        return self._lazy_rdd
+        jrdd = self._jdf.javaToPython()
+        return RDD(jrdd, self.sparkSession._sc, BatchedSerializer(CPickleSerializer()))

     @property
     def na(self) -> ParentDataFrameNaFunctions:
@@ -208,21 +197,17 @@ class DataFrame(ParentDataFrame, PandasMapOpsMixin, PandasConversionMixin):
     def writeStream(self) -> DataStreamWriter:
         return DataStreamWriter(self)

-    @property
+    @cached_property
     def schema(self) -> StructType:
-        if self._schema is None:
-            try:
-                self._schema = cast(
-                    StructType, _parse_datatype_json_string(self._jdf.schema().json())
-                )
-            except AnalysisException as e:
-                raise e
-            except Exception as e:
-                raise PySparkValueError(
-                    errorClass="CANNOT_PARSE_DATATYPE",
-                    messageParameters={"error": str(e)},
-                )
-        return self._schema
+        try:
+            return cast(StructType, _parse_datatype_json_string(self._jdf.schema().json()))
+        except AnalysisException as e:
+            raise e
+        except Exception as e:
+            raise PySparkValueError(
+                errorClass="CANNOT_PARSE_DATATYPE",
+                messageParameters={"error": str(e)},
+            )

     def printSchema(self, level: Optional[int] = None) -> None:
         if level:
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]