This is an automated email from the ASF dual-hosted git repository.

gurwls223 pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/spark.git


The following commit(s) were added to refs/heads/master by this push:
     new 3bbad5c417a [SPARK-38947][PYTHON][PS] Supports groupby positional 
indexing
3bbad5c417a is described below

commit 3bbad5c417a8f819fa6e4838543ed8e63b4a6e75
Author: Yikun Jiang <[email protected]>
AuthorDate: Thu May 19 19:02:43 2022 +0900

    [SPARK-38947][PYTHON][PS] Supports groupby positional indexing
    
    ### What changes were proposed in this pull request?
    Add groupby positional indexing support for Pandas on Spark.
    
    ### Why are the changes needed?
    
    Pandas supports Groupby positional indexing since v1.4.0
    
https://pandas.pydata.org/docs/whatsnew/v1.4.0.html#groupby-positional-indexing
    
    Before 1.4.0, `pdf.groupby("a").head(-2) == pdf.groupby("a").head(0)` 
return empty dataframe, after 1.4.0, it allows to specify positional ranges 
relative to the ends of each group.
    
    ```python
    df = pd.DataFrame([["g", "g0"], ["g", "g1"], ["g", "g2"], ["g", "g3"], 
["h", "h0"], ["h", "h1"]], columns=["A", "B"])
    df.groupby("A").head(-1)
    
       A   B
    0  g  g0
    1  g  g1
    2  g  g2
    4  h  h0
    ```
    
    ### Does this PR introduce _any_ user-facing change?
    Yes, follow pandas 1.4+ behaviors, still keep the behavor when using pandas 
< 1.4
    
    ### How was this patch tested?
    - Added some more postional indexing test (-1, -100000).
    - `test_tail` and `test_head` pass with pandas 1.3.x and 1.4.x
    
    Closes #36464 from Yikun/SPARK-38947.
    
    Authored-by: Yikun Jiang <[email protected]>
    Signed-off-by: Hyukjin Kwon <[email protected]>
---
 python/pyspark/pandas/groupby.py            |  90 +++++++++++--
 python/pyspark/pandas/tests/test_groupby.py | 188 ++++++++--------------------
 2 files changed, 133 insertions(+), 145 deletions(-)

diff --git a/python/pyspark/pandas/groupby.py b/python/pyspark/pandas/groupby.py
index 9a931f20fd4..2fc237031cd 100644
--- a/python/pyspark/pandas/groupby.py
+++ b/python/pyspark/pandas/groupby.py
@@ -2122,22 +2122,60 @@ class GroupBy(Generic[FrameLike], metaclass=ABCMeta):
         groupkey_scols = [psdf._internal.spark_column_for(label) for label in 
groupkey_labels]
 
         sdf = psdf._internal.spark_frame
-        tmp_col = verify_temp_column_name(sdf, "__row_number__")
 
+        window = Window.partitionBy(*groupkey_scols)
         # This part is handled differently depending on whether it is a tail 
or a head.
-        window = (
-            
Window.partitionBy(*groupkey_scols).orderBy(F.col(NATURAL_ORDER_COLUMN_NAME).asc())
+        ordered_window = (
+            window.orderBy(F.col(NATURAL_ORDER_COLUMN_NAME).asc())
             if asc
-            else Window.partitionBy(*groupkey_scols).orderBy(
-                F.col(NATURAL_ORDER_COLUMN_NAME).desc()
-            )
+            else window.orderBy(F.col(NATURAL_ORDER_COLUMN_NAME).desc())
         )
 
-        sdf = (
-            sdf.withColumn(tmp_col, F.row_number().over(window))
-            .filter(F.col(tmp_col) <= n)
-            .drop(tmp_col)
-        )
+        if n >= 0 or LooseVersion(pd.__version__) < LooseVersion("1.4.0"):
+            tmp_row_num_col = verify_temp_column_name(sdf, "__row_number__")
+            sdf = (
+                sdf.withColumn(tmp_row_num_col, 
F.row_number().over(ordered_window))
+                .filter(F.col(tmp_row_num_col) <= n)
+                .drop(tmp_row_num_col)
+            )
+        else:
+            # Pandas supports Groupby positional indexing since v1.4.0
+            # 
https://pandas.pydata.org/docs/whatsnew/v1.4.0.html#groupby-positional-indexing
+            #
+            # To support groupby positional indexing, we need add a 
`__tmp_lag__` column to help
+            # us filtering rows before the specified offset row.
+            #
+            # For example for the dataframe:
+            # >>> df = ps.DataFrame([["g", "g0"],
+            # ...                   ["g", "g1"],
+            # ...                   ["g", "g2"],
+            # ...                   ["g", "g3"],
+            # ...                   ["h", "h0"],
+            # ...                   ["h", "h1"]], columns=["A", "B"])
+            # >>> df.groupby("A").head(-1)
+            #
+            # Below is a result to show the `__tmp_lag__` column for above df, 
the limit n is
+            # `-1`, the `__tmp_lag__` will be set to `0` in rows[:-1], and 
left will be set to
+            # `null`:
+            #
+            # >>> sdf.withColumn(tmp_lag_col, F.lag(F.lit(0), 
-1).over(ordered_window))
+            # 
+-----------------+--------------+---+---+-----------------+-----------+
+            # |__index_level_0__|__groupkey_0__|  A|  
B|__natural_order__|__tmp_lag__|
+            # 
+-----------------+--------------+---+---+-----------------+-----------+
+            # |                0|             g|  g| g0|                0|     
     0|
