[
https://issues.apache.org/jira/browse/FLINK-5884?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15997622#comment-15997622
]
ASF GitHub Bot commented on FLINK-5884:
---------------------------------------
Github user fhueske commented on a diff in the pull request:
https://github.com/apache/flink/pull/3808#discussion_r114814140
--- Diff:
flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/logical/groupWindows.scala
---
@@ -18,259 +18,165 @@
package org.apache.flink.table.plan.logical
-import org.apache.flink.api.common.typeinfo.BasicTypeInfo
import org.apache.flink.table.api.{BatchTableEnvironment,
StreamTableEnvironment, TableEnvironment}
+import
org.apache.flink.table.expressions.ExpressionUtils.{isRowCountLiteral,
isRowtimeAttribute, isTimeAttribute, isTimeIntervalLiteral}
import org.apache.flink.table.expressions._
-import org.apache.flink.table.typeutils.{RowIntervalTypeInfo,
TimeIntervalTypeInfo, TypeCoercion}
+import org.apache.flink.table.typeutils.TypeCheckUtils.isTimePoint
import org.apache.flink.table.validate.{ValidationFailure,
ValidationResult, ValidationSuccess}
-abstract class EventTimeGroupWindow(
- alias: Expression,
- time: Expression)
- extends LogicalWindow(alias) {
-
- override def validate(tableEnv: TableEnvironment): ValidationResult = {
- val valid = super.validate(tableEnv)
- if (valid.isFailure) {
- return valid
- }
-
- tableEnv match {
- case _: StreamTableEnvironment =>
- time match {
- case RowtimeAttribute() =>
- ValidationSuccess
- case _ =>
- ValidationFailure("Event-time window expects a 'rowtime' time
field.")
- }
- case _: BatchTableEnvironment =>
- if (!TypeCoercion.canCast(time.resultType,
BasicTypeInfo.LONG_TYPE_INFO)) {
- ValidationFailure(s"Event-time window expects a time field that
can be safely cast " +
- s"to Long, but is ${time.resultType}")
- } else {
- ValidationSuccess
- }
- }
-
- }
-}
-
-abstract class ProcessingTimeGroupWindow(alias: Expression) extends
LogicalWindow(alias) {
- override def validate(tableEnv: TableEnvironment): ValidationResult = {
- val valid = super.validate(tableEnv)
- if (valid.isFailure) {
- return valid
- }
-
- tableEnv match {
- case b: BatchTableEnvironment => ValidationFailure(
- "Window on batch must declare a time attribute over which the
query is evaluated.")
- case _ =>
- ValidationSuccess
- }
- }
-}
-
//
------------------------------------------------------------------------------------------------
// Tumbling group windows
//
------------------------------------------------------------------------------------------------
-object TumblingGroupWindow {
- def validate(tableEnv: TableEnvironment, size: Expression):
ValidationResult = size match {
- case Literal(_, TimeIntervalTypeInfo.INTERVAL_MILLIS) =>
- ValidationSuccess
- case Literal(_, RowIntervalTypeInfo.INTERVAL_ROWS) =>
- ValidationSuccess
- case _ =>
- ValidationFailure("Tumbling window expects size literal of type
Interval of Milliseconds " +
- "or Interval of Rows.")
- }
-}
-
-case class ProcessingTimeTumblingGroupWindow(
- override val alias: Expression,
- size: Expression)
- extends ProcessingTimeGroupWindow(alias) {
-
- override def resolveExpressions(resolve: (Expression) => Expression):
LogicalWindow =
- ProcessingTimeTumblingGroupWindow(
- resolve(alias),
- resolve(size))
-
- override def validate(tableEnv: TableEnvironment): ValidationResult =
- super.validate(tableEnv).orElse(TumblingGroupWindow.validate(tableEnv,
size))
-
- override def toString: String =
s"ProcessingTimeTumblingGroupWindow($alias, $size)"
-}
-
-case class EventTimeTumblingGroupWindow(
- override val alias: Expression,
+case class TumblingGroupWindow(
+ alias: Expression,
timeField: Expression,
size: Expression)
- extends EventTimeGroupWindow(
+ extends LogicalWindow(
alias,
timeField) {
override def resolveExpressions(resolve: (Expression) => Expression):
LogicalWindow =
- EventTimeTumblingGroupWindow(
+ TumblingGroupWindow(
resolve(alias),
resolve(timeField),
resolve(size))
override def validate(tableEnv: TableEnvironment): ValidationResult =
- super.validate(tableEnv)
- .orElse(TumblingGroupWindow.validate(tableEnv, size))
- .orElse(size match {
- case Literal(_, RowIntervalTypeInfo.INTERVAL_ROWS)
- if tableEnv.isInstanceOf[StreamTableEnvironment] =>
+ super.validate(tableEnv).orElse(
+ tableEnv match {
+
+ // check size
+ case _ if !isTimeIntervalLiteral(size) && !isRowCountLiteral(size)
=>
+ ValidationFailure(
+ "Tumbling window expects size literal of type Interval of
Milliseconds " +
+ "or Interval of Rows.")
+
+ // check time attribute
+ case _: StreamTableEnvironment if !isTimeAttribute(timeField) =>
+ ValidationFailure(
+ "Tumbling window expects a time attribute for grouping in a
stream environment.")
+ case _: BatchTableEnvironment if isTimePoint(size.resultType) =>
--- End diff --
Should this be something like `!(isTimePoint(timeField.resultType) ||
isLong(timeField.resultType))`?
Does the same apply to the other windows?
> Integrate time indicators for Table API & SQL
> ---------------------------------------------
>
> Key: FLINK-5884
> URL: https://issues.apache.org/jira/browse/FLINK-5884
> Project: Flink
> Issue Type: New Feature
> Components: Table API & SQL
> Reporter: Timo Walther
> Assignee: Timo Walther
> Priority: Blocker
> Fix For: 1.3.0
>
>
> We already discussed the need for a proper integration of time indicators
> (event-time or processing-time) for both the Table API & SQL on the mailing list:
> http://apache-flink-mailing-list-archive.1008284.n3.nabble.com/DISCUSS-Table-API-SQL-indicators-for-event-and-processing-time-tp15927.html
> This issue will track the progress. I will work on a design document on how we
> can solve this issue.
--
This message was sent by Atlassian JIRA
(v6.3.15#6346)