twalthr closed pull request #4117: [FLINK-6813][table]Add TIMESTAMPDIFF
supported in SQL
URL: https://github.com/apache/flink/pull/4117
This is a PR merged from a forked repository.
As GitHub hides the original diff on merge, it is displayed below for
the sake of provenance:
As this is a foreign pull request (from a fork), the diff is supplied
below (as it won't show otherwise due to GitHub magic):
diff --git
a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/codegen/CodeGenUtils.scala
b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/codegen/CodeGenUtils.scala
index 1d8c926233b..5f144e6fc79 100644
---
a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/codegen/CodeGenUtils.scala
+++
b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/codegen/CodeGenUtils.scala
@@ -289,4 +289,14 @@ object CodeGenUtils {
case java.lang.Character.TYPE => s"$fieldTerm.setChar($objectTerm,
$valueTerm)"
case _ => s"$fieldTerm.set($objectTerm, $valueTerm)"
}
+
+ def absFloorRound(value: Double): Long = {
+
+ if (value >= 0) {
+ java.lang.Math.round(java.lang.Math.floor((value)))
+ } else {
+ 0 -
java.lang.Math.round(java.lang.Math.floor((java.lang.Math.abs(value))))
+ }
+
+ }
}
diff --git
a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/codegen/CodeGenerator.scala
b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/codegen/CodeGenerator.scala
index 52a9dcd0ac6..0ec19aed6b3 100644
---
a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/codegen/CodeGenerator.scala
+++
b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/codegen/CodeGenerator.scala
@@ -1353,7 +1353,7 @@ class CodeGenerator(
val right = operands(1)
requireTemporal(left)
requireTemporal(right)
- generateTemporalPlusMinus(plus = true, nullCheck, left, right)
+ generateTemporalPlusMinus(plus = true, nullCheck,
call.`type`.getSqlTypeName, left, right)
case MINUS if isNumeric(resultType) =>
val left = operands.head
@@ -1367,7 +1367,7 @@ class CodeGenerator(
val right = operands(1)
requireTemporal(left)
requireTemporal(right)
- generateTemporalPlusMinus(plus = false, nullCheck, left, right)
+ generateTemporalPlusMinus(plus = false, nullCheck,
call.`type`.getSqlTypeName, left, right)
case MULTIPLY if isNumeric(resultType) =>
val left = operands.head
diff --git
a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/codegen/calls/ScalarOperators.scala
b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/codegen/calls/ScalarOperators.scala
index 1af4a3410a6..d23e778fcc1 100644
---
a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/codegen/calls/ScalarOperators.scala
+++
b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/codegen/calls/ScalarOperators.scala
@@ -19,6 +19,7 @@ package org.apache.flink.table.codegen.calls
import org.apache.calcite.avatica.util.DateTimeUtils.MILLIS_PER_DAY
import org.apache.calcite.avatica.util.{DateTimeUtils, TimeUnitRange}
+import org.apache.calcite.sql.`type`.SqlTypeName
import org.apache.calcite.util.BuiltInMethod
import org.apache.flink.api.common.typeinfo.BasicTypeInfo._
import org.apache.flink.api.common.typeinfo._
@@ -702,12 +703,15 @@ object ScalarOperators {
def generateTemporalPlusMinus(
plus: Boolean,
nullCheck: Boolean,
+ typeName: SqlTypeName,
left: GeneratedExpression,
right: GeneratedExpression)
: GeneratedExpression = {
val op = if (plus) "+" else "-"
+ val AVGDAYS_PRE_MONTH = 30.5
+
(left.resultType, right.resultType) match {
case (l: TimeIntervalTypeInfo[_], r: TimeIntervalTypeInfo[_]) if l == r
=>
generateArithmeticOperator(op, nullCheck, l, left, right)
@@ -737,6 +741,46 @@ object ScalarOperators {
(l, r) => s"${qualifyMethod(BuiltInMethod.ADD_MONTHS.method)}($l,
$op($r))"
}
+ case (SqlTimeTypeInfo.TIMESTAMP, SqlTimeTypeInfo.TIMESTAMP) if !plus =>
+ generateOperatorIfNotNull(nullCheck, SqlTimeTypeInfo.TIMESTAMP, left,
right) {
+ (l, r) => typeName match {
+ case SqlTypeName.INTERVAL_YEAR | SqlTypeName.INTERVAL_MONTH =>
+ s"org.apache.flink.table.codegen.CodeGenUtils." +
+ s"absFloorRound(($l $op
$r)/${MILLIS_PER_DAY}L/${AVGDAYS_PRE_MONTH})"
+ case _ => s"$l $op $r"
+ }
+ }
+
+ case (SqlTimeTypeInfo.DATE, SqlTimeTypeInfo.DATE) if !plus =>
+ generateOperatorIfNotNull(nullCheck, SqlTimeTypeInfo.TIMESTAMP, left,
right) {
+ (l, r) => typeName match {
+ case SqlTypeName.INTERVAL_YEAR | SqlTypeName.INTERVAL_MONTH =>
+ s"org.apache.flink.table.codegen.CodeGenUtils." +
+ s"absFloorRound(($l $op $r)/${AVGDAYS_PRE_MONTH})"
+ case _ => s"($l * ${MILLIS_PER_DAY}L) $op ($r *
${MILLIS_PER_DAY}L)"
+ }
+ }
+
+ case (SqlTimeTypeInfo.TIMESTAMP, SqlTimeTypeInfo.DATE) if !plus =>
+ generateOperatorIfNotNull(nullCheck, SqlTimeTypeInfo.TIMESTAMP, left,
right) {
+ (l, r) => typeName match {
+ case SqlTypeName.INTERVAL_YEAR | SqlTypeName.INTERVAL_MONTH =>
+ s"org.apache.flink.table.codegen.CodeGenUtils." +
+ s"absFloorRound((($l/${MILLIS_PER_DAY}L) $op
$r)/${AVGDAYS_PRE_MONTH})"
+ case _ => s"$l $op ($r * ${MILLIS_PER_DAY}L)"
+ }
+ }
+
+ case (SqlTimeTypeInfo.DATE, SqlTimeTypeInfo.TIMESTAMP) if !plus =>
+ generateOperatorIfNotNull(nullCheck, SqlTimeTypeInfo.TIMESTAMP, left,
right) {
+ (l, r) => typeName match {
+ case SqlTypeName.INTERVAL_YEAR | SqlTypeName.INTERVAL_MONTH =>
+ s"org.apache.flink.table.codegen.CodeGenUtils." +
+ s"absFloorRound(($l $op
($r/${MILLIS_PER_DAY}L))/${AVGDAYS_PRE_MONTH})"
+ case _ => s"($l * ${MILLIS_PER_DAY}L) $op $r"
+ }
+ }
+
case _ =>
throw new CodeGenException("Unsupported temporal arithmetic.")
}
diff --git
a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/validate/FunctionCatalog.scala
b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/validate/FunctionCatalog.scala
index 4d945a7c66a..9026c9d6ad2 100644
---
a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/validate/FunctionCatalog.scala
+++
b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/validate/FunctionCatalog.scala
@@ -387,6 +387,7 @@ class BasicOperatorTable extends ReflectiveSqlOperatorTable
{
SqlStdOperatorTable.SIGN,
SqlStdOperatorTable.ROUND,
SqlStdOperatorTable.PI,
+ SqlStdOperatorTable.TIMESTAMP_DIFF,
// EXTENSIONS
SqlStdOperatorTable.TUMBLE,
SqlStdOperatorTable.TUMBLE_START,
diff --git
a/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/expressions/ScalarFunctionsTest.scala
b/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/expressions/ScalarFunctionsTest.scala
index 76c02c0af50..a74d8c07fa9 100644
---
a/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/expressions/ScalarFunctionsTest.scala
+++
b/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/expressions/ScalarFunctionsTest.scala
@@ -1465,6 +1465,127 @@ class ScalarFunctionsTest extends ExpressionTestBase {
"4")
}
+ @Test
+ def testTimestampDiff: Unit ={
+
+ val startTs = ("timestamp '2019-06-01 07:01:11'", "timestamp '2017-2-20
10:10:01.899'")
+ val endTs = ("timestamp '2020-06-01 07:01:11'", "timestamp '2018-5-24
18:26:53.999'")
+
+ testSqlApi(s"timestampDiff(YEAR, ${startTs._1}, ${endTs._1})", "1")
+ testSqlApi(s"timestampDiff(SQL_TSI_YEAR, ${startTs._2}, ${endTs._2})", "1")
+
+ testSqlApi(s"timestampDiff(QUARTER, ${startTs._1}, ${endTs._1})", "4")
+ testSqlApi(s"timestampDiff(SQL_TSI_QUARTER, ${startTs._2}, ${endTs._2})",
"5")
+
+ testSqlApi(s"timestampDiff(MONTH, ${startTs._1}, ${endTs._1})", "12")
+ testSqlApi(s"timestampDiff(SQL_TSI_MONTH, ${startTs._2}, ${endTs._2})",
"15")
+
+ testSqlApi(s"timestampDiff(WEEK, ${startTs._1}, ${endTs._1})", "52")
+ testSqlApi(s"timestampDiff(SQL_TSI_WEEK, ${startTs._2}, ${endTs._2})",
"65")
+
+ testSqlApi(s"timestampDiff(DAY, ${startTs._1}, ${endTs._1})", "366")
+ testSqlApi(s"timestampDiff(SQL_TSI_DAY, ${startTs._2}, ${endTs._2})",
"458")
+
+ testSqlApi(s"timestampDiff(HOUR, ${startTs._1}, ${endTs._1})", "8784")
+ testSqlApi(s"timestampDiff(SQL_TSI_HOUR, ${startTs._2}, ${endTs._2})",
"11000")
+
+ testSqlApi(s"timestampDiff(MINUTE, ${startTs._1}, ${endTs._1})", "527040")
+ testSqlApi(s"timestampDiff(SQL_TSI_MINUTE, ${startTs._2}, ${endTs._2})",
"660016")
+
+ testSqlApi(s"timestampDiff(SECOND, ${startTs._1}, ${endTs._1})",
"31622400")
+ testSqlApi(s"timestampDiff(SQL_TSI_SECOND, ${startTs._2}, ${endTs._2})",
"39601012")
+
+ testSqlApi(
+ "timestampDiff(YEAR, timestamp '2012-10-10 22:22:22', timestamp
'2013-10-09 22:22:22')","0")
+ testSqlApi(
+ "timestampDiff(YEAR, timestamp '2012-10-10 22:22:22', timestamp
'2013-10-11 22:22:22')","1")
+
+ testSqlApi(
+ "timestampDiff(QUARTER, timestamp '2012-9-10 22:22:22', timestamp
'2012-12-09 22:22:22')","0")
+ testSqlApi(
+ "timestampDiff(QUARTER, timestamp '2012-9-10 22:22:22', timestamp
'2012-12-11 22:22:22')","1")
+
+ testSqlApi(
+ "timestampDiff(MONTH, timestamp '2012-10-10 22:22:22', timestamp
'2012-11-09 22:22:22')","0")
+ testSqlApi(
+ "timestampDiff(MONTH, timestamp '2012-10-10 22:22:22', timestamp
'2012-11-11 22:22:22')","1")
+
+ testSqlApi(
+ "timestampDiff(DAY, timestamp '2012-10-10 22:22:22', timestamp
'2012-10-11 21:22:22')","0")
+ testSqlApi(
+ "timestampDiff(DAY, timestamp '2012-10-10 22:22:22', timestamp
'2012-10-11 22:22:22')","1")
+
+ testSqlApi(
+ "timestampDiff(MINUTE, timestamp '2012-10-10 22:22:22', timestamp
'2012-10-10 22:23:20')","0")
+ testSqlApi(
+ "timestampDiff(MINUTE, timestamp '2012-10-10 22:22:22', timestamp
'2012-10-10 22:23:23')","1")
+
+ }
+
+ @Test
+ def testTimestampDiffWithDateType: Unit ={
+
+ val startTs = ("date '2019-06-01'", "date '2017-2-20'")
+ val endTs = ("date '2020-06-01'", "date '2018-5-24'")
+
+ testSqlApi(s"timestampDiff(YEAR, ${startTs._1}, ${endTs._1})", "1")
+ testSqlApi(s"timestampDiff(SQL_TSI_YEAR, ${startTs._2}, ${endTs._2})", "1")
+
+ testSqlApi(s"timestampDiff(QUARTER, ${startTs._1}, ${endTs._1})", "4")
+ testSqlApi(s"timestampDiff(SQL_TSI_QUARTER, ${startTs._2}, ${endTs._2})",
"5")
+
+ testSqlApi(s"timestampDiff(MONTH, ${startTs._1}, ${endTs._1})", "12")
+ testSqlApi(s"timestampDiff(SQL_TSI_MONTH, ${startTs._2}, ${endTs._2})",
"15")
+
+ testSqlApi(s"timestampDiff(WEEK, ${startTs._1}, ${endTs._1})", "52")
+ testSqlApi(s"timestampDiff(SQL_TSI_WEEK, ${startTs._2}, ${endTs._2})",
"65")
+
+ testSqlApi(s"timestampDiff(DAY, ${startTs._1}, ${endTs._1})", "366")
+ testSqlApi(s"timestampDiff(SQL_TSI_DAY, ${startTs._2}, ${endTs._2})",
"458")
+
+ testSqlApi(s"timestampDiff(HOUR, ${startTs._1}, ${endTs._1})", "8784")
+ testSqlApi(s"timestampDiff(SQL_TSI_HOUR, ${startTs._2}, ${endTs._2})",
"10992")
+
+ testSqlApi(s"timestampDiff(MINUTE, ${startTs._1}, ${endTs._1})", "527040")
+ testSqlApi(s"timestampDiff(SQL_TSI_MINUTE, ${startTs._2}, ${endTs._2})",
"659520")
+
+ testSqlApi(s"timestampDiff(SECOND, ${startTs._1}, ${endTs._1})",
"31622400")
+ testSqlApi(s"timestampDiff(SQL_TSI_SECOND, ${startTs._2}, ${endTs._2})",
"39571200")
+
+ }
+
+ @Test
+ def testTimestampDiffWithMixedType: Unit ={
+
+ val startTs = ("date '2018-5-24'", "timestamp '2017-2-20 10:10:10.999'")
+ val endTs = ("timestamp '2017-2-20 10:10:10.999'", "date '2018-5-24'")
+
+ testSqlApi(s"timestampDiff(YEAR, ${startTs._1}, ${endTs._1})", "-1")
+ testSqlApi(s"timestampDiff(SQL_TSI_YEAR, ${startTs._2}, ${endTs._2})", "1")
+
+ testSqlApi(s"timestampDiff(QUARTER, ${startTs._1}, ${endTs._1})", "-5")
+ testSqlApi(s"timestampDiff(SQL_TSI_QUARTER, ${startTs._2}, ${endTs._2})",
"5")
+
+ testSqlApi(s"timestampDiff(MONTH, ${startTs._1}, ${endTs._1})", "-15")
+ testSqlApi(s"timestampDiff(SQL_TSI_MONTH, ${startTs._2}, ${endTs._2})",
"15")
+
+ testSqlApi(s"timestampDiff(WEEK, ${startTs._1}, ${endTs._1})", "-65")
+ testSqlApi(s"timestampDiff(SQL_TSI_WEEK, ${startTs._2}, ${endTs._2})",
"65")
+
+ testSqlApi(s"timestampDiff(DAY, ${startTs._1}, ${endTs._1})", "-457")
+ testSqlApi(s"timestampDiff(SQL_TSI_DAY, ${startTs._2}, ${endTs._2})",
"457")
+
+ testSqlApi(s"timestampDiff(HOUR, ${startTs._1}, ${endTs._1})", "-10981")
+ testSqlApi(s"timestampDiff(SQL_TSI_HOUR, ${startTs._2}, ${endTs._2})",
"10981")
+
+ testSqlApi(s"timestampDiff(MINUTE, ${startTs._1}, ${endTs._1})", "-658909")
+ testSqlApi(s"timestampDiff(SQL_TSI_MINUTE, ${startTs._2}, ${endTs._2})",
"658909")
+
+ testSqlApi(s"timestampDiff(SECOND, ${startTs._1}, ${endTs._1})",
"-39534589")
+ testSqlApi(s"timestampDiff(SQL_TSI_SECOND, ${startTs._2}, ${endTs._2})",
"39534589")
+
+ }
+
//
----------------------------------------------------------------------------------------------
// Other functions
//
----------------------------------------------------------------------------------------------
----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on GitHub and use the
URL above to go to the specific comment.
For queries about this service, please contact Infrastructure at:
[email protected]
With regards,
Apache Git Services