This is an automated email from the ASF dual-hosted git repository.

cloud-fan pushed a commit to branch branch-4.x
in repository https://gitbox.apache.org/repos/asf/spark.git


The following commit(s) were added to refs/heads/branch-4.x by this push:
     new 56496815b218 [SPARK-56875][SQL] Refactor time/session window 
resolution code to be reused in the single-pass Analyzer
56496815b218 is described below

commit 56496815b2189a76c8a92e1714538c5f3c751c40
Author: Vladimir Golubev <[email protected]>
AuthorDate: Fri May 15 20:55:35 2026 +0800

    [SPARK-56875][SQL] Refactor time/session window resolution code to be 
reused in the single-pass Analyzer
    
    ### What changes were proposed in this pull request?
    
    Refactor time/session window resolution code to be reused in the 
single-pass Analyzer.
    
    ### Why are the changes needed?
    
    Unblocks single-pass Analyzer support for time/session windowing.
    
    ### Does this PR introduce _any_ user-facing change?
    
    No.
    
    ### How was this patch tested?
    
    Existing/new tests.
    
    ### Was this patch authored or co-authored using generative AI tooling?
    
    Claude.
    
    Closes #55895 from 
vladimirg-db/vladimir-golubev_data/time-windowing-refactor.
    
    Authored-by: Vladimir Golubev <[email protected]>
    Signed-off-by: Wenchen Fan <[email protected]>
    (cherry picked from commit 7569ba6265597cee5a972fec65a8e68368512844)
    Signed-off-by: Wenchen Fan <[email protected]>
---
 .../sql/catalyst/analysis/ResolveTimeWindows.scala | 219 +--------
 .../catalyst/analysis/TimeWindowResolution.scala   | 253 ++++++++++
 .../sql-tests/analyzer-results/time-window.sql.out | 525 +++++++++++++++++++++
 .../resources/sql-tests/inputs/time-window.sql     | 231 +++++++++
 .../sql-tests/results/time-window.sql.out          | 508 ++++++++++++++++++++
 5 files changed, 1537 insertions(+), 199 deletions(-)

diff --git 
a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/ResolveTimeWindows.scala
 
b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/ResolveTimeWindows.scala
index a8680d0a0181..0061ddc0dc1e 100644
--- 
a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/ResolveTimeWindows.scala
+++ 
b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/ResolveTimeWindows.scala
@@ -17,15 +17,12 @@
 
 package org.apache.spark.sql.catalyst.analysis
 
-import org.apache.spark.sql.AnalysisException
-import org.apache.spark.sql.catalyst.ExtendedAnalysisException
-import org.apache.spark.sql.catalyst.expressions.{Alias, Attribute, 
AttributeReference, CaseWhen, Cast, CreateNamedStruct, Expression, 
GetStructField, IsNotNull, LessThan, Literal, PreciseTimestampConversion, 
SessionWindow, Subtract, TimeWindow, WindowTime}
-import org.apache.spark.sql.catalyst.plans.logical.{Expand, Filter, 
LogicalPlan, Project}
+import org.apache.spark.sql.catalyst.expressions.{SessionWindow, TimeWindow, 
WindowTime}
+import org.apache.spark.sql.catalyst.plans.logical.LogicalPlan
 import org.apache.spark.sql.catalyst.rules.Rule
 import org.apache.spark.sql.catalyst.trees.TreePattern.{SESSION_WINDOW, 
TIME_WINDOW, WINDOW_TIME}
 import org.apache.spark.sql.errors.QueryCompilationErrors
