[ 
https://issues.apache.org/jira/browse/FLINK-5884?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15997622#comment-15997622
 ] 

ASF GitHub Bot commented on FLINK-5884:
---------------------------------------

Github user fhueske commented on a diff in the pull request:

    https://github.com/apache/flink/pull/3808#discussion_r114814140
  
    --- Diff: 
flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/logical/groupWindows.scala
 ---
    @@ -18,259 +18,165 @@
     
     package org.apache.flink.table.plan.logical
     
    -import org.apache.flink.api.common.typeinfo.BasicTypeInfo
     import org.apache.flink.table.api.{BatchTableEnvironment, 
StreamTableEnvironment, TableEnvironment}
    +import 
org.apache.flink.table.expressions.ExpressionUtils.{isRowCountLiteral, 
isRowtimeAttribute, isTimeAttribute, isTimeIntervalLiteral}
     import org.apache.flink.table.expressions._
    -import org.apache.flink.table.typeutils.{RowIntervalTypeInfo, 
TimeIntervalTypeInfo, TypeCoercion}
    +import org.apache.flink.table.typeutils.TypeCheckUtils.isTimePoint
     import org.apache.flink.table.validate.{ValidationFailure, 
ValidationResult, ValidationSuccess}
     
    -abstract class EventTimeGroupWindow(
    -    alias: Expression,
    -    time: Expression)
    -  extends LogicalWindow(alias) {
    -
    -  override def validate(tableEnv: TableEnvironment): ValidationResult = {
    -    val valid = super.validate(tableEnv)
    -    if (valid.isFailure) {
    -        return valid
    -    }
    -
    -    tableEnv match {
    -      case _: StreamTableEnvironment =>
    -        time match {
    -          case RowtimeAttribute() =>
    -            ValidationSuccess
    -          case _ =>
    -            ValidationFailure("Event-time window expects a 'rowtime' time 
field.")
    -      }
    -      case _: BatchTableEnvironment =>
    -        if (!TypeCoercion.canCast(time.resultType, 
BasicTypeInfo.LONG_TYPE_INFO)) {
    -          ValidationFailure(s"Event-time window expects a time field that 
can be safely cast " +
    -            s"to Long, but is ${time.resultType}")
    -        } else {
    -          ValidationSuccess
    -        }
    -    }
    -
    -  }
    -}
    -
    -abstract class ProcessingTimeGroupWindow(alias: Expression) extends 
LogicalWindow(alias) {
    -  override def validate(tableEnv: TableEnvironment): ValidationResult = {
    -    val valid = super.validate(tableEnv)
    -    if (valid.isFailure) {
    -      return valid
    -    }
    -
    -    tableEnv match {
    -      case b: BatchTableEnvironment => ValidationFailure(
    -        "Window on batch must declare a time attribute over which the 
query is evaluated.")
    -      case _ =>
    -        ValidationSuccess
    -    }
    -  }
    -}
    -
     // 
------------------------------------------------------------------------------------------------
     // Tumbling group windows
     // 
------------------------------------------------------------------------------------------------
     
    -object TumblingGroupWindow {
    -  def validate(tableEnv: TableEnvironment, size: Expression): 
ValidationResult = size match {
    -    case Literal(_, TimeIntervalTypeInfo.INTERVAL_MILLIS) =>
    -      ValidationSuccess
    -    case Literal(_, RowIntervalTypeInfo.INTERVAL_ROWS) =>
    -      ValidationSuccess
    -    case _ =>
    -      ValidationFailure("Tumbling window expects size literal of type 
Interval of Milliseconds " +
    -        "or Interval of Rows.")
    -  }
    -}
    -
    -case class ProcessingTimeTumblingGroupWindow(
    -    override val alias: Expression,
    -    size: Expression)
    -  extends ProcessingTimeGroupWindow(alias) {
    -
    -  override def resolveExpressions(resolve: (Expression) => Expression): 
LogicalWindow =
    -    ProcessingTimeTumblingGroupWindow(
    -      resolve(alias),
    -      resolve(size))
    -
    -  override def validate(tableEnv: TableEnvironment): ValidationResult =
    -    super.validate(tableEnv).orElse(TumblingGroupWindow.validate(tableEnv, 
size))
    -
    -  override def toString: String = 
s"ProcessingTimeTumblingGroupWindow($alias, $size)"
    -}
    -
    -case class EventTimeTumblingGroupWindow(
    -    override val alias: Expression,
    +case class TumblingGroupWindow(
    +    alias: Expression,
         timeField: Expression,
         size: Expression)
    -  extends EventTimeGroupWindow(
    +  extends LogicalWindow(
         alias,
         timeField) {
     
       override def resolveExpressions(resolve: (Expression) => Expression): 
LogicalWindow =
    -    EventTimeTumblingGroupWindow(
    +    TumblingGroupWindow(
           resolve(alias),
           resolve(timeField),
           resolve(size))
     
       override def validate(tableEnv: TableEnvironment): ValidationResult =
    -    super.validate(tableEnv)
    -      .orElse(TumblingGroupWindow.validate(tableEnv, size))
    -      .orElse(size match {
    -        case Literal(_, RowIntervalTypeInfo.INTERVAL_ROWS)
    -          if tableEnv.isInstanceOf[StreamTableEnvironment] =>
    +    super.validate(tableEnv).orElse(
    +      tableEnv match {
    +
    +        // check size
    +        case _ if !isTimeIntervalLiteral(size) && !isRowCountLiteral(size) 
=>
    +          ValidationFailure(
    +            "Tumbling window expects size literal of type Interval of 
Milliseconds " +
    +              "or Interval of Rows.")
    +
    +        // check time attribute
    +        case _: StreamTableEnvironment if !isTimeAttribute(timeField) =>
    +          ValidationFailure(
    +            "Tumbling window expects a time attribute for grouping in a 
stream environment.")
    +        case _: BatchTableEnvironment if isTimePoint(size.resultType) =>
    --- End diff --
    
    Should this be something like `!(isTimePoint(timeField.resultType) || 
isLong(timeField.resultType))`?
    
    Same for the other windows?


> Integrate time indicators for Table API & SQL
> ---------------------------------------------
>
>                 Key: FLINK-5884
>                 URL: https://issues.apache.org/jira/browse/FLINK-5884
>             Project: Flink
>          Issue Type: New Feature
>          Components: Table API & SQL
>            Reporter: Timo Walther
>            Assignee: Timo Walther
>            Priority: Blocker
>             Fix For: 1.3.0
>
>
> We already discussed the need for a proper integration of time indicators 
> (event-time or processing-time) for both the Table API & SQL on the mailing list:
> http://apache-flink-mailing-list-archive.1008284.n3.nabble.com/DISCUSS-Table-API-SQL-indicators-for-event-and-processing-time-tp15927.html
> This issue will track the progress. I will work on a design document describing 
> how we can solve this issue.



--
This message was sent by Atlassian JIRA
(v6.3.15#6346)

Reply via email to