[
https://issues.apache.org/jira/browse/FLINK-6846?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16521496#comment-16521496
]
ASF GitHub Bot commented on FLINK-6846:
---------------------------------------
Github user hequn8128 commented on a diff in the pull request:
https://github.com/apache/flink/pull/6188#discussion_r197640444
--- Diff:
flink-libraries/flink-table/src/main/scala/org/apache/flink/table/expressions/time.scala
---
@@ -328,6 +330,71 @@ case class TemporalOverlaps(
}
}
+ /**
+ * Standard conversion of the TIMESTAMPADD operator.
+ * Source:
[[org.apache.calcite.sql2rel.StandardConvertletTable#TimestampAddConvertlet]]
+ */
+case class TimestampAdd(
+ unit: Expression,
+ count: Expression,
+ timestamp: Expression)
+ extends Expression {
+
+ private[flink] final val sqlTsiArray = Array("SQL_TSI_YEAR",
"SQL_TSI_QUARTER", "SQL_TSI_MONTH",
+ "SQL_TSI_WEEK", "SQL_TSI_DAY", "SQL_TSI_HOUR", "SQL_TSI_MINUTE",
"SQL_TSI_SECOND")
+
+ override private[flink] def children: Seq[Expression] = unit :: count ::
timestamp :: Nil
+
+ override private[flink] def validateInput(): ValidationResult = {
+ if (!TypeCheckUtils.isString(unit.resultType)) {
+ return ValidationFailure(s"TimestampAdd operator requires unit to be
of type " +
+ s"String Literal, but get ${unit.resultType}.")
+ }
+ if (!TypeCheckUtils.isTimePoint(timestamp.resultType)) {
+ return ValidationFailure(s"TimestampAdd operator requires timestamp
to be of type " +
+ s"SqlTimeTypeInfo, but get ${timestamp.resultType}.")
+ }
+ ValidationSuccess
+ }
+
+ override private[flink] def toRexNode(implicit relBuilder: RelBuilder):
RexNode = {
+ val timeUnit = unit match {
+ case Literal(value: String, STRING_TYPE_INFO) =>
+ if (sqlTsiArray.contains(value)) {
+ Some(TimeUnit.valueOf(value.split("_").last))
+ } else {
+ Some(TimeUnit.valueOf(value))
+ }
+ case _ => None
+ }
+
+ val interval = count match {
+ case Literal(value: Int, INT_TYPE_INFO) =>
+ makeInterval(value.toLong, timeUnit)
+ case Literal(value: Long, LONG_TYPE_INFO) =>
+ makeInterval(value, timeUnit)
+ case _ =>
+ relBuilder.call(SqlStdOperatorTable.MULTIPLY,
+
relBuilder.getRexBuilder.makeIntervalLiteral(timeUnit.get.multiplier,
+ new SqlIntervalQualifier(timeUnit.get, null,
SqlParserPos.ZERO)),
+ count.toRexNode)
+ }
+
+ relBuilder.call(SqlStdOperatorTable.DATETIME_PLUS,
timestamp.toRexNode, interval)
+ }
+
+ override def toString: String = s"timestampAdd(${children.mkString(",
")})"
+
+ override private[flink] def resultType: TypeInformation[_] =
STRING_TYPE_INFO
+
+ private[flink] def makeInterval(value: Long, timeUnit: Option[TimeUnit])
+ (implicit relBuilder: RelBuilder) = {
+ val countWithUnit =
timeUnit.get.multiplier.multiply(java.math.BigDecimal.valueOf(value))
+ relBuilder.getRexBuilder.makeIntervalLiteral(countWithUnit,
--- End diff --
indent with two spaces
> Add TIMESTAMPADD support in Table API
> -------------------------------------
>
> Key: FLINK-6846
> URL: https://issues.apache.org/jira/browse/FLINK-6846
> Project: Flink
> Issue Type: Sub-task
> Components: Table API & SQL
> Affects Versions: 1.4.0
> Reporter: sunjincheng
> Assignee: sunjincheng
> Priority: Major
> Labels: pull-request-available, starter
>
> See FLINK-6811 for details.
--
This message was sent by Atlassian JIRA
(v7.6.3#76005)