This is an automated email from the ASF dual-hosted git repository.
ruifengz pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/spark.git
The following commit(s) were added to refs/heads/master by this push:
new 0f27a6a8cf5 [SPARK-41722][CONNECT][PYTHON] Implement 3 missing time window functions
0f27a6a8cf5 is described below
commit 0f27a6a8cf52b6695bb755ff29824b1cd71c1ed7
Author: Ruifeng Zheng <[email protected]>
AuthorDate: Tue Dec 27 13:00:31 2022 +0800
[SPARK-41722][CONNECT][PYTHON] Implement 3 missing time window functions
### What changes were proposed in this pull request?
Implement the 3 missing time window functions: `window`, `window_time` and `session_window`.
### Why are the changes needed?
For API coverage.
After this PR, the following functions are still missing:
1, `call_udf`
2, `pandas_udf`
3, `sequence` - SPARK-41723
4, `format_number` - SPARK-41473
5, `unwrap_udt`
6, `udf`
### Does this PR introduce _any_ user-facing change?
Yes, `window`, `window_time` and `session_window` are now available in the Connect Python client.
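A minimal usage sketch of the three functions (written against the regular PySpark API, which the Connect client now mirrors; assumes PySpark 3.4+, where `window_time` exists):

```python
from pyspark.sql import SparkSession, functions as F

spark = SparkSession.builder.master("local[*]").getOrCreate()

df = spark.createDataFrame(
    [("2022-12-25 10:30:00", 1), ("2022-12-25 10:31:00", 2), ("2022-12-25 10:35:00", 3)],
    ["ts", "val"],
).withColumn("ts", F.to_timestamp("ts"))

# Tumbling 1-minute windows; window_time returns the event time of each
# window (its exclusive end minus one microsecond).
df.groupBy(F.window("ts", "1 minute")).agg(F.sum("val").alias("sum")).select(
    F.window_time("window"), "sum"
).show(truncate=False)

# Session windows that close after a 2-minute gap with no events.
df.groupBy(F.session_window("ts", "2 minutes")).agg(F.sum("val")).show(truncate=False)
```

The same calls work through `pyspark.sql.connect.functions` against a Connect session after this PR.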
### How was this patch tested?
Added unit tests.
Closes #39227 from zhengruifeng/connect_time_3_func.
Authored-by: Ruifeng Zheng <[email protected]>
Signed-off-by: Ruifeng Zheng <[email protected]>
---
.../sql/connect/planner/SparkConnectPlanner.scala | 40 +++++++++
python/pyspark/sql/connect/functions.py | 67 +++++++++++++++
.../sql/tests/connect/test_connect_function.py | 96 ++++++++++++++++++++--
3 files changed, 198 insertions(+), 5 deletions(-)
diff --git a/connector/connect/server/src/main/scala/org/apache/spark/sql/connect/planner/SparkConnectPlanner.scala b/connector/connect/server/src/main/scala/org/apache/spark/sql/connect/planner/SparkConnectPlanner.scala
index 21ff96f158e..42cc96f35dc 100644
--- a/connector/connect/server/src/main/scala/org/apache/spark/sql/connect/planner/SparkConnectPlanner.scala
+++ b/connector/connect/server/src/main/scala/org/apache/spark/sql/connect/planner/SparkConnectPlanner.scala
@@ -691,6 +691,46 @@ class SparkConnectPlanner(session: SparkSession) {
}
Some(NthValue(children(0), children(1), ignoreNulls))
+ case "window" if 2 <= fun.getArgumentsCount && fun.getArgumentsCount <=
4 =>
+ val children = fun.getArgumentsList.asScala.toSeq.map(transformExpression)
+ val timeCol = children.head
+ val args = children.tail.map {
+ case Literal(s, StringType) if s != null => s.toString
+ case other =>
+ throw InvalidPlanInput(
+ s"windowDuration,slideDuration,startTime should be literal
strings, but got $other")
+ }
+ var windowDuration: String = null
+ var slideDuration: String = null
+ var startTime: String = null
+ if (args.length == 3) {
+ windowDuration = args(0)
+ slideDuration = args(1)
+ startTime = args(2)
+ } else if (args.length == 2) {
+ windowDuration = args(0)
+ slideDuration = args(1)
+ startTime = "0 second"
+ } else {
+ windowDuration = args(0)
+ slideDuration = args(0)
+ startTime = "0 second"
+ }
+ Some(
+ Alias(TimeWindow(timeCol, windowDuration, slideDuration, startTime), "window")(
+ nonInheritableMetadataKeys = Seq(Dataset.DATASET_ID_KEY, Dataset.COL_POS_KEY)))
+
+ case "session_window" if fun.getArgumentsCount == 2 =>
+ val children = fun.getArgumentsList.asScala.toSeq.map(transformExpression)
+ val timeCol = children.head
+ val sessionWindow = children.last match {
+ case Literal(s, StringType) if s != null => SessionWindow(timeCol, s.toString)
+ case other => SessionWindow(timeCol, other)
+ }
+ Some(
+ Alias(sessionWindow, "session_window")(nonInheritableMetadataKeys =
+ Seq(Dataset.DATASET_ID_KEY, Dataset.COL_POS_KEY)))
+
case "bucket" if fun.getArgumentsCount == 2 =>
val children = fun.getArgumentsList.asScala.toSeq.map(transformExpression)
(children.head, children.last) match {
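A note on the argument defaulting above (not part of the patch): when the client sends only a time column and one duration string, the planner reuses that duration as the slide duration and uses "0 second" as the start time, so a tumbling-window call should resolve to the same `TimeWindow` expression as spelling every argument out. A small sketch using the Connect client (Connect `Column`s are client-side expression trees, so no server is needed just to build them; the column name `ts` is hypothetical):

```python
from pyspark.sql.connect import functions as CF

# Two arguments: a tumbling window; the slide duration defaults to the window
# duration and the start time to "0 second" on the server side...
tumbling = CF.window("ts", "1 minute")

# ...so it should plan to the same TimeWindow expression as the explicit form.
explicit = CF.window("ts", "1 minute", "1 minute", "0 second")
```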
diff --git a/python/pyspark/sql/connect/functions.py b/python/pyspark/sql/connect/functions.py
index 407e7536f03..ad255266b21 100644
--- a/python/pyspark/sql/connect/functions.py
+++ b/python/pyspark/sql/connect/functions.py
@@ -2070,6 +2070,73 @@ def timestamp_seconds(col: "ColumnOrName") -> Column:
timestamp_seconds.__doc__ = pysparkfuncs.timestamp_seconds.__doc__
+def window(
+ timeColumn: "ColumnOrName",
+ windowDuration: str,
+ slideDuration: Optional[str] = None,
+ startTime: Optional[str] = None,
+) -> Column:
+ if windowDuration is None or not isinstance(windowDuration, str):
+ raise TypeError(
+ f"windowDuration should be as a string, "
+ f"but got {type(windowDuration).__name__} {windowDuration}"
+ )
+ if slideDuration is not None and not isinstance(slideDuration, str):
+ raise TypeError(
+ f"slideDuration should be as a string, "
+ f"but got {type(slideDuration).__name__} {slideDuration}"
+ )
+ if startTime is not None and not isinstance(startTime, str):
+ raise TypeError(
+ f"startTime should be as a string, " f"but got
{type(startTime).__name__} {startTime}"
+ )
+
+ time_col = _to_col(timeColumn)
+
+ if slideDuration is not None and startTime is not None:
+ return _invoke_function(
+ "window", time_col, lit(windowDuration), lit(slideDuration),
lit(startTime)
+ )
+ elif slideDuration is not None:
+ return _invoke_function("window", time_col, lit(windowDuration),
lit(slideDuration))
+ elif startTime is not None:
+ return _invoke_function(
+ "window", time_col, lit(windowDuration), lit(windowDuration),
lit(startTime)
+ )
+ else:
+ return _invoke_function("window", time_col, lit(windowDuration))
+
+
+window.__doc__ = pysparkfuncs.window.__doc__
+
+
+def window_time(
+ windowColumn: "ColumnOrName",
+) -> Column:
+ return _invoke_function("window_time", _to_col(windowColumn))
+
+
+window_time.__doc__ = pysparkfuncs.window_time.__doc__
+
+
+def session_window(timeColumn: "ColumnOrName", gapDuration: Union[Column, str]) -> Column:
+ if gapDuration is None or not isinstance(gapDuration, (Column, str)):
+ raise TypeError(
+ f"gapDuration should be as a string or Column, "
+ f"but got {type(gapDuration).__name__} {gapDuration}"
+ )
+
+ time_col = _to_col(timeColumn)
+
+ if isinstance(gapDuration, Column):
+ return _invoke_function("session_window", time_col, gapDuration)
+ else:
+ return _invoke_function("session_window", time_col, lit(gapDuration))
+
+
+session_window.__doc__ = pysparkfuncs.session_window.__doc__
+
+
# Partition Transformation Functions
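On the client side, `session_window` accepts the gap either as a string literal or as a `Column`, matching the two planner branches added above; a brief sketch (the column name `ts` is hypothetical):

```python
from pyspark.sql.connect import functions as CF

# Gap as a string literal: the planner folds it into SessionWindow(timeCol, "5 minutes").
by_string = CF.session_window("ts", "5 minutes")

# Gap as a Column expression: passed through as-is, which also allows
# per-row (dynamic) gap durations.
by_column = CF.session_window("ts", CF.lit("5 minutes"))
```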
diff --git a/python/pyspark/sql/tests/connect/test_connect_function.py b/python/pyspark/sql/tests/connect/test_connect_function.py
index d00d712d7f8..37c29e2e66b 100644
--- a/python/pyspark/sql/tests/connect/test_connect_function.py
+++ b/python/pyspark/sql/tests/connect/test_connect_function.py
@@ -52,21 +52,21 @@ class SparkConnectFunctionTests(SparkConnectFuncTestCase):
"""These test cases exercise the interface to the proto plan
generation but do not call Spark."""
- def compare_by_show(self, df1, df2):
+ def compare_by_show(self, df1, df2, n: int = 20, truncate: int = 20):
from pyspark.sql.dataframe import DataFrame as SDF
from pyspark.sql.connect.dataframe import DataFrame as CDF
assert isinstance(df1, (SDF, CDF))
if isinstance(df1, SDF):
- str1 = df1._jdf.showString(20, 20, False)
+ str1 = df1._jdf.showString(n, truncate, False)
else:
- str1 = df1._show_string(20, 20, False)
+ str1 = df1._show_string(n, truncate, False)
assert isinstance(df2, (SDF, CDF))
if isinstance(df2, SDF):
- str2 = df2._jdf.showString(20, 20, False)
+ str2 = df2._jdf.showString(n, truncate, False)
else:
- str2 = df2._show_string(20, 20, False)
+ str2 = df2._show_string(n, truncate, False)
self.assertEqual(str1, str2)
@@ -1838,6 +1838,92 @@ class SparkConnectFunctionTests(SparkConnectFuncTestCase):
sdf.select(SF.next_day(sdf.ts1, "Mon")).toPandas(),
)
+ def test_time_window_functions(self):
+ from pyspark.sql import functions as SF
+ from pyspark.sql.connect import functions as CF
+
+ query = """
+ SELECT * FROM VALUES
+ (TIMESTAMP('2022-12-25 10:30:00'), 1),
+ (TIMESTAMP('2022-12-25 10:31:00'), 2),
+ (TIMESTAMP('2022-12-25 10:32:00'), 1),
+ (TIMESTAMP('2022-12-25 10:33:00'), 2),
+ (TIMESTAMP('2022-12-26 09:30:00'), 1),
+ (TIMESTAMP('2022-12-26 09:35:00'), 3)
+ AS tab(date, val)
+ """
+
+ # +-------------------+---+
+ # | date|val|
+ # +-------------------+---+
+ # |2022-12-25 10:30:00| 1|
+ # |2022-12-25 10:31:00| 2|
+ # |2022-12-25 10:32:00| 1|
+ # |2022-12-25 10:33:00| 2|
+ # |2022-12-26 09:30:00| 1|
+ # |2022-12-26 09:35:00| 3|
+ # +-------------------+---+
+
+ cdf = self.connect.sql(query)
+ sdf = self.spark.sql(query)
+
+ # test window
+ self.compare_by_show(
+ cdf.select(CF.window("date", "15 seconds")),
+ sdf.select(SF.window("date", "15 seconds")),
+ truncate=100,
+ )
+ self.compare_by_show(
+ cdf.select(CF.window(cdf.date, "1 minute")),
+ sdf.select(SF.window(sdf.date, "1 minute")),
+ truncate=100,
+ )
+
+ self.compare_by_show(
+ cdf.select(CF.window("date", "15 seconds", "5 seconds")),
+ sdf.select(SF.window("date", "15 seconds", "5 seconds")),
+ truncate=100,
+ )
+ self.compare_by_show(
+ cdf.select(CF.window(cdf.date, "1 minute", "10 seconds")),
+ sdf.select(SF.window(sdf.date, "1 minute", "10 seconds")),
+ truncate=100,
+ )
+
+ self.compare_by_show(
+ cdf.select(CF.window("date", "15 seconds", "10 seconds", "5
seconds")),
+ sdf.select(SF.window("date", "15 seconds", "10 seconds", "5
seconds")),
+ truncate=100,
+ )
+ self.compare_by_show(
+ cdf.select(CF.window(cdf.date, "1 minute", "10 seconds", "5
seconds")),
+ sdf.select(SF.window(sdf.date, "1 minute", "10 seconds", "5
seconds")),
+ truncate=100,
+ )
+
+ # test session_window
+ self.compare_by_show(
+ cdf.select(CF.session_window("date", "15 seconds")),
+ sdf.select(SF.session_window("date", "15 seconds")),
+ truncate=100,
+ )
+ self.compare_by_show(
+ cdf.select(CF.session_window(cdf.date, "1 minute")),
+ sdf.select(SF.session_window(sdf.date, "1 minute")),
+ truncate=100,
+ )
+
+ # test window_time
+ self.compare_by_show(
+ cdf.groupBy(CF.window("date", "5 seconds"))
+ .agg(CF.sum("val").alias("sum"))
+ .select(CF.window_time("window")),
+ sdf.groupBy(SF.window("date", "5 seconds"))
+ .agg(SF.sum("val").alias("sum"))
+ .select(SF.window_time("window")),
+ truncate=100,
+ )
+
def test_misc_functions(self):
from pyspark.sql import functions as SF
from pyspark.sql.connect import functions as CF
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]