This is an automated email from the ASF dual-hosted git repository.
gurwls223 pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/spark.git
The following commit(s) were added to refs/heads/master by this push:
new 14b734ee918 [SPARK-39186][PYTHON][FOLLOWUP] Improve the numerical
stability of pandas-on-Spark's skewness
14b734ee918 is described below
commit 14b734ee91800a1cd07cb211b505736019e1dfa2
Author: Ruifeng Zheng <[email protected]>
AuthorDate: Mon May 16 09:23:52 2022 +0900
[SPARK-39186][PYTHON][FOLLOWUP] Improve the numerical stability of
pandas-on-Spark's skewness
### What changes were proposed in this pull request?
Improve the numerical stability of skewness for cases with small `m2` and
`m3`
### Why are the changes needed?
The formulas to compute skewness were adjusted to be the same in
https://github.com/apache/spark/pull/36549
but for cases with small `m2` (like constant sequences), the results are
still different.
That is because `pandas.core.nanops.nanskew` will zero out small `m2` and
`m3`
```
m2 = _zero_out_fperr(m2)
m3 = _zero_out_fperr(m3)
```
and return the final result as
```
result = 0 if m2 == 0 else result
if count < 3:
return np.nan
```
This PR is to introduce this logic into Pandas API on Spark.
### Does this PR introduce _any_ user-facing change?
No
### How was this patch tested?
added UT
Closes #36554 from zhengruifeng/impl_pandas_skew.
Authored-by: Ruifeng Zheng <[email protected]>
Signed-off-by: Hyukjin Kwon <[email protected]>
---
python/pyspark/pandas/generic.py | 18 +++++---------
python/pyspark/pandas/tests/test_stats.py | 14 +++++++++++
.../expressions/aggregate/CentralMomentAgg.scala | 29 ++++++++++++++++++++++
.../spark/sql/api/python/PythonSQLUtils.scala | 5 ++++
4 files changed, 54 insertions(+), 12 deletions(-)
diff --git a/python/pyspark/pandas/generic.py b/python/pyspark/pandas/generic.py
index cac983ad0c2..f5073315164 100644
--- a/python/pyspark/pandas/generic.py
+++ b/python/pyspark/pandas/generic.py
@@ -41,6 +41,7 @@ import numpy as np
import pandas as pd
from pandas.api.types import is_list_like # type: ignore[attr-defined]
+from pyspark import SparkContext
from pyspark.sql import Column, functions as F
from pyspark.sql.types import (
BooleanType,
@@ -1490,9 +1491,9 @@ class Frame(object, metaclass=ABCMeta):
On a DataFrame:
- >>> df.skew() # doctest: +SKIP
- a 0.000000e+00
- b -3.319678e-16
+ >>> df.skew()
+ a 0.0
+ b 0.0
dtype: float64
On a Series:
@@ -1517,15 +1518,8 @@ class Frame(object, metaclass=ABCMeta):
)
)
- count_scol = F.count(F.when(~spark_column.isNull(),
1).otherwise(None))
- # refer to the Pandas implementation 'nanskew'
- #
https://github.com/pandas-dev/pandas/blob/main/pandas/core/nanops.py#L1152
- return F.when(
- count_scol > 2,
- F.skewness(spark_column)
- * F.sqrt(1 - 1 / count_scol)
- * (count_scol / (count_scol - 2)),
- ).otherwise(None)
+ sql_utils = SparkContext._active_spark_context._jvm.PythonSQLUtils
+ return Column(sql_utils.pandasSkewness(spark_column._jc))
return self._reduce_for_stat_function(
skew,
diff --git a/python/pyspark/pandas/tests/test_stats.py
b/python/pyspark/pandas/tests/test_stats.py
index 89f5f755e12..ccce140a4ac 100644
--- a/python/pyspark/pandas/tests/test_stats.py
+++ b/python/pyspark/pandas/tests/test_stats.py
@@ -236,6 +236,20 @@ class StatsTest(PandasOnSparkTestCase, SQLTestUtils):
pdf.sem(axis=1, ddof=0, numeric_only=True),
)
+ def test_skew_numerical_stability(self):
+ pdf = pd.DataFrame(
+ {
+ "A": [1, 1, 1, 1, 1],
+ "B": [1.0, np.nan, 4, 2, 5],
+ "C": [-6.0, -7, np.nan, np.nan, 10],
+ "D": [1.2, np.nan, np.nan, 9.8, np.nan],
+ "E": [1, np.nan, np.nan, np.nan, np.nan],
+ "F": [np.nan, np.nan, np.nan, np.nan, np.nan],
+ }
+ )
+ psdf = ps.from_pandas(pdf)
+ self.assert_eq(psdf.skew(), pdf.skew(), almost=True)
+
def test_corr(self):
# Disable arrow execution since corr() is using UDT internally which
is not supported.
with self.sql_conf({SPARK_CONF_ARROW_ENABLED: False}):
diff --git
a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/aggregate/CentralMomentAgg.scala
b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/aggregate/CentralMomentAgg.scala
index f9bfe77ce5a..2371cb5143a 100644
---
a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/aggregate/CentralMomentAgg.scala
+++
b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/aggregate/CentralMomentAgg.scala
@@ -338,3 +338,32 @@ case class Kurtosis(
override protected def withNewChildInternal(newChild: Expression): Kurtosis =
copy(child = newChild)
}
+
+/**
+ * Skewness in Pandas' fashion. This expression is dedicated only for Pandas
API on Spark.
+ * Refer to pandas.core.nanops.nanskew.
+ */
+case class PandasSkewness(child: Expression)
+ extends CentralMomentAgg(child, true) {
+
+ override def prettyName: String = "pandas_skewness"
+
+ override protected def momentOrder = 3
+
+ override val evaluateExpression: Expression = {
+ // floating point error
+ //
+ // Pandas #18044 in _libs/windows.pyx calc_skew follow this behavior
+ // to fix the fperr to treat m2 <1e-14 as zero
+ //
+ // see https://github.com/pandas-dev/pandas/issues/18044 for details
+ val _m2 = If(abs(m2) < 1e-14, Literal(0.0), m2)
+ val _m3 = If(abs(m3) < 1e-14, Literal(0.0), m3)
+
+ If(n < 3, Literal.create(null, DoubleType),
+ If(_m2 === 0.0, Literal(0.0), sqrt(n - 1) * (n / (n - 2)) * _m3 /
sqrt(_m2 * _m2 * _m2)))
+ }
+
+ override protected def withNewChildInternal(newChild: Expression):
PandasSkewness =
+ copy(child = newChild)
+}
diff --git
a/sql/core/src/main/scala/org/apache/spark/sql/api/python/PythonSQLUtils.scala
b/sql/core/src/main/scala/org/apache/spark/sql/api/python/PythonSQLUtils.scala
index 19cfbd0bbe7..a6307922dee 100644
---
a/sql/core/src/main/scala/org/apache/spark/sql/api/python/PythonSQLUtils.scala
+++
b/sql/core/src/main/scala/org/apache/spark/sql/api/python/PythonSQLUtils.scala
@@ -31,6 +31,7 @@ import org.apache.spark.sql.{Column, DataFrame, Row,
SparkSession}
import org.apache.spark.sql.catalyst.CatalystTypeConverters
import org.apache.spark.sql.catalyst.analysis.FunctionRegistry
import org.apache.spark.sql.catalyst.expressions._
+import org.apache.spark.sql.catalyst.expressions.aggregate.PandasSkewness
import org.apache.spark.sql.catalyst.parser.CatalystSqlParser
import org.apache.spark.sql.execution.{ExplainMode, QueryExecution}
import org.apache.spark.sql.execution.arrow.ArrowConverters
@@ -122,6 +123,10 @@ private[sql] object PythonSQLUtils extends Logging {
def timestampDiff(unit: String, start: Column, end: Column): Column = {
Column(TimestampDiff(unit, start.expr, end.expr))
}
+
+ def pandasSkewness(e: Column): Column = {
+ Column(PandasSkewness(e.expr).toAggregateExpression(false))
+ }
}
/**
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]