This is an automated email from the ASF dual-hosted git repository.

kabhwan pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/spark.git


The following commit(s) were added to refs/heads/master by this push:
     new 96b5d50f3ef [SPARK-40821][SQL][CORE][PYTHON][SS] Introduce window_time 
function to extract event time from the window column
96b5d50f3ef is described below

commit 96b5d50f3efb97c734f8c370e263a82d34f78d1b
Author: Alex Balikov <[email protected]>
AuthorDate: Mon Oct 24 08:12:42 2022 +0900

    [SPARK-40821][SQL][CORE][PYTHON][SS] Introduce window_time function to 
extract event time from the window column
    
    ### What changes were proposed in this pull request?
    
    This PR introduces a window_time function to extract streaming event time 
from a window column produced by the window aggregating operators. This is one 
step in sequence of fixes required to add support for multiple stateful 
operators in Spark Structured Streaming as described in 
https://issues.apache.org/jira/browse/SPARK-40821
    
    ### Why are the changes needed?
    
    The window_time function is a convenience function to compute correct event 
time for a window aggregate records. Such records produced by window 
aggregating operators have no explicit event time but rather a window column of 
type StructType { start: TimestampType, end: TimestampType } where start is 
inclusive and end is exclusive. The correct event time for such record is 
window.end - 1. The event time is necessary when chaining other stateful 
operators after the window aggregating op [...]
    
    ### Does this PR introduce _any_ user-facing change?
    
    Yes: The PR introduces a new window_time SQL function for both Scala and 
Python APIs.
    
    ### How was this patch tested?
    
    Added new unit tests.
    
    Closes #38288 from alex-balikov/SPARK-40821-time-window.
    
    Authored-by: Alex Balikov <[email protected]>
    Signed-off-by: Jungtaek Lim <[email protected]>
---
 .../source/reference/pyspark.sql/functions.rst     |   1 +
 python/pyspark/sql/functions.py                    |  46 +++
 python/pyspark/sql/tests/test_functions.py         |  16 +
 .../spark/sql/catalyst/analysis/Analyzer.scala     | 238 +-------------
 .../sql/catalyst/analysis/FunctionRegistry.scala   |   1 +
 .../sql/catalyst/analysis/ResolveTimeWindows.scala | 346 +++++++++++++++++++++
 .../sql/catalyst/expressions/TimeWindow.scala      |   2 +
 .../sql/catalyst/expressions/WindowTime.scala      |  62 ++++
 .../scala/org/apache/spark/sql/functions.scala     |  17 +
 .../sql-functions/sql-expression-schema.md         |   1 +
 .../spark/sql/DataFrameTimeWindowingSuite.scala    |  62 ++++
 11 files changed, 555 insertions(+), 237 deletions(-)

diff --git a/python/docs/source/reference/pyspark.sql/functions.rst 
b/python/docs/source/reference/pyspark.sql/functions.rst
index 5a64845598e..37ddbaf1673 100644
--- a/python/docs/source/reference/pyspark.sql/functions.rst
+++ b/python/docs/source/reference/pyspark.sql/functions.rst
@@ -142,6 +142,7 @@ Datetime Functions
     window
     session_window
     timestamp_seconds
+    window_time
 
 
 Collection Functions
