This is an automated email from the ASF dual-hosted git repository.

dtenedor pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/spark.git


The following commit(s) were added to refs/heads/master by this push:
     new 90d09027dcf7 [SPARK-57193][SQL] Refactor some helpers out of TimeWindowResolution
90d09027dcf7 is described below

commit 90d09027dcf78976caf6bba64136bf35579d13c3
Author: Vladimir Golubev <[email protected]>
AuthorDate: Mon Jun 1 09:46:12 2026 -0700

    [SPARK-57193][SQL] Refactor some helpers out of TimeWindowResolution
    
    ### What changes were proposed in this pull request?
    
    Refactor some helpers out of `TimeWindowResolution`.
    
    ### Why are the changes needed?
    
    To reuse in the single-pass Analyzer.
    
    ### Does this PR introduce _any_ user-facing change?
    
    No.
    
    ### How was this patch tested?
    
    Existing tests.
    
    ### Was this patch authored or co-authored using generative AI tooling?
    
    Claude.
    
    Closes #56246 from vladimirg-db/vladimir-golubev_data/time-window-resolution-reusable-helpers.
    
    Authored-by: Vladimir Golubev <[email protected]>
    Signed-off-by: Daniel Tenedorio <[email protected]>
---
 .../catalyst/analysis/TimeWindowResolution.scala   | 64 +++++++++++++---------
 1 file changed, 39 insertions(+), 25 deletions(-)

diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/TimeWindowResolution.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/TimeWindowResolution.scala
index 8dbe6ed44d1c..63ff894c48d4 100644
--- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/TimeWindowResolution.scala
+++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/TimeWindowResolution.scala
@@ -35,10 +35,10 @@ object TimeWindowResolution {
   final val WINDOW_COL_NAME = "window"
   final val SESSION_COL_NAME = "session_window"
 
-  private final val WINDOW_START = "start"
-  private final val WINDOW_END = "end"
-  private final val SESSION_START = "start"
-  private final val SESSION_END = "end"
+  final val WINDOW_START = "start"
+  final val WINDOW_END = "end"
+  final val SESSION_START = "start"
+  final val SESSION_END = "end"
 
   /**
    * Synthesizes the [[Project]]/[[Expand]]+[[Filter]] sub-plan for a resolved [[TimeWindow]] and
@@ -90,8 +90,7 @@ object TimeWindowResolution {
       Project(windowStruct +: child.output,
         Filter(filterExpr, child))
     } else {
-      val overlappingWindows =
-        math.ceil(window.windowDuration * 1.0 / window.slideDuration).toInt
+      val overlappingWindows = overlappingWindowCount(window)
       val windows =
         Seq.tabulate(overlappingWindows)(i =>
           getWindow(i, window.timeColumn.dataType))
@@ -165,12 +164,7 @@ object TimeWindowResolution {
     val sessionStruct = Alias(literalSessionStruct, SESSION_COL_NAME)(
       exprId = sessionAttr.exprId, explicitMetadata = Some(newMetadata))
 
-    val filterByTimeRange = if (gapDuration.foldable) {
-      val interval = gapDuration.eval().asInstanceOf[CalendarInterval]
-      interval == null || interval.months + interval.days + interval.microseconds <= 0
-    } else {
-      true
-    }
+    val filterByTimeRange = sessionFilterByTimeRange(gapDuration)
 
     // As same as tumbling window, we add a filter to filter out nulls.
     // And we also filter out events with negative or zero or invalid gap duration.
@@ -222,19 +216,7 @@ object TimeWindowResolution {
 
       val attr = AttributeReference(colName, windowTime.dataType, metadata = newMetadata)()
 
-      // NOTE: "window.end" is "exclusive" upper bound of window, so if we use this value as
-      // it is, it is going to be bound to the different window even if we apply the same window
-      // spec. Decrease 1 microsecond from window.end to let the window_time be bound to the
-      // correct window range.
-      val subtractExpr =
-      PreciseTimestampConversion(
-        Subtract(PreciseTimestampConversion(
-          GetStructField(windowTime.windowColumn, 1),
-          windowTime.dataType, LongType), Literal(1L)),
-        LongType,
-        windowTime.dataType)
-
-      val newColumn = Alias(subtractExpr, colName)(
+      val newColumn = Alias(windowTimeExtractionExpression(windowTime), colName)(
         exprId = attr.exprId, explicitMetadata = Some(newMetadata))
 
       windowTime -> (attr, newColumn)
@@ -250,4 +232,36 @@ object TimeWindowResolution {
 
     (windowTimeToAttr, newChild)
   }
+
+  /**
+   * Builds the expression extracting a [[WindowTime]]'s timestamp: the last microsecond of the
+   * source window (`window.end - 1us`). `window.end` is the exclusive upper bound, so using it
+   * as-is would bind the result to the next window under the same window spec.
+   */
+  def windowTimeExtractionExpression(windowTime: WindowTime): Expression =
+    PreciseTimestampConversion(
+      Subtract(
+        PreciseTimestampConversion(
+          GetStructField(windowTime.windowColumn, 1),
+          windowTime.dataType,
+          LongType),
+        Literal(1L)),
+      LongType,
+      windowTime.dataType)
+
+  /** Number of overlapping sliding windows a single row can fall into. */
+  def overlappingWindowCount(window: TimeWindow): Int =
+    math.ceil(window.windowDuration * 1.0 / window.slideDuration).toInt
+
+  /**
+   * Whether the session-window filter must also drop empty windows (`end <= start`): true when the
+   * gap is non-foldable (unknown at plan time) or a foldable gap is null or non-positive.
+   */
+  def sessionFilterByTimeRange(gapDuration: Expression): Boolean =
+    if (gapDuration.foldable) {
+      val interval = gapDuration.eval().asInstanceOf[CalendarInterval]
+      interval == null || interval.months + interval.days + interval.microseconds <= 0
+    } else {
+      true
+    }
 }


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to