Repository: flink
Updated Branches:
  refs/heads/master f24514339 -> 8304f3e15
[FLINK-5624] [table] Add SQL support for tumbling windows on streaming tables.

This closes #3252.

Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/8304f3e1
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/8304f3e1
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/8304f3e1

Branch: refs/heads/master
Commit: 8304f3e159851d29691e66cacfcb4278d73a8b97
Parents: f245143
Author: Haohui Mai <[email protected]>
Authored: Tue Feb 14 13:28:44 2017 -0800
Committer: Fabian Hueske <[email protected]>
Committed: Thu Feb 16 18:33:33 2017 +0100

----------------------------------------------------------------------
 .../functions/TimeModeIndicatorFunctions.scala | 53 +++++++
 .../flink/table/plan/rules/FlinkRuleSets.scala | 3 +
 .../datastream/LogicalWindowAggregateRule.scala | 139 +++++++++++++++++++
 .../flink/table/validate/FunctionCatalog.scala | 11 +-
 .../scala/stream/sql/WindowAggregateTest.scala | 111 +++++++++++++++
 5 files changed, 314 insertions(+), 3 deletions(-)
----------------------------------------------------------------------

http://git-wip-us.apache.org/repos/asf/flink/blob/8304f3e1/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/functions/TimeModeIndicatorFunctions.scala
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/functions/TimeModeIndicatorFunctions.scala b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/functions/TimeModeIndicatorFunctions.scala
new file mode 100644
index 0000000..7a7e00f
--- /dev/null
+++ b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/functions/TimeModeIndicatorFunctions.scala
@@ -0,0 +1,53 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.
See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.flink.table.functions + +import org.apache.calcite.sql._ +import org.apache.calcite.sql.`type`.{OperandTypes, ReturnTypes, SqlTypeName} +import org.apache.calcite.sql.validate.SqlMonotonicity +import org.apache.calcite.tools.RelBuilder +import org.apache.flink.api.common.typeinfo.SqlTimeTypeInfo +import org.apache.flink.table.api.TableException +import org.apache.flink.table.expressions.LeafExpression + +object EventTimeExtractor extends SqlFunction("ROWTIME", SqlKind.OTHER_FUNCTION, + ReturnTypes.explicit(SqlTypeName.TIMESTAMP), null, OperandTypes.NILADIC, + SqlFunctionCategory.SYSTEM) { + override def getSyntax: SqlSyntax = SqlSyntax.FUNCTION + + override def getMonotonicity(call: SqlOperatorBinding): SqlMonotonicity = + SqlMonotonicity.INCREASING +} + +abstract class TimeIndicator extends LeafExpression { + /** + * Returns the [[org.apache.flink.api.common.typeinfo.TypeInformation]] + * for evaluating this expression. + * It is sometimes not available until the expression is valid. + */ + override private[flink] def resultType = SqlTimeTypeInfo.TIMESTAMP + + /** + * Convert Expression to its counterpart in Calcite, i.e. 
RexNode + */ + override private[flink] def toRexNode(implicit relBuilder: RelBuilder) = + throw new TableException("indicator functions (e.g. proctime() and rowtime()" + + " are not executable. Please check your expressions.") +} + +case class RowTime() extends TimeIndicator http://git-wip-us.apache.org/repos/asf/flink/blob/8304f3e1/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/rules/FlinkRuleSets.scala ---------------------------------------------------------------------- diff --git a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/rules/FlinkRuleSets.scala b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/rules/FlinkRuleSets.scala index a24a06d..f9c8d8d 100644 --- a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/rules/FlinkRuleSets.scala +++ b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/rules/FlinkRuleSets.scala @@ -125,6 +125,9 @@ object FlinkRuleSets { * RuleSet to normalize plans for stream / DataStream execution */ val DATASTREAM_NORM_RULES: RuleSet = RuleSets.ofList( + // Transform window to LogicalWindowAggregate + LogicalWindowAggregateRule.INSTANCE, + // simplify expressions rules ReduceExpressionsRule.FILTER_INSTANCE, ReduceExpressionsRule.PROJECT_INSTANCE, http://git-wip-us.apache.org/repos/asf/flink/blob/8304f3e1/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/rules/datastream/LogicalWindowAggregateRule.scala ---------------------------------------------------------------------- diff --git a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/rules/datastream/LogicalWindowAggregateRule.scala b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/rules/datastream/LogicalWindowAggregateRule.scala new file mode 100644 index 0000000..094e47b --- /dev/null +++ 
b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/rules/datastream/LogicalWindowAggregateRule.scala @@ -0,0 +1,139 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.flink.table.plan.rules.datastream + +import java.util.Calendar + +import com.google.common.collect.ImmutableList +import org.apache.calcite.avatica.util.TimeUnitRange +import org.apache.calcite.plan._ +import org.apache.calcite.plan.hep.HepRelVertex +import org.apache.calcite.rel.logical.{LogicalAggregate, LogicalProject} +import org.apache.calcite.rex.{RexCall, RexLiteral, RexNode} +import org.apache.calcite.sql.fun.SqlFloorFunction +import org.apache.calcite.util.ImmutableBitSet +import org.apache.flink.table.api.scala.Tumble +import org.apache.flink.table.api.{TableException, TumblingWindow, Window} +import org.apache.flink.table.calcite.FlinkRelBuilder.NamedWindowProperty +import org.apache.flink.table.expressions._ +import org.apache.flink.table.functions.EventTimeExtractor +import org.apache.flink.table.plan.logical.rel.LogicalWindowAggregate + +import scala.collection.JavaConversions._ + +class LogicalWindowAggregateRule + extends RelOptRule( + LogicalWindowAggregateRule.LOGICAL_WINDOW_PREDICATE, + 
"LogicalWindowAggregateRule") { + + override def matches(call: RelOptRuleCall): Boolean = { + val agg = call.rel(0).asInstanceOf[LogicalAggregate] + + val distinctAggs = agg.getAggCallList.exists(_.isDistinct) + val groupSets = agg.getGroupSets.size() != 1 || agg.getGroupSets.get(0) != agg.getGroupSet + + val windowClause = recognizeWindow(agg) + !distinctAggs && !groupSets && !agg.indicator && windowClause.isDefined + } + + /** + * Transform LogicalAggregate with windowing expression to LogicalProject + * + LogicalWindowAggregate + LogicalProject. + * + * The transformation adds an additional LogicalProject at the top to ensure + * that the types are equivalent. + */ + override def onMatch(call: RelOptRuleCall): Unit = { + val agg = call.rel[LogicalAggregate](0) + val project = agg.getInput.asInstanceOf[HepRelVertex].getCurrentRel.asInstanceOf[LogicalProject] + val (windowExprIdx, window) = recognizeWindow(agg).get + val newGroupSet = agg.getGroupSet.except(ImmutableBitSet.of(windowExprIdx)) + + val builder = call.builder() + val rexBuilder = builder.getRexBuilder + val zero = rexBuilder.makeTimestampLiteral(LogicalWindowAggregateRule.TIMESTAMP_ZERO, 3) + + val newAgg = builder + .push(project.getInput) + .project(project.getChildExps.updated(windowExprIdx, zero)) + .aggregate(builder.groupKey( + newGroupSet, + agg.indicator, ImmutableList.of(newGroupSet)), agg.getAggCallList) + .build().asInstanceOf[LogicalAggregate] + + // Create an additional project to conform with types + val transformed = call.builder() + transformed.push(LogicalWindowAggregate.create( + window.toLogicalWindow, + Seq[NamedWindowProperty](), + newAgg)) + .project(transformed.fields().patch(windowExprIdx, Seq(zero), 0)) + call.transformTo(transformed.build()) + } + + private def recognizeWindow(agg: LogicalAggregate) : Option[(Int, Window)] = { + val project = agg.getInput.asInstanceOf[HepRelVertex].getCurrentRel.asInstanceOf[LogicalProject] + val key = agg.getGroupSet.asList() + val fields = 
key.flatMap(x => nodeToMaybeWindow(project.getProjects.get(x)) match { + case Some(w) => Some(x.toInt, w) + case _ => None + }) + fields.size match { + case 0 => None + case 1 => Some(fields.head) + case _ => throw new TableException("Multiple windows are not supported") + } + } + + private def nodeToMaybeWindow(field: RexNode): Option[Window] = { + field match { + case call: RexCall => + call.getOperator match { + case _: SqlFloorFunction => call.getOperands.get(0) match { + case c: RexCall => if (c.getOperator == EventTimeExtractor) { + val unit = call.getOperands.get(1) + .asInstanceOf[RexLiteral].getValue.asInstanceOf[TimeUnitRange] + return Some(LogicalWindowAggregateRule.timeUnitRangeToWindow(unit) + .on("rowtime")) + } + case _ => + } + case _ => + } + case _ => + } + None + } +} + +object LogicalWindowAggregateRule { + private[flink] val TIMESTAMP_ZERO = Calendar.getInstance() + TIMESTAMP_ZERO.setTimeInMillis(0) + + private[flink] val LOGICAL_WINDOW_PREDICATE = RelOptRule.operand(classOf[LogicalAggregate], + RelOptRule.operand(classOf[LogicalProject], RelOptRule.none())) + + private[flink] val INSTANCE = new LogicalWindowAggregateRule + + private val EXPR_ONE = ExpressionParser.parseExpression("1") + + def timeUnitRangeToWindow(range: TimeUnitRange): TumblingWindow = { + Tumble over ExpressionUtils.toMilliInterval(EXPR_ONE, range.startUnit.multiplier.longValue()) + } +} + http://git-wip-us.apache.org/repos/asf/flink/blob/8304f3e1/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/validate/FunctionCatalog.scala ---------------------------------------------------------------------- diff --git a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/validate/FunctionCatalog.scala b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/validate/FunctionCatalog.scala index c00f8bb..207eba1 100644 --- a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/validate/FunctionCatalog.scala +++ 
b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/validate/FunctionCatalog.scala @@ -23,8 +23,8 @@ import org.apache.calcite.sql.util.{ChainedSqlOperatorTable, ListSqlOperatorTabl import org.apache.calcite.sql.{SqlFunction, SqlOperator, SqlOperatorTable} import org.apache.flink.table.api.ValidationException import org.apache.flink.table.expressions._ -import org.apache.flink.table.functions.{ScalarFunction, TableFunction} import org.apache.flink.table.functions.utils.{TableSqlFunction, UserDefinedFunctionUtils} +import org.apache.flink.table.functions.{EventTimeExtractor, RowTime, ScalarFunction, TableFunction} import scala.collection.JavaConversions._ import scala.collection.mutable @@ -190,11 +190,14 @@ object FunctionCatalog { // array "cardinality" -> classOf[ArrayCardinality], "at" -> classOf[ArrayElementAt], - "element" -> classOf[ArrayElement] + "element" -> classOf[ArrayElement], // TODO implement function overloading here // "floor" -> classOf[TemporalFloor] // "ceil" -> classOf[TemporalCeil] + + // extensions to support streaming query + "rowtime" -> classOf[RowTime] ) /** @@ -317,7 +320,9 @@ class BasicOperatorTable extends ReflectiveSqlOperatorTable { SqlStdOperatorTable.EXTRACT, SqlStdOperatorTable.QUARTER, SqlStdOperatorTable.SCALAR_QUERY, - SqlStdOperatorTable.EXISTS + SqlStdOperatorTable.EXISTS, + // EXTENSIONS + EventTimeExtractor ) builtInSqlOperators.foreach(register) http://git-wip-us.apache.org/repos/asf/flink/blob/8304f3e1/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/api/scala/stream/sql/WindowAggregateTest.scala ---------------------------------------------------------------------- diff --git a/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/api/scala/stream/sql/WindowAggregateTest.scala b/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/api/scala/stream/sql/WindowAggregateTest.scala new file mode 100644 index 0000000..183b84c --- /dev/null +++ 
b/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/api/scala/stream/sql/WindowAggregateTest.scala @@ -0,0 +1,111 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.flink.table.api.scala.stream.sql + +import org.apache.flink.api.scala._ +import org.apache.flink.table.api.TableException +import org.apache.flink.table.api.scala._ +import org.apache.flink.table.plan.logical.EventTimeTumblingGroupWindow +import org.apache.flink.table.utils.{StreamTableTestUtil, TableTestBase} +import org.apache.flink.table.utils.TableTestUtil._ +import org.junit.Test + +class WindowAggregateTest extends TableTestBase { + private val streamUtil: StreamTableTestUtil = streamTestUtil() + streamUtil.addTable[(Int, String, Long)]("MyTable", 'a, 'b, 'c) + + @Test + def testNonPartitionedTumbleWindow() = { + val sql = "SELECT COUNT(*) FROM MyTable GROUP BY FLOOR(rowtime() TO HOUR)" + val expected = + unaryNode( + "DataStreamCalc", + unaryNode( + "DataStreamAggregate", + unaryNode( + "DataStreamCalc", + streamTableNode(0), + term("select", "1970-01-01 00:00:00 AS $f0") + ), + term("window", EventTimeTumblingGroupWindow(None, 'rowtime, 3600000.millis)), + term("select", "COUNT(*) AS EXPR$0") + ), + 
term("select", "EXPR$0") + ) + streamUtil.verifySql(sql, expected) + } + + @Test + def testPartitionedTumbleWindow1() = { + val sql = "SELECT a, COUNT(*) FROM MyTable GROUP BY a, FLOOR(rowtime() TO MINUTE)" + val expected = + unaryNode( + "DataStreamCalc", + unaryNode( + "DataStreamAggregate", + unaryNode( + "DataStreamCalc", + streamTableNode(0), + term("select", "a", "1970-01-01 00:00:00 AS $f1") + ), + term("groupBy", "a"), + term("window", EventTimeTumblingGroupWindow(None, 'rowtime, 60000.millis)), + term("select", "a", "COUNT(*) AS EXPR$1") + ), + term("select", "a", "EXPR$1") + ) + streamUtil.verifySql(sql, expected) + } + + @Test + def testPartitionedTumbleWindow2() = { + val sql = "SELECT a, SUM(c), b FROM MyTable GROUP BY a, FLOOR(rowtime() TO SECOND), b" + val expected = + unaryNode( + "DataStreamCalc", + unaryNode( + "DataStreamAggregate", + unaryNode( + "DataStreamCalc", + streamTableNode(0), + term("select", "a", "1970-01-01 00:00:00 AS $f1, b, c") + ), + term("groupBy", "a, b"), + term("window", EventTimeTumblingGroupWindow(None, 'rowtime, 1000.millis)), + term("select", "a", "b", "SUM(c) AS EXPR$1") + ), + term("select", "a", "EXPR$1", "b") + ) + streamUtil.verifySql(sql, expected) + } + + @Test(expected = classOf[TableException]) + def testMultiWindow() = { + val sql = "SELECT COUNT(*) FROM MyTable GROUP BY " + + "FLOOR(rowtime() TO HOUR), FLOOR(rowtime() TO MINUTE)" + val expected = "" + streamUtil.verifySql(sql, expected) + } + + @Test(expected = classOf[TableException]) + def testInvalidWindowExpression() = { + val sql = "SELECT COUNT(*) FROM MyTable GROUP BY FLOOR(localTimestamp TO HOUR)" + val expected = "" + streamUtil.verifySql(sql, expected) + } +}
