Github user hequn8128 commented on a diff in the pull request:

    https://github.com/apache/flink/pull/6282#discussion_r204230539
  
    --- Diff: flink-libraries/flink-table/src/main/scala/org/apache/flink/table/expressions/time.scala ---
    @@ -338,3 +341,66 @@ case class DateFormat(timestamp: Expression, format: Expression) extends Express
     
       override private[flink] def resultType = STRING_TYPE_INFO
     }
    +
    +case class TimestampDiff(
    +    timeIntervalUnit: Expression,
    +    timestamp1: Expression,
    +    timestamp2: Expression)
    +  extends Expression {
    +
    +  override private[flink] def children: Seq[Expression] =
    +    timeIntervalUnit :: timestamp1 :: timestamp2 :: Nil
    +
    +  override private[flink] def validateInput(): ValidationResult = {
    +    if (!TypeCheckUtils.isTimePoint(timestamp1.resultType)) {
    +      return ValidationFailure(s"TimestampDiff operator requires Temporal 
input, " +
    +        s"but timestamp1 is of type ${timestamp1.resultType}")
    +    }
    +
    +    if (!TypeCheckUtils.isTimePoint(timestamp2.resultType)) {
    +      return ValidationFailure(s"TimestampDiff operator requires Temporal 
input, " +
    +        s"but timestamp2 is of type ${timestamp2.resultType}")
    +    }
    +
    +    timeIntervalUnit match {
    +      case SymbolExpression(TimeIntervalUnit.YEAR)
    +           | SymbolExpression(TimeIntervalUnit.MONTH)
    +           | SymbolExpression(TimeIntervalUnit.DAY)
    +           | SymbolExpression(TimeIntervalUnit.HOUR)
    +           | SymbolExpression(TimeIntervalUnit.MINUTE)
    +           | SymbolExpression(TimeIntervalUnit.SECOND)
    +        if timestamp1.resultType == SqlTimeTypeInfo.DATE
    +          || timestamp1.resultType == SqlTimeTypeInfo.TIMESTAMP
    +          || timestamp2.resultType == SqlTimeTypeInfo.DATE
    +          || timestamp2.resultType == SqlTimeTypeInfo.TIMESTAMP =>
    +        ValidationSuccess
    +
    +      case _ =>
    +        ValidationFailure(s"TimestampDiff operator does not support unit 
'$timeIntervalUnit'" +
    +            s" for input of type ('${timestamp1.resultType}', 
'${timestamp2.resultType}').")
    +    }
    +  }
    +  override private[flink] def toRexNode(implicit relBuilder: RelBuilder): 
RexNode = {
    +    val typeFactory = relBuilder
    +      .asInstanceOf[FlinkRelBuilder]
    +      .getTypeFactory
    +
    +    val intervalUnit = 
timeIntervalUnit.asInstanceOf[SymbolExpression].symbol
    +      .enum.asInstanceOf[TimeUnitRange]
    +    val intervalType = typeFactory.createSqlIntervalType(
    +      new SqlIntervalQualifier(intervalUnit.startUnit, 
intervalUnit.endUnit, SqlParserPos.ZERO))
    +
    +    val rexCall = relBuilder
    +      .getRexBuilder
    +      .makeCall(intervalType, SqlStdOperatorTable.MINUS_DATE,
    +        List(timestamp2.toRexNode, timestamp1.toRexNode))
    +
    +    val intType = typeFactory.createSqlType(SqlTypeName.INTEGER)
    +
    +    relBuilder.getRexBuilder.makeCast(intType, rexCall)
    +  }
    +
    +  override def toString: String = s"timestampDiff(${children.mkString(", 
")})"
    +
    +  override private[flink] def resultType = INT_TYPE_INFO
    --- End diff --
    
    Suggestion: `resultType` should be `LONG_TYPE_INFO` instead of `INT_TYPE_INFO` (INT_TYPE_INFO => LONG_TYPE_INFO).


---

Reply via email to