This is an automated email from the ASF dual-hosted git repository.

godfrey pushed a commit to branch release-1.13
in repository https://gitbox.apache.org/repos/asf/flink.git


The following commit(s) were added to refs/heads/release-1.13 by this push:
     new c72db29  [FLINK-15987][table-planner-blink] SELECT 1.0e0 / 0.0e0 
throws NumberFormatException
c72db29 is described below

commit c72db290b7c8e0266f5df43998a39fb4fe1ac62a
Author: xuyang <[email protected]>
AuthorDate: Fri Oct 22 16:13:06 2021 +0800

    [FLINK-15987][table-planner-blink] SELECT 1.0e0 / 0.0e0 throws 
NumberFormatException
    
    This closes #17546
---
 .../table/planner/codegen/ExpressionReducer.scala  | 73 ++++++++++++++++------
 .../planner/expressions/ScalarFunctionsTest.scala  |  8 +++
 .../planner/expressions/SqlExpressionTest.scala    | 23 ++++++-
 .../planner/expressions/TemporalTypesTest.scala    | 54 +++++++++-------
 .../expressions/utils/ExpressionTestBase.scala     | 18 +++---
 .../validation/ScalarFunctionsValidationTest.scala | 16 -----
 6 files changed, 123 insertions(+), 69 deletions(-)

diff --git 
a/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/planner/codegen/ExpressionReducer.scala
 
b/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/planner/codegen/ExpressionReducer.scala
index 166011f..02239c5 100644
--- 
a/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/planner/codegen/ExpressionReducer.scala
+++ 
b/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/planner/codegen/ExpressionReducer.scala
@@ -32,7 +32,8 @@ import org.apache.flink.table.types.logical.RowType
 import org.apache.flink.table.util.TimestampStringUtils.fromLocalDateTime
 
 import org.apache.calcite.avatica.util.ByteString
-import org.apache.calcite.rex.{RexBuilder, RexExecutor, RexNode}
+import org.apache.calcite.rex.{RexBuilder, RexCall, RexExecutor, RexLiteral, 
RexNode, RexUtil}
+import org.apache.calcite.sql.SqlKind
 import org.apache.calcite.sql.`type`.SqlTypeName
 
 import scala.collection.JavaConverters._
@@ -61,25 +62,7 @@ class ExpressionReducer(
 
     val pythonUDFExprs = new ListBuffer[RexNode]()
 
-    val literals = constExprs.asScala.map(e => (e.getType.getSqlTypeName, 
e)).flatMap {
-
-      // Skip expressions that contain python functions because it's quite 
expensive to
-      // call Python UDFs during optimization phase. They will be optimized 
during the runtime.
-      case (_, e) if containsPythonCall(e) =>
-        pythonUDFExprs += e
-        None
-
-      // we don't support object literals yet, we skip those constant 
expressions
-      case (SqlTypeName.ANY, _) |
-           (SqlTypeName.OTHER, _) |
-           (SqlTypeName.ROW, _) |
-           (SqlTypeName.STRUCTURED, _) |
-           (SqlTypeName.ARRAY, _) |
-           (SqlTypeName.MAP, _) |
-           (SqlTypeName.MULTISET, _) => None
-
-      case (_, e) => Some(e)
-    }
+    val literals = skipAndValidateExprs(rexBuilder, constExprs, pythonUDFExprs)
 
     val literalTypes = literals.map(e => 
FlinkTypeFactory.toLogicalType(e.getType))
     val resultType = RowType.of(literalTypes: _*)
@@ -212,6 +195,56 @@ class ExpressionReducer(
     }
   }
 
+  /**
+   * Skips the expressions that cannot be reduced now
+   * and validates the remaining expressions.
+   */
+  private def skipAndValidateExprs(
+      rexBuilder: RexBuilder,
+      constExprs: java.util.List[RexNode],
+      pythonUDFExprs: ListBuffer[RexNode]): List[RexNode] ={
+
+    constExprs.asScala.map(e => (e.getType.getSqlTypeName, e)).flatMap {
+
+      // Skip expressions that contain python functions because it's quite 
expensive to
+      // call Python UDFs during optimization phase. They will be optimized 
during the runtime.
+      case (_, e) if containsPythonCall(e) =>
+        pythonUDFExprs += e
+        None
+
+      // we don't support object literals yet, we skip those constant 
expressions
+      case (SqlTypeName.ANY, _) |
+           (SqlTypeName.OTHER, _) |
+           (SqlTypeName.ROW, _) |
+           (SqlTypeName.STRUCTURED, _) |
+           (SqlTypeName.ARRAY, _) |
+           (SqlTypeName.MAP, _) |
+           (SqlTypeName.MULTISET, _) => None
+
+      case (_, call: RexCall) => {
+        // to ensure the divisor is non-zero when the operator is DIVIDE
+        if (call.getOperator.getKind.equals(SqlKind.DIVIDE)) {
+          val ops = call.getOperands
+          val divisionLiteral = ops.get(ops.size() - 1)
+
+          // according to BuiltInFunctionDefinitions, the DIVIDE's second op 
must be numeric
+          assert(RexUtil.isDeterministic(divisionLiteral))
+          val divisionComparable =
+            
divisionLiteral.asInstanceOf[RexLiteral].getValue.asInstanceOf[Comparable[Any]]
+          val zeroComparable = rexBuilder.makeExactLiteral(
+            new java.math.BigDecimal(0))
+            .getValue.asInstanceOf[Comparable[Any]]
+          if (divisionComparable.compareTo(zeroComparable) == 0) {
+            throw new ArithmeticException("Division by zero")
+          }
+        }
+        Some(call)
+      }
+
+      case (_, e) => Some(e)
+    }.toList
+  }
+
   // We may skip the reduce if the original constant is invalid and casted as 
