This is an automated email from the ASF dual-hosted git repository.

gurwls223 pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/spark.git


The following commit(s) were added to refs/heads/master by this push:
     new 14b734ee918 [SPARK-39186][PYTHON][FOLLOWUP] Improve the numerical 
stability of pandas-on-Spark's skewness
14b734ee918 is described below

commit 14b734ee91800a1cd07cb211b505736019e1dfa2
Author: Ruifeng Zheng <[email protected]>
AuthorDate: Mon May 16 09:23:52 2022 +0900

    [SPARK-39186][PYTHON][FOLLOWUP] Improve the numerical stability of 
pandas-on-Spark's skewness
    
    ### What changes were proposed in this pull request?
    
    Improve the numerical stability of skewness for cases with small `m2` and 
`m3`
    
    ### Why are the changes needed?
    
    the formulas to compute skewness were adjusted to be the same in 
https://github.com/apache/spark/pull/36549
    
    but for cases with small `m2` (like constant sequances), the results are 
still different.
    
    That is because `pandas.core.nanops.nanskew` will zero out small `m2` and 
`m3`
    ```
        m2 = _zero_out_fperr(m2)
        m3 = _zero_out_fperr(m3)
    ```
    and return the final result as
    ```
            result = 0 if m2 == 0 else result
            if count < 3:
                return np.nan
    ```
    
    This PR is to introduce this logic into Pandas API on Spark.
    
    ### Does this PR introduce _any_ user-facing change?
    No
    
    ### How was this patch tested?
    added UT
    
    Closes #36554 from zhengruifeng/impl_pandas_skew.
    
    Authored-by: Ruifeng Zheng <[email protected]>
    Signed-off-by: Hyukjin Kwon <[email protected]>
---
 python/pyspark/pandas/generic.py                   | 18 +++++---------
 python/pyspark/pandas/tests/test_stats.py          | 14 +++++++++++
 .../expressions/aggregate/CentralMomentAgg.scala   | 29 ++++++++++++++++++++++
 .../spark/sql/api/python/PythonSQLUtils.scala      |  5 ++++
 4 files changed, 54 insertions(+), 12 deletions(-)

diff --git a/python/pyspark/pandas/generic.py b/python/pyspark/pandas/generic.py
index cac983ad0c2..f5073315164 100644
--- a/python/pyspark/pandas/generic.py
+++ b/python/pyspark/pandas/generic.py
@@ -41,6 +41,7 @@ import numpy as np
 import pandas as pd
 from pandas.api.types import is_list_like  # type: ignore[attr-defined]
 
+from pyspark import SparkContext
 from pyspark.sql import Column, functions as F
 from pyspark.sql.types import (
     BooleanType,
@@ -1490,9 +1491,9 @@ class Frame(object, metaclass=ABCMeta):
 
         On a DataFrame:
 
-        >>> df.skew()  # doctest: +SKIP
-        a    0.000000e+00
-        b   -3.319678e-16
+        >>> df.skew()
+        a    0.0
+        b    0.0
         dtype: float64
 
         On a Series:
@@ -1517,15 +1518,8 @@ class Frame(object, metaclass=ABCMeta):
                     )
                 )
 
-            count_scol = F.count(F.when(~spark_column.isNull(), 
1).otherwise(None))
-            # refer to the Pandas implementation 'nanskew'
-            # 
https://github.com/pandas-dev/pandas/blob/main/pandas/core/nanops.py#L1152
-            return F.when(
-                count_scol > 2,
-                F.skewness(spark_column)
-                * F.sqrt(1 - 1 / count_scol)
-                * (count_scol / (count_scol - 2)),
-            ).otherwise(None)
+            sql_utils = SparkContext._active_spark_context._jvm.PythonSQLUtils
+            return Column(sql_utils.pandasSkewness(spark_column._jc))
 
         return self._reduce_for_stat_function(
             skew,
diff --git a/python/pyspark/pandas/tests/test_stats.py 
b/python/pyspark/pandas/tests/test_stats.py
index 89f5f755e12..ccce140a4ac 100644
--- a/python/pyspark/pandas/tests/test_stats.py
+++ b/python/pyspark/pandas/tests/test_stats.py
@@ -236,6 +236,20 @@ class StatsTest(PandasOnSparkTestCase, SQLTestUtils):
                 pdf.sem(axis=1, ddof=0, numeric_only=True),
             )
 
