This is an automated email from the ASF dual-hosted git repository.

lzljs3620320 pushed a commit to branch release-1.13
in repository https://gitbox.apache.org/repos/asf/flink.git


The following commit(s) were added to refs/heads/release-1.13 by this push:
     new 87211c6  [FLINK-22985][table-runtime-blink] Fix NullPointerException 
when comparing temporal type with invalid string literal
87211c6 is described below

commit 87211c63d888a6df029122b45ef4428a1c637d4e
Author: tsreaper <[email protected]>
AuthorDate: Wed Jul 28 19:23:17 2021 +0800

    [FLINK-22985][table-runtime-blink] Fix NullPointerException when comparing 
temporal type with invalid string literal
    
    This closes #16619
---
 .../planner/codegen/calls/ScalarOperatorGens.scala | 54 +++++++++++++---------
 .../planner/expressions/ScalarOperatorsTest.scala  | 24 ++++++++++
 .../utils/ScalarOperatorsTestBase.scala            |  8 +++-
 .../validation/ScalarOperatorsValidationTest.scala | 24 ++++++++++
 4 files changed, 85 insertions(+), 25 deletions(-)

diff --git 
a/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/planner/codegen/calls/ScalarOperatorGens.scala
 
b/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/planner/codegen/calls/ScalarOperatorGens.scala
index 4f1a39b..7a6af90 100644
--- 
a/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/planner/codegen/calls/ScalarOperatorGens.scala
+++ 
b/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/planner/codegen/calls/ScalarOperatorGens.scala
@@ -25,9 +25,9 @@ import org.apache.flink.table.data.writer.{BinaryArrayWriter, 
BinaryRowWriter}
 import 
org.apache.flink.table.planner.codegen.CodeGenUtils.{binaryRowFieldSetAccess, 
binaryRowSetNull, binaryWriterWriteField, binaryWriterWriteNull, _}
 import org.apache.flink.table.planner.codegen.GenerateUtils._
 import 
org.apache.flink.table.planner.codegen.GeneratedExpression.{ALWAYS_NULL, 
NEVER_NULL, NO_CODE}
-import org.apache.flink.table.planner.codegen.{CodeGenException, 
CodeGeneratorContext, GeneratedExpression}
+import org.apache.flink.table.planner.codegen.{CodeGenException, CodeGenUtils, 
CodeGeneratorContext, GeneratedExpression}
 import org.apache.flink.table.planner.utils.JavaScalaConversionUtil.toScala
-import org.apache.flink.table.runtime.functions.SqlFunctionUtils
+import org.apache.flink.table.runtime.functions.{SqlDateTimeUtils, 
SqlFunctionUtils}
 import 
