This is an automated email from the ASF dual-hosted git repository.
godfrey pushed a commit to branch release-1.13
in repository https://gitbox.apache.org/repos/asf/flink.git
The following commit(s) were added to refs/heads/release-1.13 by this push:
new c72db29 [FLINK-15987][table-planner-blink] SELECT 1.0e0 / 0.0e0
throws NumberFormatException
c72db29 is described below
commit c72db290b7c8e0266f5df43998a39fb4fe1ac62a
Author: xuyang <[email protected]>
AuthorDate: Fri Oct 22 16:13:06 2021 +0800
[FLINK-15987][table-planner-blink] SELECT 1.0e0 / 0.0e0 throws
NumberFormatException
This closes #17546
---
.../table/planner/codegen/ExpressionReducer.scala | 73 ++++++++++++++++------
.../planner/expressions/ScalarFunctionsTest.scala | 8 +++
.../planner/expressions/SqlExpressionTest.scala | 23 ++++++-
.../planner/expressions/TemporalTypesTest.scala | 54 +++++++++-------
.../expressions/utils/ExpressionTestBase.scala | 18 +++---
.../validation/ScalarFunctionsValidationTest.scala | 16 -----
6 files changed, 123 insertions(+), 69 deletions(-)
diff --git
a/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/planner/codegen/ExpressionReducer.scala
b/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/planner/codegen/ExpressionReducer.scala
index 166011f..02239c5 100644
---
a/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/planner/codegen/ExpressionReducer.scala
+++
b/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/planner/codegen/ExpressionReducer.scala
@@ -32,7 +32,8 @@ import org.apache.flink.table.types.logical.RowType
import org.apache.flink.table.util.TimestampStringUtils.fromLocalDateTime
import org.apache.calcite.avatica.util.ByteString
-import org.apache.calcite.rex.{RexBuilder, RexExecutor, RexNode}
+import org.apache.calcite.rex.{RexBuilder, RexCall, RexExecutor, RexLiteral,
RexNode, RexUtil}
+import org.apache.calcite.sql.SqlKind
import org.apache.calcite.sql.`type`.SqlTypeName
import scala.collection.JavaConverters._
@@ -61,25 +62,7 @@ class ExpressionReducer(
val pythonUDFExprs = new ListBuffer[RexNode]()
- val literals = constExprs.asScala.map(e => (e.getType.getSqlTypeName,
e)).flatMap {
-
- // Skip expressions that contain python functions because it's quite
expensive to
- // call Python UDFs during optimization phase. They will be optimized
during the runtime.
- case (_, e) if containsPythonCall(e) =>
- pythonUDFExprs += e
- None
-
- // we don't support object literals yet, we skip those constant
expressions
- case (SqlTypeName.ANY, _) |
- (SqlTypeName.OTHER, _) |
- (SqlTypeName.ROW, _) |
- (SqlTypeName.STRUCTURED, _) |
- (SqlTypeName.ARRAY, _) |
- (SqlTypeName.MAP, _) |
- (SqlTypeName.MULTISET, _) => None
-
- case (_, e) => Some(e)
- }
+ val literals = skipAndValidateExprs(rexBuilder, constExprs, pythonUDFExprs)
val literalTypes = literals.map(e =>
FlinkTypeFactory.toLogicalType(e.getType))
val resultType = RowType.of(literalTypes: _*)
@@ -212,6 +195,56 @@ class ExpressionReducer(
}
}
+ /**
+ * skip the expressions that can't be reduced now
+ * and validate the expressions
+ */
+ private def skipAndValidateExprs(
+ rexBuilder: RexBuilder,
+ constExprs: java.util.List[RexNode],
+ pythonUDFExprs: ListBuffer[RexNode]): List[RexNode] ={
+
+ constExprs.asScala.map(e => (e.getType.getSqlTypeName, e)).flatMap {
+
+ // Skip expressions that contain python functions because it's quite
expensive to
+ // call Python UDFs during optimization phase. They will be optimized
during the runtime.
+ case (_, e) if containsPythonCall(e) =>
+ pythonUDFExprs += e
+ None
+
+ // we don't support object literals yet, we skip those constant
expressions
+ case (SqlTypeName.ANY, _) |
+ (SqlTypeName.OTHER, _) |
+ (SqlTypeName.ROW, _) |
+ (SqlTypeName.STRUCTURED, _) |
+ (SqlTypeName.ARRAY, _) |
+ (SqlTypeName.MAP, _) |
+ (SqlTypeName.MULTISET, _) => None
+
+ case (_, call: RexCall) => {
+ // to ensure the division is non-zero when the operator is DIVIDE
+ if (call.getOperator.getKind.equals(SqlKind.DIVIDE)) {
+ val ops = call.getOperands
+ val divisionLiteral = ops.get(ops.size() - 1)
+
+ // according to BuiltInFunctionDefinitions, the DEVIDE's second op
must be numeric
+ assert(RexUtil.isDeterministic(divisionLiteral))
+ val divisionComparable =
+
divisionLiteral.asInstanceOf[RexLiteral].getValue.asInstanceOf[Comparable[Any]]
+ val zeroComparable = rexBuilder.makeExactLiteral(
+ new java.math.BigDecimal(0))
+ .getValue.asInstanceOf[Comparable[Any]]
+ if (divisionComparable.compareTo(zeroComparable) == 0) {
+ throw new ArithmeticException("Division by zero")
+ }
+ }
+ Some(call)
+ }
+
+ case (_, e) => Some(e)
+ }.toList
+ }
+
// We may skip the reduce if the original constant is invalid and casted as
a null literal,
// cause now this may change the RexNode's and it's parent node's
nullability.
def maySkipNullLiteralReduce(
diff --git
a/flink-table/flink-table-planner-blink/src/test/scala/org/apache/flink/table/planner/expressions/ScalarFunctionsTest.scala
b/flink-table/flink-table-planner-blink/src/test/scala/org/apache/flink/table/planner/expressions/ScalarFunctionsTest.scala
index 41514e5..f036cfe 100644
---
a/flink-table/flink-table-planner-blink/src/test/scala/org/apache/flink/table/planner/expressions/ScalarFunctionsTest.scala
+++
b/flink-table/flink-table-planner-blink/src/test/scala/org/apache/flink/table/planner/expressions/ScalarFunctionsTest.scala
@@ -2506,6 +2506,14 @@ class ScalarFunctionsTest extends ScalarTypesTestBase {
"LOG(cast (null AS DOUBLE), cast (null AS DOUBLE))",
"null"
)
+
+ // invalid log
+ val infiniteOrNaNException = "Infinite or NaN"
+ // Infinity
+ testExpectedSqlException("LOG(1, 100)", infiniteOrNaNException,
classOf[NumberFormatException])
+ // NaN
+ testExpectedSqlException("LOG(-1)", infiniteOrNaNException,
classOf[NumberFormatException])
+
}
@Test
diff --git
a/flink-table/flink-table-planner-blink/src/test/scala/org/apache/flink/table/planner/expressions/SqlExpressionTest.scala
b/flink-table/flink-table-planner-blink/src/test/scala/org/apache/flink/table/planner/expressions/SqlExpressionTest.scala
index 721ad72..259cfcd 100644
---
a/flink-table/flink-table-planner-blink/src/test/scala/org/apache/flink/table/planner/expressions/SqlExpressionTest.scala
+++
b/flink-table/flink-table-planner-blink/src/test/scala/org/apache/flink/table/planner/expressions/SqlExpressionTest.scala
@@ -124,8 +124,8 @@ class SqlExpressionTest extends ExpressionTestBase {
testSqlApi("ROUND(-12.345, 2)", "-12.35")
testSqlApi("PI()", "3.141592653589793")
testSqlApi("E()", "2.718281828459045")
- testSqlApi("truncate(42.345)", "42")
- testSqlApi("truncate(cast(42.345 as decimal(5, 3)), 2)", "42.34")
+ testSqlApi("truncate(42.345)", "42.000")
+ testSqlApi("truncate(cast(42.345 as decimal(5, 3)), 2)", "42.340")
}
@Test
@@ -144,6 +144,25 @@ class SqlExpressionTest extends ExpressionTestBase {
// Decimal(2,1) / Decimal(10,0) => Decimal(23,12)
testSqlApi("2.0/(-3)", "-0.666666666667")
testSqlApi("-7.9/2", "-3.950000000000")
+
+
+ // invalid division
+ val divisorZeroException = "Division by zero"
+ testExpectedSqlException(
+ "1/cast(0.00 as decimal)", divisorZeroException,
classOf[ArithmeticException])
+ testExpectedSqlException(
+ "1/cast(0.00 as double)", divisorZeroException,
classOf[ArithmeticException])
+ testExpectedSqlException(
+ "1/cast(0.00 as float)", divisorZeroException,
classOf[ArithmeticException])
+ testExpectedSqlException(
+ "1/cast(0 as tinyint)", divisorZeroException,
classOf[ArithmeticException])
+ testExpectedSqlException(
+ "1/cast(0 as smallint)", divisorZeroException,
classOf[ArithmeticException])
+ testExpectedSqlException(
+ "1/0", divisorZeroException, classOf[ArithmeticException])
+ testExpectedSqlException(
+ "1/cast(0 as bigint)", divisorZeroException,
classOf[ArithmeticException])
+
}
@Test
diff --git
a/flink-table/flink-table-planner-blink/src/test/scala/org/apache/flink/table/planner/expressions/TemporalTypesTest.scala
b/flink-table/flink-table-planner-blink/src/test/scala/org/apache/flink/table/planner/expressions/TemporalTypesTest.scala
index f854bfe..3eb66cf 100644
---
a/flink-table/flink-table-planner-blink/src/test/scala/org/apache/flink/table/planner/expressions/TemporalTypesTest.scala
+++
b/flink-table/flink-table-planner-blink/src/test/scala/org/apache/flink/table/planner/expressions/TemporalTypesTest.scala
@@ -18,12 +18,6 @@
package org.apache.flink.table.planner.expressions
-import java.sql.Timestamp
-import java.text.SimpleDateFormat
-import java.time.{Instant, ZoneId, ZoneOffset}
-import java.util.{Locale, TimeZone}
-import java.lang.{Double => JDouble, Float => JFloat, Integer => JInt, Long =>
JLong}
-
import org.apache.flink.table.api._
import org.apache.flink.table.expressions.TimeIntervalUnit
import org.apache.flink.table.planner.codegen.CodeGenException
@@ -32,8 +26,15 @@ import org.apache.flink.table.planner.utils.DateTimeTestUtil
import org.apache.flink.table.planner.utils.DateTimeTestUtil._
import org.apache.flink.table.types.DataType
import org.apache.flink.types.Row
+
import org.junit.Test
+import java.lang.{Double => JDouble, Float => JFloat, Integer => JInt, Long =>
JLong}
+import java.sql.Timestamp
+import java.text.SimpleDateFormat
+import java.time.{Instant, ZoneId, ZoneOffset}
+import java.util.Locale
+
class TemporalTypesTest extends ExpressionTestBase {
@Test
@@ -1116,6 +1117,30 @@ class TemporalTypesTest extends ExpressionTestBase {
"1437699600")
}
+ /**
+ * now Flink only support TIMESTAMP(3) as the return type in TO_TIMESTAMP
+ * See: https://issues.apache.org/jira/browse/FLINK-14925
+ */
+ @Test
+ def testToTimeStampFunctionWithHighPrecision(): Unit = {
+ testSqlApi(
+ "TO_TIMESTAMP('1970-01-01 00:00:00.123456789')",
+ "1970-01-01 00:00:00.123")
+
+ testSqlApi(
+ "TO_TIMESTAMP('1970-01-01 00:00:00.12345', 'yyyy-MM-dd HH:mm:ss.SSSSS')",
+ "1970-01-01 00:00:00.123")
+
+ testSqlApi(
+ "TO_TIMESTAMP('20000202 59:59.1234567', 'yyyyMMdd mm:ss.SSSSSSS')",
+ "2000-02-02 00:59:59.123")
+
+ testSqlApi(
+ "TO_TIMESTAMP('1234567', 'SSSSSSS')",
+ "1970-01-01 00:00:00.123")
+ }
+
+
@Test
def testHighPrecisionTimestamp(): Unit = {
// EXTRACT should support millisecond/microsecond/nanosecond
@@ -1167,15 +1192,6 @@ class TemporalTypesTest extends ExpressionTestBase {
// "TIMESTAMP '1970-01-01 00:00:00.123455789')",
// "1")
- // TO_TIMESTAMP should support up to nanosecond
- testSqlApi(
- "TO_TIMESTAMP('1970-01-01 00:00:00.123456789')",
- "1970-01-01 00:00:00.123456789")
-
- testSqlApi(
- "TO_TIMESTAMP('1970-01-01 00:00:00.12345', 'yyyy-MM-dd HH:mm:ss.SSSSS')",
- "1970-01-01 00:00:00.12345")
-
testSqlApi("TO_TIMESTAMP('abc')", "null")
// TO_TIMESTAMP should complement
YEAR/MONTH/DAY/HOUR/MINUTE/SECOND/NANO_OF_SECOND
@@ -1183,14 +1199,6 @@ class TemporalTypesTest extends ExpressionTestBase {
"TO_TIMESTAMP('2000020210', 'yyyyMMddHH')",
"2000-02-02 10:00:00.000")
- testSqlApi(
- "TO_TIMESTAMP('20000202 59:59.1234567', 'yyyyMMdd mm:ss.SSSSSSS')",
- "2000-02-02 00:59:59.1234567")
-
- testSqlApi(
- "TO_TIMESTAMP('1234567', 'SSSSSSS')",
- "1970-01-01 00:00:00.1234567")
-
// CAST between two TIMESTAMPs
testSqlApi(
"CAST(TIMESTAMP '1970-01-01 00:00:00.123456789' AS TIMESTAMP(6))",
diff --git
a/flink-table/flink-table-planner-blink/src/test/scala/org/apache/flink/table/planner/expressions/utils/ExpressionTestBase.scala
b/flink-table/flink-table-planner-blink/src/test/scala/org/apache/flink/table/planner/expressions/utils/ExpressionTestBase.scala
index 567e963..fb25410 100644
---
a/flink-table/flink-table-planner-blink/src/test/scala/org/apache/flink/table/planner/expressions/utils/ExpressionTestBase.scala
+++
b/flink-table/flink-table-planner-blink/src/test/scala/org/apache/flink/table/planner/expressions/utils/ExpressionTestBase.scala
@@ -18,14 +18,7 @@
package org.apache.flink.table.planner.expressions.utils
-import java.util.Collections
-import org.apache.calcite.plan.hep.{HepPlanner, HepProgramBuilder}
-import org.apache.calcite.rel.RelNode
-import org.apache.calcite.rel.logical.LogicalCalc
-import org.apache.calcite.rel.rules._
-import org.apache.calcite.rex.RexNode
-import org.apache.calcite.sql.`type`.SqlTypeName.VARCHAR
-import org.apache.flink.api.common.{JobID, TaskInfo}
+import org.apache.flink.api.common.TaskInfo
import org.apache.flink.api.common.functions.util.RuntimeUDFContext
import org.apache.flink.api.common.functions.{MapFunction, RichFunction,
RichMapFunction}
import org.apache.flink.api.java.typeutils.RowTypeInfo
@@ -49,10 +42,18 @@ import org.apache.flink.table.types.logical.{RowType,
VarCharType}
import org.apache.flink.table.types.utils.TypeConversions
import org.apache.flink.types.Row
+import org.apache.calcite.plan.hep.{HepPlanner, HepProgramBuilder}
+import org.apache.calcite.rel.RelNode
+import org.apache.calcite.rel.logical.LogicalCalc
+import org.apache.calcite.rel.rules._
+import org.apache.calcite.rex.RexNode
+import org.apache.calcite.sql.`type`.SqlTypeName.VARCHAR
import org.junit.Assert.{assertEquals, assertTrue, fail}
import org.junit.rules.ExpectedException
import org.junit.{After, Before, Rule}
+import java.util.Collections
+
import scala.collection.JavaConverters._
import scala.collection.mutable
import scala.collection.mutable.ListBuffer
@@ -303,6 +304,7 @@ abstract class ExpressionTestBase {
exceptionClass: Class[_ <: Throwable],
exprs: mutable.ArrayBuffer[_]): Unit = {
val builder = new HepProgramBuilder()
+ builder.addRuleInstance(CoreRules.PROJECT_REDUCE_EXPRESSIONS)
builder.addRuleInstance(CoreRules.PROJECT_TO_CALC)
val hep = new HepPlanner(builder.build())
hep.setRoot(relNode)
diff --git
a/flink-table/flink-table-planner-blink/src/test/scala/org/apache/flink/table/planner/expressions/validation/ScalarFunctionsValidationTest.scala
b/flink-table/flink-table-planner-blink/src/test/scala/org/apache/flink/table/planner/expressions/validation/ScalarFunctionsValidationTest.scala
index 2634b68..9762075 100644
---
a/flink-table/flink-table-planner-blink/src/test/scala/org/apache/flink/table/planner/expressions/validation/ScalarFunctionsValidationTest.scala
+++
b/flink-table/flink-table-planner-blink/src/test/scala/org/apache/flink/table/planner/expressions/validation/ScalarFunctionsValidationTest.scala
@@ -32,22 +32,6 @@ class ScalarFunctionsValidationTest extends
ScalarTypesTestBase {
// Math functions
//
----------------------------------------------------------------------------------------------
- @Test
- def testInvalidLog1(): Unit = {
- testSqlApi(
- "LOG(1, 100)",
- "Infinity"
- )
- }
-
- @Test
- def testInvalidLog2(): Unit ={
- testSqlApi(
- "LOG(-1)",
- "NaN"
- )
- }
-
@Test(expected = classOf[ValidationException])
def testInvalidBin1(): Unit = {
testSqlApi("BIN(f12)", "101010") // float type