[FLINK-5710] [table] Add proctime() function to indicate processing time in Stream SQL.
This closes #3370.
This closes #3302. // duplicate of #3370

Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/a755de27
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/a755de27
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/a755de27

Branch: refs/heads/master
Commit: a755de27b85fe72be4a6f2063225ddc5c7f69058
Parents: 8c78aba
Author: Haohui Mai <[email protected]>
Authored: Thu Feb 23 13:51:45 2017 -0800
Committer: Fabian Hueske <[email protected]>
Committed: Fri Feb 24 17:12:42 2017 +0100

----------------------------------------------------------------------
 .../functions/TimeModeIndicatorFunctions.scala  | 10 ++++++
 .../datastream/LogicalWindowAggregateRule.scala | 38 +++++++++++++-------
 .../flink/table/validate/FunctionCatalog.scala  | 10 +++---
 .../scala/stream/sql/WindowAggregateTest.scala  | 23 +++++++++++-
 4 files changed, 63 insertions(+), 18 deletions(-)
----------------------------------------------------------------------

http://git-wip-us.apache.org/repos/asf/flink/blob/a755de27/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/functions/TimeModeIndicatorFunctions.scala
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/functions/TimeModeIndicatorFunctions.scala b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/functions/TimeModeIndicatorFunctions.scala
index 7a7e00f..b9b66ea 100644
--- a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/functions/TimeModeIndicatorFunctions.scala
+++ b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/functions/TimeModeIndicatorFunctions.scala
@@ -34,6 +34,15 @@ object EventTimeExtractor extends SqlFunction("ROWTIME", SqlKind.OTHER_FUNCTION,
     SqlMonotonicity.INCREASING
 }
 
