This is an automated email from the ASF dual-hosted git repository. jark pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/flink.git
commit 0196a95c74cba8a9fb072385dbc7e5920d357d70 Author: Zhenghua Gao <doc...@gmail.com> AuthorDate: Thu Aug 8 18:01:23 2019 +0800 [FLINK-13547][table-planner-blink] Align the implementation of TRUNCATE() function with old planner --- .../planner/codegen/calls/BuiltInMethods.scala | 22 ++++++ .../planner/codegen/calls/FunctionGenerator.scala | 51 ++++++++++++ .../planner/expressions/ScalarFunctionsTest.scala | 92 ++++++++++++++++++++++ .../planner/expressions/SqlExpressionTest.scala | 2 + .../validation/ScalarFunctionsValidationTest.scala | 40 +++++++++- .../table/runtime/functions/SqlFunctionUtils.java | 31 ++++++++ 6 files changed, 237 insertions(+), 1 deletion(-) diff --git a/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/planner/codegen/calls/BuiltInMethods.scala b/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/planner/codegen/calls/BuiltInMethods.scala index 11c6149..3307fc1 100644 --- a/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/planner/codegen/calls/BuiltInMethods.scala +++ b/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/planner/codegen/calls/BuiltInMethods.scala @@ -436,4 +436,26 @@ object BuiltInMethods { val STRING_TO_TIME = Types.lookupMethod( classOf[SqlDateTimeUtils], "timeStringToUnixDate", classOf[String]) + + val TRUNCATE_DOUBLE_ONE = Types.lookupMethod(classOf[SqlFunctions], "struncate", + classOf[Double]) + val TRUNCATE_FLOAT_ONE = Types.lookupMethod(classOf[SqlFunctionUtils], "struncate", + classOf[Float]) + val TRUNCATE_INT_ONE = Types.lookupMethod(classOf[SqlFunctions], "struncate", + classOf[Int]) + val TRUNCATE_LONG_ONE = Types.lookupMethod(classOf[SqlFunctions], "struncate", + classOf[Long]) + val TRUNCATE_DEC_ONE = Types.lookupMethod(classOf[SqlFunctionUtils], "struncate", + classOf[Decimal]) + + val TRUNCATE_DOUBLE = Types.lookupMethod(classOf[SqlFunctions], "struncate", + classOf[Double], classOf[Int]) + val TRUNCATE_FLOAT = Types.lookupMethod(classOf[SqlFunctionUtils], "struncate", + classOf[Float], classOf[Int]) + val TRUNCATE_INT = Types.lookupMethod(classOf[SqlFunctions], "struncate", + classOf[Int], classOf[Int]) + val TRUNCATE_LONG = Types.lookupMethod(classOf[SqlFunctions], "struncate", + classOf[Long], classOf[Int]) + val TRUNCATE_DEC = Types.lookupMethod(classOf[SqlFunctionUtils], "struncate", + classOf[Decimal], classOf[Int]) } diff --git a/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/planner/codegen/calls/FunctionGenerator.scala b/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/planner/codegen/calls/FunctionGenerator.scala index d1eb672..d3c34a7 100644 --- a/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/planner/codegen/calls/FunctionGenerator.scala +++ b/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/planner/codegen/calls/FunctionGenerator.scala @@ -701,6 +701,57 @@ object FunctionGenerator { addSqlFunctionMethod(FROM_UNIXTIME, Seq(BIGINT, VARCHAR), BuiltInMethods.FROM_UNIXTIME_FORMAT) addSqlFunctionMethod(FROM_UNIXTIME, Seq(BIGINT, CHAR), BuiltInMethods.FROM_UNIXTIME_FORMAT) + addSqlFunctionMethod( + TRUNCATE, + Seq(BIGINT), + BuiltInMethods.TRUNCATE_LONG_ONE) + + addSqlFunctionMethod( + TRUNCATE, + Seq(INTEGER), + BuiltInMethods.TRUNCATE_INT_ONE) + + addSqlFunctionMethod( + TRUNCATE, + Seq(DECIMAL), + BuiltInMethods.TRUNCATE_DEC_ONE) + + addSqlFunctionMethod( + TRUNCATE, + Seq(DOUBLE), + BuiltInMethods.TRUNCATE_DOUBLE_ONE) + + addSqlFunctionMethod( + TRUNCATE, + Seq(FLOAT), + BuiltInMethods.TRUNCATE_FLOAT_ONE) + + addSqlFunctionMethod( + TRUNCATE, + Seq(BIGINT, INTEGER), + BuiltInMethods.TRUNCATE_LONG) + + addSqlFunctionMethod( + TRUNCATE, + Seq(INTEGER, INTEGER), + BuiltInMethods.TRUNCATE_INT) + + addSqlFunctionMethod( + TRUNCATE, + Seq(DECIMAL, INTEGER), + BuiltInMethods.TRUNCATE_DEC) + + addSqlFunctionMethod( + TRUNCATE, + Seq(DOUBLE, INTEGER), + BuiltInMethods.TRUNCATE_DOUBLE) + + addSqlFunctionMethod( + TRUNCATE, + Seq(FLOAT, INTEGER), + BuiltInMethods.TRUNCATE_FLOAT) + + // ---------------------------------------------------------------------------------------------- /** diff --git a/flink-table/flink-table-planner-blink/src/test/scala/org/apache/flink/table/planner/expressions/ScalarFunctionsTest.scala b/flink-table/flink-table-planner-blink/src/test/scala/org/apache/flink/table/planner/expressions/ScalarFunctionsTest.scala index 56fa16d..9411a75 100644 --- a/flink-table/flink-table-planner-blink/src/test/scala/org/apache/flink/table/planner/expressions/ScalarFunctionsTest.scala +++ b/flink-table/flink-table-planner-blink/src/test/scala/org/apache/flink/table/planner/expressions/ScalarFunctionsTest.scala @@ -1395,6 +1395,98 @@ class ScalarFunctionsTest extends ScalarTypesTestBase { "-") } + @Test + def testTruncate(): Unit = { + testAllApis( + 'f29.truncate('f30), + "f29.truncate(f30)", + "truncate(f29, f30)", + "0.4") + + testAllApis( + 'f31.truncate('f7), + "f31.truncate(f7)", + "truncate(f31, f7)", + "-0.123") + + testAllApis( + 'f4.truncate('f32), + "f4.truncate(f32)", + "truncate(f4, f32)", + "40") + + testAllApis( + 'f28.cast(DataTypes.DOUBLE).truncate(1), + "f28.cast(DOUBLE).truncate(1)", + "truncate(cast(f28 as DOUBLE), 1)", + "0.4") + + // TODO: ignore TableApiTest for cast to DECIMAL(p, s) is not support now. + // see https://issues.apache.org/jira/browse/FLINK-13651 +// testAllApis( +// 'f31.cast(DataTypes.DECIMAL(38, 18)).truncate(2), +// "f31.cast(DECIMAL(10, 10)).truncate(2)", +// "truncate(cast(f31 as decimal(38, 18)), 2)", +// "-0.12") +// +// testAllApis( +// 'f36.cast(DataTypes.DECIMAL(38, 18)).truncate(), +// "f36.cast(DECIMAL(10, 10)).truncate()", +// "truncate(42.324)", +// "42") + + testSqlApi("truncate(cast(f31 as decimal(38, 18)), 2)", "-0.12") + + testAllApis( + 'f5.cast(DataTypes.FLOAT).truncate(), + "f5.cast(FLOAT).truncate()", + "truncate(cast(f5 as float))", + "4.0") + + testAllApis( + 42.truncate(-1), + "42.truncate(-1)", + "truncate(42, -1)", + "40") + + testAllApis( + 42.truncate(-3), + "42.truncate(-3)", + "truncate(42, -3)", + "0") + + // The validation parameter is null + testAllApis( + 'f33.cast(DataTypes.INT).truncate(1), + "f33.cast(INT).truncate(1)", + "truncate(cast(null as integer), 1)", + "null") + + testAllApis( + 43.21.truncate('f33.cast(DataTypes.INT)), + "43.21.truncate(f33.cast(INT))", + "truncate(43.21, cast(null as integer))", + "null") + + testAllApis( + 'f33.cast(DataTypes.DOUBLE).truncate(1), + "f33.cast(DOUBLE).truncate(1)", + "truncate(cast(null as double), 1)", + "null") + + testAllApis( + 'f33.cast(DataTypes.INT).truncate(1), + "f33.cast(INT).truncate(1)", + "truncate(cast(null as integer))", + "null") + + testAllApis( + 'f33.cast(DataTypes.DOUBLE).truncate(), + "f33.cast(DOUBLE).truncate()", + "truncate(cast(null as double))", + "null") + } + // ---------------------------------------------------------------------------------------------- // Math functions // ---------------------------------------------------------------------------------------------- diff --git a/flink-table/flink-table-planner-blink/src/test/scala/org/apache/flink/table/planner/expressions/SqlExpressionTest.scala b/flink-table/flink-table-planner-blink/src/test/scala/org/apache/flink/table/planner/expressions/SqlExpressionTest.scala index 35a60c1..d31c937 100644 --- a/flink-table/flink-table-planner-blink/src/test/scala/org/apache/flink/table/planner/expressions/SqlExpressionTest.scala +++ b/flink-table/flink-table-planner-blink/src/test/scala/org/apache/flink/table/planner/expressions/SqlExpressionTest.scala @@ -119,6 +119,8 @@ class SqlExpressionTest extends ExpressionTestBase { testSqlApi("ROUND(-12.345, 2)", "-12.35") testSqlApi("PI()", "3.141592653589793") testSqlApi("E()", "2.718281828459045") + testSqlApi("truncate(42.345)", "42") + testSqlApi("truncate(cast(42.345 as decimal(5, 3)), 2)", "42.34") } @Test diff --git a/flink-table/flink-table-planner-blink/src/test/scala/org/apache/flink/table/planner/expressions/validation/ScalarFunctionsValidationTest.scala b/flink-table/flink-table-planner-blink/src/test/scala/org/apache/flink/table/planner/expressions/validation/ScalarFunctionsValidationTest.scala index da4763f..8299a12 100644 --- a/flink-table/flink-table-planner-blink/src/test/scala/org/apache/flink/table/planner/expressions/validation/ScalarFunctionsValidationTest.scala +++ b/flink-table/flink-table-planner-blink/src/test/scala/org/apache/flink/table/planner/expressions/validation/ScalarFunctionsValidationTest.scala @@ -21,8 +21,8 @@ package org.apache.flink.table.planner.expressions.validation import org.apache.flink.table.api.scala._ import org.apache.flink.table.api.{SqlParserException, ValidationException} import org.apache.flink.table.expressions.TimePointUnit +import org.apache.flink.table.planner.codegen.CodeGenException import org.apache.flink.table.planner.expressions.utils.ScalarTypesTestBase - import org.apache.calcite.avatica.util.TimeUnit import org.junit.{Ignore, Test} @@ -69,6 +69,44 @@ class ScalarFunctionsValidationTest extends ScalarTypesTestBase { testSqlApi("BIN(f16)", "101010") // Date type } + + @Test(expected = classOf[ValidationException]) + def testInvalidTruncate1(): Unit = { + // All arguments are string type + testSqlApi( + "TRUNCATE('abc', 'def')", + "FAIL") + + // The second argument is of type String + testSqlApi( + "TRUNCATE(f12, f0)", + "FAIL") + + // The second argument is of type Float + testSqlApi( + "TRUNCATE(f12,f12)", + "FAIL") + + // The second argument is of type Double + testSqlApi( + "TRUNCATE(f12, cast(f28 as DOUBLE))", + "FAIL") + + // The second argument is of type BigDecimal + testSqlApi( + "TRUNCATE(f12,f15)", + "FAIL") + } + + @Test + def testInvalidTruncate2(): Unit = { + thrown.expect(classOf[CodeGenException]) + // The one argument is of type String + testSqlApi( + "TRUNCATE('abc')", + "FAIL") + } + // ---------------------------------------------------------------------------------------------- // String functions // ---------------------------------------------------------------------------------------------- diff --git a/flink-table/flink-table-runtime-blink/src/main/java/org/apache/flink/table/runtime/functions/SqlFunctionUtils.java b/flink-table/flink-table-runtime-blink/src/main/java/org/apache/flink/table/runtime/functions/SqlFunctionUtils.java index f904fd5..9a993a5 100644 --- a/flink-table/flink-table-runtime-blink/src/main/java/org/apache/flink/table/runtime/functions/SqlFunctionUtils.java +++ b/flink-table/flink-table-runtime-blink/src/main/java/org/apache/flink/table/runtime/functions/SqlFunctionUtils.java @@ -1074,4 +1074,35 @@ public class SqlFunctionUtils { public static String uuid(byte[] b){ return UUID.nameUUIDFromBytes(b).toString(); } + + /** SQL <code>TRUNCATE</code> operator applied to BigDecimal values. */ + public static Decimal struncate(Decimal b0) { + return struncate(b0, 0); + } + + public static Decimal struncate(Decimal b0, int b1) { + if (b1 >= b0.getScale()) { + return b0; + } + + BigDecimal b2 = b0.toBigDecimal().movePointRight(b1) + .setScale(0, RoundingMode.DOWN).movePointLeft(b1); + int p = b0.getPrecision(); + int s = b0.getScale(); + + if (b1 < 0) { + return Decimal.fromBigDecimal(b2, Math.min(38, 1 + p - s), 0); + } else { + return Decimal.fromBigDecimal(b2, 1 + p - s + b1, b1); + } + } + + /** SQL <code>TRUNCATE</code> operator applied to double values. */ + public static float struncate(float b0) { + return struncate(b0, 0); + } + + public static float struncate(float b0, int b1) { + return (float) struncate(Decimal.castFrom((double) b0, 38, 18), b1).doubleValue(); + } }