Repository: flink Updated Branches: refs/heads/release-1.3 5d05c18d8 -> e23328e4a
[FLINK-6517] [table] Support multiple consecutive windows This closes #3897. Project: http://git-wip-us.apache.org/repos/asf/flink/repo Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/e23328e4 Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/e23328e4 Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/e23328e4 Branch: refs/heads/release-1.3 Commit: e23328e4ad4a370af94b1a7441dfcf356eda62f5 Parents: 5d05c18 Author: twalthr <twal...@apache.org> Authored: Fri May 12 10:03:25 2017 +0200 Committer: twalthr <twal...@apache.org> Committed: Mon May 15 12:16:36 2017 +0200 ---------------------------------------------------------------------- .../table/api/StreamTableEnvironment.scala | 3 + .../flink/table/api/scala/expressionDsl.scala | 8 ++- .../calcite/RelTimeIndicatorConverter.scala | 18 ++++-- .../table/expressions/fieldExpression.scala | 60 ++++++++++++++----- .../table/expressions/windowProperties.scala | 25 +++++--- .../table/plan/logical/LogicalWindow.scala | 2 +- .../flink/table/plan/logical/operators.scala | 36 +++++++++--- .../DataStreamGroupWindowAggregate.scala | 8 ++- .../DataStreamLogicalWindowAggregateRule.scala | 10 ++-- .../table/typeutils/TimeIndicatorTypeInfo.scala | 3 +- .../stream/StreamTableEnvironmentTest.scala | 7 +++ .../api/scala/stream/TableSourceTest.scala | 4 +- .../scala/stream/table/GroupWindowTest.scala | 27 ++++----- .../calcite/RelTimeIndicatorConverterTest.scala | 54 ++++++++++++++++- .../datastream/TimeAttributesITCase.scala | 62 +++++++++++++++++++- 15 files changed, 260 insertions(+), 67 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/flink/blob/e23328e4/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/api/StreamTableEnvironment.scala ---------------------------------------------------------------------- diff --git a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/api/StreamTableEnvironment.scala b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/api/StreamTableEnvironment.scala index 994ac80..c430b21 100644 --- a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/api/StreamTableEnvironment.scala +++ b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/api/StreamTableEnvironment.scala @@ -467,6 +467,9 @@ abstract class StreamTableEnvironment( proctime = Some(idx, name) } case (u: UnresolvedFieldReference, _) => fieldNames = u.name :: fieldNames + + case _ => + throw new TableException("Time attributes can only be defined on field references.") } if (rowtime.isDefined && fieldNames.contains(rowtime.get._2)) { http://git-wip-us.apache.org/repos/asf/flink/blob/e23328e4/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/api/scala/expressionDsl.scala ---------------------------------------------------------------------- diff --git a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/api/scala/expressionDsl.scala b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/api/scala/expressionDsl.scala index 6d15212..b87bb6d 100644 --- a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/api/scala/expressionDsl.scala +++ b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/api/scala/expressionDsl.scala @@ -234,12 +234,14 @@ trait ImplicitExpressionOperations { def desc = Desc(expr) /** - * Returns the start time of a window when applied on a window reference. + * Returns the start time (inclusive) of a window when applied on a window reference. */ def start = WindowStart(expr) /** - * Returns the end time of a window when applied on a window reference. + * Returns the end time (exclusive) of a window when applied on a window reference. + * + * e.g. if a window ends at 10:59:59.999 this property will return 11:00:00.000. */ def end = WindowEnd(expr) @@ -683,7 +685,7 @@ trait ImplicitExpressionOperations { */ def element() = ArrayElement(expr) - // Schema definition + // Time definition /** * Declares a field as the rowtime attribute for indicating, accessing, and working in http://git-wip-us.apache.org/repos/asf/flink/blob/e23328e4/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/calcite/RelTimeIndicatorConverter.scala ---------------------------------------------------------------------- diff --git a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/calcite/RelTimeIndicatorConverter.scala b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/calcite/RelTimeIndicatorConverter.scala index 7ceb397..21fa70b 100644 --- a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/calcite/RelTimeIndicatorConverter.scala +++ b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/calcite/RelTimeIndicatorConverter.scala @@ -342,10 +342,20 @@ class RelTimeIndicatorConverter(rexBuilder: RexBuilder) extends RelShuttle { } // remove time indicator return type - if (isTimeIndicatorType(updatedCall.getType)) { - updatedCall.clone(timestamp, materializedOperands) - } else { - updatedCall.clone(updatedCall.getType, materializedOperands) + updatedCall.getOperator match { + + // we do not modify AS if operand has not been materialized + case SqlStdOperatorTable.AS if + isTimeIndicatorType(updatedCall.getOperands.get(0).getType) => + updatedCall + + // materialize function's result and operands + case _ if isTimeIndicatorType(updatedCall.getType) => + updatedCall.clone(timestamp, materializedOperands) + + // materialize function's operands only + case _ => + updatedCall.clone(updatedCall.getType, materializedOperands) } } } http://git-wip-us.apache.org/repos/asf/flink/blob/e23328e4/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/expressions/fieldExpression.scala ---------------------------------------------------------------------- diff --git a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/expressions/fieldExpression.scala b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/expressions/fieldExpression.scala index 362d846..99adabf 100644 --- a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/expressions/fieldExpression.scala +++ b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/expressions/fieldExpression.scala @@ -19,8 +19,10 @@ package org.apache.flink.table.expressions import org.apache.calcite.rex.RexNode import org.apache.calcite.tools.RelBuilder -import org.apache.flink.api.common.typeinfo.{SqlTimeTypeInfo, TypeInformation} +import org.apache.flink.api.common.typeinfo.TypeInformation import org.apache.flink.table.api.{UnresolvedException, ValidationException} +import org.apache.flink.table.calcite.FlinkRelBuilder.NamedWindowProperty +import org.apache.flink.table.calcite.FlinkTypeFactory.{isRowtimeIndicatorType, isTimeIndicatorType} import org.apache.flink.table.typeutils.TimeIndicatorTypeInfo import org.apache.flink.table.validate.{ValidationFailure, ValidationResult, ValidationSuccess} @@ -117,13 +119,13 @@ case class UnresolvedAlias(child: Expression) extends UnaryExpression with Named override private[flink] lazy val valid = false } -case class WindowReference(name: String) extends Attribute { +case class WindowReference(name: String, tpe: Option[TypeInformation[_]] = None) extends Attribute { override private[flink] def toRexNode(implicit relBuilder: RelBuilder): RexNode = throw new UnsupportedOperationException("A window reference can not be used solely.") override private[flink] def resultType: TypeInformation[_] = - throw new UnsupportedOperationException("A window reference has no result type.") + tpe.getOrElse(throw UnresolvedException("Could not resolve type of referenced window.")) override private[flink] def withName(newName: String): Attribute = { if (newName == name) { @@ -132,31 +134,61 @@ case class WindowReference(name: String) extends Attribute { throw new ValidationException("Cannot rename window reference.") } } + + override def toString: String = s"'$name" } abstract class TimeAttribute(val expression: Expression) extends UnaryExpression - with NamedExpression { + with WindowProperty { override private[flink] def child: Expression = expression - - override private[flink] def name: String = expression match { - case UnresolvedFieldReference(name) => name - case _ => throw new ValidationException("Unresolved field reference expected.") - } - - override private[flink] def toAttribute: Attribute = - throw new UnsupportedOperationException("Time attribute can not be used solely.") } case class RowtimeAttribute(expr: Expression) extends TimeAttribute(expr) { - override private[flink] def resultType: TypeInformation[_] = + override private[flink] def validateInput(): ValidationResult = { + child match { + case WindowReference(_, Some(tpe)) if !isRowtimeIndicatorType(tpe) => + ValidationFailure("A proctime window cannot provide a rowtime attribute.") + case WindowReference(_, Some(tpe)) if isRowtimeIndicatorType(tpe) => + ValidationSuccess + case WindowReference(_, _) => + ValidationFailure("Reference to a rowtime or proctime window required.") + case _ => + ValidationFailure( + "The '.rowtime' expression can only be used for table definitions and windows.") + } + } + + override def resultType: TypeInformation[_] = TimeIndicatorTypeInfo.ROWTIME_INDICATOR + + override def toNamedWindowProperty(name: String): NamedWindowProperty = + NamedWindowProperty(name, this) + + override def toString: String = s"rowtime($child)" } case class ProctimeAttribute(expr: Expression) extends TimeAttribute(expr) { - override private[flink] def resultType: TypeInformation[_] = + override private[flink] def validateInput(): ValidationResult = { + child match { + case WindowReference(_, Some(tpe)) if isTimeIndicatorType(tpe) => + ValidationSuccess + case WindowReference(_, _) => + ValidationFailure("Reference to a rowtime or proctime window required.") + case _ => + ValidationFailure( + "The '.proctime' expression can only be used for table definitions and windows.") + } + } + + override def resultType: TypeInformation[_] = TimeIndicatorTypeInfo.PROCTIME_INDICATOR + + override def toNamedWindowProperty(name: String): NamedWindowProperty = + NamedWindowProperty(name, this) + + override def toString: String = s"proctime($child)" } http://git-wip-us.apache.org/repos/asf/flink/blob/e23328e4/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/expressions/windowProperties.scala ---------------------------------------------------------------------- diff --git a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/expressions/windowProperties.scala b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/expressions/windowProperties.scala index 990d928..e119247 100644 --- a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/expressions/windowProperties.scala +++ b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/expressions/windowProperties.scala @@ -20,12 +20,22 @@ package org.apache.flink.table.expressions import org.apache.calcite.rex.RexNode import org.apache.calcite.tools.RelBuilder -import org.apache.flink.api.common.typeinfo.SqlTimeTypeInfo +import org.apache.flink.api.common.typeinfo.{SqlTimeTypeInfo, TypeInformation} import org.apache.flink.table.calcite.FlinkRelBuilder import FlinkRelBuilder.NamedWindowProperty import org.apache.flink.table.validate.{ValidationFailure, ValidationSuccess} -abstract class WindowProperty(child: Expression) extends UnaryExpression { +trait WindowProperty { + + def toNamedWindowProperty(name: String): NamedWindowProperty + + def resultType: TypeInformation[_] + +} + +abstract class AbstractWindowProperty(child: Expression) + extends UnaryExpression + with WindowProperty { override def toString = s"WindowProperty($child)" @@ -39,20 +49,19 @@ abstract class WindowProperty(child: Expression) extends UnaryExpression { ValidationFailure("Child must be a window reference.") } - private[flink] def toNamedWindowProperty(name: String)(implicit relBuilder: RelBuilder) - : NamedWindowProperty = NamedWindowProperty(name, this) + def toNamedWindowProperty(name: String): NamedWindowProperty = NamedWindowProperty(name, this) } -case class WindowStart(child: Expression) extends WindowProperty(child) { +case class WindowStart(child: Expression) extends AbstractWindowProperty(child) { - override private[flink] def resultType = SqlTimeTypeInfo.TIMESTAMP + override def resultType = SqlTimeTypeInfo.TIMESTAMP override def toString: String = s"start($child)" } -case class WindowEnd(child: Expression) extends WindowProperty(child) { +case class WindowEnd(child: Expression) extends AbstractWindowProperty(child) { - override private[flink] def resultType = SqlTimeTypeInfo.TIMESTAMP + override def resultType = SqlTimeTypeInfo.TIMESTAMP override def toString: String = s"end($child)" } http://git-wip-us.apache.org/repos/asf/flink/blob/e23328e4/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/logical/LogicalWindow.scala ---------------------------------------------------------------------- diff --git a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/logical/LogicalWindow.scala b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/logical/LogicalWindow.scala index 6161ef0..a328703 100644 --- a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/logical/LogicalWindow.scala +++ b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/logical/LogicalWindow.scala @@ -36,7 +36,7 @@ abstract class LogicalWindow( def resolveExpressions(resolver: (Expression) => Expression): LogicalWindow = this def validate(tableEnv: TableEnvironment): ValidationResult = aliasAttribute match { - case WindowReference(_) => ValidationSuccess + case WindowReference(_, _) => ValidationSuccess case _ => ValidationFailure("Window reference for window expected.") } http://git-wip-us.apache.org/repos/asf/flink/blob/e23328e4/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/logical/operators.scala ---------------------------------------------------------------------- diff --git a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/logical/operators.scala b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/logical/operators.scala index 36067eb..bfb6cbf 100644 --- a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/logical/operators.scala +++ b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/logical/operators.scala @@ -552,20 +552,38 @@ case class WindowAggregate( override def resolveReference( tableEnv: TableEnvironment, name: String) - : Option[NamedExpression] = window.aliasAttribute match { + : Option[NamedExpression] = { + + def resolveAlias(alias: String) = { + // check if reference can already be resolved by input fields + val found = super.resolveReference(tableEnv, name) + if (found.isDefined) { + failValidation(s"Reference $name is ambiguous.") + } else { + // resolve type of window reference + val resolvedType = window.timeAttribute match { + case UnresolvedFieldReference(n) => + super.resolveReference(tableEnv, n) match { + case Some(ResolvedFieldReference(_, tpe)) => Some(tpe) + case _ => None + } + case _ => None + } + // let validation phase throw an error if type could not be resolved + Some(WindowReference(name, resolvedType)) + } + } + + window.aliasAttribute match { // resolve reference to this window's name case UnresolvedFieldReference(alias) if name == alias => - // check if reference can already be resolved by input fields - val found = super.resolveReference(tableEnv, name) - if (found.isDefined) { - failValidation(s"Reference $name is ambiguous.") - } else { - Some(WindowReference(name)) - } + resolveAlias(alias) + case _ => // resolve references as usual super.resolveReference(tableEnv, name) } + } override protected[logical] def construct(relBuilder: RelBuilder): RelBuilder = { val flinkRelBuilder = relBuilder.asInstanceOf[FlinkRelBuilder] @@ -574,7 +592,7 @@ case class WindowAggregate( window, relBuilder.groupKey(groupingExpressions.map(_.toRexNode(relBuilder)).asJava), propertyExpressions.map { - case Alias(prop: WindowProperty, name, _) => prop.toNamedWindowProperty(name)(relBuilder) + case Alias(prop: WindowProperty, name, _) => prop.toNamedWindowProperty(name) case _ => throw new RuntimeException("This should never happen.") }, aggregateExpressions.map { http://git-wip-us.apache.org/repos/asf/flink/blob/e23328e4/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/nodes/datastream/DataStreamGroupWindowAggregate.scala ---------------------------------------------------------------------- diff --git a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/nodes/datastream/DataStreamGroupWindowAggregate.scala b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/nodes/datastream/DataStreamGroupWindowAggregate.scala index 2a71592..c158579 100644 --- a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/nodes/datastream/DataStreamGroupWindowAggregate.scala +++ b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/nodes/datastream/DataStreamGroupWindowAggregate.scala @@ -28,6 +28,7 @@ import org.apache.flink.streaming.api.windowing.assigners._ import org.apache.flink.streaming.api.windowing.windows.{Window => DataStreamWindow} import org.apache.flink.table.api.{StreamQueryConfig, StreamTableEnvironment, TableException} import org.apache.flink.table.calcite.FlinkRelBuilder.NamedWindowProperty +import org.apache.flink.table.calcite.FlinkTypeFactory import org.apache.flink.table.codegen.CodeGenerator import org.apache.flink.table.expressions.ExpressionUtils._ import org.apache.flink.table.plan.logical._ @@ -118,6 +119,9 @@ class DataStreamGroupWindowAggregate( inputSchema.mapAggregateCall(namedAggregate.left), namedAggregate.right) } + val physicalNamedProperties = namedProperties + .filter(np => !FlinkTypeFactory.isTimeIndicatorType(np.property.resultType)) + val consumeRetraction = DataStreamRetractionRules.isAccRetract(input) if (consumeRetraction) { @@ -159,7 +163,7 @@ class DataStreamGroupWindowAggregate( physicalGrouping.length, physicalNamedAggregates.size, schema.physicalArity, - namedProperties) + physicalNamedProperties) val keyedStream = inputDS.keyBy(physicalGrouping: _*) val windowedStream = @@ -185,7 +189,7 @@ class DataStreamGroupWindowAggregate( val windowFunction = AggregateUtil.createAggregationAllWindowFunction( window, schema.physicalArity, - namedProperties) + physicalNamedProperties) val windowedStream = createNonKeyedWindowedStream(window, inputDS) http://git-wip-us.apache.org/repos/asf/flink/blob/e23328e4/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/rules/datastream/DataStreamLogicalWindowAggregateRule.scala ---------------------------------------------------------------------- diff --git a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/rules/datastream/DataStreamLogicalWindowAggregateRule.scala b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/rules/datastream/DataStreamLogicalWindowAggregateRule.scala index d57d4cc..38de539 100644 --- a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/rules/datastream/DataStreamLogicalWindowAggregateRule.scala +++ b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/rules/datastream/DataStreamLogicalWindowAggregateRule.scala @@ -23,10 +23,10 @@ import java.math.{BigDecimal => JBigDecimal} import org.apache.calcite.rel.`type`.RelDataType import org.apache.calcite.rex._ import org.apache.calcite.sql.fun.SqlStdOperatorTable -import org.apache.flink.table.api.{TableException, Window} import org.apache.flink.table.api.scala.{Session, Slide, Tumble} +import org.apache.flink.table.api.{TableException, Window} import org.apache.flink.table.calcite.FlinkTypeFactory -import org.apache.flink.table.expressions.{Literal, ResolvedFieldReference, UnresolvedFieldReference} +import org.apache.flink.table.expressions.{Literal, ResolvedFieldReference, WindowReference} import org.apache.flink.table.plan.rules.common.LogicalWindowAggregateRule import org.apache.flink.table.typeutils.TimeIntervalTypeInfo @@ -84,7 +84,7 @@ class DataStreamLogicalWindowAggregateRule val interval = getOperandAsLong(windowExpr, 1) val w = Tumble.over(Literal(interval, TimeIntervalTypeInfo.INTERVAL_MILLIS)) - w.on(time).as("w$") + w.on(time).as(WindowReference("w$")) case SqlStdOperatorTable.HOP => val time = getOperandAsTimeIndicator(windowExpr, 0) @@ -93,14 +93,14 @@ class DataStreamLogicalWindowAggregateRule .over(Literal(size, TimeIntervalTypeInfo.INTERVAL_MILLIS)) .every(Literal(slide, TimeIntervalTypeInfo.INTERVAL_MILLIS)) - w.on(time).as("w$") + w.on(time).as(WindowReference("w$")) case SqlStdOperatorTable.SESSION => val time = getOperandAsTimeIndicator(windowExpr, 0) val gap = getOperandAsLong(windowExpr, 1) val w = Session.withGap(Literal(gap, TimeIntervalTypeInfo.INTERVAL_MILLIS)) - w.on(time).as("w$") + w.on(time).as(WindowReference("w$")) } } } http://git-wip-us.apache.org/repos/asf/flink/blob/e23328e4/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/typeutils/TimeIndicatorTypeInfo.scala ---------------------------------------------------------------------- diff --git a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/typeutils/TimeIndicatorTypeInfo.scala b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/typeutils/TimeIndicatorTypeInfo.scala index 31dcb5c..083f1eb 100644 --- a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/typeutils/TimeIndicatorTypeInfo.scala +++ b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/typeutils/TimeIndicatorTypeInfo.scala @@ -34,7 +34,8 @@ class TimeIndicatorTypeInfo(val isEventTime: Boolean) SqlTimestampSerializer.INSTANCE, classOf[SqlTimestampComparator].asInstanceOf[Class[TypeComparator[Timestamp]]]) { - override def toString: String = s"TimeIndicatorTypeInfo" + override def toString: String = + s"TimeIndicatorTypeInfo(${if (isEventTime) "rowtime" else "proctime" })" } object TimeIndicatorTypeInfo { http://git-wip-us.apache.org/repos/asf/flink/blob/e23328e4/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/api/scala/stream/StreamTableEnvironmentTest.scala ---------------------------------------------------------------------- diff --git a/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/api/scala/stream/StreamTableEnvironmentTest.scala b/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/api/scala/stream/StreamTableEnvironmentTest.scala index 7797f22..3c1668f 100644 --- a/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/api/scala/stream/StreamTableEnvironmentTest.scala +++ b/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/api/scala/stream/StreamTableEnvironmentTest.scala @@ -36,6 +36,13 @@ import org.mockito.Mockito.{mock, when} class StreamTableEnvironmentTest extends TableTestBase { @Test(expected = classOf[TableException]) + def testInvalidTimeAttributes(): Unit = { + val util = streamTestUtil() + // table definition makes no sense + util.addTable[(Long, Int, String, Int, Long)]('a.rowtime.rowtime, 'b, 'c, 'd, 'e) + } + + @Test(expected = classOf[TableException]) def testInvalidProctimeAttribute(): Unit = { val util = streamTestUtil() // cannot replace an attribute with proctime http://git-wip-us.apache.org/repos/asf/flink/blob/e23328e4/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/api/scala/stream/TableSourceTest.scala ---------------------------------------------------------------------- diff --git a/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/api/scala/stream/TableSourceTest.scala b/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/api/scala/stream/TableSourceTest.scala index cda90f7..890ad32 100644 --- a/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/api/scala/stream/TableSourceTest.scala +++ b/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/api/scala/stream/TableSourceTest.scala @@ -71,8 +71,8 @@ class TableSourceTest extends TableTestBase { term("where", ">(val, 100)") ), term("groupBy", "name"), - term("window", "TumblingGroupWindow(WindowReference(w), 'addTime, 600000.millis)"), - term("select", "name", "AVG(val) AS TMP_1", "end(WindowReference(w)) AS TMP_0") + term("window", "TumblingGroupWindow('w, 'addTime, 600000.millis)"), + term("select", "name", "AVG(val) AS TMP_1", "end('w) AS TMP_0") ), term("select", "name", "TMP_0", "TMP_1") ) http://git-wip-us.apache.org/repos/asf/flink/blob/e23328e4/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/api/scala/stream/table/GroupWindowTest.scala ---------------------------------------------------------------------- diff --git a/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/api/scala/stream/table/GroupWindowTest.scala b/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/api/scala/stream/table/GroupWindowTest.scala index ef071b7..b389183 100644 --- a/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/api/scala/stream/table/GroupWindowTest.scala +++ b/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/api/scala/stream/table/GroupWindowTest.scala @@ -170,7 +170,6 @@ class GroupWindowTest extends TableTestBase { .select('string, weightedAvg('string, 'int)) // invalid UDAGG args } - @Ignore // TODO @Test def testMultiWindow(): Unit = { val util = streamTestUtil() @@ -179,7 +178,7 @@ class GroupWindowTest extends TableTestBase { val windowedTable = table .window(Tumble over 50.milli on 'proctime as 'w1) .groupBy('w1, 'string) - .select('w.end as 'proctime, 'string, 'int.count) + .select('w1.proctime as 'proctime, 'string, 'int.count) .window(Slide over 20.milli every 10.milli on 'proctime as 'w2) .groupBy('w2) .select('string.count) @@ -193,7 +192,7 @@ class GroupWindowTest extends TableTestBase { unaryNode( "DataStreamCalc", streamTableNode(0), - term("select", "string", "int") + term("select", "string", "int", "proctime") ), term("groupBy", "string"), term( @@ -202,9 +201,9 @@ class GroupWindowTest extends TableTestBase { WindowReference("w1"), 'proctime, 50.milli)), - term("select", "string", "COUNT(int) AS TMP_0") + term("select", "string", "COUNT(int) AS TMP_1", "proctime('w1) AS TMP_0") ), - term("select", "string") + term("select", "string", "TMP_0 AS proctime") ), term( "window", @@ -213,7 +212,7 @@ class GroupWindowTest extends TableTestBase { 'proctime, 20.milli, 10.milli)), - term("select", "COUNT(string) AS TMP_1") + term("select", "COUNT(string) AS TMP_2") ) util.verifyTable(windowedTable, expected) } @@ -784,8 +783,8 @@ class GroupWindowTest extends TableTestBase { term("select", "string", "COUNT(int) AS TMP_0", - "start(WindowReference(w)) AS TMP_1", - "end(WindowReference(w)) AS TMP_2") + "start('w) AS TMP_1", + "end('w) AS TMP_2") ) util.verifyTable(windowedTable, expected) @@ -852,8 +851,8 @@ class GroupWindowTest extends TableTestBase { term("select", "string", "COUNT(int) AS TMP_0", - "start(WindowReference(w)) AS TMP_1", - "end(WindowReference(w)) AS TMP_2") + "start('w) AS TMP_1", + "end('w) AS TMP_2") ) util.verifyTable(windowedTable, expected) @@ -879,8 +878,8 @@ class GroupWindowTest extends TableTestBase { term("select", "string", "COUNT(int) AS TMP_1", - "end(WindowReference(w)) AS TMP_0", - "start(WindowReference(w)) AS TMP_2") + "end('w) AS TMP_0", + "start('w) AS TMP_2") ), term("select", "TMP_0 AS we1", "string", "TMP_1 AS cnt", "TMP_2 AS ws", "TMP_0 AS we2") ) @@ -909,8 +908,8 @@ class GroupWindowTest extends TableTestBase { term("select", "string", "SUM(int) AS TMP_0", - "start(WindowReference(w)) AS TMP_1", - "end(WindowReference(w)) AS TMP_2") + "start('w) AS TMP_1", + "end('w) AS TMP_2") ), term("select", "string", http://git-wip-us.apache.org/repos/asf/flink/blob/e23328e4/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/calcite/RelTimeIndicatorConverterTest.scala ---------------------------------------------------------------------- diff --git a/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/calcite/RelTimeIndicatorConverterTest.scala b/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/calcite/RelTimeIndicatorConverterTest.scala index 7ac0874..cf55d48 100644 --- a/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/calcite/RelTimeIndicatorConverterTest.scala +++ b/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/calcite/RelTimeIndicatorConverterTest.scala @@ -243,10 +243,10 @@ class RelTimeIndicatorConverterTest extends TableTestBase { term( "window", TumblingGroupWindow( - WindowReference("w"), + 'w, 'rowtime, 100.millis)), - term("select", "long", "SUM(int) AS TMP_1", "end(WindowReference(w)) AS TMP_0") + term("select", "long", "SUM(int) AS TMP_1", "end('w) AS TMP_0") ), term("select", "TMP_0 AS rowtime", "long", "TMP_1") ) @@ -273,7 +273,7 @@ class RelTimeIndicatorConverterTest extends TableTestBase { term( "window", TumblingGroupWindow( - 'w$, + WindowReference("w$"), 'rowtime, 100.millis)), term("select", "long", "SUM(int) AS EXPR$2", "start('w$) AS w$start", "end('w$) AS w$end") @@ -338,6 +338,54 @@ class RelTimeIndicatorConverterTest extends TableTestBase { util.verifyTable(result, expected) } + @Test + def testMultiWindow(): Unit = { + val util = streamTestUtil() + val t = util.addTable[(Long, Long, Int)]('rowtime.rowtime, 'long, 'int) + + val result = t + .window(Tumble over 100.millis on 'rowtime as 'w) + .groupBy('w, 'long) + .select('w.rowtime as 'newrowtime, 'long, 'int.sum as 'int) + .window(Tumble over 1.second on 'newrowtime as 'w2) + .groupBy('w2, 'long) + .select('w2.end, 'long, 'int.sum) + + val expected = unaryNode( + "DataStreamCalc", + unaryNode( + "DataStreamGroupWindowAggregate", + unaryNode( + "DataStreamCalc", + unaryNode( + "DataStreamGroupWindowAggregate", + streamTableNode(0), + term("groupBy", "long"), + term( + "window", + TumblingGroupWindow( + 'w, + 'rowtime, + 100.millis)), + term("select", "long", "SUM(int) AS TMP_1", "rowtime('w) AS TMP_0") + ), + term("select", "TMP_0 AS newrowtime", "long", "TMP_1 AS int") + ), + term("groupBy", "long"), + term( + "window", + TumblingGroupWindow( + 'w2, + 'newrowtime, + 1000.millis)), + term("select", "long", "SUM(int) AS TMP_3", "end('w2) AS TMP_2") + ), + term("select", "TMP_2", "long", "TMP_3") + ) + + util.verifyTable(result, expected) + } + } object RelTimeIndicatorConverterTest { http://git-wip-us.apache.org/repos/asf/flink/blob/e23328e4/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/runtime/datastream/TimeAttributesITCase.scala ---------------------------------------------------------------------- diff --git a/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/runtime/datastream/TimeAttributesITCase.scala b/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/runtime/datastream/TimeAttributesITCase.scala index 7d7088e..3f12218 100644 --- a/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/runtime/datastream/TimeAttributesITCase.scala +++ b/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/runtime/datastream/TimeAttributesITCase.scala @@ -28,7 +28,7 @@ import org.apache.flink.streaming.api.watermark.Watermark import org.apache.flink.streaming.util.StreamingMultipleProgramsTestBase import org.apache.flink.table.api.scala._ import org.apache.flink.table.api.scala.stream.utils.StreamITCase -import org.apache.flink.table.api.{TableEnvironment, TableException, Types} +import org.apache.flink.table.api.{TableEnvironment, TableException, Types, ValidationException} import org.apache.flink.table.calcite.RelTimeIndicatorConverterTest.TableFunc import org.apache.flink.table.expressions.TimeIntervalUnit import org.apache.flink.table.runtime.datastream.TimeAttributesITCase.TimestampWithEqualWatermark @@ -62,6 +62,33 @@ class TimeAttributesITCase extends StreamingMultipleProgramsTestBase { stream.toTable(tEnv, 'rowtime.rowtime, 'int, 'double, 'float, 'bigdec, 'string) } + @Test(expected = classOf[ValidationException]) + def testInvalidUseOfRowtime(): Unit = { + val env = StreamExecutionEnvironment.getExecutionEnvironment + env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime) + val tEnv = TableEnvironment.getTableEnvironment(env) + val stream = env + .fromCollection(data) + .assignTimestampsAndWatermarks(new TimestampWithEqualWatermark()) + stream.toTable(tEnv, 'rowtime.rowtime, 'int, 'double, 'float, 'bigdec, 'string) + .select('rowtime.rowtime) + } + + @Test(expected = classOf[ValidationException]) + def testInvalidUseOfRowtime2(): Unit = { + val env = StreamExecutionEnvironment.getExecutionEnvironment + env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime) + val tEnv = TableEnvironment.getTableEnvironment(env) + val stream = env + .fromCollection(data) + .assignTimestampsAndWatermarks(new TimestampWithEqualWatermark()) + stream + .toTable(tEnv, 'rowtime.rowtime, 'int, 'double, 'float, 'bigdec, 'string) + .window(Tumble over 2.millis on 'rowtime as 'w) + .groupBy('w) + .select('w.end.rowtime, 'int.count as 'int) // no rowtime on non-window reference + } + @Test def testCalcMaterialization(): Unit = { val env = StreamExecutionEnvironment.getExecutionEnvironment @@ -215,6 +242,39 @@ class TimeAttributesITCase extends StreamingMultipleProgramsTestBase { assertEquals(expected.sorted, StreamITCase.testResults.sorted) } + @Test + def testMultiWindow(): Unit = { + val env = StreamExecutionEnvironment.getExecutionEnvironment + env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime) + val tEnv = TableEnvironment.getTableEnvironment(env) + StreamITCase.testResults = mutable.MutableList() + + val stream = env + .fromCollection(data) + .assignTimestampsAndWatermarks(new TimestampWithEqualWatermark()) + val table = stream.toTable(tEnv, 'rowtime.rowtime, 'int, 'double, 'float, 'bigdec, 'string) + + val t = table + .window(Tumble over 2.millis on 'rowtime as 'w) + .groupBy('w) + .select('w.rowtime as 'rowtime, 'int.count as 'int) + .window(Tumble over 4.millis on 'rowtime as 'w2) + .groupBy('w2) + .select('w2.rowtime, 'w2.end, 'int.count) + + val results = t.toDataStream[Row] + results.addSink(new StreamITCase.StringSink) + env.execute() + + val expected = Seq( + "1970-01-01 00:00:00.003,1970-01-01 00:00:00.004,2", + "1970-01-01 00:00:00.007,1970-01-01 00:00:00.008,2", + "1970-01-01 00:00:00.011,1970-01-01 00:00:00.012,1", + "1970-01-01 00:00:00.019,1970-01-01 00:00:00.02,1" + ) + assertEquals(expected.sorted, StreamITCase.testResults.sorted) + } + } object TimeAttributesITCase {