This is an automated email from the ASF dual-hosted git repository.
ruifengz pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/spark.git
The following commit(s) were added to refs/heads/master by this push:
new def8cca7d89 [SPARK-40543][PS][SQL] Make `ddof` in `DataFrame.var` and `Series.var` accept arbitrary integers
def8cca7d89 is described below
commit def8cca7d891b95e56f60ea3c007398d3e21bcd1
Author: Ruifeng Zheng <[email protected]>
AuthorDate: Fri Sep 23 14:54:12 2022 +0800
[SPARK-40543][PS][SQL] Make `ddof` in `DataFrame.var` and `Series.var` accept arbitrary integers
### What changes were proposed in this pull request?
Add a new `var` expression (`PandasVariance`) to support arbitrary integer `ddof` values.
### Why are the changes needed?
for API coverage
### Does this PR introduce _any_ user-facing change?
Yes, `ddof` now accepts arbitrary integers, not just {0, 1}.
before
```
In [1]: import pyspark.pandas as ps
In [2]: import numpy as np
In [3]: df = ps.DataFrame({'a': [1, 2, 3, np.nan], 'b': [0.1, 0.2, 0.3, np.nan]}, columns=['a', 'b'])
In [4]: df.var(ddof=2)
---------------------------------------------------------------------------
AssertionError Traceback (most recent call last)
Cell In [4], line 1
----> 1 df.var(ddof=2)
File ~/Dev/spark/python/pyspark/pandas/generic.py:1958, in Frame.var(self, axis, ddof, numeric_only)
1904 def var(
1905     self, axis: Optional[Axis] = None, ddof: int = 1, numeric_only: bool = None
1906 ) -> Union[Scalar, "Series"]:
1907 """
1908 Return unbiased variance.
1909
(...)
1956 0.6666666666666666
1957 """
-> 1958 assert ddof in (0, 1)
1960 axis = validate_axis(axis)
1962 if numeric_only is None and axis == 0:
AssertionError:
```
after
```
In [4]: df.var(ddof=2)
Out[4]:
a 2.00
b 0.02
dtype: float64
In [5]: df.to_pandas().var(ddof=2)
/Users/ruifeng.zheng/Dev/spark/python/pyspark/pandas/utils.py:975: PandasAPIOnSparkAdviceWarning: `to_pandas` loads all data into the driver's memory. It should only be used if the resulting pandas DataFrame is expected to be small.
warnings.warn(message, PandasAPIOnSparkAdviceWarning)
Out[5]:
a 2.00
b 0.02
dtype: float64
```
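These results line up with a direct computation of the pandas divisor `N - ddof` over the non-null values. A minimal sketch, independent of Spark (the helper name `manual_var` is illustrative, not part of the patch):
```
import numpy as np

def manual_var(values, ddof):
    # Drop NaNs, then divide the sum of squared deviations by N - ddof,
    # following the approach of pandas.core.nanops.nanvar.
    v = np.asarray([x for x in values if not np.isnan(x)], dtype=float)
    n = len(v)
    m2 = float(np.sum((v - v.mean()) ** 2))
    return m2 / (n - ddof)

manual_var([1, 2, 3, np.nan], ddof=2)        # 2.0, matches df['a'].var(ddof=2)
manual_var([0.1, 0.2, 0.3, np.nan], ddof=2)  # ~0.02, matches df['b'].var(ddof=2)
manual_var([1, 2, 3, np.nan], ddof=-2)       # 0.4, matches the new docstring example
```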
### How was this patch tested?
Added unit tests.
Closes #37975 from zhengruifeng/ps_var_ddof.
Authored-by: Ruifeng Zheng <[email protected]>
Signed-off-by: Ruifeng Zheng <[email protected]>
---
python/pyspark/pandas/generic.py | 21 ++++++++++++++++-----
python/pyspark/pandas/spark/functions.py | 5 +++++
.../pyspark/pandas/tests/test_generic_functions.py | 7 +++++++
python/pyspark/pandas/tests/test_stats.py | 6 ++++++
.../expressions/aggregate/CentralMomentAgg.scala | 22 ++++++++++++++++++++++
.../spark/sql/api/python/PythonSQLUtils.scala | 4 ++++
6 files changed, 60 insertions(+), 5 deletions(-)
diff --git a/python/pyspark/pandas/generic.py b/python/pyspark/pandas/generic.py
index 6ba967da7f5..49032f07c75 100644
--- a/python/pyspark/pandas/generic.py
+++ b/python/pyspark/pandas/generic.py
@@ -1907,6 +1907,8 @@ class Frame(object, metaclass=ABCMeta):
"""
Return unbiased variance.
+ .. versionadded:: 3.3.0
+
Parameters
----------
axis : {index (0), columns (1)}
@@ -1914,6 +1916,9 @@ class Frame(object, metaclass=ABCMeta):
ddof : int, default 1
Delta Degrees of Freedom. The divisor used in calculations is N - ddof,
where N represents the number of elements.
+
+ .. versionchanged:: 3.4.0
+ Supports arbitrary integers.
numeric_only : bool, default None
Include only float, int, boolean columns. False is not supported. This parameter is mainly for pandas compatibility.
@@ -1935,6 +1940,11 @@ class Frame(object, metaclass=ABCMeta):
b 0.01
dtype: float64
+ >>> df.var(ddof=2)
+ a 2.00
+ b 0.02
+ dtype: float64
+
>>> df.var(axis=1)
0 0.405
1 1.620
@@ -1954,8 +1964,12 @@ class Frame(object, metaclass=ABCMeta):
>>> df['a'].var(ddof=0)
0.6666666666666666
+
+ >>> df['a'].var(ddof=-2)
+ 0.4
"""
- assert ddof in (0, 1)
+ if not isinstance(ddof, int):
+ raise TypeError("ddof must be integer")
axis = validate_axis(axis)
@@ -1973,10 +1987,7 @@ class Frame(object, metaclass=ABCMeta):
spark_type_to_pandas_dtype(spark_type), spark_type.simpleString()
)
)
- if ddof == 0:
- return F.var_pop(spark_column)
- else:
- return F.var_samp(spark_column)
+ return SF.var(spark_column, ddof)
return self._reduce_for_stat_function(
var, name="var", axis=axis, numeric_only=numeric_only, ddof=ddof
diff --git a/python/pyspark/pandas/spark/functions.py b/python/pyspark/pandas/spark/functions.py
index 3aa9d9a37dd..f9311296a57 100644
--- a/python/pyspark/pandas/spark/functions.py
+++ b/python/pyspark/pandas/spark/functions.py
@@ -32,6 +32,11 @@ def stddev(col: Column, ddof: int) -> Column:
return Column(sc._jvm.PythonSQLUtils.pandasStddev(col._jc, ddof))
+def var(col: Column, ddof: int) -> Column:
+ sc = SparkContext._active_spark_context
+ return Column(sc._jvm.PythonSQLUtils.pandasVariance(col._jc, ddof))
+
+
def skew(col: Column) -> Column:
sc = SparkContext._active_spark_context
return Column(sc._jvm.PythonSQLUtils.pandasSkewness(col._jc))
diff --git a/python/pyspark/pandas/tests/test_generic_functions.py b/python/pyspark/pandas/tests/test_generic_functions.py
index c8f6dc275da..7c252c8356d 100644
--- a/python/pyspark/pandas/tests/test_generic_functions.py
+++ b/python/pyspark/pandas/tests/test_generic_functions.py
@@ -167,6 +167,8 @@ class GenericFunctionsTest(PandasOnSparkTestCase, TestUtils):
self._test_stat_functions(lambda x: x.std())
self._test_stat_functions(lambda x: x.std(skipna=False))
self._test_stat_functions(lambda x: x.std(ddof=2))
+ self._test_stat_functions(lambda x: x.var())
+ self._test_stat_functions(lambda x: x.var(ddof=2))
self._test_stat_functions(lambda x: x.sem())
self._test_stat_functions(lambda x: x.sem(skipna=False))
# self._test_stat_functions(lambda x: x.skew())
@@ -181,6 +183,11 @@ class GenericFunctionsTest(PandasOnSparkTestCase, TestUtils):
with self.assertRaisesRegex(TypeError, "ddof must be integer"):
psdf.a.std(ddof="ddof")
+ with self.assertRaisesRegex(TypeError, "ddof must be integer"):
+ psdf.var(ddof="ddof")
+ with self.assertRaisesRegex(TypeError, "ddof must be integer"):
+ psdf.a.var(ddof="ddof")
+
self.assert_eq(pdf.a.median(), psdf.a.median())
self.assert_eq(pdf.a.median(skipna=False), psdf.a.median(skipna=False))
self.assert_eq(1.0, psdf.b.median())
diff --git a/python/pyspark/pandas/tests/test_stats.py b/python/pyspark/pandas/tests/test_stats.py
index 7a6a0d67494..ae3f457b799 100644
--- a/python/pyspark/pandas/tests/test_stats.py
+++ b/python/pyspark/pandas/tests/test_stats.py
@@ -448,6 +448,7 @@ class StatsTest(PandasOnSparkTestCase, SQLTestUtils):
self.assert_eq(psser.var(), pser.var(), almost=True)
self.assert_eq(psser.var(ddof=0), pser.var(ddof=0), almost=True)
+ self.assert_eq(psser.var(ddof=2), pser.var(ddof=2), almost=True)
self.assert_eq(psser.std(), pser.std(), almost=True)
self.assert_eq(psser.std(ddof=0), pser.std(ddof=0), almost=True)
self.assert_eq(psser.std(ddof=2), pser.std(ddof=2), almost=True)
@@ -483,6 +484,11 @@ class StatsTest(PandasOnSparkTestCase, SQLTestUtils):
pdf.var(ddof=0, numeric_only=True),
check_exact=False,
)
+ self.assert_eq(
+ psdf.var(ddof=2, numeric_only=True),
+ pdf.var(ddof=2, numeric_only=True),
+ check_exact=False,
+ )
self.assert_eq(psdf.std(numeric_only=True), pdf.std(numeric_only=True), check_exact=False)
self.assert_eq(
psdf.std(ddof=0, numeric_only=True),
diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/aggregate/CentralMomentAgg.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/aggregate/CentralMomentAgg.scala
index b830dd6c088..133a39d9874 100644
--- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/aggregate/CentralMomentAgg.scala
+++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/aggregate/CentralMomentAgg.scala
@@ -362,6 +362,28 @@ case class PandasStddev(
copy(child = newChild)
}
+/**
+ * Variance in Pandas' fashion. This expression is dedicated only for Pandas API on Spark.
+ * Refer to pandas.core.nanops.nanvar.
+ */
+case class PandasVariance(
+ child: Expression,
+ ddof: Int)
+ extends CentralMomentAgg(child, true) {
+
+ override protected def momentOrder = 2
+
+ override val evaluateExpression: Expression = {
+ If(n === 0.0, Literal.create(null, DoubleType),
+ If(n === ddof, divideByZeroEvalResult, m2 / (n - ddof)))
+ }
+
+ override def prettyName: String = "pandas_variance"
+
+ override protected def withNewChildInternal(newChild: Expression): PandasVariance =
+ copy(child = newChild)
+}
+
/**
* Skewness in Pandas' fashion. This expression is dedicated only for Pandas API on Spark.
* Refer to pandas.core.nanops.nanskew.
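For readers skimming the Catalyst change: `PandasVariance` passes `nullOnDivideByZero = true` up to `CentralMomentAgg`, so `divideByZeroEvalResult` should resolve to NULL; the expression then evaluates to NULL on empty input, NULL when `n == ddof` (zero divisor), and `m2 / (n - ddof)` otherwise. A rough Python restatement of `evaluateExpression`, for orientation only (not Spark code):
```
def pandas_variance(values, ddof):
    # Mirrors the If-chain in PandasVariance.evaluateExpression.
    n = len(values)
    if n == 0:
        return None  # empty input evaluates to NULL
    if n == ddof:
        return None  # divideByZeroEvalResult; NULL since nullOnDivideByZero = true
    mean = sum(values) / n
    m2 = sum((v - mean) ** 2 for v in values)  # second central moment
    return m2 / (n - ddof)
```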
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/api/python/PythonSQLUtils.scala b/sql/core/src/main/scala/org/apache/spark/sql/api/python/PythonSQLUtils.scala
index 579a945398f..d43a9060677 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/api/python/PythonSQLUtils.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/api/python/PythonSQLUtils.scala
@@ -159,6 +159,10 @@ private[sql] object PythonSQLUtils extends Logging {
Column(PandasStddev(e.expr, ddof).toAggregateExpression(false))
}
+ def pandasVariance(e: Column, ddof: Int): Column = {
+ Column(PandasVariance(e.expr, ddof).toAggregateExpression(false))
+ }
+
def pandasSkewness(e: Column): Column = {
Column(PandasSkewness(e.expr).toAggregateExpression(false))
}