a null literal,
   // cause now this may change the RexNode's and it's parent node's 
nullability.
   def maySkipNullLiteralReduce(
diff --git 
a/flink-table/flink-table-planner-blink/src/test/scala/org/apache/flink/table/planner/expressions/ScalarFunctionsTest.scala
 
b/flink-table/flink-table-planner-blink/src/test/scala/org/apache/flink/table/planner/expressions/ScalarFunctionsTest.scala
index 41514e5..f036cfe 100644
--- 
a/flink-table/flink-table-planner-blink/src/test/scala/org/apache/flink/table/planner/expressions/ScalarFunctionsTest.scala
+++ 
b/flink-table/flink-table-planner-blink/src/test/scala/org/apache/flink/table/planner/expressions/ScalarFunctionsTest.scala
@@ -2506,6 +2506,14 @@ class ScalarFunctionsTest extends ScalarTypesTestBase {
       "LOG(cast (null AS DOUBLE), cast (null AS DOUBLE))",
       "null"
     )
+
+    // invalid log
+    val infiniteOrNaNException = "Infinite or NaN"
+    // Infinity
+    testExpectedSqlException("LOG(1, 100)", infiniteOrNaNException, 
classOf[NumberFormatException])
+    // NaN
+    testExpectedSqlException("LOG(-1)", infiniteOrNaNException, 
classOf[NumberFormatException])
+
   }
 
   @Test
diff --git 
a/flink-table/flink-table-planner-blink/src/test/scala/org/apache/flink/table/planner/expressions/SqlExpressionTest.scala
 
b/flink-table/flink-table-planner-blink/src/test/scala/org/apache/flink/table/planner/expressions/SqlExpressionTest.scala
index 721ad72..259cfcd 100644
--- 
a/flink-table/flink-table-planner-blink/src/test/scala/org/apache/flink/table/planner/expressions/SqlExpressionTest.scala
+++ 
b/flink-table/flink-table-planner-blink/src/test/scala/org/apache/flink/table/planner/expressions/SqlExpressionTest.scala
@@ -124,8 +124,8 @@ class SqlExpressionTest extends ExpressionTestBase {
     testSqlApi("ROUND(-12.345, 2)", "-12.35")
     testSqlApi("PI()", "3.141592653589793")
     testSqlApi("E()", "2.718281828459045")
-    testSqlApi("truncate(42.345)", "42")
-    testSqlApi("truncate(cast(42.345 as decimal(5, 3)), 2)", "42.34")
+    testSqlApi("truncate(42.345)", "42.000")
+    testSqlApi("truncate(cast(42.345 as decimal(5, 3)), 2)", "42.340")
   }
 
   @Test
@@ -144,6 +144,25 @@ class SqlExpressionTest extends ExpressionTestBase {
     // Decimal(2,1) / Decimal(10,0) => Decimal(23,12)
     testSqlApi("2.0/(-3)", "-0.666666666667")
     testSqlApi("-7.9/2", "-3.950000000000")
+
+
+    // invalid division
+    val divisorZeroException = "Division by zero"
+    testExpectedSqlException(
+      "1/cast(0.00 as decimal)", divisorZeroException, 
classOf[ArithmeticException])
+    testExpectedSqlException(
+      "1/cast(0.00 as double)", divisorZeroException, 
classOf[ArithmeticException])
+    testExpectedSqlException(
+      "1/cast(0.00 as float)", divisorZeroException, 
classOf[ArithmeticException])
+    testExpectedSqlException(
+      "1/cast(0 as tinyint)", divisorZeroException, 
classOf[ArithmeticException])
+    testExpectedSqlException(
+      "1/cast(0 as smallint)", divisorZeroException, 
classOf[ArithmeticException])
+    testExpectedSqlException(
+      "1/0", divisorZeroException, classOf[ArithmeticException])
+    testExpectedSqlException(
+      "1/cast(0 as bigint)", divisorZeroException, 
classOf[ArithmeticException])
+
   }
 
   @Test
