Github user fhueske commented on a diff in the pull request: https://github.com/apache/flink/pull/2468#discussion_r79827070 --- Diff: flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/expressions/time.scala --- @@ -248,3 +249,97 @@ case class LocalTime() extends CurrentTimePoint(SqlTimeTypeInfo.TIME, local = tr case class LocalTimestamp() extends CurrentTimePoint(SqlTimeTypeInfo.TIMESTAMP, local = true) +case class TemporalOverlaps( + leftTimePoint: Expression, + leftTemporal: Expression, + rightTimePoint: Expression, + rightTemporal: Expression) + extends Expression { + + override private[flink] def children: Seq[Expression] = + Seq(leftTimePoint, leftTemporal, rightTimePoint, rightTemporal) + + override private[flink] def resultType: TypeInformation[_] = BOOLEAN_TYPE_INFO + + override private[flink] def validateInput(): ExprValidationResult = { + if (!TypeCheckUtils.isTimePoint(leftTimePoint.resultType)) { + return ValidationFailure(s"TemporalOverlaps operator requires leftTimePoint to be of type " + + s"Time Point, but get ${leftTimePoint.resultType}.") + } + if (!TypeCheckUtils.isTimePoint(rightTimePoint.resultType)) { + return ValidationFailure(s"TemporalOverlaps operator requires rightTimePoint to be of " + + s"type Time Point, but get ${rightTimePoint.resultType}.") + } + if (leftTimePoint.resultType != rightTimePoint.resultType) { + return ValidationFailure(s"TemporalOverlaps operator requires leftTimePoint and " + + s"rightTimePoint to be of same type.") + } + + // leftTemporal is point, then it must be comparable with leftTimePoint + if (TypeCheckUtils.isTimePoint(leftTemporal.resultType)) { + if (leftTemporal.resultType != leftTimePoint.resultType) { + return ValidationFailure(s"TemporalOverlaps operator requires leftTemporal and " + + s"leftTimePoint to be of same type if leftPointOrInterval is of type Time Point.") + } + } else if (!isTimeInterval(leftTemporal.resultType)) { + return ValidationFailure(s"TemporalOverlaps operator 
requires leftTemporal to be of " + + s"type Time Point or Time Interval.") + } + + // rightTemporal is point, then it must be comparable with rightTimePoint + if (TypeCheckUtils.isTimePoint(rightTemporal.resultType)) { + if (rightTemporal.resultType != rightTimePoint.resultType) { + return ValidationFailure(s"TemporalOverlaps operator requires rightTemporal and " + + s"rightTimePoint to be of same type if rightPointOrInterval is of type Time Point.") + } + } else if (!isTimeInterval(rightTemporal.resultType)) { + return ValidationFailure(s"TemporalOverlaps operator requires rightTemporal to be of " + + s"type Time Point or Time Interval.") + } + ValidationSuccess + } + + override def toString: String = s"temporalOverlaps(${children.mkString(", ")})" + + override private[flink] def toRexNode(implicit relBuilder: RelBuilder): RexNode = { + convertOverlaps( + leftTimePoint.toRexNode, + leftTemporal.toRexNode, + rightTimePoint.toRexNode, + rightTemporal.toRexNode, + relBuilder.asInstanceOf[FlinkRelBuilder]) + } + + /** + * Standard conversion of the OVERLAPS operator. + * Source: [[org.apache.calcite.sql2rel.StandardConvertletTable#convertOverlaps()]] + */ + private def convertOverlaps( + leftP: RexNode, + leftT: RexNode, + rightP: RexNode, + rightT: RexNode, + relBuilder: FlinkRelBuilder) + : RexNode = { + // t1 = t0 + t1 if t1 is an interval --- End diff -- Please improve this comment: what do `t0`, `t1`, etc. refer to? The method's parameters are named `leftP`, `leftT`, `rightP`, and `rightT`, so the comment should use those names or explain the mapping.
--- If your project is set up for it, you can reply to this email and have your reply appear on GitHub as well. If your project does not have this feature enabled and would like it, or if the feature is enabled but not working, please contact infrastructure at infrastruct...@apache.org or file a JIRA ticket with INFRA. ---