This is an automated email from the ASF dual-hosted git repository.
cloud-fan pushed a commit to branch branch-4.x
in repository https://gitbox.apache.org/repos/asf/spark.git
The following commit(s) were added to refs/heads/branch-4.x by this push:
new 56496815b218 [SPARK-56875][SQL] Refactor time/session window
resolution code to be reused in the single-pass Analyzer
56496815b218 is described below
commit 56496815b2189a76c8a92e1714538c5f3c751c40
Author: Vladimir Golubev <[email protected]>
AuthorDate: Fri May 15 20:55:35 2026 +0800
[SPARK-56875][SQL] Refactor time/session window resolution code to be
reused in the single-pass Analyzer
### What changes were proposed in this pull request?
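Refactor the time/session window and `window_time` resolution code out of
`ResolveTimeWindows.scala` into a new `TimeWindowResolution` helper object, so
that it can be reused by the single-pass Analyzer.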
### Why are the changes needed?
Unblocks single-pass Analyzer support for time/session windowing.
### Does this PR introduce _any_ user-facing change?
No.
### How was this patch tested?
Existing tests, plus a new `time-window.sql` SQL golden-file test with its analyzer and execution results.
### Was this patch authored or co-authored using generative AI tooling?
Claude.
Closes #55895 from
vladimirg-db/vladimir-golubev_data/time-windowing-refactor.
Authored-by: Vladimir Golubev <[email protected]>
Signed-off-by: Wenchen Fan <[email protected]>
(cherry picked from commit 7569ba6265597cee5a972fec65a8e68368512844)
Signed-off-by: Wenchen Fan <[email protected]>
---
.../sql/catalyst/analysis/ResolveTimeWindows.scala | 219 +--------
.../catalyst/analysis/TimeWindowResolution.scala | 253 ++++++++++
.../sql-tests/analyzer-results/time-window.sql.out | 525 +++++++++++++++++++++
.../resources/sql-tests/inputs/time-window.sql | 231 +++++++++
.../sql-tests/results/time-window.sql.out | 508 ++++++++++++++++++++
5 files changed, 1537 insertions(+), 199 deletions(-)
diff --git
a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/ResolveTimeWindows.scala
b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/ResolveTimeWindows.scala
index a8680d0a0181..0061ddc0dc1e 100644
---
a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/ResolveTimeWindows.scala
+++
b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/ResolveTimeWindows.scala
@@ -17,15 +17,12 @@
package org.apache.spark.sql.catalyst.analysis
-import org.apache.spark.sql.AnalysisException
-import org.apache.spark.sql.catalyst.ExtendedAnalysisException
-import org.apache.spark.sql.catalyst.expressions.{Alias, Attribute,
AttributeReference, CaseWhen, Cast, CreateNamedStruct, Expression,
GetStructField, IsNotNull, LessThan, Literal, PreciseTimestampConversion,
SessionWindow, Subtract, TimeWindow, WindowTime}
-import org.apache.spark.sql.catalyst.plans.logical.{Expand, Filter,
LogicalPlan, Project}
+import org.apache.spark.sql.catalyst.expressions.{SessionWindow, TimeWindow,
WindowTime}
+import org.apache.spark.sql.catalyst.plans.logical.LogicalPlan
import org.apache.spark.sql.catalyst.rules.Rule
import org.apache.spark.sql.catalyst.trees.TreePattern.{SESSION_WINDOW,
TIME_WINDOW, WINDOW_TIME}
import org.apache.spark.sql.errors.QueryCompilationErrors
-import org.apache.spark.sql.types.{CalendarIntervalType, DataType, LongType,
Metadata, MetadataBuilder, StructType}
-import org.apache.spark.unsafe.types.CalendarInterval
+import org.apache.spark.sql.types.StructType
/**
* Maps a time column to multiple time windows using the Expand operator.
Since it's non-trivial to
@@ -33,11 +30,6 @@ import org.apache.spark.unsafe.types.CalendarInterval
* filter out the rows where the time column is not inside the time window.
*/
object TimeWindowing extends Rule[LogicalPlan] {
- import org.apache.spark.sql.catalyst.dsl.expressions._
-
- private final val WINDOW_COL_NAME = "window"
- private final val WINDOW_START = "start"
- private final val WINDOW_END = "end"
/**
* Generates the logical plan for generating window ranges on a timestamp
column. Without
@@ -93,80 +85,13 @@ object TimeWindowing extends Rule[LogicalPlan] {
case t: TimeWindow => t.copy(timeColumn =
WindowTime(window.timeColumn))
}
} else {
- val metadata = window.timeColumn match {
- case a: Attribute => a.metadata
- case _ => Metadata.empty
- }
-
- val newMetadata = new MetadataBuilder()
- .withMetadata(metadata)
- .putBoolean(TimeWindow.marker, true)
- .build()
-
- def getWindow(i: Int, dataType: DataType): Expression = {
- val timestamp = PreciseTimestampConversion(window.timeColumn,
dataType, LongType)
- val remainder = (timestamp - window.startTime) %
window.slideDuration
- val lastStart = timestamp - CaseWhen(Seq((LessThan(remainder, 0),
- remainder + window.slideDuration)), Some(remainder))
- val windowStart = lastStart - i * window.slideDuration
- val windowEnd = windowStart + window.windowDuration
-
- // We make sure value fields are nullable since the dataType of
TimeWindow defines them
- // as nullable.
- CreateNamedStruct(
- Literal(WINDOW_START) ::
- PreciseTimestampConversion(windowStart, LongType,
dataType).castNullable() ::
- Literal(WINDOW_END) ::
- PreciseTimestampConversion(windowEnd, LongType,
dataType).castNullable() ::
- Nil)
- }
-
- val windowAttr = AttributeReference(
- WINDOW_COL_NAME, window.dataType, metadata = newMetadata)()
-
- if (window.windowDuration == window.slideDuration) {
- val windowStruct = Alias(getWindow(0, window.timeColumn.dataType),
WINDOW_COL_NAME)(
- exprId = windowAttr.exprId, explicitMetadata = Some(newMetadata))
-
- val replacedPlan = p transformExpressions {
- case t: TimeWindow => windowAttr
- }
-
- // For backwards compatibility we add a filter to filter out nulls
- val filterExpr = IsNotNull(window.timeColumn)
+ val (windowAttr, newChild) =
+ TimeWindowResolution.buildTimeWindowRewrite(window, child)
- replacedPlan.withNewChildren(
- Project(windowStruct +: child.output,
- Filter(filterExpr, child)) :: Nil)
- } else {
- val overlappingWindows =
- math.ceil(window.windowDuration * 1.0 /
window.slideDuration).toInt
- val windows =
- Seq.tabulate(overlappingWindows)(i =>
- getWindow(i, window.timeColumn.dataType))
-
- val projections = windows.map(_ +: child.output)
-
- // When the condition windowDuration % slideDuration = 0 is
fulfilled,
- // the estimation of the number of windows becomes exact one,
- // which means all produced windows are valid.
- val filterExpr =
- if (window.windowDuration % window.slideDuration == 0) {
- IsNotNull(window.timeColumn)
- } else {
- window.timeColumn >= windowAttr.getField(WINDOW_START) &&
- window.timeColumn < windowAttr.getField(WINDOW_END)
- }
-
- val substitutedPlan = Filter(filterExpr,
- Expand(projections, windowAttr +: child.output, child))
-
- val renamedPlan = p transformExpressions {
- case t: TimeWindow => windowAttr
- }
-
- renamedPlan.withNewChildren(substitutedPlan :: Nil)
+ val replacedPlan = p.transformExpressions {
+ case _: TimeWindow => windowAttr
}
+ replacedPlan.withNewChildren(newChild :: Nil)
}
} else if (numWindowExpr > 1) {
throw
QueryCompilationErrors.multiTimeWindowExpressionsNotSupportedError(p)
@@ -178,11 +103,6 @@ object TimeWindowing extends Rule[LogicalPlan] {
/** Maps a time column to a session window. */
object SessionWindowing extends Rule[LogicalPlan] {
- import org.apache.spark.sql.catalyst.dsl.expressions._
-
- private final val SESSION_COL_NAME = "session_window"
- private final val SESSION_START = "start"
- private final val SESSION_END = "end"
/**
* Generates the logical plan for generating session window on a timestamp
column.
@@ -211,73 +131,17 @@ object SessionWindowing extends Rule[LogicalPlan] {
val session = sessionExpressions.head
if (StructType.acceptsType(session.timeColumn.dataType)) {
- p transformExpressions {
+ p.transformExpressions {
case t: SessionWindow => t.copy(timeColumn =
WindowTime(session.timeColumn))
}
} else {
- val metadata = session.timeColumn match {
- case a: Attribute => a.metadata
- case _ => Metadata.empty
- }
-
- val newMetadata = new MetadataBuilder()
- .withMetadata(metadata)
- .putBoolean(SessionWindow.marker, true)
- .build()
-
- val sessionAttr = AttributeReference(
- SESSION_COL_NAME, session.dataType, metadata = newMetadata)()
+ val (sessionAttr, newChild) =
+ TimeWindowResolution.buildSessionWindowRewrite(session, child)
- val sessionStart =
- PreciseTimestampConversion(session.timeColumn,
session.timeColumn.dataType, LongType)
- val gapDuration = session.gapDuration match {
- case expr if expr.dataType == CalendarIntervalType =>
- expr
- case expr if Cast.canCast(expr.dataType, CalendarIntervalType) =>
- Cast(expr, CalendarIntervalType)
- case other =>
- throw
QueryCompilationErrors.sessionWindowGapDurationDataTypeError(other.dataType)
+ val replacedPlan = p.transformExpressions {
+ case _: SessionWindow => sessionAttr
}
- val sessionEnd = PreciseTimestampConversion(session.timeColumn +
gapDuration,
- session.timeColumn.dataType, LongType)
-
- // We make sure value fields are nullable since the dataType of
SessionWindow defines them
- // as nullable.
- val literalSessionStruct = CreateNamedStruct(
- Literal(SESSION_START) ::
- PreciseTimestampConversion(sessionStart, LongType,
session.timeColumn.dataType)
- .castNullable() ::
- Literal(SESSION_END) ::
- PreciseTimestampConversion(sessionEnd, LongType,
session.timeColumn.dataType)
- .castNullable() ::
- Nil)
-
- val sessionStruct = Alias(literalSessionStruct, SESSION_COL_NAME)(
- exprId = sessionAttr.exprId, explicitMetadata = Some(newMetadata))
-
- val replacedPlan = p transformExpressions {
- case s: SessionWindow => sessionAttr
- }
-
- val filterByTimeRange = if (gapDuration.foldable) {
- val interval = gapDuration.eval().asInstanceOf[CalendarInterval]
- interval == null || interval.months + interval.days +
interval.microseconds <= 0
- } else {
- true
- }
-
- // As same as tumbling window, we add a filter to filter out nulls.
- // And we also filter out events with negative or zero or invalid
gap duration.
- val filterExpr = if (filterByTimeRange) {
- IsNotNull(session.timeColumn) &&
- (sessionAttr.getField(SESSION_END) >
sessionAttr.getField(SESSION_START))
- } else {
- IsNotNull(session.timeColumn)
- }
-
- replacedPlan.withNewChildren(
- Filter(filterExpr,
- Project(sessionStruct +: child.output, child)) :: Nil)
+ replacedPlan.withNewChildren(newChild :: Nil)
}
} else if (numWindowExpr > 1) {
throw
QueryCompilationErrors.multiTimeWindowExpressionsNotSupportedError(p)
@@ -292,7 +156,7 @@ object SessionWindowing extends Rule[LogicalPlan] {
* window column generated as the output of the window aggregating operators.
The
* window column is of type struct { start: TimestampType, end: TimestampType
}.
* The correct representative event time of a window is ``window.end - 1``.
- * */
+ */
object ResolveWindowTime extends Rule[LogicalPlan] {
override def apply(plan: LogicalPlan): LogicalPlan =
plan.resolveOperatorsUpWithPruning(
_.containsPattern(WINDOW_TIME), ruleId) {
@@ -306,56 +170,13 @@ object ResolveWindowTime extends Rule[LogicalPlan] {
}
if (windowTimeExpressions.nonEmpty && allWindowTimeExprsResolved) {
- val windowTimeToAttrAndNewColumn = windowTimeExpressions.map {
windowTime =>
- val metadata = windowTime.windowColumn match {
- case a: Attribute => a.metadata
- case _ => Metadata.empty
- }
-
- if (!metadata.contains(TimeWindow.marker) &&
- !metadata.contains(SessionWindow.marker)) {
- throw new ExtendedAnalysisException(
- new AnalysisException(
- errorClass = "_LEGACY_ERROR_TEMP_3101",
- messageParameters = Map("windowTime" -> windowTime.toString)),
- plan = p)
- }
-
- val newMetadata = new MetadataBuilder()
- .withMetadata(metadata)
- .remove(TimeWindow.marker)
- .remove(SessionWindow.marker)
- .build()
-
- val colName = windowTime.sql
+ val (windowTimeToAttr, newChild) =
+ TimeWindowResolution.buildWindowTimeRewrite(windowTimeExpressions,
child, p)
- val attr = AttributeReference(colName, windowTime.dataType, metadata
= newMetadata)()
-
- // NOTE: "window.end" is "exclusive" upper bound of window, so if we
use this value as
- // it is, it is going to be bound to the different window even if we
apply the same window
- // spec. Decrease 1 microsecond from window.end to let the
window_time be bound to the
- // correct window range.
- val subtractExpr =
- PreciseTimestampConversion(
- Subtract(PreciseTimestampConversion(
- GetStructField(windowTime.windowColumn, 1),
- windowTime.dataType, LongType), Literal(1L)),
- LongType,
- windowTime.dataType)
-
- val newColumn = Alias(subtractExpr, colName)(
- exprId = attr.exprId, explicitMetadata = Some(newMetadata))
-
- windowTime -> (attr, newColumn)
- }.toMap
-
- val replacedPlan = p transformExpressions {
- case w: WindowTime => windowTimeToAttrAndNewColumn(w)._1
+ val replacedPlan = p.transformExpressions {
+ case w: WindowTime => windowTimeToAttr(w)
}
-
- val newColumnsToAdd = windowTimeToAttrAndNewColumn.values.map(_._2)
- replacedPlan.withNewChildren(
- Project(newColumnsToAdd ++: child.output, child) :: Nil)
+ replacedPlan.withNewChildren(newChild :: Nil)
} else {
p // Return unchanged. Analyzer will throw exception later
}
diff --git
a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/TimeWindowResolution.scala
b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/TimeWindowResolution.scala
new file mode 100644
index 000000000000..8dbe6ed44d1c
--- /dev/null
+++
b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/TimeWindowResolution.scala
@@ -0,0 +1,253 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.sql.catalyst.analysis
+
+import org.apache.spark.sql.AnalysisException
+import org.apache.spark.sql.catalyst.ExtendedAnalysisException
+import org.apache.spark.sql.catalyst.expressions.{Alias, Attribute,
AttributeReference, CaseWhen, Cast, CreateNamedStruct, Expression,
GetStructField, IsNotNull, LessThan, Literal, NamedExpression,
PreciseTimestampConversion, SessionWindow, Subtract, TimeWindow, WindowTime}
+import org.apache.spark.sql.catalyst.plans.logical.{Expand, Filter,
LogicalPlan, Project}
+import org.apache.spark.sql.errors.QueryCompilationErrors
+import org.apache.spark.sql.types.{CalendarIntervalType, DataType, LongType,
Metadata, MetadataBuilder}
+import org.apache.spark.unsafe.types.CalendarInterval
+
+/**
+ * Plan-rewrite helpers for resolving [[TimeWindow]], [[SessionWindow]] and
[[WindowTime]]
+ * expressions. The bodies live here so that the synthesis logic can be reused
outside of a
+ * [[org.apache.spark.sql.catalyst.rules.Rule]] traversal.
+ */
+object TimeWindowResolution {
+
+ final val WINDOW_COL_NAME = "window"
+ final val SESSION_COL_NAME = "session_window"
+
+ private final val WINDOW_START = "start"
+ private final val WINDOW_END = "end"
+ private final val SESSION_START = "start"
+ private final val SESSION_END = "end"
+
+ /**
+ * Synthesizes the [[Project]]/[[Expand]]+[[Filter]] sub-plan for a resolved
[[TimeWindow]] and
+ * returns the window [[AttributeReference]] together with the new sub-plan
that wraps `child`.
+ */
+ def buildTimeWindowRewrite(
+ window: TimeWindow,
+ child: LogicalPlan): (AttributeReference, LogicalPlan) = {
+ import org.apache.spark.sql.catalyst.dsl.expressions._
+
+ val metadata = window.timeColumn match {
+ case a: Attribute => a.metadata
+ case _ => Metadata.empty
+ }
+
+ val newMetadata = new MetadataBuilder()
+ .withMetadata(metadata)
+ .putBoolean(TimeWindow.marker, true)
+ .build()
+
+ def getWindow(i: Int, dataType: DataType): Expression = {
+ val timestamp = PreciseTimestampConversion(window.timeColumn, dataType,
LongType)
+ val remainder = (timestamp - window.startTime) % window.slideDuration
+ val lastStart = timestamp - CaseWhen(Seq((LessThan(remainder, 0),
+ remainder + window.slideDuration)), Some(remainder))
+ val windowStart = lastStart - i * window.slideDuration
+ val windowEnd = windowStart + window.windowDuration
+
+ // We make sure value fields are nullable since the dataType of
TimeWindow defines them
+ // as nullable.
+ CreateNamedStruct(
+ Literal(WINDOW_START) ::
+ PreciseTimestampConversion(windowStart, LongType,
dataType).castNullable() ::
+ Literal(WINDOW_END) ::
+ PreciseTimestampConversion(windowEnd, LongType,
dataType).castNullable() ::
+ Nil)
+ }
+
+ val windowAttr = AttributeReference(
+ WINDOW_COL_NAME, window.dataType, metadata = newMetadata)()
+
+ val newChild = if (window.windowDuration == window.slideDuration) {
+ val windowStruct = Alias(getWindow(0, window.timeColumn.dataType),
WINDOW_COL_NAME)(
+ exprId = windowAttr.exprId, explicitMetadata = Some(newMetadata))
+
+ // For backwards compatibility we add a filter to filter out nulls
+ val filterExpr = IsNotNull(window.timeColumn)
+
+ Project(windowStruct +: child.output,
+ Filter(filterExpr, child))
+ } else {
+ val overlappingWindows =
+ math.ceil(window.windowDuration * 1.0 / window.slideDuration).toInt
+ val windows =
+ Seq.tabulate(overlappingWindows)(i =>
+ getWindow(i, window.timeColumn.dataType))
+
+ val projections = windows.map(_ +: child.output)
+
+ // When the condition windowDuration % slideDuration = 0 is fulfilled,
+ // the estimation of the number of windows becomes exact one,
+ // which means all produced windows are valid.
+ val filterExpr =
+ if (window.windowDuration % window.slideDuration == 0) {
+ IsNotNull(window.timeColumn)
+ } else {
+ window.timeColumn >= windowAttr.getField(WINDOW_START) &&
+ window.timeColumn < windowAttr.getField(WINDOW_END)
+ }
+
+ Filter(filterExpr,
+ Expand(projections, windowAttr +: child.output, child))
+ }
+
+ (windowAttr, newChild)
+ }
+
+ /**
+ * Synthesizes the [[Project]]+[[Filter]] sub-plan for a resolved
[[SessionWindow]] and returns
+ * the session window [[AttributeReference]] together with the new sub-plan
that wraps `child`.
+ */
+ def buildSessionWindowRewrite(
+ session: SessionWindow,
+ child: LogicalPlan): (AttributeReference, LogicalPlan) = {
+ import org.apache.spark.sql.catalyst.dsl.expressions._
+
+ val metadata = session.timeColumn match {
+ case a: Attribute => a.metadata
+ case _ => Metadata.empty
+ }
+
+ val newMetadata = new MetadataBuilder()
+ .withMetadata(metadata)
+ .putBoolean(SessionWindow.marker, true)
+ .build()
+
+ val sessionAttr = AttributeReference(
+ SESSION_COL_NAME, session.dataType, metadata = newMetadata)()
+
+ val sessionStart =
+ PreciseTimestampConversion(session.timeColumn,
session.timeColumn.dataType, LongType)
+ val gapDuration = session.gapDuration match {
+ case expr if expr.dataType == CalendarIntervalType =>
+ expr
+ case expr if Cast.canCast(expr.dataType, CalendarIntervalType) =>
+ Cast(expr, CalendarIntervalType)
+ case other =>
+ throw
QueryCompilationErrors.sessionWindowGapDurationDataTypeError(other.dataType)
+ }
+ val sessionEnd = PreciseTimestampConversion(session.timeColumn +
gapDuration,
+ session.timeColumn.dataType, LongType)
+
+ // We make sure value fields are nullable since the dataType of
SessionWindow defines them
+ // as nullable.
+ val literalSessionStruct = CreateNamedStruct(
+ Literal(SESSION_START) ::
+ PreciseTimestampConversion(sessionStart, LongType,
session.timeColumn.dataType)
+ .castNullable() ::
+ Literal(SESSION_END) ::
+ PreciseTimestampConversion(sessionEnd, LongType,
session.timeColumn.dataType)
+ .castNullable() ::
+ Nil)
+
+ val sessionStruct = Alias(literalSessionStruct, SESSION_COL_NAME)(
+ exprId = sessionAttr.exprId, explicitMetadata = Some(newMetadata))
+
+ val filterByTimeRange = if (gapDuration.foldable) {
+ val interval = gapDuration.eval().asInstanceOf[CalendarInterval]
+ interval == null || interval.months + interval.days +
interval.microseconds <= 0
+ } else {
+ true
+ }
+
+ // As same as tumbling window, we add a filter to filter out nulls.
+ // And we also filter out events with negative or zero or invalid gap
duration.
+ val filterExpr = if (filterByTimeRange) {
+ IsNotNull(session.timeColumn) &&
+ (sessionAttr.getField(SESSION_END) >
sessionAttr.getField(SESSION_START))
+ } else {
+ IsNotNull(session.timeColumn)
+ }
+
+ val newChild = Filter(filterExpr,
+ Project(sessionStruct +: child.output, child))
+
+ (sessionAttr, newChild)
+ }
+
+ /**
+ * Synthesizes a [[Project]] sub-plan that exposes one extracted-time column
per resolved
+ * [[WindowTime]] in `windowTimeExpressions`, wrapping `child`. Returns the
per-[[WindowTime]]
+ * [[AttributeReference]] map and the new sub-plan. `errorContext` is the
plan attached to the
+ * [[ExtendedAnalysisException]] thrown when a window-column metadata marker
is missing.
+ */
+ def buildWindowTimeRewrite(
+ windowTimeExpressions: Set[WindowTime],
+ child: LogicalPlan,
+ errorContext: LogicalPlan): (Map[WindowTime, AttributeReference],
LogicalPlan) = {
+ val windowTimeToAttrAndNewColumn = windowTimeExpressions.map { windowTime
=>
+ val metadata = windowTime.windowColumn match {
+ case a: Attribute => a.metadata
+ case _ => Metadata.empty
+ }
+
+ if (!metadata.contains(TimeWindow.marker) &&
+ !metadata.contains(SessionWindow.marker)) {
+ throw new ExtendedAnalysisException(
+ new AnalysisException(
+ errorClass = "_LEGACY_ERROR_TEMP_3101",
+ messageParameters = Map("windowTime" -> windowTime.toString)),
+ plan = errorContext)
+ }
+
+ val newMetadata = new MetadataBuilder()
+ .withMetadata(metadata)
+ .remove(TimeWindow.marker)
+ .remove(SessionWindow.marker)
+ .build()
+
+ val colName = windowTime.sql
+
+ val attr = AttributeReference(colName, windowTime.dataType, metadata =
newMetadata)()
+
+ // NOTE: "window.end" is "exclusive" upper bound of window, so if we use
this value as
+ // it is, it is going to be bound to the different window even if we
apply the same window
+ // spec. Decrease 1 microsecond from window.end to let the window_time
be bound to the
+ // correct window range.
+ val subtractExpr =
+ PreciseTimestampConversion(
+ Subtract(PreciseTimestampConversion(
+ GetStructField(windowTime.windowColumn, 1),
+ windowTime.dataType, LongType), Literal(1L)),
+ LongType,
+ windowTime.dataType)
+
+ val newColumn = Alias(subtractExpr, colName)(
+ exprId = attr.exprId, explicitMetadata = Some(newMetadata))
+
+ windowTime -> (attr, newColumn)
+ }.toMap
+
+ val newColumns: Seq[NamedExpression] =
+ windowTimeToAttrAndNewColumn.values.map(_._2).toSeq
+ val newChild = Project(newColumns ++ child.output, child)
+
+ val windowTimeToAttr = windowTimeToAttrAndNewColumn.map {
+ case (windowTime, (attr, _)) => windowTime -> attr
+ }
+
+ (windowTimeToAttr, newChild)
+ }
+}
diff --git
a/sql/core/src/test/resources/sql-tests/analyzer-results/time-window.sql.out
b/sql/core/src/test/resources/sql-tests/analyzer-results/time-window.sql.out
new file mode 100644
index 000000000000..a6b88d42e58e
--- /dev/null
+++ b/sql/core/src/test/resources/sql-tests/analyzer-results/time-window.sql.out
@@ -0,0 +1,525 @@
+-- Automatically generated by SQLQueryTestSuite
+-- !query
+CREATE OR REPLACE TEMPORARY VIEW ts_data AS
+SELECT * FROM VALUES
+ ('A1', CAST('2021-01-01 00:00:00' AS TIMESTAMP)),
+ ('A1', CAST('2021-01-01 00:04:30' AS TIMESTAMP)),
+ ('A1', CAST('2021-01-01 00:06:00' AS TIMESTAMP)),
+ ('A2', CAST('2021-01-01 00:01:00' AS TIMESTAMP))
+AS tab(a, ts)
+-- !query analysis
+CreateViewCommand `ts_data`, SELECT * FROM VALUES
+ ('A1', CAST('2021-01-01 00:00:00' AS TIMESTAMP)),
+ ('A1', CAST('2021-01-01 00:04:30' AS TIMESTAMP)),
+ ('A1', CAST('2021-01-01 00:06:00' AS TIMESTAMP)),
+ ('A2', CAST('2021-01-01 00:01:00' AS TIMESTAMP))
+AS tab(a, ts), false, true, LocalTempView, UNSUPPORTED, true
+ +- Project [a#x, ts#x]
+ +- SubqueryAlias tab
+ +- LocalRelation [a#x, ts#x]
+
+
+-- !query
+SELECT
+ a,
+ window.start,
+ count(*) AS cnt
+FROM ts_data
+GROUP BY a, window(ts, '5 minutes') WITH ROLLUP
+ORDER BY a, start
+-- !query analysis
+Sort [a#x ASC NULLS FIRST, start#x ASC NULLS FIRST], true
++- Aggregate [a#x, window#x, spark_grouping_id#xL], [a#x, window#x.start AS
start#x, count(1) AS cnt#xL]
+ +- Expand [[window#x, a#x, ts#x, a#x, window#x, 0], [window#x, a#x, ts#x,
a#x, null, 1], [window#x, a#x, ts#x, null, null, 3]], [window#x, a#x, ts#x,
a#x, window#x, spark_grouping_id#xL]
+ +- Project [window#x, a#x, ts#x, a#x AS a#x, window#x AS window#x]
+ +- Project [named_struct(start,
knownnullable(precisetimestampconversion(((precisetimestampconversion(ts#x,
TimestampType, LongType) - CASE WHEN (((precisetimestampconversion(ts#x,
TimestampType, LongType) - 0) % 300000000) < cast(0 as bigint)) THEN
(((precisetimestampconversion(ts#x, TimestampType, LongType) - 0) % 300000000)
+ 300000000) ELSE ((precisetimestampconversion(ts#x, TimestampType, LongType) -
0) % 300000000) END) - 0), LongType, TimestampType)), end, knownnullable(p [...]
+ +- Filter isnotnull(ts#x)
+ +- SubqueryAlias ts_data
+ +- View (`ts_data`, [a#x, ts#x])
+ +- Project [cast(a#x as string) AS a#x, cast(ts#x as
timestamp) AS ts#x]
+ +- Project [a#x, ts#x]
+ +- SubqueryAlias tab
+ +- LocalRelation [a#x, ts#x]
+
+
+-- !query
+SELECT
+ a,
+ window.start,
+ count(*) AS cnt
+FROM ts_data
+GROUP BY a, window(ts, '5 minutes') WITH CUBE
+ORDER BY a, start
+-- !query analysis
+Sort [a#x ASC NULLS FIRST, start#x ASC NULLS FIRST], true
++- Aggregate [a#x, window#x, spark_grouping_id#xL], [a#x, window#x.start AS
start#x, count(1) AS cnt#xL]
+ +- Expand [[window#x, a#x, ts#x, a#x, window#x, 0], [window#x, a#x, ts#x,
a#x, null, 1], [window#x, a#x, ts#x, null, window#x, 2], [window#x, a#x, ts#x,
null, null, 3]], [window#x, a#x, ts#x, a#x, window#x, spark_grouping_id#xL]
+ +- Project [window#x, a#x, ts#x, a#x AS a#x, window#x AS window#x]
+ +- Project [named_struct(start,
knownnullable(precisetimestampconversion(((precisetimestampconversion(ts#x,
TimestampType, LongType) - CASE WHEN (((precisetimestampconversion(ts#x,
TimestampType, LongType) - 0) % 300000000) < cast(0 as bigint)) THEN
(((precisetimestampconversion(ts#x, TimestampType, LongType) - 0) % 300000000)
+ 300000000) ELSE ((precisetimestampconversion(ts#x, TimestampType, LongType) -
0) % 300000000) END) - 0), LongType, TimestampType)), end, knownnullable(p [...]
+ +- Filter isnotnull(ts#x)
+ +- SubqueryAlias ts_data
+ +- View (`ts_data`, [a#x, ts#x])
+ +- Project [cast(a#x as string) AS a#x, cast(ts#x as
timestamp) AS ts#x]
+ +- Project [a#x, ts#x]
+ +- SubqueryAlias tab
+ +- LocalRelation [a#x, ts#x]
+
+
+-- !query
+SELECT
+ a,
+ window.start,
+ count(*) AS cnt
+FROM ts_data
+GROUP BY GROUPING SETS ((a, window(ts, '5 minutes')), (a))
+ORDER BY a, start
+-- !query analysis
+Sort [a#x ASC NULLS FIRST, start#x ASC NULLS FIRST], true
++- Aggregate [a#x, window#x, spark_grouping_id#xL], [a#x, window#x.start AS
start#x, count(1) AS cnt#xL]
+ +- Expand [[window#x, a#x, ts#x, a#x, window#x, 0], [window#x, a#x, ts#x,
a#x, null, 1]], [window#x, a#x, ts#x, a#x, window#x, spark_grouping_id#xL]
+ +- Project [window#x, a#x, ts#x, a#x AS a#x, window#x AS window#x]
+ +- Project [named_struct(start,
knownnullable(precisetimestampconversion(((precisetimestampconversion(ts#x,
TimestampType, LongType) - CASE WHEN (((precisetimestampconversion(ts#x,
TimestampType, LongType) - 0) % 300000000) < cast(0 as bigint)) THEN
(((precisetimestampconversion(ts#x, TimestampType, LongType) - 0) % 300000000)
+ 300000000) ELSE ((precisetimestampconversion(ts#x, TimestampType, LongType) -
0) % 300000000) END) - 0), LongType, TimestampType)), end, knownnullable(p [...]
+ +- Filter isnotnull(ts#x)
+ +- SubqueryAlias ts_data
+ +- View (`ts_data`, [a#x, ts#x])
+ +- Project [cast(a#x as string) AS a#x, cast(ts#x as
timestamp) AS ts#x]
+ +- Project [a#x, ts#x]
+ +- SubqueryAlias tab
+ +- LocalRelation [a#x, ts#x]
+
+
+-- !query
+SELECT
+ count(*) AS cnt
+FROM ts_data
+GROUP BY concat(cast(window(ts, '5 minutes').start AS string), a)
+ORDER BY cnt
+-- !query analysis
+Sort [cnt#xL ASC NULLS FIRST], true
++- Aggregate [concat(cast(window#x.start as string), a#x)], [count(1) AS
cnt#xL]
+ +- Project [named_struct(start,
knownnullable(precisetimestampconversion(((precisetimestampconversion(ts#x,
TimestampType, LongType) - CASE WHEN (((precisetimestampconversion(ts#x,
TimestampType, LongType) - 0) % 300000000) < cast(0 as bigint)) THEN
(((precisetimestampconversion(ts#x, TimestampType, LongType) - 0) % 300000000)
+ 300000000) ELSE ((precisetimestampconversion(ts#x, TimestampType, LongType) -
0) % 300000000) END) - 0), LongType, TimestampType)), end,
knownnullable(precise [...]
+ +- Filter isnotnull(ts#x)
+ +- SubqueryAlias ts_data
+ +- View (`ts_data`, [a#x, ts#x])
+ +- Project [cast(a#x as string) AS a#x, cast(ts#x as timestamp)
AS ts#x]
+ +- Project [a#x, ts#x]
+ +- SubqueryAlias tab
+ +- LocalRelation [a#x, ts#x]
+
+
+-- !query
+SELECT
+ window.start,
+ count(*) AS cnt
+FROM ts_data
+GROUP BY window(ts, '5 minutes')
+HAVING count(*) > 1
+ORDER BY window.start
+-- !query analysis
+Sort [start#x ASC NULLS FIRST], true
++- Filter (cnt#xL > cast(1 as bigint))
+ +- Aggregate [window#x], [window#x.start AS start#x, count(1) AS cnt#xL]
+ +- Project [named_struct(start,
knownnullable(precisetimestampconversion(((precisetimestampconversion(ts#x,
TimestampType, LongType) - CASE WHEN (((precisetimestampconversion(ts#x,
TimestampType, LongType) - 0) % 300000000) < cast(0 as bigint)) THEN
(((precisetimestampconversion(ts#x, TimestampType, LongType) - 0) % 300000000)
+ 300000000) ELSE ((precisetimestampconversion(ts#x, TimestampType, LongType) -
0) % 300000000) END) - 0), LongType, TimestampType)), end, knownnullable(prec
[...]
+ +- Filter isnotnull(ts#x)
+ +- SubqueryAlias ts_data
+ +- View (`ts_data`, [a#x, ts#x])
+ +- Project [cast(a#x as string) AS a#x, cast(ts#x as
timestamp) AS ts#x]
+ +- Project [a#x, ts#x]
+ +- SubqueryAlias tab
+ +- LocalRelation [a#x, ts#x]
+
+
+-- !query
+SELECT * FROM (
+ SELECT
+ a,
+ window.start AS ws,
+ count(*) AS cnt
+ FROM ts_data
+ GROUP BY a, window(ts, '5 minutes')
+) sub
+ORDER BY a, ws
+-- !query analysis
+Sort [a#x ASC NULLS FIRST, ws#x ASC NULLS FIRST], true
++- Project [a#x, ws#x, cnt#xL]
+ +- SubqueryAlias sub
+ +- Aggregate [a#x, window#x], [a#x, window#x.start AS ws#x, count(1) AS
cnt#xL]
+ +- Project [named_struct(start,
knownnullable(precisetimestampconversion(((precisetimestampconversion(ts#x,
TimestampType, LongType) - CASE WHEN (((precisetimestampconversion(ts#x,
TimestampType, LongType) - 0) % 300000000) < cast(0 as bigint)) THEN
(((precisetimestampconversion(ts#x, TimestampType, LongType) - 0) % 300000000)
+ 300000000) ELSE ((precisetimestampconversion(ts#x, TimestampType, LongType) -
0) % 300000000) END) - 0), LongType, TimestampType)), end, knownnullable(p [...]
+ +- Filter isnotnull(ts#x)
+ +- SubqueryAlias ts_data
+ +- View (`ts_data`, [a#x, ts#x])
+ +- Project [cast(a#x as string) AS a#x, cast(ts#x as
timestamp) AS ts#x]
+ +- Project [a#x, ts#x]
+ +- SubqueryAlias tab
+ +- LocalRelation [a#x, ts#x]
+
+
+-- !query
+SELECT
+ window.start,
+ window.end,
+ count(*) AS counts
+FROM VALUES
+ (CAST('2016-03-27 19:39:30' AS TIMESTAMP), 1, 'a')
+AS tab(time, value, id)
+GROUP BY window(time, '10 seconds')
+ORDER BY window.start
+-- !query analysis
+Sort [start#x ASC NULLS FIRST], true
++- Aggregate [window#x], [window#x.start AS start#x, window#x.end AS end#x,
count(1) AS counts#xL]
+ +- Project [named_struct(start,
knownnullable(precisetimestampconversion(((precisetimestampconversion(time#x,
TimestampType, LongType) - CASE WHEN (((precisetimestampconversion(time#x,
TimestampType, LongType) - 0) % 10000000) < cast(0 as bigint)) THEN
(((precisetimestampconversion(time#x, TimestampType, LongType) - 0) % 10000000)
+ 10000000) ELSE ((precisetimestampconversion(time#x, TimestampType, LongType)
- 0) % 10000000) END) - 0), LongType, TimestampType)), end, knownnullable(pre
[...]
+ +- Filter isnotnull(time#x)
+ +- SubqueryAlias tab
+ +- LocalRelation [time#x, value#x, id#x]
+
+
+-- !query
+SELECT
+ window.start,
+ window.end,
+ count(*) AS cnt
+FROM VALUES
+ (CAST('1969-12-31 23:59:59.500000' AS TIMESTAMP)),
+ (CAST('1969-12-31 23:55:00.000000' AS TIMESTAMP)),
+ (CAST('1970-01-01 00:00:00.000000' AS TIMESTAMP))
+AS tab(ts)
+GROUP BY window(ts, '5 minutes')
+ORDER BY start
+-- !query analysis
+Sort [start#x ASC NULLS FIRST], true
++- Aggregate [window#x], [window#x.start AS start#x, window#x.end AS end#x,
count(1) AS cnt#xL]
+ +- Project [named_struct(start,
knownnullable(precisetimestampconversion(((precisetimestampconversion(ts#x,
TimestampType, LongType) - CASE WHEN (((precisetimestampconversion(ts#x,
TimestampType, LongType) - 0) % 300000000) < cast(0 as bigint)) THEN
(((precisetimestampconversion(ts#x, TimestampType, LongType) - 0) % 300000000)
+ 300000000) ELSE ((precisetimestampconversion(ts#x, TimestampType, LongType) -
0) % 300000000) END) - 0), LongType, TimestampType)), end,
knownnullable(precise [...]
+ +- Filter isnotnull(ts#x)
+ +- SubqueryAlias tab
+ +- LocalRelation [ts#x]
+
+
+-- !query
+SELECT
+ a,
+ window.start,
+ count(*) AS cnt
+FROM ts_data
+GROUP BY a, window(ts, '5 minutes', '1 minute') WITH ROLLUP
+ORDER BY a, start
+-- !query analysis
+Sort [a#x ASC NULLS FIRST, start#x ASC NULLS FIRST], true
++- Aggregate [a#x, window#x, spark_grouping_id#xL], [a#x, window#x.start AS
start#x, count(1) AS cnt#xL]
+ +- Expand [[window#x, a#x, ts#x, a#x, window#x, 0], [window#x, a#x, ts#x,
a#x, null, 1], [window#x, a#x, ts#x, null, null, 3]], [window#x, a#x, ts#x,
a#x, window#x, spark_grouping_id#xL]
+ +- Project [window#x, a#x, ts#x, a#x AS a#x, window#x AS window#x]
+ +- Filter isnotnull(ts#x)
+ +- Expand [[named_struct(start,
knownnullable(precisetimestampconversion(((precisetimestampconversion(ts#x,
TimestampType, LongType) - CASE WHEN (((precisetimestampconversion(ts#x,
TimestampType, LongType) - 0) % 60000000) < cast(0 as bigint)) THEN
(((precisetimestampconversion(ts#x, TimestampType, LongType) - 0) % 60000000) +
60000000) ELSE ((precisetimestampconversion(ts#x, TimestampType, LongType) - 0)
% 60000000) END) - 0), LongType, TimestampType)), end, knownnullable(pr [...]
+ +- SubqueryAlias ts_data
+ +- View (`ts_data`, [a#x, ts#x])
+ +- Project [cast(a#x as string) AS a#x, cast(ts#x as
timestamp) AS ts#x]
+ +- Project [a#x, ts#x]
+ +- SubqueryAlias tab
+ +- LocalRelation [a#x, ts#x]
+
+
+-- !query
+SELECT
+ a,
+ window.start,
+ count(*) AS cnt
+FROM ts_data
+GROUP BY a, window(ts, '5 minutes', '1 minute') WITH CUBE
+ORDER BY a, start
+-- !query analysis
+Sort [a#x ASC NULLS FIRST, start#x ASC NULLS FIRST], true
++- Aggregate [a#x, window#x, spark_grouping_id#xL], [a#x, window#x.start AS
start#x, count(1) AS cnt#xL]
+ +- Expand [[window#x, a#x, ts#x, a#x, window#x, 0], [window#x, a#x, ts#x,
a#x, null, 1], [window#x, a#x, ts#x, null, window#x, 2], [window#x, a#x, ts#x,
null, null, 3]], [window#x, a#x, ts#x, a#x, window#x, spark_grouping_id#xL]
+ +- Project [window#x, a#x, ts#x, a#x AS a#x, window#x AS window#x]
+ +- Filter isnotnull(ts#x)
+ +- Expand [[named_struct(start,
knownnullable(precisetimestampconversion(((precisetimestampconversion(ts#x,
TimestampType, LongType) - CASE WHEN (((precisetimestampconversion(ts#x,
TimestampType, LongType) - 0) % 60000000) < cast(0 as bigint)) THEN
(((precisetimestampconversion(ts#x, TimestampType, LongType) - 0) % 60000000) +
60000000) ELSE ((precisetimestampconversion(ts#x, TimestampType, LongType) - 0)
% 60000000) END) - 0), LongType, TimestampType)), end, knownnullable(pr [...]
+ +- SubqueryAlias ts_data
+ +- View (`ts_data`, [a#x, ts#x])
+ +- Project [cast(a#x as string) AS a#x, cast(ts#x as
timestamp) AS ts#x]
+ +- Project [a#x, ts#x]
+ +- SubqueryAlias tab
+ +- LocalRelation [a#x, ts#x]
+
+
+-- !query
+SELECT
+ a,
+ window.start,
+ count(*) AS cnt
+FROM ts_data
+GROUP BY GROUPING SETS ((a, window(ts, '5 minutes', '1 minute')), (a))
+ORDER BY a, start
+-- !query analysis
+Sort [a#x ASC NULLS FIRST, start#x ASC NULLS FIRST], true
++- Aggregate [a#x, window#x, spark_grouping_id#xL], [a#x, window#x.start AS
start#x, count(1) AS cnt#xL]
+ +- Expand [[window#x, a#x, ts#x, a#x, window#x, 0], [window#x, a#x, ts#x,
a#x, null, 1]], [window#x, a#x, ts#x, a#x, window#x, spark_grouping_id#xL]
+ +- Project [window#x, a#x, ts#x, a#x AS a#x, window#x AS window#x]
+ +- Filter isnotnull(ts#x)
+ +- Expand [[named_struct(start,
knownnullable(precisetimestampconversion(((precisetimestampconversion(ts#x,
TimestampType, LongType) - CASE WHEN (((precisetimestampconversion(ts#x,
TimestampType, LongType) - 0) % 60000000) < cast(0 as bigint)) THEN
(((precisetimestampconversion(ts#x, TimestampType, LongType) - 0) % 60000000) +
60000000) ELSE ((precisetimestampconversion(ts#x, TimestampType, LongType) - 0)
% 60000000) END) - 0), LongType, TimestampType)), end, knownnullable(pr [...]
+ +- SubqueryAlias ts_data
+ +- View (`ts_data`, [a#x, ts#x])
+ +- Project [cast(a#x as string) AS a#x, cast(ts#x as
timestamp) AS ts#x]
+ +- Project [a#x, ts#x]
+ +- SubqueryAlias tab
+ +- LocalRelation [a#x, ts#x]
+
+
+-- !query
+SELECT
+ count(*) AS cnt
+FROM ts_data
+GROUP BY concat(cast(window(ts, '5 minutes', '1 minute').start AS string), a)
+ORDER BY cnt
+-- !query analysis
+Sort [cnt#xL ASC NULLS FIRST], true
++- Aggregate [concat(cast(window#x.start as string), a#x)], [count(1) AS
cnt#xL]
+ +- Filter isnotnull(ts#x)
+ +- Expand [[named_struct(start,
knownnullable(precisetimestampconversion(((precisetimestampconversion(ts#x,
TimestampType, LongType) - CASE WHEN (((precisetimestampconversion(ts#x,
TimestampType, LongType) - 0) % 60000000) < cast(0 as bigint)) THEN
(((precisetimestampconversion(ts#x, TimestampType, LongType) - 0) % 60000000) +
60000000) ELSE ((precisetimestampconversion(ts#x, TimestampType, LongType) - 0)
% 60000000) END) - 0), LongType, TimestampType)), end, knownnullable(preciset
[...]
+ +- SubqueryAlias ts_data
+ +- View (`ts_data`, [a#x, ts#x])
+ +- Project [cast(a#x as string) AS a#x, cast(ts#x as timestamp)
AS ts#x]
+ +- Project [a#x, ts#x]
+ +- SubqueryAlias tab
+ +- LocalRelation [a#x, ts#x]
+
+
+-- !query
+SELECT
+ window.start,
+ count(*) AS cnt
+FROM ts_data
+GROUP BY window(ts, '5 minutes', '1 minute')
+HAVING count(*) > 1
+ORDER BY window.start
+-- !query analysis
+Sort [start#x ASC NULLS FIRST], true
++- Filter (cnt#xL > cast(1 as bigint))
+ +- Aggregate [window#x], [window#x.start AS start#x, count(1) AS cnt#xL]
+ +- Filter isnotnull(ts#x)
+ +- Expand [[named_struct(start,
knownnullable(precisetimestampconversion(((precisetimestampconversion(ts#x,
TimestampType, LongType) - CASE WHEN (((precisetimestampconversion(ts#x,
TimestampType, LongType) - 0) % 60000000) < cast(0 as bigint)) THEN
(((precisetimestampconversion(ts#x, TimestampType, LongType) - 0) % 60000000) +
60000000) ELSE ((precisetimestampconversion(ts#x, TimestampType, LongType) - 0)
% 60000000) END) - 0), LongType, TimestampType)), end, knownnullable(preci [...]
+ +- SubqueryAlias ts_data
+ +- View (`ts_data`, [a#x, ts#x])
+ +- Project [cast(a#x as string) AS a#x, cast(ts#x as
timestamp) AS ts#x]
+ +- Project [a#x, ts#x]
+ +- SubqueryAlias tab
+ +- LocalRelation [a#x, ts#x]
+
+
+-- !query
+SELECT * FROM (
+ SELECT
+ a,
+ window.start AS ws,
+ count(*) AS cnt
+ FROM ts_data
+ GROUP BY a, window(ts, '5 minutes', '1 minute')
+) sub
+ORDER BY a, ws
+-- !query analysis
+Sort [a#x ASC NULLS FIRST, ws#x ASC NULLS FIRST], true
++- Project [a#x, ws#x, cnt#xL]
+ +- SubqueryAlias sub
+ +- Aggregate [a#x, window#x], [a#x, window#x.start AS ws#x, count(1) AS
cnt#xL]
+ +- Filter isnotnull(ts#x)
+ +- Expand [[named_struct(start,
knownnullable(precisetimestampconversion(((precisetimestampconversion(ts#x,
TimestampType, LongType) - CASE WHEN (((precisetimestampconversion(ts#x,
TimestampType, LongType) - 0) % 60000000) < cast(0 as bigint)) THEN
(((precisetimestampconversion(ts#x, TimestampType, LongType) - 0) % 60000000) +
60000000) ELSE ((precisetimestampconversion(ts#x, TimestampType, LongType) - 0)
% 60000000) END) - 0), LongType, TimestampType)), end, knownnullable(pr [...]
+ +- SubqueryAlias ts_data
+ +- View (`ts_data`, [a#x, ts#x])
+ +- Project [cast(a#x as string) AS a#x, cast(ts#x as
timestamp) AS ts#x]
+ +- Project [a#x, ts#x]
+ +- SubqueryAlias tab
+ +- LocalRelation [a#x, ts#x]
+
+
+-- !query
+SELECT
+ window.start,
+ window.end,
+ count(*) AS counts
+FROM VALUES
+ (CAST('2016-03-27 19:39:30' AS TIMESTAMP), 1, 'a')
+AS tab(time, value, id)
+GROUP BY window(time, '10 seconds', '5 seconds')
+ORDER BY window.start
+-- !query analysis
+Sort [start#x ASC NULLS FIRST], true
++- Aggregate [window#x], [window#x.start AS start#x, window#x.end AS end#x,
count(1) AS counts#xL]
+ +- Filter isnotnull(time#x)
+ +- Expand [[named_struct(start,
knownnullable(precisetimestampconversion(((precisetimestampconversion(time#x,
TimestampType, LongType) - CASE WHEN (((precisetimestampconversion(time#x,
TimestampType, LongType) - 0) % 5000000) < cast(0 as bigint)) THEN
(((precisetimestampconversion(time#x, TimestampType, LongType) - 0) % 5000000)
+ 5000000) ELSE ((precisetimestampconversion(time#x, TimestampType, LongType) -
0) % 5000000) END) - 0), LongType, TimestampType)), end, knownnullable(prec
[...]
+ +- SubqueryAlias tab
+ +- LocalRelation [time#x, value#x, id#x]
+
+
+-- !query
+SELECT
+ window.start,
+ window.end,
+ count(*) AS cnt
+FROM VALUES
+ (CAST('1969-12-31 23:59:59.500000' AS TIMESTAMP)),
+ (CAST('1969-12-31 23:55:00.000000' AS TIMESTAMP)),
+ (CAST('1970-01-01 00:00:00.000000' AS TIMESTAMP))
+AS tab(ts)
+GROUP BY window(ts, '5 minutes', '1 minute')
+ORDER BY start
+-- !query analysis
+Sort [start#x ASC NULLS FIRST], true
++- Aggregate [window#x], [window#x.start AS start#x, window#x.end AS end#x,
count(1) AS cnt#xL]
+ +- Filter isnotnull(ts#x)
+ +- Expand [[named_struct(start,
knownnullable(precisetimestampconversion(((precisetimestampconversion(ts#x,
TimestampType, LongType) - CASE WHEN (((precisetimestampconversion(ts#x,
TimestampType, LongType) - 0) % 60000000) < cast(0 as bigint)) THEN
(((precisetimestampconversion(ts#x, TimestampType, LongType) - 0) % 60000000) +
60000000) ELSE ((precisetimestampconversion(ts#x, TimestampType, LongType) - 0)
% 60000000) END) - 0), LongType, TimestampType)), end, knownnullable(preciset
[...]
+ +- SubqueryAlias tab
+ +- LocalRelation [ts#x]
+
+
+-- !query
+SELECT
+ a,
+ session_window.start,
+ count(*) AS cnt
+FROM ts_data
+GROUP BY a, session_window(ts, '5 minutes') WITH ROLLUP
+ORDER BY a, start
+-- !query analysis
+Sort [a#x ASC NULLS FIRST, start#x ASC NULLS FIRST], true
++- Aggregate [a#x, session_window#x, spark_grouping_id#xL], [a#x,
session_window#x.start AS start#x, count(1) AS cnt#xL]
+ +- Expand [[session_window#x, a#x, ts#x, a#x, session_window#x, 0],
[session_window#x, a#x, ts#x, a#x, null, 1], [session_window#x, a#x, ts#x,
null, null, 3]], [session_window#x, a#x, ts#x, a#x, session_window#x,
spark_grouping_id#xL]
+ +- Project [session_window#x, a#x, ts#x, a#x AS a#x, session_window#x AS
session_window#x]
+ +- Filter isnotnull(ts#x)
+ +- Project [named_struct(start,
precisetimestampconversion(precisetimestampconversion(ts#x, TimestampType,
LongType), LongType, TimestampType), end,
knownnullable(precisetimestampconversion(precisetimestampconversion(cast(ts#x +
cast(5 minutes as interval) as timestamp), TimestampType, LongType), LongType,
TimestampType))) AS session_window#x, a#x, ts#x]
+ +- SubqueryAlias ts_data
+ +- View (`ts_data`, [a#x, ts#x])
+ +- Project [cast(a#x as string) AS a#x, cast(ts#x as
timestamp) AS ts#x]
+ +- Project [a#x, ts#x]
+ +- SubqueryAlias tab
+ +- LocalRelation [a#x, ts#x]
+
+
+-- !query
+SELECT
+ a,
+ session_window.start,
+ count(*) AS cnt
+FROM ts_data
+GROUP BY a, session_window(ts, '5 minutes') WITH CUBE
+ORDER BY a, start
+-- !query analysis
+Sort [a#x ASC NULLS FIRST, start#x ASC NULLS FIRST], true
++- Aggregate [a#x, session_window#x, spark_grouping_id#xL], [a#x,
session_window#x.start AS start#x, count(1) AS cnt#xL]
+ +- Expand [[session_window#x, a#x, ts#x, a#x, session_window#x, 0],
[session_window#x, a#x, ts#x, a#x, null, 1], [session_window#x, a#x, ts#x,
null, session_window#x, 2], [session_window#x, a#x, ts#x, null, null, 3]],
[session_window#x, a#x, ts#x, a#x, session_window#x, spark_grouping_id#xL]
+ +- Project [session_window#x, a#x, ts#x, a#x AS a#x, session_window#x AS
session_window#x]
+ +- Filter isnotnull(ts#x)
+ +- Project [named_struct(start,
precisetimestampconversion(precisetimestampconversion(ts#x, TimestampType,
LongType), LongType, TimestampType), end,
knownnullable(precisetimestampconversion(precisetimestampconversion(cast(ts#x +
cast(5 minutes as interval) as timestamp), TimestampType, LongType), LongType,
TimestampType))) AS session_window#x, a#x, ts#x]
+ +- SubqueryAlias ts_data
+ +- View (`ts_data`, [a#x, ts#x])
+ +- Project [cast(a#x as string) AS a#x, cast(ts#x as
timestamp) AS ts#x]
+ +- Project [a#x, ts#x]
+ +- SubqueryAlias tab
+ +- LocalRelation [a#x, ts#x]
+
+
+-- !query
+SELECT
+ a,
+ session_window.start,
+ count(*) AS cnt
+FROM ts_data
+GROUP BY GROUPING SETS ((a, session_window(ts, '5 minutes')), (a))
+ORDER BY a, start
+-- !query analysis
+Sort [a#x ASC NULLS FIRST, start#x ASC NULLS FIRST], true
++- Aggregate [a#x, session_window#x, spark_grouping_id#xL], [a#x,
session_window#x.start AS start#x, count(1) AS cnt#xL]
+ +- Expand [[session_window#x, a#x, ts#x, a#x, session_window#x, 0],
[session_window#x, a#x, ts#x, a#x, null, 1]], [session_window#x, a#x, ts#x,
a#x, session_window#x, spark_grouping_id#xL]
+ +- Project [session_window#x, a#x, ts#x, a#x AS a#x, session_window#x AS
session_window#x]
+ +- Filter isnotnull(ts#x)
+ +- Project [named_struct(start,
precisetimestampconversion(precisetimestampconversion(ts#x, TimestampType,
LongType), LongType, TimestampType), end,
knownnullable(precisetimestampconversion(precisetimestampconversion(cast(ts#x +
cast(5 minutes as interval) as timestamp), TimestampType, LongType), LongType,
TimestampType))) AS session_window#x, a#x, ts#x]
+ +- SubqueryAlias ts_data
+ +- View (`ts_data`, [a#x, ts#x])
+ +- Project [cast(a#x as string) AS a#x, cast(ts#x as
timestamp) AS ts#x]
+ +- Project [a#x, ts#x]
+ +- SubqueryAlias tab
+ +- LocalRelation [a#x, ts#x]
+
+
+-- !query
+SELECT
+ count(*) AS cnt
+FROM ts_data
+GROUP BY concat(cast(session_window(ts, '5 minutes').start AS string), a)
+ORDER BY cnt
+-- !query analysis
+Sort [cnt#xL ASC NULLS FIRST], true
++- Aggregate [concat(cast(session_window#x.start as string), a#x)], [count(1)
AS cnt#xL]
+ +- Filter isnotnull(ts#x)
+ +- Project [named_struct(start,
precisetimestampconversion(precisetimestampconversion(ts#x, TimestampType,
LongType), LongType, TimestampType), end,
knownnullable(precisetimestampconversion(precisetimestampconversion(cast(ts#x +
cast(5 minutes as interval) as timestamp), TimestampType, LongType), LongType,
TimestampType))) AS session_window#x, a#x, ts#x]
+ +- SubqueryAlias ts_data
+ +- View (`ts_data`, [a#x, ts#x])
+ +- Project [cast(a#x as string) AS a#x, cast(ts#x as timestamp)
AS ts#x]
+ +- Project [a#x, ts#x]
+ +- SubqueryAlias tab
+ +- LocalRelation [a#x, ts#x]
+
+
+-- !query
+SELECT
+ session_window.start,
+ count(*) AS cnt
+FROM ts_data
+GROUP BY session_window(ts, '5 minutes')
+HAVING count(*) > 1
+ORDER BY session_window.start
+-- !query analysis
+Sort [start#x ASC NULLS FIRST], true
++- Filter (cnt#xL > cast(1 as bigint))
+ +- Aggregate [session_window#x], [session_window#x.start AS start#x,
count(1) AS cnt#xL]
+ +- Filter isnotnull(ts#x)
+ +- Project [named_struct(start,
precisetimestampconversion(precisetimestampconversion(ts#x, TimestampType,
LongType), LongType, TimestampType), end,
knownnullable(precisetimestampconversion(precisetimestampconversion(cast(ts#x +
cast(5 minutes as interval) as timestamp), TimestampType, LongType), LongType,
TimestampType))) AS session_window#x, a#x, ts#x]
+ +- SubqueryAlias ts_data
+ +- View (`ts_data`, [a#x, ts#x])
+ +- Project [cast(a#x as string) AS a#x, cast(ts#x as
timestamp) AS ts#x]
+ +- Project [a#x, ts#x]
+ +- SubqueryAlias tab
+ +- LocalRelation [a#x, ts#x]
+
+
+-- !query
+SELECT * FROM (
+ SELECT
+ a,
+ session_window.start AS ws,
+ count(*) AS cnt
+ FROM ts_data
+ GROUP BY a, session_window(ts, '5 minutes')
+) sub
+ORDER BY a, ws
+-- !query analysis
+Sort [a#x ASC NULLS FIRST, ws#x ASC NULLS FIRST], true
++- Project [a#x, ws#x, cnt#xL]
+ +- SubqueryAlias sub
+ +- Aggregate [a#x, session_window#x], [a#x, session_window#x.start AS
ws#x, count(1) AS cnt#xL]
+ +- Filter isnotnull(ts#x)
+ +- Project [named_struct(start,
precisetimestampconversion(precisetimestampconversion(ts#x, TimestampType,
LongType), LongType, TimestampType), end,
knownnullable(precisetimestampconversion(precisetimestampconversion(cast(ts#x +
cast(5 minutes as interval) as timestamp), TimestampType, LongType), LongType,
TimestampType))) AS session_window#x, a#x, ts#x]
+ +- SubqueryAlias ts_data
+ +- View (`ts_data`, [a#x, ts#x])
+ +- Project [cast(a#x as string) AS a#x, cast(ts#x as
timestamp) AS ts#x]
+ +- Project [a#x, ts#x]
+ +- SubqueryAlias tab
+ +- LocalRelation [a#x, ts#x]
+
+
+-- !query
+SELECT
+ session_window.start,
+ session_window.end,
+ count(*) AS cnt
+FROM VALUES
+ (CAST('1969-12-31 23:59:59.500000' AS TIMESTAMP)),
+ (CAST('1969-12-31 23:55:00.000000' AS TIMESTAMP)),
+ (CAST('1970-01-01 00:00:00.000000' AS TIMESTAMP))
+AS tab(ts)
+GROUP BY session_window(ts, '5 minutes')
+ORDER BY start
+-- !query analysis
+Sort [start#x ASC NULLS FIRST], true
++- Aggregate [session_window#x], [session_window#x.start AS start#x,
session_window#x.end AS end#x, count(1) AS cnt#xL]
+ +- Filter isnotnull(ts#x)
+ +- Project [named_struct(start,
precisetimestampconversion(precisetimestampconversion(ts#x, TimestampType,
LongType), LongType, TimestampType), end,
knownnullable(precisetimestampconversion(precisetimestampconversion(cast(ts#x +
cast(5 minutes as interval) as timestamp), TimestampType, LongType), LongType,
TimestampType))) AS session_window#x, ts#x]
+ +- SubqueryAlias tab
+ +- LocalRelation [ts#x]
diff --git a/sql/core/src/test/resources/sql-tests/inputs/time-window.sql
b/sql/core/src/test/resources/sql-tests/inputs/time-window.sql
new file mode 100644
index 000000000000..31771ff8d934
--- /dev/null
+++ b/sql/core/src/test/resources/sql-tests/inputs/time-window.sql
@@ -0,0 +1,231 @@
+-- Setup: temp view used across cases
+CREATE OR REPLACE TEMPORARY VIEW ts_data AS
+SELECT * FROM VALUES
+ ('A1', CAST('2021-01-01 00:00:00' AS TIMESTAMP)),
+ ('A1', CAST('2021-01-01 00:04:30' AS TIMESTAMP)),
+ ('A1', CAST('2021-01-01 00:06:00' AS TIMESTAMP)),
+ ('A2', CAST('2021-01-01 00:01:00' AS TIMESTAMP))
+AS tab(a, ts);
+
+-- window_with_rollup
+SELECT
+ a,
+ window.start,
+ count(*) AS cnt
+FROM ts_data
+GROUP BY a, window(ts, '5 minutes') WITH ROLLUP
+ORDER BY a, start;
+
+-- window_with_cube
+SELECT
+ a,
+ window.start,
+ count(*) AS cnt
+FROM ts_data
+GROUP BY a, window(ts, '5 minutes') WITH CUBE
+ORDER BY a, start;
+
+-- window_with_grouping_sets
+SELECT
+ a,
+ window.start,
+ count(*) AS cnt
+FROM ts_data
+GROUP BY GROUPING SETS ((a, window(ts, '5 minutes')), (a))
+ORDER BY a, start;
+
+-- window_nested_in_expression
+SELECT
+ count(*) AS cnt
+FROM ts_data
+GROUP BY concat(cast(window(ts, '5 minutes').start AS string), a)
+ORDER BY cnt;
+
+-- window_having
+SELECT
+ window.start,
+ count(*) AS cnt
+FROM ts_data
+GROUP BY window(ts, '5 minutes')
+HAVING count(*) > 1
+ORDER BY window.start;
+
+-- window_in_subquery
+SELECT * FROM (
+ SELECT
+ a,
+ window.start AS ws,
+ count(*) AS cnt
+ FROM ts_data
+ GROUP BY a, window(ts, '5 minutes')
+) sub
+ORDER BY a, ws;
+
+-- record_at_window_start
+SELECT
+ window.start,
+ window.end,
+ count(*) AS counts
+FROM VALUES
+ (CAST('2016-03-27 19:39:30' AS TIMESTAMP), 1, 'a')
+AS tab(time, value, id)
+GROUP BY window(time, '10 seconds')
+ORDER BY window.start;
+
+-- tumbling_window_pre_epoch
+SELECT
+ window.start,
+ window.end,
+ count(*) AS cnt
+FROM VALUES
+ (CAST('1969-12-31 23:59:59.500000' AS TIMESTAMP)),
+ (CAST('1969-12-31 23:55:00.000000' AS TIMESTAMP)),
+ (CAST('1970-01-01 00:00:00.000000' AS TIMESTAMP))
+AS tab(ts)
+GROUP BY window(ts, '5 minutes')
+ORDER BY start;
+
+-- sliding_window_with_rollup
+SELECT
+ a,
+ window.start,
+ count(*) AS cnt
+FROM ts_data
+GROUP BY a, window(ts, '5 minutes', '1 minute') WITH ROLLUP
+ORDER BY a, start;
+
+-- sliding_window_with_cube
+SELECT
+ a,
+ window.start,
+ count(*) AS cnt
+FROM ts_data
+GROUP BY a, window(ts, '5 minutes', '1 minute') WITH CUBE
+ORDER BY a, start;
+
+-- sliding_window_with_grouping_sets
+SELECT
+ a,
+ window.start,
+ count(*) AS cnt
+FROM ts_data
+GROUP BY GROUPING SETS ((a, window(ts, '5 minutes', '1 minute')), (a))
+ORDER BY a, start;
+
+-- sliding_window_nested_in_expression
+SELECT
+ count(*) AS cnt
+FROM ts_data
+GROUP BY concat(cast(window(ts, '5 minutes', '1 minute').start AS string), a)
+ORDER BY cnt;
+
+-- sliding_window_having
+SELECT
+ window.start,
+ count(*) AS cnt
+FROM ts_data
+GROUP BY window(ts, '5 minutes', '1 minute')
+HAVING count(*) > 1
+ORDER BY window.start;
+
+-- sliding_window_in_subquery
+SELECT * FROM (
+ SELECT
+ a,
+ window.start AS ws,
+ count(*) AS cnt
+ FROM ts_data
+ GROUP BY a, window(ts, '5 minutes', '1 minute')
+) sub
+ORDER BY a, ws;
+
+-- record_at_sliding_window_start
+SELECT
+ window.start,
+ window.end,
+ count(*) AS counts
+FROM VALUES
+ (CAST('2016-03-27 19:39:30' AS TIMESTAMP), 1, 'a')
+AS tab(time, value, id)
+GROUP BY window(time, '10 seconds', '5 seconds')
+ORDER BY window.start;
+
+-- sliding_window_pre_epoch
+SELECT
+ window.start,
+ window.end,
+ count(*) AS cnt
+FROM VALUES
+ (CAST('1969-12-31 23:59:59.500000' AS TIMESTAMP)),
+ (CAST('1969-12-31 23:55:00.000000' AS TIMESTAMP)),
+ (CAST('1970-01-01 00:00:00.000000' AS TIMESTAMP))
+AS tab(ts)
+GROUP BY window(ts, '5 minutes', '1 minute')
+ORDER BY start;
+
+-- session_window_with_rollup
+SELECT
+ a,
+ session_window.start,
+ count(*) AS cnt
+FROM ts_data
+GROUP BY a, session_window(ts, '5 minutes') WITH ROLLUP
+ORDER BY a, start;
+
+-- session_window_with_cube
+SELECT
+ a,
+ session_window.start,
+ count(*) AS cnt
+FROM ts_data
+GROUP BY a, session_window(ts, '5 minutes') WITH CUBE
+ORDER BY a, start;
+
+-- session_window_with_grouping_sets
+SELECT
+ a,
+ session_window.start,
+ count(*) AS cnt
+FROM ts_data
+GROUP BY GROUPING SETS ((a, session_window(ts, '5 minutes')), (a))
+ORDER BY a, start;
+
+-- session_window_nested_in_expression
+SELECT
+ count(*) AS cnt
+FROM ts_data
+GROUP BY concat(cast(session_window(ts, '5 minutes').start AS string), a)
+ORDER BY cnt;
+
+-- session_window_having
+SELECT
+ session_window.start,
+ count(*) AS cnt
+FROM ts_data
+GROUP BY session_window(ts, '5 minutes')
+HAVING count(*) > 1
+ORDER BY session_window.start;
+
+-- session_window_in_subquery
+SELECT * FROM (
+ SELECT
+ a,
+ session_window.start AS ws,
+ count(*) AS cnt
+ FROM ts_data
+ GROUP BY a, session_window(ts, '5 minutes')
+) sub
+ORDER BY a, ws;
+
+-- session_window_pre_epoch
+SELECT
+ session_window.start,
+ session_window.end,
+ count(*) AS cnt
+FROM VALUES
+ (CAST('1969-12-31 23:59:59.500000' AS TIMESTAMP)),
+ (CAST('1969-12-31 23:55:00.000000' AS TIMESTAMP)),
+ (CAST('1970-01-01 00:00:00.000000' AS TIMESTAMP))
+AS tab(ts)
+GROUP BY session_window(ts, '5 minutes')
+ORDER BY start;
diff --git a/sql/core/src/test/resources/sql-tests/results/time-window.sql.out
b/sql/core/src/test/resources/sql-tests/results/time-window.sql.out
new file mode 100644
index 000000000000..0e3bca21d4a0
--- /dev/null
+++ b/sql/core/src/test/resources/sql-tests/results/time-window.sql.out
@@ -0,0 +1,508 @@
+-- Automatically generated by SQLQueryTestSuite
+-- !query
+CREATE OR REPLACE TEMPORARY VIEW ts_data AS
+SELECT * FROM VALUES
+ ('A1', CAST('2021-01-01 00:00:00' AS TIMESTAMP)),
+ ('A1', CAST('2021-01-01 00:04:30' AS TIMESTAMP)),
+ ('A1', CAST('2021-01-01 00:06:00' AS TIMESTAMP)),
+ ('A2', CAST('2021-01-01 00:01:00' AS TIMESTAMP))
+AS tab(a, ts)
+-- !query schema
+struct<>
+-- !query output
+
+
+
+-- !query
+SELECT
+ a,
+ window.start,
+ count(*) AS cnt
+FROM ts_data
+GROUP BY a, window(ts, '5 minutes') WITH ROLLUP
+ORDER BY a, start
+-- !query schema
+struct<a:string,start:timestamp,cnt:bigint>
+-- !query output
+NULL NULL 4
+A1 NULL 3
+A1 2021-01-01 00:00:00 2
+A1 2021-01-01 00:05:00 1
+A2 NULL 1
+A2 2021-01-01 00:00:00 1
+
+
+-- !query
+SELECT
+ a,
+ window.start,
+ count(*) AS cnt
+FROM ts_data
+GROUP BY a, window(ts, '5 minutes') WITH CUBE
+ORDER BY a, start
+-- !query schema
+struct<a:string,start:timestamp,cnt:bigint>
+-- !query output
+NULL NULL 4
+NULL 2021-01-01 00:00:00 3
+NULL 2021-01-01 00:05:00 1
+A1 NULL 3
+A1 2021-01-01 00:00:00 2
+A1 2021-01-01 00:05:00 1
+A2 NULL 1
+A2 2021-01-01 00:00:00 1
+
+
+-- !query
+SELECT
+ a,
+ window.start,
+ count(*) AS cnt
+FROM ts_data
+GROUP BY GROUPING SETS ((a, window(ts, '5 minutes')), (a))
+ORDER BY a, start
+-- !query schema
+struct<a:string,start:timestamp,cnt:bigint>
+-- !query output
+A1 NULL 3
+A1 2021-01-01 00:00:00 2
+A1 2021-01-01 00:05:00 1
+A2 NULL 1
+A2 2021-01-01 00:00:00 1
+
+
+-- !query
+SELECT
+ count(*) AS cnt
+FROM ts_data
+GROUP BY concat(cast(window(ts, '5 minutes').start AS string), a)
+ORDER BY cnt
+-- !query schema
+struct<cnt:bigint>
+-- !query output
+1
+1
+2
+
+
+-- !query
+SELECT
+ window.start,
+ count(*) AS cnt
+FROM ts_data
+GROUP BY window(ts, '5 minutes')
+HAVING count(*) > 1
+ORDER BY window.start
+-- !query schema
+struct<start:timestamp,cnt:bigint>
+-- !query output
+2021-01-01 00:00:00 3
+
+
+-- !query
+SELECT * FROM (
+ SELECT
+ a,
+ window.start AS ws,
+ count(*) AS cnt
+ FROM ts_data
+ GROUP BY a, window(ts, '5 minutes')
+) sub
+ORDER BY a, ws
+-- !query schema
+struct<a:string,ws:timestamp,cnt:bigint>
+-- !query output
+A1 2021-01-01 00:00:00 2
+A1 2021-01-01 00:05:00 1
+A2 2021-01-01 00:00:00 1
+
+
+-- !query
+SELECT
+ window.start,
+ window.end,
+ count(*) AS counts
+FROM VALUES
+ (CAST('2016-03-27 19:39:30' AS TIMESTAMP), 1, 'a')
+AS tab(time, value, id)
+GROUP BY window(time, '10 seconds')
+ORDER BY window.start
+-- !query schema
+struct<start:timestamp,end:timestamp,counts:bigint>
+-- !query output
+2016-03-27 19:39:30 2016-03-27 19:39:40 1
+
+
+-- !query
+SELECT
+ window.start,
+ window.end,
+ count(*) AS cnt
+FROM VALUES
+ (CAST('1969-12-31 23:59:59.500000' AS TIMESTAMP)),
+ (CAST('1969-12-31 23:55:00.000000' AS TIMESTAMP)),
+ (CAST('1970-01-01 00:00:00.000000' AS TIMESTAMP))
+AS tab(ts)
+GROUP BY window(ts, '5 minutes')
+ORDER BY start
+-- !query schema
+struct<start:timestamp,end:timestamp,cnt:bigint>
+-- !query output
+1969-12-31 23:55:00 1970-01-01 00:00:00 2
+1970-01-01 00:00:00 1970-01-01 00:05:00 1
+
+
+-- !query
+SELECT
+ a,
+ window.start,
+ count(*) AS cnt
+FROM ts_data
+GROUP BY a, window(ts, '5 minutes', '1 minute') WITH ROLLUP
+ORDER BY a, start
+-- !query schema
+struct<a:string,start:timestamp,cnt:bigint>
+-- !query output
+NULL NULL 20
+A1 NULL 15
+A1 2020-12-31 23:56:00 1
+A1 2020-12-31 23:57:00 1
+A1 2020-12-31 23:58:00 1
+A1 2020-12-31 23:59:00 1
+A1 2021-01-01 00:00:00 2
+A1 2021-01-01 00:01:00 1
+A1 2021-01-01 00:02:00 2
+A1 2021-01-01 00:03:00 2
+A1 2021-01-01 00:04:00 2
+A1 2021-01-01 00:05:00 1
+A1 2021-01-01 00:06:00 1
+A2 NULL 5
+A2 2020-12-31 23:57:00 1
+A2 2020-12-31 23:58:00 1
+A2 2020-12-31 23:59:00 1
+A2 2021-01-01 00:00:00 1
+A2 2021-01-01 00:01:00 1
+
+
+-- !query
+SELECT
+ a,
+ window.start,
+ count(*) AS cnt
+FROM ts_data
+GROUP BY a, window(ts, '5 minutes', '1 minute') WITH CUBE
+ORDER BY a, start
+-- !query schema
+struct<a:string,start:timestamp,cnt:bigint>
+-- !query output
+NULL NULL 20
+NULL 2020-12-31 23:56:00 1
+NULL 2020-12-31 23:57:00 2
+NULL 2020-12-31 23:58:00 2
+NULL 2020-12-31 23:59:00 2
+NULL 2021-01-01 00:00:00 3
+NULL 2021-01-01 00:01:00 2
+NULL 2021-01-01 00:02:00 2
+NULL 2021-01-01 00:03:00 2
+NULL 2021-01-01 00:04:00 2
+NULL 2021-01-01 00:05:00 1
+NULL 2021-01-01 00:06:00 1
+A1 NULL 15
+A1 2020-12-31 23:56:00 1
+A1 2020-12-31 23:57:00 1
+A1 2020-12-31 23:58:00 1
+A1 2020-12-31 23:59:00 1
+A1 2021-01-01 00:00:00 2
+A1 2021-01-01 00:01:00 1
+A1 2021-01-01 00:02:00 2
+A1 2021-01-01 00:03:00 2
+A1 2021-01-01 00:04:00 2
+A1 2021-01-01 00:05:00 1
+A1 2021-01-01 00:06:00 1
+A2 NULL 5
+A2 2020-12-31 23:57:00 1
+A2 2020-12-31 23:58:00 1
+A2 2020-12-31 23:59:00 1
+A2 2021-01-01 00:00:00 1
+A2 2021-01-01 00:01:00 1
+
+
+-- !query
+SELECT
+ a,
+ window.start,
+ count(*) AS cnt
+FROM ts_data
+GROUP BY GROUPING SETS ((a, window(ts, '5 minutes', '1 minute')), (a))
+ORDER BY a, start
+-- !query schema
+struct<a:string,start:timestamp,cnt:bigint>
+-- !query output
+A1 NULL 15
+A1 2020-12-31 23:56:00 1
+A1 2020-12-31 23:57:00 1
+A1 2020-12-31 23:58:00 1
+A1 2020-12-31 23:59:00 1
+A1 2021-01-01 00:00:00 2
+A1 2021-01-01 00:01:00 1
+A1 2021-01-01 00:02:00 2
+A1 2021-01-01 00:03:00 2
+A1 2021-01-01 00:04:00 2
+A1 2021-01-01 00:05:00 1
+A1 2021-01-01 00:06:00 1
+A2 NULL 5
+A2 2020-12-31 23:57:00 1
+A2 2020-12-31 23:58:00 1
+A2 2020-12-31 23:59:00 1
+A2 2021-01-01 00:00:00 1
+A2 2021-01-01 00:01:00 1
+
+
+-- !query
+SELECT
+ count(*) AS cnt
+FROM ts_data
+GROUP BY concat(cast(window(ts, '5 minutes', '1 minute').start AS string), a)
+ORDER BY cnt
+-- !query schema
+struct<cnt:bigint>
+-- !query output
+1
+1
+1
+1
+1
+1
+1
+1
+1
+1
+1
+1
+2
+2
+2
+2
+
+
+-- !query
+SELECT
+ window.start,
+ count(*) AS cnt
+FROM ts_data
+GROUP BY window(ts, '5 minutes', '1 minute')
+HAVING count(*) > 1
+ORDER BY window.start
+-- !query schema
+struct<start:timestamp,cnt:bigint>
+-- !query output
+2020-12-31 23:57:00 2
+2020-12-31 23:58:00 2
+2020-12-31 23:59:00 2
+2021-01-01 00:00:00 3
+2021-01-01 00:01:00 2
+2021-01-01 00:02:00 2
+2021-01-01 00:03:00 2
+2021-01-01 00:04:00 2
+
+
+-- !query
+SELECT * FROM (
+ SELECT
+ a,
+ window.start AS ws,
+ count(*) AS cnt
+ FROM ts_data
+ GROUP BY a, window(ts, '5 minutes', '1 minute')
+) sub
+ORDER BY a, ws
+-- !query schema
+struct<a:string,ws:timestamp,cnt:bigint>
+-- !query output
+A1 2020-12-31 23:56:00 1
+A1 2020-12-31 23:57:00 1
+A1 2020-12-31 23:58:00 1
+A1 2020-12-31 23:59:00 1
+A1 2021-01-01 00:00:00 2
+A1 2021-01-01 00:01:00 1
+A1 2021-01-01 00:02:00 2
+A1 2021-01-01 00:03:00 2
+A1 2021-01-01 00:04:00 2
+A1 2021-01-01 00:05:00 1
+A1 2021-01-01 00:06:00 1
+A2 2020-12-31 23:57:00 1
+A2 2020-12-31 23:58:00 1
+A2 2020-12-31 23:59:00 1
+A2 2021-01-01 00:00:00 1
+A2 2021-01-01 00:01:00 1
+
+
+-- !query
+SELECT
+ window.start,
+ window.end,
+ count(*) AS counts
+FROM VALUES
+ (CAST('2016-03-27 19:39:30' AS TIMESTAMP), 1, 'a')
+AS tab(time, value, id)
+GROUP BY window(time, '10 seconds', '5 seconds')
+ORDER BY window.start
+-- !query schema
+struct<start:timestamp,end:timestamp,counts:bigint>
+-- !query output
+2016-03-27 19:39:25 2016-03-27 19:39:35 1
+2016-03-27 19:39:30 2016-03-27 19:39:40 1
+
+
+-- !query
+SELECT
+ window.start,
+ window.end,
+ count(*) AS cnt
+FROM VALUES
+ (CAST('1969-12-31 23:59:59.500000' AS TIMESTAMP)),
+ (CAST('1969-12-31 23:55:00.000000' AS TIMESTAMP)),
+ (CAST('1970-01-01 00:00:00.000000' AS TIMESTAMP))
+AS tab(ts)
+GROUP BY window(ts, '5 minutes', '1 minute')
+ORDER BY start
+-- !query schema
+struct<start:timestamp,end:timestamp,cnt:bigint>
+-- !query output
+1969-12-31 23:51:00 1969-12-31 23:56:00 1
+1969-12-31 23:52:00 1969-12-31 23:57:00 1
+1969-12-31 23:53:00 1969-12-31 23:58:00 1
+1969-12-31 23:54:00 1969-12-31 23:59:00 1
+1969-12-31 23:55:00 1970-01-01 00:00:00 2
+1969-12-31 23:56:00 1970-01-01 00:01:00 2
+1969-12-31 23:57:00 1970-01-01 00:02:00 2
+1969-12-31 23:58:00 1970-01-01 00:03:00 2
+1969-12-31 23:59:00 1970-01-01 00:04:00 2
+1970-01-01 00:00:00 1970-01-01 00:05:00 1
+
+
+-- !query
+SELECT
+ a,
+ session_window.start,
+ count(*) AS cnt
+FROM ts_data
+GROUP BY a, session_window(ts, '5 minutes') WITH ROLLUP
+ORDER BY a, start
+-- !query schema
+struct<>
+-- !query output
+org.apache.spark.SparkException
+{
+ "errorClass" : "INTERNAL_ERROR",
+ "sqlState" : "XX000",
+ "messageParameters" : {
+ "message" : "Grouping Session should not be null."
+ }
+}
+
+
+-- !query
+SELECT
+ a,
+ session_window.start,
+ count(*) AS cnt
+FROM ts_data
+GROUP BY a, session_window(ts, '5 minutes') WITH CUBE
+ORDER BY a, start
+-- !query schema
+struct<>
+-- !query output
+org.apache.spark.SparkException
+{
+ "errorClass" : "INTERNAL_ERROR",
+ "sqlState" : "XX000",
+ "messageParameters" : {
+ "message" : "Grouping Session should not be null."
+ }
+}
+
+
+-- !query
+SELECT
+ a,
+ session_window.start,
+ count(*) AS cnt
+FROM ts_data
+GROUP BY GROUPING SETS ((a, session_window(ts, '5 minutes')), (a))
+ORDER BY a, start
+-- !query schema
+struct<>
+-- !query output
+org.apache.spark.SparkException
+{
+ "errorClass" : "INTERNAL_ERROR",
+ "sqlState" : "XX000",
+ "messageParameters" : {
+ "message" : "Grouping Session should not be null."
+ }
+}
+
+
+-- !query
+SELECT
+ count(*) AS cnt
+FROM ts_data
+GROUP BY concat(cast(session_window(ts, '5 minutes').start AS string), a)
+ORDER BY cnt
+-- !query schema
+struct<cnt:bigint>
+-- !query output
+1
+1
+1
+1
+
+
+-- !query
+SELECT
+ session_window.start,
+ count(*) AS cnt
+FROM ts_data
+GROUP BY session_window(ts, '5 minutes')
+HAVING count(*) > 1
+ORDER BY session_window.start
+-- !query schema
+struct<start:timestamp,cnt:bigint>
+-- !query output
+2021-01-01 00:00:00 4
+
+
+-- !query
+SELECT * FROM (
+ SELECT
+ a,
+ session_window.start AS ws,
+ count(*) AS cnt
+ FROM ts_data
+ GROUP BY a, session_window(ts, '5 minutes')
+) sub
+ORDER BY a, ws
+-- !query schema
+struct<a:string,ws:timestamp,cnt:bigint>
+-- !query output
+A1 2021-01-01 00:00:00 3
+A2 2021-01-01 00:01:00 1
+
+
+-- !query
+SELECT
+ session_window.start,
+ session_window.end,
+ count(*) AS cnt
+FROM VALUES
+ (CAST('1969-12-31 23:59:59.500000' AS TIMESTAMP)),
+ (CAST('1969-12-31 23:55:00.000000' AS TIMESTAMP)),
+ (CAST('1970-01-01 00:00:00.000000' AS TIMESTAMP))
+AS tab(ts)
+GROUP BY session_window(ts, '5 minutes')
+ORDER BY start
+-- !query schema
+struct<start:timestamp,end:timestamp,cnt:bigint>
+-- !query output
+1969-12-31 23:55:00 1970-01-01 00:05:00 3
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]