twalthr commented on a change in pull request #7920: [FLINK-11844][table-api]
Simplify over window API classes and improve documentation
URL: https://github.com/apache/flink/pull/7920#discussion_r264118237
##########
File path:
flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/api/windows.scala
##########
@@ -20,90 +20,180 @@ package org.apache.flink.table.api
import org.apache.flink.table.expressions._
import org.apache.flink.table.plan.logical._
-import org.apache.flink.table.typeutils.{RowIntervalTypeInfo,
TimeIntervalTypeInfo}
-import org.apache.flink.table.api.scala.{CURRENT_RANGE, CURRENT_ROW}
/**
- * Over window is similar to the traditional OVER SQL.
+ * An over-window specification.
+ *
+ * Similar to SQL, over window aggregates compute an aggregate for each input
row over a range
+ * of its neighboring rows.
+ */
+class OverWindow(
+ alias: Expression,
+ partitionBy: Seq[Expression],
+ orderBy: Expression,
+ preceding: Expression,
+ following: Option[Expression]) {
+
+ def getAlias: Expression = alias
+
+ def getPartitioning: Seq[Expression] = partitionBy
+
+ def getOrder: Expression = orderBy
+
+ def getPreceding: Expression = preceding
+
+ def getFollowing: Option[Expression] = following
+}
+
+//
------------------------------------------------------------------------------------------------
+// Over windows
+//
------------------------------------------------------------------------------------------------
+
+/**
+ * Partially defined over window with partitioning.
+ */
+class OverWindowPartitioned(partitionBy: Seq[Expression]) {
+
+ /**
+ * Specifies the time attribute on which rows are ordered.
+ *
+ * For streaming tables, reference a rowtime or proctime time attribute here
+ * to specify the time mode.
+ *
+ * For batch tables, refer to a timestamp or long attribute.
+ *
+ * @param orderByExpr field reference
+ * @return an over window with defined order
+ */
+ def orderBy(orderByExpr: String): OverWindowPartitionedOrdered = {
+ orderBy(ExpressionParser.parseExpression(orderByExpr))
+ }
+
+ /**
+ * Specifies the time attribute on which rows are ordered.
+ *
+ * For streaming tables, reference a rowtime or proctime time attribute here
+ * to specify the time mode.
+ *
+ * For batch tables, refer to a timestamp or long attribute.
+ *
+ * @param orderBy field reference
+ * @return an over window with defined order
+ */
+ def orderBy(orderBy: Expression): OverWindowPartitionedOrdered = {
+ new OverWindowPartitionedOrdered(partitionBy, orderBy)
+ }
+}
+
+/**
+ * Partially defined over window with (optional) partitioning and order.
*/
-case class OverWindow(
- private[flink] val alias: Expression,
- private[flink] val partitionBy: Seq[Expression],
- private[flink] val orderBy: Expression,
- private[flink] val preceding: Expression,
- private[flink] val following: Expression)
+class OverWindowPartitionedOrdered(partitionBy: Seq[Expression], orderBy:
Expression) {
+
+ /**
+ * Set the preceding offset (based on time or row-count intervals) for over
window.
+ *
+ * @param precedingExpr preceding offset relative to the current row.
+ * @return an over window with defined preceding
+ */
+ def preceding(precedingExpr: String): OverWindowPartitionedOrderedPreceding
= {
+ preceding(ExpressionParser.parseExpression(precedingExpr))
+ }
+
+ /**
+ * Set the preceding offset (based on time or row-count intervals) for over
window.
+ *
+ * @param preceding preceding offset relative to the current row.
+ * @return an over window with defined preceding
+ */
+ def preceding(preceding: Expression): OverWindowPartitionedOrderedPreceding
= {
+ new OverWindowPartitionedOrderedPreceding(partitionBy, orderBy, preceding)
+ }
+
+ /**
+ * Assigns an alias for this window that the following `select()` clause
can refer to.
+ *
+ * @param alias alias for this over window
+ * @return the fully defined over window
+ */
+ def as(alias: String): OverWindow =
as(ExpressionParser.parseExpression(alias))
+
+ /**
+ * Assigns an alias for this window that the following `select()` clause
can refer to.
+ *
+ * @param alias alias for this over window
+ * @return the fully defined over window
+ */
+ def as(alias: Expression): OverWindow = {
+ new OverWindow(alias, partitionBy, orderBy, UnboundedRange(), None)
+ }
+}
/**
- * A partially defined over window.
+ * Partially defined over window with (optional) partitioning, order, and
preceding.
*/
-class OverWindowWithPreceding(
+class OverWindowPartitionedOrderedPreceding(
private val partitionBy: Seq[Expression],
private val orderBy: Expression,
private val preceding: Expression) {
- private[flink] var following: Expression = _
+ private var optionalFollowing: Option[Expression] = None
/**
* Assigns an alias for this window that the following `select()` clause
can refer to.
*
* @param alias alias for this over window
- * @return over window
+ * @return the fully defined over window
*/
def as(alias: String): OverWindow =
as(ExpressionParser.parseExpression(alias))
/**
* Assigns an alias for this window that the following `select()` clause
can refer to.
*
* @param alias alias for this over window
- * @return over window
+ * @return the fully defined over window
*/
def as(alias: Expression): OverWindow = {
-
- // set following to CURRENT_ROW / CURRENT_RANGE if not defined
- if (null == following) {
- if (preceding.resultType.isInstanceOf[RowIntervalTypeInfo]) {
- following = CURRENT_ROW
- } else {
- following = CURRENT_RANGE
- }
- }
- OverWindow(alias, partitionBy, orderBy, preceding, following)
+ new OverWindow(alias, partitionBy, orderBy, preceding, optionalFollowing)
}
/**
* Set the following offset (based on time or row-count intervals) for over
window.
*
- * @param following following offset that relative to the current row.
- * @return this over window
+ * @param followingExpr following offset relative to the current row.
+ * @return an over window with defined following
*/
- def following(following: String): OverWindowWithPreceding = {
- this.following(ExpressionParser.parseExpression(following))
+ def following(followingExpr: String): OverWindowPartitionedOrderedPreceding
= {
Review comment:
+1. My IDE was just complaining about the duplicate name (the `following`
parameter shadows the `following` method), but I solved this issue by using
`this.` again.
----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
For queries about this service, please contact Infrastructure at:
[email protected]
With regards,
Apache Git Services