This is an automated email from the ASF dual-hosted git repository.

gurwls223 pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/spark.git


The following commit(s) were added to refs/heads/master by this push:
     new b6c6f812957 [SPARK-39189][PS] Support limit_area parameter in pandas API on Spark
b6c6f812957 is described below

commit b6c6f8129572a95ffe5e45f5385d279c6191c067
Author: Ruifeng Zheng <[email protected]>
AuthorDate: Tue May 24 12:09:37 2022 +0900

    [SPARK-39189][PS] Support limit_area parameter in pandas API on Spark
    
    ### What changes were proposed in this pull request?
    interpolate supports param `limit_area`
    
    ### Why are the changes needed?
    to increase api coverage
    
    ```
    In [16]: s = ps.Series([np.nan, 0, 1, np.nan, 3, np.nan])
    
    In [17]: s.interpolate(limit_area='inside')
    Out[17]:
    0    NaN
    1    0.0
    2    1.0
    3    2.0
    4    3.0
    5    NaN
    dtype: float64
    
    In [18]: s.interpolate(limit_area='outside')
    Out[18]:
    0    NaN
    1    0.0
    2    1.0
    3    NaN
    4    3.0
    5    3.0
    dtype: float64
    
    In [19]: s.interpolate(limit_direction='both', limit_area='inside')
    Out[19]:
    0    NaN
    1    0.0
    2    1.0
    3    2.0
    4    3.0
    5    NaN
    dtype: float64
    
    In [20]: s.interpolate(limit_direction='both', limit_area='outside')
    Out[20]:
    0    0.0
    1    0.0
    2    1.0
    3    NaN
    4    3.0
    5    3.0
    dtype: float64
    
    ```
    
    ### Does this PR introduce _any_ user-facing change?
    yes, one param added
    
    ### How was this patch tested?
    updated UT
    
    Closes #36555 from zhengruifeng/impl_interpolate_limit_area.
    
    Authored-by: Ruifeng Zheng <[email protected]>
    Signed-off-by: Hyukjin Kwon <[email protected]>
---
 python/pyspark/pandas/frame.py                        |  5 ++++-
 python/pyspark/pandas/generic.py                      | 14 ++++++++++++--
 python/pyspark/pandas/series.py                       | 14 +++++++++++++-
 python/pyspark/pandas/tests/test_generic_functions.py | 16 ++++++++++++----
 4 files changed, 41 insertions(+), 8 deletions(-)

diff --git a/python/pyspark/pandas/frame.py b/python/pyspark/pandas/frame.py
index 6049249d827..04eb8a845dd 100644
--- a/python/pyspark/pandas/frame.py
+++ b/python/pyspark/pandas/frame.py
@@ -5669,6 +5669,7 @@ defaultdict(<class 'list'>, {'col..., 'col...})]
         method: str = "linear",
         limit: Optional[int] = None,
         limit_direction: Optional[str] = None,
+        limit_area: Optional[str] = None,
     ) -> "DataFrame":
         if method not in ["linear"]:
             raise NotImplementedError("interpolate currently works only for method='linear'")
@@ -5678,6 +5679,8 @@ defaultdict(<class 'list'>, {'col..., 'col...})]
             limit_direction not in ["forward", "backward", "both"]
         ):
             raise ValueError("invalid limit_direction: '{}'".format(limit_direction))
+        if (limit_area is not None) and (limit_area not in ["inside", "outside"]):
+            raise ValueError("invalid limit_area: '{}'".format(limit_area))
 
         numeric_col_names = []
         for label in self._internal.column_labels:
@@ -5688,7 +5691,7 @@ defaultdict(<class 'list'>, {'col..., 'col...})]
         psdf = self[numeric_col_names]
         return psdf._apply_series_op(
             lambda psser: psser._interpolate(
-                method=method, limit=limit, limit_direction=limit_direction
+                method=method, limit=limit, limit_direction=limit_direction, limit_area=limit_area
             ),
             should_resolve=True,
         )
diff --git a/python/pyspark/pandas/generic.py b/python/pyspark/pandas/generic.py
index ec38935ced8..ce13ae5ad1b 100644
--- a/python/pyspark/pandas/generic.py
+++ b/python/pyspark/pandas/generic.py
@@ -3367,12 +3367,13 @@ class Frame(object, metaclass=ABCMeta):
 
     pad = ffill
 