diff --git 
a/flink-table/flink-table-planner-blink/src/test/scala/org/apache/flink/table/planner/expressions/TemporalTypesTest.scala
 
b/flink-table/flink-table-planner-blink/src/test/scala/org/apache/flink/table/planner/expressions/TemporalTypesTest.scala
index f854bfe..3eb66cf 100644
--- 
a/flink-table/flink-table-planner-blink/src/test/scala/org/apache/flink/table/planner/expressions/TemporalTypesTest.scala
+++ 
b/flink-table/flink-table-planner-blink/src/test/scala/org/apache/flink/table/planner/expressions/TemporalTypesTest.scala
@@ -18,12 +18,6 @@
 
 package org.apache.flink.table.planner.expressions
 
-import java.sql.Timestamp
-import java.text.SimpleDateFormat
-import java.time.{Instant, ZoneId, ZoneOffset}
-import java.util.{Locale, TimeZone}
-import java.lang.{Double => JDouble, Float => JFloat, Integer => JInt, Long => 
JLong}
-
 import org.apache.flink.table.api._
 import org.apache.flink.table.expressions.TimeIntervalUnit
 import org.apache.flink.table.planner.codegen.CodeGenException
@@ -32,8 +26,15 @@ import org.apache.flink.table.planner.utils.DateTimeTestUtil
 import org.apache.flink.table.planner.utils.DateTimeTestUtil._
 import org.apache.flink.table.types.DataType
 import org.apache.flink.types.Row
+
 import org.junit.Test
 
+import java.lang.{Double => JDouble, Float => JFloat, Integer => JInt, Long => 
JLong}
+import java.sql.Timestamp
+import java.text.SimpleDateFormat
+import java.time.{Instant, ZoneId, ZoneOffset}
+import java.util.Locale
+
 class TemporalTypesTest extends ExpressionTestBase {
 
   @Test
@@ -1116,6 +1117,30 @@ class TemporalTypesTest extends ExpressionTestBase {
       "1437699600")
   }
 
