This is an automated email from the ASF dual-hosted git repository.
ruifengz pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/spark.git
The following commit(s) were added to refs/heads/master by this push:
new 37500780f8d [SPARK-41292][CONNECT] Support Window in pyspark.sql.window namespace
37500780f8d is described below
commit 37500780f8d3475aa6eec49036363bba852f1498
Author: Hyukjin Kwon <[email protected]>
AuthorDate: Fri Dec 30 10:23:23 2022 +0800
[SPARK-41292][CONNECT] Support Window in pyspark.sql.window namespace
### What changes were proposed in this pull request?
This PR proposes to support Spark Connect's Window in the `pyspark.sql.window` namespace.
https://github.com/apache/spark/pull/39041 implemented the base, and
https://github.com/apache/spark/pull/39149 implemented Spark Connect's Window.
This PR connects them.
### Why are the changes needed?
To provide users with the same usage; see also https://github.com/apache/spark/pull/39041.
### Does this PR introduce _any_ user-facing change?
Yes, see also https://github.com/apache/spark/pull/39041.
Spark Connect can now use Window functions via the same `pyspark.sql.window` namespace.
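For example, a sketch of what now works against a Spark Connect session (the endpoint URL and data are illustrative, not from this patch):

```python
from pyspark.sql import SparkSession, functions as F
from pyspark.sql.window import Window  # same import path as classic PySpark

# "sc://localhost" is an illustrative Spark Connect endpoint.
spark = SparkSession.builder.remote("sc://localhost").getOrCreate()
df = spark.createDataFrame([(1, "a"), (2, "a"), (3, "b")], ["id", "category"])

window = Window.partitionBy("category").orderBy("id").rowsBetween(Window.currentRow, 1)
df.withColumn("sum", F.sum("id").over(window)).show()
```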
### How was this patch tested?
Manually ran the related unit tests.
Closes #39290 from HyukjinKwon/SPARK-41292.
Authored-by: Hyukjin Kwon <[email protected]>
Signed-off-by: Ruifeng Zheng <[email protected]>
---
python/pyspark/sql/connect/column.py | 2 --
python/pyspark/sql/connect/window.py | 24 ++++++++----------------
python/pyspark/sql/utils.py | 25 ++++++++++++++++++++++---
python/pyspark/sql/window.py | 20 +++++++++++++-------
4 files changed, 43 insertions(+), 28 deletions(-)
diff --git a/python/pyspark/sql/connect/column.py b/python/pyspark/sql/connect/column.py
index 5025fc8e197..206d30b15d8 100644
--- a/python/pyspark/sql/connect/column.py
+++ b/python/pyspark/sql/connect/column.py
@@ -462,8 +462,6 @@ def _test() -> None:
# TODO(SPARK-41771): __getitem__ does not work with Column.isin
del pyspark.sql.connect.column.Column.getField.__doc__
del pyspark.sql.connect.column.Column.getItem.__doc__
- # TODO(SPARK-41758): Support Window functions
- del pyspark.sql.connect.column.Column.over.__doc__
(failure_count, test_count) = doctest.testmod(
pyspark.sql.connect.column,
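(Context: `doctest.testmod` only collects examples from objects that still carry a docstring, so the `del ...__doc__` statements removed above were how the `Column.over` example was kept out of doctests while Window was unsupported; deleting them re-enables it. A tiny standalone illustration of the mechanism, with made-up names:)

```python
import doctest

def over():
    """
    >>> 1 + 1  # deliberately failing example
    3
    """

del over.__doc__  # doctest now finds no examples on this function
assert doctest.testmod().failed == 0
```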
diff --git a/python/pyspark/sql/connect/window.py b/python/pyspark/sql/connect/window.py
index c54157d0dbb..24b057022bf 100644
--- a/python/pyspark/sql/connect/window.py
+++ b/python/pyspark/sql/connect/window.py
@@ -113,8 +113,6 @@ class WindowSpec:
frame=self._frame,
)
- partitionBy.__doc__ = PySparkWindowSpec.partitionBy.__doc__
-
def orderBy(self, *cols: Union["ColumnOrName", List["ColumnOrName"]]) -> "WindowSpec":
_cols: List[ColumnOrName] = []
for col in cols:
@@ -149,8 +147,6 @@ class WindowSpec:
frame=self._frame,
)
- orderBy.__doc__ = PySparkWindowSpec.orderBy.__doc__
-
def rowsBetween(self, start: int, end: int) -> "WindowSpec":
if not isinstance(start, int):
raise TypeError(f"start must be a int, but got {type(start).__name__}")
@@ -168,8 +164,6 @@ class WindowSpec:
frame=WindowFrame(isRowFrame=True, start=start, end=end),
)
- rowsBetween.__doc__ = PySparkWindowSpec.rowsBetween.__doc__
-
def rangeBetween(self, start: int, end: int) -> "WindowSpec":
if not isinstance(start, int):
raise TypeError(f"start must be a int, but got {type(start).__name__}")
@@ -187,8 +181,6 @@ class WindowSpec:
frame=WindowFrame(isRowFrame=False, start=start, end=end),
)
- rangeBetween.__doc__ = PySparkWindowSpec.rangeBetween.__doc__
-
def __repr__(self) -> str:
strs: List[str] = []
if len(self._partitionSpec) > 0:
@@ -202,6 +194,10 @@ class WindowSpec:
return "WindowSpec(" + ", ".join(strs) + ")"
+WindowSpec.rangeBetween.__doc__ = PySparkWindowSpec.rangeBetween.__doc__
+WindowSpec.rowsBetween.__doc__ = PySparkWindowSpec.rowsBetween.__doc__
+WindowSpec.orderBy.__doc__ = PySparkWindowSpec.orderBy.__doc__
+WindowSpec.partitionBy.__doc__ = PySparkWindowSpec.partitionBy.__doc__
WindowSpec.__doc__ = PySparkWindowSpec.__doc__
@@ -221,27 +217,23 @@ class Window:
def partitionBy(*cols: Union["ColumnOrName", List["ColumnOrName"]]) -> "WindowSpec":
return Window._spec.partitionBy(*cols)
- partitionBy.__doc__ = PySparkWindow.partitionBy.__doc__
-
@staticmethod
def orderBy(*cols: Union["ColumnOrName", List["ColumnOrName"]]) -> "WindowSpec":
return Window._spec.orderBy(*cols)
- orderBy.__doc__ = PySparkWindow.orderBy.__doc__
-
@staticmethod
def rowsBetween(start: int, end: int) -> "WindowSpec":
return Window._spec.rowsBetween(start, end)
- rowsBetween.__doc__ = PySparkWindow.rowsBetween.__doc__
-
@staticmethod
def rangeBetween(start: int, end: int) -> "WindowSpec":
return Window._spec.rangeBetween(start, end)
- rangeBetween.__doc__ = PySparkWindow.rangeBetween.__doc__
-
+Window.orderBy.__doc__ = PySparkWindow.orderBy.__doc__
+Window.rowsBetween.__doc__ = PySparkWindow.rowsBetween.__doc__
+Window.rangeBetween.__doc__ = PySparkWindow.rangeBetween.__doc__
+Window.partitionBy.__doc__ = PySparkWindow.partitionBy.__doc__
Window.__doc__ = PySparkWindow.__doc__
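(Context: the `__doc__` assignments moved out of the class bodies rely on the fact that accessing a `staticmethod` or plain method through its class yields the underlying function, whose `__doc__` is writable. A minimal sketch of the pattern, with illustrative names rather than the actual pyspark classes:)

```python
class Reference:
    @staticmethod
    def partitionBy():
        """Reference docs live here."""

class Mirror:
    @staticmethod
    def partitionBy():
        pass

# After the class body, attribute access returns the plain function,
# so docstrings can be copied over in one place per method.
Mirror.partitionBy.__doc__ = Reference.partitionBy.__doc__
assert Mirror.partitionBy.__doc__ == "Reference docs live here."
```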
diff --git a/python/pyspark/sql/utils.py b/python/pyspark/sql/utils.py
index 05d6f7ebea9..c171389cab3 100644
--- a/python/pyspark/sql/utils.py
+++ b/python/pyspark/sql/utils.py
@@ -341,10 +341,29 @@ def try_remote_window(f: FuncT) -> FuncT:
@functools.wraps(f)
def wrapped(*args: Any, **kwargs: Any) -> Any:
- # TODO(SPARK-41292): Support Window functions
+
if is_remote():
- raise NotImplementedError()
- return f(*args, **kwargs)
+ from pyspark.sql.connect.window import Window
+
+ return getattr(Window, f.__name__)(*args, **kwargs)
+ else:
+ return f(*args, **kwargs)
+
+ return cast(FuncT, wrapped)
+
+
+def try_remote_windowspec(f: FuncT) -> FuncT:
+ """Mark API supported from Spark Connect."""
+
+ @functools.wraps(f)
+ def wrapped(*args: Any, **kwargs: Any) -> Any:
+
+ if is_remote():
+ from pyspark.sql.connect.window import WindowSpec
+
+ return getattr(WindowSpec, f.__name__)(*args, **kwargs)
+ else:
+ return f(*args, **kwargs)
return cast(FuncT, wrapped)
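(Context: both decorators share the same dispatch shape: when the session is remote, look up the same-named attribute on the Spark Connect class and forward the call, with the import kept inside the wrapper so importing `pyspark.sql.window` does not require the Connect dependencies. A self-contained sketch of that shape, with a module-level flag standing in for `is_remote()`:)

```python
import functools
from typing import Any, Callable, TypeVar, cast

FuncT = TypeVar("FuncT", bound=Callable[..., Any])
REMOTE = False  # stand-in for is_remote(); flip to exercise the remote branch

class RemoteWindow:
    @staticmethod
    def partitionBy(*cols: str) -> str:
        return f"remote:{cols}"

def try_remote_window(f: FuncT) -> FuncT:
    @functools.wraps(f)
    def wrapped(*args: Any, **kwargs: Any) -> Any:
        if REMOTE:
            # Forward to the same-named method on the remote class.
            return getattr(RemoteWindow, f.__name__)(*args, **kwargs)
        return f(*args, **kwargs)
    return cast(FuncT, wrapped)

class Window:
    @staticmethod
    @try_remote_window
    def partitionBy(*cols: str) -> str:
        return f"local:{cols}"

assert Window.partitionBy("a") == "local:('a',)"
```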
diff --git a/python/pyspark/sql/window.py b/python/pyspark/sql/window.py
index 6a06345d61b..f35a1dacaf4 100644
--- a/python/pyspark/sql/window.py
+++ b/python/pyspark/sql/window.py
@@ -21,7 +21,7 @@ from py4j.java_gateway import JavaObject
from pyspark import SparkContext
from pyspark.sql.column import _to_seq, _to_java_column
-from pyspark.sql.utils import try_remote_window
+from pyspark.sql.utils import try_remote_window, try_remote_windowspec
if TYPE_CHECKING:
from pyspark.sql._typing import ColumnOrName, ColumnOrName_
@@ -72,6 +72,7 @@ class Window:
currentRow: int = 0
+ # TODO(SPARK-41773): Window.partitionBy is not respected with row_number in Spark Connect
@staticmethod
@try_remote_window
def partitionBy(*cols: Union["ColumnOrName", List["ColumnOrName_"]]) -> "WindowSpec":
@@ -111,7 +112,7 @@ class Window:
Show row number order by ``id`` in partition ``category``.
>>> window = Window.partitionBy("category").orderBy("id")
- >>> df.withColumn("row_number", row_number().over(window)).show()
+ >>> df.withColumn("row_number", row_number().over(window)).show() # doctest: +SKIP
+---+--------+----------+
| id|category|row_number|
+---+--------+----------+
@@ -128,6 +129,7 @@ class Window:
jspec = sc._jvm.org.apache.spark.sql.expressions.Window.partitionBy(_to_java_cols(cols))
return WindowSpec(jspec)
+ # TODO(SPARK-41773): Window.partitionBy is not respected with row_number in Spark Connect
@staticmethod
@try_remote_window
def orderBy(*cols: Union["ColumnOrName", List["ColumnOrName_"]]) -> "WindowSpec":
@@ -167,7 +169,7 @@ class Window:
Show row number order by ``category`` in partition ``id``.
>>> window = Window.partitionBy("id").orderBy("category")
- >>> df.withColumn("row_number", row_number().over(window)).show()
+ >>> df.withColumn("row_number", row_number().over(window)).show() # doctest: +SKIP
+---+--------+----------+
| id|category|row_number|
+---+--------+----------+
@@ -184,6 +186,7 @@ class Window:
jspec = sc._jvm.org.apache.spark.sql.expressions.Window.orderBy(_to_java_cols(cols))
return WindowSpec(jspec)
+ # TODO(SPARK-41773): Window.partitionBy is not respected with row_number in Spark Connect
@staticmethod
@try_remote_window
def rowsBetween(start: int, end: int) -> "WindowSpec":
@@ -247,6 +250,7 @@ class Window:
>>> window = Window.partitionBy("category").orderBy("id").rowsBetween(Window.currentRow, 1)
>>> df.withColumn("sum", func.sum("id").over(window)).sort("id", "category", "sum").show()
+ ... # doctest: +SKIP
+---+--------+---+
| id|category|sum|
+---+--------+---+
@@ -268,6 +272,7 @@ class Window:
jspec = sc._jvm.org.apache.spark.sql.expressions.Window.rowsBetween(start, end)
return WindowSpec(jspec)
+ # TODO(SPARK-41773): Window.partitionBy is not respected with row_number in Spark Connect
@staticmethod
@try_remote_window
def rangeBetween(start: int, end: int) -> "WindowSpec":
@@ -334,6 +339,7 @@ class Window:
>>> window = Window.partitionBy("category").orderBy("id").rangeBetween(Window.currentRow, 1)
>>> df.withColumn("sum", func.sum("id").over(window)).sort("id", "category").show()
+ ... # doctest: +SKIP
+---+--------+---+
| id|category|sum|
+---+--------+---+
@@ -372,7 +378,7 @@ class WindowSpec:
def __init__(self, jspec: JavaObject) -> None:
self._jspec = jspec
- @try_remote_window
+ @try_remote_windowspec
def partitionBy(self, *cols: Union["ColumnOrName", List["ColumnOrName_"]]) -> "WindowSpec":
"""
Defines the partitioning columns in a :class:`WindowSpec`.
@@ -386,7 +392,7 @@ class WindowSpec:
"""
return WindowSpec(self._jspec.partitionBy(_to_java_cols(cols)))
- @try_remote_window
+ @try_remote_windowspec
def orderBy(self, *cols: Union["ColumnOrName", List["ColumnOrName_"]]) -> "WindowSpec":
"""
Defines the ordering columns in a :class:`WindowSpec`.
@@ -400,7 +406,7 @@ class WindowSpec:
"""
return WindowSpec(self._jspec.orderBy(_to_java_cols(cols)))
- @try_remote_window
+ @try_remote_windowspec
def rowsBetween(self, start: int, end: int) -> "WindowSpec":
"""
Defines the frame boundaries, from `start` (inclusive) to `end` (inclusive).
@@ -432,7 +438,7 @@ class WindowSpec:
end = Window.unboundedFollowing
return WindowSpec(self._jspec.rowsBetween(start, end))
- @try_remote_window
+ @try_remote_windowspec
def rangeBetween(self, start: int, end: int) -> "WindowSpec":
"""
Defines the frame boundaries, from `start` (inclusive) to `end` (inclusive).
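(Taken together, the skipped doctests above correspond to ordinary usage like the following; a sketch assuming a local SparkSession and the same toy data the docstrings use. Under Spark Connect, the identical code routes through the new decorators:)

```python
from pyspark.sql import SparkSession, functions as func
from pyspark.sql.window import Window

spark = SparkSession.builder.getOrCreate()
df = spark.createDataFrame(
    [(1, "a"), (1, "a"), (2, "a"), (1, "b"), (2, "b"), (3, "b")],
    ["id", "category"],
)

# rowsBetween counts physical rows; rangeBetween offsets the ordering value,
# so rows with equal "id" are frame peers under the range variant.
rows = Window.partitionBy("category").orderBy("id").rowsBetween(Window.currentRow, 1)
rng = Window.partitionBy("category").orderBy("id").rangeBetween(Window.currentRow, 1)
df.withColumn("rows_sum", func.sum("id").over(rows)) \
  .withColumn("range_sum", func.sum("id").over(rng)) \
  .sort("id", "category").show()
```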