-    # TODO: add 'axis', 'inplace', 'limit_area', 'downcast'
+    # TODO: add 'axis', 'inplace', 'downcast'
     def interpolate(
         self: FrameLike,
         method: str = "linear",
         limit: Optional[int] = None,
         limit_direction: Optional[str] = None,
+        limit_area: Optional[str] = None,
     ) -> FrameLike:
         """
         Fill NaN values using an interpolation method.
@@ -3400,6 +3401,13 @@ class Frame(object, metaclass=ABCMeta):
             Consecutive NaNs will be filled in this direction.
             One of {{'forward', 'backward', 'both'}}.
 
+        limit_area : str, default None
+            If limit is specified, consecutive NaNs will be filled with this restriction. One of:
+
+            * None: No fill restriction.
+            * 'inside': Only fill NaNs surrounded by valid values (interpolate).
+            * 'outside': Only fill NaNs outside valid values (extrapolate).
+
         Returns
         -------
         Series or DataFrame or None
@@ -3454,7 +3462,9 @@ class Frame(object, metaclass=ABCMeta):
         2  2.0  3.0 -3.0   9.0
         3  2.0  4.0 -4.0  16.0
         """
-        return self.interpolate(method=method, limit=limit, limit_direction=limit_direction)
+        return self.interpolate(
+            method=method, limit=limit, limit_direction=limit_direction, limit_area=limit_area
+        )
 
     @property
     def at(self) -> AtIndexer:
diff --git a/python/pyspark/pandas/series.py b/python/pyspark/pandas/series.py
index d748b99f344..e201df3e6f1 100644
--- a/python/pyspark/pandas/series.py
+++ b/python/pyspark/pandas/series.py
@@ -2176,14 +2176,18 @@ class Series(Frame, IndexOpsMixin, Generic[T]):
         method: str = "linear",
         limit: Optional[int] = None,
         limit_direction: Optional[str] = None,
+        limit_area: Optional[str] = None,
     ) -> "Series":
-        return self._interpolate(method=method, limit=limit, limit_direction=limit_direction)
+        return self._interpolate(
+            method=method, limit=limit, limit_direction=limit_direction, limit_area=limit_area
+        )
 
     def _interpolate(
         self,
         method: str = "linear",
         limit: Optional[int] = None,
         limit_direction: Optional[str] = None,
+        limit_area: Optional[str] = None,
     ) -> "Series":
         if method not in ["linear"]:
             raise NotImplementedError("interpolate currently works only for method='linear'")
@@ -2193,6 +2197,8 @@ class Series(Frame, IndexOpsMixin, Generic[T]):
             limit_direction not in ["forward", "backward", "both"]
         ):
             raise ValueError("invalid limit_direction: '{}'".format(limit_direction))
+        if (limit_area is not None) and (limit_area not in ["inside", "outside"]):
+            raise ValueError("invalid limit_area: '{}'".format(limit_area))
 
         if not self.spark.nullable and not isinstance(
             self.spark.data_type, (FloatType, DoubleType)
@@ -2260,6 +2266,12 @@ class Series(Frame, IndexOpsMixin, Generic[T]):
                 pad_head_cond = pad_head_cond & (null_index_backward <= F.lit(limit))
                 pad_tail_cond = pad_tail_cond & (null_index_forward <= F.lit(limit))
 
+        if limit_area == "inside":
+            pad_head_cond = SF.lit(False)
+            pad_tail_cond = SF.lit(False)
+        elif limit_area == "outside":
+            fill_cond = SF.lit(False)
+
         cond = self.isnull().spark.column
         scol = (
             F.when(cond & fill_cond, fill)
diff --git a/python/pyspark/pandas/tests/test_generic_functions.py b/python/pyspark/pandas/tests/test_generic_functions.py
index 2a83a038713..6add1f805b6 100644
--- a/python/pyspark/pandas/tests/test_generic_functions.py
+++ b/python/pyspark/pandas/tests/test_generic_functions.py
@@ -36,15 +36,23 @@ class GenericFunctionsTest(PandasOnSparkTestCase, TestUtils):
         with self.assertRaisesRegex(ValueError, "invalid limit_direction"):
             psdf.interpolate(limit_direction="jump")
 
+        with self.assertRaisesRegex(ValueError, "invalid limit_area"):
+            psdf.interpolate(limit_area="jump")
+
     def _test_interpolate(self, pobj):
         psobj = ps.from_pandas(pobj)
         self.assert_eq(psobj.interpolate(), pobj.interpolate())
         for limit in range(1, 5):
             for limit_direction in [None, "forward", "backward", "both"]:
-                self.assert_eq(
-                    psobj.interpolate(limit=limit, limit_direction=limit_direction),
-                    pobj.interpolate(limit=limit, limit_direction=limit_direction),
-                )
+                for limit_area in [None, "inside", "outside"]:
+                    self.assert_eq(
+                        psobj.interpolate(
+                            limit=limit, limit_direction=limit_direction, limit_area=limit_area
+                        ),
+                        pobj.interpolate(
+                            limit=limit, limit_direction=limit_direction, limit_area=limit_area
+                        ),
+                    )
 
     def test_interpolate(self):
         pser = pd.Series(


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to