This is an automated email from the ASF dual-hosted git repository.
gurwls223 pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/spark.git
The following commit(s) were added to refs/heads/master by this push:
new b6c6f812957 [SPARK-39189][PS] Support limit_area parameter in pandas API on Spark
b6c6f812957 is described below
commit b6c6f8129572a95ffe5e45f5385d279c6191c067
Author: Ruifeng Zheng <[email protected]>
AuthorDate: Tue May 24 12:09:37 2022 +0900
[SPARK-39189][PS] Support limit_area parameter in pandas API on Spark
### What changes were proposed in this pull request?
interpolate supports param `limit_area`
### Why are the changes needed?
to increase api coverage
```
In [16]: s = ps.Series([np.nan, 0, 1, np.nan, 3, np.nan])
In [17]: s.interpolate(limit_area='inside')
Out[17]:
0 NaN
1 0.0
2 1.0
3 2.0
4 3.0
5 NaN
dtype: float64
In [18]: s.interpolate(limit_area='outside')
Out[18]:
0 NaN
1 0.0
2 1.0
3 NaN
4 3.0
5 3.0
dtype: float64
In [19]: s.interpolate(limit_direction='both', limit_area='inside')
Out[19]:
0 NaN
1 0.0
2 1.0
3 2.0
4 3.0
5 NaN
dtype: float64
In [20]: s.interpolate(limit_direction='both', limit_area='outside')
Out[20]:
0 0.0
1 0.0
2 1.0
3 NaN
4 3.0
5 3.0
dtype: float64
```
### Does this PR introduce _any_ user-facing change?
yes, one param added
### How was this patch tested?
updated UT
Closes #36555 from zhengruifeng/impl_interpolate_limit_area.
Authored-by: Ruifeng Zheng <[email protected]>
Signed-off-by: Hyukjin Kwon <[email protected]>
---
python/pyspark/pandas/frame.py | 5 ++++-
python/pyspark/pandas/generic.py | 14 ++++++++++++--
python/pyspark/pandas/series.py | 14 +++++++++++++-
python/pyspark/pandas/tests/test_generic_functions.py | 16 ++++++++++++----
4 files changed, 41 insertions(+), 8 deletions(-)
diff --git a/python/pyspark/pandas/frame.py b/python/pyspark/pandas/frame.py
index 6049249d827..04eb8a845dd 100644
--- a/python/pyspark/pandas/frame.py
+++ b/python/pyspark/pandas/frame.py
@@ -5669,6 +5669,7 @@ defaultdict(<class 'list'>, {'col..., 'col...})]
method: str = "linear",
limit: Optional[int] = None,
limit_direction: Optional[str] = None,
+ limit_area: Optional[str] = None,
) -> "DataFrame":
if method not in ["linear"]:
raise NotImplementedError("interpolate currently works only for method='linear'")
@@ -5678,6 +5679,8 @@ defaultdict(<class 'list'>, {'col..., 'col...})]
limit_direction not in ["forward", "backward", "both"]
):
raise ValueError("invalid limit_direction: '{}'".format(limit_direction))
+ if (limit_area is not None) and (limit_area not in ["inside", "outside"]):
+ raise ValueError("invalid limit_area: '{}'".format(limit_area))
numeric_col_names = []
for label in self._internal.column_labels:
@@ -5688,7 +5691,7 @@ defaultdict(<class 'list'>, {'col..., 'col...})]
psdf = self[numeric_col_names]
return psdf._apply_series_op(
lambda psser: psser._interpolate(
- method=method, limit=limit, limit_direction=limit_direction
+ method=method, limit=limit, limit_direction=limit_direction, limit_area=limit_area
),
should_resolve=True,
)
diff --git a/python/pyspark/pandas/generic.py b/python/pyspark/pandas/generic.py
index ec38935ced8..ce13ae5ad1b 100644
--- a/python/pyspark/pandas/generic.py
+++ b/python/pyspark/pandas/generic.py
@@ -3367,12 +3367,13 @@ class Frame(object, metaclass=ABCMeta):
pad = ffill
- # TODO: add 'axis', 'inplace', 'limit_area', 'downcast'
+ # TODO: add 'axis', 'inplace', 'downcast'
def interpolate(
self: FrameLike,
method: str = "linear",
limit: Optional[int] = None,
limit_direction: Optional[str] = None,
+ limit_area: Optional[str] = None,
) -> FrameLike:
"""
Fill NaN values using an interpolation method.
@@ -3400,6 +3401,13 @@ class Frame(object, metaclass=ABCMeta):
Consecutive NaNs will be filled in this direction.
One of {{'forward', 'backward', 'both'}}.
+ limit_area : str, default None
+ If limit is specified, consecutive NaNs will be filled with this restriction. One of:
+
+ * None: No fill restriction.
+ * 'inside': Only fill NaNs surrounded by valid values (interpolate).
+ * 'outside': Only fill NaNs outside valid values (extrapolate).
+
Returns
-------
Series or DataFrame or None
@@ -3454,7 +3462,9 @@ class Frame(object, metaclass=ABCMeta):
2 2.0 3.0 -3.0 9.0
3 2.0 4.0 -4.0 16.0
"""
- return self.interpolate(method=method, limit=limit, limit_direction=limit_direction)
+ return self.interpolate(
+ method=method, limit=limit, limit_direction=limit_direction, limit_area=limit_area
+ )
@property
def at(self) -> AtIndexer:
diff --git a/python/pyspark/pandas/series.py b/python/pyspark/pandas/series.py
index d748b99f344..e201df3e6f1 100644
--- a/python/pyspark/pandas/series.py
+++ b/python/pyspark/pandas/series.py
@@ -2176,14 +2176,18 @@ class Series(Frame, IndexOpsMixin, Generic[T]):
method: str = "linear",
limit: Optional[int] = None,
limit_direction: Optional[str] = None,
+ limit_area: Optional[str] = None,
) -> "Series":
- return self._interpolate(method=method, limit=limit, limit_direction=limit_direction)
+ return self._interpolate(
+ method=method, limit=limit, limit_direction=limit_direction, limit_area=limit_area
+ )
def _interpolate(
self,
method: str = "linear",
limit: Optional[int] = None,
limit_direction: Optional[str] = None,
+ limit_area: Optional[str] = None,
) -> "Series":
if method not in ["linear"]:
raise NotImplementedError("interpolate currently works only for method='linear'")
@@ -2193,6 +2197,8 @@ class Series(Frame, IndexOpsMixin, Generic[T]):
limit_direction not in ["forward", "backward", "both"]
):
raise ValueError("invalid limit_direction: '{}'".format(limit_direction))
+ if (limit_area is not None) and (limit_area not in ["inside", "outside"]):
+ raise ValueError("invalid limit_area: '{}'".format(limit_area))
if not self.spark.nullable and not isinstance(
self.spark.data_type, (FloatType, DoubleType)
@@ -2260,6 +2266,12 @@ class Series(Frame, IndexOpsMixin, Generic[T]):
pad_head_cond = pad_head_cond & (null_index_backward <= F.lit(limit))
pad_tail_cond = pad_tail_cond & (null_index_forward <= F.lit(limit))
+ if limit_area == "inside":
+ pad_head_cond = SF.lit(False)
+ pad_tail_cond = SF.lit(False)
+ elif limit_area == "outside":
+ fill_cond = SF.lit(False)
+
cond = self.isnull().spark.column
scol = (
F.when(cond & fill_cond, fill)
diff --git a/python/pyspark/pandas/tests/test_generic_functions.py b/python/pyspark/pandas/tests/test_generic_functions.py
index 2a83a038713..6add1f805b6 100644
--- a/python/pyspark/pandas/tests/test_generic_functions.py
+++ b/python/pyspark/pandas/tests/test_generic_functions.py
@@ -36,15 +36,23 @@ class GenericFunctionsTest(PandasOnSparkTestCase, TestUtils):
with self.assertRaisesRegex(ValueError, "invalid limit_direction"):
psdf.interpolate(limit_direction="jump")
+ with self.assertRaisesRegex(ValueError, "invalid limit_area"):
+ psdf.interpolate(limit_area="jump")
+
def _test_interpolate(self, pobj):
psobj = ps.from_pandas(pobj)
self.assert_eq(psobj.interpolate(), pobj.interpolate())
for limit in range(1, 5):
for limit_direction in [None, "forward", "backward", "both"]:
- self.assert_eq(
- psobj.interpolate(limit=limit, limit_direction=limit_direction),
- pobj.interpolate(limit=limit, limit_direction=limit_direction),
- )
+ for limit_area in [None, "inside", "outside"]:
+ self.assert_eq(
+ psobj.interpolate(
+ limit=limit, limit_direction=limit_direction, limit_area=limit_area
+ ),
+ pobj.interpolate(
+ limit=limit, limit_direction=limit_direction, limit_area=limit_area
+ ),
+ )
def test_interpolate(self):
pser = pd.Series(
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]