Repository: spark
Updated Branches:
  refs/heads/master 1e8861598 -> 1b829ce13


[SPARK-14160] Time Windowing functions for Datasets

## What changes were proposed in this pull request?

This PR adds the function `window` as a column expression.

`window` buckets rows into time windows given a time column. With this
expression, time series analysis on batch data as well as streaming data
should become much simpler.

### Usage

Assume the following schema:

`sensor_id, measurement, timestamp`

To average 5 minutes of data every minute (window length of 5 minutes, slide
duration of 1 minute), use:
```scala
df.groupBy(window($"timestamp", "5 minutes", "1 minute"), $"sensor_id")
  .agg(mean("measurement").as("avg_meas"))
```

This will generate windows such as:
```
09:00:00-09:05:00
09:01:00-09:06:00
09:02:00-09:07:00 ...
```
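
The duration arguments above are plain strings. According to the patch, `TimeWindow` prepends `interval` when the prefix is missing, delegates parsing to `CalendarInterval`, and works in microseconds. The sketch below is a simplified, hypothetical stand-in for that conversion: `IntervalSketch` and `toMicros` are illustrative names, not Spark APIs, and it only handles a single `<number> <unit>` pair.

```scala
// Simplified illustration only; the patch itself relies on CalendarInterval.fromString.
object IntervalSketch {
  // Microseconds per supported unit (months are rejected by the patch, so they are omitted).
  private val MicrosPerUnit: Map[String, Long] = Map(
    "microsecond" -> 1L,
    "millisecond" -> 1000L,
    "second" -> 1000000L,
    "minute" -> 60L * 1000000L,
    "hour" -> 3600L * 1000000L,
    "day" -> 86400L * 1000000L,
    "week" -> 7L * 86400L * 1000000L)

  /** Converts strings such as "5 minutes" or "interval 1 second" to microseconds. */
  def toMicros(interval: String): Long = {
    require(interval != null && interval.trim.nonEmpty, "interval cannot be null or blank")
    // The "interval" prefix is optional, mirroring the usability tweak in the patch.
    val body = interval.trim.stripPrefix("interval").trim
    body.split("\\s+") match {
      case Array(amount, unit) =>
        val key = unit.toLowerCase.stripSuffix("s") // accept "second" and "seconds"
        val perUnit = MicrosPerUnit.getOrElse(
          key, throw new IllegalArgumentException(s"unsupported unit in ($interval)"))
        amount.toLong * perUnit
      case _ =>
        throw new IllegalArgumentException(s"not a valid interval string ($interval)")
    }
  }
}
```

For example, `IntervalSketch.toMicros("5 minutes")` and `IntervalSketch.toMicros("interval 5 minutes")` yield the same value.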

Intervals start at every `slideDuration`, counted from the Unix epoch
(1970-01-01 00:00:00 UTC).
To start intervals at a different point in time, e.g. 30 seconds after the
minute, use the `startTime` parameter.

```scala
df.groupBy(window($"timestamp", "5 minutes", "1 minute", "30 seconds"), $"sensor_id")
  .agg(mean("measurement").as("avg_meas"))
```

This will generate windows such as:
```
09:00:30-09:05:30
09:01:30-09:06:30
09:02:30-09:07:30 ...
```
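
To make the window assignment concrete, here is a minimal plain-Scala sketch of the over-estimate-and-filter approach that the `TimeWindowing` rule in this patch expresses with Catalyst's `Expand` and `Filter` operators. `WindowAssignmentSketch` and `windowsFor` are hypothetical names used only for illustration. All values share one time unit (seconds here; the patch works in microseconds).

```scala
// Illustration only: mirrors the arithmetic of the analyzer rule on plain Longs.
object WindowAssignmentSketch {
  final case class Window(start: Long, end: Long)

  /** Returns every window [start, end) that contains `ts`. */
  def windowsFor(
      ts: Long,
      windowDuration: Long,
      slideDuration: Long,
      startTime: Long): Seq[Window] = {
    // Upper bound on how many windows can overlap at one instant.
    val maxNumOverlapping = math.ceil(windowDuration.toDouble / slideDuration).toInt
    // Index of the first slide boundary at or after the timestamp.
    val windowId = math.ceil((ts - startTime).toDouble / slideDuration).toLong
    (0 to maxNumOverlapping)
      .map { i =>
        // Generate maxNumOverlapping + 1 candidate windows ending around the timestamp.
        val start = (windowId + i - maxNumOverlapping) * slideDuration + startTime
        Window(start, start + windowDuration)
      }
      // Keep only the candidates that actually contain the timestamp.
      .filter(w => ts >= w.start && ts < w.end)
  }
}
```

With seconds since midnight as the unit, `WindowAssignmentSketch.windowsFor(32590L, 300L, 60L, 30L)` (09:03:10, 5 minute windows, 1 minute slide, 30 second offset) returns five windows, among them 09:00:30-09:05:30. One extra candidate is generated so that a timestamp lying exactly on a window start is still covered.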

Python support will be added in a follow-up PR.

## How was this patch tested?

This patch has some basic unit tests for the `TimeWindow` expression testing 
that the parameters pass validation, and it also has some unit/integration 
tests testing the correctness of the windowing and usability in complex 
operations (multi-column grouping, multi-column projections, joins).
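
The parameter rules exercised by those validation tests can be summarized in a small standalone sketch. It restates the checks from `TimeWindow.checkInputDataTypes` in this patch on plain `Long` values; `WindowParamCheck` and `validate` are hypothetical names, and the `Either` return type is an assumption made for illustration (the patch returns a `TypeCheckResult`).

```scala
// Illustration only: the same five rules as the patch, without Catalyst types.
object WindowParamCheck {
  def validate(
      windowDuration: Long,
      slideDuration: Long,
      startTime: Long): Either[String, Unit] = {
    if (windowDuration <= 0) {
      Left(s"The window duration ($windowDuration) must be greater than 0.")
    } else if (slideDuration <= 0) {
      Left(s"The slide duration ($slideDuration) must be greater than 0.")
    } else if (startTime < 0) {
      Left(s"The start time ($startTime) must be greater than or equal to 0.")
    } else if (slideDuration > windowDuration) {
      Left(s"The slide duration ($slideDuration) must be less than or equal" +
        s" to the windowDuration ($windowDuration).")
    } else if (startTime >= slideDuration) {
      Left(s"The start time ($startTime) must be less than the slideDuration ($slideDuration).")
    } else {
      Right(())
    }
  }
}
```

For instance, a slide of 2 with a window of 1 is rejected, while a window of 5 with a slide of 1 and an offset of 0 passes.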

Author: Burak Yavuz <[email protected]>
Author: Michael Armbrust <[email protected]>

Closes #12008 from brkyvz/df-time-window.


Project: http://git-wip-us.apache.org/repos/asf/spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/1b829ce1
Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/1b829ce1
Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/1b829ce1

Branch: refs/heads/master
Commit: 1b829ce13990b40fd8d7c9efcc2ae55c4dbc861c
Parents: 1e88615
Author: Burak Yavuz <[email protected]>
Authored: Fri Apr 1 13:19:24 2016 -0700
Committer: Michael Armbrust <[email protected]>
Committed: Fri Apr 1 13:19:24 2016 -0700

----------------------------------------------------------------------
 .../spark/sql/catalyst/analysis/Analyzer.scala  |  90 +++++++
 .../catalyst/analysis/FunctionRegistry.scala    |   1 +
 .../sql/catalyst/expressions/TimeWindow.scala   | 133 ++++++++++
 .../catalyst/analysis/AnalysisErrorSuite.scala  |  56 +++++
 .../catalyst/expressions/TimeWindowSuite.scala  |  76 ++++++
 .../scala/org/apache/spark/sql/functions.scala  | 137 +++++++++++
 .../spark/sql/DataFrameTimeWindowingSuite.scala | 242 +++++++++++++++++++
 7 files changed, 735 insertions(+)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/spark/blob/1b829ce1/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/Analyzer.scala
----------------------------------------------------------------------
diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/Analyzer.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/Analyzer.scala
index 8dc0532..d82ee3a 100644
--- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/Analyzer.scala
+++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/Analyzer.scala
@@ -102,6 +102,7 @@ class Analyzer(
       ExtractWindowExpressions ::
       GlobalAggregates ::
       ResolveAggregateFunctions ::
+      TimeWindowing ::
       HiveTypeCoercion.typeCoercionRules ++
       extendedResolutionRules : _*),
     Batch("Nondeterministic", Once,
@@ -1591,3 +1592,92 @@ object ResolveUpCast extends Rule[LogicalPlan] {
     }
   }
 }
+
+/**
+ * Maps a time column to multiple time windows using the Expand operator. Since it's non-trivial to
+ * figure out how many windows a time column can map to, we over-estimate the number of windows and
+ * filter out the rows where the time column is not inside the time window.
+ */
+object TimeWindowing extends Rule[LogicalPlan] {
+  import org.apache.spark.sql.catalyst.dsl.expressions._
+
+  private final val WINDOW_START = "start"
+  private final val WINDOW_END = "end"
+
+  /**
+   * Generates the logical plan for generating window ranges on a timestamp column. Without
+   * knowing what the timestamp value is, it's non-trivial to figure out deterministically how many
+   * window ranges a timestamp will map to given all possible combinations of a window duration,
+   * slide duration and start time (offset). Therefore, we express and over-estimate the number of
+   * windows there may be, and filter the valid windows. We use last Project operator to group
+   * the window columns into a struct so they can be accessed as `window.start` and `window.end`.
+   *
+   * The windows are calculated as below:
+   * maxNumOverlapping <- ceil(windowDuration / slideDuration)
+   * for (i <- 0 to maxNumOverlapping)
+   *   windowId <- ceil((timestamp - startTime) / slideDuration)
+   *   windowStart <- windowId * slideDuration + (i - maxNumOverlapping) * slideDuration + startTime
+   *   windowEnd <- windowStart + windowDuration
+   *   return windowStart, windowEnd
+   *
+   * This behaves as follows for the given parameters for the time: 12:05. The valid windows are
+   * marked with a +, and invalid ones are marked with a x. The invalid ones are filtered using the
+   * Filter operator.
+   * window: 12m, slide: 5m, start: 0m :: window: 12m, slide: 5m, start: 2m
+   *     11:55 - 12:07 +                      11:52 - 12:04 x
+   *     12:00 - 12:12 +                      11:57 - 12:09 +
+   *     12:05 - 12:17 +                      12:02 - 12:14 +
+   *
+   * @param plan The logical plan
+   * @return the logical plan that will generate the time windows using the Expand operator, with
+   *         the Filter operator for correctness and Project for usability.
+   */
+  def apply(plan: LogicalPlan): LogicalPlan = plan resolveOperators {
+    case p: LogicalPlan if p.children.size == 1 =>
+      val child = p.children.head
+      val windowExpressions =
+        p.expressions.flatMap(_.collect { case t: TimeWindow => t }).distinct.toList // Not correct.
+
+      // Only support a single window expression for now
+      if (windowExpressions.size == 1 &&
+          windowExpressions.head.timeColumn.resolved &&
+          windowExpressions.head.checkInputDataTypes().isSuccess) {
+        val window = windowExpressions.head
+        val windowAttr = AttributeReference("window", window.dataType)()
+
+        val maxNumOverlapping = math.ceil(window.windowDuration * 1.0 / window.slideDuration).toInt
+        val windows = Seq.tabulate(maxNumOverlapping + 1) { i =>
+          val windowId = Ceil((PreciseTimestamp(window.timeColumn) - window.startTime) /
+            window.slideDuration)
+          val windowStart = (windowId + i - maxNumOverlapping) *
+              window.slideDuration + window.startTime
+          val windowEnd = windowStart + window.windowDuration
+
+          CreateNamedStruct(
+            Literal(WINDOW_START) :: windowStart ::
+            Literal(WINDOW_END) :: windowEnd :: Nil)
+        }
+
+        val projections = windows.map(_ +: p.children.head.output)
+
+        val filterExpr =
+          window.timeColumn >= windowAttr.getField(WINDOW_START) &&
+          window.timeColumn < windowAttr.getField(WINDOW_END)
+
+        val expandedPlan =
+          Filter(filterExpr,
+            Expand(projections, windowAttr +: child.output, child))
+
+        val substitutedPlan = p transformExpressions {
+          case t: TimeWindow => windowAttr
+        }
+
+        substitutedPlan.withNewChildren(expandedPlan :: Nil)
+      } else if (windowExpressions.size > 1) {
+        p.failAnalysis("Multiple time window expressions would result in a cartesian product " +
+          "of rows, therefore they are currently not supported.")
+      } else {
+        p // Return unchanged. Analyzer will throw exception later
+      }
+  }
+}

http://git-wip-us.apache.org/repos/asf/spark/blob/1b829ce1/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/FunctionRegistry.scala
----------------------------------------------------------------------
diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/FunctionRegistry.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/FunctionRegistry.scala
index e9788b7..ca8db3c 100644
--- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/FunctionRegistry.scala
+++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/FunctionRegistry.scala
@@ -297,6 +297,7 @@ object FunctionRegistry {
     expression[UnixTimestamp]("unix_timestamp"),
     expression[WeekOfYear]("weekofyear"),
     expression[Year]("year"),
+    expression[TimeWindow]("window"),
 
     // collection functions
     expression[ArrayContains]("array_contains"),

http://git-wip-us.apache.org/repos/asf/spark/blob/1b829ce1/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/TimeWindow.scala
----------------------------------------------------------------------
diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/TimeWindow.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/TimeWindow.scala
new file mode 100644
index 0000000..8e13833
--- /dev/null
+++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/TimeWindow.scala
@@ -0,0 +1,133 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.sql.catalyst.expressions
+
+import org.apache.commons.lang.StringUtils
+
+import org.apache.spark.sql.catalyst.analysis.TypeCheckResult
+import org.apache.spark.sql.catalyst.analysis.TypeCheckResult.TypeCheckFailure
+import org.apache.spark.sql.catalyst.expressions.codegen.{CodegenContext, ExprCode}
+import org.apache.spark.sql.types._
+import org.apache.spark.unsafe.types.CalendarInterval
+
+case class TimeWindow(
+    timeColumn: Expression,
+    windowDuration: Long,
+    slideDuration: Long,
+    startTime: Long) extends UnaryExpression
+  with ImplicitCastInputTypes
+  with Unevaluable
+  with NonSQLExpression {
+
+  override def child: Expression = timeColumn
+  override def inputTypes: Seq[AbstractDataType] = Seq(TimestampType)
+  override def dataType: DataType = new StructType()
+    .add(StructField("start", TimestampType))
+    .add(StructField("end", TimestampType))
+
+  // This expression is replaced in the analyzer.
+  override lazy val resolved = false
+
+  /**
+   * Validate the inputs for the window duration, slide duration, and start time in addition to
+   * the input data type.
+   */
+  override def checkInputDataTypes(): TypeCheckResult = {
+    val dataTypeCheck = super.checkInputDataTypes()
+    if (dataTypeCheck.isSuccess) {
+      if (windowDuration <= 0) {
+        return TypeCheckFailure(s"The window duration ($windowDuration) must be greater than 0.")
+      }
+      if (slideDuration <= 0) {
+        return TypeCheckFailure(s"The slide duration ($slideDuration) must be greater than 0.")
+      }
+      if (startTime < 0) {
+        return TypeCheckFailure(s"The start time ($startTime) must be greater than or equal to 0.")
+      }
+      if (slideDuration > windowDuration) {
+        return TypeCheckFailure(s"The slide duration ($slideDuration) must be less than or equal" +
+          s" to the windowDuration ($windowDuration).")
+      }
+      if (startTime >= slideDuration) {
+        return TypeCheckFailure(s"The start time ($startTime) must be less than the " +
+          s"slideDuration ($slideDuration).")
+      }
+    }
+    dataTypeCheck
+  }
+}
+
+object TimeWindow {
+  /**
+   * Parses the interval string for a valid time duration. CalendarInterval expects interval
+   * strings to start with the string `interval`. For usability, we prepend `interval` to the string
+   * if the user omitted it.
+   *
+   * @param interval The interval string
+   * @return The interval duration in microseconds. SparkSQL's TimestampType has microsecond
+   *         precision.
+   */
+  private def getIntervalInMicroSeconds(interval: String): Long = {
+    if (StringUtils.isBlank(interval)) {
+      throw new IllegalArgumentException(
+        "The window duration, slide duration and start time cannot be null or blank.")
+    }
+    val intervalString = if (interval.startsWith("interval")) {
+      interval
+    } else {
+      "interval " + interval
+    }
+    val cal = CalendarInterval.fromString(intervalString)
+    if (cal == null) {
+      throw new IllegalArgumentException(
+        s"The provided interval ($interval) did not correspond to a valid interval string.")
+    }
+    if (cal.months > 0) {
+      throw new IllegalArgumentException(
+        s"Intervals greater than a month is not supported ($interval).")
+    }
+    cal.microseconds
+  }
+
+  def apply(
+      timeColumn: Expression,
+      windowDuration: String,
+      slideDuration: String,
+      startTime: String): TimeWindow = {
+    TimeWindow(timeColumn,
+      getIntervalInMicroSeconds(windowDuration),
+      getIntervalInMicroSeconds(slideDuration),
+      getIntervalInMicroSeconds(startTime))
+  }
+}
+
+/**
+ * Expression used internally to convert the TimestampType to Long without losing
+ * precision, i.e. in microseconds. Used in time windowing.
+ */
+case class PreciseTimestamp(child: Expression) extends UnaryExpression with ExpectsInputTypes {
+  override def inputTypes: Seq[AbstractDataType] = Seq(TimestampType)
+  override def dataType: DataType = LongType
+  override def genCode(ctx: CodegenContext, ev: ExprCode): String = {
+    val eval = child.gen(ctx)
+    eval.code +
+      s"""boolean ${ev.isNull} = ${eval.isNull};
+         |${ctx.javaType(dataType)} ${ev.value} = ${eval.value};
+       """.stripMargin
+  }
+}

http://git-wip-us.apache.org/repos/asf/spark/blob/1b829ce1/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/analysis/AnalysisErrorSuite.scala
----------------------------------------------------------------------
diff --git a/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/analysis/AnalysisErrorSuite.scala b/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/analysis/AnalysisErrorSuite.scala
index a90dfc5..ad101d1 100644
--- a/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/analysis/AnalysisErrorSuite.scala
+++ b/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/analysis/AnalysisErrorSuite.scala
@@ -272,6 +272,62 @@ class AnalysisErrorSuite extends AnalysisTest {
     testRelation2.where('bad_column > 1).groupBy('a)(UnresolvedAlias(max('b))),
     "cannot resolve '`bad_column`'" :: Nil)
 
+  errorTest(
+    "slide duration greater than window in time window",
+    testRelation2.select(
+      TimeWindow(Literal("2016-01-01 01:01:01"), "1 second", "2 second", "0 second").as("window")),
+      s"The slide duration " :: " must be less than or equal to the windowDuration " :: Nil
+  )
+
+  errorTest(
+    "start time greater than slide duration in time window",
+    testRelation.select(
+      TimeWindow(Literal("2016-01-01 01:01:01"), "1 second", "1 second", "1 minute").as("window")),
+      "The start time " :: " must be less than the slideDuration " :: Nil
+  )
+
+  errorTest(
+    "start time equal to slide duration in time window",
+    testRelation.select(
+      TimeWindow(Literal("2016-01-01 01:01:01"), "1 second", "1 second", "1 second").as("window")),
+      "The start time " :: " must be less than the slideDuration " :: Nil
+  )
+
+  errorTest(
+    "negative window duration in time window",
+    testRelation.select(
+      TimeWindow(Literal("2016-01-01 01:01:01"), "-1 second", "1 second", "0 second").as("window")),
+      "The window duration " :: " must be greater than 0." :: Nil
+  )
+
+  errorTest(
+    "zero window duration in time window",
+    testRelation.select(
+      TimeWindow(Literal("2016-01-01 01:01:01"), "0 second", "1 second", "0 second").as("window")),
+      "The window duration " :: " must be greater than 0." :: Nil
+  )
+
+  errorTest(
+    "negative slide duration in time window",
+    testRelation.select(
+      TimeWindow(Literal("2016-01-01 01:01:01"), "1 second", "-1 second", "0 second").as("window")),
+      "The slide duration " :: " must be greater than 0." :: Nil
+  )
+
+  errorTest(
+    "zero slide duration in time window",
+    testRelation.select(
+      TimeWindow(Literal("2016-01-01 01:01:01"), "1 second", "0 second", "0 second").as("window")),
+      "The slide duration" :: " must be greater than 0." :: Nil
+  )
+
+  errorTest(
+    "negative start time in time window",
+    testRelation.select(
+      TimeWindow(Literal("2016-01-01 01:01:01"), "1 second", "1 second", "-5 second").as("window")),
+      "The start time" :: "must be greater than or equal to 0." :: Nil
+  )
+
   test("SPARK-6452 regression test") {
     // CheckAnalysis should throw AnalysisException when Aggregate contains missing attribute(s)
     // Since we manually construct the logical plan at here and Sum only accept

http://git-wip-us.apache.org/repos/asf/spark/blob/1b829ce1/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/expressions/TimeWindowSuite.scala
----------------------------------------------------------------------
diff --git a/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/expressions/TimeWindowSuite.scala b/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/expressions/TimeWindowSuite.scala
new file mode 100644
index 0000000..71f969a
--- /dev/null
+++ b/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/expressions/TimeWindowSuite.scala
@@ -0,0 +1,76 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.sql.catalyst.expressions
+
+import org.apache.spark.SparkFunSuite
+import org.apache.spark.sql.AnalysisException
+
+class TimeWindowSuite extends SparkFunSuite with ExpressionEvalHelper {
+
+  test("time window is unevaluable") {
+    intercept[UnsupportedOperationException] {
+      evaluate(TimeWindow(Literal(10L), "1 second", "1 second", "0 second"))
+    }
+  }
+
+  private def checkErrorMessage(msg: String, value: String): Unit = {
+    val validDuration = "10 second"
+    val validTime = "5 second"
+    val e1 = intercept[IllegalArgumentException] {
+      TimeWindow(Literal(10L), value, validDuration, validTime).windowDuration
+    }
+    val e2 = intercept[IllegalArgumentException] {
+      TimeWindow(Literal(10L), validDuration, value, validTime).slideDuration
+    }
+    val e3 = intercept[IllegalArgumentException] {
+      TimeWindow(Literal(10L), validDuration, validDuration, value).startTime
+    }
+    Seq(e1, e2, e3).foreach { e =>
+      assert(e.getMessage.contains(msg))
+    }
+  }
+
+  test("blank intervals throw exception") {
+    for (blank <- Seq(null, " ", "\n", "\t")) {
+      checkErrorMessage(
+        "The window duration, slide duration and start time cannot be null or blank.", blank)
+    }
+  }
+
+  test("invalid intervals throw exception") {
+    checkErrorMessage(
+      "did not correspond to a valid interval string.", "2 apples")
+  }
+
+  test("intervals greater than a month throws exception") {
+    checkErrorMessage(
+      "Intervals greater than a month is not supported (1 month).", "1 month")
+  }
+
+  test("interval strings work with and without 'interval' prefix and return microseconds") {
+    val validDuration = "10 second"
+    for ((text, seconds) <- Seq(
+      ("1 second", 1000000), // 1e6
+      ("1 minute", 60000000), // 6e7
+      ("2 hours", 7200000000L))) { // 7.2e9
+      assert(TimeWindow(Literal(10L), text, validDuration, "0 seconds").windowDuration === seconds)
+      assert(TimeWindow(Literal(10L), "interval " + text, validDuration, "0 seconds").windowDuration
+        === seconds)
+    }
+  }
+}

http://git-wip-us.apache.org/repos/asf/spark/blob/1b829ce1/sql/core/src/main/scala/org/apache/spark/sql/functions.scala
----------------------------------------------------------------------
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/functions.scala b/sql/core/src/main/scala/org/apache/spark/sql/functions.scala
index 7ce15e3..7490605 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/functions.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/functions.scala
@@ -2550,6 +2550,143 @@ object functions {
     ToUTCTimestamp(ts.expr, Literal(tz))
   }
 
+  /**
+   * Bucketize rows into one or more time windows given a timestamp specifying column. Window
+   * starts are inclusive but the window ends are exclusive, e.g. 12:05 will be in the window
+   * [12:05,12:10) but not in [12:00,12:05). Windows can support microsecond precision. Windows in
+   * the order of months are not supported. The following example takes the average stock price for
+   * a one minute window every 10 seconds starting 5 seconds after the hour:
+   *
+   * {{{
+   *   val df = ... // schema => time: TimestampType, stockId: StringType, price: DoubleType
+   *   df.groupBy(window($"time", "1 minute", "10 seconds", "5 seconds"), $"stockId")
+   *     .agg(mean("price"))
+   * }}}
+   *
+   * The windows will look like:
+   *
+   * {{{
+   *   09:00:05-09:01:05
+   *   09:00:15-09:01:15
+   *   09:00:25-09:01:25 ...
+   * }}}
+   *
+   * For a continuous query, you may use the function `current_timestamp` to generate windows on
+   * processing time.
+   *
+   * @param timeColumn The column or the expression to use as the timestamp for windowing by time.
+   *                   The time can be a TimestampType or LongType, however when using LongType,
+   *                   the time must be given in seconds.
+   * @param windowDuration A string specifying the width of the window, e.g. `10 minutes`,
+   *                       `1 second`. Check [[org.apache.spark.unsafe.types.CalendarInterval]] for
+   *                       valid duration identifiers.
+   * @param slideDuration A string specifying the sliding interval of the window, e.g. `1 minute`.
+   *                      A new window will be generated every `slideDuration`. Must be less than
+   *                      or equal to the `windowDuration`. Check
+   *                      [[org.apache.spark.unsafe.types.CalendarInterval]] for valid duration
+   *                      identifiers.
+   * @param startTime The offset with respect to 1970-01-01 00:00:00 UTC with which to start
+   *                  window intervals. For example, in order to have hourly tumbling windows that
+   *                  start 15 minutes past the hour, e.g. 12:15-13:15, 13:15-14:15... provide
+   *                  `startTime` as `15 minutes`.
+   *
+   * @group datetime_funcs
+   * @since 2.0.0
+   */
+  @Experimental
+  def window(
+      timeColumn: Column,
+      windowDuration: String,
+      slideDuration: String,
+      startTime: String): Column = {
+    withExpr {
+      TimeWindow(timeColumn.expr, windowDuration, slideDuration, startTime)
+    }.as("window")
+  }
+
+
+  /**
+   * Bucketize rows into one or more time windows given a timestamp specifying column. Window
+   * starts are inclusive but the window ends are exclusive, e.g. 12:05 will be in the window
+   * [12:05,12:10) but not in [12:00,12:05). Windows can support microsecond precision. Windows in
+   * the order of months are not supported. The windows start beginning at 1970-01-01 00:00:00 UTC.
+   * The following example takes the average stock price for a one minute window every 10 seconds:
+   *
+   * {{{
+   *   val df = ... // schema => time: TimestampType, stockId: StringType, price: DoubleType
+   *   df.groupBy(window($"time", "1 minute", "10 seconds"), $"stockId")
+   *     .agg(mean("price"))
+   * }}}
+   *
+   * The windows will look like:
+   *
+   * {{{
+   *   09:00:00-09:01:00
+   *   09:00:10-09:01:10
+   *   09:00:20-09:01:20 ...
+   * }}}
+   *
+   * For a continuous query, you may use the function `current_timestamp` to generate windows on
+   * processing time.
+   *
+   * @param timeColumn The column or the expression to use as the timestamp for windowing by time.
+   *                   The time can be a TimestampType or LongType, however when using LongType,
+   *                   the time must be given in seconds.
+   * @param windowDuration A string specifying the width of the window, e.g. `10 minutes`,
+   *                       `1 second`. Check [[org.apache.spark.unsafe.types.CalendarInterval]] for
+   *                       valid duration identifiers.
+   * @param slideDuration A string specifying the sliding interval of the window, e.g. `1 minute`.
+   *                      A new window will be generated every `slideDuration`. Must be less than
+   *                      or equal to the `windowDuration`. Check
+   *                      [[org.apache.spark.unsafe.types.CalendarInterval]] for valid duration.
+   *
+   * @group datetime_funcs
+   * @since 2.0.0
+   */
+  @Experimental
+  def window(timeColumn: Column, windowDuration: String, slideDuration: String): Column = {
+    window(timeColumn, windowDuration, slideDuration, "0 second")
+  }
+
+  /**
+   * Generates tumbling time windows given a timestamp specifying column. Window
+   * starts are inclusive but the window ends are exclusive, e.g. 12:05 will be in the window
+   * [12:05,12:10) but not in [12:00,12:05). Windows can support microsecond precision. Windows in
+   * the order of months are not supported. The windows start beginning at 1970-01-01 00:00:00 UTC.
+   * The following example takes the average stock price for a one minute tumbling window:
+   *
+   * {{{
+   *   val df = ... // schema => time: TimestampType, stockId: StringType, price: DoubleType
+   *   df.groupBy(window($"time", "1 minute"), $"stockId")
+   *     .agg(mean("price"))
+   * }}}
+   *
+   * The windows will look like:
+   *
+   * {{{
+   *   09:00:00-09:01:00
+   *   09:01:00-09:02:00
+   *   09:02:00-09:03:00 ...
+   * }}}
+   *
+   * For a continuous query, you may use the function `current_timestamp` to generate windows on
+   * processing time.
+   *
+   * @param timeColumn The column or the expression to use as the timestamp for windowing by time.
+   *                   The time can be a TimestampType or LongType, however when using LongType,
+   *                   the time must be given in seconds.
+   * @param windowDuration A string specifying the width of the window, e.g. `10 minutes`,
+   *                       `1 second`. Check [[org.apache.spark.unsafe.types.CalendarInterval]] for
+   *                       valid duration identifiers.
+   *
+   * @group datetime_funcs
+   * @since 2.0.0
+   */
+  @Experimental
+  def window(timeColumn: Column, windowDuration: String): Column = {
+    window(timeColumn, windowDuration, windowDuration, "0 second")
+  }
+
   //////////////////////////////////////////////////////////////////////////////////////////////
   // Collection functions
   //////////////////////////////////////////////////////////////////////////////////////////////

http://git-wip-us.apache.org/repos/asf/spark/blob/1b829ce1/sql/core/src/test/scala/org/apache/spark/sql/DataFrameTimeWindowingSuite.scala
----------------------------------------------------------------------
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/DataFrameTimeWindowingSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/DataFrameTimeWindowingSuite.scala
new file mode 100644
index 0000000..e8103a3
--- /dev/null
+++ b/sql/core/src/test/scala/org/apache/spark/sql/DataFrameTimeWindowingSuite.scala
@@ -0,0 +1,242 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.sql
+
+import java.util.TimeZone
+
+import org.scalatest.BeforeAndAfterEach
+
+import org.apache.spark.sql.functions._
+import org.apache.spark.sql.test.SharedSQLContext
+import org.apache.spark.sql.types.StringType
+
+class DataFrameTimeWindowingSuite extends QueryTest with SharedSQLContext with BeforeAndAfterEach {
+
+  import testImplicits._
+
+  override def beforeEach(): Unit = {
+    super.beforeEach()
+    TimeZone.setDefault(TimeZone.getTimeZone("UTC"))
+  }
+
+  override def afterEach(): Unit = {
+    super.afterEach()
+    TimeZone.setDefault(null)
+  }
+
+  test("tumbling window groupBy statement") {
+    val df = Seq(
+      ("2016-03-27 19:39:34", 1, "a"),
+      ("2016-03-27 19:39:56", 2, "a"),
+      ("2016-03-27 19:39:27", 4, "b")).toDF("time", "value", "id")
+    checkAnswer(
+      df.groupBy(window($"time", "10 seconds"))
+        .agg(count("*").as("counts"))
+        .orderBy($"window.start".asc)
+        .select("counts"),
+      Seq(Row(1), Row(1), Row(1))
+    )
+  }
+
+  test("tumbling window groupBy statement with startTime") {
+    val df = Seq(
+      ("2016-03-27 19:39:34", 1, "a"),
+      ("2016-03-27 19:39:56", 2, "a"),
+      ("2016-03-27 19:39:27", 4, "b")).toDF("time", "value", "id")
+
+    checkAnswer(
+      df.groupBy(window($"time", "10 seconds", "10 seconds", "5 seconds"), $"id")
+        .agg(count("*").as("counts"))
+        .orderBy($"window.start".asc)
+        .select("counts"),
+      Seq(Row(1), Row(1), Row(1)))
+  }
+
+  test("tumbling window with multi-column projection") {
+    val df = Seq(
+      ("2016-03-27 19:39:34", 1, "a"),
+      ("2016-03-27 19:39:56", 2, "a"),
+      ("2016-03-27 19:39:27", 4, "b")).toDF("time", "value", "id")
+
+    checkAnswer(
+      df.select(window($"time", "10 seconds"), $"value")
+        .orderBy($"window.start".asc)
+        .select($"window.start".cast("string"), $"window.end".cast("string"), $"value"),
+      Seq(
+        Row("2016-03-27 19:39:20", "2016-03-27 19:39:30", 4),
+        Row("2016-03-27 19:39:30", "2016-03-27 19:39:40", 1),
+        Row("2016-03-27 19:39:50", "2016-03-27 19:40:00", 2)
+      )
+    )
+  }
+
+  test("sliding window grouping") {
+    val df = Seq(
+      ("2016-03-27 19:39:34", 1, "a"),
+      ("2016-03-27 19:39:56", 2, "a"),
+      ("2016-03-27 19:39:27", 4, "b")).toDF("time", "value", "id")
+
+    checkAnswer(
+      df.groupBy(window($"time", "10 seconds", "3 seconds", "0 second"))
+        .agg(count("*").as("counts"))
+        .orderBy($"window.start".asc)
+        .select($"window.start".cast("string"), $"window.end".cast("string"), $"counts"),
+      // 2016-03-27 19:39:27 UTC -> 4 bins
+      // 2016-03-27 19:39:34 UTC -> 3 bins
+      // 2016-03-27 19:39:56 UTC -> 3 bins
+      Seq(
+        Row("2016-03-27 19:39:18", "2016-03-27 19:39:28", 1),
+        Row("2016-03-27 19:39:21", "2016-03-27 19:39:31", 1),
+        Row("2016-03-27 19:39:24", "2016-03-27 19:39:34", 1),
+        Row("2016-03-27 19:39:27", "2016-03-27 19:39:37", 2),
+        Row("2016-03-27 19:39:30", "2016-03-27 19:39:40", 1),
+        Row("2016-03-27 19:39:33", "2016-03-27 19:39:43", 1),
+        Row("2016-03-27 19:39:48", "2016-03-27 19:39:58", 1),
+        Row("2016-03-27 19:39:51", "2016-03-27 19:40:01", 1),
+        Row("2016-03-27 19:39:54", "2016-03-27 19:40:04", 1))
+    )
+  }
+
+  test("sliding window projection") {
+    val df = Seq(
+      ("2016-03-27 19:39:34", 1, "a"),
+      ("2016-03-27 19:39:56", 2, "a"),
+      ("2016-03-27 19:39:27", 4, "b")).toDF("time", "value", "id")
+
+    checkAnswer(
+      df.select(window($"time", "10 seconds", "3 seconds", "0 second"), $"value")
+        .orderBy($"window.start".asc, $"value".desc).select("value"),
+      // 2016-03-27 19:39:27 UTC -> 4 bins
+      // 2016-03-27 19:39:34 UTC -> 3 bins
+      // 2016-03-27 19:39:56 UTC -> 3 bins
+      Seq(Row(4), Row(4), Row(4), Row(4), Row(1), Row(1), Row(1), Row(2), Row(2), Row(2))
+    )
+  }
+
+  test("windowing combined with explode expression") {
+    val df = Seq(
+      ("2016-03-27 19:39:34", 1, Seq("a", "b")),
+      ("2016-03-27 19:39:56", 2, Seq("a", "c", "d"))).toDF("time", "value", "ids")
+
+    checkAnswer(
+      df.select(window($"time", "10 seconds"), $"value", explode($"ids"))
+        .orderBy($"window.start".asc).select("value"),
+      // first window exploded to two rows for "a", and "b", second window exploded to 3 rows
+      Seq(Row(1), Row(1), Row(2), Row(2), Row(2))
+    )
+  }
+
+  test("null timestamps") {
+    val df = Seq(
+      ("2016-03-27 09:00:05", 1),
+      ("2016-03-27 09:00:32", 2),
+      (null, 3),
+      (null, 4)).toDF("time", "value")
+
+    checkDataset(
+      df.select(window($"time", "10 seconds"), $"value")
+        .orderBy($"window.start".asc)
+        .select("value")
+        .as[Int],
+      1, 2) // rows with null timestamps are dropped
+  }
+
+  test("time window joins") {
+    val df = Seq(
+      ("2016-03-27 09:00:05", 1),
+      ("2016-03-27 09:00:32", 2),
+      (null, 3),
+      (null, 4)).toDF("time", "value")
+
+    val df2 = Seq(
+      ("2016-03-27 09:00:02", 3),
+      ("2016-03-27 09:00:35", 6)).toDF("time", "othervalue")
+
+    checkAnswer(
+      df.select(window($"time", "10 seconds"), $"value").join(
+        df2.select(window($"time", "10 seconds"), $"othervalue"), Seq("window"))
+        .groupBy("window")
+        .agg((sum("value") + sum("othervalue")).as("total"))
+        .orderBy($"window.start".asc).select("total"),
+      Seq(Row(4), Row(8)))
+  }
+
+  test("negative timestamps") {
+    val df4 = Seq(
+      ("1970-01-01 00:00:02", 1),
+      ("1970-01-01 00:00:12", 2)).toDF("time", "value")
+    checkAnswer(
+      df4.select(window($"time", "10 seconds", "10 seconds", "5 seconds"), $"value")
+        .orderBy($"window.start".asc)
+        .select($"window.start".cast(StringType), $"window.end".cast(StringType), $"value"),
+      Seq(
+        Row("1969-12-31 23:59:55", "1970-01-01 00:00:05", 1),
+        Row("1970-01-01 00:00:05", "1970-01-01 00:00:15", 2))
+    )
+  }
+
+  test("multiple time windows in a single operator throws nice exception") {
+    val df = Seq(
+      ("2016-03-27 09:00:02", 3),
+      ("2016-03-27 09:00:35", 6)).toDF("time", "value")
+    val e = intercept[AnalysisException] {
+      df.select(window($"time", "10 second"), window($"time", "15 second")).collect()
+    }
+    assert(e.getMessage.contains(
+      "Multiple time window expressions would result in a cartesian product"))
+  }
+
+  test("aliased windows") {
+    val df = Seq(
+      ("2016-03-27 19:39:34", 1, Seq("a", "b")),
+      ("2016-03-27 19:39:56", 2, Seq("a", "c", "d"))).toDF("time", "value", "ids")
+
+    checkAnswer(
+      df.select(window($"time", "10 seconds").as("time_window"), $"value")
+        .orderBy($"time_window.start".asc)
+        .select("value"),
+      Seq(Row(1), Row(2))
+    )
+  }
+
+  test("millisecond precision sliding windows") {
+    val df = Seq(
+      ("2016-03-27 09:00:00.41", 3),
+      ("2016-03-27 09:00:00.62", 6),
+      ("2016-03-27 09:00:00.715", 8)).toDF("time", "value")
+    checkAnswer(
+      df.groupBy(window($"time", "200 milliseconds", "40 milliseconds", "0 milliseconds"))
+        .agg(count("*").as("counts"))
+        .orderBy($"window.start".asc)
+        .select($"window.start".cast(StringType), $"window.end".cast(StringType), $"counts"),
+      Seq(
+        Row("2016-03-27 09:00:00.24", "2016-03-27 09:00:00.44", 1),
+        Row("2016-03-27 09:00:00.28", "2016-03-27 09:00:00.48", 1),
+        Row("2016-03-27 09:00:00.32", "2016-03-27 09:00:00.52", 1),
+        Row("2016-03-27 09:00:00.36", "2016-03-27 09:00:00.56", 1),
+        Row("2016-03-27 09:00:00.4", "2016-03-27 09:00:00.6", 1),
+        Row("2016-03-27 09:00:00.44", "2016-03-27 09:00:00.64", 1),
+        Row("2016-03-27 09:00:00.48", "2016-03-27 09:00:00.68", 1),
+        Row("2016-03-27 09:00:00.52", "2016-03-27 09:00:00.72", 2),
+        Row("2016-03-27 09:00:00.56", "2016-03-27 09:00:00.76", 2),
+        Row("2016-03-27 09:00:00.6", "2016-03-27 09:00:00.8", 2),
+        Row("2016-03-27 09:00:00.64", "2016-03-27 09:00:00.84", 1),
+        Row("2016-03-27 09:00:00.68", "2016-03-27 09:00:00.88", 1))
+    )
+  }
+}
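
The "4 bins" / "3 bins" comments in the sliding window tests, and the pre-epoch window in the "negative timestamps" test, can be checked by hand with a small plain-Scala sketch. This is not how the patch implements `TimeWindow`; `WindowSketch` and `windowsFor` are hypothetical names used only for illustration, and all values are assumed to be whole seconds since the epoch.

```scala
// Hypothetical helper, not part of this patch: lists the [start, end) windows
// that contain a given timestamp. All arguments are in seconds.
object WindowSketch {
  def windowsFor(
      ts: Long,
      windowSec: Long,
      slideSec: Long,
      startSec: Long = 0L): List[(Long, Long)] = {
    // Latest window start at or before ts, aligned to startSec + k * slideSec.
    // floorMod keeps the alignment correct for timestamps before the offset.
    val lastStart = ts - Math.floorMod(ts - startSec, slideSec)
    // Step back one slide at a time while the window still covers ts.
    Iterator.iterate(lastStart)(_ - slideSec)
      .takeWhile(start => start + windowSec > ts)
      .map(start => (start, start + windowSec))
      .toList
      .reverse
  }
}
```

For example, 19:39:27 is second 70767 of the day; because a day is a whole multiple of the 3 second slide, `WindowSketch.windowsFor(70767L, 10L, 3L)` can stand in for the epoch-based value and yields four windows, while `WindowSketch.windowsFor(2L, 10L, 10L, 5L)` yields the single window `(-5, 5)`, which corresponds to the `1969-12-31 23:59:55` to `1970-01-01 00:00:05` row expected above.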

