This is an automated email from the ASF dual-hosted git repository.
ruifengz pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/spark.git
The following commit(s) were added to refs/heads/master by this push:
     new 398bff77b1c [SPARK-45999][PS] Use dedicated `PandasProduct` in `cumprod`
398bff77b1c is described below
commit 398bff77b1c837aed55f4f10ff1157f7da813570
Author: Ruifeng Zheng <[email protected]>
AuthorDate: Mon Nov 20 16:55:09 2023 +0800
[SPARK-45999][PS] Use dedicated `PandasProduct` in `cumprod`
### What changes were proposed in this pull request?
Use the dedicated `PandasProduct` aggregate in `cumprod`.

### Why are the changes needed?
To be consistent with `prod`, which already uses it.

### Does this PR introduce _any_ user-facing change?
No.

### How was this patch tested?
CI.

### Was this patch authored or co-authored using generative AI tooling?
No.
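As a sanity check on the "no user-facing change" claim, a minimal sketch of the behavior that is expected to stay the same (illustrative session, not part of this patch):

```python
import pyspark.pandas as ps

# cumprod() should return the running product, exactly as before this change.
psser = ps.Series([1, 2, 3, 4])
print(psser.cumprod())
# Expected output:
# 0     1
# 1     2
# 2     6
# 3    24
# dtype: int64
```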
Closes #43901 from zhengruifeng/ps_cumprod.
Authored-by: Ruifeng Zheng <[email protected]>
Signed-off-by: Ruifeng Zheng <[email protected]>
---
python/pyspark/pandas/generic.py | 2 --
python/pyspark/pandas/series.py | 34 +++++++---------------------------
2 files changed, 7 insertions(+), 29 deletions(-)
diff --git a/python/pyspark/pandas/generic.py b/python/pyspark/pandas/generic.py
index 23139762882..77cefb53fe5 100644
--- a/python/pyspark/pandas/generic.py
+++ b/python/pyspark/pandas/generic.py
@@ -331,8 +331,6 @@ class Frame(object, metaclass=ABCMeta):
         return self._apply_series_op(lambda psser: psser._cumsum(skipna), should_resolve=True)

     # TODO: add 'axis' parameter
-    # TODO: use pandas_udf to support negative values and other options later
-    # other window except unbounded ones is supported as of Spark 3.0.
     def cumprod(self: FrameLike, skipna: bool = True) -> FrameLike:
         """
         Return cumulative product over a DataFrame or Series axis.
diff --git a/python/pyspark/pandas/series.py b/python/pyspark/pandas/series.py
index c9beb6432f9..fedc2417a29 100644
--- a/python/pyspark/pandas/series.py
+++ b/python/pyspark/pandas/series.py
@@ -62,7 +62,6 @@ from pyspark.sql.types import (
     DoubleType,
     FloatType,
     IntegerType,
-    IntegralType,
     LongType,
     NumericType,
     Row,
@@ -6973,36 +6972,17 @@ class Series(Frame, IndexOpsMixin, Generic[T]):
         return psser._cum(F.sum, skipna, part_cols)

     def _cumprod(self, skipna: bool, part_cols: Sequence["ColumnOrName"] = ()) -> "Series":
-        if isinstance(self.spark.data_type, BooleanType):
-            scol = self._cum(
-                lambda scol: F.min(F.coalesce(scol, F.lit(True))), skipna, part_cols
-            ).spark.column.cast(LongType())
-        elif isinstance(self.spark.data_type, NumericType):
-            num_zeros = self._cum(
-                lambda scol: F.sum(F.when(scol == 0, 1).otherwise(0)), skipna, part_cols
-            ).spark.column
-            num_negatives = self._cum(
-                lambda scol: F.sum(F.when(scol < 0, 1).otherwise(0)), skipna, part_cols
-            ).spark.column
-            sign = F.when(num_negatives % 2 == 0, 1).otherwise(-1)
-
-            abs_prod = F.exp(
-                self._cum(lambda scol: F.sum(F.log(F.abs(scol))), skipna, part_cols).spark.column
-            )
-
-            scol = F.when(num_zeros > 0, 0).otherwise(sign * abs_prod)
-
-            if isinstance(self.spark.data_type, IntegralType):
-                scol = F.round(scol).cast(LongType())
-        else:
+        psser = self
+        if isinstance(psser.spark.data_type, BooleanType):
+            psser = psser.spark.transform(lambda scol: scol.cast(LongType()))
+        elif not isinstance(psser.spark.data_type, NumericType):
             raise TypeError(
                 "Could not convert {} ({}) to numeric".format(
-                    spark_type_to_pandas_dtype(self.spark.data_type),
-                    self.spark.data_type.simpleString(),
+                    spark_type_to_pandas_dtype(psser.spark.data_type),
+                    psser.spark.data_type.simpleString(),
                 )
             )
-
-        return self._with_new_scol(scol)
+        return psser._cum(lambda c: SF.product(c, skipna), skipna, part_cols)

     # ----------------------------------------------------------------------
     # Accessor Methods
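For context on what the removed branch was doing: it emulated a running product as sign * exp(sum(log|x|)), with separate running counts to patch up zeros and the sign, plus `F.round(...).cast(LongType())` to recover exact integral results. A minimal standalone sketch of that arithmetic in plain Python (names are illustrative; the patch replaces all of this with a single call to the dedicated `PandasProduct` expression via `SF.product(c, skipna)`):

```python
import math
from typing import Sequence


def product_via_logs(values: Sequence[float]) -> float:
    """Emulate a product as sign * exp(sum(log|x|)), as the removed code did."""
    if any(v == 0 for v in values):
        return 0.0  # the removed code forced the result to 0 once a zero appeared
    num_negatives = sum(1 for v in values if v < 0)
    sign = 1.0 if num_negatives % 2 == 0 else -1.0
    return sign * math.exp(sum(math.log(abs(v)) for v in values))


# The exp/log round-trip is only approximately exact, hence the old
# F.round(...) for integral inputs; a dedicated product aggregate avoids it.
assert math.isclose(product_via_logs([2.0, -3.0, 4.0]), -24.0)
```

Besides avoiding that round-trip, the change lets `cumprod` share one code path with `prod`, which is the stated goal of the PR.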