[ 
https://issues.apache.org/jira/browse/FLINK-6228?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15978485#comment-15978485
 ] 

ASF GitHub Bot commented on FLINK-6228:
---------------------------------------

Github user fhueske commented on a diff in the pull request:

    https://github.com/apache/flink/pull/3743#discussion_r112654550
  
    --- Diff: 
flink-libraries/flink-table/src/main/scala/org/apache/flink/table/expressions/call.scala
 ---
    @@ -49,6 +59,160 @@ case class Call(functionName: String, args: 
Seq[Expression]) extends Expression
     }
     
     /**
    +  * Over expression for calcite over transform.
    +  *
    +  * @param agg             over-agg expression
    +  * @param overWindowAlias over window alias
    +  * @param overWindow      over window
    +  */
    +case class OverCall(
    +    agg: Aggregation,
    +    overWindowAlias: Expression,
    +    var overWindow: OverWindow = null) extends Expression {
    +
    +  override private[flink] def toRexNode(implicit relBuilder: RelBuilder): 
RexNode = {
    +
    +    val rexBuilder = relBuilder.getRexBuilder
    +
    +    val operator: SqlAggFunction = agg.getSqlAggFunction()
    +
    +    val relDataType = relBuilder
    +      .getTypeFactory.asInstanceOf[FlinkTypeFactory]
    +      .createTypeFromTypeInfo(agg.resultType)
    +
    +    val aggExprs: util.ArrayList[RexNode] = new util.ArrayList[RexNode]()
    +    val aggChildName = agg.child.asInstanceOf[ResolvedFieldReference].name
    +
    +    aggExprs.add(relBuilder.field(aggChildName))
    +
    +    val orderKeys: ImmutableList.Builder[RexFieldCollation] =
    +      new ImmutableList.Builder[RexFieldCollation]()
    +
    +    val sets: util.HashSet[SqlKind] = new util.HashSet[SqlKind]()
    +    val orderName = 
overWindow.orderBy.asInstanceOf[UnresolvedFieldReference].name
    +
    +    val rexNode =
    +      if (orderName.equalsIgnoreCase("rowtime")) {
    +        // for stream event-time
    +        relBuilder.call(EventTimeExtractor)
    +      }
    +      else if (orderName.equalsIgnoreCase("proctime")) {
    +        // for stream proc-time
    +        relBuilder.call(ProcTimeExtractor)
    +      } else {
    +        // for batch event-time
    +        relBuilder.field(orderName)
    +      }
    +
    +    orderKeys.add(new RexFieldCollation(rexNode, sets))
    +
    +    val partitionKeys: util.ArrayList[RexNode] = new 
util.ArrayList[RexNode]()
    +    overWindow.partitionBy.foreach {
    +      x =>
    +        val partitionKey = 
relBuilder.field(x.asInstanceOf[UnresolvedFieldReference].name)
    +        if (!FlinkTypeFactory.toTypeInfo(partitionKey.getType).isKeyType) {
    +          throw ValidationException(
    +            s"expression $partitionKey cannot be used as a partition key 
expression " +
    +            "because it's not a valid key type which must be hashable and 
comparable")
    +        }
    +        partitionKeys.add(partitionKey)
    +    }
    +
    +    val preceding = overWindow.preceding.asInstanceOf[Literal]
    +    val following = overWindow.following.asInstanceOf[Literal]
    +
    +    val isPhysical: Boolean = 
preceding.resultType.isInstanceOf[RowIntervalTypeInfo]
    +
    +    val lowerBound = createBound(relBuilder, preceding, SqlKind.PRECEDING)
    +    val upperBound = createBound(relBuilder, following, SqlKind.FOLLOWING)
    +
    +    rexBuilder.makeOver(
    +      relDataType,
    +      operator,
    +      aggExprs,
    +      partitionKeys,
    +      orderKeys.build,
    +      lowerBound,
    +      upperBound,
    +      isPhysical,
    +      true,
    +      false)
    +  }
    +
    +  private def createBound(
    +    relBuilder: RelBuilder,
    +    bound: Literal,
    +    sqlKind: SqlKind): RexWindowBound = {
    +
    +    if (bound == UNBOUNDED_RANGE || bound == UNBOUNDED_ROW) {
    +      val unbounded = SqlWindow.createUnboundedPreceding(SqlParserPos.ZERO)
    +      create(unbounded, null)
    +    } else if (bound == CURRENT_RANGE || bound == CURRENT_ROW) {
    +      val currentRow = SqlWindow.createCurrentRow(SqlParserPos.ZERO)
    +      create(currentRow, null)
    +    } else {
    +      val returnType = relBuilder
    +        .getTypeFactory.asInstanceOf[FlinkTypeFactory]
    +        .createTypeFromTypeInfo(Types.DECIMAL)
    +
    +      val sqlOperator = new SqlPostfixOperator(
    +        sqlKind.name,
    +        sqlKind,
    +        2,
    +        new OrdinalReturnTypeInference(0),
    +        null,
    +        null)
    +
    +      val operands: Array[SqlNode] = new Array[SqlNode](1)
    +      operands(0) = (SqlLiteral.createExactNumeric("1", SqlParserPos.ZERO))
    +
    +      val node = new SqlBasicCall(sqlOperator, operands, SqlParserPos.ZERO)
    +
    +      val expressions: util.ArrayList[RexNode] = new 
util.ArrayList[RexNode]()
    +      expressions.add(relBuilder.literal(bound.value))
    +
    +      val rexNode = relBuilder.getRexBuilder.makeCall(returnType, 
sqlOperator, expressions)
    +
    +      create(node, rexNode)
    +    }
    +  }
    +
    +  override private[flink] def children: Seq[Expression] = Seq(agg)
    +
    +  override def toString = 
s"${this.getClass.getCanonicalName}(${overWindowAlias.toString})"
    +
    +  override private[flink] def resultType = agg.resultType
    +
    +  override private[flink] def validateInput(): ValidationResult = {
    +    var validationResult: ValidationResult = ValidationSuccess
    +    val orderName = 
overWindow.orderBy.asInstanceOf[UnresolvedFieldReference].name
    +    if (!orderName.equalsIgnoreCase("rowtime")
    +      && !orderName.equalsIgnoreCase("proctime")) {
    +      ValidationFailure(
    +        s"OrderBy expression must be ['rowtime] or ['proctime], but got 
['${orderName}]")
    +    }
    +    if (!overWindow.preceding.asInstanceOf[Literal].resultType.getClass
    --- End diff --
    
    We need to check if `preceding` and `following` are of type `Literal` first.


> Integrating the OVER windows in the Table API
> ---------------------------------------------
>
>                 Key: FLINK-6228
>                 URL: https://issues.apache.org/jira/browse/FLINK-6228
>             Project: Flink
>          Issue Type: Sub-task
>          Components: Table API & SQL
>            Reporter: sunjincheng
>            Assignee: sunjincheng
>
> Syntax:
> {code}
> table
>    .overWindows(
>     (Rows|Range [ partitionBy value_expression , ... [ n ]] [ orderBy 
> order_by_expression] 
>       (preceding  
> UNBOUNDED|value_specification.(rows|milli|second|minute|hour|day|month|year)|CURRENTROW)
>      [following 
> UNBOUNDED|value_specification.(rows|milli|second|minute|hour|day|month|year)|CURRENTROW]
>     as alias,...[n])
>    )
>   .select( [col1,...[n]], (agg(col1) OVER overWindowAlias, … [n]))
> {code}
> Implementation restrictions:
> * All OVER clauses in the same SELECT clause must be exactly the same.
> * The PARTITION BY clause is optional (no partitioning results in single 
> threaded execution).
> * ORDER BY: until 
> [FLINK-5884|https://issues.apache.org/jira/browse/FLINK-5884] is implemented, 
> orderBy may only be 'rowtime / 'proctime (for stream) or 'specific-time-field (for 
> batch).
> * FOLLOWING is not supported.
> User interface design document [See | 
> https://docs.google.com/document/d/13Z-Ovx3jwtmzkSweJyGkMy751BouhuJ29Y1CTNZt2DI/edit#]



--
This message was sent by Atlassian JIRA
(v6.3.15#6346)

Reply via email to