[ 
https://issues.apache.org/jira/browse/FLINK-6846?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16521496#comment-16521496
 ] 

ASF GitHub Bot commented on FLINK-6846:
---------------------------------------

Github user hequn8128 commented on a diff in the pull request:

    https://github.com/apache/flink/pull/6188#discussion_r197640444
  
    --- Diff: flink-libraries/flink-table/src/main/scala/org/apache/flink/table/expressions/time.scala ---
    @@ -328,6 +330,71 @@ case class TemporalOverlaps(
       }
     }
     
    +  /**
    +    * Standard conversion of the TIMESTAMPADD operator.
    +    * Source: 
[[org.apache.calcite.sql2rel.StandardConvertletTable#TimestampAddConvertlet]]
    +    */
    +case class TimestampAdd(
    +    unit: Expression,
    +    count: Expression,
    +    timestamp: Expression)
    +  extends Expression {
    +
    +  private[flink] final val sqlTsiArray = Array("SQL_TSI_YEAR", 
"SQL_TSI_QUARTER", "SQL_TSI_MONTH",
    +    "SQL_TSI_WEEK", "SQL_TSI_DAY", "SQL_TSI_HOUR", "SQL_TSI_MINUTE", 
"SQL_TSI_SECOND")
    +
    +  override private[flink] def children: Seq[Expression] = unit :: count :: 
timestamp :: Nil
    +
    +  override private[flink] def validateInput(): ValidationResult = {
    +    if (!TypeCheckUtils.isString(unit.resultType)) {
    +      return ValidationFailure(s"TimestampAdd operator requires unit to be 
of type " +
    +        s"String Literal, but get ${unit.resultType}.")
    +    }
    +    if (!TypeCheckUtils.isTimePoint(timestamp.resultType)) {
    +      return ValidationFailure(s"TimestampAdd operator requires timestamp 
to be of type " +
    +        s"SqlTimeTypeInfo, but get ${timestamp.resultType}.")
    +    }
    +    ValidationSuccess
    +  }
    +
    +  override private[flink] def toRexNode(implicit relBuilder: RelBuilder): 
RexNode = {
    +    val timeUnit = unit match {
    +      case Literal(value: String, STRING_TYPE_INFO) =>
    +        if (sqlTsiArray.contains(value)) {
    +          Some(TimeUnit.valueOf(value.split("_").last))
    +        } else {
    +          Some(TimeUnit.valueOf(value))
    +        }
    +      case _ => None
    +    }
    +
    +    val interval = count match {
    +      case Literal(value: Int, INT_TYPE_INFO) =>
    +        makeInterval(value.toLong, timeUnit)
    +      case Literal(value: Long, LONG_TYPE_INFO) =>
    +        makeInterval(value, timeUnit)
    +      case _ =>
    +        relBuilder.call(SqlStdOperatorTable.MULTIPLY,
    +          
relBuilder.getRexBuilder.makeIntervalLiteral(timeUnit.get.multiplier,
    +            new SqlIntervalQualifier(timeUnit.get, null, 
SqlParserPos.ZERO)),
    +          count.toRexNode)
    +    }
    +
    +    relBuilder.call(SqlStdOperatorTable.DATETIME_PLUS, 
timestamp.toRexNode, interval)
    +  }
    +
    +  override def toString: String = s"timestampAdd(${children.mkString(", 
")})"
    +
    +  override private[flink] def resultType: TypeInformation[_] = 
STRING_TYPE_INFO
    +
    +  private[flink] def makeInterval(value: Long, timeUnit: Option[TimeUnit])
    +    (implicit relBuilder: RelBuilder) = {
    +    val countWithUnit = 
timeUnit.get.multiplier.multiply(java.math.BigDecimal.valueOf(value))
    +      relBuilder.getRexBuilder.makeIntervalLiteral(countWithUnit,
    --- End diff --
    
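    Indent with two spaces: the `relBuilder.getRexBuilder.makeIntervalLiteral(...)` line is over-indented and should align with the `val countWithUnit` line above it.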


> Add TIMESTAMPADD support in Table API
> -------------------------------------
>
>                 Key: FLINK-6846
>                 URL: https://issues.apache.org/jira/browse/FLINK-6846
>             Project: Flink
>          Issue Type: Sub-task
>          Components: Table API & SQL
>    Affects Versions: 1.4.0
>            Reporter: sunjincheng
>            Assignee: sunjincheng
>            Priority: Major
>              Labels: pull-request-available, starter
>
> See FLINK-6811 for detail.



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)

Reply via email to