Repository: flink Updated Branches: refs/heads/master b05ea6939 -> 6a456c673
[FLINK-3580] [table] Implement FLOOR/CEIL for time points This closes #2391. Project: http://git-wip-us.apache.org/repos/asf/flink/repo Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/6a456c67 Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/6a456c67 Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/6a456c67 Branch: refs/heads/master Commit: 6a456c67316a5e8ad3256e4cbfe37397b0c87282 Parents: b05ea69 Author: twalthr <[email protected]> Authored: Fri Aug 19 12:18:49 2016 +0200 Committer: twalthr <[email protected]> Committed: Fri Aug 26 16:19:59 2016 +0200 ---------------------------------------------------------------------- docs/dev/table_api.md | 66 ++++++++++ .../flink/api/scala/table/expressionDsl.scala | 20 ++- .../flink/api/table/codegen/CodeGenUtils.scala | 13 +- .../table/codegen/calls/FloorCeilCallGen.scala | 53 ++++++-- .../table/codegen/calls/ScalarFunctions.scala | 44 ++++++- .../table/expressions/ExpressionParser.scala | 24 +++- .../flink/api/table/expressions/time.scala | 70 ++++++++++- .../api/table/validate/FunctionCatalog.scala | 3 + .../table/expressions/ScalarFunctionsTest.scala | 123 +++++++++++++++++++ 9 files changed, 396 insertions(+), 20 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/flink/blob/6a456c67/docs/dev/table_api.md ---------------------------------------------------------------------- diff --git a/docs/dev/table_api.md b/docs/dev/table_api.md index cdd3667..7a20e6a 100644 --- a/docs/dev/table_api.md +++ b/docs/dev/table_api.md @@ -1442,6 +1442,28 @@ TEMPORAL.extract(TIMEINTERVALUNIT) </td> </tr> + <tr> + <td> + {% highlight java %} +TIMEPOINT.floor(TIMEINTERVALUNIT) +{% endhighlight %} + </td> + <td> + <p>Rounds a time point down to the given unit. E.g. 
<code>"12:44:31".toTime.floor(MINUTE)</code> leads to 12:44:00.</p> + </td> + </tr> + + <tr> + <td> + {% highlight java %} +TIMEPOINT.ceil(TIMEINTERVALUNIT) +{% endhighlight %} + </td> + <td> + <p>Rounds a time point up to the given unit. E.g. <code>"12:44:31".toTime.ceil(MINUTE)</code> leads to 12:45:00.</p> + </td> + </tr> + + </tbody> </table> @@ -1683,6 +1705,28 @@ TEMPORAL.extract(TimeIntervalUnit) </td> </tr> + <tr> + <td> + {% highlight scala %} +TIMEPOINT.floor(TimeIntervalUnit) +{% endhighlight %} + </td> + <td> + <p>Rounds a time point down to the given unit. E.g. <code>"12:44:31".toTime.floor(TimeIntervalUnit.MINUTE)</code> leads to 12:44:00.</p> + </td> + </tr> + + <tr> + <td> + {% highlight scala %} +TIMEPOINT.ceil(TimeIntervalUnit) +{% endhighlight %} + </td> + <td> + <p>Rounds a time point up to the given unit. E.g. <code>"12:44:31".toTime.ceil(TimeIntervalUnit.MINUTE)</code> leads to 12:45:00.</p> + </td> + </tr> + + </tbody> </table> </div> @@ -1926,6 +1970,28 @@ EXTRACT(TIMEINTERVALUNIT FROM TEMPORAL) </td> </tr> + <tr> + <td> + {% highlight sql %} +FLOOR(TIMEPOINT TO TIMEINTERVALUNIT) +{% endhighlight %} + </td> + <td> + <p>Rounds a time point down to the given unit. E.g. <code>FLOOR(TIME '12:44:31' TO MINUTE)</code> leads to 12:44:00.</p> + </td> + </tr> + + <tr> + <td> + {% highlight sql %} +CEIL(TIMEPOINT TO TIMEINTERVALUNIT) +{% endhighlight %} + </td> + <td> + <p>Rounds a time point up to the given unit. E.g. 
<code>CEIL(TIME '12:44:31' TO MINUTE)</code> leads to 12:45:00.</p> + </td> + </tr> + + </tbody> </table> </div> http://git-wip-us.apache.org/repos/asf/flink/blob/6a456c67/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/scala/table/expressionDsl.scala ---------------------------------------------------------------------- diff --git a/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/scala/table/expressionDsl.scala b/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/scala/table/expressionDsl.scala index 92c61a3..b14ca88 100644 --- a/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/scala/table/expressionDsl.scala +++ b/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/scala/table/expressionDsl.scala @@ -137,6 +137,8 @@ trait ImplicitExpressionOperations { */ def ceil() = Ceil(expr) + // String operations + /** * Creates a substring of the given string at given index for a given length. * @@ -216,6 +218,8 @@ trait ImplicitExpressionOperations { */ def similar(pattern: Expression) = Similar(expr, pattern) + // Temporal operations + /** * Parses a date String in the form "yy-mm-dd" to a SQL Date. */ @@ -238,7 +242,21 @@ trait ImplicitExpressionOperations { */ def extract(timeIntervalUnit: TimeIntervalUnit) = Extract(timeIntervalUnit, expr) - // interval types + /** + * Rounds down a time point to the given unit. + * + * e.g. "12:44:31".toTime.floor(MINUTE) leads to 12:44:00 + */ + def floor(timeIntervalUnit: TimeIntervalUnit) = TemporalFloor(timeIntervalUnit, expr) + + /** + * Rounds up a time point to the given unit. + * + * e.g. "12:44:31".toTime.ceil(MINUTE) leads to 12:45:00 + */ + def ceil(timeIntervalUnit: TimeIntervalUnit) = TemporalCeil(timeIntervalUnit, expr) + + // Interval types /** * Creates an interval of the given number of years. 
http://git-wip-us.apache.org/repos/asf/flink/blob/6a456c67/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/codegen/CodeGenUtils.scala ---------------------------------------------------------------------- diff --git a/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/codegen/CodeGenUtils.scala b/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/codegen/CodeGenUtils.scala index 170af54..76f9b02 100644 --- a/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/codegen/CodeGenUtils.scala +++ b/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/codegen/CodeGenUtils.scala @@ -143,8 +143,17 @@ object CodeGenUtils { s"${qualifyMethod(BuiltInMethod.TIMESTAMP_TO_LONG.method)}($resultTerm)" } - def compareEnum(term: String, enum: Enum[_]): Boolean = - term == qualifyEnum(enum) + def compareEnum(term: String, enum: Enum[_]): Boolean = term == qualifyEnum(enum) + + def getEnum(genExpr: GeneratedExpression): Enum[_] = { + val split = genExpr.resultTerm.split('.') + val value = split.last + val clazz = genExpr.resultType.getTypeClass + enumValueOf(clazz, value) + } + + def enumValueOf[T <: Enum[T]](cls: Class[_], stringValue: String): Enum[_] = + Enum.valueOf(cls.asInstanceOf[Class[T]], stringValue).asInstanceOf[Enum[_]] // ---------------------------------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/flink/blob/6a456c67/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/codegen/calls/FloorCeilCallGen.scala ---------------------------------------------------------------------- diff --git a/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/codegen/calls/FloorCeilCallGen.scala b/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/codegen/calls/FloorCeilCallGen.scala index 84f60a0..d41e9a7 100644 --- 
a/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/codegen/calls/FloorCeilCallGen.scala +++ b/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/codegen/calls/FloorCeilCallGen.scala @@ -20,25 +20,54 @@ package org.apache.flink.api.table.codegen.calls import java.lang.reflect.Method -import org.apache.flink.api.common.typeinfo.BasicTypeInfo. - {DOUBLE_TYPE_INFO, FLOAT_TYPE_INFO,BIG_DEC_TYPE_INFO} +import org.apache.calcite.avatica.util.TimeUnitRange +import org.apache.calcite.avatica.util.TimeUnitRange.{MONTH, YEAR} +import org.apache.flink.api.common.typeinfo.BasicTypeInfo.{BIG_DEC_TYPE_INFO, DOUBLE_TYPE_INFO, FLOAT_TYPE_INFO} +import org.apache.flink.api.table.codegen.CodeGenUtils.{getEnum, primitiveTypeTermForTypeInfo, qualifyMethod} +import org.apache.flink.api.table.codegen.calls.CallGenerator.generateCallIfArgsNotNull import org.apache.flink.api.table.codegen.{CodeGenerator, GeneratedExpression} /** - * Generates arithmetic floor/ceil function calls. + * Generates floor/ceil function calls. 
*/ -class FloorCeilCallGen(method: Method) extends MultiTypeMethodCallGen(method) { +class FloorCeilCallGen( + arithmeticMethod: Method, + temporalMethod: Option[Method] = None) + extends MultiTypeMethodCallGen(arithmeticMethod) { override def generate( codeGenerator: CodeGenerator, operands: Seq[GeneratedExpression]) - : GeneratedExpression = { - operands.head.resultType match { - case FLOAT_TYPE_INFO | DOUBLE_TYPE_INFO | BIG_DEC_TYPE_INFO => - super.generate(codeGenerator, operands) - case _ => - operands.head // no floor/ceil necessary - } - } + : GeneratedExpression = operands.size match { + // arithmetic + case 1 => + operands.head.resultType match { + case FLOAT_TYPE_INFO | DOUBLE_TYPE_INFO | BIG_DEC_TYPE_INFO => + super.generate(codeGenerator, operands) + case _ => + operands.head // no floor/ceil necessary + } + + // temporal + case 2 => + val operand = operands.head + val unit = getEnum(operands(1)).asInstanceOf[TimeUnitRange] + val internalType = primitiveTypeTermForTypeInfo(operand.resultType) + generateCallIfArgsNotNull(codeGenerator.nullCheck, operand.resultType, operands) { + (terms) => + unit match { + case YEAR | MONTH => + s""" + |($internalType) ${qualifyMethod(temporalMethod.get)}(${terms(1)}, ${terms.head}) + |""".stripMargin + case _ => + s""" + |${qualifyMethod(arithmeticMethod)}( + | ($internalType) ${terms.head}, + | ($internalType) ${unit.startUnit.multiplier.intValue()}) + |""".stripMargin + } + } + } } http://git-wip-us.apache.org/repos/asf/flink/blob/6a456c67/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/codegen/calls/ScalarFunctions.scala ---------------------------------------------------------------------- diff --git a/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/codegen/calls/ScalarFunctions.scala b/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/codegen/calls/ScalarFunctions.scala index 44cb6d2..8aa632f 100644 --- 
a/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/codegen/calls/ScalarFunctions.scala +++ b/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/codegen/calls/ScalarFunctions.scala @@ -26,7 +26,7 @@ import org.apache.calcite.sql.fun.SqlStdOperatorTable._ import org.apache.calcite.sql.fun.SqlTrimFunction import org.apache.calcite.util.BuiltInMethod import org.apache.flink.api.common.typeinfo.BasicTypeInfo._ -import org.apache.flink.api.common.typeinfo.{BasicTypeInfo, TypeInformation} +import org.apache.flink.api.common.typeinfo.{BasicTypeInfo, SqlTimeTypeInfo, TypeInformation} import org.apache.flink.api.java.typeutils.GenericTypeInfo import org.apache.flink.api.table.functions.utils.ScalarSqlFunction @@ -181,6 +181,48 @@ object ScalarFunctions { LONG_TYPE_INFO, BuiltInMethod.UNIX_DATE_EXTRACT.method) + addSqlFunction( + FLOOR, + Seq(SqlTimeTypeInfo.DATE, new GenericTypeInfo(classOf[TimeUnitRange])), + new FloorCeilCallGen( + BuiltInMethod.FLOOR.method, + Some(BuiltInMethod.UNIX_DATE_FLOOR.method))) + + addSqlFunction( + FLOOR, + Seq(SqlTimeTypeInfo.TIME, new GenericTypeInfo(classOf[TimeUnitRange])), + new FloorCeilCallGen( + BuiltInMethod.FLOOR.method, + Some(BuiltInMethod.UNIX_DATE_FLOOR.method))) + + addSqlFunction( + FLOOR, + Seq(SqlTimeTypeInfo.TIMESTAMP, new GenericTypeInfo(classOf[TimeUnitRange])), + new FloorCeilCallGen( + BuiltInMethod.FLOOR.method, + Some(BuiltInMethod.UNIX_TIMESTAMP_FLOOR.method))) + + addSqlFunction( + CEIL, + Seq(SqlTimeTypeInfo.DATE, new GenericTypeInfo(classOf[TimeUnitRange])), + new FloorCeilCallGen( + BuiltInMethod.CEIL.method, + Some(BuiltInMethod.UNIX_DATE_CEIL.method))) + + addSqlFunction( + CEIL, + Seq(SqlTimeTypeInfo.TIME, new GenericTypeInfo(classOf[TimeUnitRange])), + new FloorCeilCallGen( + BuiltInMethod.CEIL.method, + Some(BuiltInMethod.UNIX_DATE_CEIL.method))) + + addSqlFunction( + CEIL, + Seq(SqlTimeTypeInfo.TIMESTAMP, new GenericTypeInfo(classOf[TimeUnitRange])), + new 
FloorCeilCallGen( + BuiltInMethod.CEIL.method, + Some(BuiltInMethod.UNIX_TIMESTAMP_CEIL.method))) + // ---------------------------------------------------------------------------------------------- /** http://git-wip-us.apache.org/repos/asf/flink/blob/6a456c67/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/expressions/ExpressionParser.scala ---------------------------------------------------------------------- diff --git a/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/expressions/ExpressionParser.scala b/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/expressions/ExpressionParser.scala index 1dd480b..c57d43b 100644 --- a/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/expressions/ExpressionParser.scala +++ b/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/expressions/ExpressionParser.scala @@ -26,6 +26,7 @@ import org.apache.flink.api.table.expressions.TimePointUnit.TimePointUnit import org.apache.flink.api.table.expressions.TrimMode.TrimMode import org.apache.flink.api.table.typeutils.IntervalTypeInfo +import scala.language.implicitConversions import scala.util.parsing.combinator.{JavaTokenParsers, PackratParsers} /** @@ -65,6 +66,8 @@ object ExpressionParser extends JavaTokenParsers with PackratParsers { lazy val TO_TIMESTAMP: Keyword = Keyword("toTimestamp") lazy val TRIM: Keyword = Keyword("trim") lazy val EXTRACT: Keyword = Keyword("extract") + lazy val FLOOR: Keyword = Keyword("floor") + lazy val CEIL: Keyword = Keyword("ceil") lazy val YEAR: Keyword = Keyword("year") lazy val MONTH: Keyword = Keyword("month") lazy val DAY: Keyword = Keyword("day") @@ -213,6 +216,14 @@ object ExpressionParser extends JavaTokenParsers with PackratParsers { case operand ~ _ ~ _ ~ _ ~ unit ~ _ => Extract(unit, operand) } + lazy val suffixFloor = composite ~ "." 
~ FLOOR ~ "(" ~ timeIntervalUnit ~ ")" ^^ { + case operand ~ _ ~ _ ~ _ ~ unit ~ _ => TemporalFloor(unit, operand) + } + + lazy val suffixCeil = composite ~ "." ~ CEIL ~ "(" ~ timeIntervalUnit ~ ")" ^^ { + case operand ~ _ ~ _ ~ _ ~ unit ~ _ => TemporalCeil(unit, operand) + } + lazy val suffixFunctionCall = composite ~ "." ~ functionIdent ~ "(" ~ repsep(expression, ",") ~ ")" ^^ { case operand ~ _ ~ name ~ _ ~ args ~ _ => Call(name.toUpperCase, operand :: args) @@ -255,7 +266,8 @@ object ExpressionParser extends JavaTokenParsers with PackratParsers { suffixTimeInterval | suffixIsNull | suffixIsNotNull | suffixSum | suffixMin | suffixMax | suffixCount | suffixAvg | suffixCast | suffixAs | suffixTrim | suffixTrimWithoutArgs | suffixIf | suffixAsc | suffixDesc | suffixToDate | suffixToTimestamp | suffixToTime | - suffixExtract | suffixFunctionCall // function call must always be at the end + suffixExtract | suffixFloor | suffixCeil | + suffixFunctionCall // function call must always be at the end // prefix operators @@ -311,10 +323,18 @@ object ExpressionParser extends JavaTokenParsers with PackratParsers { case _ ~ _ ~ operand ~ _ ~ unit ~ _ => Extract(unit, operand) } + lazy val prefixFloor = FLOOR ~ "(" ~ expression ~ "," ~ timeIntervalUnit ~ ")" ^^ { + case _ ~ _ ~ operand ~ _ ~ unit ~ _ => TemporalFloor(unit, operand) + } + + lazy val prefixCeil = CEIL ~ "(" ~ expression ~ "," ~ timeIntervalUnit ~ ")" ^^ { + case _ ~ _ ~ operand ~ _ ~ unit ~ _ => TemporalCeil(unit, operand) + } + lazy val prefixed: PackratParser[Expression] = prefixIsNull | prefixIsNotNull | prefixSum | prefixMin | prefixMax | prefixCount | prefixAvg | prefixCast | prefixAs | prefixTrim | prefixTrimWithoutArgs | prefixIf | prefixExtract | - prefixFunctionCall // function call must always be at the end + prefixFloor | prefixCeil | prefixFunctionCall // function call must always be at the end // suffix/prefix composite 
http://git-wip-us.apache.org/repos/asf/flink/blob/6a456c67/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/expressions/time.scala ---------------------------------------------------------------------- diff --git a/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/expressions/time.scala b/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/expressions/time.scala index a10f4d0..48b512c 100644 --- a/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/expressions/time.scala +++ b/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/expressions/time.scala @@ -27,6 +27,7 @@ import org.apache.flink.api.common.typeinfo.BasicTypeInfo._ import org.apache.flink.api.common.typeinfo.{SqlTimeTypeInfo, TypeInformation} import org.apache.flink.api.table.FlinkRelBuilder import org.apache.flink.api.table.expressions.ExpressionUtils.{divide, getFactor, mod} +import org.apache.flink.api.table.expressions.TimeIntervalUnit.TimeIntervalUnit import org.apache.flink.api.table.typeutils.{IntervalTypeInfo, TypeCheckUtils} import org.apache.flink.api.table.validate.{ExprValidationResult, ValidationFailure, ValidationSuccess} @@ -63,8 +64,8 @@ case class Extract(timeIntervalUnit: Expression, temporal: Expression) extends E ValidationSuccess case _ => - ValidationFailure(s"Extract operator does not support unit '$timeIntervalUnit' for input " + - s"of type '${temporal.resultType}'.") + ValidationFailure(s"Extract operator does not support unit '$timeIntervalUnit' for input" + + s" of type '${temporal.resultType}'.") } } @@ -131,7 +132,72 @@ case class Extract(timeIntervalUnit: Expression, temporal: Expression) extends E result = divide(rexBuilder, result, unit.multiplier) result } +} + +abstract class TemporalCeilFloor( + timeIntervalUnit: Expression, + temporal: Expression) + extends Expression { + + override private[flink] def children: Seq[Expression] = timeIntervalUnit :: temporal :: Nil + + 
override private[flink] def resultType: TypeInformation[_] = temporal.resultType + + override private[flink] def validateInput(): ExprValidationResult = { + if (!TypeCheckUtils.isTimePoint(temporal.resultType)) { + return ValidationFailure(s"Temporal ceil/floor operator requires Time Point input, " + + s"but $temporal is of type ${temporal.resultType}") + } + val unit = timeIntervalUnit match { + case SymbolExpression(u: TimeIntervalUnit) => Some(u) + case _ => None + } + if (unit.isEmpty) { + return ValidationFailure(s"Temporal ceil/floor operator requires Time Interval Unit " + + s"input, but $timeIntervalUnit is of type ${timeIntervalUnit.resultType}") + } + + (unit.get, temporal.resultType) match { + case (TimeIntervalUnit.YEAR | TimeIntervalUnit.MONTH, + SqlTimeTypeInfo.DATE | SqlTimeTypeInfo.TIMESTAMP) => + ValidationSuccess + case (TimeIntervalUnit.DAY, SqlTimeTypeInfo.TIMESTAMP) => + ValidationSuccess + case (TimeIntervalUnit.HOUR | TimeIntervalUnit.MINUTE | TimeIntervalUnit.SECOND, + SqlTimeTypeInfo.TIME | SqlTimeTypeInfo.TIMESTAMP) => + ValidationSuccess + case _ => + ValidationFailure(s"Temporal ceil/floor operator does not support " + + s"unit '$timeIntervalUnit' for input of type '${temporal.resultType}'.") + } + } +} + +case class TemporalFloor( + timeIntervalUnit: Expression, + temporal: Expression) + extends TemporalCeilFloor( + timeIntervalUnit, + temporal) { + + override def toString: String = s"($temporal).floor($timeIntervalUnit)" + override private[flink] def toRexNode(implicit relBuilder: RelBuilder): RexNode = { + relBuilder.call(SqlStdOperatorTable.FLOOR, temporal.toRexNode, timeIntervalUnit.toRexNode) + } } +case class TemporalCeil( + timeIntervalUnit: Expression, + temporal: Expression) + extends TemporalCeilFloor( + timeIntervalUnit, + temporal) { + + override def toString: String = s"($temporal).ceil($timeIntervalUnit)" + + override private[flink] def toRexNode(implicit relBuilder: RelBuilder): RexNode = { + 
relBuilder.call(SqlStdOperatorTable.CEIL, temporal.toRexNode, timeIntervalUnit.toRexNode) + } +} http://git-wip-us.apache.org/repos/asf/flink/blob/6a456c67/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/validate/FunctionCatalog.scala ---------------------------------------------------------------------- diff --git a/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/validate/FunctionCatalog.scala b/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/validate/FunctionCatalog.scala index 58b1f69..fb38dde 100644 --- a/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/validate/FunctionCatalog.scala +++ b/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/validate/FunctionCatalog.scala @@ -146,6 +146,9 @@ object FunctionCatalog { // temporal functions "extract" -> classOf[Extract] + // TODO implement function overloading here + // "floor" -> classOf[TemporalFloor] + // "ceil" -> classOf[TemporalCeil] ) /** http://git-wip-us.apache.org/repos/asf/flink/blob/6a456c67/flink-libraries/flink-table/src/test/scala/org/apache/flink/api/table/expressions/ScalarFunctionsTest.scala ---------------------------------------------------------------------- diff --git a/flink-libraries/flink-table/src/test/scala/org/apache/flink/api/table/expressions/ScalarFunctionsTest.scala b/flink-libraries/flink-table/src/test/scala/org/apache/flink/api/table/expressions/ScalarFunctionsTest.scala index 958bebe..7162a04 100644 --- a/flink-libraries/flink-table/src/test/scala/org/apache/flink/api/table/expressions/ScalarFunctionsTest.scala +++ b/flink-libraries/flink-table/src/test/scala/org/apache/flink/api/table/expressions/ScalarFunctionsTest.scala @@ -579,6 +579,129 @@ class ScalarFunctionsTest extends ExpressionTestBase { "2") } + @Test + def testTemporalFloor(): Unit = { + testAllApis( + 'f18.floor(TimeIntervalUnit.YEAR), + "f18.floor(YEAR)", + "FLOOR(f18 TO YEAR)", + "1996-01-01 00:00:00.0") + + 
testAllApis( + 'f18.floor(TimeIntervalUnit.MONTH), + "f18.floor(MONTH)", + "FLOOR(f18 TO MONTH)", + "1996-11-01 00:00:00.0") + + testAllApis( + 'f18.floor(TimeIntervalUnit.DAY), + "f18.floor(DAY)", + "FLOOR(f18 TO DAY)", + "1996-11-10 00:00:00.0") + + testAllApis( + 'f18.floor(TimeIntervalUnit.MINUTE), + "f18.floor(MINUTE)", + "FLOOR(f18 TO MINUTE)", + "1996-11-10 06:55:00.0") + + testAllApis( + 'f18.floor(TimeIntervalUnit.SECOND), + "f18.floor(SECOND)", + "FLOOR(f18 TO SECOND)", + "1996-11-10 06:55:44.0") + + testAllApis( + 'f17.floor(TimeIntervalUnit.HOUR), + "f17.floor(HOUR)", + "FLOOR(f17 TO HOUR)", + "06:00:00") + + testAllApis( + 'f17.floor(TimeIntervalUnit.MINUTE), + "f17.floor(MINUTE)", + "FLOOR(f17 TO MINUTE)", + "06:55:00") + + testAllApis( + 'f17.floor(TimeIntervalUnit.SECOND), + "f17.floor(SECOND)", + "FLOOR(f17 TO SECOND)", + "06:55:44") + + testAllApis( + 'f16.floor(TimeIntervalUnit.YEAR), + "f16.floor(YEAR)", + "FLOOR(f16 TO YEAR)", + "1996-01-01") + + testAllApis( + 'f16.floor(TimeIntervalUnit.MONTH), + "f16.floor(MONTH)", + "FLOOR(f16 TO MONTH)", + "1996-11-01") + + testAllApis( + 'f18.ceil(TimeIntervalUnit.YEAR), + "f18.ceil(YEAR)", + "CEIL(f18 TO YEAR)", + "1997-01-01 00:00:00.0") + + testAllApis( + 'f18.ceil(TimeIntervalUnit.MONTH), + "f18.ceil(MONTH)", + "CEIL(f18 TO MONTH)", + "1996-12-01 00:00:00.0") + + testAllApis( + 'f18.ceil(TimeIntervalUnit.DAY), + "f18.ceil(DAY)", + "CEIL(f18 TO DAY)", + "1996-11-11 00:00:00.0") + + testAllApis( + 'f18.ceil(TimeIntervalUnit.MINUTE), + "f18.ceil(MINUTE)", + "CEIL(f18 TO MINUTE)", + "1996-11-10 06:56:00.0") + + testAllApis( + 'f18.ceil(TimeIntervalUnit.SECOND), + "f18.ceil(SECOND)", + "CEIL(f18 TO SECOND)", + "1996-11-10 06:55:45.0") + + testAllApis( + 'f17.ceil(TimeIntervalUnit.HOUR), + "f17.ceil(HOUR)", + "CEIL(f17 TO HOUR)", + "07:00:00") + + testAllApis( + 'f17.ceil(TimeIntervalUnit.MINUTE), + "f17.ceil(MINUTE)", + "CEIL(f17 TO MINUTE)", + "06:56:00") + + testAllApis( + 
'f17.ceil(TimeIntervalUnit.SECOND), + "f17.ceil(SECOND)", + "CEIL(f17 TO SECOND)", + "06:55:44") + + testAllApis( + 'f16.ceil(TimeIntervalUnit.YEAR), + "f16.ceil(YEAR)", + "CEIL(f16 TO YEAR)", + "1996-01-01") + + testAllApis( + 'f16.ceil(TimeIntervalUnit.MONTH), + "f16.ceil(MONTH)", + "CEIL(f16 TO MONTH)", + "1996-11-01") + } + // ---------------------------------------------------------------------------------------------- def testData = {