diff --git a/python/pyspark/sql/functions.py b/python/pyspark/sql/functions.py
index f01379afd6e..ad1bc488e87 100644
--- a/python/pyspark/sql/functions.py
+++ b/python/pyspark/sql/functions.py
@@ -4884,6 +4884,52 @@ def window(
         return _invoke_function("window", time_col, windowDuration)
 
 
+def window_time(
+    windowColumn: "ColumnOrName",
+) -> Column:
+    """Computes the event time from a window column. The column window values 
are produced
+    by window aggregating operators and are of type `STRUCT<start: TIMESTAMP, 
end: TIMESTAMP>`
+    where start is inclusive and end is exclusive. The event time of records 
produced by window
+    aggregating operators can be computed as ``window_time(window)`` and are
+    ``window.end - lit(1).alias("microsecond")`` (as microsecond is the 
minimal supported event
+    time precision). The window column must be one produced by a window 
aggregating operator.
+
+    .. versionadded:: 3.4.0
+
+    Parameters
+    ----------
+    windowColumn : :class:`~pyspark.sql.Column`
+        The window column of a window aggregate records.
+
+    Returns
+    -------
+    :class:`~pyspark.sql.Column`
+        the column for computed results.
+
+    Examples
+    --------
+    >>> import datetime
+    >>> df = spark.createDataFrame(
+    ...     [(datetime.datetime(2016, 3, 11, 9, 0, 7), 1)],
+    ... ).toDF("date", "val")
+
+    Group the data into 5 second time windows and aggregate as sum.
+
+    >>> w = df.groupBy(window("date", "5 
seconds")).agg(sum("val").alias("sum"))
+
+    Extract the window event time using the window_time function.
+
+    >>> w.select(
+    ...     w.window.end.cast("string").alias("end"),
+    ...     window_time(w.window).cast("string").alias("window_time"),
+    ...     "sum"
+    ... ).collect()
+    [Row(end='2016-03-11 09:00:10', window_time='2016-03-11 09:00:09.999999', 
sum=1)]
+    """
+    window_col = _to_java_column(windowColumn)
+    return _invoke_function("window_time", window_col)
+
+
 def session_window(timeColumn: "ColumnOrName", gapDuration: Union[Column, 
str]) -> Column:
     """
     Generates session window given a timestamp specifying column.
diff --git a/python/pyspark/sql/tests/test_functions.py 
b/python/pyspark/sql/tests/test_functions.py
index 32cc77e1115..55ef012b6d0 100644
--- a/python/pyspark/sql/tests/test_functions.py
+++ b/python/pyspark/sql/tests/test_functions.py
@@ -894,6 +894,22 @@ class FunctionsTests(ReusedSQLTestCase):
         for r, ex in zip(rs, expected):
             self.assertEqual(tuple(r), ex[: len(r)])
 
+    def test_window_time(self):
+        df = self.spark.createDataFrame(
+            [(datetime.datetime(2016, 3, 11, 9, 0, 7), 1)], ["date", "val"]
+        )
+        from pyspark.sql import functions as F
+
+        w = df.groupBy(F.window("date", "5 
seconds")).agg(F.sum("val").alias("sum"))
+        r = w.select(
+            w.window.end.cast("string").alias("end"),
+            F.window_time(w.window).cast("string").alias("window_time"),
+            "sum",
+        ).collect()
+        self.assertEqual(
+            r[0], Row(end="2016-03-11 09:00:10", window_time="2016-03-11 
09:00:09.999999", sum=1)
+        )
+
     def test_collect_functions(self):
         df = self.spark.createDataFrame([(1, "1"), (2, "2"), (1, "2"), (1, 
"2")], ["key", "value"])
         from pyspark.sql import functions
diff --git 
a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/Analyzer.scala
 
b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/Analyzer.scala
index b185b38797b..fc12b6522b4 100644
--- 
a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/Analyzer.scala
+++ 
b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/Analyzer.scala
@@ -56,7 +56,6 @@ import org.apache.spark.sql.internal.connector.V1Function
 import org.apache.spark.sql.types._
 import org.apache.spark.sql.types.DayTimeIntervalType.DAY
 import org.apache.spark.sql.util.{CaseInsensitiveStringMap, SchemaUtils}
-import org.apache.spark.unsafe.types.CalendarInterval
 import org.apache.spark.util.Utils
 import org.apache.spark.util.collection.{Utils => CUtils}
 
@@ -313,6 +312,7 @@ class Analyzer(override val catalogManager: CatalogManager)
       ResolveAggregateFunctions ::
       TimeWindowing ::
       SessionWindowing ::
+      ResolveWindowTime ::
       ResolveDefaultColumns(v1SessionCatalog) ::
       ResolveInlineTables ::
       ResolveLambdaVariables ::
@@ -3965,242 +3965,6 @@ object EliminateEventTimeWatermark extends 
Rule[LogicalPlan] {
   }
 }
 
-/**
- * Maps a time column to multiple time windows using the Expand operator. 
Since it's non-trivial to
- * figure out how many windows a time column can map to, we over-estimate the 
number of windows and
- * filter out the rows where the time column is not inside the time window.
- */
-object TimeWindowing extends Rule[LogicalPlan] {
-  import org.apache.spark.sql.catalyst.dsl.expressions._
-
-  private final val WINDOW_COL_NAME = "window"
-  private final val WINDOW_START = "start"
-  private final val WINDOW_END = "end"
-
-  /**
-   * Generates the logical plan for generating window ranges on a timestamp 
column. Without
-   * knowing what the timestamp value is, it's non-trivial to figure out 
deterministically how many
-   * window ranges a timestamp will map to given all possible combinations of 
a window duration,
-   * slide duration and start time (offset). Therefore, we express and 
over-estimate the number of
-   * windows there may be, and filter the valid windows. We use last Project 
operator to group
-   * the window columns into a struct so they can be accessed as 
`window.start` and `window.end`.
-   *
-   * The windows are calculated as below:
-   * maxNumOverlapping <- ceil(windowDuration / slideDuration)
-   * for (i <- 0 until maxNumOverlapping)
-   *   lastStart <- timestamp - (timestamp - startTime + slideDuration) % 
slideDuration
-   *   windowStart <- lastStart - i * slideDuration
-   *   windowEnd <- windowStart + windowDuration
-   *   return windowStart, windowEnd
-   *
-   * This behaves as follows for the given parameters for the time: 12:05. The 
valid windows are
-   * marked with a +, and invalid ones are marked with a x. The invalid ones 
are filtered using the
-   * Filter operator.
-   * window: 12m, slide: 5m, start: 0m :: window: 12m, slide: 5m, start: 2m
-   *     11:55 - 12:07 +                      11:52 - 12:04 x
-   *     12:00 - 12:12 +                      11:57 - 12:09 +
-   *     12:05 - 12:17 +                      12:02 - 12:14 +
-   *
-   * @param plan The logical plan
-   * @return the logical plan that will generate the time windows using the 
Expand operator, with
-   *         the Filter operator for correctness and Project for usability.
-   */
-  def apply(plan: LogicalPlan): LogicalPlan = 
plan.resolveOperatorsUpWithPruning(
-    _.containsPattern(TIME_WINDOW), ruleId) {
-    case p: LogicalPlan if p.children.size == 1 =>
-      val child = p.children.head
-      val windowExpressions =
-        p.expressions.flatMap(_.collect { case t: TimeWindow => t }).toSet
-
-      val numWindowExpr = p.expressions.flatMap(_.collect {
-        case s: SessionWindow => s
-        case t: TimeWindow => t
-      }).toSet.size
-
-      // Only support a single window expression for now
-      if (numWindowExpr == 1 && windowExpressions.nonEmpty &&
-          windowExpressions.head.timeColumn.resolved &&
-          windowExpressions.head.checkInputDataTypes().isSuccess) {
-
-        val window = windowExpressions.head
-
-        val metadata = window.timeColumn match {
-          case a: Attribute => a.metadata
-          case _ => Metadata.empty
-        }
-
-        def getWindow(i: Int, dataType: DataType): Expression = {
-          val timestamp = PreciseTimestampConversion(window.timeColumn, 
dataType, LongType)
-          val lastStart = timestamp - (timestamp - window.startTime
-            + window.slideDuration) % window.slideDuration
-          val windowStart = lastStart - i * window.slideDuration
-          val windowEnd = windowStart + window.windowDuration
-
-          // We make sure value fields are nullable since the dataType of 
TimeWindow defines them
-          // as nullable.
-          CreateNamedStruct(
-            Literal(WINDOW_START) ::
-              PreciseTimestampConversion(windowStart, LongType, 
dataType).castNullable() ::
-              Literal(WINDOW_END) ::
-              PreciseTimestampConversion(windowEnd, LongType, 
dataType).castNullable() ::
-              Nil)
-        }
-
-        val windowAttr = AttributeReference(
-          WINDOW_COL_NAME, window.dataType, metadata = metadata)()
-
-        if (window.windowDuration == window.slideDuration) {
-          val windowStruct = Alias(getWindow(0, window.timeColumn.dataType), 
WINDOW_COL_NAME)(
-            exprId = windowAttr.exprId, explicitMetadata = Some(metadata))
-
-          val replacedPlan = p transformExpressions {
-            case t: TimeWindow => windowAttr
-          }
-
-          // For backwards compatibility we add a filter to filter out nulls
-          val filterExpr = IsNotNull(window.timeColumn)
-
-          replacedPlan.withNewChildren(
-            Project(windowStruct +: child.output,
-              Filter(filterExpr, child)) :: Nil)
-        } else {
-          val overlappingWindows =
-            math.ceil(window.windowDuration * 1.0 / window.slideDuration).toInt
-          val windows =
-            Seq.tabulate(overlappingWindows)(i =>
-              getWindow(i, window.timeColumn.dataType))
-
-          val projections = windows.map(_ +: child.output)
-
-          // When the condition windowDuration % slideDuration = 0 is 
fulfilled,
-          // the estimation of the number of windows becomes exact one,
-          // which means all produced windows are valid.
-          val filterExpr =
-            if (window.windowDuration % window.slideDuration == 0) {
-              IsNotNull(window.timeColumn)
-            } else {
-              window.timeColumn >= windowAttr.getField(WINDOW_START) &&
-                window.timeColumn < windowAttr.getField(WINDOW_END)
-            }
-
-          val substitutedPlan = Filter(filterExpr,
-            Expand(projections, windowAttr +: child.output, child))
-
-          val renamedPlan = p transformExpressions {
-            case t: TimeWindow => windowAttr
-          }
-
-          renamedPlan.withNewChildren(substitutedPlan :: Nil)
-        }
-      } else if (numWindowExpr > 1) {
-        throw 
QueryCompilationErrors.multiTimeWindowExpressionsNotSupportedError(p)
-      } else {
-        p // Return unchanged. Analyzer will throw exception later
-      }
-  }
-}
-
-/** Maps a time column to a session window. */
-object SessionWindowing extends Rule[LogicalPlan] {
-  import org.apache.spark.sql.catalyst.dsl.expressions._
-
-  private final val SESSION_COL_NAME = "session_window"
-  private final val SESSION_START = "start"
-  private final val SESSION_END = "end"
-
-  /**
-   * Generates the logical plan for generating session window on a timestamp 
column.
-   * Each session window is initially defined as [timestamp, timestamp + gap).
-   *
-   * This also adds a marker to the session column so that downstream can 
easily find the column
-   * on session window.
-   */
-  def apply(plan: LogicalPlan): LogicalPlan = plan.resolveOperatorsUp {
-    case p: LogicalPlan if p.children.size == 1 =>
-      val child = p.children.head
-      val sessionExpressions =
-        p.expressions.flatMap(_.collect { case s: SessionWindow => s }).toSet
-
-      val numWindowExpr = p.expressions.flatMap(_.collect {
-        case s: SessionWindow => s
-        case t: TimeWindow => t
-      }).toSet.size
-
-      // Only support a single session expression for now
-      if (numWindowExpr == 1 && sessionExpressions.nonEmpty &&
-          sessionExpressions.head.timeColumn.resolved &&
-          sessionExpressions.head.checkInputDataTypes().isSuccess) {
-
-        val session = sessionExpressions.head
-
-        val metadata = session.timeColumn match {
-          case a: Attribute => a.metadata
-          case _ => Metadata.empty
-        }
-
-        val newMetadata = new MetadataBuilder()
-          .withMetadata(metadata)
-          .putBoolean(SessionWindow.marker, true)
-          .build()
-
-        val sessionAttr = AttributeReference(
-          SESSION_COL_NAME, session.dataType, metadata = newMetadata)()
-
-        val sessionStart =
-          PreciseTimestampConversion(session.timeColumn, 
session.timeColumn.dataType, LongType)
-        val gapDuration = session.gapDuration match {
-          case expr if Cast.canCast(expr.dataType, CalendarIntervalType) =>
-            Cast(expr, CalendarIntervalType)
-          case other =>
-            throw 
QueryCompilationErrors.sessionWindowGapDurationDataTypeError(other.dataType)
-        }
-        val sessionEnd = PreciseTimestampConversion(session.timeColumn + 
gapDuration,
-          session.timeColumn.dataType, LongType)
-
-        // We make sure value fields are nullable since the dataType of 
SessionWindow defines them
-        // as nullable.
-        val literalSessionStruct = CreateNamedStruct(
-          Literal(SESSION_START) ::
-            PreciseTimestampConversion(sessionStart, LongType, 
session.timeColumn.dataType)
-              .castNullable() ::
-            Literal(SESSION_END) ::
-            PreciseTimestampConversion(sessionEnd, LongType, 
session.timeColumn.dataType)
-              .castNullable() ::
-            Nil)
-
-        val sessionStruct = Alias(literalSessionStruct, SESSION_COL_NAME)(
-          exprId = sessionAttr.exprId, explicitMetadata = Some(newMetadata))
-
-        val replacedPlan = p transformExpressions {
-          case s: SessionWindow => sessionAttr
-        }
-
-        val filterByTimeRange = session.gapDuration match {
-          case Literal(interval: CalendarInterval, CalendarIntervalType) =>
-            interval == null || interval.months + interval.days + 
interval.microseconds <= 0
-          case _ => true
-        }
-
-        // As same as tumbling window, we add a filter to filter out nulls.
-        // And we also filter out events with negative or zero or invalid gap 
duration.
-        val filterExpr = if (filterByTimeRange) {
-          IsNotNull(session.timeColumn) &&
-            (sessionAttr.getField(SESSION_END) > 
sessionAttr.getField(SESSION_START))
-        } else {
-          IsNotNull(session.timeColumn)
-        }
-
-        replacedPlan.withNewChildren(
-          Filter(filterExpr,
-            Project(sessionStruct +: child.output, child)) :: Nil)
-      } else if (numWindowExpr > 1) {
-        throw 
QueryCompilationErrors.multiTimeWindowExpressionsNotSupportedError(p)
-      } else {
-        p // Return unchanged. Analyzer will throw exception later
-      }
-  }
-}
-
 /**
  * Resolve expressions if they contains [[NamePlaceholder]]s.
  */
diff --git 
a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/FunctionRegistry.scala
 
b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/FunctionRegistry.scala
index ef8ce3f48d5..f5e494e9096 100644
--- 
a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/FunctionRegistry.scala
+++ 
b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/FunctionRegistry.scala
@@ -639,6 +639,7 @@ object FunctionRegistry {
     expression[Year]("year"),
     expression[TimeWindow]("window"),
     expression[SessionWindow]("session_window"),
+    expression[WindowTime]("window_time"),
     expression[MakeDate]("make_date"),
     expression[MakeTimestamp]("make_timestamp"),
     // We keep the 2 expression builders below to have different function docs.
diff --git 
a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/ResolveTimeWindows.scala
 
b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/ResolveTimeWindows.scala
new file mode 100644
index 00000000000..fd5da3ff13d
--- /dev/null
+++ 
b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/ResolveTimeWindows.scala
@@ -0,0 +1,346 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.sql.catalyst.analysis
+
+import org.apache.spark.sql.AnalysisException
+import org.apache.spark.sql.catalyst.expressions.{Alias, Attribute, 
AttributeReference, Cast, CreateNamedStruct, Expression, GetStructField, 
IsNotNull, Literal, PreciseTimestampConversion, SessionWindow, Subtract, 
TimeWindow, WindowTime}
+import org.apache.spark.sql.catalyst.plans.logical.{Expand, Filter, 
LogicalPlan, Project}
+import org.apache.spark.sql.catalyst.rules.Rule
+import org.apache.spark.sql.catalyst.trees.TreePattern.TIME_WINDOW
+import org.apache.spark.sql.errors.QueryCompilationErrors
+import org.apache.spark.sql.types.{CalendarIntervalType, DataType, LongType, 
Metadata, MetadataBuilder, StructType}
+import org.apache.spark.unsafe.types.CalendarInterval
+
+/**
+ * Maps a time column to multiple time windows using the Expand operator. 
Since it's non-trivial to
+ * figure out how many windows a time column can map to, we over-estimate the 
number of windows and
+ * filter out the rows where the time column is not inside the time window.
+ */
+object TimeWindowing extends Rule[LogicalPlan] {
+  import org.apache.spark.sql.catalyst.dsl.expressions._
+
+  private final val WINDOW_COL_NAME = "window"
+  private final val WINDOW_START = "start"
+  private final val WINDOW_END = "end"
+
+  /**
+   * Generates the logical plan for generating window ranges on a timestamp 
column. Without
+   * knowing what the timestamp value is, it's non-trivial to figure out 
deterministically how many
+   * window ranges a timestamp will map to given all possible combinations of 
a window duration,
+   * slide duration and start time (offset). Therefore, we express and 
over-estimate the number of
+   * windows there may be, and filter the valid windows. We use last Project 
operator to group
+   * the window columns into a struct so they can be accessed as 
`window.start` and `window.end`.
+   *
+   * The windows are calculated as below:
+   * maxNumOverlapping <- ceil(windowDuration / slideDuration)
+   * for (i <- 0 until maxNumOverlapping)
+   *   lastStart <- timestamp - (timestamp - startTime + slideDuration) % 
slideDuration
+   *   windowStart <- lastStart - i * slideDuration
+   *   windowEnd <- windowStart + windowDuration
+   *   return windowStart, windowEnd
+   *
+   * This behaves as follows for the given parameters for the time: 12:05. The 
valid windows are
+   * marked with a +, and invalid ones are marked with a x. The invalid ones 
are filtered using the
+   * Filter operator.
+   * window: 12m, slide: 5m, start: 0m :: window: 12m, slide: 5m, start: 2m
+   *     11:55 - 12:07 +                      11:52 - 12:04 x
+   *     12:00 - 12:12 +                      11:57 - 12:09 +
+   *     12:05 - 12:17 +                      12:02 - 12:14 +
+   *
+   * @param plan The logical plan
+   * @return the logical plan that will generate the time windows using the 
Expand operator, with
+   *         the Filter operator for correctness and Project for usability.
+   */
+  def apply(plan: LogicalPlan): LogicalPlan = 
plan.resolveOperatorsUpWithPruning(
+    _.containsPattern(TIME_WINDOW), ruleId) {
+    case p: LogicalPlan if p.children.size == 1 =>
+      val child = p.children.head
+      val windowExpressions =
+        p.expressions.flatMap(_.collect { case t: TimeWindow => t }).toSet
+
+      val numWindowExpr = p.expressions.flatMap(_.collect {
+        case s: SessionWindow => s
+        case t: TimeWindow => t
+      }).toSet.size
+
+      // Only support a single window expression for now
+      if (numWindowExpr == 1 && windowExpressions.nonEmpty &&
+        windowExpressions.head.timeColumn.resolved &&
+        windowExpressions.head.checkInputDataTypes().isSuccess) {
+
+        val window = windowExpressions.head
+
+        if (StructType.acceptsType(window.timeColumn.dataType)) {
+          return p.transformExpressions {
+            case t: TimeWindow => t.copy(timeColumn = 
WindowTime(window.timeColumn))
+          }
+        }
+
+        val metadata = window.timeColumn match {
+          case a: Attribute => a.metadata
+          case _ => Metadata.empty
+        }
+
+        val newMetadata = new MetadataBuilder()
+          .withMetadata(metadata)
+          .putBoolean(TimeWindow.marker, true)
+          .build()
+
+        def getWindow(i: Int, dataType: DataType): Expression = {
+          val timestamp = PreciseTimestampConversion(window.timeColumn, 
dataType, LongType)
+          val lastStart = timestamp - (timestamp - window.startTime
+            + window.slideDuration) % window.slideDuration
+          val windowStart = lastStart - i * window.slideDuration
+          val windowEnd = windowStart + window.windowDuration
+
+          // We make sure value fields are nullable since the dataType of 
TimeWindow defines them
+          // as nullable.
+          CreateNamedStruct(
+            Literal(WINDOW_START) ::
+              PreciseTimestampConversion(windowStart, LongType, 
dataType).castNullable() ::
+              Literal(WINDOW_END) ::
+              PreciseTimestampConversion(windowEnd, LongType, 
dataType).castNullable() ::
+              Nil)
+        }
+
+        val windowAttr = AttributeReference(
+          WINDOW_COL_NAME, window.dataType, metadata = newMetadata)()
+
+        if (window.windowDuration == window.slideDuration) {
+          val windowStruct = Alias(getWindow(0, window.timeColumn.dataType), 
WINDOW_COL_NAME)(
+            exprId = windowAttr.exprId, explicitMetadata = Some(newMetadata))
+
+          val replacedPlan = p transformExpressions {
+            case t: TimeWindow => windowAttr
+          }
+
+          // For backwards compatibility we add a filter to filter out nulls
+          val filterExpr = IsNotNull(window.timeColumn)
+
+          replacedPlan.withNewChildren(
+            Project(windowStruct +: child.output,
+              Filter(filterExpr, child)) :: Nil)
+        } else {
+          val overlappingWindows =
+            math.ceil(window.windowDuration * 1.0 / window.slideDuration).toInt
+          val windows =
+            Seq.tabulate(overlappingWindows)(i =>
+              getWindow(i, window.timeColumn.dataType))
+
+          val projections = windows.map(_ +: child.output)
+
+          // When the condition windowDuration % slideDuration = 0 is 
fulfilled,
+          // the estimation of the number of windows becomes exact one,
+          // which means all produced windows are valid.
+          val filterExpr =
+          if (window.windowDuration % window.slideDuration == 0) {
+            IsNotNull(window.timeColumn)
+          } else {
+            window.timeColumn >= windowAttr.getField(WINDOW_START) &&
+              window.timeColumn < windowAttr.getField(WINDOW_END)
+          }
+
+          val substitutedPlan = Filter(filterExpr,
+            Expand(projections, windowAttr +: child.output, child))
+
+          val renamedPlan = p transformExpressions {
+            case t: TimeWindow => windowAttr
+          }
+
+          renamedPlan.withNewChildren(substitutedPlan :: Nil)
+        }
+      } else if (numWindowExpr > 1) {
+        throw 
QueryCompilationErrors.multiTimeWindowExpressionsNotSupportedError(p)
+      } else {
+        p // Return unchanged. Analyzer will throw exception later
+      }
+  }
+}
+
+/** Maps a time column to a session window. */
+object SessionWindowing extends Rule[LogicalPlan] {
+  import org.apache.spark.sql.catalyst.dsl.expressions._
+
+  private final val SESSION_COL_NAME = "session_window"
+  private final val SESSION_START = "start"
+  private final val SESSION_END = "end"
+
+  /**
+   * Generates the logical plan for generating session window on a timestamp 
column.
+   * Each session window is initially defined as [timestamp, timestamp + gap).
+   *
+   * This also adds a marker to the session column so that downstream can 
easily find the column
+   * on session window.
+   */
+  def apply(plan: LogicalPlan): LogicalPlan = plan.resolveOperatorsUp {
+    case p: LogicalPlan if p.children.size == 1 =>
+      val child = p.children.head
+      val sessionExpressions =
+        p.expressions.flatMap(_.collect { case s: SessionWindow => s }).toSet
+
+      val numWindowExpr = p.expressions.flatMap(_.collect {
+        case s: SessionWindow => s
+        case t: TimeWindow => t
+      }).toSet.size
+
+      // Only support a single session expression for now
+      if (numWindowExpr == 1 && sessionExpressions.nonEmpty &&
+        sessionExpressions.head.timeColumn.resolved &&
+        sessionExpressions.head.checkInputDataTypes().isSuccess) {
+
+        val session = sessionExpressions.head
+
+        if (StructType.acceptsType(session.timeColumn.dataType)) {
+          return p transformExpressions {
+            case t: SessionWindow => t.copy(timeColumn = 
WindowTime(session.timeColumn))
+          }
+        }
+
+        val metadata = session.timeColumn match {
+          case a: Attribute => a.metadata
+          case _ => Metadata.empty
+        }
+
+        val newMetadata = new MetadataBuilder()
+          .withMetadata(metadata)
+          .putBoolean(SessionWindow.marker, true)
+          .build()
+
+        val sessionAttr = AttributeReference(
+          SESSION_COL_NAME, session.dataType, metadata = newMetadata)()
+
+        val sessionStart =
+          PreciseTimestampConversion(session.timeColumn, 
session.timeColumn.dataType, LongType)
+        val gapDuration = session.gapDuration match {
+          case expr if Cast.canCast(expr.dataType, CalendarIntervalType) =>
+            Cast(expr, CalendarIntervalType)
+          case other =>
+            throw 
QueryCompilationErrors.sessionWindowGapDurationDataTypeError(other.dataType)
+        }
+        val sessionEnd = PreciseTimestampConversion(session.timeColumn + 
gapDuration,
+          session.timeColumn.dataType, LongType)
+
+        // We make sure value fields are nullable since the dataType of 
SessionWindow defines them
+        // as nullable.
+        val literalSessionStruct = CreateNamedStruct(
+          Literal(SESSION_START) ::
+            PreciseTimestampConversion(sessionStart, LongType, 
session.timeColumn.dataType)
+              .castNullable() ::
+            Literal(SESSION_END) ::
+            PreciseTimestampConversion(sessionEnd, LongType, 
session.timeColumn.dataType)
+              .castNullable() ::
+            Nil)
+
+        val sessionStruct = Alias(literalSessionStruct, SESSION_COL_NAME)(
+          exprId = sessionAttr.exprId, explicitMetadata = Some(newMetadata))
+
+        val replacedPlan = p transformExpressions {
+          case s: SessionWindow => sessionAttr
+        }
+
+        val filterByTimeRange = session.gapDuration match {
+          case Literal(interval: CalendarInterval, CalendarIntervalType) =>
+            interval == null || interval.months + interval.days + 
interval.microseconds <= 0
+          case _ => true
+        }
+
+        // As same as tumbling window, we add a filter to filter out nulls.
+        // And we also filter out events with negative or zero or invalid gap 
duration.
+        val filterExpr = if (filterByTimeRange) {
+          IsNotNull(session.timeColumn) &&
+            (sessionAttr.getField(SESSION_END) > 
sessionAttr.getField(SESSION_START))
+        } else {
+          IsNotNull(session.timeColumn)
+        }
+
+        replacedPlan.withNewChildren(
+          Filter(filterExpr,
+            Project(sessionStruct +: child.output, child)) :: Nil)
+      } else if (numWindowExpr > 1) {
+        throw 
QueryCompilationErrors.multiTimeWindowExpressionsNotSupportedError(p)
+      } else {
+        p // Return unchanged. Analyzer will throw exception later
+      }
+  }
+}
+
+/**
+ * Resolves the window_time expression which extracts the correct window time 
from the
+ * window column generated as the output of the window aggregating operators. 
The
+ * window column is of type struct { start: TimestampType, end: TimestampType 
}.
+ * The correct representative event time of a window is ``window.end - 1``.
+ * */
+object ResolveWindowTime extends Rule[LogicalPlan] {
+  override def apply(plan: LogicalPlan): LogicalPlan = plan.resolveOperatorsUp 
{
+    case p: LogicalPlan if p.children.size == 1 =>
+      val child = p.children.head
+      val windowTimeExpressions =
+        p.expressions.flatMap(_.collect { case w: WindowTime => w }).toSet
+
+      if (windowTimeExpressions.size == 1 &&
+        windowTimeExpressions.head.windowColumn.resolved &&
+        windowTimeExpressions.head.checkInputDataTypes().isSuccess) {
+
+        val windowTime = windowTimeExpressions.head
+
+        val metadata = windowTime.windowColumn match {
+          case a: Attribute => a.metadata
+          case _ => Metadata.empty
+        }
+
+        if (!metadata.contains(TimeWindow.marker) &&
+          !metadata.contains(SessionWindow.marker)) {
+          // FIXME: error framework?
+          throw new AnalysisException(
+            "The input is not a correct window column: $windowTime", plan = 
Some(p))
+        }
+
+        val newMetadata = new MetadataBuilder()
+          .withMetadata(metadata)
+          .remove(TimeWindow.marker)
+          .remove(SessionWindow.marker)
+          .build()
+
+        val attr = AttributeReference(
+          "window_time", windowTime.dataType, metadata = newMetadata)()
+
+        // NOTE: "window.end" is "exclusive" upper bound of window, so if we 
use this value as
+        // it is, it is going to be bound to the different window even if we 
apply the same window
+        // spec. Decrease 1 microsecond from window.end to let the window_time 
be bound to the
+        // correct window range.
+        val subtractExpr =
+        PreciseTimestampConversion(
+          Subtract(PreciseTimestampConversion(
+            GetStructField(windowTime.windowColumn, 1),
+            windowTime.dataType, LongType), Literal(1L)),
+          LongType,
+          windowTime.dataType)
+
+        val newColumn = Alias(subtractExpr, "window_time")(
+          exprId = attr.exprId, explicitMetadata = Some(newMetadata))
+
+        val replacedPlan = p transformExpressions {
+          case w: WindowTime => attr
+        }
+
+        replacedPlan.withNewChildren(Project(newColumn +: child.output, child) 
:: Nil)
+      } else {
+        p // Return unchanged. Analyzer will throw exception later
+      }
+  }
+}
diff --git 
a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/TimeWindow.scala
 
b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/TimeWindow.scala
index d7deca2f7b7..53c79d1fd54 100644
--- 
a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/TimeWindow.scala
+++ 
b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/TimeWindow.scala
@@ -136,6 +136,8 @@ case class TimeWindow(
 }
 
 object TimeWindow {
+  val marker = "spark.timeWindow"
+
   /**
    * Parses the interval string for a valid time duration. CalendarInterval 
expects interval
    * strings to start with the string `interval`. For usability, we prepend 
`interval` to the string
diff --git 
a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/WindowTime.scala
 
b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/WindowTime.scala
new file mode 100644
index 00000000000..effc1506d74
--- /dev/null
+++ 
b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/WindowTime.scala
@@ -0,0 +1,62 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.sql.catalyst.expressions
+
+import org.apache.spark.sql.types._
+
+// scalastyle:off line.size.limit line.contains.tab
+@ExpressionDescription(
+  usage = """
+    _FUNC_(window_column) - Extract the time value from time/session window 
column which can be used for event time value of window.
+      The extracted time is (window.end - 1) which reflects the fact that the 
the aggregating
+      windows have exclusive upper bound - [start, end)
+      See <a 
href="https://spark.apache.org/docs/latest/structured-streaming-programming-guide.html#window-operations-on-event-time";>'Window
 Operations on Event Time'</a> in Structured Streaming guide doc for detailed 
explanation and examples.
+  """,
+  arguments = """
+    Arguments:
+      * window_column - The column representing time/session window.
+  """,
+  examples = """
+    Examples:
+      > SELECT a, window.start as start, window.end as end, _FUNC_(window), 
cnt FROM (SELECT a, window, count(*) as cnt FROM VALUES ('A1', '2021-01-01 
00:00:00'), ('A1', '2021-01-01 00:04:30'), ('A1', '2021-01-01 00:06:00'), 
('A2', '2021-01-01 00:01:00') AS tab(a, b) GROUP by a, window(b, '5 minutes') 
ORDER BY a, window.start);
+        A1     2021-01-01 00:00:00     2021-01-01 00:05:00     2021-01-01 
00:04:59.999999      2
+        A1     2021-01-01 00:05:00     2021-01-01 00:10:00     2021-01-01 
00:09:59.999999      1
+        A2     2021-01-01 00:00:00     2021-01-01 00:05:00     2021-01-01 
00:04:59.999999      1
+  """,
+  group = "datetime_funcs",
+  since = "3.3.0")
+// scalastyle:on line.size.limit line.contains.tab
+case class WindowTime(windowColumn: Expression)
+  extends UnaryExpression
+    with ImplicitCastInputTypes
+    with Unevaluable
+    with NonSQLExpression {
+
+  override def child: Expression = windowColumn
+  override def inputTypes: Seq[AbstractDataType] = Seq(StructType)
+
+  override def dataType: DataType = 
child.dataType.asInstanceOf[StructType].head.dataType
+
+  override def prettyName: String = "window_time"
+
+  // This expression is replaced in the analyzer.
+  override lazy val resolved = false
+
+  override protected def withNewChildInternal(newChild: Expression): 
WindowTime =
+    copy(windowColumn = newChild)
+}
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/functions.scala 
b/sql/core/src/main/scala/org/apache/spark/sql/functions.scala
index 620e1c60721..780bf925ad7 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/functions.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/functions.scala
@@ -3777,6 +3777,23 @@ object functions {
     window(timeColumn, windowDuration, windowDuration, "0 second")
   }
 
+  /**
+   * Extracts the event time from the window column.
+   *
+   * The window column is of StructType { start: Timestamp, end: Timestamp } 
where start is
+   * inclusive and end is exclusive. Since event time can support microsecond 
precision,
+   * window_time(window) = window.end - 1 microsecond.
+   *
+   * @param windowColumn The window column (typically produced by window 
aggregation) of type
+   *                     StructType { start: Timestamp, end: Timestamp }
+   *
+   * @group datetime_funcs
+   * @since 3.3.0
+   */
+  def window_time(windowColumn: Column): Column = withExpr {
+    WindowTime(windowColumn.expr)
+  }
+
   /**
    * Generates session window given a timestamp specifying column.
    *
diff --git a/sql/core/src/test/resources/sql-functions/sql-expression-schema.md 
b/sql/core/src/test/resources/sql-functions/sql-expression-schema.md
index 4ce4f1225ce..6f111b777a6 100644
--- a/sql/core/src/test/resources/sql-functions/sql-expression-schema.md
+++ b/sql/core/src/test/resources/sql-functions/sql-expression-schema.md
@@ -345,6 +345,7 @@
 | org.apache.spark.sql.catalyst.expressions.WeekDay | weekday | SELECT 
weekday('2009-07-30') | struct<weekday(2009-07-30):int> |
 | org.apache.spark.sql.catalyst.expressions.WeekOfYear | weekofyear | SELECT 
weekofyear('2008-02-20') | struct<weekofyear(2008-02-20):int> |
 | org.apache.spark.sql.catalyst.expressions.WidthBucket | width_bucket | 
SELECT width_bucket(5.3, 0.2, 10.6, 5) | struct<width_bucket(5.3, 0.2, 10.6, 
5):bigint> |
+| org.apache.spark.sql.catalyst.expressions.WindowTime | window_time | SELECT 
a, window.start as start, window.end as end, window_time(window), cnt FROM 
(SELECT a, window, count(*) as cnt FROM VALUES ('A1', '2021-01-01 00:00:00'), 
('A1', '2021-01-01 00:04:30'), ('A1', '2021-01-01 00:06:00'), ('A2', 
'2021-01-01 00:01:00') AS tab(a, b) GROUP by a, window(b, '5 minutes') ORDER BY 
a, window.start) | 
struct<a:string,start:timestamp,end:timestamp,window_time:timestamp,cnt:bigint> 
|
 | org.apache.spark.sql.catalyst.expressions.XxHash64 | xxhash64 | SELECT 
xxhash64('Spark', array(123), 2) | struct<xxhash64(Spark, array(123), 
2):bigint> |
 | org.apache.spark.sql.catalyst.expressions.Year | year | SELECT 
year('2016-07-30') | struct<year(2016-07-30):int> |
 | org.apache.spark.sql.catalyst.expressions.ZipWith | zip_with | SELECT 
zip_with(array(1, 2, 3), array('a', 'b', 'c'), (x, y) -> (y, x)) | 
struct<zip_with(array(1, 2, 3), array(a, b, c), lambdafunction(named_struct(y, 
namedlambdavariable(), x, namedlambdavariable()), namedlambdavariable(), 
namedlambdavariable())):array<struct<y:string,x:int>>> |
diff --git 
a/sql/core/src/test/scala/org/apache/spark/sql/DataFrameTimeWindowingSuite.scala
 
b/sql/core/src/test/scala/org/apache/spark/sql/DataFrameTimeWindowingSuite.scala
index bd39453f512..f775eb9ecfc 100644
--- 
a/sql/core/src/test/scala/org/apache/spark/sql/DataFrameTimeWindowingSuite.scala
+++ 
b/sql/core/src/test/scala/org/apache/spark/sql/DataFrameTimeWindowingSuite.scala
@@ -575,4 +575,66 @@ class DataFrameTimeWindowingSuite extends QueryTest with 
SharedSparkSession {
       validateWindowColumnInSchema(schema2, "window")
     }
   }
+
+  test("window_time function on raw window column") {
+    val df = Seq(
+      ("2016-03-27 19:38:18"), ("2016-03-27 19:39:25")
+    ).toDF("time")
+
+    checkAnswer(
+      df.select(window($"time", "10 seconds").as("window"))
+        .select(
+          $"window.end".cast("string"),
+          window_time($"window").cast("string")
+        ),
+      Seq(
+        Row("2016-03-27 19:38:20", "2016-03-27 19:38:19.999999"),
+        Row("2016-03-27 19:39:30", "2016-03-27 19:39:29.999999")
+      )
+    )
+  }
+
+  test("2 window_time functions on raw window column") {
+    val df = Seq(
+      ("2016-03-27 19:38:18"), ("2016-03-27 19:39:25")
+    ).toDF("time")
+
+    val e = intercept[AnalysisException] {
+      df
+        .withColumn("time2", expr("time - INTERVAL 5 minutes"))
+        .select(
+          window($"time", "10 seconds").as("window1"),
+          window($"time2", "10 seconds").as("window2")
+        )
+        .select(
+          $"window1.end".cast("string"),
+          window_time($"window1").cast("string"),
+          $"window2.end".cast("string"),
+          window_time($"window2").cast("string")
+        )
+    }
+    assert(e.getMessage.contains(
+      "Multiple time/session window expressions would result in a cartesian 
product of rows, " +
+        "therefore they are currently not supported"))
+  }
+
+  test("window_time function on agg output") {
+    val df = Seq(
+      ("2016-03-27 19:38:19", 1), ("2016-03-27 19:39:25", 2)
+    ).toDF("time", "value")
+    checkAnswer(
+      df.groupBy(window($"time", "10 seconds"))
+        .agg(count("*").as("counts"))
+        .orderBy($"window.start".asc)
+        .select(
+          $"window.start".cast("string"),
+          $"window.end".cast("string"),
+          window_time($"window").cast("string"),
+          $"counts"),
+      Seq(
+        Row("2016-03-27 19:38:10", "2016-03-27 19:38:20", "2016-03-27 
19:38:19.999999", 1),
+        Row("2016-03-27 19:39:20", "2016-03-27 19:39:30", "2016-03-27 
19:39:29.999999", 1)
+      )
+    )
+  }
 }


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]


Reply via email to