+    def test_skew_numerical_stability(self):
+        pdf = pd.DataFrame(
+            {
+                "A": [1, 1, 1, 1, 1],
+                "B": [1.0, np.nan, 4, 2, 5],
+                "C": [-6.0, -7, np.nan, np.nan, 10],
+                "D": [1.2, np.nan, np.nan, 9.8, np.nan],
+                "E": [1, np.nan, np.nan, np.nan, np.nan],
+                "F": [np.nan, np.nan, np.nan, np.nan, np.nan],
+            }
+        )
+        psdf = ps.from_pandas(pdf)
+        self.assert_eq(psdf.skew(), pdf.skew(), almost=True)
+
     def test_corr(self):
         # Disable arrow execution since corr() is using UDT internally which 
is not supported.
         with self.sql_conf({SPARK_CONF_ARROW_ENABLED: False}):
diff --git 
a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/aggregate/CentralMomentAgg.scala
 
b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/aggregate/CentralMomentAgg.scala
index f9bfe77ce5a..2371cb5143a 100644
--- 
a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/aggregate/CentralMomentAgg.scala
+++ 
b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/aggregate/CentralMomentAgg.scala
@@ -338,3 +338,32 @@ case class Kurtosis(
   override protected def withNewChildInternal(newChild: Expression): Kurtosis =
     copy(child = newChild)
 }
+
+/**
+ * Skewness in Pandas' fashion. This expression is dedicated only for Pandas 
API on Spark.
+ * Refer to pandas.core.nanops.nanskew.
+ */
+case class PandasSkewness(child: Expression)
+  extends CentralMomentAgg(child, true) {
+
+  override def prettyName: String = "pandas_skewness"
+
+  override protected def momentOrder = 3
+
+  override val evaluateExpression: Expression = {
+    // floating point error
+    //
+    // Pandas #18044 in _libs/windows.pyx calc_skew follow this behavior
+    // to fix the fperr to treat m2 <1e-14 as zero
+    //
+    // see https://github.com/pandas-dev/pandas/issues/18044 for details
+    val _m2 = If(abs(m2) < 1e-14, Literal(0.0), m2)
+    val _m3 = If(abs(m3) < 1e-14, Literal(0.0), m3)
+
+    If(n < 3, Literal.create(null, DoubleType),
+      If(_m2 === 0.0, Literal(0.0), sqrt(n - 1) * (n / (n - 2)) * _m3 / 
sqrt(_m2 * _m2 * _m2)))
+  }
+
+  override protected def withNewChildInternal(newChild: Expression): 
PandasSkewness =
+    copy(child = newChild)
+}
diff --git 
a/sql/core/src/main/scala/org/apache/spark/sql/api/python/PythonSQLUtils.scala 
b/sql/core/src/main/scala/org/apache/spark/sql/api/python/PythonSQLUtils.scala
index 19cfbd0bbe7..a6307922dee 100644
--- 
a/sql/core/src/main/scala/org/apache/spark/sql/api/python/PythonSQLUtils.scala
+++ 
b/sql/core/src/main/scala/org/apache/spark/sql/api/python/PythonSQLUtils.scala
@@ -31,6 +31,7 @@ import org.apache.spark.sql.{Column, DataFrame, Row, 
SparkSession}
 import org.apache.spark.sql.catalyst.CatalystTypeConverters
 import org.apache.spark.sql.catalyst.analysis.FunctionRegistry
 import org.apache.spark.sql.catalyst.expressions._
+import org.apache.spark.sql.catalyst.expressions.aggregate.PandasSkewness
 import org.apache.spark.sql.catalyst.parser.CatalystSqlParser
 import org.apache.spark.sql.execution.{ExplainMode, QueryExecution}
 import org.apache.spark.sql.execution.arrow.ArrowConverters
@@ -122,6 +123,10 @@ private[sql] object PythonSQLUtils extends Logging {
   def timestampDiff(unit: String, start: Column, end: Column): Column = {
     Column(TimestampDiff(unit, start.expr, end.expr))
   }
+
+  def pandasSkewness(e: Column): Column = {
+    Column(PandasSkewness(e.expr).toAggregateExpression(false))
+  }
 }
 
 /**


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to