This is an automated email from the ASF dual-hosted git repository.
gurwls223 pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/spark.git
The following commit(s) were added to refs/heads/master by this push:
new 3bbad5c417a [SPARK-38947][PYTHON][PS] Supports groupby positional
indexing
3bbad5c417a is described below
commit 3bbad5c417a8f819fa6e4838543ed8e63b4a6e75
Author: Yikun Jiang <[email protected]>
AuthorDate: Thu May 19 19:02:43 2022 +0900
[SPARK-38947][PYTHON][PS] Supports groupby positional indexing
### What changes were proposed in this pull request?
Add groupby positional indexing support for Pandas on Spark.
### Why are the changes needed?
Pandas supports Groupby positional indexing since v1.4.0
https://pandas.pydata.org/docs/whatsnew/v1.4.0.html#groupby-positional-indexing
Before 1.4.0, `pdf.groupby("a").head(-2) == pdf.groupby("a").head(0)`
returns an empty dataframe; after 1.4.0, it allows specifying positional ranges
relative to the ends of each group.
```python
df = pd.DataFrame([["g", "g0"], ["g", "g1"], ["g", "g2"], ["g", "g3"],
["h", "h0"], ["h", "h1"]], columns=["A", "B"])
df.groupby("A").head(-1)
A B
0 g g0
1 g g1
2 g g2
4 h h0
```
### Does this PR introduce _any_ user-facing change?
Yes, follows pandas 1.4+ behaviors, while still keeping the old behavior when using
pandas < 1.4
### How was this patch tested?
- Added some more positional indexing tests (-1, -100000).
- `test_tail` and `test_head` pass with pandas 1.3.x and 1.4.x
Closes #36464 from Yikun/SPARK-38947.
Authored-by: Yikun Jiang <[email protected]>
Signed-off-by: Hyukjin Kwon <[email protected]>
---
python/pyspark/pandas/groupby.py | 90 +++++++++++--
python/pyspark/pandas/tests/test_groupby.py | 188 ++++++++--------------------
2 files changed, 133 insertions(+), 145 deletions(-)
diff --git a/python/pyspark/pandas/groupby.py b/python/pyspark/pandas/groupby.py
index 9a931f20fd4..2fc237031cd 100644
--- a/python/pyspark/pandas/groupby.py
+++ b/python/pyspark/pandas/groupby.py
@@ -2122,22 +2122,60 @@ class GroupBy(Generic[FrameLike], metaclass=ABCMeta):
groupkey_scols = [psdf._internal.spark_column_for(label) for label in
groupkey_labels]
sdf = psdf._internal.spark_frame
- tmp_col = verify_temp_column_name(sdf, "__row_number__")
+ window = Window.partitionBy(*groupkey_scols)
# This part is handled differently depending on whether it is a tail
or a head.
- window = (
-
Window.partitionBy(*groupkey_scols).orderBy(F.col(NATURAL_ORDER_COLUMN_NAME).asc())
+ ordered_window = (
+ window.orderBy(F.col(NATURAL_ORDER_COLUMN_NAME).asc())
if asc
- else Window.partitionBy(*groupkey_scols).orderBy(
- F.col(NATURAL_ORDER_COLUMN_NAME).desc()
- )
+ else window.orderBy(F.col(NATURAL_ORDER_COLUMN_NAME).desc())
)
- sdf = (
- sdf.withColumn(tmp_col, F.row_number().over(window))
- .filter(F.col(tmp_col) <= n)
- .drop(tmp_col)
- )
+ if n >= 0 or LooseVersion(pd.__version__) < LooseVersion("1.4.0"):
+ tmp_row_num_col = verify_temp_column_name(sdf, "__row_number__")
+ sdf = (
+ sdf.withColumn(tmp_row_num_col,
F.row_number().over(ordered_window))
+ .filter(F.col(tmp_row_num_col) <= n)
+ .drop(tmp_row_num_col)
+ )
+ else:
+ # Pandas supports Groupby positional indexing since v1.4.0
+ #
https://pandas.pydata.org/docs/whatsnew/v1.4.0.html#groupby-positional-indexing
+ #
+ # To support groupby positional indexing, we need to add a
`__tmp_lag__` column to help
+ # us filter rows before the specified offset row.
+ #
+ # For example for the dataframe:
+ # >>> df = ps.DataFrame([["g", "g0"],
+ # ... ["g", "g1"],
+ # ... ["g", "g2"],
+ # ... ["g", "g3"],
+ # ... ["h", "h0"],
+ # ... ["h", "h1"]], columns=["A", "B"])
+ # >>> df.groupby("A").head(-1)
+ #
+ # Below is a result to show the `__tmp_lag__` column for above df,
the limit n is
+ # `-1`, the `__tmp_lag__` will be set to `0` in rows[:-1], and
left will be set to
+ # `null`:
+ #
+ # >>> sdf.withColumn(tmp_lag_col, F.lag(F.lit(0),
-1).over(ordered_window))
+ #
+-----------------+--------------+---+---+-----------------+-----------+
+ # |__index_level_0__|__groupkey_0__| A|
B|__natural_order__|__tmp_lag__|
+ #
+-----------------+--------------+---+---+-----------------+-----------+
+ # | 0| g| g| g0| 0|
0|
+ # | 1| g| g| g1| 8589934592|
0|
+ # | 2| g| g| g2| 17179869184|
0|
+ # | 3| g| g| g3| 25769803776|
null|
+ # | 4| h| h| h0| 34359738368|
0|
+ # | 5| h| h| h1| 42949672960|
null|
+ #
+-----------------+--------------+---+---+-----------------+-----------+
+ #
+ tmp_lag_col = verify_temp_column_name(sdf, "__tmp_lag__")
+ sdf = (
+ sdf.withColumn(tmp_lag_col, F.lag(F.lit(0),
n).over(ordered_window))
+ .where(~F.isnull(F.col(tmp_lag_col)))
+ .drop(tmp_lag_col)
+ )
internal = psdf._internal.with_new_sdf(sdf)
return
self._cleanup_and_return(DataFrame(internal).drop(groupkey_labels, axis=1))
@@ -2187,6 +2225,21 @@ class GroupBy(Generic[FrameLike], metaclass=ABCMeta):
7 2
10 10
Name: b, dtype: int64
+
+ Supports Groupby positional indexing Since pandas on Spark 3.4 (with
pandas 1.4+):
+
+ >>> df = ps.DataFrame([["g", "g0"],
+ ... ["g", "g1"],
+ ... ["g", "g2"],
+ ... ["g", "g3"],
+ ... ["h", "h0"],
+ ... ["h", "h1"]], columns=["A", "B"])
+ >>> df.groupby("A").head(-1) # doctest: +SKIP
+ A B
+ 0 g g0
+ 1 g g1
+ 2 g g2
+ 4 h h0
"""
return self._limit(n, asc=True)
@@ -2240,6 +2293,21 @@ class GroupBy(Generic[FrameLike], metaclass=ABCMeta):
6 5
9 8
Name: b, dtype: int64
+
+ Supports Groupby positional indexing Since pandas on Spark 3.4 (with
pandas 1.4+):
+
+ >>> df = ps.DataFrame([["g", "g0"],
+ ... ["g", "g1"],
+ ... ["g", "g2"],
+ ... ["g", "g3"],
+ ... ["h", "h0"],
+ ... ["h", "h1"]], columns=["A", "B"])
+ >>> df.groupby("A").tail(-1) # doctest: +SKIP
+ A B
+ 3 g g3
+ 2 g g2
+ 1 g g1
+ 5 h h1
"""
return self._limit(n, asc=False)
diff --git a/python/pyspark/pandas/tests/test_groupby.py
b/python/pyspark/pandas/tests/test_groupby.py
index e5451c329ec..1375d7a9bc0 100644
--- a/python/pyspark/pandas/tests/test_groupby.py
+++ b/python/pyspark/pandas/tests/test_groupby.py
@@ -2579,40 +2579,19 @@ class GroupByTest(PandasOnSparkTestCase, TestUtils):
)
psdf = ps.from_pandas(pdf)
- self.assert_eq(
- pdf.groupby("a").head(2).sort_index(),
psdf.groupby("a").head(2).sort_index()
- )
- self.assert_eq(
- pdf.groupby("a").head(-2).sort_index(),
psdf.groupby("a").head(-2).sort_index()
- )
- self.assert_eq(
- pdf.groupby("a").head(100000).sort_index(),
psdf.groupby("a").head(100000).sort_index()
- )
-
- self.assert_eq(
- pdf.groupby("a")["b"].head(2).sort_index(),
psdf.groupby("a")["b"].head(2).sort_index()
- )
- self.assert_eq(
- pdf.groupby("a")["b"].head(-2).sort_index(),
- psdf.groupby("a")["b"].head(-2).sort_index(),
- )
- self.assert_eq(
- pdf.groupby("a")["b"].head(100000).sort_index(),
- psdf.groupby("a")["b"].head(100000).sort_index(),
- )
-
- self.assert_eq(
- pdf.groupby("a")[["b"]].head(2).sort_index(),
- psdf.groupby("a")[["b"]].head(2).sort_index(),
- )
- self.assert_eq(
- pdf.groupby("a")[["b"]].head(-2).sort_index(),
- psdf.groupby("a")[["b"]].head(-2).sort_index(),
- )
- self.assert_eq(
- pdf.groupby("a")[["b"]].head(100000).sort_index(),
- psdf.groupby("a")[["b"]].head(100000).sort_index(),
- )
+ for limit in (2, 100000, -2, -100000, -1):
+ self.assert_eq(
+ pdf.groupby("a").head(limit).sort_index(),
+ psdf.groupby("a").head(limit).sort_index(),
+ )
+ self.assert_eq(
+ pdf.groupby("a")["b"].head(limit).sort_index(),
+ psdf.groupby("a")["b"].head(limit).sort_index(),
+ )
+ self.assert_eq(
+ pdf.groupby("a")[["b"]].head(limit).sort_index(),
+ psdf.groupby("a")[["b"]].head(limit).sort_index(),
+ )
self.assert_eq(
pdf.groupby(pdf.a // 2).head(2).sort_index(),
@@ -2656,45 +2635,26 @@ class GroupByTest(PandasOnSparkTestCase, TestUtils):
)
psdf = ps.from_pandas(pdf)
- self.assert_eq(
- pdf.groupby("a").head(2).sort_index(),
psdf.groupby("a").head(2).sort_index()
- )
- self.assert_eq(
- pdf.groupby("a").head(-2).sort_index(),
psdf.groupby("a").head(-2).sort_index()
- )
- self.assert_eq(
- pdf.groupby("a").head(100000).sort_index(),
psdf.groupby("a").head(100000).sort_index()
- )
-
- self.assert_eq(
- pdf.groupby("a")["b"].head(2).sort_index(),
psdf.groupby("a")["b"].head(2).sort_index()
- )
- self.assert_eq(
- pdf.groupby("a")["b"].head(-2).sort_index(),
- psdf.groupby("a")["b"].head(-2).sort_index(),
- )
- self.assert_eq(
- pdf.groupby("a")["b"].head(100000).sort_index(),
- psdf.groupby("a")["b"].head(100000).sort_index(),
- )
+ for limit in (2, 100000, -2, -100000, -1):
+ self.assert_eq(
+ pdf.groupby("a").head(limit).sort_index(),
+ psdf.groupby("a").head(limit).sort_index(),
+ )
+ self.assert_eq(
+ pdf.groupby("a")["b"].head(limit).sort_index(),
+ psdf.groupby("a")["b"].head(limit).sort_index(),
+ )
# multi-index columns
columns = pd.MultiIndex.from_tuples([("x", "a"), ("x", "b"), ("y",
"c")])
pdf.columns = columns
psdf.columns = columns
- self.assert_eq(
- pdf.groupby(("x", "a")).head(2).sort_index(),
- psdf.groupby(("x", "a")).head(2).sort_index(),
- )
- self.assert_eq(
- pdf.groupby(("x", "a")).head(-2).sort_index(),
- psdf.groupby(("x", "a")).head(-2).sort_index(),
- )
- self.assert_eq(
- pdf.groupby(("x", "a")).head(100000).sort_index(),
- psdf.groupby(("x", "a")).head(100000).sort_index(),
- )
+ for limit in (2, 100000, -2, -100000, -1):
+ self.assert_eq(
+ pdf.groupby(("x", "a")).head(limit).sort_index(),
+ psdf.groupby(("x", "a")).head(limit).sort_index(),
+ )
def test_missing(self):
psdf = ps.DataFrame({"a": [1, 2, 3, 4, 5, 6, 7, 8, 9]})
@@ -2953,40 +2913,19 @@ class GroupByTest(PandasOnSparkTestCase, TestUtils):
)
psdf = ps.from_pandas(pdf)
- self.assert_eq(
- pdf.groupby("a").tail(2).sort_index(),
psdf.groupby("a").tail(2).sort_index()
- )
- self.assert_eq(
- pdf.groupby("a").tail(-2).sort_index(),
psdf.groupby("a").tail(-2).sort_index()
- )
- self.assert_eq(
- pdf.groupby("a").tail(100000).sort_index(),
psdf.groupby("a").tail(100000).sort_index()
- )
-
- self.assert_eq(
- pdf.groupby("a")["b"].tail(2).sort_index(),
psdf.groupby("a")["b"].tail(2).sort_index()
- )
- self.assert_eq(
- pdf.groupby("a")["b"].tail(-2).sort_index(),
- psdf.groupby("a")["b"].tail(-2).sort_index(),
- )
- self.assert_eq(
- pdf.groupby("a")["b"].tail(100000).sort_index(),
- psdf.groupby("a")["b"].tail(100000).sort_index(),
- )
-
- self.assert_eq(
- pdf.groupby("a")[["b"]].tail(2).sort_index(),
- psdf.groupby("a")[["b"]].tail(2).sort_index(),
- )
- self.assert_eq(
- pdf.groupby("a")[["b"]].tail(-2).sort_index(),
- psdf.groupby("a")[["b"]].tail(-2).sort_index(),
- )
- self.assert_eq(
- pdf.groupby("a")[["b"]].tail(100000).sort_index(),
- psdf.groupby("a")[["b"]].tail(100000).sort_index(),
- )
+ for limit in (2, 100000, -2, -100000, -1):
+ self.assert_eq(
+ pdf.groupby("a").tail(limit).sort_index(),
+ psdf.groupby("a").tail(limit).sort_index(),
+ )
+ self.assert_eq(
+ pdf.groupby("a")["b"].tail(limit).sort_index(),
+ psdf.groupby("a")["b"].tail(limit).sort_index(),
+ )
+ self.assert_eq(
+ pdf.groupby("a")[["b"]].tail(limit).sort_index(),
+ psdf.groupby("a")[["b"]].tail(limit).sort_index(),
+ )
self.assert_eq(
pdf.groupby(pdf.a // 2).tail(2).sort_index(),
@@ -3030,45 +2969,26 @@ class GroupByTest(PandasOnSparkTestCase, TestUtils):
)
psdf = ps.from_pandas(pdf)
- self.assert_eq(
- pdf.groupby("a").tail(2).sort_index(),
psdf.groupby("a").tail(2).sort_index()
- )
- self.assert_eq(
- pdf.groupby("a").tail(-2).sort_index(),
psdf.groupby("a").tail(-2).sort_index()
- )
- self.assert_eq(
- pdf.groupby("a").tail(100000).sort_index(),
psdf.groupby("a").tail(100000).sort_index()
- )
-
- self.assert_eq(
- pdf.groupby("a")["b"].tail(2).sort_index(),
psdf.groupby("a")["b"].tail(2).sort_index()
- )
- self.assert_eq(
- pdf.groupby("a")["b"].tail(-2).sort_index(),
- psdf.groupby("a")["b"].tail(-2).sort_index(),
- )
- self.assert_eq(
- pdf.groupby("a")["b"].tail(100000).sort_index(),
- psdf.groupby("a")["b"].tail(100000).sort_index(),
- )
+ for limit in (2, 100000, -2, -100000, -1):
+ self.assert_eq(
+ pdf.groupby("a").tail(limit).sort_index(),
+ psdf.groupby("a").tail(limit).sort_index(),
+ )
+ self.assert_eq(
+ pdf.groupby("a")["b"].tail(limit).sort_index(),
+ psdf.groupby("a")["b"].tail(limit).sort_index(),
+ )
# multi-index columns
columns = pd.MultiIndex.from_tuples([("x", "a"), ("x", "b"), ("y",
"c")])
pdf.columns = columns
psdf.columns = columns
- self.assert_eq(
- pdf.groupby(("x", "a")).tail(2).sort_index(),
- psdf.groupby(("x", "a")).tail(2).sort_index(),
- )
- self.assert_eq(
- pdf.groupby(("x", "a")).tail(-2).sort_index(),
- psdf.groupby(("x", "a")).tail(-2).sort_index(),
- )
- self.assert_eq(
- pdf.groupby(("x", "a")).tail(100000).sort_index(),
- psdf.groupby(("x", "a")).tail(100000).sort_index(),
- )
+ for limit in (2, 100000, -2, -100000, -1):
+ self.assert_eq(
+ pdf.groupby(("x", "a")).tail(limit).sort_index(),
+ psdf.groupby(("x", "a")).tail(limit).sort_index(),
+ )
def test_ddof(self):
pdf = pd.DataFrame(
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]