This is an automated email from the ASF dual-hosted git repository.

ruifengz pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/spark.git


The following commit(s) were added to refs/heads/master by this push:
     new 37500780f8d [SPARK-41292][CONNECT] Support Window in pyspark.sql.window namespace
37500780f8d is described below

commit 37500780f8d3475aa6eec49036363bba852f1498
Author: Hyukjin Kwon <[email protected]>
AuthorDate: Fri Dec 30 10:23:23 2022 +0800

    [SPARK-41292][CONNECT] Support Window in pyspark.sql.window namespace
    
    ### What changes were proposed in this pull request?
    
    This PR proposes to support Spark Connect's Window in the `pyspark.sql.window` namespace.
    
    https://github.com/apache/spark/pull/39041 implemented the base, and https://github.com/apache/spark/pull/39149 implemented Spark Connect's Window.
    
    This PR connects them.
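
    At a high level, the change routes each `Window`/`WindowSpec` call through a decorator in `python/pyspark/sql/utils.py` that forwards to the Spark Connect implementation when running remotely. Below is a minimal sketch of that pattern, condensed from the `utils.py` diff further down (`is_remote` is the existing helper that detects a Connect session):

    ```python
    import functools
    from typing import Any, Callable, TypeVar, cast

    from pyspark.sql.utils import is_remote  # existing helper

    FuncT = TypeVar("FuncT", bound=Callable[..., Any])

    def try_remote_window(f: FuncT) -> FuncT:
        """Forward a Window API call to Spark Connect when remote."""

        @functools.wraps(f)
        def wrapped(*args: Any, **kwargs: Any) -> Any:
            if is_remote():
                # Resolve the same method name on the Connect Window class.
                from pyspark.sql.connect.window import Window

                return getattr(Window, f.__name__)(*args, **kwargs)
            return f(*args, **kwargs)

        return cast(FuncT, wrapped)
    ```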
    
    ### Why are the changes needed?
    
    To provide users with the same usage; see also https://github.com/apache/spark/pull/39041.
    
    ### Does this PR introduce _any_ user-facing change?
    
    Yes, see also https://github.com/apache/spark/pull/39041.
    Spark Connect can use Window functions via the same `pyspark.sql.window` namespace.
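
    For illustration only (not part of this patch), the same imports now work against a Connect session. The remote URL below is a placeholder, and note the TODOs in the diff (SPARK-41773): `Window.partitionBy` is not yet respected with `row_number` under Connect, which is why the related doctests are skipped.

    ```python
    from pyspark.sql import SparkSession
    from pyspark.sql.functions import row_number
    from pyspark.sql.window import Window  # one namespace for classic and Connect

    # Placeholder Connect endpoint; any valid `sc://` URL works.
    spark = SparkSession.builder.remote("sc://localhost:15002").getOrCreate()

    df = spark.createDataFrame(
        [(1, "a"), (1, "a"), (2, "a"), (1, "b")], ["id", "category"]
    )
    window = Window.partitionBy("category").orderBy("id")
    # Builds the plan through pyspark.sql.connect.window under the hood.
    df.withColumn("row_number", row_number().over(window)).show()
    ```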
    
    ### How was this patch tested?
    
    Manually checked the related unit tests.
    
    Closes #39290 from HyukjinKwon/SPARK-41292.
    
    Authored-by: Hyukjin Kwon <[email protected]>
    Signed-off-by: Ruifeng Zheng <[email protected]>
---
 python/pyspark/sql/connect/column.py |  2 --
 python/pyspark/sql/connect/window.py | 24 ++++++++----------------
 python/pyspark/sql/utils.py          | 25 ++++++++++++++++++++++---
 python/pyspark/sql/window.py         | 20 +++++++++++++-------
 4 files changed, 43 insertions(+), 28 deletions(-)

diff --git a/python/pyspark/sql/connect/column.py b/python/pyspark/sql/connect/column.py
index 5025fc8e197..206d30b15d8 100644
--- a/python/pyspark/sql/connect/column.py
+++ b/python/pyspark/sql/connect/column.py
@@ -462,8 +462,6 @@ def _test() -> None:
         # TODO(SPARK-41771): __getitem__ does not work with Column.isin
         del pyspark.sql.connect.column.Column.getField.__doc__
         del pyspark.sql.connect.column.Column.getItem.__doc__
-        # TODO(SPARK-41758): Support Window functions
-        del pyspark.sql.connect.column.Column.over.__doc__
 
         (failure_count, test_count) = doctest.testmod(
             pyspark.sql.connect.column,
diff --git a/python/pyspark/sql/connect/window.py b/python/pyspark/sql/connect/window.py
index c54157d0dbb..24b057022bf 100644
--- a/python/pyspark/sql/connect/window.py
+++ b/python/pyspark/sql/connect/window.py
@@ -113,8 +113,6 @@ class WindowSpec:
             frame=self._frame,
         )
 
-    partitionBy.__doc__ = PySparkWindowSpec.partitionBy.__doc__
-
     def orderBy(self, *cols: Union["ColumnOrName", List["ColumnOrName"]]) -> "WindowSpec":
         _cols: List[ColumnOrName] = []
         for col in cols:
@@ -149,8 +147,6 @@ class WindowSpec:
             frame=self._frame,
         )
 
-    orderBy.__doc__ = PySparkWindowSpec.orderBy.__doc__
-
     def rowsBetween(self, start: int, end: int) -> "WindowSpec":
         if not isinstance(start, int):
             raise TypeError(f"start must be a int, but got 
{type(start).__name__}")
@@ -168,8 +164,6 @@ class WindowSpec:
             frame=WindowFrame(isRowFrame=True, start=start, end=end),
         )
 
-    rowsBetween.__doc__ = PySparkWindowSpec.rowsBetween.__doc__
-
     def rangeBetween(self, start: int, end: int) -> "WindowSpec":
         if not isinstance(start, int):
             raise TypeError(f"start must be a int, but got 
{type(start).__name__}")
@@ -187,8 +181,6 @@ class WindowSpec:
             frame=WindowFrame(isRowFrame=False, start=start, end=end),
         )
 
-    rangeBetween.__doc__ = PySparkWindowSpec.rangeBetween.__doc__
-
     def __repr__(self) -> str:
         strs: List[str] = []
         if len(self._partitionSpec) > 0:
@@ -202,6 +194,10 @@ class WindowSpec:
         return "WindowSpec(" + ", ".join(strs) + ")"
 
 
+WindowSpec.rangeBetween.__doc__ = PySparkWindowSpec.rangeBetween.__doc__
+WindowSpec.rowsBetween.__doc__ = PySparkWindowSpec.rowsBetween.__doc__
+WindowSpec.orderBy.__doc__ = PySparkWindowSpec.orderBy.__doc__
+WindowSpec.partitionBy.__doc__ = PySparkWindowSpec.partitionBy.__doc__
 WindowSpec.__doc__ = PySparkWindowSpec.__doc__
 
 
@@ -221,27 +217,23 @@ class Window:
     def partitionBy(*cols: Union["ColumnOrName", List["ColumnOrName"]]) -> "WindowSpec":
         return Window._spec.partitionBy(*cols)
 
-    partitionBy.__doc__ = PySparkWindow.partitionBy.__doc__
-
     @staticmethod
     def orderBy(*cols: Union["ColumnOrName", List["ColumnOrName"]]) -> "WindowSpec":
         return Window._spec.orderBy(*cols)
 
-    orderBy.__doc__ = PySparkWindow.orderBy.__doc__
-
     @staticmethod
     def rowsBetween(start: int, end: int) -> "WindowSpec":
         return Window._spec.rowsBetween(start, end)
 
-    rowsBetween.__doc__ = PySparkWindow.rowsBetween.__doc__
-
     @staticmethod
     def rangeBetween(start: int, end: int) -> "WindowSpec":
         return Window._spec.rangeBetween(start, end)
 
-    rangeBetween.__doc__ = PySparkWindow.rangeBetween.__doc__
-
 
+Window.orderBy.__doc__ = PySparkWindow.orderBy.__doc__
+Window.rowsBetween.__doc__ = PySparkWindow.rowsBetween.__doc__
+Window.rangeBetween.__doc__ = PySparkWindow.rangeBetween.__doc__
+Window.partitionBy.__doc__ = PySparkWindow.partitionBy.__doc__
 Window.__doc__ = PySparkWindow.__doc__
 
 
diff --git a/python/pyspark/sql/utils.py b/python/pyspark/sql/utils.py
index 05d6f7ebea9..c171389cab3 100644
--- a/python/pyspark/sql/utils.py
+++ b/python/pyspark/sql/utils.py
@@ -341,10 +341,29 @@ def try_remote_window(f: FuncT) -> FuncT:
 
     @functools.wraps(f)
     def wrapped(*args: Any, **kwargs: Any) -> Any:
-        # TODO(SPARK-41292): Support Window functions
+
         if is_remote():
-            raise NotImplementedError()
-        return f(*args, **kwargs)
+            from pyspark.sql.connect.window import Window
+
+            return getattr(Window, f.__name__)(*args, **kwargs)
+        else:
+            return f(*args, **kwargs)
+
+    return cast(FuncT, wrapped)
+
+
+def try_remote_windowspec(f: FuncT) -> FuncT:
+    """Mark API supported from Spark Connect."""
+
+    @functools.wraps(f)
+    def wrapped(*args: Any, **kwargs: Any) -> Any:
+
+        if is_remote():
+            from pyspark.sql.connect.window import WindowSpec
+
+            return getattr(WindowSpec, f.__name__)(*args, **kwargs)
+        else:
+            return f(*args, **kwargs)
 
     return cast(FuncT, wrapped)
 
diff --git a/python/pyspark/sql/window.py b/python/pyspark/sql/window.py
index 6a06345d61b..f35a1dacaf4 100644
--- a/python/pyspark/sql/window.py
+++ b/python/pyspark/sql/window.py
@@ -21,7 +21,7 @@ from py4j.java_gateway import JavaObject
 
 from pyspark import SparkContext
 from pyspark.sql.column import _to_seq, _to_java_column
-from pyspark.sql.utils import try_remote_window
+from pyspark.sql.utils import try_remote_window, try_remote_windowspec
 
 if TYPE_CHECKING:
     from pyspark.sql._typing import ColumnOrName, ColumnOrName_
@@ -72,6 +72,7 @@ class Window:
 
     currentRow: int = 0
 
+    # TODO(SPARK-41773): Window.partitionBy is not respected with row_number in Spark Connect
     @staticmethod
     @try_remote_window
     def partitionBy(*cols: Union["ColumnOrName", List["ColumnOrName_"]]) -> "WindowSpec":
@@ -111,7 +112,7 @@ class Window:
         Show row number order by ``id`` in partition ``category``.
 
         >>> window = Window.partitionBy("category").orderBy("id")
-        >>> df.withColumn("row_number", row_number().over(window)).show()
+        >>> df.withColumn("row_number", row_number().over(window)).show()  # doctest: +SKIP
         +---+--------+----------+
         | id|category|row_number|
         +---+--------+----------+
@@ -128,6 +129,7 @@ class Window:
         jspec = sc._jvm.org.apache.spark.sql.expressions.Window.partitionBy(_to_java_cols(cols))
         return WindowSpec(jspec)
 
+    # TODO(SPARK-41773): Window.partitionBy is not respected with row_number in Spark Connect
     @staticmethod
     @try_remote_window
     def orderBy(*cols: Union["ColumnOrName", List["ColumnOrName_"]]) -> "WindowSpec":
@@ -167,7 +169,7 @@ class Window:
         Show row number order by ``category`` in partition ``id``.
 
         >>> window = Window.partitionBy("id").orderBy("category")
-        >>> df.withColumn("row_number", row_number().over(window)).show()
+        >>> df.withColumn("row_number", row_number().over(window)).show()  # doctest: +SKIP
         +---+--------+----------+
         | id|category|row_number|
         +---+--------+----------+
@@ -184,6 +186,7 @@ class Window:
         jspec = sc._jvm.org.apache.spark.sql.expressions.Window.orderBy(_to_java_cols(cols))
         return WindowSpec(jspec)
 
+    # TODO(SPARK-41773): Window.partitionBy is not respected with row_number in Spark Connect
     @staticmethod
     @try_remote_window
     def rowsBetween(start: int, end: int) -> "WindowSpec":
@@ -247,6 +250,7 @@ class Window:
 
         >>> window = Window.partitionBy("category").orderBy("id").rowsBetween(Window.currentRow, 1)
         >>> df.withColumn("sum", func.sum("id").over(window)).sort("id", 
"category", "sum").show()
+        ... # doctest: +SKIP
         +---+--------+---+
         | id|category|sum|
         +---+--------+---+
@@ -268,6 +272,7 @@ class Window:
         jspec = sc._jvm.org.apache.spark.sql.expressions.Window.rowsBetween(start, end)
         return WindowSpec(jspec)
 
+    # TODO(SPARK-41773): Window.partitionBy is not respected with row_number in Spark Connect
     @staticmethod
     @try_remote_window
     def rangeBetween(start: int, end: int) -> "WindowSpec":
@@ -334,6 +339,7 @@ class Window:
 
         >>> window = Window.partitionBy("category").orderBy("id").rangeBetween(Window.currentRow, 1)
         >>> df.withColumn("sum", func.sum("id").over(window)).sort("id", 
"category").show()
+        ... # doctest: +SKIP
         +---+--------+---+
         | id|category|sum|
         +---+--------+---+
@@ -372,7 +378,7 @@ class WindowSpec:
     def __init__(self, jspec: JavaObject) -> None:
         self._jspec = jspec
 
-    @try_remote_window
+    @try_remote_windowspec
     def partitionBy(self, *cols: Union["ColumnOrName", List["ColumnOrName_"]]) -> "WindowSpec":
         """
         Defines the partitioning columns in a :class:`WindowSpec`.
@@ -386,7 +392,7 @@ class WindowSpec:
         """
         return WindowSpec(self._jspec.partitionBy(_to_java_cols(cols)))
 
-    @try_remote_window
+    @try_remote_windowspec
     def orderBy(self, *cols: Union["ColumnOrName", List["ColumnOrName_"]]) -> "WindowSpec":
         """
         Defines the ordering columns in a :class:`WindowSpec`.
@@ -400,7 +406,7 @@ class WindowSpec:
         """
         return WindowSpec(self._jspec.orderBy(_to_java_cols(cols)))
 
-    @try_remote_window
+    @try_remote_windowspec
     def rowsBetween(self, start: int, end: int) -> "WindowSpec":
         """
         Defines the frame boundaries, from `start` (inclusive) to `end` (inclusive).
@@ -432,7 +438,7 @@ class WindowSpec:
             end = Window.unboundedFollowing
         return WindowSpec(self._jspec.rowsBetween(start, end))
 
-    @try_remote_window
+    @try_remote_windowspec
     def rangeBetween(self, start: int, end: int) -> "WindowSpec":
         """
         Defines the frame boundaries, from `start` (inclusive) to `end` (inclusive).