-import org.apache.spark.sql.types.{CalendarIntervalType, DataType, LongType, 
Metadata, MetadataBuilder, StructType}
-import org.apache.spark.unsafe.types.CalendarInterval
+import org.apache.spark.sql.types.StructType
 
 /**
  * Maps a time column to multiple time windows using the Expand operator. 
Since it's non-trivial to
@@ -33,11 +30,6 @@ import org.apache.spark.unsafe.types.CalendarInterval
  * filter out the rows where the time column is not inside the time window.
  */
 object TimeWindowing extends Rule[LogicalPlan] {
-  import org.apache.spark.sql.catalyst.dsl.expressions._
-
-  private final val WINDOW_COL_NAME = "window"
-  private final val WINDOW_START = "start"
-  private final val WINDOW_END = "end"
 
   /**
    * Generates the logical plan for generating window ranges on a timestamp 
column. Without
@@ -93,80 +85,13 @@ object TimeWindowing extends Rule[LogicalPlan] {
             case t: TimeWindow => t.copy(timeColumn = 
WindowTime(window.timeColumn))
           }
         } else {
-          val metadata = window.timeColumn match {
-            case a: Attribute => a.metadata
-            case _ => Metadata.empty
-          }
-
-          val newMetadata = new MetadataBuilder()
-            .withMetadata(metadata)
-            .putBoolean(TimeWindow.marker, true)
-            .build()
-
-          def getWindow(i: Int, dataType: DataType): Expression = {
-            val timestamp = PreciseTimestampConversion(window.timeColumn, 
dataType, LongType)
-            val remainder = (timestamp - window.startTime) % 
window.slideDuration
-            val lastStart = timestamp - CaseWhen(Seq((LessThan(remainder, 0),
-              remainder + window.slideDuration)), Some(remainder))
-            val windowStart = lastStart - i * window.slideDuration
-            val windowEnd = windowStart + window.windowDuration
-
-            // We make sure value fields are nullable since the dataType of 
TimeWindow defines them
-            // as nullable.
-            CreateNamedStruct(
-              Literal(WINDOW_START) ::
-                PreciseTimestampConversion(windowStart, LongType, 
dataType).castNullable() ::
-                Literal(WINDOW_END) ::
-                PreciseTimestampConversion(windowEnd, LongType, 
dataType).castNullable() ::
-                Nil)
-          }
-
-          val windowAttr = AttributeReference(
-            WINDOW_COL_NAME, window.dataType, metadata = newMetadata)()
-
-          if (window.windowDuration == window.slideDuration) {
-            val windowStruct = Alias(getWindow(0, window.timeColumn.dataType), 
WINDOW_COL_NAME)(
-              exprId = windowAttr.exprId, explicitMetadata = Some(newMetadata))
-
-            val replacedPlan = p transformExpressions {
-              case t: TimeWindow => windowAttr
-            }
-
-            // For backwards compatibility we add a filter to filter out nulls
-            val filterExpr = IsNotNull(window.timeColumn)
+          val (windowAttr, newChild) =
+            TimeWindowResolution.buildTimeWindowRewrite(window, child)
 
-            replacedPlan.withNewChildren(
-              Project(windowStruct +: child.output,
-                Filter(filterExpr, child)) :: Nil)
-          } else {
-            val overlappingWindows =
-              math.ceil(window.windowDuration * 1.0 / 
window.slideDuration).toInt
-            val windows =
-              Seq.tabulate(overlappingWindows)(i =>
-                getWindow(i, window.timeColumn.dataType))
-
-            val projections = windows.map(_ +: child.output)
-
-            // When the condition windowDuration % slideDuration = 0 is 
fulfilled,
-            // the estimation of the number of windows becomes exact one,
-            // which means all produced windows are valid.
-            val filterExpr =
-            if (window.windowDuration % window.slideDuration == 0) {
-              IsNotNull(window.timeColumn)
-            } else {
-              window.timeColumn >= windowAttr.getField(WINDOW_START) &&
-                window.timeColumn < windowAttr.getField(WINDOW_END)
-            }
-
-            val substitutedPlan = Filter(filterExpr,
-              Expand(projections, windowAttr +: child.output, child))
-
-            val renamedPlan = p transformExpressions {
-              case t: TimeWindow => windowAttr
-            }
-
-            renamedPlan.withNewChildren(substitutedPlan :: Nil)
+          val replacedPlan = p.transformExpressions {
+            case _: TimeWindow => windowAttr
           }
+          replacedPlan.withNewChildren(newChild :: Nil)
         }
       } else if (numWindowExpr > 1) {
         throw 
QueryCompilationErrors.multiTimeWindowExpressionsNotSupportedError(p)
@@ -178,11 +103,6 @@ object TimeWindowing extends Rule[LogicalPlan] {
 
 /** Maps a time column to a session window. */
 object SessionWindowing extends Rule[LogicalPlan] {
-  import org.apache.spark.sql.catalyst.dsl.expressions._
-
-  private final val SESSION_COL_NAME = "session_window"
-  private final val SESSION_START = "start"
-  private final val SESSION_END = "end"
 
   /**
    * Generates the logical plan for generating session window on a timestamp 
column.
@@ -211,73 +131,17 @@ object SessionWindowing extends Rule[LogicalPlan] {
         val session = sessionExpressions.head
 
         if (StructType.acceptsType(session.timeColumn.dataType)) {
-          p transformExpressions {
+          p.transformExpressions {
             case t: SessionWindow => t.copy(timeColumn = 
WindowTime(session.timeColumn))
           }
         } else {
-          val metadata = session.timeColumn match {
-            case a: Attribute => a.metadata
-            case _ => Metadata.empty
-          }
-
-          val newMetadata = new MetadataBuilder()
-            .withMetadata(metadata)
-            .putBoolean(SessionWindow.marker, true)
-            .build()
-
-          val sessionAttr = AttributeReference(
-            SESSION_COL_NAME, session.dataType, metadata = newMetadata)()
+          val (sessionAttr, newChild) =
+            TimeWindowResolution.buildSessionWindowRewrite(session, child)
 
-          val sessionStart =
-            PreciseTimestampConversion(session.timeColumn, 
session.timeColumn.dataType, LongType)
-          val gapDuration = session.gapDuration match {
-            case expr if expr.dataType == CalendarIntervalType =>
-              expr
-            case expr if Cast.canCast(expr.dataType, CalendarIntervalType) =>
-              Cast(expr, CalendarIntervalType)
-            case other =>
-              throw 
QueryCompilationErrors.sessionWindowGapDurationDataTypeError(other.dataType)
+          val replacedPlan = p.transformExpressions {
+            case _: SessionWindow => sessionAttr
           }
-          val sessionEnd = PreciseTimestampConversion(session.timeColumn + 
gapDuration,
-            session.timeColumn.dataType, LongType)
-
-          // We make sure value fields are nullable since the dataType of 
SessionWindow defines them
-          // as nullable.
-          val literalSessionStruct = CreateNamedStruct(
-            Literal(SESSION_START) ::
-              PreciseTimestampConversion(sessionStart, LongType, 
session.timeColumn.dataType)
-                .castNullable() ::
-              Literal(SESSION_END) ::
-              PreciseTimestampConversion(sessionEnd, LongType, 
session.timeColumn.dataType)
-                .castNullable() ::
-              Nil)
-
-          val sessionStruct = Alias(literalSessionStruct, SESSION_COL_NAME)(
-            exprId = sessionAttr.exprId, explicitMetadata = Some(newMetadata))
-
-          val replacedPlan = p transformExpressions {
-            case s: SessionWindow => sessionAttr
-          }
-
-          val filterByTimeRange = if (gapDuration.foldable) {
-            val interval = gapDuration.eval().asInstanceOf[CalendarInterval]
-            interval == null || interval.months + interval.days + 
interval.microseconds <= 0
-          } else {
-            true
-          }
-
-          // As same as tumbling window, we add a filter to filter out nulls.
-          // And we also filter out events with negative or zero or invalid 
gap duration.
-          val filterExpr = if (filterByTimeRange) {
-            IsNotNull(session.timeColumn) &&
-              (sessionAttr.getField(SESSION_END) > 
sessionAttr.getField(SESSION_START))
-          } else {
-            IsNotNull(session.timeColumn)
-          }
-
-          replacedPlan.withNewChildren(
-            Filter(filterExpr,
-              Project(sessionStruct +: child.output, child)) :: Nil)
+          replacedPlan.withNewChildren(newChild :: Nil)
         }
       } else if (numWindowExpr > 1) {
         throw 
QueryCompilationErrors.multiTimeWindowExpressionsNotSupportedError(p)
@@ -292,7 +156,7 @@ object SessionWindowing extends Rule[LogicalPlan] {
  * window column generated as the output of the window aggregating operators. 
The
  * window column is of type struct { start: TimestampType, end: TimestampType 
}.
  * The correct representative event time of a window is ``window.end - 1``.
- * */
+ */
 object ResolveWindowTime extends Rule[LogicalPlan] {
   override def apply(plan: LogicalPlan): LogicalPlan = 
plan.resolveOperatorsUpWithPruning(
     _.containsPattern(WINDOW_TIME), ruleId) {
@@ -306,56 +170,13 @@ object ResolveWindowTime extends Rule[LogicalPlan] {
       }
 
       if (windowTimeExpressions.nonEmpty && allWindowTimeExprsResolved) {
-        val windowTimeToAttrAndNewColumn = windowTimeExpressions.map { 
windowTime =>
-          val metadata = windowTime.windowColumn match {
-            case a: Attribute => a.metadata
-            case _ => Metadata.empty
-          }
-
-          if (!metadata.contains(TimeWindow.marker) &&
-            !metadata.contains(SessionWindow.marker)) {
-            throw new ExtendedAnalysisException(
-              new AnalysisException(
-                errorClass = "_LEGACY_ERROR_TEMP_3101",
-                messageParameters = Map("windowTime" -> windowTime.toString)),
-              plan = p)
-          }
-
-          val newMetadata = new MetadataBuilder()
-            .withMetadata(metadata)
-            .remove(TimeWindow.marker)
-            .remove(SessionWindow.marker)
-            .build()
-
-          val colName = windowTime.sql
+        val (windowTimeToAttr, newChild) =
+          TimeWindowResolution.buildWindowTimeRewrite(windowTimeExpressions, 
child, p)
 
-          val attr = AttributeReference(colName, windowTime.dataType, metadata 
= newMetadata)()
-
-          // NOTE: "window.end" is "exclusive" upper bound of window, so if we 
use this value as
-          // it is, it is going to be bound to the different window even if we 
apply the same window
-          // spec. Decrease 1 microsecond from window.end to let the 
window_time be bound to the
-          // correct window range.
-          val subtractExpr =
-          PreciseTimestampConversion(
-            Subtract(PreciseTimestampConversion(
-              GetStructField(windowTime.windowColumn, 1),
-              windowTime.dataType, LongType), Literal(1L)),
-            LongType,
-            windowTime.dataType)
-
-          val newColumn = Alias(subtractExpr, colName)(
-            exprId = attr.exprId, explicitMetadata = Some(newMetadata))
-
-          windowTime -> (attr, newColumn)
-        }.toMap
-
-        val replacedPlan = p transformExpressions {
-          case w: WindowTime => windowTimeToAttrAndNewColumn(w)._1
+        val replacedPlan = p.transformExpressions {
+          case w: WindowTime => windowTimeToAttr(w)
         }
-
-        val newColumnsToAdd = windowTimeToAttrAndNewColumn.values.map(_._2)
-        replacedPlan.withNewChildren(
-          Project(newColumnsToAdd ++: child.output, child) :: Nil)
+        replacedPlan.withNewChildren(newChild :: Nil)
       } else {
         p // Return unchanged. Analyzer will throw exception later
       }
diff --git 
a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/TimeWindowResolution.scala
 
b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/TimeWindowResolution.scala
new file mode 100644
index 000000000000..8dbe6ed44d1c
--- /dev/null
+++ 
b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/TimeWindowResolution.scala
@@ -0,0 +1,253 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.sql.catalyst.analysis
+
+import org.apache.spark.sql.AnalysisException
+import org.apache.spark.sql.catalyst.ExtendedAnalysisException
+import org.apache.spark.sql.catalyst.expressions.{Alias, Attribute, 
AttributeReference, CaseWhen, Cast, CreateNamedStruct, Expression, 
GetStructField, IsNotNull, LessThan, Literal, NamedExpression, 
PreciseTimestampConversion, SessionWindow, Subtract, TimeWindow, WindowTime}
+import org.apache.spark.sql.catalyst.plans.logical.{Expand, Filter, 
LogicalPlan, Project}
+import org.apache.spark.sql.errors.QueryCompilationErrors
+import org.apache.spark.sql.types.{CalendarIntervalType, DataType, LongType, 
Metadata, MetadataBuilder}
+import org.apache.spark.unsafe.types.CalendarInterval
+
+/**
+ * Plan-rewrite helpers for resolving [[TimeWindow]], [[SessionWindow]] and 
[[WindowTime]]
+ * expressions. The bodies live here so that the synthesis logic can be reused 
outside of a
+ * [[org.apache.spark.sql.catalyst.rules.Rule]] traversal.
+ */
+object TimeWindowResolution {
+
+  final val WINDOW_COL_NAME = "window"
+  final val SESSION_COL_NAME = "session_window"
+
+  private final val WINDOW_START = "start"
+  private final val WINDOW_END = "end"
+  private final val SESSION_START = "start"
+  private final val SESSION_END = "end"
+
+  /**
+   * Synthesizes the [[Project]]/[[Expand]]+[[Filter]] sub-plan for a resolved 
[[TimeWindow]] and
+   * returns the window [[AttributeReference]] together with the new sub-plan 
that wraps `child`.
+   */
+  def buildTimeWindowRewrite(
+      window: TimeWindow,
+      child: LogicalPlan): (AttributeReference, LogicalPlan) = {
+    import org.apache.spark.sql.catalyst.dsl.expressions._
+
+    val metadata = window.timeColumn match {
+      case a: Attribute => a.metadata
+      case _ => Metadata.empty
+    }
+
+    val newMetadata = new MetadataBuilder()
+      .withMetadata(metadata)
+      .putBoolean(TimeWindow.marker, true)
+      .build()
+
+    def getWindow(i: Int, dataType: DataType): Expression = {
+      val timestamp = PreciseTimestampConversion(window.timeColumn, dataType, 
LongType)
+      val remainder = (timestamp - window.startTime) % window.slideDuration
+      val lastStart = timestamp - CaseWhen(Seq((LessThan(remainder, 0),
+        remainder + window.slideDuration)), Some(remainder))
+      val windowStart = lastStart - i * window.slideDuration
+      val windowEnd = windowStart + window.windowDuration
+
+      // We make sure value fields are nullable since the dataType of 
TimeWindow defines them
+      // as nullable.
+      CreateNamedStruct(
+        Literal(WINDOW_START) ::
+          PreciseTimestampConversion(windowStart, LongType, 
dataType).castNullable() ::
+          Literal(WINDOW_END) ::
+          PreciseTimestampConversion(windowEnd, LongType, 
dataType).castNullable() ::
+          Nil)
+    }
+
+    val windowAttr = AttributeReference(
+      WINDOW_COL_NAME, window.dataType, metadata = newMetadata)()
+
+    val newChild = if (window.windowDuration == window.slideDuration) {
+      val windowStruct = Alias(getWindow(0, window.timeColumn.dataType), 
WINDOW_COL_NAME)(
+        exprId = windowAttr.exprId, explicitMetadata = Some(newMetadata))
+
+      // For backwards compatibility we add a filter to filter out nulls
+      val filterExpr = IsNotNull(window.timeColumn)
+
+      Project(windowStruct +: child.output,
+        Filter(filterExpr, child))
+    } else {
+      val overlappingWindows =
+        math.ceil(window.windowDuration * 1.0 / window.slideDuration).toInt
+      val windows =
+        Seq.tabulate(overlappingWindows)(i =>
+          getWindow(i, window.timeColumn.dataType))
+
+      val projections = windows.map(_ +: child.output)
+
+      // When the condition windowDuration % slideDuration = 0 is fulfilled,
+      // the estimation of the number of windows becomes exact one,
+      // which means all produced windows are valid.
+      val filterExpr =
+      if (window.windowDuration % window.slideDuration == 0) {
+        IsNotNull(window.timeColumn)
+      } else {
+        window.timeColumn >= windowAttr.getField(WINDOW_START) &&
+          window.timeColumn < windowAttr.getField(WINDOW_END)
+      }
+
+      Filter(filterExpr,
+        Expand(projections, windowAttr +: child.output, child))
+    }
+
+    (windowAttr, newChild)
+  }
+
+  /**
+   * Synthesizes the [[Project]]+[[Filter]] sub-plan for a resolved 
[[SessionWindow]] and returns
+   * the session window [[AttributeReference]] together with the new sub-plan 
that wraps `child`.
+   */
+  def buildSessionWindowRewrite(
+      session: SessionWindow,
+      child: LogicalPlan): (AttributeReference, LogicalPlan) = {
+    import org.apache.spark.sql.catalyst.dsl.expressions._
+
+    val metadata = session.timeColumn match {
+      case a: Attribute => a.metadata
+      case _ => Metadata.empty
+    }
+
+    val newMetadata = new MetadataBuilder()
+      .withMetadata(metadata)
+      .putBoolean(SessionWindow.marker, true)
+      .build()
+
+    val sessionAttr = AttributeReference(
+      SESSION_COL_NAME, session.dataType, metadata = newMetadata)()
+
+    val sessionStart =
+      PreciseTimestampConversion(session.timeColumn, 
session.timeColumn.dataType, LongType)
+    val gapDuration = session.gapDuration match {
+      case expr if expr.dataType == CalendarIntervalType =>
+        expr
+      case expr if Cast.canCast(expr.dataType, CalendarIntervalType) =>
+        Cast(expr, CalendarIntervalType)
+      case other =>
+        throw 
QueryCompilationErrors.sessionWindowGapDurationDataTypeError(other.dataType)
+    }
+    val sessionEnd = PreciseTimestampConversion(session.timeColumn + 
gapDuration,
+      session.timeColumn.dataType, LongType)
+
+    // We make sure value fields are nullable since the dataType of 
SessionWindow defines them
+    // as nullable.
+    val literalSessionStruct = CreateNamedStruct(
+      Literal(SESSION_START) ::
+        PreciseTimestampConversion(sessionStart, LongType, 
session.timeColumn.dataType)
+          .castNullable() ::
+        Literal(SESSION_END) ::
+        PreciseTimestampConversion(sessionEnd, LongType, 
session.timeColumn.dataType)
+          .castNullable() ::
+        Nil)
+
+    val sessionStruct = Alias(literalSessionStruct, SESSION_COL_NAME)(
+      exprId = sessionAttr.exprId, explicitMetadata = Some(newMetadata))
+
+    val filterByTimeRange = if (gapDuration.foldable) {
+      val interval = gapDuration.eval().asInstanceOf[CalendarInterval]
+      interval == null || interval.months + interval.days + 
interval.microseconds <= 0
+    } else {
+      true
+    }
+
+    // As same as tumbling window, we add a filter to filter out nulls.
+    // And we also filter out events with negative or zero or invalid gap 
duration.
+    val filterExpr = if (filterByTimeRange) {
+      IsNotNull(session.timeColumn) &&
+        (sessionAttr.getField(SESSION_END) > 
sessionAttr.getField(SESSION_START))
+    } else {
+      IsNotNull(session.timeColumn)
+    }
+
+    val newChild = Filter(filterExpr,
+      Project(sessionStruct +: child.output, child))
+
+    (sessionAttr, newChild)
+  }
+
+  /**
+   * Synthesizes a [[Project]] sub-plan that exposes one extracted-time column 
per resolved
+   * [[WindowTime]] in `windowTimeExpressions`, wrapping `child`. Returns the 
per-[[WindowTime]]
+   * [[AttributeReference]] map and the new sub-plan. `errorContext` is the 
plan attached to the
+   * [[ExtendedAnalysisException]] thrown when a window-column metadata marker 
is missing.
+   */
+  def buildWindowTimeRewrite(
+      windowTimeExpressions: Set[WindowTime],
+      child: LogicalPlan,
+      errorContext: LogicalPlan): (Map[WindowTime, AttributeReference], 
LogicalPlan) = {
+    val windowTimeToAttrAndNewColumn = windowTimeExpressions.map { windowTime 
=>
+      val metadata = windowTime.windowColumn match {
+        case a: Attribute => a.metadata
+        case _ => Metadata.empty
+      }
+
+      if (!metadata.contains(TimeWindow.marker) &&
+        !metadata.contains(SessionWindow.marker)) {
+        throw new ExtendedAnalysisException(
+          new AnalysisException(
+            errorClass = "_LEGACY_ERROR_TEMP_3101",
+            messageParameters = Map("windowTime" -> windowTime.toString)),
+          plan = errorContext)
+      }
+
+      val newMetadata = new MetadataBuilder()
+        .withMetadata(metadata)
+        .remove(TimeWindow.marker)
+        .remove(SessionWindow.marker)
+        .build()
+
+      val colName = windowTime.sql
+
+      val attr = AttributeReference(colName, windowTime.dataType, metadata = 
newMetadata)()
+
+      // NOTE: "window.end" is "exclusive" upper bound of window, so if we use 
this value as
+      // it is, it is going to be bound to the different window even if we 
apply the same window
+      // spec. Decrease 1 microsecond from window.end to let the window_time 
be bound to the
+      // correct window range.
+      val subtractExpr =
+      PreciseTimestampConversion(
+        Subtract(PreciseTimestampConversion(
+          GetStructField(windowTime.windowColumn, 1),
+          windowTime.dataType, LongType), Literal(1L)),
+        LongType,
+        windowTime.dataType)
+
+      val newColumn = Alias(subtractExpr, colName)(
+        exprId = attr.exprId, explicitMetadata = Some(newMetadata))
+
+      windowTime -> (attr, newColumn)
+    }.toMap
+
+    val newColumns: Seq[NamedExpression] =
+      windowTimeToAttrAndNewColumn.values.map(_._2).toSeq
+    val newChild = Project(newColumns ++ child.output, child)
+
+    val windowTimeToAttr = windowTimeToAttrAndNewColumn.map {
+      case (windowTime, (attr, _)) => windowTime -> attr
+    }
+
+    (windowTimeToAttr, newChild)
+  }
+}
diff --git 
a/sql/core/src/test/resources/sql-tests/analyzer-results/time-window.sql.out 
b/sql/core/src/test/resources/sql-tests/analyzer-results/time-window.sql.out
new file mode 100644
index 000000000000..a6b88d42e58e
--- /dev/null
+++ b/sql/core/src/test/resources/sql-tests/analyzer-results/time-window.sql.out
@@ -0,0 +1,525 @@
+-- Automatically generated by SQLQueryTestSuite
+-- !query
+CREATE OR REPLACE TEMPORARY VIEW ts_data AS
+SELECT * FROM VALUES
+  ('A1', CAST('2021-01-01 00:00:00' AS TIMESTAMP)),
+  ('A1', CAST('2021-01-01 00:04:30' AS TIMESTAMP)),
+  ('A1', CAST('2021-01-01 00:06:00' AS TIMESTAMP)),
+  ('A2', CAST('2021-01-01 00:01:00' AS TIMESTAMP))
+AS tab(a, ts)
+-- !query analysis
+CreateViewCommand `ts_data`, SELECT * FROM VALUES
+  ('A1', CAST('2021-01-01 00:00:00' AS TIMESTAMP)),
+  ('A1', CAST('2021-01-01 00:04:30' AS TIMESTAMP)),
+  ('A1', CAST('2021-01-01 00:06:00' AS TIMESTAMP)),
+  ('A2', CAST('2021-01-01 00:01:00' AS TIMESTAMP))
+AS tab(a, ts), false, true, LocalTempView, UNSUPPORTED, true
+   +- Project [a#x, ts#x]
+      +- SubqueryAlias tab
+         +- LocalRelation [a#x, ts#x]
+
+
+-- !query
+SELECT
+  a,
+  window.start,
+  count(*) AS cnt
+FROM ts_data
+GROUP BY a, window(ts, '5 minutes') WITH ROLLUP
+ORDER BY a, start
+-- !query analysis
+Sort [a#x ASC NULLS FIRST, start#x ASC NULLS FIRST], true
++- Aggregate [a#x, window#x, spark_grouping_id#xL], [a#x, window#x.start AS 
start#x, count(1) AS cnt#xL]
+   +- Expand [[window#x, a#x, ts#x, a#x, window#x, 0], [window#x, a#x, ts#x, 
a#x, null, 1], [window#x, a#x, ts#x, null, null, 3]], [window#x, a#x, ts#x, 
a#x, window#x, spark_grouping_id#xL]
+      +- Project [window#x, a#x, ts#x, a#x AS a#x, window#x AS window#x]
+         +- Project [named_struct(start, 
knownnullable(precisetimestampconversion(((precisetimestampconversion(ts#x, 
TimestampType, LongType) - CASE WHEN (((precisetimestampconversion(ts#x, 
TimestampType, LongType) - 0) % 300000000) < cast(0 as bigint)) THEN 
(((precisetimestampconversion(ts#x, TimestampType, LongType) - 0) % 300000000) 
+ 300000000) ELSE ((precisetimestampconversion(ts#x, TimestampType, LongType) - 
0) % 300000000) END) - 0), LongType, TimestampType)), end, knownnullable(p [...]
+            +- Filter isnotnull(ts#x)
+               +- SubqueryAlias ts_data
+                  +- View (`ts_data`, [a#x, ts#x])
+                     +- Project [cast(a#x as string) AS a#x, cast(ts#x as 
timestamp) AS ts#x]
+                        +- Project [a#x, ts#x]
+                           +- SubqueryAlias tab
+                              +- LocalRelation [a#x, ts#x]
+
+
+-- !query
+SELECT
+  a,
+  window.start,
+  count(*) AS cnt
+FROM ts_data
+GROUP BY a, window(ts, '5 minutes') WITH CUBE
+ORDER BY a, start
+-- !query analysis
+Sort [a#x ASC NULLS FIRST, start#x ASC NULLS FIRST], true
++- Aggregate [a#x, window#x, spark_grouping_id#xL], [a#x, window#x.start AS 
start#x, count(1) AS cnt#xL]
+   +- Expand [[window#x, a#x, ts#x, a#x, window#x, 0], [window#x, a#x, ts#x, 
a#x, null, 1], [window#x, a#x, ts#x, null, window#x, 2], [window#x, a#x, ts#x, 
null, null, 3]], [window#x, a#x, ts#x, a#x, window#x, spark_grouping_id#xL]
+      +- Project [window#x, a#x, ts#x, a#x AS a#x, window#x AS window#x]
+         +- Project [named_struct(start, 
knownnullable(precisetimestampconversion(((precisetimestampconversion(ts#x, 
TimestampType, LongType) - CASE WHEN (((precisetimestampconversion(ts#x, 
TimestampType, LongType) - 0) % 300000000) < cast(0 as bigint)) THEN 
(((precisetimestampconversion(ts#x, TimestampType, LongType) - 0) % 300000000) 
+ 300000000) ELSE ((precisetimestampconversion(ts#x, TimestampType, LongType) - 
0) % 300000000) END) - 0), LongType, TimestampType)), end, knownnullable(p [...]
+            +- Filter isnotnull(ts#x)
+               +- SubqueryAlias ts_data
+                  +- View (`ts_data`, [a#x, ts#x])
+                     +- Project [cast(a#x as string) AS a#x, cast(ts#x as 
timestamp) AS ts#x]
+                        +- Project [a#x, ts#x]
+                           +- SubqueryAlias tab
+                              +- LocalRelation [a#x, ts#x]
+
+
+-- !query
+SELECT
+  a,
+  window.start,
+  count(*) AS cnt
+FROM ts_data
+GROUP BY GROUPING SETS ((a, window(ts, '5 minutes')), (a))
+ORDER BY a, start
+-- !query analysis
+Sort [a#x ASC NULLS FIRST, start#x ASC NULLS FIRST], true
++- Aggregate [a#x, window#x, spark_grouping_id#xL], [a#x, window#x.start AS 
start#x, count(1) AS cnt#xL]
+   +- Expand [[window#x, a#x, ts#x, a#x, window#x, 0], [window#x, a#x, ts#x, 
a#x, null, 1]], [window#x, a#x, ts#x, a#x, window#x, spark_grouping_id#xL]
+      +- Project [window#x, a#x, ts#x, a#x AS a#x, window#x AS window#x]
+         +- Project [named_struct(start, 
knownnullable(precisetimestampconversion(((precisetimestampconversion(ts#x, 
TimestampType, LongType) - CASE WHEN (((precisetimestampconversion(ts#x, 
TimestampType, LongType) - 0) % 300000000) < cast(0 as bigint)) THEN 
(((precisetimestampconversion(ts#x, TimestampType, LongType) - 0) % 300000000) 
+ 300000000) ELSE ((precisetimestampconversion(ts#x, TimestampType, LongType) - 
0) % 300000000) END) - 0), LongType, TimestampType)), end, knownnullable(p [...]
+            +- Filter isnotnull(ts#x)
+               +- SubqueryAlias ts_data
+                  +- View (`ts_data`, [a#x, ts#x])
+                     +- Project [cast(a#x as string) AS a#x, cast(ts#x as 
timestamp) AS ts#x]
+                        +- Project [a#x, ts#x]
+                           +- SubqueryAlias tab
+                              +- LocalRelation [a#x, ts#x]
+
+
+-- !query
+SELECT
+  count(*) AS cnt
+FROM ts_data
+GROUP BY concat(cast(window(ts, '5 minutes').start AS string), a)
+ORDER BY cnt
+-- !query analysis
+Sort [cnt#xL ASC NULLS FIRST], true
++- Aggregate [concat(cast(window#x.start as string), a#x)], [count(1) AS 
cnt#xL]
+   +- Project [named_struct(start, 
knownnullable(precisetimestampconversion(((precisetimestampconversion(ts#x, 
TimestampType, LongType) - CASE WHEN (((precisetimestampconversion(ts#x, 
TimestampType, LongType) - 0) % 300000000) < cast(0 as bigint)) THEN 
(((precisetimestampconversion(ts#x, TimestampType, LongType) - 0) % 300000000) 
+ 300000000) ELSE ((precisetimestampconversion(ts#x, TimestampType, LongType) - 
0) % 300000000) END) - 0), LongType, TimestampType)), end, 
knownnullable(precise [...]
+      +- Filter isnotnull(ts#x)
+         +- SubqueryAlias ts_data
+            +- View (`ts_data`, [a#x, ts#x])
+               +- Project [cast(a#x as string) AS a#x, cast(ts#x as timestamp) 
AS ts#x]
+                  +- Project [a#x, ts#x]
+                     +- SubqueryAlias tab
+                        +- LocalRelation [a#x, ts#x]
+
+
+-- !query
+SELECT
+  window.start,
+  count(*) AS cnt
+FROM ts_data
+GROUP BY window(ts, '5 minutes')
+HAVING count(*) > 1
+ORDER BY window.start
+-- !query analysis
+Sort [start#x ASC NULLS FIRST], true
++- Filter (cnt#xL > cast(1 as bigint))
+   +- Aggregate [window#x], [window#x.start AS start#x, count(1) AS cnt#xL]
+      +- Project [named_struct(start, 
knownnullable(precisetimestampconversion(((precisetimestampconversion(ts#x, 
TimestampType, LongType) - CASE WHEN (((precisetimestampconversion(ts#x, 
TimestampType, LongType) - 0) % 300000000) < cast(0 as bigint)) THEN 
(((precisetimestampconversion(ts#x, TimestampType, LongType) - 0) % 300000000) 
+ 300000000) ELSE ((precisetimestampconversion(ts#x, TimestampType, LongType) - 
0) % 300000000) END) - 0), LongType, TimestampType)), end, knownnullable(prec 
[...]
+         +- Filter isnotnull(ts#x)
+            +- SubqueryAlias ts_data
+               +- View (`ts_data`, [a#x, ts#x])
+                  +- Project [cast(a#x as string) AS a#x, cast(ts#x as 
timestamp) AS ts#x]
+                     +- Project [a#x, ts#x]
+                        +- SubqueryAlias tab
+                           +- LocalRelation [a#x, ts#x]
+
+
+-- !query
+SELECT * FROM (
+  SELECT
+    a,
+    window.start AS ws,
+    count(*) AS cnt
+  FROM ts_data
+  GROUP BY a, window(ts, '5 minutes')
+) sub
+ORDER BY a, ws
+-- !query analysis
+Sort [a#x ASC NULLS FIRST, ws#x ASC NULLS FIRST], true
++- Project [a#x, ws#x, cnt#xL]
+   +- SubqueryAlias sub
+      +- Aggregate [a#x, window#x], [a#x, window#x.start AS ws#x, count(1) AS 
cnt#xL]
+         +- Project [named_struct(start, 
knownnullable(precisetimestampconversion(((precisetimestampconversion(ts#x, 
TimestampType, LongType) - CASE WHEN (((precisetimestampconversion(ts#x, 
TimestampType, LongType) - 0) % 300000000) < cast(0 as bigint)) THEN 
(((precisetimestampconversion(ts#x, TimestampType, LongType) - 0) % 300000000) 
+ 300000000) ELSE ((precisetimestampconversion(ts#x, TimestampType, LongType) - 
0) % 300000000) END) - 0), LongType, TimestampType)), end, knownnullable(p [...]
+            +- Filter isnotnull(ts#x)
+               +- SubqueryAlias ts_data
+                  +- View (`ts_data`, [a#x, ts#x])
+                     +- Project [cast(a#x as string) AS a#x, cast(ts#x as 
timestamp) AS ts#x]
+                        +- Project [a#x, ts#x]
+                           +- SubqueryAlias tab
+                              +- LocalRelation [a#x, ts#x]
+
+
+-- !query
+SELECT
+  window.start,
+  window.end,
+  count(*) AS counts
+FROM VALUES
+  (CAST('2016-03-27 19:39:30' AS TIMESTAMP), 1, 'a')
+AS tab(time, value, id)
+GROUP BY window(time, '10 seconds')
+ORDER BY window.start
+-- !query analysis
+Sort [start#x ASC NULLS FIRST], true
++- Aggregate [window#x], [window#x.start AS start#x, window#x.end AS end#x, 
count(1) AS counts#xL]
+   +- Project [named_struct(start, 
knownnullable(precisetimestampconversion(((precisetimestampconversion(time#x, 
TimestampType, LongType) - CASE WHEN (((precisetimestampconversion(time#x, 
TimestampType, LongType) - 0) % 10000000) < cast(0 as bigint)) THEN 
(((precisetimestampconversion(time#x, TimestampType, LongType) - 0) % 10000000) 
+ 10000000) ELSE ((precisetimestampconversion(time#x, TimestampType, LongType) 
- 0) % 10000000) END) - 0), LongType, TimestampType)), end, knownnullable(pre 
[...]
+      +- Filter isnotnull(time#x)
+         +- SubqueryAlias tab
+            +- LocalRelation [time#x, value#x, id#x]
+
+
+-- !query
+SELECT
+  window.start,
+  window.end,
+  count(*) AS cnt
+FROM VALUES
+  (CAST('1969-12-31 23:59:59.500000' AS TIMESTAMP)),
+  (CAST('1969-12-31 23:55:00.000000' AS TIMESTAMP)),
+  (CAST('1970-01-01 00:00:00.000000' AS TIMESTAMP))
+AS tab(ts)
+GROUP BY window(ts, '5 minutes')
+ORDER BY start
+-- !query analysis
+Sort [start#x ASC NULLS FIRST], true
++- Aggregate [window#x], [window#x.start AS start#x, window#x.end AS end#x, 
count(1) AS cnt#xL]
+   +- Project [named_struct(start, 
knownnullable(precisetimestampconversion(((precisetimestampconversion(ts#x, 
TimestampType, LongType) - CASE WHEN (((precisetimestampconversion(ts#x, 
TimestampType, LongType) - 0) % 300000000) < cast(0 as bigint)) THEN 
(((precisetimestampconversion(ts#x, TimestampType, LongType) - 0) % 300000000) 
+ 300000000) ELSE ((precisetimestampconversion(ts#x, TimestampType, LongType) - 
0) % 300000000) END) - 0), LongType, TimestampType)), end, 
knownnullable(precise [...]
+      +- Filter isnotnull(ts#x)
+         +- SubqueryAlias tab
+            +- LocalRelation [ts#x]
+
+
+-- !query
+SELECT
+  a,
+  window.start,
+  count(*) AS cnt
+FROM ts_data
+GROUP BY a, window(ts, '5 minutes', '1 minute') WITH ROLLUP
+ORDER BY a, start
+-- !query analysis
+Sort [a#x ASC NULLS FIRST, start#x ASC NULLS FIRST], true
++- Aggregate [a#x, window#x, spark_grouping_id#xL], [a#x, window#x.start AS 
start#x, count(1) AS cnt#xL]
+   +- Expand [[window#x, a#x, ts#x, a#x, window#x, 0], [window#x, a#x, ts#x, 
a#x, null, 1], [window#x, a#x, ts#x, null, null, 3]], [window#x, a#x, ts#x, 
a#x, window#x, spark_grouping_id#xL]
+      +- Project [window#x, a#x, ts#x, a#x AS a#x, window#x AS window#x]
+         +- Filter isnotnull(ts#x)
+            +- Expand [[named_struct(start, 
knownnullable(precisetimestampconversion(((precisetimestampconversion(ts#x, 
TimestampType, LongType) - CASE WHEN (((precisetimestampconversion(ts#x, 
TimestampType, LongType) - 0) % 60000000) < cast(0 as bigint)) THEN 
(((precisetimestampconversion(ts#x, TimestampType, LongType) - 0) % 60000000) + 
60000000) ELSE ((precisetimestampconversion(ts#x, TimestampType, LongType) - 0) 
% 60000000) END) - 0), LongType, TimestampType)), end, knownnullable(pr [...]
+               +- SubqueryAlias ts_data
+                  +- View (`ts_data`, [a#x, ts#x])
+                     +- Project [cast(a#x as string) AS a#x, cast(ts#x as 
timestamp) AS ts#x]
+                        +- Project [a#x, ts#x]
+                           +- SubqueryAlias tab
+                              +- LocalRelation [a#x, ts#x]
+
+
+-- !query
+SELECT
+  a,
+  window.start,
+  count(*) AS cnt
+FROM ts_data
+GROUP BY a, window(ts, '5 minutes', '1 minute') WITH CUBE
+ORDER BY a, start
+-- !query analysis
+Sort [a#x ASC NULLS FIRST, start#x ASC NULLS FIRST], true
++- Aggregate [a#x, window#x, spark_grouping_id#xL], [a#x, window#x.start AS 
start#x, count(1) AS cnt#xL]
+   +- Expand [[window#x, a#x, ts#x, a#x, window#x, 0], [window#x, a#x, ts#x, 
a#x, null, 1], [window#x, a#x, ts#x, null, window#x, 2], [window#x, a#x, ts#x, 
null, null, 3]], [window#x, a#x, ts#x, a#x, window#x, spark_grouping_id#xL]
+      +- Project [window#x, a#x, ts#x, a#x AS a#x, window#x AS window#x]
+         +- Filter isnotnull(ts#x)
+            +- Expand [[named_struct(start, 
knownnullable(precisetimestampconversion(((precisetimestampconversion(ts#x, 
TimestampType, LongType) - CASE WHEN (((precisetimestampconversion(ts#x, 
TimestampType, LongType) - 0) % 60000000) < cast(0 as bigint)) THEN 
(((precisetimestampconversion(ts#x, TimestampType, LongType) - 0) % 60000000) + 
60000000) ELSE ((precisetimestampconversion(ts#x, TimestampType, LongType) - 0) 
% 60000000) END) - 0), LongType, TimestampType)), end, knownnullable(pr [...]
+               +- SubqueryAlias ts_data
+                  +- View (`ts_data`, [a#x, ts#x])
+                     +- Project [cast(a#x as string) AS a#x, cast(ts#x as 
timestamp) AS ts#x]
+                        +- Project [a#x, ts#x]
+                           +- SubqueryAlias tab
+                              +- LocalRelation [a#x, ts#x]
+
+
+-- !query
+SELECT
+  a,
+  window.start,
+  count(*) AS cnt
+FROM ts_data
+GROUP BY GROUPING SETS ((a, window(ts, '5 minutes', '1 minute')), (a))
+ORDER BY a, start
+-- !query analysis
+Sort [a#x ASC NULLS FIRST, start#x ASC NULLS FIRST], true
++- Aggregate [a#x, window#x, spark_grouping_id#xL], [a#x, window#x.start AS 
start#x, count(1) AS cnt#xL]
+   +- Expand [[window#x, a#x, ts#x, a#x, window#x, 0], [window#x, a#x, ts#x, 
a#x, null, 1]], [window#x, a#x, ts#x, a#x, window#x, spark_grouping_id#xL]
+      +- Project [window#x, a#x, ts#x, a#x AS a#x, window#x AS window#x]
+         +- Filter isnotnull(ts#x)
+            +- Expand [[named_struct(start, 
knownnullable(precisetimestampconversion(((precisetimestampconversion(ts#x, 
TimestampType, LongType) - CASE WHEN (((precisetimestampconversion(ts#x, 
TimestampType, LongType) - 0) % 60000000) < cast(0 as bigint)) THEN 
(((precisetimestampconversion(ts#x, TimestampType, LongType) - 0) % 60000000) + 
60000000) ELSE ((precisetimestampconversion(ts#x, TimestampType, LongType) - 0) 
% 60000000) END) - 0), LongType, TimestampType)), end, knownnullable(pr [...]
+               +- SubqueryAlias ts_data
+                  +- View (`ts_data`, [a#x, ts#x])
+                     +- Project [cast(a#x as string) AS a#x, cast(ts#x as 
timestamp) AS ts#x]
+                        +- Project [a#x, ts#x]
+                           +- SubqueryAlias tab
+                              +- LocalRelation [a#x, ts#x]
+
+
+-- !query
+SELECT
+  count(*) AS cnt
+FROM ts_data
+GROUP BY concat(cast(window(ts, '5 minutes', '1 minute').start AS string), a)
+ORDER BY cnt
+-- !query analysis
+Sort [cnt#xL ASC NULLS FIRST], true
++- Aggregate [concat(cast(window#x.start as string), a#x)], [count(1) AS 
cnt#xL]
+   +- Filter isnotnull(ts#x)
+      +- Expand [[named_struct(start, 
knownnullable(precisetimestampconversion(((precisetimestampconversion(ts#x, 
TimestampType, LongType) - CASE WHEN (((precisetimestampconversion(ts#x, 
TimestampType, LongType) - 0) % 60000000) < cast(0 as bigint)) THEN 
(((precisetimestampconversion(ts#x, TimestampType, LongType) - 0) % 60000000) + 
60000000) ELSE ((precisetimestampconversion(ts#x, TimestampType, LongType) - 0) 
% 60000000) END) - 0), LongType, TimestampType)), end, knownnullable(preciset 
[...]
+         +- SubqueryAlias ts_data
+            +- View (`ts_data`, [a#x, ts#x])
+               +- Project [cast(a#x as string) AS a#x, cast(ts#x as timestamp) 
AS ts#x]
+                  +- Project [a#x, ts#x]
+                     +- SubqueryAlias tab
+                        +- LocalRelation [a#x, ts#x]
+
+
+-- !query
+SELECT
+  window.start,
+  count(*) AS cnt
+FROM ts_data
+GROUP BY window(ts, '5 minutes', '1 minute')
+HAVING count(*) > 1
+ORDER BY window.start
+-- !query analysis
+Sort [start#x ASC NULLS FIRST], true
++- Filter (cnt#xL > cast(1 as bigint))
+   +- Aggregate [window#x], [window#x.start AS start#x, count(1) AS cnt#xL]
+      +- Filter isnotnull(ts#x)
+         +- Expand [[named_struct(start, 
knownnullable(precisetimestampconversion(((precisetimestampconversion(ts#x, 
TimestampType, LongType) - CASE WHEN (((precisetimestampconversion(ts#x, 
TimestampType, LongType) - 0) % 60000000) < cast(0 as bigint)) THEN 
(((precisetimestampconversion(ts#x, TimestampType, LongType) - 0) % 60000000) + 
60000000) ELSE ((precisetimestampconversion(ts#x, TimestampType, LongType) - 0) 
% 60000000) END) - 0), LongType, TimestampType)), end, knownnullable(preci [...]
+            +- SubqueryAlias ts_data
+               +- View (`ts_data`, [a#x, ts#x])
+                  +- Project [cast(a#x as string) AS a#x, cast(ts#x as 
timestamp) AS ts#x]
+                     +- Project [a#x, ts#x]
+                        +- SubqueryAlias tab
+                           +- LocalRelation [a#x, ts#x]
+
+
+-- !query
+SELECT * FROM (
+  SELECT
+    a,
+    window.start AS ws,
+    count(*) AS cnt
+  FROM ts_data
+  GROUP BY a, window(ts, '5 minutes', '1 minute')
+) sub
+ORDER BY a, ws
+-- !query analysis
+Sort [a#x ASC NULLS FIRST, ws#x ASC NULLS FIRST], true
++- Project [a#x, ws#x, cnt#xL]
+   +- SubqueryAlias sub
+      +- Aggregate [a#x, window#x], [a#x, window#x.start AS ws#x, count(1) AS 
cnt#xL]
+         +- Filter isnotnull(ts#x)
+            +- Expand [[named_struct(start, 
knownnullable(precisetimestampconversion(((precisetimestampconversion(ts#x, 
TimestampType, LongType) - CASE WHEN (((precisetimestampconversion(ts#x, 
TimestampType, LongType) - 0) % 60000000) < cast(0 as bigint)) THEN 
(((precisetimestampconversion(ts#x, TimestampType, LongType) - 0) % 60000000) + 
60000000) ELSE ((precisetimestampconversion(ts#x, TimestampType, LongType) - 0) 
% 60000000) END) - 0), LongType, TimestampType)), end, knownnullable(pr [...]
+               +- SubqueryAlias ts_data
+                  +- View (`ts_data`, [a#x, ts#x])
+                     +- Project [cast(a#x as string) AS a#x, cast(ts#x as 
timestamp) AS ts#x]
+                        +- Project [a#x, ts#x]
+                           +- SubqueryAlias tab
+                              +- LocalRelation [a#x, ts#x]
+
+
+-- !query
+SELECT
+  window.start,
+  window.end,
+  count(*) AS counts
+FROM VALUES
+  (CAST('2016-03-27 19:39:30' AS TIMESTAMP), 1, 'a')
+AS tab(time, value, id)
+GROUP BY window(time, '10 seconds', '5 seconds')
+ORDER BY window.start
+-- !query analysis
+Sort [start#x ASC NULLS FIRST], true
++- Aggregate [window#x], [window#x.start AS start#x, window#x.end AS end#x, 
count(1) AS counts#xL]
+   +- Filter isnotnull(time#x)
+      +- Expand [[named_struct(start, 
knownnullable(precisetimestampconversion(((precisetimestampconversion(time#x, 
TimestampType, LongType) - CASE WHEN (((precisetimestampconversion(time#x, 
TimestampType, LongType) - 0) % 5000000) < cast(0 as bigint)) THEN 
(((precisetimestampconversion(time#x, TimestampType, LongType) - 0) % 5000000) 
+ 5000000) ELSE ((precisetimestampconversion(time#x, TimestampType, LongType) - 
0) % 5000000) END) - 0), LongType, TimestampType)), end, knownnullable(prec 
[...]
+         +- SubqueryAlias tab
+            +- LocalRelation [time#x, value#x, id#x]
+
+
+-- !query
+SELECT
+  window.start,
+  window.end,
+  count(*) AS cnt
+FROM VALUES
+  (CAST('1969-12-31 23:59:59.500000' AS TIMESTAMP)),
+  (CAST('1969-12-31 23:55:00.000000' AS TIMESTAMP)),
+  (CAST('1970-01-01 00:00:00.000000' AS TIMESTAMP))
+AS tab(ts)
+GROUP BY window(ts, '5 minutes', '1 minute')
+ORDER BY start
+-- !query analysis
+Sort [start#x ASC NULLS FIRST], true
++- Aggregate [window#x], [window#x.start AS start#x, window#x.end AS end#x, 
count(1) AS cnt#xL]
+   +- Filter isnotnull(ts#x)
+      +- Expand [[named_struct(start, 
knownnullable(precisetimestampconversion(((precisetimestampconversion(ts#x, 
TimestampType, LongType) - CASE WHEN (((precisetimestampconversion(ts#x, 
TimestampType, LongType) - 0) % 60000000) < cast(0 as bigint)) THEN 
(((precisetimestampconversion(ts#x, TimestampType, LongType) - 0) % 60000000) + 
60000000) ELSE ((precisetimestampconversion(ts#x, TimestampType, LongType) - 0) 
% 60000000) END) - 0), LongType, TimestampType)), end, knownnullable(preciset 
[...]
+         +- SubqueryAlias tab
+            +- LocalRelation [ts#x]
+
+
+-- !query
+SELECT
+  a,
+  session_window.start,
+  count(*) AS cnt
+FROM ts_data
+GROUP BY a, session_window(ts, '5 minutes') WITH ROLLUP
+ORDER BY a, start
+-- !query analysis
+Sort [a#x ASC NULLS FIRST, start#x ASC NULLS FIRST], true
++- Aggregate [a#x, session_window#x, spark_grouping_id#xL], [a#x, 
session_window#x.start AS start#x, count(1) AS cnt#xL]
+   +- Expand [[session_window#x, a#x, ts#x, a#x, session_window#x, 0], 
[session_window#x, a#x, ts#x, a#x, null, 1], [session_window#x, a#x, ts#x, 
null, null, 3]], [session_window#x, a#x, ts#x, a#x, session_window#x, 
spark_grouping_id#xL]
+      +- Project [session_window#x, a#x, ts#x, a#x AS a#x, session_window#x AS 
session_window#x]
+         +- Filter isnotnull(ts#x)
+            +- Project [named_struct(start, 
precisetimestampconversion(precisetimestampconversion(ts#x, TimestampType, 
LongType), LongType, TimestampType), end, 
knownnullable(precisetimestampconversion(precisetimestampconversion(cast(ts#x + 
cast(5 minutes as interval) as timestamp), TimestampType, LongType), LongType, 
TimestampType))) AS session_window#x, a#x, ts#x]
+               +- SubqueryAlias ts_data
+                  +- View (`ts_data`, [a#x, ts#x])
+                     +- Project [cast(a#x as string) AS a#x, cast(ts#x as 
timestamp) AS ts#x]
+                        +- Project [a#x, ts#x]
+                           +- SubqueryAlias tab
+                              +- LocalRelation [a#x, ts#x]
+
+
+-- !query
+SELECT
+  a,
+  session_window.start,
+  count(*) AS cnt
+FROM ts_data
+GROUP BY a, session_window(ts, '5 minutes') WITH CUBE
+ORDER BY a, start
+-- !query analysis
+Sort [a#x ASC NULLS FIRST, start#x ASC NULLS FIRST], true
++- Aggregate [a#x, session_window#x, spark_grouping_id#xL], [a#x, 
session_window#x.start AS start#x, count(1) AS cnt#xL]
+   +- Expand [[session_window#x, a#x, ts#x, a#x, session_window#x, 0], 
[session_window#x, a#x, ts#x, a#x, null, 1], [session_window#x, a#x, ts#x, 
null, session_window#x, 2], [session_window#x, a#x, ts#x, null, null, 3]], 
[session_window#x, a#x, ts#x, a#x, session_window#x, spark_grouping_id#xL]
+      +- Project [session_window#x, a#x, ts#x, a#x AS a#x, session_window#x AS 
session_window#x]
+         +- Filter isnotnull(ts#x)
+            +- Project [named_struct(start, 
precisetimestampconversion(precisetimestampconversion(ts#x, TimestampType, 
LongType), LongType, TimestampType), end, 
knownnullable(precisetimestampconversion(precisetimestampconversion(cast(ts#x + 
cast(5 minutes as interval) as timestamp), TimestampType, LongType), LongType, 
TimestampType))) AS session_window#x, a#x, ts#x]
+               +- SubqueryAlias ts_data
+                  +- View (`ts_data`, [a#x, ts#x])
+                     +- Project [cast(a#x as string) AS a#x, cast(ts#x as 
timestamp) AS ts#x]
+                        +- Project [a#x, ts#x]
+                           +- SubqueryAlias tab
+                              +- LocalRelation [a#x, ts#x]
+
+
+-- !query
+SELECT
+  a,
+  session_window.start,
+  count(*) AS cnt
+FROM ts_data
+GROUP BY GROUPING SETS ((a, session_window(ts, '5 minutes')), (a))
+ORDER BY a, start
+-- !query analysis
+Sort [a#x ASC NULLS FIRST, start#x ASC NULLS FIRST], true
++- Aggregate [a#x, session_window#x, spark_grouping_id#xL], [a#x, 
session_window#x.start AS start#x, count(1) AS cnt#xL]
+   +- Expand [[session_window#x, a#x, ts#x, a#x, session_window#x, 0], 
[session_window#x, a#x, ts#x, a#x, null, 1]], [session_window#x, a#x, ts#x, 
a#x, session_window#x, spark_grouping_id#xL]
+      +- Project [session_window#x, a#x, ts#x, a#x AS a#x, session_window#x AS 
session_window#x]
+         +- Filter isnotnull(ts#x)
+            +- Project [named_struct(start, 
precisetimestampconversion(precisetimestampconversion(ts#x, TimestampType, 
LongType), LongType, TimestampType), end, 
knownnullable(precisetimestampconversion(precisetimestampconversion(cast(ts#x + 
cast(5 minutes as interval) as timestamp), TimestampType, LongType), LongType, 
TimestampType))) AS session_window#x, a#x, ts#x]
+               +- SubqueryAlias ts_data
+                  +- View (`ts_data`, [a#x, ts#x])
+                     +- Project [cast(a#x as string) AS a#x, cast(ts#x as 
timestamp) AS ts#x]
+                        +- Project [a#x, ts#x]
+                           +- SubqueryAlias tab
+                              +- LocalRelation [a#x, ts#x]
+
+
+-- !query
+SELECT
+  count(*) AS cnt
+FROM ts_data
+GROUP BY concat(cast(session_window(ts, '5 minutes').start AS string), a)
+ORDER BY cnt
+-- !query analysis
+Sort [cnt#xL ASC NULLS FIRST], true
++- Aggregate [concat(cast(session_window#x.start as string), a#x)], [count(1) 
AS cnt#xL]
+   +- Filter isnotnull(ts#x)
+      +- Project [named_struct(start, 
precisetimestampconversion(precisetimestampconversion(ts#x, TimestampType, 
LongType), LongType, TimestampType), end, 
knownnullable(precisetimestampconversion(precisetimestampconversion(cast(ts#x + 
cast(5 minutes as interval) as timestamp), TimestampType, LongType), LongType, 
TimestampType))) AS session_window#x, a#x, ts#x]
+         +- SubqueryAlias ts_data
+            +- View (`ts_data`, [a#x, ts#x])
+               +- Project [cast(a#x as string) AS a#x, cast(ts#x as timestamp) 
AS ts#x]
+                  +- Project [a#x, ts#x]
+                     +- SubqueryAlias tab
+                        +- LocalRelation [a#x, ts#x]
+
+
+-- !query
+SELECT
+  session_window.start,
+  count(*) AS cnt
+FROM ts_data
+GROUP BY session_window(ts, '5 minutes')
+HAVING count(*) > 1
+ORDER BY session_window.start
+-- !query analysis
+Sort [start#x ASC NULLS FIRST], true
++- Filter (cnt#xL > cast(1 as bigint))
+   +- Aggregate [session_window#x], [session_window#x.start AS start#x, 
count(1) AS cnt#xL]
+      +- Filter isnotnull(ts#x)
+         +- Project [named_struct(start, 
precisetimestampconversion(precisetimestampconversion(ts#x, TimestampType, 
LongType), LongType, TimestampType), end, 
knownnullable(precisetimestampconversion(precisetimestampconversion(cast(ts#x + 
cast(5 minutes as interval) as timestamp), TimestampType, LongType), LongType, 
TimestampType))) AS session_window#x, a#x, ts#x]
+            +- SubqueryAlias ts_data
+               +- View (`ts_data`, [a#x, ts#x])
+                  +- Project [cast(a#x as string) AS a#x, cast(ts#x as 
timestamp) AS ts#x]
+                     +- Project [a#x, ts#x]
+                        +- SubqueryAlias tab
+                           +- LocalRelation [a#x, ts#x]
+
+
+-- !query
+SELECT * FROM (
+  SELECT
+    a,
+    session_window.start AS ws,
+    count(*) AS cnt
+  FROM ts_data
+  GROUP BY a, session_window(ts, '5 minutes')
+) sub
+ORDER BY a, ws
+-- !query analysis
+Sort [a#x ASC NULLS FIRST, ws#x ASC NULLS FIRST], true
++- Project [a#x, ws#x, cnt#xL]
+   +- SubqueryAlias sub
+      +- Aggregate [a#x, session_window#x], [a#x, session_window#x.start AS 
ws#x, count(1) AS cnt#xL]
+         +- Filter isnotnull(ts#x)
+            +- Project [named_struct(start, 
precisetimestampconversion(precisetimestampconversion(ts#x, TimestampType, 
LongType), LongType, TimestampType), end, 
knownnullable(precisetimestampconversion(precisetimestampconversion(cast(ts#x + 
cast(5 minutes as interval) as timestamp), TimestampType, LongType), LongType, 
TimestampType))) AS session_window#x, a#x, ts#x]
+               +- SubqueryAlias ts_data
+                  +- View (`ts_data`, [a#x, ts#x])
+                     +- Project [cast(a#x as string) AS a#x, cast(ts#x as 
timestamp) AS ts#x]
+                        +- Project [a#x, ts#x]
+                           +- SubqueryAlias tab
+                              +- LocalRelation [a#x, ts#x]
+
+
+-- !query
+SELECT
+  session_window.start,
+  session_window.end,
+  count(*) AS cnt
+FROM VALUES
+  (CAST('1969-12-31 23:59:59.500000' AS TIMESTAMP)),
+  (CAST('1969-12-31 23:55:00.000000' AS TIMESTAMP)),
+  (CAST('1970-01-01 00:00:00.000000' AS TIMESTAMP))
+AS tab(ts)
+GROUP BY session_window(ts, '5 minutes')
+ORDER BY start
+-- !query analysis
+Sort [start#x ASC NULLS FIRST], true
++- Aggregate [session_window#x], [session_window#x.start AS start#x, 
session_window#x.end AS end#x, count(1) AS cnt#xL]
+   +- Filter isnotnull(ts#x)
+      +- Project [named_struct(start, 
precisetimestampconversion(precisetimestampconversion(ts#x, TimestampType, 
LongType), LongType, TimestampType), end, 
knownnullable(precisetimestampconversion(precisetimestampconversion(cast(ts#x + 
cast(5 minutes as interval) as timestamp), TimestampType, LongType), LongType, 
TimestampType))) AS session_window#x, ts#x]
+         +- SubqueryAlias tab
+            +- LocalRelation [ts#x]
diff --git a/sql/core/src/test/resources/sql-tests/inputs/time-window.sql 
b/sql/core/src/test/resources/sql-tests/inputs/time-window.sql
new file mode 100644
index 000000000000..31771ff8d934
--- /dev/null
+++ b/sql/core/src/test/resources/sql-tests/inputs/time-window.sql
@@ -0,0 +1,231 @@
+-- Setup: temp view used across cases
+CREATE OR REPLACE TEMPORARY VIEW ts_data AS
+SELECT * FROM VALUES
+  ('A1', CAST('2021-01-01 00:00:00' AS TIMESTAMP)),
+  ('A1', CAST('2021-01-01 00:04:30' AS TIMESTAMP)),
+  ('A1', CAST('2021-01-01 00:06:00' AS TIMESTAMP)),
+  ('A2', CAST('2021-01-01 00:01:00' AS TIMESTAMP))
+AS tab(a, ts);
+
+-- window_with_rollup
+SELECT
+  a,
+  window.start,
+  count(*) AS cnt
+FROM ts_data
+GROUP BY a, window(ts, '5 minutes') WITH ROLLUP
+ORDER BY a, start;
+
+-- window_with_cube
+SELECT
+  a,
+  window.start,
+  count(*) AS cnt
+FROM ts_data
+GROUP BY a, window(ts, '5 minutes') WITH CUBE
+ORDER BY a, start;
+
+-- window_with_grouping_sets
+SELECT
+  a,
+  window.start,
+  count(*) AS cnt
+FROM ts_data
+GROUP BY GROUPING SETS ((a, window(ts, '5 minutes')), (a))
+ORDER BY a, start;
+
+-- window_nested_in_expression
+SELECT
+  count(*) AS cnt
+FROM ts_data
+GROUP BY concat(cast(window(ts, '5 minutes').start AS string), a)
+ORDER BY cnt;
+
+-- window_having
+SELECT
+  window.start,
+  count(*) AS cnt
+FROM ts_data
+GROUP BY window(ts, '5 minutes')
+HAVING count(*) > 1
+ORDER BY window.start;
+
+-- window_in_subquery
+SELECT * FROM (
+  SELECT
+    a,
+    window.start AS ws,
+    count(*) AS cnt
+  FROM ts_data
+  GROUP BY a, window(ts, '5 minutes')
+) sub
+ORDER BY a, ws;
+
+-- record_at_window_start
+SELECT
+  window.start,
+  window.end,
+  count(*) AS counts
+FROM VALUES
+  (CAST('2016-03-27 19:39:30' AS TIMESTAMP), 1, 'a')
+AS tab(time, value, id)
+GROUP BY window(time, '10 seconds')
+ORDER BY window.start;
+
+-- tumbling_window_pre_epoch
+SELECT
+  window.start,
+  window.end,
+  count(*) AS cnt
+FROM VALUES
+  (CAST('1969-12-31 23:59:59.500000' AS TIMESTAMP)),
+  (CAST('1969-12-31 23:55:00.000000' AS TIMESTAMP)),
+  (CAST('1970-01-01 00:00:00.000000' AS TIMESTAMP))
+AS tab(ts)
+GROUP BY window(ts, '5 minutes')
+ORDER BY start;
+
+-- sliding_window_with_rollup
+SELECT
+  a,
+  window.start,
+  count(*) AS cnt
+FROM ts_data
+GROUP BY a, window(ts, '5 minutes', '1 minute') WITH ROLLUP
+ORDER BY a, start;
+
+-- sliding_window_with_cube
+SELECT
+  a,
+  window.start,
+  count(*) AS cnt
+FROM ts_data
+GROUP BY a, window(ts, '5 minutes', '1 minute') WITH CUBE
+ORDER BY a, start;
+
+-- sliding_window_with_grouping_sets
+SELECT
+  a,
+  window.start,
+  count(*) AS cnt
+FROM ts_data
+GROUP BY GROUPING SETS ((a, window(ts, '5 minutes', '1 minute')), (a))
+ORDER BY a, start;
+
+-- sliding_window_nested_in_expression
+SELECT
+  count(*) AS cnt
+FROM ts_data
+GROUP BY concat(cast(window(ts, '5 minutes', '1 minute').start AS string), a)
+ORDER BY cnt;
+
+-- sliding_window_having
+SELECT
+  window.start,
+  count(*) AS cnt
+FROM ts_data
+GROUP BY window(ts, '5 minutes', '1 minute')
+HAVING count(*) > 1
+ORDER BY window.start;
+
+-- sliding_window_in_subquery
+SELECT * FROM (
+  SELECT
+    a,
+    window.start AS ws,
+    count(*) AS cnt
+  FROM ts_data
+  GROUP BY a, window(ts, '5 minutes', '1 minute')
+) sub
+ORDER BY a, ws;
+
+-- record_at_sliding_window_start
+SELECT
+  window.start,
+  window.end,
+  count(*) AS counts
+FROM VALUES
+  (CAST('2016-03-27 19:39:30' AS TIMESTAMP), 1, 'a')
+AS tab(time, value, id)
+GROUP BY window(time, '10 seconds', '5 seconds')
+ORDER BY window.start;
+
+-- sliding_window_pre_epoch
+SELECT
+  window.start,
+  window.end,
+  count(*) AS cnt
+FROM VALUES
+  (CAST('1969-12-31 23:59:59.500000' AS TIMESTAMP)),
+  (CAST('1969-12-31 23:55:00.000000' AS TIMESTAMP)),
+  (CAST('1970-01-01 00:00:00.000000' AS TIMESTAMP))
+AS tab(ts)
+GROUP BY window(ts, '5 minutes', '1 minute')
+ORDER BY start;
+
+-- session_window_with_rollup
+SELECT
+  a,
+  session_window.start,
+  count(*) AS cnt
+FROM ts_data
+GROUP BY a, session_window(ts, '5 minutes') WITH ROLLUP
+ORDER BY a, start;
+
+-- session_window_with_cube
+SELECT
+  a,
+  session_window.start,
+  count(*) AS cnt
+FROM ts_data
+GROUP BY a, session_window(ts, '5 minutes') WITH CUBE
+ORDER BY a, start;
+
+-- session_window_with_grouping_sets
+SELECT
+  a,
+  session_window.start,
+  count(*) AS cnt
+FROM ts_data
+GROUP BY GROUPING SETS ((a, session_window(ts, '5 minutes')), (a))
+ORDER BY a, start;
+
+-- session_window_nested_in_expression
+SELECT
+  count(*) AS cnt
+FROM ts_data
+GROUP BY concat(cast(session_window(ts, '5 minutes').start AS string), a)
+ORDER BY cnt;
+
+-- session_window_having
+SELECT
+  session_window.start,
+  count(*) AS cnt
+FROM ts_data
+GROUP BY session_window(ts, '5 minutes')
+HAVING count(*) > 1
+ORDER BY session_window.start;
+
+-- session_window_in_subquery
+SELECT * FROM (
+  SELECT
+    a,
+    session_window.start AS ws,
+    count(*) AS cnt
+  FROM ts_data
+  GROUP BY a, session_window(ts, '5 minutes')
+) sub
+ORDER BY a, ws;
+
+-- session_window_pre_epoch
+SELECT
+  session_window.start,
+  session_window.end,
+  count(*) AS cnt
+FROM VALUES
+  (CAST('1969-12-31 23:59:59.500000' AS TIMESTAMP)),
+  (CAST('1969-12-31 23:55:00.000000' AS TIMESTAMP)),
+  (CAST('1970-01-01 00:00:00.000000' AS TIMESTAMP))
+AS tab(ts)
+GROUP BY session_window(ts, '5 minutes')
+ORDER BY start;
diff --git a/sql/core/src/test/resources/sql-tests/results/time-window.sql.out b/sql/core/src/test/resources/sql-tests/results/time-window.sql.out
new file mode 100644
index 000000000000..0e3bca21d4a0
--- /dev/null
+++ b/sql/core/src/test/resources/sql-tests/results/time-window.sql.out
@@ -0,0 +1,508 @@
+-- Automatically generated by SQLQueryTestSuite
+-- !query
+CREATE OR REPLACE TEMPORARY VIEW ts_data AS
+SELECT * FROM VALUES
+  ('A1', CAST('2021-01-01 00:00:00' AS TIMESTAMP)),
+  ('A1', CAST('2021-01-01 00:04:30' AS TIMESTAMP)),
+  ('A1', CAST('2021-01-01 00:06:00' AS TIMESTAMP)),
+  ('A2', CAST('2021-01-01 00:01:00' AS TIMESTAMP))
+AS tab(a, ts)
+-- !query schema
+struct<>
+-- !query output
+
+
+
+-- !query
+SELECT
+  a,
+  window.start,
+  count(*) AS cnt
+FROM ts_data
+GROUP BY a, window(ts, '5 minutes') WITH ROLLUP
+ORDER BY a, start
+-- !query schema
+struct<a:string,start:timestamp,cnt:bigint>
+-- !query output
+NULL   NULL    4
+A1     NULL    3
+A1     2021-01-01 00:00:00     2
+A1     2021-01-01 00:05:00     1
+A2     NULL    1
+A2     2021-01-01 00:00:00     1
+
+
+-- !query
+SELECT
+  a,
+  window.start,
+  count(*) AS cnt
+FROM ts_data
+GROUP BY a, window(ts, '5 minutes') WITH CUBE
+ORDER BY a, start
+-- !query schema
+struct<a:string,start:timestamp,cnt:bigint>
+-- !query output
+NULL   NULL    4
+NULL   2021-01-01 00:00:00     3
+NULL   2021-01-01 00:05:00     1
+A1     NULL    3
+A1     2021-01-01 00:00:00     2
+A1     2021-01-01 00:05:00     1
+A2     NULL    1
+A2     2021-01-01 00:00:00     1
+
+
+-- !query
+SELECT
+  a,
+  window.start,
+  count(*) AS cnt
+FROM ts_data
+GROUP BY GROUPING SETS ((a, window(ts, '5 minutes')), (a))
+ORDER BY a, start
+-- !query schema
+struct<a:string,start:timestamp,cnt:bigint>
+-- !query output
+A1     NULL    3
+A1     2021-01-01 00:00:00     2
+A1     2021-01-01 00:05:00     1
+A2     NULL    1
+A2     2021-01-01 00:00:00     1
+
+
+-- !query
+SELECT
+  count(*) AS cnt
+FROM ts_data
+GROUP BY concat(cast(window(ts, '5 minutes').start AS string), a)
+ORDER BY cnt
+-- !query schema
+struct<cnt:bigint>
+-- !query output
+1
+1
+2
+
+
+-- !query
+SELECT
+  window.start,
+  count(*) AS cnt
+FROM ts_data
+GROUP BY window(ts, '5 minutes')
+HAVING count(*) > 1
+ORDER BY window.start
+-- !query schema
+struct<start:timestamp,cnt:bigint>
+-- !query output
+2021-01-01 00:00:00    3
+
+
+-- !query
+SELECT * FROM (
+  SELECT
+    a,
+    window.start AS ws,
+    count(*) AS cnt
+  FROM ts_data
+  GROUP BY a, window(ts, '5 minutes')
+) sub
+ORDER BY a, ws
+-- !query schema
+struct<a:string,ws:timestamp,cnt:bigint>
+-- !query output
+A1     2021-01-01 00:00:00     2
+A1     2021-01-01 00:05:00     1
+A2     2021-01-01 00:00:00     1
+
+
+-- !query
+SELECT
+  window.start,
+  window.end,
+  count(*) AS counts
+FROM VALUES
+  (CAST('2016-03-27 19:39:30' AS TIMESTAMP), 1, 'a')
+AS tab(time, value, id)
+GROUP BY window(time, '10 seconds')
+ORDER BY window.start
+-- !query schema
+struct<start:timestamp,end:timestamp,counts:bigint>
+-- !query output
+2016-03-27 19:39:30    2016-03-27 19:39:40     1
+
+
+-- !query
+SELECT
+  window.start,
+  window.end,
+  count(*) AS cnt
+FROM VALUES
+  (CAST('1969-12-31 23:59:59.500000' AS TIMESTAMP)),
+  (CAST('1969-12-31 23:55:00.000000' AS TIMESTAMP)),
+  (CAST('1970-01-01 00:00:00.000000' AS TIMESTAMP))
+AS tab(ts)
+GROUP BY window(ts, '5 minutes')
+ORDER BY start
+-- !query schema
+struct<start:timestamp,end:timestamp,cnt:bigint>
+-- !query output
+1969-12-31 23:55:00    1970-01-01 00:00:00     2
+1970-01-01 00:00:00    1970-01-01 00:05:00     1
+
+
+-- !query
+SELECT
+  a,
+  window.start,
+  count(*) AS cnt
+FROM ts_data
+GROUP BY a, window(ts, '5 minutes', '1 minute') WITH ROLLUP
+ORDER BY a, start
+-- !query schema
+struct<a:string,start:timestamp,cnt:bigint>
+-- !query output
+NULL   NULL    20
+A1     NULL    15
+A1     2020-12-31 23:56:00     1
+A1     2020-12-31 23:57:00     1
+A1     2020-12-31 23:58:00     1
+A1     2020-12-31 23:59:00     1
+A1     2021-01-01 00:00:00     2
+A1     2021-01-01 00:01:00     1
+A1     2021-01-01 00:02:00     2
+A1     2021-01-01 00:03:00     2
+A1     2021-01-01 00:04:00     2
+A1     2021-01-01 00:05:00     1
+A1     2021-01-01 00:06:00     1
+A2     NULL    5
+A2     2020-12-31 23:57:00     1
+A2     2020-12-31 23:58:00     1
+A2     2020-12-31 23:59:00     1
+A2     2021-01-01 00:00:00     1
+A2     2021-01-01 00:01:00     1
+
+
+-- !query
+SELECT
+  a,
+  window.start,
+  count(*) AS cnt
+FROM ts_data
+GROUP BY a, window(ts, '5 minutes', '1 minute') WITH CUBE
+ORDER BY a, start
+-- !query schema
+struct<a:string,start:timestamp,cnt:bigint>
+-- !query output
+NULL   NULL    20
+NULL   2020-12-31 23:56:00     1
+NULL   2020-12-31 23:57:00     2
+NULL   2020-12-31 23:58:00     2
+NULL   2020-12-31 23:59:00     2
+NULL   2021-01-01 00:00:00     3
+NULL   2021-01-01 00:01:00     2
+NULL   2021-01-01 00:02:00     2
+NULL   2021-01-01 00:03:00     2
+NULL   2021-01-01 00:04:00     2
+NULL   2021-01-01 00:05:00     1
+NULL   2021-01-01 00:06:00     1
+A1     NULL    15
+A1     2020-12-31 23:56:00     1
+A1     2020-12-31 23:57:00     1
+A1     2020-12-31 23:58:00     1
+A1     2020-12-31 23:59:00     1
+A1     2021-01-01 00:00:00     2
+A1     2021-01-01 00:01:00     1
+A1     2021-01-01 00:02:00     2
+A1     2021-01-01 00:03:00     2
+A1     2021-01-01 00:04:00     2
+A1     2021-01-01 00:05:00     1
+A1     2021-01-01 00:06:00     1
+A2     NULL    5
+A2     2020-12-31 23:57:00     1
+A2     2020-12-31 23:58:00     1
+A2     2020-12-31 23:59:00     1
+A2     2021-01-01 00:00:00     1
+A2     2021-01-01 00:01:00     1
+
+
+-- !query
+SELECT
+  a,
+  window.start,
+  count(*) AS cnt
+FROM ts_data
+GROUP BY GROUPING SETS ((a, window(ts, '5 minutes', '1 minute')), (a))
+ORDER BY a, start
+-- !query schema
+struct<a:string,start:timestamp,cnt:bigint>
+-- !query output
+A1     NULL    15
+A1     2020-12-31 23:56:00     1
+A1     2020-12-31 23:57:00     1
+A1     2020-12-31 23:58:00     1
+A1     2020-12-31 23:59:00     1
+A1     2021-01-01 00:00:00     2
+A1     2021-01-01 00:01:00     1
+A1     2021-01-01 00:02:00     2
+A1     2021-01-01 00:03:00     2
+A1     2021-01-01 00:04:00     2
+A1     2021-01-01 00:05:00     1
+A1     2021-01-01 00:06:00     1
+A2     NULL    5
+A2     2020-12-31 23:57:00     1
+A2     2020-12-31 23:58:00     1
+A2     2020-12-31 23:59:00     1
+A2     2021-01-01 00:00:00     1
+A2     2021-01-01 00:01:00     1
+
+
+-- !query
+SELECT
+  count(*) AS cnt
+FROM ts_data
+GROUP BY concat(cast(window(ts, '5 minutes', '1 minute').start AS string), a)
+ORDER BY cnt
+-- !query schema
+struct<cnt:bigint>
+-- !query output
+1
+1
+1
+1
+1
+1
+1
+1
+1
+1
+1
+1
+2
+2
+2
+2
+
+
+-- !query
+SELECT
+  window.start,
+  count(*) AS cnt
+FROM ts_data
+GROUP BY window(ts, '5 minutes', '1 minute')
+HAVING count(*) > 1
+ORDER BY window.start
+-- !query schema
+struct<start:timestamp,cnt:bigint>
+-- !query output
+2020-12-31 23:57:00    2
+2020-12-31 23:58:00    2
+2020-12-31 23:59:00    2
+2021-01-01 00:00:00    3
+2021-01-01 00:01:00    2
+2021-01-01 00:02:00    2
+2021-01-01 00:03:00    2
+2021-01-01 00:04:00    2
+
+
+-- !query
+SELECT * FROM (
+  SELECT
+    a,
+    window.start AS ws,
+    count(*) AS cnt
+  FROM ts_data
+  GROUP BY a, window(ts, '5 minutes', '1 minute')
+) sub
+ORDER BY a, ws
+-- !query schema
+struct<a:string,ws:timestamp,cnt:bigint>
+-- !query output
+A1     2020-12-31 23:56:00     1
+A1     2020-12-31 23:57:00     1
+A1     2020-12-31 23:58:00     1
+A1     2020-12-31 23:59:00     1
+A1     2021-01-01 00:00:00     2
+A1     2021-01-01 00:01:00     1
+A1     2021-01-01 00:02:00     2
+A1     2021-01-01 00:03:00     2
+A1     2021-01-01 00:04:00     2
+A1     2021-01-01 00:05:00     1
+A1     2021-01-01 00:06:00     1
+A2     2020-12-31 23:57:00     1
+A2     2020-12-31 23:58:00     1
+A2     2020-12-31 23:59:00     1
+A2     2021-01-01 00:00:00     1
+A2     2021-01-01 00:01:00     1
+
+
+-- !query
+SELECT
+  window.start,
+  window.end,
+  count(*) AS counts
+FROM VALUES
+  (CAST('2016-03-27 19:39:30' AS TIMESTAMP), 1, 'a')
+AS tab(time, value, id)
+GROUP BY window(time, '10 seconds', '5 seconds')
+ORDER BY window.start
+-- !query schema
+struct<start:timestamp,end:timestamp,counts:bigint>
+-- !query output
+2016-03-27 19:39:25    2016-03-27 19:39:35     1
+2016-03-27 19:39:30    2016-03-27 19:39:40     1
+
+
+-- !query
+SELECT
+  window.start,
+  window.end,
+  count(*) AS cnt
+FROM VALUES
+  (CAST('1969-12-31 23:59:59.500000' AS TIMESTAMP)),
+  (CAST('1969-12-31 23:55:00.000000' AS TIMESTAMP)),
+  (CAST('1970-01-01 00:00:00.000000' AS TIMESTAMP))
+AS tab(ts)
+GROUP BY window(ts, '5 minutes', '1 minute')
+ORDER BY start
+-- !query schema
+struct<start:timestamp,end:timestamp,cnt:bigint>
+-- !query output
+1969-12-31 23:51:00    1969-12-31 23:56:00     1
+1969-12-31 23:52:00    1969-12-31 23:57:00     1
+1969-12-31 23:53:00    1969-12-31 23:58:00     1
+1969-12-31 23:54:00    1969-12-31 23:59:00     1
+1969-12-31 23:55:00    1970-01-01 00:00:00     2
+1969-12-31 23:56:00    1970-01-01 00:01:00     2
+1969-12-31 23:57:00    1970-01-01 00:02:00     2
+1969-12-31 23:58:00    1970-01-01 00:03:00     2
+1969-12-31 23:59:00    1970-01-01 00:04:00     2
+1970-01-01 00:00:00    1970-01-01 00:05:00     1
+
+
+-- !query
+SELECT
+  a,
+  session_window.start,
+  count(*) AS cnt
+FROM ts_data
+GROUP BY a, session_window(ts, '5 minutes') WITH ROLLUP
+ORDER BY a, start
+-- !query schema
+struct<>
+-- !query output
+org.apache.spark.SparkException
+{
+  "errorClass" : "INTERNAL_ERROR",
+  "sqlState" : "XX000",
+  "messageParameters" : {
+    "message" : "Grouping Session should not be null."
+  }
+}
+
+
+-- !query
+SELECT
+  a,
+  session_window.start,
+  count(*) AS cnt
+FROM ts_data
+GROUP BY a, session_window(ts, '5 minutes') WITH CUBE
+ORDER BY a, start
+-- !query schema
+struct<>
+-- !query output
+org.apache.spark.SparkException
+{
+  "errorClass" : "INTERNAL_ERROR",
+  "sqlState" : "XX000",
+  "messageParameters" : {
+    "message" : "Grouping Session should not be null."
+  }
+}
+
+
+-- !query
+SELECT
+  a,
+  session_window.start,
+  count(*) AS cnt
+FROM ts_data
+GROUP BY GROUPING SETS ((a, session_window(ts, '5 minutes')), (a))
+ORDER BY a, start
+-- !query schema
+struct<>
+-- !query output
+org.apache.spark.SparkException
+{
+  "errorClass" : "INTERNAL_ERROR",
+  "sqlState" : "XX000",
+  "messageParameters" : {
+    "message" : "Grouping Session should not be null."
+  }
+}
+
+
+-- !query
+SELECT
+  count(*) AS cnt
+FROM ts_data
+GROUP BY concat(cast(session_window(ts, '5 minutes').start AS string), a)
+ORDER BY cnt
+-- !query schema
+struct<cnt:bigint>
+-- !query output
+1
+1
+1
+1
+
+
+-- !query
+SELECT
+  session_window.start,
+  count(*) AS cnt
+FROM ts_data
+GROUP BY session_window(ts, '5 minutes')
+HAVING count(*) > 1
+ORDER BY session_window.start
+-- !query schema
+struct<start:timestamp,cnt:bigint>
+-- !query output
+2021-01-01 00:00:00    4
+
+
+-- !query
+SELECT * FROM (
+  SELECT
+    a,
+    session_window.start AS ws,
+    count(*) AS cnt
+  FROM ts_data
+  GROUP BY a, session_window(ts, '5 minutes')
+) sub
+ORDER BY a, ws
+-- !query schema
+struct<a:string,ws:timestamp,cnt:bigint>
+-- !query output
+A1     2021-01-01 00:00:00     3
+A2     2021-01-01 00:01:00     1
+
+
+-- !query
+SELECT
+  session_window.start,
+  session_window.end,
+  count(*) AS cnt
+FROM VALUES
+  (CAST('1969-12-31 23:59:59.500000' AS TIMESTAMP)),
+  (CAST('1969-12-31 23:55:00.000000' AS TIMESTAMP)),
+  (CAST('1970-01-01 00:00:00.000000' AS TIMESTAMP))
+AS tab(ts)
+GROUP BY session_window(ts, '5 minutes')
+ORDER BY start
+-- !query schema
+struct<start:timestamp,end:timestamp,cnt:bigint>
+-- !query output
+1969-12-31 23:55:00    1970-01-01 00:05:00     3


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@spark.apache.org
For additional commands, e-mail: commits-help@spark.apache.org

Reply via email to