+object ProcTimeExtractor extends SqlFunction("PROCTIME", SqlKind.OTHER_FUNCTION,
+  ReturnTypes.explicit(SqlTypeName.TIMESTAMP), null, OperandTypes.NILADIC,
+  SqlFunctionCategory.SYSTEM) {
+  override def getSyntax: SqlSyntax = SqlSyntax.FUNCTION
+
+  override def getMonotonicity(call: SqlOperatorBinding): SqlMonotonicity =
+    SqlMonotonicity.INCREASING
+}
+
 abstract class TimeIndicator extends LeafExpression {
   /**
     * Returns the [[org.apache.flink.api.common.typeinfo.TypeInformation]]
@@ -51,3 +60,4 @@ abstract class TimeIndicator extends LeafExpression {
 }
 
 case class RowTime() extends TimeIndicator
+case class ProcTime() extends TimeIndicator

http://git-wip-us.apache.org/repos/asf/flink/blob/a755de27/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/rules/datastream/LogicalWindowAggregateRule.scala
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/rules/datastream/LogicalWindowAggregateRule.scala b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/rules/datastream/LogicalWindowAggregateRule.scala
index 094e47b..f5eb5f9 100644
--- a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/rules/datastream/LogicalWindowAggregateRule.scala
+++ b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/rules/datastream/LogicalWindowAggregateRule.scala
@@ -25,14 +25,16 @@ import org.apache.calcite.plan._
 import org.apache.calcite.plan.hep.HepRelVertex
 import org.apache.calcite.rel.logical.{LogicalAggregate, LogicalProject}
 import org.apache.calcite.rex.{RexCall, RexLiteral, RexNode}
+import org.apache.calcite.sql.SqlOperator
 import org.apache.calcite.sql.fun.SqlFloorFunction
 import org.apache.calcite.util.ImmutableBitSet
 import org.apache.flink.table.api.scala.Tumble
 import org.apache.flink.table.api.{TableException, TumblingWindow, Window}
 import org.apache.flink.table.calcite.FlinkRelBuilder.NamedWindowProperty
 import org.apache.flink.table.expressions._
-import org.apache.flink.table.functions.EventTimeExtractor
+import org.apache.flink.table.functions.{EventTimeExtractor, ProcTimeExtractor}
 import org.apache.flink.table.plan.logical.rel.LogicalWindowAggregate
+import org.apache.flink.table.typeutils.TimeIntervalTypeInfo
 
 import scala.collection.JavaConversions._
 
@@ -104,15 +106,11 @@ class LogicalWindowAggregateRule
     field match {
       case call: RexCall =>
         call.getOperator match {
-          case _: SqlFloorFunction => call.getOperands.get(0) match {
-            case c: RexCall => if (c.getOperator == EventTimeExtractor) {
-              val unit = call.getOperands.get(1)
-                .asInstanceOf[RexLiteral].getValue.asInstanceOf[TimeUnitRange]
-              return Some(LogicalWindowAggregateRule.timeUnitRangeToWindow(unit)
-                .on("rowtime"))
-            }
-            case _ =>
-          }
+          case _: SqlFloorFunction =>
+            val unit: TimeUnitRange = LogicalWindowAggregateRule.getLiteral(call.getOperands.get(1))
+            val w = LogicalWindowAggregateRule.timeUnitRangeToTumbleWindow(unit)
+            return LogicalWindowAggregateRule.decorateTimeIndicator(
+              call.getOperands.get(0).asInstanceOf[RexCall].getOperator, w)
           case _ =>
         }
       case _ =>
@@ -130,10 +128,24 @@ object LogicalWindowAggregateRule {
 
   private[flink] val INSTANCE = new LogicalWindowAggregateRule
 
-  private val EXPR_ONE = ExpressionParser.parseExpression("1")
+  private def decorateTimeIndicator(operator: SqlOperator, window: TumblingWindow) = {
+    operator match {
+      case EventTimeExtractor => Some(window.on("rowtime"))
+      case ProcTimeExtractor => Some(window)
+      case _ => None
+    }
+  }
+
+  private def timeUnitRangeToTumbleWindow(range: TimeUnitRange): TumblingWindow = {
+    intervalToTumbleWindow(range.startUnit.multiplier.longValue())
+  }
+
+  private def intervalToTumbleWindow(size: Long): TumblingWindow = {
+    Tumble over Literal(size, TimeIntervalTypeInfo.INTERVAL_MILLIS)
+  }
 
-  def timeUnitRangeToWindow(range: TimeUnitRange): TumblingWindow = {
-    Tumble over ExpressionUtils.toMilliInterval(EXPR_ONE, range.startUnit.multiplier.longValue())
+  private def getLiteral[T](node: RexNode): T = {
+    node.asInstanceOf[RexLiteral].getValue.asInstanceOf[T]
   }
 }
 

http://git-wip-us.apache.org/repos/asf/flink/blob/a755de27/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/validate/FunctionCatalog.scala
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/validate/FunctionCatalog.scala b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/validate/FunctionCatalog.scala
index 94237f7..3c89ec4 100644
--- a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/validate/FunctionCatalog.scala
+++ b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/validate/FunctionCatalog.scala
@@ -23,8 +23,8 @@ import org.apache.calcite.sql.util.{ChainedSqlOperatorTable, ListSqlOperatorTabl
 import org.apache.calcite.sql.{SqlFunction, SqlOperator, SqlOperatorTable}
 import org.apache.flink.table.api.ValidationException
 import org.apache.flink.table.expressions._
-import org.apache.flink.table.functions.{EventTimeExtractor, RowTime, ScalarFunction, TableFunction}
-import org.apache.flink.table.functions.utils.{TableSqlFunction, ScalarSqlFunction}
+import org.apache.flink.table.functions.utils.{ScalarSqlFunction, TableSqlFunction}
+import org.apache.flink.table.functions.{EventTimeExtractor, RowTime, ScalarFunction, TableFunction, _}
 
 import scala.collection.JavaConversions._
 import scala.collection.mutable
@@ -197,7 +197,8 @@ object FunctionCatalog {
     // "ceil" -> classOf[TemporalCeil]
 
     // extensions to support streaming query
-    "rowtime" -> classOf[RowTime]
+    "rowtime" -> classOf[RowTime],
+    "proctime" -> classOf[ProcTime]
   )
 
   /**
@@ -322,7 +323,8 @@ class BasicOperatorTable extends ReflectiveSqlOperatorTable {
     SqlStdOperatorTable.SCALAR_QUERY,
     SqlStdOperatorTable.EXISTS,
     // EXTENSIONS
-    EventTimeExtractor
+    EventTimeExtractor,
+    ProcTimeExtractor
   )
 
   builtInSqlOperators.foreach(register)
http://git-wip-us.apache.org/repos/asf/flink/blob/a755de27/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/api/scala/stream/sql/WindowAggregateTest.scala
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/api/scala/stream/sql/WindowAggregateTest.scala b/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/api/scala/stream/sql/WindowAggregateTest.scala
index 183b84c..06088ab 100644
--- a/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/api/scala/stream/sql/WindowAggregateTest.scala
+++ b/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/api/scala/stream/sql/WindowAggregateTest.scala
@@ -20,7 +20,7 @@ package org.apache.flink.table.api.scala.stream.sql
 import org.apache.flink.api.scala._
 import org.apache.flink.table.api.TableException
 import org.apache.flink.table.api.scala._
-import org.apache.flink.table.plan.logical.EventTimeTumblingGroupWindow
+import org.apache.flink.table.plan.logical.{EventTimeTumblingGroupWindow, ProcessingTimeTumblingGroupWindow}
 import org.apache.flink.table.utils.{StreamTableTestUtil, TableTestBase}
 import org.apache.flink.table.utils.TableTestUtil._
 import org.junit.Test
@@ -94,6 +94,27 @@ class WindowAggregateTest extends TableTestBase {
     streamUtil.verifySql(sql, expected)
   }
 
+  @Test
+  def testProcessingTime() = {
+    val sql = "SELECT COUNT(*) FROM MyTable GROUP BY FLOOR(proctime() TO HOUR)"
+    val expected =
+      unaryNode(
+        "DataStreamCalc",
+        unaryNode(
+          "DataStreamAggregate",
+          unaryNode(
+            "DataStreamCalc",
+            streamTableNode(0),
+            term("select", "1970-01-01 00:00:00 AS $f0")
+          ),
+          term("window", ProcessingTimeTumblingGroupWindow(None, 3600000.millis)),
+          term("select", "COUNT(*) AS EXPR$0")
+        ),
+        term("select", "EXPR$0")
+      )
+    streamUtil.verifySql(sql, expected)
+  }
+
   @Test(expected = classOf[TableException])
   def testMultiWindow() = {
     val sql = "SELECT COUNT(*) FROM MyTable GROUP BY " +