+            # |                1|             g|  g| g1|       8589934592|     
     0|
+            # |                2|             g|  g| g2|      17179869184|     
     0|
+            # |                3|             g|  g| g3|      25769803776|     
  null|
+            # |                4|             h|  h| h0|      34359738368|     
     0|
+            # |                5|             h|  h| h1|      42949672960|     
  null|
+            # 
+-----------------+--------------+---+---+-----------------+-----------+
+            #
+            tmp_lag_col = verify_temp_column_name(sdf, "__tmp_lag__")
+            sdf = (
+                sdf.withColumn(tmp_lag_col, F.lag(F.lit(0), 
n).over(ordered_window))
+                .where(~F.isnull(F.col(tmp_lag_col)))
+                .drop(tmp_lag_col)
+            )
 
         internal = psdf._internal.with_new_sdf(sdf)
         return 
self._cleanup_and_return(DataFrame(internal).drop(groupkey_labels, axis=1))
@@ -2187,6 +2225,21 @@ class GroupBy(Generic[FrameLike], metaclass=ABCMeta):
         7      2
         10    10
         Name: b, dtype: int64
+
+        Supports Groupby positional indexing Since pandas on Spark 3.4 (with 
pandas 1.4+):
+
+        >>> df = ps.DataFrame([["g", "g0"],
+        ...                   ["g", "g1"],
+        ...                   ["g", "g2"],
+        ...                   ["g", "g3"],
+        ...                   ["h", "h0"],
+        ...                   ["h", "h1"]], columns=["A", "B"])
+        >>> df.groupby("A").head(-1) # doctest: +SKIP
+           A   B
+        0  g  g0
+        1  g  g1
+        2  g  g2
+        4  h  h0
         """
         return self._limit(n, asc=True)
 
@@ -2240,6 +2293,21 @@ class GroupBy(Generic[FrameLike], metaclass=ABCMeta):
         6    5
         9    8
         Name: b, dtype: int64
+
+        Supports Groupby positional indexing Since pandas on Spark 3.4 (with 
pandas 1.4+):
+
+        >>> df = ps.DataFrame([["g", "g0"],
+        ...                   ["g", "g1"],
+        ...                   ["g", "g2"],
+        ...                   ["g", "g3"],
+        ...                   ["h", "h0"],
+        ...                   ["h", "h1"]], columns=["A", "B"])
+        >>> df.groupby("A").tail(-1) # doctest: +SKIP
+           A   B
+        3  g  g3
+        2  g  g2
+        1  g  g1
+        5  h  h1
         """
         return self._limit(n, asc=False)
 
diff --git a/python/pyspark/pandas/tests/test_groupby.py 
b/python/pyspark/pandas/tests/test_groupby.py
index e5451c329ec..1375d7a9bc0 100644
--- a/python/pyspark/pandas/tests/test_groupby.py
+++ b/python/pyspark/pandas/tests/test_groupby.py
@@ -2579,40 +2579,19 @@ class GroupByTest(PandasOnSparkTestCase, TestUtils):
         )
         psdf = ps.from_pandas(pdf)
 
