This is an automated email from the ASF dual-hosted git repository.
gurwls223 pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/spark.git
The following commit(s) were added to refs/heads/master by this push:
new e3256b8 [SPARK-36396][PYTHON] Implement DataFrame.cov
e3256b8 is described below
commit e3256b838b5bd5c817bc95ba9d996b878078ad35
Author: dch nguyen <[email protected]>
AuthorDate: Tue Nov 30 15:46:11 2021 +0900
[SPARK-36396][PYTHON] Implement DataFrame.cov
### What changes were proposed in this pull request?
Implement DataFrame.cov
### Why are the changes needed?
Increase pandas API coverage in PySpark
### Does this PR introduce _any_ user-facing change?
Users can use:
``` python
>>> psdf = ps.DataFrame([(1, 2), (0, 3), (2, 0), (1, 1)],
... columns=['dogs', 'cats'])
>>> psdf.cov()
dogs cats
dogs 0.666667 -1.000000
cats -1.000000 1.666667
>>> pdf = pd.DataFrame(
... {
... "a": [1, np.nan, 3, 4],
... "b": [True, False, False, True],
... "c": [True, True, False, True],
... }
... )
>>> psdf = ps.from_pandas(pdf)
>>> psdf.cov()
a b c
a 2.333333 -0.166667 -0.166667
b -0.166667 0.333333 0.166667
c -0.166667 0.166667 0.250000
```
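An optional `min_periods` keyword is also supported; column pairs with fewer overlapping non-null observations than the threshold are returned as ``NaN`` (example taken from the new docstring below):
``` python
>>> np.random.seed(42)
>>> df = pd.DataFrame(np.random.randn(20, 3),
...                   columns=['a', 'b', 'c'])
>>> df.loc[df.index[:5], 'a'] = np.nan
>>> df.loc[df.index[5:10], 'b'] = np.nan
>>> sdf = ps.from_pandas(df)
>>> sdf.cov(min_periods=12)
          a         b         c
a  0.316741       NaN -0.150812
b       NaN  1.248003  0.191417
c -0.150812  0.191417  0.895202
```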
### How was this patch tested?
unit tests
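The tests compare the pandas-on-Spark result against native pandas on the same data. A minimal sketch of that comparison (simplified; the actual tests in test_dataframe.py below use `assert_eq` with `almost=True` and also cover `min_periods`, extension dtypes, and non-numeric columns):
``` python
import numpy as np
import pandas as pd
import pyspark.pandas as ps

pdf = pd.DataFrame([(1, 2), (0, 3), (2, 0), (1, 1)], columns=["a", "b"])
psdf = ps.from_pandas(pdf)

# The covariance matrix computed on Spark should match pandas
# up to floating-point tolerance.
np.testing.assert_allclose(psdf.cov().to_numpy(), pdf.cov().to_numpy())
```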
Closes #34213 from dchvn/SPARK-36396.
Authored-by: dch nguyen <[email protected]>
Signed-off-by: Hyukjin Kwon <[email protected]>
---
.../docs/source/reference/pyspark.pandas/frame.rst | 1 +
python/pyspark/pandas/frame.py | 189 +++++++++++++++++++++
python/pyspark/pandas/missing/frame.py | 1 -
python/pyspark/pandas/tests/test_dataframe.py | 65 ++++++-
4 files changed, 254 insertions(+), 2 deletions(-)
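A note on the frame.py hunk below: both the pairwise non-null counts and the covariances are computed only for the upper triangle of the column-pair matrix and collected into flat lists, and the expression `r * num_cols + c - step` recovers the flat position of pair (r, c). A standalone illustration of that index arithmetic (plain Python, not part of the patch):
``` python
# Illustration only (not part of the patch): map upper-triangle pairs
# (r, c) with c >= r onto consecutive flat indices, the same way the new
# DataFrame.cov indexes count_not_null_scols and cov_scols.
num_cols = 3
step = 0
for r in range(num_cols):
    step += r  # step == r * (r + 1) / 2
    for c in range(r, num_cols):
        print((r, c), "->", r * num_cols + c - step)
# (0, 0) -> 0, (0, 1) -> 1, (0, 2) -> 2,
# (1, 1) -> 3, (1, 2) -> 4, (2, 2) -> 5
```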
diff --git a/python/docs/source/reference/pyspark.pandas/frame.rst b/python/docs/source/reference/pyspark.pandas/frame.rst
index bb84202..04bfe27 100644
--- a/python/docs/source/reference/pyspark.pandas/frame.rst
+++ b/python/docs/source/reference/pyspark.pandas/frame.rst
@@ -148,6 +148,7 @@ Computations / Descriptive Stats
DataFrame.clip
DataFrame.corr
DataFrame.count
+ DataFrame.cov
DataFrame.describe
DataFrame.kurt
DataFrame.kurtosis
diff --git a/python/pyspark/pandas/frame.py b/python/pyspark/pandas/frame.py
index 38ac9af..edfb62e 100644
--- a/python/pyspark/pandas/frame.py
+++ b/python/pyspark/pandas/frame.py
@@ -77,6 +77,7 @@ from pyspark.sql.types import (
StringType,
StructField,
StructType,
+ DecimalType,
)
from pyspark.sql.window import Window
@@ -8258,6 +8259,194 @@ defaultdict(<class 'list'>, {'col..., 'col...})]
internal = self._internal.with_new_sdf(sdf, data_fields=data_fields)
self._update_internal_frame(internal, requires_same_anchor=False)
+ # TODO: ddof should be implemented.
+ def cov(self, min_periods: Optional[int] = None) -> "DataFrame":
+ """
+ Compute pairwise covariance of columns, excluding NA/null values.
+
+ Compute the pairwise covariance among the series of a DataFrame.
+ The returned data frame is the `covariance matrix
+ <https://en.wikipedia.org/wiki/Covariance_matrix>`__ of the columns
+ of the DataFrame.
+
+ Both NA and null values are automatically excluded from the
+ calculation. (See the note below about bias from missing values.)
+ A threshold can be set for the minimum number of
+ observations for each value created. Comparisons with observations
+ below this threshold will be returned as ``NaN``.
+
+ This method is generally used for the analysis of time series data to
+ understand the relationship between different measures
+ across time.
+
+ .. versionadded:: 3.3.0
+
+ Parameters
+ ----------
+ min_periods : int, optional
+ Minimum number of observations required per pair of columns
+ to have a valid result.
+
+ Returns
+ -------
+ DataFrame
+ The covariance matrix of the series of the DataFrame.
+
+ See Also
+ --------
+ Series.cov : Compute covariance with another Series.
+
+ Examples
+ --------
+ >>> df = ps.DataFrame([(1, 2), (0, 3), (2, 0), (1, 1)],
+ ... columns=['dogs', 'cats'])
+ >>> df.cov()
+ dogs cats
+ dogs 0.666667 -1.000000
+ cats -1.000000 1.666667
+
+ >>> np.random.seed(42)
+ >>> df = ps.DataFrame(np.random.randn(1000, 5),
+ ... columns=['a', 'b', 'c', 'd', 'e'])
+ >>> df.cov()
+ a b c d e
+ a 0.998438 -0.020161 0.059277 -0.008943 0.014144
+ b -0.020161 1.059352 -0.008543 -0.024738 0.009826
+ c 0.059277 -0.008543 1.010670 -0.001486 -0.000271
+ d -0.008943 -0.024738 -0.001486 0.921297 -0.013692
+ e 0.014144 0.009826 -0.000271 -0.013692 0.977795
+
+ **Minimum number of periods**
+
+ This method also supports an optional ``min_periods`` keyword
+ that specifies the required minimum number of non-NA observations for
+ each column pair in order to have a valid result:
+
+ >>> np.random.seed(42)
+ >>> df = pd.DataFrame(np.random.randn(20, 3),
+ ... columns=['a', 'b', 'c'])
+ >>> df.loc[df.index[:5], 'a'] = np.nan
+ >>> df.loc[df.index[5:10], 'b'] = np.nan
+ >>> sdf = ps.from_pandas(df)
+ >>> sdf.cov(min_periods=12)
+ a b c
+ a 0.316741 NaN -0.150812
+ b NaN 1.248003 0.191417
+ c -0.150812 0.191417 0.895202
+ """
+ min_periods = 1 if min_periods is None else min_periods
+
+ # Only compute covariance for Boolean and Numeric except Decimal
+ psdf = self[
+ [
+ col
+ for col in self.columns
+ if isinstance(self[col].spark.data_type, BooleanType)
+ or (
+ isinstance(self[col].spark.data_type, NumericType)
+ and not isinstance(self[col].spark.data_type, DecimalType)
+ )
+ ]
+ ]
+
+ num_cols = len(psdf.columns)
+ cov = np.zeros([num_cols, num_cols])
+
+ if num_cols == 0:
+ return DataFrame()
+
+ if len(psdf) < min_periods:
+ cov.fill(np.nan)
+ return DataFrame(cov, columns=psdf.columns, index=psdf.columns)
+
+ data_cols = psdf._internal.data_spark_column_names
+ cov_scols = []
+ count_not_null_scols = []
+
+ # Count number of null row between two columns
+ # Example:
+ # a b c
+ # 0 1 1 1
+ # 1 NaN 2 2
+ # 2 3 NaN 3
+ # 3 4 4 4
+ #
+ # a b c
+ # a count(a, a) count(a, b) count(a, c)
+ # b count(b, b) count(b, c)
+ # c count(c, c)
+ #
+ # count_not_null_scols =
+ # [F.count(a, a), F.count(a, b), F.count(a, c), F.count(b, b), F.count(b, c), F.count(c, c)]
+ for r in range(0, num_cols):
+ for c in range(r, num_cols):
+ count_not_null_scols.append(
+ F.count(
+ F.when(F.col(data_cols[r]).isNotNull() & F.col(data_cols[c]).isNotNull(), 1)
+ )
+ )
+
+ count_not_null = (
+ psdf._internal.spark_frame.replace(float("nan"), None)
+ .select(*count_not_null_scols)
+ .head(1)[0]
+ )
+
+ # Calculate covariance between two columns
+ # Example:
+ # with min_periods = 3
+ # a b c
+ # 0 1 1 1
+ # 1 NaN 2 2
+ # 2 3 NaN 3
+ # 3 4 4 4
+ #
+ # a b c
+ # a cov(a, a) None cov(a, c)
+ # b cov(b, b) cov(b, c)
+ # c cov(c, c)
+ #
+ # cov_scols = [F.cov(a, a), None, F.cov(a, c), F.cov(b, b), F.cov(b, c), F.cov(c, c)]
+ step = 0
+ for r in range(0, num_cols):
+ step += r
+ for c in range(r, num_cols):
+ cov_scols.append(
+ F.covar_samp(
+ F.col(data_cols[r]).cast("double"), F.col(data_cols[c]).cast("double")
+ )
+ if count_not_null[r * num_cols + c - step] >= min_periods
+ else F.lit(None)
+ )
+
+ pair_cov = psdf._internal.spark_frame.select(*cov_scols).head(1)[0]
+
+ # Convert from row to 2D array
+ # Example:
+ # pair_cov = [cov(a, a), None, cov(a, c), cov(b, b), cov(b, c), cov(c, c)]
+ #
+ # cov =
+ #
+ # a b c
+ # a cov(a, a) None cov(a, c)
+ # b cov(b, b) cov(b, c)
+ # c cov(c, c)
+ step = 0
+ for r in range(0, num_cols):
+ step += r
+ for c in range(r, num_cols):
+ cov[r][c] = pair_cov[r * num_cols + c - step]
+
+ # Copy values
+ # Example:
+ # cov =
+ # a b c
+ # a cov(a, a) None cov(a, c)
+ # b None cov(b, b) cov(b, c)
+ # c cov(a, c) cov(b, c) cov(c, c)
+ cov = cov + cov.T - np.diag(np.diag(cov))
+ return DataFrame(cov, columns=psdf.columns, index=psdf.columns)
+
def sample(
self,
n: Optional[int] = None,
diff --git a/python/pyspark/pandas/missing/frame.py b/python/pyspark/pandas/missing/frame.py
index aabc0e0..d822c14 100644
--- a/python/pyspark/pandas/missing/frame.py
+++ b/python/pyspark/pandas/missing/frame.py
@@ -39,7 +39,6 @@ class _MissingPandasLikeDataFrame(object):
compare = _unsupported_function("compare")
convert_dtypes = _unsupported_function("convert_dtypes")
corrwith = _unsupported_function("corrwith")
- cov = _unsupported_function("cov")
ewm = _unsupported_function("ewm")
infer_objects = _unsupported_function("infer_objects")
interpolate = _unsupported_function("interpolate")
diff --git a/python/pyspark/pandas/tests/test_dataframe.py b/python/pyspark/pandas/tests/test_dataframe.py
index 701052e..ae8fcae 100644
--- a/python/pyspark/pandas/tests/test_dataframe.py
+++ b/python/pyspark/pandas/tests/test_dataframe.py
@@ -14,7 +14,7 @@
# See the License for the specific language governing permissions and
# limitations under the License.
#
-
+import decimal
from datetime import datetime
from distutils.version import LooseVersion
import inspect
@@ -6025,6 +6025,69 @@ class DataFrameTest(PandasOnSparkTestCase, SQLTestUtils):
)
self.assert_eq(psmidx.dtypes, expected)
+ def test_cov(self):
+ # SPARK-36396: Implement DataFrame.cov
+
+ # int
+ pdf = pd.DataFrame([(1, 2), (0, 3), (2, 0), (1, 1)], columns=["a", "b"])
+ psdf = ps.from_pandas(pdf)
+ self.assert_eq(pdf.cov(), psdf.cov(), almost=True)
+ self.assert_eq(pdf.cov(min_periods=4), psdf.cov(min_periods=4), almost=True)
+ self.assert_eq(pdf.cov(min_periods=5), psdf.cov(min_periods=5))
+
+ # bool
+ pdf = pd.DataFrame(
+ {
+ "a": [1, np.nan, 3, 4],
+ "b": [True, False, False, True],
+ "c": [True, True, False, True],
+ }
+ )
+ psdf = ps.from_pandas(pdf)
+ self.assert_eq(pdf.cov(), psdf.cov(), almost=True)
+ self.assert_eq(pdf.cov(min_periods=4), psdf.cov(min_periods=4), almost=True)
+ self.assert_eq(pdf.cov(min_periods=5), psdf.cov(min_periods=5))
+
+ # extension dtype
+ numeric_dtypes = ["Int8", "Int16", "Int32", "Int64", "Float32", "Float64", "float"]
+ boolean_dtypes = ["boolean", "bool"]
+
+ sers = [pd.Series([1, 2, 3, None], dtype=dtype) for dtype in numeric_dtypes]
+ sers += [pd.Series([True, False, True, None], dtype=dtype) for dtype in boolean_dtypes]
+ sers.append(pd.Series([decimal.Decimal(1), decimal.Decimal(2), decimal.Decimal(3), None]))
+
+ pdf = pd.concat(sers, axis=1)
+ pdf.columns = [dtype for dtype in numeric_dtypes + boolean_dtypes] + ["decimal"]
+ psdf = ps.from_pandas(pdf)
+
+ self.assert_eq(pdf.cov(), psdf.cov(), almost=True)
+ self.assert_eq(pdf.cov(min_periods=3), psdf.cov(min_periods=3), almost=True)
+ self.assert_eq(pdf.cov(min_periods=4), psdf.cov(min_periods=4))
+
+ # string column
+ pdf = pd.DataFrame(
+ [(1, 2, "a", 1), (0, 3, "b", 1), (2, 0, "c", 9), (1, 1, "d", 1)],
+ columns=["a", "b", "c", "d"],
+ )
+ psdf = ps.from_pandas(pdf)
+ self.assert_eq(pdf.cov(), psdf.cov(), almost=True)
+ self.assert_eq(pdf.cov(min_periods=4), psdf.cov(min_periods=4), almost=True)
+ self.assert_eq(pdf.cov(min_periods=5), psdf.cov(min_periods=5))
+
+ # nan
+ np.random.seed(42)
+ pdf = pd.DataFrame(np.random.randn(20, 3), columns=["a", "b", "c"])
+ pdf.loc[pdf.index[:5], "a"] = np.nan
+ pdf.loc[pdf.index[5:10], "b"] = np.nan
+ psdf = ps.from_pandas(pdf)
+ self.assert_eq(pdf.cov(min_periods=11), psdf.cov(min_periods=11), almost=True)
+ self.assert_eq(pdf.cov(min_periods=10), psdf.cov(min_periods=10), almost=True)
+
+ # return empty DataFrame
+ pdf = pd.DataFrame([("1", "2"), ("0", "3"), ("2", "0"), ("1", "1")],
columns=["a", "b"])
+ psdf = ps.from_pandas(pdf)
+ self.assert_eq(pdf.cov(), psdf.cov())
+
if __name__ == "__main__":
from pyspark.pandas.tests.test_dataframe import * # noqa: F401
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]