Repository: spark
Updated Branches:
  refs/heads/master e5d0928e2 -> 9053054c7


[SPARK-16195][SQL] Allow users to specify empty over clause in window expressions through dataset API

## What changes were proposed in this pull request?
Allow users to specify an empty OVER clause in window expressions through the Dataset API.

In SQL, it is allowed to specify an empty OVER clause in a window expression:

```SQL
select area, sum(product) over () as c from windowData
where product > 3 group by area, product
having avg(month) > 0 order by avg(month), product
```
In this case the analytic function sum is evaluated over all the rows of the result set.

Currently this is not possible through the Dataset API; this PR adds support for it (see the sketch below).
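With this change, the equivalent can be expressed directly on a DataFrame via the new no-argument `over()`. A minimal sketch of the enabled usage; the DataFrame and column names here are illustrative, not from the patch:

```scala
import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.functions.sum

val spark = SparkSession.builder().appName("empty-over-sketch").getOrCreate()
import spark.implicits._

// A small illustrative DataFrame (names are hypothetical).
val df = Seq(("a", 4), ("a", 5), ("b", 6)).toDF("area", "product")

// sum(product) over (): an empty OVER clause, so the sum is
// computed across every row of the result set.
df.select($"area", $"product", sum($"product").over().as("c")).show()
```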

## How was this patch tested?

Added a new test in DataFrameWindowSuite.

Author: Dilip Biswal <dbis...@us.ibm.com>

Closes #13897 from dilipbiswal/spark-empty-over.


Project: http://git-wip-us.apache.org/repos/asf/spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/9053054c
Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/9053054c
Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/9053054c

Branch: refs/heads/master
Commit: 9053054c7f5ec2b9e3d8efbe6bfbfa68a6d1f0d0
Parents: e5d0928
Author: Dilip Biswal <dbis...@us.ibm.com>
Authored: Fri Jun 24 17:27:33 2016 -0700
Committer: Herman van Hovell <hvanhov...@databricks.com>
Committed: Fri Jun 24 17:27:33 2016 -0700

----------------------------------------------------------------------
 .../main/scala/org/apache/spark/sql/Column.scala   | 17 +++++++++++++++++
 .../org/apache/spark/sql/expressions/Window.scala  |  2 +-
 .../apache/spark/sql/DataFrameWindowSuite.scala    | 12 ++++++++++++
 3 files changed, 30 insertions(+), 1 deletion(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/spark/blob/9053054c/sql/core/src/main/scala/org/apache/spark/sql/Column.scala
----------------------------------------------------------------------
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/Column.scala b/sql/core/src/main/scala/org/apache/spark/sql/Column.scala
index 713f794..9f35107 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/Column.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/Column.scala
@@ -28,6 +28,7 @@ import org.apache.spark.sql.catalyst.expressions.aggregate.AggregateExpression
 import org.apache.spark.sql.catalyst.parser.CatalystSqlParser
 import org.apache.spark.sql.catalyst.util.usePrettyExpression
 import org.apache.spark.sql.execution.aggregate.TypedAggregateExpression
+import org.apache.spark.sql.expressions.Window
 import org.apache.spark.sql.functions.lit
 import org.apache.spark.sql.types._
 
@@ -1094,6 +1095,22 @@ class Column(protected[sql] val expr: Expression) extends Logging {
    */
   def over(window: expressions.WindowSpec): Column = window.withAggregate(this)
 
+  /**
+   * Define an empty analytic clause. In this case the analytic function is applied
+   * and presented for all rows in the result set.
+   *
+   * {{{
+   *   df.select(
+   *     sum("price").over(),
+   *     avg("price").over()
+   *   )
+   * }}}
+   *
+   * @group expr_ops
+   * @since 2.0.0
+   */
+  def over(): Column = over(Window.spec)
+
 }
 
 

http://git-wip-us.apache.org/repos/asf/spark/blob/9053054c/sql/core/src/main/scala/org/apache/spark/sql/expressions/Window.scala
----------------------------------------------------------------------
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/expressions/Window.scala b/sql/core/src/main/scala/org/apache/spark/sql/expressions/Window.scala
index 350c283..c29ec6f 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/expressions/Window.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/expressions/Window.scala
@@ -74,7 +74,7 @@ object Window {
     spec.orderBy(cols : _*)
   }
 
-  private def spec: WindowSpec = {
+  private[sql] def spec: WindowSpec = {
     new WindowSpec(Seq.empty, Seq.empty, UnspecifiedFrame)
   }
 

http://git-wip-us.apache.org/repos/asf/spark/blob/9053054c/sql/core/src/test/scala/org/apache/spark/sql/DataFrameWindowSuite.scala
----------------------------------------------------------------------
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/DataFrameWindowSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/DataFrameWindowSuite.scala
index 9a1aa46..c6f8c3a 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/DataFrameWindowSuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/DataFrameWindowSuite.scala
@@ -245,6 +245,18 @@ class DataFrameWindowSuite extends QueryTest with SharedSQLContext {
       Seq(Row("a", 6, 9), Row("b", 9, 6)))
   }
 
+  test("SPARK-16195 empty over spec") {
+    val df = Seq(("a", 1), ("a", 1), ("a", 2), ("b", 2)).
+      toDF("key", "value")
+    df.createOrReplaceTempView("window_table")
+    checkAnswer(
+      df.select($"key", $"value", sum($"value").over(), avg($"value").over()),
+      Seq(Row("a", 1, 6, 1.5), Row("a", 1, 6, 1.5), Row("a", 2, 6, 1.5), Row("b", 2, 6, 1.5)))
+    checkAnswer(
+      sql("select key, value, sum(value) over(), avg(value) over() from window_table"),
+      Seq(Row("a", 1, 6, 1.5), Row("a", 1, 6, 1.5), Row("a", 2, 6, 1.5), Row("b", 2, 6, 1.5)))
+  }
+
   test("window function with udaf") {
     val udaf = new UserDefinedAggregateFunction {
       def inputSchema: StructType = new StructType()

