This is an automated email from the ASF dual-hosted git repository.

ruifengz pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/spark.git


The following commit(s) were added to refs/heads/master by this push:
     new 0f27a6a8cf5 [SPARK-41722][CONNECT][PYTHON] Implement 3 missing time window functions
0f27a6a8cf5 is described below

commit 0f27a6a8cf52b6695bb755ff29824b1cd71c1ed7
Author: Ruifeng Zheng <[email protected]>
AuthorDate: Tue Dec 27 13:00:31 2022 +0800

    [SPARK-41722][CONNECT][PYTHON] Implement 3 missing time window functions
    
    ### What changes were proposed in this pull request?
    Implement the 3 missing time window functions: `window`, `window_time` and `session_window`.
    
    ### Why are the changes needed?
    For API coverage
    
    After this PR, the following functions are still missing:
    1, `call_udf`
    2, `pandas_udf`
    3, `sequence` - SPARK-41723
    4, `format_number` - SPARK-41473
    5, `unwrap_udt`
    6, `udf`
    
    ### Does this PR introduce _any_ user-facing change?
    Yes, `window`, `window_time` and `session_window` become available in the Python Spark Connect client.
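    
    A minimal usage sketch (the DataFrame `df` and its timestamp column `ts` are
    illustrative assumptions, not part of this change):
    
        from pyspark.sql.connect import functions as CF
    
        df.groupBy(CF.window("ts", "10 minutes")).count()    # tumbling window
        df.select(CF.session_window("ts", "5 minutes"))      # gap-based session window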
    
    ### How was this patch tested?
    Added unit tests.
    
    Closes #39227 from zhengruifeng/connect_time_3_func.
    
    Authored-by: Ruifeng Zheng <[email protected]>
    Signed-off-by: Ruifeng Zheng <[email protected]>
---
 .../sql/connect/planner/SparkConnectPlanner.scala  | 40 +++++++++
 python/pyspark/sql/connect/functions.py            | 67 +++++++++++++++
 .../sql/tests/connect/test_connect_function.py     | 96 ++++++++++++++++++++--
 3 files changed, 198 insertions(+), 5 deletions(-)

diff --git a/connector/connect/server/src/main/scala/org/apache/spark/sql/connect/planner/SparkConnectPlanner.scala b/connector/connect/server/src/main/scala/org/apache/spark/sql/connect/planner/SparkConnectPlanner.scala
index 21ff96f158e..42cc96f35dc 100644
--- a/connector/connect/server/src/main/scala/org/apache/spark/sql/connect/planner/SparkConnectPlanner.scala
+++ b/connector/connect/server/src/main/scala/org/apache/spark/sql/connect/planner/SparkConnectPlanner.scala
@@ -691,6 +691,46 @@ class SparkConnectPlanner(session: SparkSession) {
         }
         Some(NthValue(children(0), children(1), ignoreNulls))
 
+      case "window" if 2 <= fun.getArgumentsCount && fun.getArgumentsCount <= 
4 =>
+        val children = 
fun.getArgumentsList.asScala.toSeq.map(transformExpression)
+        val timeCol = children.head
+        val args = children.tail.map {
+          case Literal(s, StringType) if s != null => s.toString
+          case other =>
+            throw InvalidPlanInput(
+              s"windowDuration,slideDuration,startTime should be literal 
strings, but got $other")
+        }
+        var windowDuration: String = null
+        var slideDuration: String = null
+        var startTime: String = null
+        if (args.length == 3) {
+          windowDuration = args(0)
+          slideDuration = args(1)
+          startTime = args(2)
+        } else if (args.length == 2) {
+          windowDuration = args(0)
+          slideDuration = args(1)
+          startTime = "0 second"
+        } else {
+          windowDuration = args(0)
+          slideDuration = args(0)
+          startTime = "0 second"
+        }
+        Some(
+          Alias(TimeWindow(timeCol, windowDuration, slideDuration, startTime), "window")(
+            nonInheritableMetadataKeys = Seq(Dataset.DATASET_ID_KEY, Dataset.COL_POS_KEY)))
+
+      case "session_window" if fun.getArgumentsCount == 2 =>
+        val children = fun.getArgumentsList.asScala.toSeq.map(transformExpression)
+        val timeCol = children.head
+        val sessionWindow = children.last match {
+          case Literal(s, StringType) if s != null => SessionWindow(timeCol, s.toString)
+          case other => SessionWindow(timeCol, other)
+        }
+        Some(
+          Alias(sessionWindow, "session_window")(nonInheritableMetadataKeys =
+            Seq(Dataset.DATASET_ID_KEY, Dataset.COL_POS_KEY)))
+
       case "bucket" if fun.getArgumentsCount == 2 =>
         val children = fun.getArgumentsList.asScala.toSeq.map(transformExpression)
         (children.head, children.last) match {
diff --git a/python/pyspark/sql/connect/functions.py b/python/pyspark/sql/connect/functions.py
index 407e7536f03..ad255266b21 100644
--- a/python/pyspark/sql/connect/functions.py
+++ b/python/pyspark/sql/connect/functions.py
@@ -2070,6 +2070,73 @@ def timestamp_seconds(col: "ColumnOrName") -> Column:
 timestamp_seconds.__doc__ = pysparkfuncs.timestamp_seconds.__doc__
 
 
+def window(
+    timeColumn: "ColumnOrName",
+    windowDuration: str,
+    slideDuration: Optional[str] = None,
+    startTime: Optional[str] = None,
+) -> Column:
+    if windowDuration is None or not isinstance(windowDuration, str):
+        raise TypeError(
+            f"windowDuration should be provided as a string, "
+            f"but got {type(windowDuration).__name__} {windowDuration}"
+        )
+    if slideDuration is not None and not isinstance(slideDuration, str):
+        raise TypeError(
+            f"slideDuration should be provided as a string, "
+            f"but got {type(slideDuration).__name__} {slideDuration}"
+        )
+    if startTime is not None and not isinstance(startTime, str):
+        raise TypeError(
+            f"startTime should be provided as a string, " f"but got {type(startTime).__name__} {startTime}"
+        )
+
+    time_col = _to_col(timeColumn)
+
+    if slideDuration is not None and startTime is not None:
+        return _invoke_function(
+            "window", time_col, lit(windowDuration), lit(slideDuration), 
lit(startTime)
+        )
+    elif slideDuration is not None:
+        return _invoke_function("window", time_col, lit(windowDuration), lit(slideDuration))
+    elif startTime is not None:
+        return _invoke_function(
+            "window", time_col, lit(windowDuration), lit(windowDuration), 
lit(startTime)
+        )
+    else:
+        return _invoke_function("window", time_col, lit(windowDuration))
+
+
+window.__doc__ = pysparkfuncs.window.__doc__
+
+
+def window_time(
+    windowColumn: "ColumnOrName",
+) -> Column:
+    return _invoke_function("window_time", _to_col(windowColumn))
+
+
+window_time.__doc__ = pysparkfuncs.window_time.__doc__
+
+
+def session_window(timeColumn: "ColumnOrName", gapDuration: Union[Column, str]) -> Column:
+    if gapDuration is None or not isinstance(gapDuration, (Column, str)):
+        raise TypeError(
+            f"gapDuration should be as a string or Column, "
+            f"but got {type(gapDuration).__name__} {gapDuration}"
+        )
+
+    time_col = _to_col(timeColumn)
+
+    if isinstance(gapDuration, Column):
+        return _invoke_function("session_window", time_col, gapDuration)
+    else:
+        return _invoke_function("session_window", time_col, lit(gapDuration))
+
+
+session_window.__doc__ = pysparkfuncs.session_window.__doc__
+
+
 # Partition Transformation Functions
 
 
diff --git a/python/pyspark/sql/tests/connect/test_connect_function.py b/python/pyspark/sql/tests/connect/test_connect_function.py
index d00d712d7f8..37c29e2e66b 100644
--- a/python/pyspark/sql/tests/connect/test_connect_function.py
+++ b/python/pyspark/sql/tests/connect/test_connect_function.py
@@ -52,21 +52,21 @@ class SparkConnectFunctionTests(SparkConnectFuncTestCase):
     """These test cases exercise the interface to the proto plan
     generation but do not call Spark."""
 
-    def compare_by_show(self, df1, df2):
+    def compare_by_show(self, df1, df2, n: int = 20, truncate: int = 20):
         from pyspark.sql.dataframe import DataFrame as SDF
         from pyspark.sql.connect.dataframe import DataFrame as CDF
 
         assert isinstance(df1, (SDF, CDF))
         if isinstance(df1, SDF):
-            str1 = df1._jdf.showString(20, 20, False)
+            str1 = df1._jdf.showString(n, truncate, False)
         else:
-            str1 = df1._show_string(20, 20, False)
+            str1 = df1._show_string(n, truncate, False)
 
         assert isinstance(df2, (SDF, CDF))
         if isinstance(df2, SDF):
-            str2 = df2._jdf.showString(20, 20, False)
+            str2 = df2._jdf.showString(n, truncate, False)
         else:
-            str2 = df2._show_string(20, 20, False)
+            str2 = df2._show_string(n, truncate, False)
 
         self.assertEqual(str1, str2)
 
@@ -1838,6 +1838,92 @@ class SparkConnectFunctionTests(SparkConnectFuncTestCase):
             sdf.select(SF.next_day(sdf.ts1, "Mon")).toPandas(),
         )
 
+    def test_time_window_functions(self):
+        from pyspark.sql import functions as SF
+        from pyspark.sql.connect import functions as CF
+
+        query = """
+            SELECT * FROM VALUES
+            (TIMESTAMP('2022-12-25 10:30:00'), 1),
+            (TIMESTAMP('2022-12-25 10:31:00'), 2),
+            (TIMESTAMP('2022-12-25 10:32:00'), 1),
+            (TIMESTAMP('2022-12-25 10:33:00'), 2),
+            (TIMESTAMP('2022-12-26 09:30:00'), 1),
+            (TIMESTAMP('2022-12-26 09:35:00'), 3)
+            AS tab(date, val)
+            """
+
+        # +-------------------+---+
+        # |               date|val|
+        # +-------------------+---+
+        # |2022-12-25 10:30:00|  1|
+        # |2022-12-25 10:31:00|  2|
+        # |2022-12-25 10:32:00|  1|
+        # |2022-12-25 10:33:00|  2|
+        # |2022-12-26 09:30:00|  1|
+        # |2022-12-26 09:35:00|  3|
+        # +-------------------+---+
+
+        cdf = self.connect.sql(query)
+        sdf = self.spark.sql(query)
+
+        # test window
+        self.compare_by_show(
+            cdf.select(CF.window("date", "15 seconds")),
+            sdf.select(SF.window("date", "15 seconds")),
+            truncate=100,
+        )
+        self.compare_by_show(
+            cdf.select(CF.window(cdf.date, "1 minute")),
+            sdf.select(SF.window(sdf.date, "1 minute")),
+            truncate=100,
+        )
+
+        self.compare_by_show(
+            cdf.select(CF.window("date", "15 seconds", "5 seconds")),
+            sdf.select(SF.window("date", "15 seconds", "5 seconds")),
+            truncate=100,
+        )
+        self.compare_by_show(
+            cdf.select(CF.window(cdf.date, "1 minute", "10 seconds")),
+            sdf.select(SF.window(sdf.date, "1 minute", "10 seconds")),
+            truncate=100,
+        )
+
+        self.compare_by_show(
+            cdf.select(CF.window("date", "15 seconds", "10 seconds", "5 
seconds")),
+            sdf.select(SF.window("date", "15 seconds", "10 seconds", "5 
seconds")),
+            truncate=100,
+        )
+        self.compare_by_show(
+            cdf.select(CF.window(cdf.date, "1 minute", "10 seconds", "5 seconds")),
+            sdf.select(SF.window(sdf.date, "1 minute", "10 seconds", "5 seconds")),
+            truncate=100,
+        )
+
+        # test session_window
+        self.compare_by_show(
+            cdf.select(CF.session_window("date", "15 seconds")),
+            sdf.select(SF.session_window("date", "15 seconds")),
+            truncate=100,
+        )
+        self.compare_by_show(
+            cdf.select(CF.session_window(cdf.date, "1 minute")),
+            sdf.select(SF.session_window(sdf.date, "1 minute")),
+            truncate=100,
+        )
+
+        # test window_time
+        self.compare_by_show(
+            cdf.groupBy(CF.window("date", "5 seconds"))
+            .agg(CF.sum("val").alias("sum"))
+            .select(CF.window_time("window")),
+            sdf.groupBy(SF.window("date", "5 seconds"))
+            .agg(SF.sum("val").alias("sum"))
+            .select(SF.window_time("window")),
+            truncate=100,
+        )
+
     def test_misc_functions(self):
         from pyspark.sql import functions as SF
         from pyspark.sql.connect import functions as CF


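For anyone trying the new API, a minimal end-to-end sketch follows. The
`sc://localhost` endpoint, the session setup via `SparkSession.builder.remote`,
and the sample data are illustrative assumptions, not part of this commit:

    from pyspark.sql import SparkSession
    from pyspark.sql.connect import functions as CF

    # Connect to a Spark Connect server; the endpoint is an assumption.
    spark = SparkSession.builder.remote("sc://localhost").getOrCreate()

    df = spark.sql(
        "SELECT * FROM VALUES "
        "(TIMESTAMP('2022-12-25 10:30:00'), 1), "
        "(TIMESTAMP('2022-12-25 10:31:00'), 2) "
        "AS tab(date, val)"
    )

    # Tumbling window: the planner defaults slideDuration to windowDuration
    # and startTime to '0 second' when they are omitted.
    df.groupBy(CF.window("date", "1 minute")).agg(CF.sum("val").alias("sum")).show(truncate=False)

    # Gap-based session window; gapDuration may be a string or a Column.
    df.select(CF.session_window("date", "15 seconds")).show(truncate=False)

    # window_time extracts the end time of an aggregated time window,
    # using the "window" alias assigned by the planner.
    res = df.groupBy(CF.window("date", "5 seconds")).agg(CF.sum("val").alias("sum"))
    res.select(CF.window_time("window")).show(truncate=False)
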
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]
