Repository: flink Updated Branches: refs/heads/release-1.3 5281dd659 -> 88fbe2ac8
[FLINK-6817] [table] Add OverWindowWithPreceding class to guide users apply preceding in over window Project: http://git-wip-us.apache.org/repos/asf/flink/repo Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/88fbe2ac Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/88fbe2ac Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/88fbe2ac Branch: refs/heads/release-1.3 Commit: 88fbe2ac83f08074467c33b48e981b7ba9d1032f Parents: 5281dd6 Author: Jark Wu <[email protected]> Authored: Fri Jun 2 21:28:45 2017 +0800 Committer: Jark Wu <[email protected]> Committed: Wed Jun 14 17:42:56 2017 +0800 ---------------------------------------------------------------------- .../apache/flink/table/api/java/windows.scala | 22 ++++++++++++-- .../apache/flink/table/api/scala/windows.scala | 19 ++++++++++-- .../org/apache/flink/table/api/windows.scala | 31 ++++---------------- 3 files changed, 42 insertions(+), 30 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/flink/blob/88fbe2ac/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/api/java/windows.scala ---------------------------------------------------------------------- diff --git a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/api/java/windows.scala b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/api/java/windows.scala index 15208ce..f326f6f 100644 --- a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/api/java/windows.scala +++ b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/api/java/windows.scala @@ -18,7 +18,7 @@ package org.apache.flink.table.api.java -import org.apache.flink.table.api.{TumbleWithSize, OverWindowWithOrderBy, SlideWithSize, SessionWithGap} +import org.apache.flink.table.api.{TumbleWithSize, OverWindowWithPreceding, SlideWithSize, SessionWithGap} import org.apache.flink.table.expressions.{Expression, ExpressionParser} /** @@ -98,7 +98,7 @@ object Over { */ def orderBy(orderBy: String): OverWindowWithOrderBy = { val orderByExpr = ExpressionParser.parseExpression(orderBy) - new OverWindowWithOrderBy(Seq[Expression](), orderByExpr) + new OverWindowWithOrderBy(Array[Expression](), orderByExpr) } /** @@ -127,3 +127,21 @@ class PartitionedOver(private val partitionByExpr: Array[Expression]) { new OverWindowWithOrderBy(partitionByExpr, orderByExpr) } } + + +class OverWindowWithOrderBy( + private val partitionByExpr: Array[Expression], + private val orderByExpr: Expression) { + + /** + * Set the preceding offset (based on time or row-count intervals) for over window. + * + * @param preceding preceding offset relative to the current row. + * @return this over window + */ + def preceding(preceding: String): OverWindowWithPreceding = { + val precedingExpr = ExpressionParser.parseExpression(preceding) + new OverWindowWithPreceding(partitionByExpr, orderByExpr, precedingExpr) + } + +} http://git-wip-us.apache.org/repos/asf/flink/blob/88fbe2ac/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/api/scala/windows.scala ---------------------------------------------------------------------- diff --git a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/api/scala/windows.scala b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/api/scala/windows.scala index d0430c2..91bf1a6 100644 --- a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/api/scala/windows.scala +++ b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/api/scala/windows.scala @@ -18,7 +18,7 @@ package org.apache.flink.table.api.scala -import org.apache.flink.table.api.{OverWindowWithOrderBy, SessionWithGap, SlideWithSize, TumbleWithSize} +import org.apache.flink.table.api.{TumbleWithSize, OverWindowWithPreceding, SlideWithSize, SessionWithGap} import org.apache.flink.table.expressions.Expression /** @@ -121,6 +121,21 @@ case class PartitionedOver(partitionBy: Array[Expression]) { * For batch tables, refer to a timestamp or long attribute. */ def orderBy(orderBy: Expression): OverWindowWithOrderBy = { - new OverWindowWithOrderBy(partitionBy, orderBy) + OverWindowWithOrderBy(partitionBy, orderBy) } } + +case class OverWindowWithOrderBy(partitionBy: Seq[Expression], orderBy: Expression) { + + + /** + * Set the preceding offset (based on time or row-count intervals) for over window. + * + * @param preceding preceding offset relative to the current row. + * @return this over window + */ + def preceding(preceding: Expression): OverWindowWithPreceding = { + new OverWindowWithPreceding(partitionBy, orderBy, preceding) + } + +} http://git-wip-us.apache.org/repos/asf/flink/blob/88fbe2ac/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/api/windows.scala ---------------------------------------------------------------------- diff --git a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/api/windows.scala b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/api/windows.scala index 11ef360..ee022b1 100644 --- a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/api/windows.scala +++ b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/api/windows.scala @@ -68,11 +68,11 @@ case class UnboundedRange() extends Expression { /** * A partially defined over window. */ -class OverWindowWithOrderBy( +class OverWindowWithPreceding( private val partitionBy: Seq[Expression], - private val orderBy: Expression) { + private val orderBy: Expression, + private val preceding: Expression) { - private[flink] var preceding: Expression = _ private[flink] var following: Expression = _ /** @@ -103,33 +103,12 @@ class OverWindowWithOrderBy( } /** - * Set the preceding offset (based on time or row-count intervals) for over window. - * - * @param preceding preceding offset relative to the current row. - * @return this over window - */ - def preceding(preceding: String): OverWindowWithOrderBy = { - this.preceding(ExpressionParser.parseExpression(preceding)) - } - - /** - * Set the preceding offset (based on time or row-count intervals) for over window. - * - * @param preceding preceding offset relative to the current row. - * @return this over window - */ - def preceding(preceding: Expression): OverWindowWithOrderBy = { - this.preceding = preceding - this - } - - /** * Set the following offset (based on time or row-count intervals) for over window. * * @param following following offset that relative to the current row. * @return this over window */ - def following(following: String): OverWindowWithOrderBy = { + def following(following: String): OverWindowWithPreceding = { this.following(ExpressionParser.parseExpression(following)) } @@ -139,7 +118,7 @@ class OverWindowWithOrderBy( * @param following following offset that relative to the current row. * @return this over window */ - def following(following: Expression): OverWindowWithOrderBy = { + def following(following: Expression): OverWindowWithPreceding = { this.following = following this }
