This is an automated email from the ASF dual-hosted git repository.
lzljs3620320 pushed a commit to branch release-1.13
in repository https://gitbox.apache.org/repos/asf/flink.git
The following commit(s) were added to refs/heads/release-1.13 by this push:
new 87211c6 [FLINK-22985][table-runtime-blink] Fix NullPointerException
when comparing temporal type with invalid string literal
87211c6 is described below
commit 87211c63d888a6df029122b45ef4428a1c637d4e
Author: tsreaper <[email protected]>
AuthorDate: Wed Jul 28 19:23:17 2021 +0800
[FLINK-22985][table-runtime-blink] Fix NullPointerException when comparing
temporal type with invalid string literal
This closes #16619
---
.../planner/codegen/calls/ScalarOperatorGens.scala | 54 +++++++++++++---------
.../planner/expressions/ScalarOperatorsTest.scala | 24 ++++++++++
.../utils/ScalarOperatorsTestBase.scala | 8 +++-
.../validation/ScalarOperatorsValidationTest.scala | 24 ++++++++++
4 files changed, 85 insertions(+), 25 deletions(-)
diff --git
a/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/planner/codegen/calls/ScalarOperatorGens.scala
b/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/planner/codegen/calls/ScalarOperatorGens.scala
index 4f1a39b..7a6af90 100644
---
a/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/planner/codegen/calls/ScalarOperatorGens.scala
+++
b/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/planner/codegen/calls/ScalarOperatorGens.scala
@@ -25,9 +25,9 @@ import org.apache.flink.table.data.writer.{BinaryArrayWriter,
BinaryRowWriter}
import
org.apache.flink.table.planner.codegen.CodeGenUtils.{binaryRowFieldSetAccess,
binaryRowSetNull, binaryWriterWriteField, binaryWriterWriteNull, _}
import org.apache.flink.table.planner.codegen.GenerateUtils._
import
org.apache.flink.table.planner.codegen.GeneratedExpression.{ALWAYS_NULL,
NEVER_NULL, NO_CODE}
-import org.apache.flink.table.planner.codegen.{CodeGenException,
CodeGeneratorContext, GeneratedExpression}
+import org.apache.flink.table.planner.codegen.{CodeGenException, CodeGenUtils,
CodeGeneratorContext, GeneratedExpression}
import org.apache.flink.table.planner.utils.JavaScalaConversionUtil.toScala
-import org.apache.flink.table.runtime.functions.SqlFunctionUtils
+import org.apache.flink.table.runtime.functions.{SqlDateTimeUtils,
SqlFunctionUtils}
import
org.apache.flink.table.runtime.types.LogicalTypeDataTypeConverter.fromLogicalTypeToDataType
import org.apache.flink.table.runtime.types.PlannerTypeUtils
import org.apache.flink.table.runtime.types.PlannerTypeUtils.{isInteroperable,
isPrimitive}
@@ -1956,14 +1956,37 @@ object ScalarOperatorGens {
stringLiteral: GeneratedExpression,
expectType: LogicalType): GeneratedExpression = {
checkArgument(stringLiteral.literal)
- val rightTerm = stringLiteral.resultTerm
+ if (java.lang.Boolean.valueOf(stringLiteral.nullTerm)) {
+ return generateNullLiteral(expectType, nullCheck = true)
+ }
+
+ val stringValue = stringLiteral.literalValue.get.toString
+ val literalCode = expectType.getTypeRoot match {
+ case DATE =>
+ SqlDateTimeUtils.dateStringToUnixDate(stringValue) match {
+ case null => throw new ValidationException(s"String '$stringValue'
is not a valid date")
+ case v => v
+ }
+ case TIME_WITHOUT_TIME_ZONE =>
+ SqlDateTimeUtils.timeStringToUnixDate(stringValue) match {
+ case null => throw new ValidationException(s"String '$stringValue'
is not a valid time")
+ case v => v
+ }
+ case TIMESTAMP_WITHOUT_TIME_ZONE =>
+ SqlDateTimeUtils.toTimestampData(stringValue) match {
+ case null =>
+ throw new ValidationException(s"String '$stringValue' is not a
valid timestamp")
+ case v => s"${CodeGenUtils.TIMESTAMP_DATA}.fromEpochMillis(" +
+ s"${v.getMillisecond}L, ${v.getNanoOfMillisecond})"
+ }
+ case _ => throw new UnsupportedOperationException
+ }
+
val typeTerm = primitiveTypeTermForType(expectType)
- val defaultTerm = primitiveDefaultValue(expectType)
- val term = newName("stringToTime")
- val code = stringToLocalTimeCode(expectType, rightTerm)
- val stmt = s"$typeTerm $term = ${stringLiteral.nullTerm} ? $defaultTerm :
$code;"
+ val resultTerm = newName("stringToTime")
+ val stmt = s"$typeTerm $resultTerm = $literalCode;"
ctx.addReusableMember(stmt)
- stringLiteral.copy(resultType = expectType, resultTerm = term)
+ GeneratedExpression(resultTerm, "false", "", expectType)
}
private def generateCastArrayToString(
@@ -2391,21 +2414,6 @@ object ScalarOperatorGens {
}
}
- private def stringToLocalTimeCode(
- targetType: LogicalType,
- operandTerm: String): String =
- targetType.getTypeRoot match {
- case DATE =>
-
s"${qualifyMethod(BuiltInMethods.STRING_TO_DATE)}($operandTerm.toString())"
- case TIME_WITHOUT_TIME_ZONE =>
-
s"${qualifyMethod(BuiltInMethods.STRING_TO_TIME)}($operandTerm.toString())"
- case TIMESTAMP_WITHOUT_TIME_ZONE =>
- s"""
-
|${qualifyMethod(BuiltInMethods.STRING_TO_TIMESTAMP)}($operandTerm.toString())
- |""".stripMargin
- case _ => throw new UnsupportedOperationException
- }
-
private def localTimeToStringCode(
ctx: CodeGeneratorContext,
fromType: LogicalType,
diff --git
a/flink-table/flink-table-planner-blink/src/test/scala/org/apache/flink/table/planner/expressions/ScalarOperatorsTest.scala
b/flink-table/flink-table-planner-blink/src/test/scala/org/apache/flink/table/planner/expressions/ScalarOperatorsTest.scala
index 9f8968d..7655fd1 100644
---
a/flink-table/flink-table-planner-blink/src/test/scala/org/apache/flink/table/planner/expressions/ScalarOperatorsTest.scala
+++
b/flink-table/flink-table-planner-blink/src/test/scala/org/apache/flink/table/planner/expressions/ScalarOperatorsTest.scala
@@ -198,4 +198,28 @@ class ScalarOperatorsTest extends ScalarOperatorsTestBase {
testSqlApi("-f17", "-10.0")
testSqlApi("+f17", "10.0")
}
+
+ @Test
+ def testTemporalTypeEqualsStringLiteral(): Unit = {
+ testSqlApi("f15 = '1996-11-10'", "true")
+ testSqlApi("f15 = '1996-11-11'", "false")
+ testSqlApi("f15 = cast(null as string)", "null")
+ testSqlApi("'1996-11-10' = f15", "true")
+ testSqlApi("'1996-11-11' = f15", "false")
+ testSqlApi("cast(null as string) = f15", "null")
+
+ testSqlApi("f21 = '12:34:56'", "true")
+ testSqlApi("f21 = '13:34:56'", "false")
+ testSqlApi("f21 = cast(null as string)", "null")
+ testSqlApi("'12:34:56' = f21", "true")
+ testSqlApi("'13:34:56' = f21", "false")
+ testSqlApi("cast(null as string) = f21", "null")
+
+ testSqlApi("f22 = '1996-11-10 12:34:56'", "true")
+ testSqlApi("f22 = '1996-11-10 12:34:57'", "false")
+ testSqlApi("f22 = cast(null as string)", "null")
+ testSqlApi("'1996-11-10 12:34:56' = f22", "true")
+ testSqlApi("'1996-11-10 12:34:57' = f22", "false")
+ testSqlApi("cast(null as string) = f22", "null")
+ }
}
diff --git
a/flink-table/flink-table-planner-blink/src/test/scala/org/apache/flink/table/planner/expressions/utils/ScalarOperatorsTestBase.scala
b/flink-table/flink-table-planner-blink/src/test/scala/org/apache/flink/table/planner/expressions/utils/ScalarOperatorsTestBase.scala
index a3968bc..0edae0d 100644
---
a/flink-table/flink-table-planner-blink/src/test/scala/org/apache/flink/table/planner/expressions/utils/ScalarOperatorsTestBase.scala
+++
b/flink-table/flink-table-planner-blink/src/test/scala/org/apache/flink/table/planner/expressions/utils/ScalarOperatorsTestBase.scala
@@ -28,7 +28,7 @@ import org.apache.flink.types.Row
abstract class ScalarOperatorsTestBase extends ExpressionTestBase {
override def testData: Row = {
- val testData = new Row(21)
+ val testData = new Row(23)
testData.setField(0, 1: Byte)
testData.setField(1, 1: Short)
testData.setField(2, 1)
@@ -52,6 +52,8 @@ abstract class ScalarOperatorsTestBase extends
ExpressionTestBase {
testData.setField(18, "hello world".getBytes())
testData.setField(19, "hello flink".getBytes())
testData.setField(20, "who".getBytes())
+ testData.setField(21, localTime("12:34:56"))
+ testData.setField(22, localDateTime("1996-11-10 12:34:56"))
testData
}
@@ -80,7 +82,9 @@ abstract class ScalarOperatorsTestBase extends
ExpressionTestBase {
DataTypes.FIELD("f17", DataTypes.DECIMAL(19, 1)),
DataTypes.FIELD("f18", DataTypes.BINARY(200)),
DataTypes.FIELD("f19", DataTypes.VARBINARY(200)),
- DataTypes.FIELD("f20", DataTypes.VARBINARY(200))
+ DataTypes.FIELD("f20", DataTypes.VARBINARY(200)),
+ DataTypes.FIELD("f21", DataTypes.TIME()),
+ DataTypes.FIELD("f22", DataTypes.TIMESTAMP())
)
}
diff --git
a/flink-table/flink-table-planner-blink/src/test/scala/org/apache/flink/table/planner/expressions/validation/ScalarOperatorsValidationTest.scala
b/flink-table/flink-table-planner-blink/src/test/scala/org/apache/flink/table/planner/expressions/validation/ScalarOperatorsValidationTest.scala
index f3b5fff..4b27008 100644
---
a/flink-table/flink-table-planner-blink/src/test/scala/org/apache/flink/table/planner/expressions/validation/ScalarOperatorsValidationTest.scala
+++
b/flink-table/flink-table-planner-blink/src/test/scala/org/apache/flink/table/planner/expressions/validation/ScalarOperatorsValidationTest.scala
@@ -84,4 +84,28 @@ class ScalarOperatorsValidationTest extends
ScalarOperatorsTestBase {
"FAIL"
)
}
+
+ @Test
+ def testTemporalTypeEqualsInvalidStringLiteral(): Unit = {
+ testExpectedSqlException(
+ "f15 = 'invalid'", "is not a valid date",
+ classOf[ValidationException])
+ testExpectedSqlException(
+ "'invalid' = f15", "is not a valid date",
+ classOf[ValidationException])
+
+ testExpectedSqlException(
+ "f21 = 'invalid'", "is not a valid time",
+ classOf[ValidationException])
+ testExpectedSqlException(
+ "'invalid' = f21", "is not a valid time",
+ classOf[ValidationException])
+
+ testExpectedSqlException(
+ "f22 = 'invalid'", "is not a valid timestamp",
+ classOf[ValidationException])
+ testExpectedSqlException(
+ "'invalid' = f22", "is not a valid timestamp",
+ classOf[ValidationException])
+ }
}