Repository: flink Updated Branches: refs/heads/master 3507d59f9 -> e6fbda906
[FLINK-3580] [table] Add OVERLAPS function This closes #2468. Project: http://git-wip-us.apache.org/repos/asf/flink/repo Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/e6fbda90 Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/e6fbda90 Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/e6fbda90 Branch: refs/heads/master Commit: e6fbda906a173660df306e78eee010ed3fc59d8e Parents: 3507d59 Author: twalthr <twal...@apache.org> Authored: Sat Sep 3 08:00:58 2016 +0200 Committer: twalthr <twal...@apache.org> Committed: Thu Sep 22 14:02:30 2016 +0200 ---------------------------------------------------------------------- docs/dev/table_api.md | 22 +++ .../flink/api/scala/table/expressionDsl.scala | 28 ++++ .../flink/api/table/expressions/time.scala | 98 ++++++++++++ .../api/table/validate/FunctionCatalog.scala | 3 +- .../table/expressions/ScalarFunctionsTest.scala | 150 ++++++++++++------- 5 files changed, 249 insertions(+), 52 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/flink/blob/e6fbda90/docs/dev/table_api.md ---------------------------------------------------------------------- diff --git a/docs/dev/table_api.md b/docs/dev/table_api.md index b88a7da..72b88a6 100644 --- a/docs/dev/table_api.md +++ b/docs/dev/table_api.md @@ -1623,6 +1623,17 @@ localTimestamp() </td> </tr> + <tr> + <td> + {% highlight java %} +temporalOverlaps(TIMEPOINT, TEMPORAL, TIMEPOINT, TEMPORAL) +{% endhighlight %} + </td> + <td> + <p>Determines whether two anchored time intervals overlap. Time point and temporal are transformed into a range defined by two time points (start, end). The function evaluates <code>leftEnd >= rightStart && rightEnd >= leftStart</code>. E.g. <code>temporalOverlaps("2:55:00".toTime, 1.hour, "3:30:00".toTime, 2.hour)</code> leads to true.</p> + </td> + </tr> + </tbody> </table> @@ -2030,6 +2041,17 @@ localTimestamp() </td> </tr> + <tr> + <td> + {% highlight scala %} +temporalOverlaps(TIMEPOINT, TEMPORAL, TIMEPOINT, TEMPORAL) +{% endhighlight %} + </td> + <td> + <p>Determines whether two anchored time intervals overlap. Time point and temporal are transformed into a range defined by two time points (start, end). The function evaluates <code>leftEnd >= rightStart && rightEnd >= leftStart</code>. E.g. <code>temporalOverlaps('2:55:00'.toTime, 1.hour, '3:30:00'.toTime, 2.hour)</code> leads to true.</p> + </td> + </tr> + </tbody> </table> </div> http://git-wip-us.apache.org/repos/asf/flink/blob/e6fbda90/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/scala/table/expressionDsl.scala ---------------------------------------------------------------------- diff --git a/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/scala/table/expressionDsl.scala b/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/scala/table/expressionDsl.scala index 003b8b2..9c2721b 100644 --- a/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/scala/table/expressionDsl.scala +++ b/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/scala/table/expressionDsl.scala @@ -508,5 +508,33 @@ object localTimestamp { } } +/** + * Determines whether two anchored time intervals overlap. Time point and temporal are + * transformed into a range defined by two time points (start, end). The function + * evaluates <code>leftEnd >= rightStart && rightEnd >= leftStart</code>. + * + * It evaluates: leftEnd >= rightStart && rightEnd >= leftStart + * + * e.g. temporalOverlaps("2:55:00".toTime, 1.hour, "3:30:00".toTime, 2.hour) leads to true + */ +object temporalOverlaps { + + /** + * Determines whether two anchored time intervals overlap. Time point and temporal are + * transformed into a range defined by two time points (start, end). + * + * It evaluates: leftEnd >= rightStart && rightEnd >= leftStart + * + * e.g. temporalOverlaps("2:55:00".toTime, 1.hour, "3:30:00".toTime, 2.hour) leads to true + */ + def apply( + leftTimePoint: Expression, + leftTemporal: Expression, + rightTimePoint: Expression, + rightTemporal: Expression): Expression = { + TemporalOverlaps(leftTimePoint, leftTemporal, rightTimePoint, rightTemporal) + } +} + http://git-wip-us.apache.org/repos/asf/flink/blob/e6fbda90/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/expressions/time.scala ---------------------------------------------------------------------- diff --git a/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/expressions/time.scala b/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/expressions/time.scala index 4b1942e..1f6361e 100644 --- a/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/expressions/time.scala +++ b/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/expressions/time.scala @@ -28,6 +28,7 @@ import org.apache.flink.api.common.typeinfo.{SqlTimeTypeInfo, TypeInformation} import org.apache.flink.api.table.FlinkRelBuilder import org.apache.flink.api.table.expressions.ExpressionUtils.{divide, getFactor, mod} import org.apache.flink.api.table.expressions.TimeIntervalUnit.TimeIntervalUnit +import org.apache.flink.api.table.typeutils.TypeCheckUtils.isTimeInterval import org.apache.flink.api.table.typeutils.{IntervalTypeInfo, TypeCheckUtils} import org.apache.flink.api.table.validate.{ExprValidationResult, ValidationFailure, ValidationSuccess} @@ -277,3 +278,100 @@ case class Quarter(child: Expression) extends UnaryExpression with InputTypeSpec } } +/** + * Determines whether two anchored time intervals overlap. + */ +case class TemporalOverlaps( + leftTimePoint: Expression, + leftTemporal: Expression, + rightTimePoint: Expression, + rightTemporal: Expression) + extends Expression { + + override private[flink] def children: Seq[Expression] = + Seq(leftTimePoint, leftTemporal, rightTimePoint, rightTemporal) + + override private[flink] def resultType: TypeInformation[_] = BOOLEAN_TYPE_INFO + + override private[flink] def validateInput(): ExprValidationResult = { + if (!TypeCheckUtils.isTimePoint(leftTimePoint.resultType)) { + return ValidationFailure(s"TemporalOverlaps operator requires leftTimePoint to be of type " + + s"Time Point, but get ${leftTimePoint.resultType}.") + } + if (!TypeCheckUtils.isTimePoint(rightTimePoint.resultType)) { + return ValidationFailure(s"TemporalOverlaps operator requires rightTimePoint to be of " + + s"type Time Point, but get ${rightTimePoint.resultType}.") + } + if (leftTimePoint.resultType != rightTimePoint.resultType) { + return ValidationFailure(s"TemporalOverlaps operator requires leftTimePoint and " + + s"rightTimePoint to be of same type.") + } + + // leftTemporal is point, then it must be comparable with leftTimePoint + if (TypeCheckUtils.isTimePoint(leftTemporal.resultType)) { + if (leftTemporal.resultType != leftTimePoint.resultType) { + return ValidationFailure(s"TemporalOverlaps operator requires leftTemporal and " + + s"leftTimePoint to be of same type if leftTemporal is of type Time Point.") + } + } else if (!isTimeInterval(leftTemporal.resultType)) { + return ValidationFailure(s"TemporalOverlaps operator requires leftTemporal to be of " + + s"type Time Point or Time Interval.") + } + + // rightTemporal is point, then it must be comparable with rightTimePoint + if (TypeCheckUtils.isTimePoint(rightTemporal.resultType)) { + if (rightTemporal.resultType != rightTimePoint.resultType) { + return ValidationFailure(s"TemporalOverlaps operator requires rightTemporal and " + + s"rightTimePoint to be of same type if rightTemporal is of type Time Point.") + } + } else if (!isTimeInterval(rightTemporal.resultType)) { + return ValidationFailure(s"TemporalOverlaps operator requires rightTemporal to be of " + + s"type Time Point or Time Interval.") + } + ValidationSuccess + } + + override def toString: String = s"temporalOverlaps(${children.mkString(", ")})" + + override private[flink] def toRexNode(implicit relBuilder: RelBuilder): RexNode = { + convertOverlaps( + leftTimePoint.toRexNode, + leftTemporal.toRexNode, + rightTimePoint.toRexNode, + rightTemporal.toRexNode, + relBuilder.asInstanceOf[FlinkRelBuilder]) + } + + /** + * Standard conversion of the OVERLAPS operator. + * Source: [[org.apache.calcite.sql2rel.StandardConvertletTable#convertOverlaps()]] + */ + private def convertOverlaps( + leftP: RexNode, + leftT: RexNode, + rightP: RexNode, + rightT: RexNode, + relBuilder: FlinkRelBuilder) + : RexNode = { + // leftT = leftP + leftT if leftT is an interval + val convLeftT = if (isTimeInterval(leftTemporal.resultType)) { + relBuilder.call(SqlStdOperatorTable.PLUS, leftP, leftT) + } else { + leftT + } + // rightT = rightP + rightT if rightT is an interval + val convRightT = if (isTimeInterval(rightTemporal.resultType)) { + relBuilder.call(SqlStdOperatorTable.PLUS, rightP, rightT) + } else { + rightT + } + // leftT >= rightP + val leftPred = relBuilder.call(SqlStdOperatorTable.GREATER_THAN_OR_EQUAL, convLeftT, rightP) + // rightT >= leftP + val rightPred = relBuilder.call(SqlStdOperatorTable.GREATER_THAN_OR_EQUAL, convRightT, leftP) + + // leftT >= rightP and rightT >= leftP + relBuilder.call(SqlStdOperatorTable.AND, leftPred, rightPred) + } +} + http://git-wip-us.apache.org/repos/asf/flink/blob/e6fbda90/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/validate/FunctionCatalog.scala ---------------------------------------------------------------------- diff --git a/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/validate/FunctionCatalog.scala b/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/validate/FunctionCatalog.scala index e8a2971..42d460e 100644 --- a/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/validate/FunctionCatalog.scala +++ b/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/validate/FunctionCatalog.scala @@ -160,7 +160,8 @@ object FunctionCatalog { "currentTimestamp" -> classOf[CurrentTimestamp], "localTime" -> classOf[LocalTime], "localTimestamp" -> classOf[LocalTimestamp], - "quarter" -> classOf[Quarter] + "quarter" -> classOf[Quarter], + "temporalOverlaps" -> classOf[TemporalOverlaps] // TODO implement function overloading here // "floor" -> classOf[TemporalFloor] http://git-wip-us.apache.org/repos/asf/flink/blob/e6fbda90/flink-libraries/flink-table/src/test/scala/org/apache/flink/api/table/expressions/ScalarFunctionsTest.scala ---------------------------------------------------------------------- diff --git a/flink-libraries/flink-table/src/test/scala/org/apache/flink/api/table/expressions/ScalarFunctionsTest.scala b/flink-libraries/flink-table/src/test/scala/org/apache/flink/api/table/expressions/ScalarFunctionsTest.scala index 672b876..5506793 100644 --- a/flink-libraries/flink-table/src/test/scala/org/apache/flink/api/table/expressions/ScalarFunctionsTest.scala +++ b/flink-libraries/flink-table/src/test/scala/org/apache/flink/api/table/expressions/ScalarFunctionsTest.scala @@ -230,6 +230,10 @@ class ScalarFunctionsTest extends ExpressionTestBase { "false") } + // ---------------------------------------------------------------------------------------------- + // Math functions + // ---------------------------------------------------------------------------------------------- + @Test def testMod(): Unit = { testAllApis( @@ -513,6 +517,10 @@ class ScalarFunctionsTest extends ExpressionTestBase { "-1231") } + // ---------------------------------------------------------------------------------------------- + // Temporal functions + // ---------------------------------------------------------------------------------------------- + @Test def testExtract(): Unit = { testAllApis( @@ -748,57 +756,6 @@ class ScalarFunctionsTest extends ExpressionTestBase { } @Test - def testIsTrueIsFalse(): Unit = { - testAllApis( - 'f1.isTrue, - "f1.isTrue", - "f1 IS TRUE", - "true") - - testAllApis( - 'f21.isTrue, - "f21.isTrue", - "f21 IS TRUE", - "false") - - testAllApis( - false.isFalse, - "false.isFalse", - "FALSE IS FALSE", - "true") - - testAllApis( - 'f21.isFalse, - "f21.isFalse", - "f21 IS FALSE", - "false") - - testAllApis( - !'f1.isTrue, - "!f1.isTrue", - "f1 IS NOT TRUE", - "false") - - testAllApis( - !'f21.isTrue, - "!f21.isTrue", - "f21 IS NOT TRUE", - "true") - - testAllApis( - !false.isFalse, - "!false.isFalse", - "FALSE IS NOT FALSE", - "false") - - testAllApis( - !'f21.isFalse, - "!f21.isFalse", - "f21 IS NOT FALSE", - "true") - } - - @Test def testCurrentTimePoint(): Unit = { // current time points are non-deterministic @@ -844,6 +801,42 @@ class ScalarFunctionsTest extends ExpressionTestBase { } @Test + def testOverlaps(): Unit = { + testAllApis( + temporalOverlaps("2:55:00".toTime, 1.hour, "3:30:00".toTime, 2.hour), + "temporalOverlaps('2:55:00'.toTime, 1.hour, '3:30:00'.toTime, 2.hour)", + "(TIME '2:55:00', INTERVAL '1' HOUR) OVERLAPS (TIME '3:30:00', INTERVAL '2' HOUR)", + "true") + + testAllApis( + temporalOverlaps("9:00:00".toTime, "9:30:00".toTime, "9:29:00".toTime, "9:31:00".toTime), + "temporalOverlaps('9:00:00'.toTime, '9:30:00'.toTime, '9:29:00'.toTime, '9:31:00'.toTime)", + "(TIME '9:00:00', TIME '9:30:00') OVERLAPS (TIME '9:29:00', TIME '9:31:00')", + "true") + + testAllApis( + temporalOverlaps("9:00:00".toTime, "10:00:00".toTime, "10:15:00".toTime, 3.hour), + "temporalOverlaps('9:00:00'.toTime, '10:00:00'.toTime, '10:15:00'.toTime, 3.hour)", + "(TIME '9:00:00', TIME '10:00:00') OVERLAPS (TIME '10:15:00', INTERVAL '3' HOUR)", + "false") + + testAllApis( + temporalOverlaps("2011-03-10".toDate, 10.day, "2011-03-19".toDate, 10.day), + "temporalOverlaps('2011-03-10'.toDate, 10.day, '2011-03-19'.toDate, 10.day)", + "(DATE '2011-03-10', INTERVAL '10' DAY) OVERLAPS (DATE '2011-03-19', INTERVAL '10' DAY)", + "true") + + testAllApis( + temporalOverlaps("2011-03-10 02:02:02.001".toTimestamp, 0.milli, + "2011-03-10 02:02:02.002".toTimestamp, "2011-03-10 02:02:02.002".toTimestamp), + "temporalOverlaps('2011-03-10 02:02:02.001'.toTimestamp, 0.milli, " + + "'2011-03-10 02:02:02.002'.toTimestamp, '2011-03-10 02:02:02.002'.toTimestamp)", + "(TIMESTAMP '2011-03-10 02:02:02.001', INTERVAL '0' SECOND) OVERLAPS " + + "(TIMESTAMP '2011-03-10 02:02:02.002', TIMESTAMP '2011-03-10 02:02:02.002')", + "false") + } + + @Test def testQuarter(): Unit = { testAllApis( "1997-01-27".toDate.quarter(), @@ -865,6 +858,61 @@ class ScalarFunctionsTest extends ExpressionTestBase { } // ---------------------------------------------------------------------------------------------- + // Other functions + // ---------------------------------------------------------------------------------------------- + + @Test + def testIsTrueIsFalse(): Unit = { + testAllApis( + 'f1.isTrue, + "f1.isTrue", + "f1 IS TRUE", + "true") + + testAllApis( + 'f21.isTrue, + "f21.isTrue", + "f21 IS TRUE", + "false") + + testAllApis( + false.isFalse, + "false.isFalse", + "FALSE IS FALSE", + "true") + + testAllApis( + 'f21.isFalse, + "f21.isFalse", + "f21 IS FALSE", + "false") + + testAllApis( + !'f1.isTrue, + "!f1.isTrue", + "f1 IS NOT TRUE", + "false") + + testAllApis( + !'f21.isTrue, + "!f21.isTrue", + "f21 IS NOT TRUE", + "true") + + testAllApis( + !false.isFalse, + "!false.isFalse", + "FALSE IS NOT FALSE", + "false") + + testAllApis( + !'f21.isFalse, + "!f21.isFalse", + "f21 IS NOT FALSE", + "true") + } + + // ---------------------------------------------------------------------------------------------- def testData = { val testData = new Row(22)