-        self.assert_eq(
-            pdf.groupby("a").head(2).sort_index(), 
psdf.groupby("a").head(2).sort_index()
-        )
-        self.assert_eq(
-            pdf.groupby("a").head(-2).sort_index(), 
psdf.groupby("a").head(-2).sort_index()
-        )
-        self.assert_eq(
-            pdf.groupby("a").head(100000).sort_index(), 
psdf.groupby("a").head(100000).sort_index()
-        )
-
-        self.assert_eq(
-            pdf.groupby("a")["b"].head(2).sort_index(), 
psdf.groupby("a")["b"].head(2).sort_index()
-        )
-        self.assert_eq(
-            pdf.groupby("a")["b"].head(-2).sort_index(),
-            psdf.groupby("a")["b"].head(-2).sort_index(),
-        )
-        self.assert_eq(
-            pdf.groupby("a")["b"].head(100000).sort_index(),
-            psdf.groupby("a")["b"].head(100000).sort_index(),
-        )
-
-        self.assert_eq(
-            pdf.groupby("a")[["b"]].head(2).sort_index(),
-            psdf.groupby("a")[["b"]].head(2).sort_index(),
-        )
-        self.assert_eq(
-            pdf.groupby("a")[["b"]].head(-2).sort_index(),
-            psdf.groupby("a")[["b"]].head(-2).sort_index(),
-        )
-        self.assert_eq(
-            pdf.groupby("a")[["b"]].head(100000).sort_index(),
-            psdf.groupby("a")[["b"]].head(100000).sort_index(),
-        )
+        for limit in (2, 100000, -2, -100000, -1):
+            self.assert_eq(
+                pdf.groupby("a").head(limit).sort_index(),
+                psdf.groupby("a").head(limit).sort_index(),
+            )
+            self.assert_eq(
+                pdf.groupby("a")["b"].head(limit).sort_index(),
+                psdf.groupby("a")["b"].head(limit).sort_index(),
+            )
+            self.assert_eq(
+                pdf.groupby("a")[["b"]].head(limit).sort_index(),
+                psdf.groupby("a")[["b"]].head(limit).sort_index(),
+            )
 
         self.assert_eq(
             pdf.groupby(pdf.a // 2).head(2).sort_index(),
@@ -2656,45 +2635,26 @@ class GroupByTest(PandasOnSparkTestCase, TestUtils):
         )
         psdf = ps.from_pandas(pdf)
 
-        self.assert_eq(
-            pdf.groupby("a").head(2).sort_index(), 
psdf.groupby("a").head(2).sort_index()
-        )
-        self.assert_eq(
-            pdf.groupby("a").head(-2).sort_index(), 
psdf.groupby("a").head(-2).sort_index()
-        )
-        self.assert_eq(
-            pdf.groupby("a").head(100000).sort_index(), 
psdf.groupby("a").head(100000).sort_index()
-        )
-
-        self.assert_eq(
-            pdf.groupby("a")["b"].head(2).sort_index(), 
psdf.groupby("a")["b"].head(2).sort_index()
-        )
-        self.assert_eq(
-            pdf.groupby("a")["b"].head(-2).sort_index(),
-            psdf.groupby("a")["b"].head(-2).sort_index(),
-        )
-        self.assert_eq(
-            pdf.groupby("a")["b"].head(100000).sort_index(),
-            psdf.groupby("a")["b"].head(100000).sort_index(),
-        )
+        for limit in (2, 100000, -2, -100000, -1):
+            self.assert_eq(
+                pdf.groupby("a").head(limit).sort_index(),
+                psdf.groupby("a").head(limit).sort_index(),
+            )
+            self.assert_eq(
+                pdf.groupby("a")["b"].head(limit).sort_index(),
+                psdf.groupby("a")["b"].head(limit).sort_index(),
+            )
 
         # multi-index columns
         columns = pd.MultiIndex.from_tuples([("x", "a"), ("x", "b"), ("y", 
"c")])
         pdf.columns = columns
         psdf.columns = columns
 
-        self.assert_eq(
-            pdf.groupby(("x", "a")).head(2).sort_index(),
-            psdf.groupby(("x", "a")).head(2).sort_index(),
-        )
-        self.assert_eq(
-            pdf.groupby(("x", "a")).head(-2).sort_index(),
-            psdf.groupby(("x", "a")).head(-2).sort_index(),
-        )
-        self.assert_eq(
-            pdf.groupby(("x", "a")).head(100000).sort_index(),
-            psdf.groupby(("x", "a")).head(100000).sort_index(),
-        )
+        for limit in (2, 100000, -2, -100000, -1):
+            self.assert_eq(
+                pdf.groupby(("x", "a")).head(limit).sort_index(),
+                psdf.groupby(("x", "a")).head(limit).sort_index(),
+            )
 
     def test_missing(self):
         psdf = ps.DataFrame({"a": [1, 2, 3, 4, 5, 6, 7, 8, 9]})
@@ -2953,40 +2913,19 @@ class GroupByTest(PandasOnSparkTestCase, TestUtils):
         )
         psdf = ps.from_pandas(pdf)
 
