[ https://issues.apache.org/jira/browse/FLINK-6228?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15978485#comment-15978485 ]
ASF GitHub Bot commented on FLINK-6228:
---------------------------------------
Github user fhueske commented on a diff in the pull request:
https://github.com/apache/flink/pull/3743#discussion_r112654550
--- Diff: flink-libraries/flink-table/src/main/scala/org/apache/flink/table/expressions/call.scala ---
@@ -49,6 +59,160 @@ case class Call(functionName: String, args: Seq[Expression]) extends Expression
 }
 /**
+  * Over expression for calcite over transform.
+  *
+  * @param agg over-agg expression
+  * @param overWindowAlias over window alias
+  * @param overWindow over window
+  */
+case class OverCall(
+    agg: Aggregation,
+    overWindowAlias: Expression,
+    var overWindow: OverWindow = null) extends Expression {
+
+  override private[flink] def toRexNode(implicit relBuilder: RelBuilder): RexNode = {
+
+    val rexBuilder = relBuilder.getRexBuilder
+
+    val operator: SqlAggFunction = agg.getSqlAggFunction()
+
+    val relDataType = relBuilder
+      .getTypeFactory.asInstanceOf[FlinkTypeFactory]
+      .createTypeFromTypeInfo(agg.resultType)
+
+    val aggExprs: util.ArrayList[RexNode] = new util.ArrayList[RexNode]()
+    val aggChildName = agg.child.asInstanceOf[ResolvedFieldReference].name
+
+    aggExprs.add(relBuilder.field(aggChildName))
+
+    val orderKeys: ImmutableList.Builder[RexFieldCollation] =
+      new ImmutableList.Builder[RexFieldCollation]()
+
+    val sets: util.HashSet[SqlKind] = new util.HashSet[SqlKind]()
+    val orderName = overWindow.orderBy.asInstanceOf[UnresolvedFieldReference].name
+
+    val rexNode =
+      if (orderName.equalsIgnoreCase("rowtime")) {
+        // for stream event-time
+        relBuilder.call(EventTimeExtractor)
+      } else if (orderName.equalsIgnoreCase("proctime")) {
+        // for stream proc-time
+        relBuilder.call(ProcTimeExtractor)
+      } else {
+        // for batch event-time
+        relBuilder.field(orderName)
+      }
+
+    orderKeys.add(new RexFieldCollation(rexNode, sets))
+
+    val partitionKeys: util.ArrayList[RexNode] = new util.ArrayList[RexNode]()
+    overWindow.partitionBy.foreach {
+      x =>
+        val partitionKey = relBuilder.field(x.asInstanceOf[UnresolvedFieldReference].name)
+        if (!FlinkTypeFactory.toTypeInfo(partitionKey.getType).isKeyType) {
+          throw ValidationException(
+            s"expression $partitionKey cannot be used as a partition key expression " +
+            "because it's not a valid key type which must be hashable and comparable")
+        }
+        partitionKeys.add(partitionKey)
+    }
+
+    val preceding = overWindow.preceding.asInstanceOf[Literal]
+    val following = overWindow.following.asInstanceOf[Literal]
+
+    val isPhysical: Boolean = preceding.resultType.isInstanceOf[RowIntervalTypeInfo]
+
+    val lowerBound = createBound(relBuilder, preceding, SqlKind.PRECEDING)
+    val upperBound = createBound(relBuilder, following, SqlKind.FOLLOWING)
+
+    rexBuilder.makeOver(
+      relDataType,
+      operator,
+      aggExprs,
+      partitionKeys,
+      orderKeys.build,
+      lowerBound,
+      upperBound,
+      isPhysical,
+      true,
+      false)
+  }
+
+  private def createBound(
+    relBuilder: RelBuilder,
+    bound: Literal,
+    sqlKind: SqlKind): RexWindowBound = {
+
+    if (bound == UNBOUNDED_RANGE || bound == UNBOUNDED_ROW) {
+      val unbounded = SqlWindow.createUnboundedPreceding(SqlParserPos.ZERO)
+      create(unbounded, null)
+    } else if (bound == CURRENT_RANGE || bound == CURRENT_ROW) {
+      val currentRow = SqlWindow.createCurrentRow(SqlParserPos.ZERO)
+      create(currentRow, null)
+    } else {
+      val returnType = relBuilder
+        .getTypeFactory.asInstanceOf[FlinkTypeFactory]
+        .createTypeFromTypeInfo(Types.DECIMAL)
+
+      val sqlOperator = new SqlPostfixOperator(
+        sqlKind.name,
+        sqlKind,
+        2,
+        new OrdinalReturnTypeInference(0),
+        null,
+        null)
+
+      val operands: Array[SqlNode] = new Array[SqlNode](1)
+      operands(0) = (SqlLiteral.createExactNumeric("1", SqlParserPos.ZERO))
+
+      val node = new SqlBasicCall(sqlOperator, operands, SqlParserPos.ZERO)
+
+      val expressions: util.ArrayList[RexNode] = new util.ArrayList[RexNode]()
+      expressions.add(relBuilder.literal(bound.value))
+
+      val rexNode = relBuilder.getRexBuilder.makeCall(returnType, sqlOperator, expressions)
+
+      create(node, rexNode)
+    }
+  }
+
+  override private[flink] def children: Seq[Expression] = Seq(agg)
+
+  override def toString = s"${this.getClass.getCanonicalName}(${overWindowAlias.toString})"
+
+  override private[flink] def resultType = agg.resultType
+
+  override private[flink] def validateInput(): ValidationResult = {
+    var validationResult: ValidationResult = ValidationSuccess
+    val orderName = overWindow.orderBy.asInstanceOf[UnresolvedFieldReference].name
+    if (!orderName.equalsIgnoreCase("rowtime")
+      && !orderName.equalsIgnoreCase("proctime")) {
+      ValidationFailure(
+        s"OrderBy expression must be ['rowtime] or ['proctime], but got ['${orderName}]")
+    }
+    if (!overWindow.preceding.asInstanceOf[Literal].resultType.getClass
--- End diff --
We need to check if `preceding` and `following` are of type `Literal` first.
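For example, such a check could look roughly like the following sketch (it reuses `Literal`, `ValidationFailure`, and `ValidationSuccess` from this file; the actual fix may be structured differently):
{code}
// Sketch only: verify both bounds are literals before any asInstanceOf[Literal] cast.
override private[flink] def validateInput(): ValidationResult = {
  (overWindow.preceding, overWindow.following) match {
    case (_: Literal, _: Literal) =>
      // both bounds are literals; the existing orderBy/preceding checks can follow
      ValidationSuccess
    case (p, _: Literal) =>
      ValidationFailure(s"Preceding must be a literal bound, but got $p")
    case (_, f) =>
      ValidationFailure(s"Following must be a literal bound, but got $f")
  }
}
{code}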
> Integrating the OVER windows in the Table API
> ---------------------------------------------
>
> Key: FLINK-6228
> URL: https://issues.apache.org/jira/browse/FLINK-6228
> Project: Flink
> Issue Type: Sub-task
> Components: Table API & SQL
> Reporter: sunjincheng
> Assignee: sunjincheng
>
> Syntax:
> {code}
> table
>   .overWindows(
>     (Rows|Range [ partitionBy value_expression , ... [ n ]] [ orderBy order_by_expression]
>       (preceding UNBOUNDED|value_specification.(rows|milli|second|minute|hour|day|month|year)|CURRENTROW)
>       [following UNBOUNDED|value_specification.(rows|milli|second|minute|hour|day|month|year)|CURRENTROW]
>       as alias,...[n])
>   )
>   .select( [col1,...[n]], (agg(col1) OVER overWindowAlias, … [n]))
> {code}
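> For illustration, a sketch of what a call in this proposed syntax might look like; the input table, the fields 'a, 'b, 'c, the alias 'w, and the exact form of the aggregate expression in select are made up for the example:
> {code}
> table
>   .overWindows(
>     Rows
>       partitionBy 'a
>       orderBy 'proctime
>       preceding 10.rows
>       as 'w)
>   .select('a, 'b, ('c.count over 'w))
> {code}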
> Implement restrictions:
> * All OVER clauses in the same SELECT clause must be exactly the same.
> * The PARTITION BY clause is optional (no partitioning results in single-threaded execution).
> * The ORDER BY clause: until [FLINK-5884|https://issues.apache.org/jira/browse/FLINK-5884] is implemented, orderBy may only reference 'rowtime or 'proctime (for streams) or a specific time field (for batch); see the sketch after this list.
> * FOLLOWING is not supported.
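> A second sketch, showing a RANGE window that respects these restrictions (ordered by 'rowtime, unbounded preceding, no following); again the table, fields, and alias are hypothetical:
> {code}
> table
>   .overWindows(
>     Range
>       partitionBy 'a
>       orderBy 'rowtime
>       preceding UNBOUNDED_RANGE
>       as 'w)
>   .select('a, ('b.sum over 'w))
> {code}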
> User interface design document [See|https://docs.google.com/document/d/13Z-Ovx3jwtmzkSweJyGkMy751BouhuJ29Y1CTNZt2DI/edit#]
--
This message was sent by Atlassian JIRA
(v6.3.15#6346)