This is an automated email from the ASF dual-hosted git repository.
HyukjinKwon pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/spark.git
The following commit(s) were added to refs/heads/master by this push:
new 922d2ac4a77e [SPARK-40340][SPARK-40343][PYTHON] Implement Rolling.sem
and Expanding.sem in pandas API on Spark
922d2ac4a77e is described below
commit 922d2ac4a77e5c544bcf0d318012a082c47d9505
Author: Devin Petersohn <[email protected]>
AuthorDate: Mon May 11 06:53:29 2026 +0900
[SPARK-40340][SPARK-40343][PYTHON] Implement Rolling.sem and Expanding.sem
in pandas API on Spark
### What changes were proposed in this pull request?
Implement `Rolling.sem()` and `Expanding.sem()` in the pandas API on Spark
### Why are the changes needed?
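The pandas API on Spark did not provide `Rolling.sem()` or `Expanding.sem()`, leaving a gap in API coverage relative to pandas.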
### Does this PR introduce _any_ user-facing change?
Yes. It adds `Rolling.sem()` and `Expanding.sem()` to the pandas API on Spark.
### How was this patch tested?
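Added unit tests to `test_rolling_adv.py` and `test_expanding_adv.py` covering both the default `ddof` and `ddof=0`.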
### Was this patch authored or co-authored using generative AI tooling?
Generated-by: Claude Code (claude-opus-4-7)
Closes #55774 from devin-petersohn/devin/window-sem.
Authored-by: Devin Petersohn <[email protected]>
Signed-off-by: Hyukjin Kwon <[email protected]>
---
.../pandas/tests/window/test_expanding_adv.py | 4 +
.../pandas/tests/window/test_rolling_adv.py | 4 +
python/pyspark/pandas/window.py | 127 +++++++++++++++++++++
3 files changed, 135 insertions(+)
diff --git a/python/pyspark/pandas/tests/window/test_expanding_adv.py
b/python/pyspark/pandas/tests/window/test_expanding_adv.py
index 2a639773b088..dcb3b09f3d3e 100644
--- a/python/pyspark/pandas/tests/window/test_expanding_adv.py
+++ b/python/pyspark/pandas/tests/window/test_expanding_adv.py
@@ -35,6 +35,10 @@ class ExpandingAdvMixin(ExpandingTestingFuncMixin):
def test_expanding_var(self):
self._test_expanding_func("var", int_almost=True)
+ def test_expanding_sem(self):  # default ddof, then ddof=0
+ self._test_expanding_func("sem", int_almost=True)
+ self._test_expanding_func(lambda x: x.sem(ddof=0), lambda x:
x.sem(ddof=0), int_almost=True)
+
def test_expanding_skew(self):
self._test_expanding_func("skew", int_almost=True)
diff --git a/python/pyspark/pandas/tests/window/test_rolling_adv.py
b/python/pyspark/pandas/tests/window/test_rolling_adv.py
index 983e164771fe..69c9515e6a1d 100644
--- a/python/pyspark/pandas/tests/window/test_rolling_adv.py
+++ b/python/pyspark/pandas/tests/window/test_rolling_adv.py
@@ -32,6 +32,10 @@ class RollingAdvMixin(RollingTestingFuncMixin):
def test_rolling_var(self):
self._test_rolling_func("var")
+ def test_rolling_sem(self):  # default ddof, then ddof=0
+ self._test_rolling_func("sem")
+ self._test_rolling_func(lambda x: x.sem(ddof=0), lambda x:
x.sem(ddof=0))
+
def test_rolling_skew(self):
self._test_rolling_func("skew")
diff --git a/python/pyspark/pandas/window.py b/python/pyspark/pandas/window.py
index 89733a0ca06c..486b76f2a04e 100644
--- a/python/pyspark/pandas/window.py
+++ b/python/pyspark/pandas/window.py
@@ -131,6 +131,19 @@ class RollingAndExpanding(Generic[FrameLike],
metaclass=ABCMeta):
return self._apply_as_series_or_frame(var)
+ def sem(self, ddof: int = 1) -> FrameLike:
+ if not isinstance(ddof, int):
+ raise TypeError("ddof must be integer")
+
+ def sem(scol: Column) -> Column:  # std(ddof=1) / sqrt(count - ddof)
+ count = F.count(scol).over(self._window)  # non-null values in window
+ return F.when(
+ (F.row_number().over(self._unbounded_window) >=
self._min_periods) & (count > ddof),
+ F.stddev(scol).over(self._window) / F.sqrt(count - ddof),  # sample std (ddof=1)
+ ).otherwise(F.lit(None))
+
+ return self._apply_as_series_or_frame(sem)
+
def skew(self) -> FrameLike:
def skew(scol: Column) -> Column:
return F.when(
@@ -835,6 +848,63 @@ class Rolling(RollingLike[FrameLike]):
"""
return super().var()
+ def sem(self, ddof: int = 1) -> FrameLike:
+ """
+ Calculate the rolling standard error of the mean.
+
+ .. versionadded:: 4.3.0
+
+ .. note:: the current implementation of this API uses Spark's Window
without
+ specifying partition specification. This leads to move all data
into
+ single partition in single machine and could cause serious
+ performance degradation. Avoid this method against very large
dataset.
+
+ Parameters
+ ----------
+ ddof : int, default 1
+ Delta Degrees of Freedom. The standard deviation (computed with ddof=1) is
+ divided by sqrt(N - ddof), where N is the number of non-null elements.
+
+ Returns
+ -------
+ Series or DataFrame
+ Returns the same object type as the caller of the rolling
calculation.
+
+ See Also
+ --------
+ pyspark.pandas.Series.rolling : Calling object with Series data.
+ pyspark.pandas.DataFrame.rolling : Calling object with DataFrames.
+ pyspark.pandas.Series.sem : Aggregating sem for Series.
+ pyspark.pandas.DataFrame.sem : Aggregating sem for DataFrame.
+
+ Examples
+ --------
+ >>> s = ps.Series([5, 5, 6, 7, 5, 5, 5])
+ >>> s.rolling(3).sem()
+ 0 NaN
+ 1 NaN
+ 2 0.408248
+ 3 0.707107
+ 4 0.707107
+ 5 0.816497
+ 6 0.000000
+ dtype: float64
+
+ For DataFrame, each rolling standard error of the mean is computed
column-wise.
+
+ >>> df = ps.DataFrame({"A": s.to_numpy(), "B": s.to_numpy() ** 2})
+ >>> df.rolling(2).sem()
+ A B
+ 0 NaN NaN
+ 1 0.000000 0.000000
+ 2 0.707107 7.778175
+ 3 0.707107 9.192388
+ 4 1.414214 16.970563
+ 5 0.000000 0.000000
+ 6 0.000000 0.000000
+ """
+ return super().sem(ddof)
+
def skew(self) -> FrameLike:
"""
Calculate unbiased rolling skew.
@@ -2007,6 +2077,63 @@ class Expanding(ExpandingLike[FrameLike]):
"""
return super().var()
+ def sem(self, ddof: int = 1) -> FrameLike:
+ """
+ Calculate the expanding standard error of the mean.
+
+ .. versionadded:: 4.3.0
+
+ .. note:: the current implementation of this API uses Spark's Window
without
+ specifying partition specification. This leads to move all data
into
+ single partition in single machine and could cause serious
+ performance degradation. Avoid this method against very large
dataset.
+
+ Parameters
+ ----------
+ ddof : int, default 1
+ Delta Degrees of Freedom. The standard deviation (computed with ddof=1) is
+ divided by sqrt(N - ddof), where N is the number of non-null elements.
+
+ Returns
+ -------
+ Series or DataFrame
+ Returns the same object type as the caller of the expanding
calculation.
+
+ See Also
+ --------
+ pyspark.pandas.Series.expanding : Calling object with Series data.
+ pyspark.pandas.DataFrame.expanding : Calling object with DataFrames.
+ pyspark.pandas.Series.sem : Aggregating sem for Series.
+ pyspark.pandas.DataFrame.sem : Aggregating sem for DataFrame.
+
+ Examples
+ --------
+ >>> s = ps.Series([5, 5, 6, 7, 5, 5, 5])
+ >>> s.expanding(3).sem()
+ 0 NaN
+ 1 NaN
+ 2 0.408248
+ 3 0.552771
+ 4 0.447214
+ 5 0.374166
+ 6 0.321208
+ dtype: float64
+
+ For DataFrame, each expanding standard error of the mean is computed
column-wise.
+
+ >>> df = ps.DataFrame({"A": s.to_numpy(), "B": s.to_numpy() ** 2})
+ >>> df.expanding(2).sem()
+ A B
+ 0 NaN NaN
+ 1 0.000000 0.000000
+ 2 0.408248 4.490731
+ 3 0.552771 6.589132
+ 4 0.447214 5.315073
+ 5 0.374166 4.439970
+ 6 0.321208 3.807887
+ """
+ return super().sem(ddof)
+
def skew(self) -> FrameLike:
"""
Calculate unbiased expanding skew.
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]