-        self.assert_eq(
-            pdf.groupby("a").tail(2).sort_index(), 
psdf.groupby("a").tail(2).sort_index()
-        )
-        self.assert_eq(
-            pdf.groupby("a").tail(-2).sort_index(), 
psdf.groupby("a").tail(-2).sort_index()
-        )
-        self.assert_eq(
-            pdf.groupby("a").tail(100000).sort_index(), 
psdf.groupby("a").tail(100000).sort_index()
-        )
-
-        self.assert_eq(
-            pdf.groupby("a")["b"].tail(2).sort_index(), 
psdf.groupby("a")["b"].tail(2).sort_index()
-        )
-        self.assert_eq(
-            pdf.groupby("a")["b"].tail(-2).sort_index(),
-            psdf.groupby("a")["b"].tail(-2).sort_index(),
-        )
-        self.assert_eq(
-            pdf.groupby("a")["b"].tail(100000).sort_index(),
-            psdf.groupby("a")["b"].tail(100000).sort_index(),
-        )
-
-        self.assert_eq(
-            pdf.groupby("a")[["b"]].tail(2).sort_index(),
-            psdf.groupby("a")[["b"]].tail(2).sort_index(),
-        )
-        self.assert_eq(
-            pdf.groupby("a")[["b"]].tail(-2).sort_index(),
-            psdf.groupby("a")[["b"]].tail(-2).sort_index(),
-        )
-        self.assert_eq(
-            pdf.groupby("a")[["b"]].tail(100000).sort_index(),
-            psdf.groupby("a")[["b"]].tail(100000).sort_index(),
-        )
+        for limit in (2, 100000, -2, -100000, -1):
+            self.assert_eq(
+                pdf.groupby("a").tail(limit).sort_index(),
+                psdf.groupby("a").tail(limit).sort_index(),
+            )
+            self.assert_eq(
+                pdf.groupby("a")["b"].tail(limit).sort_index(),
+                psdf.groupby("a")["b"].tail(limit).sort_index(),
+            )
+            self.assert_eq(
+                pdf.groupby("a")[["b"]].tail(limit).sort_index(),
+                psdf.groupby("a")[["b"]].tail(limit).sort_index(),
+            )
 
         self.assert_eq(
             pdf.groupby(pdf.a // 2).tail(2).sort_index(),
@@ -3030,45 +2969,26 @@ class GroupByTest(PandasOnSparkTestCase, TestUtils):
         )
         psdf = ps.from_pandas(pdf)
 
-        self.assert_eq(
-            pdf.groupby("a").tail(2).sort_index(), 
psdf.groupby("a").tail(2).sort_index()
-        )
-        self.assert_eq(
-            pdf.groupby("a").tail(-2).sort_index(), 
psdf.groupby("a").tail(-2).sort_index()
-        )
-        self.assert_eq(
-            pdf.groupby("a").tail(100000).sort_index(), 
psdf.groupby("a").tail(100000).sort_index()
-        )
-
-        self.assert_eq(
-            pdf.groupby("a")["b"].tail(2).sort_index(), 
psdf.groupby("a")["b"].tail(2).sort_index()
-        )
-        self.assert_eq(
-            pdf.groupby("a")["b"].tail(-2).sort_index(),
-            psdf.groupby("a")["b"].tail(-2).sort_index(),
-        )
-        self.assert_eq(
-            pdf.groupby("a")["b"].tail(100000).sort_index(),
-            psdf.groupby("a")["b"].tail(100000).sort_index(),
-        )
+        for limit in (2, 100000, -2, -100000, -1):
+            self.assert_eq(
+                pdf.groupby("a").tail(limit).sort_index(),
+                psdf.groupby("a").tail(limit).sort_index(),
+            )
+            self.assert_eq(
+                pdf.groupby("a")["b"].tail(limit).sort_index(),
+                psdf.groupby("a")["b"].tail(limit).sort_index(),
+            )
 
         # multi-index columns
         columns = pd.MultiIndex.from_tuples([("x", "a"), ("x", "b"), ("y", 
"c")])
         pdf.columns = columns
         psdf.columns = columns
 
-        self.assert_eq(
-            pdf.groupby(("x", "a")).tail(2).sort_index(),
-            psdf.groupby(("x", "a")).tail(2).sort_index(),
-        )
-        self.assert_eq(
-            pdf.groupby(("x", "a")).tail(-2).sort_index(),
-            psdf.groupby(("x", "a")).tail(-2).sort_index(),
-        )
-        self.assert_eq(
-            pdf.groupby(("x", "a")).tail(100000).sort_index(),
-            psdf.groupby(("x", "a")).tail(100000).sort_index(),
-        )
+        for limit in (2, 100000, -2, -100000, -1):
+            self.assert_eq(
+                pdf.groupby(("x", "a")).tail(limit).sort_index(),
+                psdf.groupby(("x", "a")).tail(limit).sort_index(),
+            )
 
     def test_ddof(self):
         pdf = pd.DataFrame(


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to