org.apache.flink.table.runtime.types.LogicalTypeDataTypeConverter.fromLogicalTypeToDataType
 import org.apache.flink.table.runtime.types.PlannerTypeUtils
 import org.apache.flink.table.runtime.types.PlannerTypeUtils.{isInteroperable, 
isPrimitive}
@@ -1956,14 +1956,37 @@ object ScalarOperatorGens {
       stringLiteral: GeneratedExpression,
       expectType: LogicalType): GeneratedExpression = {
     checkArgument(stringLiteral.literal)
-    val rightTerm = stringLiteral.resultTerm
+    if (java.lang.Boolean.valueOf(stringLiteral.nullTerm)) {
+      return generateNullLiteral(expectType, nullCheck = true)
+    }
+
+    val stringValue = stringLiteral.literalValue.get.toString
+    val literalCode = expectType.getTypeRoot match {
+      case DATE =>
+        SqlDateTimeUtils.dateStringToUnixDate(stringValue) match {
+          case null => throw new ValidationException(s"String '$stringValue' 
is not a valid date")
+          case v => v
+        }
+      case TIME_WITHOUT_TIME_ZONE =>
+        SqlDateTimeUtils.timeStringToUnixDate(stringValue) match {
+          case null => throw new ValidationException(s"String '$stringValue' 
is not a valid time")
+          case v => v
+        }
+      case TIMESTAMP_WITHOUT_TIME_ZONE =>
+        SqlDateTimeUtils.toTimestampData(stringValue) match {
+          case null =>
+            throw new ValidationException(s"String '$stringValue' is not a 
valid timestamp")
+          case v => s"${CodeGenUtils.TIMESTAMP_DATA}.fromEpochMillis(" +
+            s"${v.getMillisecond}L, ${v.getNanoOfMillisecond})"
+        }
+      case _ => throw new UnsupportedOperationException
+    }
+
     val typeTerm = primitiveTypeTermForType(expectType)
-    val defaultTerm = primitiveDefaultValue(expectType)
-    val term = newName("stringToTime")
-    val code = stringToLocalTimeCode(expectType, rightTerm)
-    val stmt = s"$typeTerm $term = ${stringLiteral.nullTerm} ? $defaultTerm : 
$code;"
+    val resultTerm = newName("stringToTime")
+    val stmt = s"$typeTerm $resultTerm = $literalCode;"
     ctx.addReusableMember(stmt)
-    stringLiteral.copy(resultType = expectType, resultTerm = term)
+    GeneratedExpression(resultTerm, "false", "", expectType)
   }
 
   private def generateCastArrayToString(
@@ -2391,21 +2414,6 @@ object ScalarOperatorGens {
     }
   }
 
-  private def stringToLocalTimeCode(
-      targetType: LogicalType,
-      operandTerm: String): String =
-    targetType.getTypeRoot match {
-      case DATE =>
-        
s"${qualifyMethod(BuiltInMethods.STRING_TO_DATE)}($operandTerm.toString())"
-      case TIME_WITHOUT_TIME_ZONE =>
-        
s"${qualifyMethod(BuiltInMethods.STRING_TO_TIME)}($operandTerm.toString())"
-      case TIMESTAMP_WITHOUT_TIME_ZONE =>
-        s"""
-           
|${qualifyMethod(BuiltInMethods.STRING_TO_TIMESTAMP)}($operandTerm.toString())
-           |""".stripMargin
-      case _ => throw new UnsupportedOperationException
-    }
-
   private def localTimeToStringCode(
       ctx: CodeGeneratorContext,
       fromType: LogicalType,
diff --git 
a/flink-table/flink-table-planner-blink/src/test/scala/org/apache/flink/table/planner/expressions/ScalarOperatorsTest.scala
 
b/flink-table/flink-table-planner-blink/src/test/scala/org/apache/flink/table/planner/expressions/ScalarOperatorsTest.scala
index 9f8968d..7655fd1 100644
--- 
a/flink-table/flink-table-planner-blink/src/test/scala/org/apache/flink/table/planner/expressions/ScalarOperatorsTest.scala
+++ 
b/flink-table/flink-table-planner-blink/src/test/scala/org/apache/flink/table/planner/expressions/ScalarOperatorsTest.scala
@@ -198,4 +198,28 @@ class ScalarOperatorsTest extends ScalarOperatorsTestBase {
     testSqlApi("-f17", "-10.0")
     testSqlApi("+f17", "10.0")
   }
+
+  @Test
+  def testTemporalTypeEqualsStringLiteral(): Unit = {
+    testSqlApi("f15 = '1996-11-10'", "true")
+    testSqlApi("f15 = '1996-11-11'", "false")
+    testSqlApi("f15 = cast(null as string)", "null")
+    testSqlApi("'1996-11-10' = f15", "true")
+    testSqlApi("'1996-11-11' = f15", "false")
+    testSqlApi("cast(null as string) = f15", "null")
+
+    testSqlApi("f21 = '12:34:56'", "true")
+    testSqlApi("f21 = '13:34:56'", "false")
+    testSqlApi("f21 = cast(null as string)", "null")
+    testSqlApi("'12:34:56' = f21", "true")
+    testSqlApi("'13:34:56' = f21", "false")
+    testSqlApi("cast(null as string) = f21", "null")
+
+    testSqlApi("f22 = '1996-11-10 12:34:56'", "true")
+    testSqlApi("f22 = '1996-11-10 12:34:57'", "false")
+    testSqlApi("f22 = cast(null as string)", "null")
+    testSqlApi("'1996-11-10 12:34:56' = f22", "true")
+    testSqlApi("'1996-11-10 12:34:57' = f22", "false")
+    testSqlApi("cast(null as string) = f22", "null")
+  }
 }
diff --git 
a/flink-table/flink-table-planner-blink/src/test/scala/org/apache/flink/table/planner/expressions/utils/ScalarOperatorsTestBase.scala
 
b/flink-table/flink-table-planner-blink/src/test/scala/org/apache/flink/table/planner/expressions/utils/ScalarOperatorsTestBase.scala
index a3968bc..0edae0d 100644
--- 
a/flink-table/flink-table-planner-blink/src/test/scala/org/apache/flink/table/planner/expressions/utils/ScalarOperatorsTestBase.scala
+++ 
b/flink-table/flink-table-planner-blink/src/test/scala/org/apache/flink/table/planner/expressions/utils/ScalarOperatorsTestBase.scala
@@ -28,7 +28,7 @@ import org.apache.flink.types.Row
 abstract class ScalarOperatorsTestBase extends ExpressionTestBase {
 
   override def testData: Row = {
-    val testData = new Row(21)
+    val testData = new Row(23)
     testData.setField(0, 1: Byte)
     testData.setField(1, 1: Short)
     testData.setField(2, 1)
@@ -52,6 +52,8 @@ abstract class ScalarOperatorsTestBase extends 
ExpressionTestBase {
     testData.setField(18, "hello world".getBytes())
     testData.setField(19, "hello flink".getBytes())
     testData.setField(20, "who".getBytes())
+    testData.setField(21, localTime("12:34:56"))
+    testData.setField(22, localDateTime("1996-11-10 12:34:56"))
     testData
   }
 
@@ -80,7 +82,9 @@ abstract class ScalarOperatorsTestBase extends 
ExpressionTestBase {
         DataTypes.FIELD("f17", DataTypes.DECIMAL(19, 1)),
         DataTypes.FIELD("f18", DataTypes.BINARY(200)),
         DataTypes.FIELD("f19", DataTypes.VARBINARY(200)),
-        DataTypes.FIELD("f20", DataTypes.VARBINARY(200))
+        DataTypes.FIELD("f20", DataTypes.VARBINARY(200)),
+        DataTypes.FIELD("f21", DataTypes.TIME()),
+        DataTypes.FIELD("f22", DataTypes.TIMESTAMP())
     )
   }
 
diff --git 
a/flink-table/flink-table-planner-blink/src/test/scala/org/apache/flink/table/planner/expressions/validation/ScalarOperatorsValidationTest.scala
 
b/flink-table/flink-table-planner-blink/src/test/scala/org/apache/flink/table/planner/expressions/validation/ScalarOperatorsValidationTest.scala
index f3b5fff..4b27008 100644
--- 
a/flink-table/flink-table-planner-blink/src/test/scala/org/apache/flink/table/planner/expressions/validation/ScalarOperatorsValidationTest.scala
+++ 
b/flink-table/flink-table-planner-blink/src/test/scala/org/apache/flink/table/planner/expressions/validation/ScalarOperatorsValidationTest.scala
@@ -84,4 +84,28 @@ class ScalarOperatorsValidationTest extends 
ScalarOperatorsTestBase {
       "FAIL"
     )
   }
+
+  @Test
+  def testTemporalTypeEqualsInvalidStringLiteral(): Unit = {
+    testExpectedSqlException(
+      "f15 = 'invalid'", "is not a valid date",
+      classOf[ValidationException])
+    testExpectedSqlException(
+      "'invalid' = f15", "is not a valid date",
+      classOf[ValidationException])
+
+    testExpectedSqlException(
+      "f21 = 'invalid'", "is not a valid time",
+      classOf[ValidationException])
+    testExpectedSqlException(
+      "'invalid' = f21", "is not a valid time",
+      classOf[ValidationException])
+
+    testExpectedSqlException(
+      "f22 = 'invalid'", "is not a valid timestamp",
+      classOf[ValidationException])
+    testExpectedSqlException(
+      "'invalid' = f22", "is not a valid timestamp",
+      classOf[ValidationException])
+  }
 }

Reply via email to