This is an automated email from the ASF dual-hosted git repository.
gurwls223 pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/spark.git
The following commit(s) were added to refs/heads/master by this push:
new 1dfb4399334 [SPARK-38943][PYTHON] EWM support ignore_na
1dfb4399334 is described below
commit 1dfb4399334a02bf2e54faeb214c4a387753ddce
Author: Ruifeng Zheng <[email protected]>
AuthorDate: Wed Apr 20 10:46:58 2022 +0900
[SPARK-38943][PYTHON] EWM support ignore_na
### What changes were proposed in this pull request?
EWM support ignore_na
### Why are the changes needed?
`ignore_na` is supported in pandas.
after adding this param, EWM can deal with dataset containing NaN/Null.
### Does this PR introduce _any_ user-facing change?
Yes, a new param added
### How was this patch tested?
added testsuites
Closes #36257 from zhengruifeng/ewm_support_ingnore_na.
Authored-by: Ruifeng Zheng <[email protected]>
Signed-off-by: Hyukjin Kwon <[email protected]>
---
python/pyspark/pandas/generic.py | 26 +++++-
python/pyspark/pandas/missing/window.py | 1 -
python/pyspark/pandas/tests/test_ewm.py | 92 ++++++++++++++++++++++
python/pyspark/pandas/window.py | 27 +++++--
.../catalyst/expressions/windowExpressions.scala | 34 +++++---
.../spark/sql/api/python/PythonSQLUtils.scala | 3 +-
6 files changed, 159 insertions(+), 24 deletions(-)
diff --git a/python/pyspark/pandas/generic.py b/python/pyspark/pandas/generic.py
index bb27c633a2b..21c880373ad 100644
--- a/python/pyspark/pandas/generic.py
+++ b/python/pyspark/pandas/generic.py
@@ -2619,7 +2619,7 @@ class Frame(object, metaclass=ABCMeta):
return Expanding(self, min_periods=min_periods)
- # TODO: 'adjust', 'ignore_na', 'axis', 'method' parameter should be
implemented.
+ # TODO: 'adjust', 'axis', 'method' parameter should be implemented.
def ewm(
self: FrameLike,
com: Optional[float] = None,
@@ -2627,6 +2627,7 @@ class Frame(object, metaclass=ABCMeta):
halflife: Optional[float] = None,
alpha: Optional[float] = None,
min_periods: Optional[int] = None,
+ ignore_na: bool_type = False,
) -> "ExponentialMoving[FrameLike]":
"""
Provide exponentially weighted window transformations.
@@ -2659,6 +2660,21 @@ class Frame(object, metaclass=ABCMeta):
Minimum number of observations in window required to have a value
(otherwise result is NA).
+ ignore_na : bool, default False
+ Ignore missing values when calculating weights.
+
+ - When ``ignore_na=False`` (default), weights are based on
absolute positions.
+ For example, the weights of :math:`x_0` and :math:`x_2` used in
calculating
+ the final weighted average of [:math:`x_0`, None, :math:`x_2`]
are
+ :math:`(1-\alpha)^2` and :math:`1` if ``adjust=True``, and
+ :math:`(1-\alpha)^2` and :math:`\alpha` if ``adjust=False``.
+
+ - When ``ignore_na=True``, weights are based
+ on relative positions. For example, the weights of :math:`x_0`
and :math:`x_2`
+ used in calculating the final weighted average of
+ [:math:`x_0`, None, :math:`x_2`] are :math:`1-\alpha` and
:math:`1` if
+ ``adjust=True``, and :math:`1-\alpha` and :math:`\alpha` if
``adjust=False``.
+
Returns
-------
a Window sub-classed for the particular operation
@@ -2666,7 +2682,13 @@ class Frame(object, metaclass=ABCMeta):
from pyspark.pandas.window import ExponentialMoving
return ExponentialMoving(
- self, com=com, span=span, halflife=halflife, alpha=alpha,
min_periods=min_periods
+ self,
+ com=com,
+ span=span,
+ halflife=halflife,
+ alpha=alpha,
+ min_periods=min_periods,
+ ignore_na=ignore_na,
)
def get(self, key: Any, default: Optional[Any] = None) -> Any:
diff --git a/python/pyspark/pandas/missing/window.py
b/python/pyspark/pandas/missing/window.py
index e6ac39901ff..237dc85c82c 100644
--- a/python/pyspark/pandas/missing/window.py
+++ b/python/pyspark/pandas/missing/window.py
@@ -152,6 +152,5 @@ class MissingPandasLikeExponentialMoving:
corr = _unsupported_function_exponential_moving("corr")
adjust = _unsupported_property_exponential_moving("adjust")
- ignore_na = _unsupported_property_exponential_moving("ignore_na")
axis = _unsupported_property_exponential_moving("axis")
method = _unsupported_property_exponential_moving("method")
diff --git a/python/pyspark/pandas/tests/test_ewm.py
b/python/pyspark/pandas/tests/test_ewm.py
index 7306aad44ff..d4c1e1ba06a 100644
--- a/python/pyspark/pandas/tests/test_ewm.py
+++ b/python/pyspark/pandas/tests/test_ewm.py
@@ -109,6 +109,98 @@ class EWMTest(PandasOnSparkTestCase, TestUtils):
getattr(pdf.ewm(alpha=0.7, min_periods=2), f)().sum(),
)
+ pdf = pd.DataFrame(
+ {
+ "s1": [None, 2, 3, 4],
+ "s2": [1, None, 3, 4],
+ "s3": [1, 3, 4, 5],
+ "s4": [1, 0, 3, 4],
+ "s5": [None, None, 1, None],
+ "s6": [None, None, None, None],
+ }
+ )
+ psdf = ps.from_pandas(pdf)
+ self.assert_eq(
+ getattr(psdf.ewm(com=0.2, ignore_na=True), f)(),
+ getattr(pdf.ewm(com=0.2, ignore_na=True), f)(),
+ )
+ self.assert_eq(
+ getattr(psdf.ewm(com=0.2, ignore_na=True), f)().sum(),
+ getattr(pdf.ewm(com=0.2, ignore_na=True), f)().sum(),
+ )
+ self.assert_eq(
+ getattr(psdf.ewm(com=0.2, ignore_na=False), f)(),
+ getattr(pdf.ewm(com=0.2, ignore_na=False), f)(),
+ )
+ self.assert_eq(
+ getattr(psdf.ewm(com=0.2, ignore_na=False), f)().sum(),
+ getattr(pdf.ewm(com=0.2, ignore_na=False), f)().sum(),
+ )
+ self.assert_eq(
+ getattr(psdf.ewm(span=1.7, ignore_na=True), f)(),
+ getattr(pdf.ewm(span=1.7, ignore_na=True), f)(),
+ )
+ self.assert_eq(
+ getattr(psdf.ewm(span=1.7, ignore_na=True), f)().sum(),
+ getattr(pdf.ewm(span=1.7, ignore_na=True), f)().sum(),
+ )
+ self.assert_eq(
+ getattr(psdf.ewm(span=1.7, ignore_na=False), f)(),
+ getattr(pdf.ewm(span=1.7, ignore_na=False), f)(),
+ )
+ self.assert_eq(
+ getattr(psdf.ewm(span=1.7, ignore_na=False), f)().sum(),
+ getattr(pdf.ewm(span=1.7, ignore_na=False), f)().sum(),
+ )
+ self.assert_eq(
+ getattr(psdf.ewm(halflife=0.5, ignore_na=True), f)(),
+ getattr(pdf.ewm(halflife=0.5, ignore_na=True), f)(),
+ )
+ self.assert_eq(
+ getattr(psdf.ewm(halflife=0.5, ignore_na=True), f)().sum(),
+ getattr(pdf.ewm(halflife=0.5, ignore_na=True), f)().sum(),
+ )
+ self.assert_eq(
+ getattr(psdf.ewm(halflife=0.5, ignore_na=False), f)(),
+ getattr(pdf.ewm(halflife=0.5, ignore_na=False), f)(),
+ )
+ self.assert_eq(
+ getattr(psdf.ewm(halflife=0.5, ignore_na=False), f)().sum(),
+ getattr(pdf.ewm(halflife=0.5, ignore_na=False), f)().sum(),
+ )
+ self.assert_eq(
+ getattr(psdf.ewm(alpha=0.7, ignore_na=True), f)(),
+ getattr(pdf.ewm(alpha=0.7, ignore_na=True), f)(),
+ )
+ self.assert_eq(
+ getattr(psdf.ewm(alpha=0.7, ignore_na=True), f)().sum(),
+ getattr(pdf.ewm(alpha=0.7, ignore_na=True), f)().sum(),
+ )
+ self.assert_eq(
+ getattr(psdf.ewm(alpha=0.7, ignore_na=False), f)(),
+ getattr(pdf.ewm(alpha=0.7, ignore_na=False), f)(),
+ )
+ self.assert_eq(
+ getattr(psdf.ewm(alpha=0.7, ignore_na=False), f)().sum(),
+ getattr(pdf.ewm(alpha=0.7, ignore_na=False), f)().sum(),
+ )
+ self.assert_eq(
+ getattr(psdf.ewm(alpha=0.7, ignore_na=True, min_periods=2), f)(),
+ getattr(pdf.ewm(alpha=0.7, ignore_na=True, min_periods=2), f)(),
+ )
+ self.assert_eq(
+ getattr(psdf.ewm(alpha=0.7, ignore_na=True, min_periods=2),
f)().sum(),
+ getattr(pdf.ewm(alpha=0.7, ignore_na=True, min_periods=2),
f)().sum(),
+ )
+ self.assert_eq(
+ getattr(psdf.ewm(alpha=0.7, ignore_na=False, min_periods=2), f)(),
+ getattr(pdf.ewm(alpha=0.7, ignore_na=False, min_periods=2), f)(),
+ )
+ self.assert_eq(
+ getattr(psdf.ewm(alpha=0.7, ignore_na=False, min_periods=2),
f)().sum(),
+ getattr(pdf.ewm(alpha=0.7, ignore_na=False, min_periods=2),
f)().sum(),
+ )
+
def test_ewm_mean(self):
self._test_ewm_func("mean")
diff --git a/python/pyspark/pandas/window.py b/python/pyspark/pandas/window.py
index 4c5ababf0c8..9203a40cb47 100644
--- a/python/pyspark/pandas/window.py
+++ b/python/pyspark/pandas/window.py
@@ -1762,12 +1762,14 @@ class ExponentialMovingLike(Generic[FrameLike],
metaclass=ABCMeta):
halflife: Optional[float] = None,
alpha: Optional[float] = None,
min_periods: Optional[int] = None,
+ ignore_na: bool = False,
):
if (min_periods is not None) and (min_periods < 0):
raise ValueError("min_periods must be >= 0")
if min_periods is None:
min_periods = 0
self._min_periods = min_periods
+ self._ignore_na = ignore_na
self._window = window
# This unbounded Window is later used to handle 'min_periods' for now.
@@ -1828,10 +1830,11 @@ class ExponentialMovingLike(Generic[FrameLike],
metaclass=ABCMeta):
unified_alpha = self._compute_unified_alpha()
def mean(scol: Column) -> Column:
- jf = SparkContext._active_spark_context._jvm.PythonSQLUtils.ewm
+ sql_utils = SparkContext._active_spark_context._jvm.PythonSQLUtils
return F.when(
- F.row_number().over(self._unbounded_window) >=
self._min_periods,
- Column(jf(scol._jc, unified_alpha)).over(self._window),
+ F.count(F.when(~scol.isNull(),
1).otherwise(None)).over(self._unbounded_window)
+ >= self._min_periods,
+ Column(sql_utils.ewm(scol._jc, unified_alpha,
self._ignore_na)).over(self._window),
).otherwise(SF.lit(None))
return self._apply_as_series_or_frame(mean)
@@ -1846,6 +1849,7 @@ class ExponentialMoving(ExponentialMovingLike[FrameLike]):
halflife: Optional[float] = None,
alpha: Optional[float] = None,
min_periods: Optional[int] = None,
+ ignore_na: bool = False,
):
from pyspark.pandas.frame import DataFrame
from pyspark.pandas.series import Series
@@ -1861,7 +1865,7 @@ class ExponentialMoving(ExponentialMovingLike[FrameLike]):
Window.unboundedPreceding, Window.currentRow
)
- super().__init__(window_spec, com, span, halflife, alpha, min_periods)
+ super().__init__(window_spec, com, span, halflife, alpha, min_periods,
ignore_na)
def __getattr__(self, item: str) -> Any:
if hasattr(MissingPandasLikeExponentialMoving, item):
@@ -1882,7 +1886,6 @@ class ExponentialMoving(ExponentialMovingLike[FrameLike]):
-----
There are behavior differences between pandas-on-Spark and pandas.
- * the data should not contain NaNs. pandas-on-Spark will return an
error.
* the current implementation of this API uses Spark's Window without
specifying partition specification. This leads to move all data into
single partition in single machine and could cause serious
@@ -1928,10 +1931,18 @@ class
ExponentialMoving(ExponentialMovingLike[FrameLike]):
"""
return super().mean()
- # TODO: when add 'adjust' and 'ignore_na' parameter, should add to here
too.
+ # TODO: when add 'adjust' parameter, should add to here too.
def __repr__(self) -> str:
- return "ExponentialMoving [com={}, span={}, halflife={}, alpha={},
min_periods={}]".format(
- self._com, self._span, self._halflife, self._alpha,
self._min_periods
+ return (
+ "ExponentialMoving [com={}, span={}, halflife={}, alpha={}, "
+ "min_periods={}, ignore_na={}]".format(
+ self._com,
+ self._span,
+ self._halflife,
+ self._alpha,
+ self._min_periods,
+ self._ignore_na,
+ )
)
diff --git
a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/windowExpressions.scala
b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/windowExpressions.scala
index 458c2274432..c11a26ecd4f 100644
---
a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/windowExpressions.scala
+++
b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/windowExpressions.scala
@@ -1025,7 +1025,7 @@ case class PercentRank(children: Seq[Expression]) extends
RankLike with SizeBase
* y_t = \frac{\sum_{i=0}^t w_i x_{t-i}}{\sum_{i=0}^t w_i},
* where x_t is the input, y_t is the result and the w_i are the weights.
*/
-case class EWM(input: Expression, alpha: Double)
+case class EWM(input: Expression, alpha: Double, ignoreNA: Boolean)
extends AggregateWindowFunction with UnaryLike[Expression] {
assert(0 < alpha && alpha <= 1)
@@ -1033,27 +1033,37 @@ case class EWM(input: Expression, alpha: Double)
private val numerator = AttributeReference("numerator", DoubleType, nullable
= false)()
private val denominator = AttributeReference("denominator", DoubleType,
nullable = false)()
- override def aggBufferAttributes: Seq[AttributeReference] = numerator ::
denominator :: Nil
+ private val result = AttributeReference("result", DoubleType, nullable =
true)()
- override val initialValues: Seq[Expression] = Seq(Literal(0.0), Literal(0.0))
+ override def aggBufferAttributes: Seq[AttributeReference] =
+ numerator :: denominator :: result :: Nil
+
+ override val initialValues: Seq[Expression] =
+ Literal(0.0) :: Literal(0.0) :: Literal.create(null, DoubleType) :: Nil
override val updateExpressions: Seq[Expression] = {
val beta = Literal(1.0 - alpha)
val casted = input.cast(DoubleType)
- // TODO: after adding param ignore_na, we can remove this check
- val error = RaiseError(Literal("Input values must not be null or
NaN")).cast(DoubleType)
- val validated = If(IsNull(casted) || IsNaN(casted), error, casted)
- Seq(
- /* numerator = */ numerator * beta + validated,
- /* denominator = */ denominator * beta + Literal(1.0)
- )
+ val isNA = IsNull(casted)
+ val newNumerator = numerator * beta + casted
+ val newDenominator = denominator * beta + Literal(1.0)
+
+ if (ignoreNA) {
+ /* numerator = */ If(isNA, numerator, newNumerator) ::
+ /* denominator = */ If(isNA, denominator, newDenominator) ::
+ /* result = */ If(isNA, result, newNumerator / newDenominator) :: Nil
+ } else {
+ /* numerator = */ If(isNA, numerator * beta, newNumerator) ::
+ /* denominator = */ If(isNA, denominator * beta, newDenominator) ::
+ /* result = */ If(isNA, result, newNumerator / newDenominator) :: Nil
+ }
}
- override val evaluateExpression: Expression = numerator / denominator
+ override val evaluateExpression: Expression = result
override def prettyName: String = "ewm"
- override def sql: String = s"$prettyName(${input.sql})"
+ override def sql: String = s"$prettyName(${input.sql}, $alpha, $ignoreNA)"
override def child: Expression = input
diff --git
a/sql/core/src/main/scala/org/apache/spark/sql/api/python/PythonSQLUtils.scala
b/sql/core/src/main/scala/org/apache/spark/sql/api/python/PythonSQLUtils.scala
index 95a42331308..2cc595ed2bf 100644
---
a/sql/core/src/main/scala/org/apache/spark/sql/api/python/PythonSQLUtils.scala
+++
b/sql/core/src/main/scala/org/apache/spark/sql/api/python/PythonSQLUtils.scala
@@ -95,7 +95,8 @@ private[sql] object PythonSQLUtils extends Logging {
def castTimestampNTZToLong(c: Column): Column =
Column(CastTimestampNTZToLong(c.expr))
- def ewm(e: Column, alpha: Double): Column = Column(EWM(e.expr, alpha))
+ def ewm(e: Column, alpha: Double, ignoreNA: Boolean): Column =
+ Column(EWM(e.expr, alpha, ignoreNA))
def lastNonNull(e: Column): Column = Column(LastNonNull(e.expr))
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]