Github user fhueske commented on a diff in the pull request:
https://github.com/apache/flink/pull/3808#discussion_r114814140
--- Diff:
flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/logical/groupWindows.scala
---
@@ -18,259 +18,165 @@
package org.apache.flink.table.plan.logical
-import org.apache.flink.api.common.typeinfo.BasicTypeInfo
import org.apache.flink.table.api.{BatchTableEnvironment,
StreamTableEnvironment, TableEnvironment}
+import
org.apache.flink.table.expressions.ExpressionUtils.{isRowCountLiteral,
isRowtimeAttribute, isTimeAttribute, isTimeIntervalLiteral}
import org.apache.flink.table.expressions._
-import org.apache.flink.table.typeutils.{RowIntervalTypeInfo,
TimeIntervalTypeInfo, TypeCoercion}
+import org.apache.flink.table.typeutils.TypeCheckUtils.isTimePoint
import org.apache.flink.table.validate.{ValidationFailure,
ValidationResult, ValidationSuccess}
-abstract class EventTimeGroupWindow(
- alias: Expression,
- time: Expression)
- extends LogicalWindow(alias) {
-
- override def validate(tableEnv: TableEnvironment): ValidationResult = {
- val valid = super.validate(tableEnv)
- if (valid.isFailure) {
- return valid
- }
-
- tableEnv match {
- case _: StreamTableEnvironment =>
- time match {
- case RowtimeAttribute() =>
- ValidationSuccess
- case _ =>
- ValidationFailure("Event-time window expects a 'rowtime' time
field.")
- }
- case _: BatchTableEnvironment =>
- if (!TypeCoercion.canCast(time.resultType,
BasicTypeInfo.LONG_TYPE_INFO)) {
- ValidationFailure(s"Event-time window expects a time field that
can be safely cast " +
- s"to Long, but is ${time.resultType}")
- } else {
- ValidationSuccess
- }
- }
-
- }
-}
-
-abstract class ProcessingTimeGroupWindow(alias: Expression) extends
LogicalWindow(alias) {
- override def validate(tableEnv: TableEnvironment): ValidationResult = {
- val valid = super.validate(tableEnv)
- if (valid.isFailure) {
- return valid
- }
-
- tableEnv match {
- case b: BatchTableEnvironment => ValidationFailure(
- "Window on batch must declare a time attribute over which the
query is evaluated.")
- case _ =>
- ValidationSuccess
- }
- }
-}
-
//
------------------------------------------------------------------------------------------------
// Tumbling group windows
//
------------------------------------------------------------------------------------------------
-object TumblingGroupWindow {
- def validate(tableEnv: TableEnvironment, size: Expression):
ValidationResult = size match {
- case Literal(_, TimeIntervalTypeInfo.INTERVAL_MILLIS) =>
- ValidationSuccess
- case Literal(_, RowIntervalTypeInfo.INTERVAL_ROWS) =>
- ValidationSuccess
- case _ =>
- ValidationFailure("Tumbling window expects size literal of type
Interval of Milliseconds " +
- "or Interval of Rows.")
- }
-}
-
-case class ProcessingTimeTumblingGroupWindow(
- override val alias: Expression,
- size: Expression)
- extends ProcessingTimeGroupWindow(alias) {
-
- override def resolveExpressions(resolve: (Expression) => Expression):
LogicalWindow =
- ProcessingTimeTumblingGroupWindow(
- resolve(alias),
- resolve(size))
-
- override def validate(tableEnv: TableEnvironment): ValidationResult =
- super.validate(tableEnv).orElse(TumblingGroupWindow.validate(tableEnv,
size))
-
- override def toString: String =
s"ProcessingTimeTumblingGroupWindow($alias, $size)"
-}
-
-case class EventTimeTumblingGroupWindow(
- override val alias: Expression,
+case class TumblingGroupWindow(
+ alias: Expression,
timeField: Expression,
size: Expression)
- extends EventTimeGroupWindow(
+ extends LogicalWindow(
alias,
timeField) {
override def resolveExpressions(resolve: (Expression) => Expression):
LogicalWindow =
- EventTimeTumblingGroupWindow(
+ TumblingGroupWindow(
resolve(alias),
resolve(timeField),
resolve(size))
override def validate(tableEnv: TableEnvironment): ValidationResult =
- super.validate(tableEnv)
- .orElse(TumblingGroupWindow.validate(tableEnv, size))
- .orElse(size match {
- case Literal(_, RowIntervalTypeInfo.INTERVAL_ROWS)
- if tableEnv.isInstanceOf[StreamTableEnvironment] =>
+ super.validate(tableEnv).orElse(
+ tableEnv match {
+
+ // check size
+ case _ if !isTimeIntervalLiteral(size) && !isRowCountLiteral(size)
=>
+ ValidationFailure(
+ "Tumbling window expects size literal of type Interval of
Milliseconds " +
+ "or Interval of Rows.")
+
+ // check time attribute
+ case _: StreamTableEnvironment if !isTimeAttribute(timeField) =>
+ ValidationFailure(
+ "Tumbling window expects a time attribute for grouping in a
stream environment.")
+ case _: BatchTableEnvironment if isTimePoint(size.resultType) =>
--- End diff --
Should this be something like `!(isTimePoint(timeField.resultType) ||
isLong(timeField.resultType))`?
same for the other windows?
---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at [email protected] or file a JIRA ticket
with INFRA.
---