+  /**
+   * Now Flink only supports TIMESTAMP(3) as the return type in TO_TIMESTAMP.
+   * See: https://issues.apache.org/jira/browse/FLINK-14925
+   */
+  @Test
+  def testToTimeStampFunctionWithHighPrecision(): Unit = {
+    testSqlApi(
+      "TO_TIMESTAMP('1970-01-01 00:00:00.123456789')",
+      "1970-01-01 00:00:00.123")
+
+    testSqlApi(
+      "TO_TIMESTAMP('1970-01-01 00:00:00.12345', 'yyyy-MM-dd HH:mm:ss.SSSSS')",
+      "1970-01-01 00:00:00.123")
+
+    testSqlApi(
+      "TO_TIMESTAMP('20000202 59:59.1234567', 'yyyyMMdd mm:ss.SSSSSSS')",
+      "2000-02-02 00:59:59.123")
+
+    testSqlApi(
+      "TO_TIMESTAMP('1234567', 'SSSSSSS')",
+      "1970-01-01 00:00:00.123")
+  }
+
+
   @Test
   def testHighPrecisionTimestamp(): Unit = {
     // EXTRACT should support millisecond/microsecond/nanosecond
@@ -1167,15 +1192,6 @@ class TemporalTypesTest extends ExpressionTestBase {
     //    "TIMESTAMP '1970-01-01 00:00:00.123455789')",
     //  "1")
 
-    // TO_TIMESTAMP should support up to nanosecond
-    testSqlApi(
-      "TO_TIMESTAMP('1970-01-01 00:00:00.123456789')",
-      "1970-01-01 00:00:00.123456789")
-
-    testSqlApi(
-      "TO_TIMESTAMP('1970-01-01 00:00:00.12345', 'yyyy-MM-dd HH:mm:ss.SSSSS')",
-      "1970-01-01 00:00:00.12345")
-
     testSqlApi("TO_TIMESTAMP('abc')", "null")
 
     // TO_TIMESTAMP should complement 
YEAR/MONTH/DAY/HOUR/MINUTE/SECOND/NANO_OF_SECOND
@@ -1183,14 +1199,6 @@ class TemporalTypesTest extends ExpressionTestBase {
       "TO_TIMESTAMP('2000020210', 'yyyyMMddHH')",
       "2000-02-02 10:00:00.000")
 
-    testSqlApi(
-      "TO_TIMESTAMP('20000202 59:59.1234567', 'yyyyMMdd mm:ss.SSSSSSS')",
-      "2000-02-02 00:59:59.1234567")
-
-    testSqlApi(
-      "TO_TIMESTAMP('1234567', 'SSSSSSS')",
-      "1970-01-01 00:00:00.1234567")
-
     // CAST between two TIMESTAMPs
     testSqlApi(
       "CAST(TIMESTAMP '1970-01-01 00:00:00.123456789' AS TIMESTAMP(6))",
diff --git 
a/flink-table/flink-table-planner-blink/src/test/scala/org/apache/flink/table/planner/expressions/utils/ExpressionTestBase.scala
 
b/flink-table/flink-table-planner-blink/src/test/scala/org/apache/flink/table/planner/expressions/utils/ExpressionTestBase.scala
index 567e963..fb25410 100644
--- 
a/flink-table/flink-table-planner-blink/src/test/scala/org/apache/flink/table/planner/expressions/utils/ExpressionTestBase.scala
+++ 
b/flink-table/flink-table-planner-blink/src/test/scala/org/apache/flink/table/planner/expressions/utils/ExpressionTestBase.scala
@@ -18,14 +18,7 @@
 
 package org.apache.flink.table.planner.expressions.utils
 
-import java.util.Collections
-import org.apache.calcite.plan.hep.{HepPlanner, HepProgramBuilder}
-import org.apache.calcite.rel.RelNode
-import org.apache.calcite.rel.logical.LogicalCalc
-import org.apache.calcite.rel.rules._
-import org.apache.calcite.rex.RexNode
-import org.apache.calcite.sql.`type`.SqlTypeName.VARCHAR
-import org.apache.flink.api.common.{JobID, TaskInfo}
+import org.apache.flink.api.common.TaskInfo
 import org.apache.flink.api.common.functions.util.RuntimeUDFContext
 import org.apache.flink.api.common.functions.{MapFunction, RichFunction, 
RichMapFunction}
 import org.apache.flink.api.java.typeutils.RowTypeInfo
@@ -49,10 +42,18 @@ import org.apache.flink.table.types.logical.{RowType, 
VarCharType}
 import org.apache.flink.table.types.utils.TypeConversions
 import org.apache.flink.types.Row
 
+import org.apache.calcite.plan.hep.{HepPlanner, HepProgramBuilder}
+import org.apache.calcite.rel.RelNode
+import org.apache.calcite.rel.logical.LogicalCalc
+import org.apache.calcite.rel.rules._
+import org.apache.calcite.rex.RexNode
+import org.apache.calcite.sql.`type`.SqlTypeName.VARCHAR
 import org.junit.Assert.{assertEquals, assertTrue, fail}
 import org.junit.rules.ExpectedException
 import org.junit.{After, Before, Rule}
 
+import java.util.Collections
+
 import scala.collection.JavaConverters._
 import scala.collection.mutable
 import scala.collection.mutable.ListBuffer
@@ -303,6 +304,7 @@ abstract class ExpressionTestBase {
       exceptionClass: Class[_ <: Throwable],
       exprs: mutable.ArrayBuffer[_]): Unit = {
     val builder = new HepProgramBuilder()
+    builder.addRuleInstance(CoreRules.PROJECT_REDUCE_EXPRESSIONS)
     builder.addRuleInstance(CoreRules.PROJECT_TO_CALC)
     val hep = new HepPlanner(builder.build())
     hep.setRoot(relNode)
diff --git 
a/flink-table/flink-table-planner-blink/src/test/scala/org/apache/flink/table/planner/expressions/validation/ScalarFunctionsValidationTest.scala
 
b/flink-table/flink-table-planner-blink/src/test/scala/org/apache/flink/table/planner/expressions/validation/ScalarFunctionsValidationTest.scala
index 2634b68..9762075 100644
--- 
a/flink-table/flink-table-planner-blink/src/test/scala/org/apache/flink/table/planner/expressions/validation/ScalarFunctionsValidationTest.scala
+++ 
b/flink-table/flink-table-planner-blink/src/test/scala/org/apache/flink/table/planner/expressions/validation/ScalarFunctionsValidationTest.scala
@@ -32,22 +32,6 @@ class ScalarFunctionsValidationTest extends 
ScalarTypesTestBase {
   // Math functions
   // 
----------------------------------------------------------------------------------------------
 
-  @Test
-  def testInvalidLog1(): Unit = {
-    testSqlApi(
-      "LOG(1, 100)",
-      "Infinity"
-    )
-  }
-
-  @Test
-  def testInvalidLog2(): Unit ={
-    testSqlApi(
-      "LOG(-1)",
-      "NaN"
-    )
-  }
-
   @Test(expected = classOf[ValidationException])
   def testInvalidBin1(): Unit = {
     testSqlApi("BIN(f12)", "101010") // float type

Reply via email to