[ https://issues.apache.org/jira/browse/FLINK-10676?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16679285#comment-16679285 ]
ASF GitHub Bot commented on FLINK-10676: ---------------------------------------- asfgit closed pull request #6949: [FLINK-10676][table] Add 'as' method for OverWindowWithOrderBy URL: https://github.com/apache/flink/pull/6949 This is a PR merged from a forked repository. As GitHub hides the original diff on merge, it is displayed below for the sake of provenance: As this is a foreign pull request (from a fork), the diff is supplied below (as it won't show otherwise due to GitHub magic): diff --git a/docs/dev/table/tableApi.md b/docs/dev/table/tableApi.md index 76bd5b28ef2..ed3a97d8def 100644 --- a/docs/dev/table/tableApi.md +++ b/docs/dev/table/tableApi.md @@ -1571,13 +1571,15 @@ The `OverWindow` defines a range of rows over which aggregates are computed. `Ov </tr> <tr> <td><code>preceding</code></td> - <td>Required</td> + <td>Optional</td> <td> <p>Defines the interval of rows that are included in the window and precede the current row. The interval can either be specified as time or row-count interval.</p> <p><a href="tableApi.html#bounded-over-windows">Bounded over windows</a> are specified with the size of the interval, e.g., <code>10.minutes</code> for a time interval or <code>10.rows</code> for a row-count interval.</p> <p><a href="tableApi.html#unbounded-over-windows">Unbounded over windows</a> are specified using a constant, i.e., <code>UNBOUNDED_RANGE</code> for a time interval or <code>UNBOUNDED_ROW</code> for a row-count interval. 
Unbounded over windows start with the first row of a partition.</p> + + <p>If the <code>preceding</code> clause is omitted, <code>UNBOUNDED_RANGE</code> and <code>CURRENT_RANGE</code> are used as the default <code>preceding</code> and <code>following</code> for the window.</p> </td> </tr> <tr> diff --git a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/api/java/windows.scala b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/api/java/windows.scala index f326f6f6a7c..121aab8f776 100644 --- a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/api/java/windows.scala +++ b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/api/java/windows.scala @@ -18,7 +18,8 @@ package org.apache.flink.table.api.java -import org.apache.flink.table.api.{TumbleWithSize, OverWindowWithPreceding, SlideWithSize, SessionWithGap} +import org.apache.flink.table.api.scala.{CURRENT_RANGE, UNBOUNDED_RANGE} +import org.apache.flink.table.api.{OverWindow, TumbleWithSize, OverWindowWithPreceding, SlideWithSize, SessionWithGap} import org.apache.flink.table.expressions.{Expression, ExpressionParser} /** @@ -144,4 +145,21 @@ class OverWindowWithOrderBy( new OverWindowWithPreceding(partitionByExpr, orderByExpr, precedingExpr) } + /** + * Assigns an alias for this window that the following `select()` clause can refer to. + * + * @param alias alias for this over window + * @return over window + */ + def as(alias: String): OverWindow = as(ExpressionParser.parseExpression(alias)) + + /** + * Assigns an alias for this window that the following `select()` clause can refer to. 
+ * + * @param alias alias for this over window + * @return over window + */ + def as(alias: Expression): OverWindow = { + OverWindow(alias, partitionByExpr, orderByExpr, UNBOUNDED_RANGE, CURRENT_RANGE) + } } diff --git a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/api/scala/windows.scala b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/api/scala/windows.scala index 91bf1a6c739..2f88248a7f1 100644 --- a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/api/scala/windows.scala +++ b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/api/scala/windows.scala @@ -18,8 +18,8 @@ package org.apache.flink.table.api.scala -import org.apache.flink.table.api.{TumbleWithSize, OverWindowWithPreceding, SlideWithSize, SessionWithGap} -import org.apache.flink.table.expressions.Expression +import org.apache.flink.table.api.{OverWindow, TumbleWithSize, OverWindowWithPreceding, SlideWithSize, SessionWithGap} +import org.apache.flink.table.expressions.{Expression, ExpressionParser} /** * Helper object for creating a tumbling window. Tumbling windows are consecutive, non-overlapping @@ -127,7 +127,6 @@ case class PartitionedOver(partitionBy: Array[Expression]) { case class OverWindowWithOrderBy(partitionBy: Seq[Expression], orderBy: Expression) { - /** * Set the preceding offset (based on time or row-count intervals) for over window. * @@ -138,4 +137,21 @@ case class OverWindowWithOrderBy(partitionBy: Seq[Expression], orderBy: Expressi new OverWindowWithPreceding(partitionBy, orderBy, preceding) } + /** + * Assigns an alias for this window that the following `select()` clause can refer to. + * + * @param alias alias for this over window + * @return over window + */ + def as(alias: String): OverWindow = as(ExpressionParser.parseExpression(alias)) + + /** + * Assigns an alias for this window that the following `select()` clause can refer to. 
+ * + * @param alias alias for this over window + * @return over window + */ + def as(alias: Expression): OverWindow = { + OverWindow(alias, partitionBy, orderBy, UNBOUNDED_RANGE, CURRENT_RANGE) + } } diff --git a/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/api/stream/sql/OverWindowTest.scala b/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/api/stream/sql/OverWindowTest.scala index 1be5810c36c..eca53c055f7 100644 --- a/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/api/stream/sql/OverWindowTest.scala +++ b/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/api/stream/sql/OverWindowTest.scala @@ -287,7 +287,15 @@ class OverWindowTest extends TableTestBase { "sum(a) OVER (PARTITION BY c ORDER BY proctime RANGE UNBOUNDED preceding) as cnt2 " + "FROM MyTable " + "WINDOW w AS (PARTITION BY c ORDER BY proctime RANGE UNBOUNDED preceding)" + + val sql3 = "SELECT " + + "c, " + + "count(a) OVER (PARTITION BY c ORDER BY proctime) as cnt1, " + + "sum(a) OVER (PARTITION BY c ORDER BY proctime) as cnt2 " + + "from MyTable" + streamUtil.verifySqlPlansIdentical(sql, sql2) + streamUtil.verifySqlPlansIdentical(sql, sql3) val expected = unaryNode( @@ -523,6 +531,13 @@ class OverWindowTest extends TableTestBase { "sum(a) OVER (PARTITION BY c ORDER BY rowtime RANGE UNBOUNDED preceding) as cnt2 " + "from MyTable" + val sql1 = "SELECT " + + "c, " + + "count(a) OVER (PARTITION BY c ORDER BY rowtime) as cnt1, " + + "sum(a) OVER (PARTITION BY c ORDER BY rowtime) as cnt2 " + + "from MyTable" + streamUtil.verifySqlPlansIdentical(sql, sql1) + val expected = unaryNode( "DataStreamCalc", diff --git a/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/api/stream/table/OverWindowTest.scala b/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/api/stream/table/OverWindowTest.scala index 55e3ecbac0d..eeb7d5f3d68 100644 --- 
a/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/api/stream/table/OverWindowTest.scala +++ b/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/api/stream/table/OverWindowTest.scala @@ -194,10 +194,15 @@ class OverWindowTest extends TableTestBase { val weightedAvg = new WeightedAvgWithRetract val result = table - .window(Over partitionBy 'c orderBy 'proctime preceding UNBOUNDED_RANGE following - CURRENT_RANGE as 'w) + .window(Over partitionBy 'c orderBy 'proctime preceding UNBOUNDED_RANGE as 'w) + .select('a, 'c, 'a.count over 'w, weightedAvg('c, 'a) over 'w) + + val result2 = table + .window(Over partitionBy 'c orderBy 'proctime as 'w) .select('a, 'c, 'a.count over 'w, weightedAvg('c, 'a) over 'w) + streamUtil.verify2Tables(result, result2) + val expected = unaryNode( "DataStreamCalc", @@ -459,6 +464,12 @@ class OverWindowTest extends TableTestBase { CURRENT_RANGE as 'w) .select('a, 'c, 'a.count over 'w, weightedAvg('c, 'a) over 'w as 'wAvg) + val result2 = table + .window(Over partitionBy 'c orderBy 'rowtime as 'w) + .select('a, 'c, 'a.count over 'w, weightedAvg('c, 'a) over 'w as 'wAvg) + + streamUtil.verify2Tables(result, result2) + val expected = unaryNode( "DataStreamCalc", diff --git a/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/api/stream/table/stringexpr/OverWindowStringExpressionTest.scala b/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/api/stream/table/stringexpr/OverWindowStringExpressionTest.scala index 3e757dabbba..99114b88318 100644 --- a/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/api/stream/table/stringexpr/OverWindowStringExpressionTest.scala +++ b/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/api/stream/table/stringexpr/OverWindowStringExpressionTest.scala @@ -120,7 +120,7 @@ class OverWindowStringExpressionTest extends TableTestBase { } @Test - def testUnboundedOverRange(): Unit = { + def testRowTimeUnboundedOverRange(): Unit = { 
val util = streamTestUtil() val t = util.addTable[(Long, Int, String, Int, Long)]('a, 'b, 'c, 'd, 'e, 'rowtime.rowtime) @@ -134,8 +134,37 @@ class OverWindowStringExpressionTest extends TableTestBase { .window( JOver.orderBy("rowtime").preceding("unbounded_range").following("current_range").as("w")) .select("a, SUM(b) OVER w, weightAvgFun(a, b) over w as myCnt") + val resJava2 = t + .window( + JOver.orderBy("rowtime").as("w")) + .select("a, SUM(b) OVER w, weightAvgFun(a, b) over w as myCnt") + + verifyTableEquals(resScala, resJava) + verifyTableEquals(resScala, resJava2) + } + + @Test + def testProcTimeUnboundedOverRange(): Unit = { + val util = streamTestUtil() + val t = util.addTable[(Long, Int, String, Int, Long)]('a, 'b, 'c, 'd, 'e, 'proctime.proctime) + + val weightAvgFun = new WeightedAvg + util.tableEnv.registerFunction("weightAvgFun", weightAvgFun) + + val resScala = t + .window(SOver orderBy 'proctime preceding UNBOUNDED_RANGE following CURRENT_RANGE as 'w) + .select('a, 'b.sum over 'w, weightAvgFun('a, 'b) over 'w as 'myCnt) + val resJava = t + .window( + JOver.orderBy("proctime").preceding("unbounded_range").following("current_range").as("w")) + .select("a, SUM(b) OVER w, weightAvgFun(a, b) over w as myCnt") + val resJava2 = t + .window( + JOver.orderBy("proctime").as("w")) + .select("a, SUM(b) OVER w, weightAvgFun(a, b) over w as myCnt") verifyTableEquals(resScala, resJava) + verifyTableEquals(resScala, resJava2) } @Test ---------------------------------------------------------------- This is an automated message from the Apache Git Service. To respond to the message, please log on GitHub and use the URL above to go to the specific comment. 
For queries about this service, please contact Infrastructure at: us...@infra.apache.org > Add 'as' method for OverWindowWithOrderBy in Java API > ----------------------------------------------------- > > Key: FLINK-10676 > URL: https://issues.apache.org/jira/browse/FLINK-10676 > Project: Flink > Issue Type: Improvement > Components: Table API & SQL > Affects Versions: 1.7.0 > Reporter: sunjincheng > Assignee: Hequn Cheng > Priority: Major > Labels: pull-request-available > Fix For: 1.7.0 > > > The preceding clause of an OVER window is optional in traditional databases. > The default is an unbounded range (UNBOUNDED_RANGE preceding, CURRENT_RANGE following). So we can add the "as" method to > OverWindowWithOrderBy. This makes an over window easier to write, e.g.: > {code:java} > .window(Over partitionBy 'c orderBy 'proctime preceding UNBOUNDED_RANGE as > 'w){code} > can be simplified as follows: > {code:java} > .window(Over partitionBy 'c orderBy 'proctime as 'w){code} > What do you think? > -- This message was sent by Atlassian JIRA (v7.6.3#76005)