This is an automated email from the ASF dual-hosted git repository.
ron pushed a commit to branch release-1.20
in repository https://gitbox.apache.org/repos/asf/flink.git
The following commit(s) were added to refs/heads/release-1.20 by this push:
new eaefacef0d2 [FLINK-35832][Table SQL / Planner] Fix IFNULL function
returns incorrect result
eaefacef0d2 is described below
commit eaefacef0d25d570b8ce8bcbbbc0a425196f6f79
Author: dylanhz <[email protected]>
AuthorDate: Thu Jul 18 19:35:42 2024 +0800
[FLINK-35832][Table SQL / Planner] Fix IFNULL function returns incorrect
result
This closes #25099
---
.../types/inference/strategies/IfNullTypeStrategy.java | 18 ++++++++++++++++--
.../table/planner/functions/MiscFunctionsITCase.java | 12 ++++++++++--
.../planner/expressions/ScalarFunctionsTest.scala | 16 ++++++++--------
.../table/planner/runtime/batch/sql/CalcITCase.scala | 6 ++++++
.../table/planner/runtime/stream/sql/CalcITCase.scala | 14 +++++++++++++-
5 files changed, 53 insertions(+), 13 deletions(-)
diff --git
a/flink-table/flink-table-common/src/main/java/org/apache/flink/table/types/inference/strategies/IfNullTypeStrategy.java
b/flink-table/flink-table-common/src/main/java/org/apache/flink/table/types/inference/strategies/IfNullTypeStrategy.java
index f3f09564439..f405cdc4002 100644
---
a/flink-table/flink-table-common/src/main/java/org/apache/flink/table/types/inference/strategies/IfNullTypeStrategy.java
+++
b/flink-table/flink-table-common/src/main/java/org/apache/flink/table/types/inference/strategies/IfNullTypeStrategy.java
@@ -22,11 +22,19 @@ import org.apache.flink.annotation.Internal;
import org.apache.flink.table.types.DataType;
import org.apache.flink.table.types.inference.CallContext;
import org.apache.flink.table.types.inference.TypeStrategy;
+import org.apache.flink.table.types.logical.utils.LogicalTypeMerging;
+import org.apache.flink.table.types.utils.TypeConversions;
+import java.util.Arrays;
import java.util.List;
import java.util.Optional;
-/** Type strategy specific for avoiding nulls. */
+/**
+ * Type strategy specific for avoiding nulls. <br>
+ * If arg0 is non-nullable, output datatype is exactly the datatype of arg0.
Otherwise, output
+ * datatype is the common type of arg0 and arg1. In the second case, output
type is nullable only if
+ * both args are nullable.
+ */
@Internal
class IfNullTypeStrategy implements TypeStrategy {
@@ -35,10 +43,16 @@ class IfNullTypeStrategy implements TypeStrategy {
final List<DataType> argumentDataTypes =
callContext.getArgumentDataTypes();
final DataType inputDataType = argumentDataTypes.get(0);
final DataType nullReplacementDataType = argumentDataTypes.get(1);
+
if (!inputDataType.getLogicalType().isNullable()) {
return Optional.of(inputDataType);
}
- return Optional.of(nullReplacementDataType);
+ return LogicalTypeMerging.findCommonType(
+ Arrays.asList(
+ inputDataType.getLogicalType(),
+ nullReplacementDataType.getLogicalType()))
+ .map(t ->
t.copy(nullReplacementDataType.getLogicalType().isNullable()))
+ .map(TypeConversions::fromLogicalToDataType);
}
}
diff --git
a/flink-table/flink-table-planner/src/test/java/org/apache/flink/table/planner/functions/MiscFunctionsITCase.java
b/flink-table/flink-table-planner/src/test/java/org/apache/flink/table/planner/functions/MiscFunctionsITCase.java
index 79020e42539..a9d243656b5 100644
---
a/flink-table/flink-table-planner/src/test/java/org/apache/flink/table/planner/functions/MiscFunctionsITCase.java
+++
b/flink-table/flink-table-planner/src/test/java/org/apache/flink/table/planner/functions/MiscFunctionsITCase.java
@@ -55,8 +55,11 @@ class MiscFunctionsITCase extends BuiltInFunctionTestBase {
DataTypes.STRING())
.testSqlResult("TYPEOF(NULL)", "NULL",
DataTypes.STRING()),
TestSetSpec.forFunction(BuiltInFunctionDefinitions.IF_NULL)
- .onFieldsWithData(null, new BigDecimal("123.45"))
- .andDataTypes(DataTypes.INT().nullable(),
DataTypes.DECIMAL(5, 2).notNull())
+ .onFieldsWithData(null, new BigDecimal("123.45"),
"Hello world")
+ .andDataTypes(
+ DataTypes.INT().nullable(),
+ DataTypes.DECIMAL(5, 2).notNull(),
+ DataTypes.STRING())
.withFunction(TakesNotNull.class)
.testResult(
$("f0").ifNull($("f0")),
@@ -81,6 +84,11 @@ class MiscFunctionsITCase extends BuiltInFunctionTestBase {
"IFNULL(f1, f0)",
new BigDecimal("123.45"),
DataTypes.DECIMAL(12, 2).notNull())
+ .testResult(
+ $("f2").ifNull("0"),
+ "IFNULL(f2, '0')",
+ "Hello world",
+ DataTypes.STRING().notNull())
.testResult(
call("TakesNotNull", $("f0").ifNull(12)),
"TakesNotNull(IFNULL(f0, 12))",
diff --git
a/flink-table/flink-table-planner/src/test/scala/org/apache/flink/table/planner/expressions/ScalarFunctionsTest.scala
b/flink-table/flink-table-planner/src/test/scala/org/apache/flink/table/planner/expressions/ScalarFunctionsTest.scala
index d226ed93be9..700f0138b97 100644
---
a/flink-table/flink-table-planner/src/test/scala/org/apache/flink/table/planner/expressions/ScalarFunctionsTest.scala
+++
b/flink-table/flink-table-planner/src/test/scala/org/apache/flink/table/planner/expressions/ScalarFunctionsTest.scala
@@ -2786,10 +2786,10 @@ class ScalarFunctionsTest extends ScalarTypesTestBase {
val url = "CAST('http://user:pass@host' AS VARCHAR(50))"
val base64 = "CAST('aGVsbG8gd29ybGQ=' AS VARCHAR(20))"
- testSqlApi(s"IFNULL(SUBSTR($str1, 2, 3), $str2)", "el")
- testSqlApi(s"IFNULL(SUBSTRING($str1, 2, 3), $str2)", "el")
- testSqlApi(s"IFNULL(LEFT($str1, 3), $str2)", "He")
- testSqlApi(s"IFNULL(RIGHT($str1, 3), $str2)", "ll")
+ testSqlApi(s"IFNULL(SUBSTR($str1, 2, 3), $str2)", "ell")
+ testSqlApi(s"IFNULL(SUBSTRING($str1, 2, 3), $str2)", "ell")
+ testSqlApi(s"IFNULL(LEFT($str1, 3), $str2)", "Hel")
+ testSqlApi(s"IFNULL(RIGHT($str1, 3), $str2)", "llo")
testSqlApi(s"IFNULL(REGEXP_EXTRACT($str1, 'H(.+?)l(.+?)$$', 2), $str2)",
"lo")
testSqlApi(s"IFNULL(REGEXP_REPLACE($str1, 'e.l', 'EXL'), $str2)", "HEXLo")
testSqlApi(s"IFNULL(UPPER($str1), $str2)", "HELLO")
@@ -2799,9 +2799,9 @@ class ScalarFunctionsTest extends ScalarTypesTestBase {
testSqlApi(s"IFNULL(LPAD($str1, 7, $str3), $str2)", "heHello")
testSqlApi(s"IFNULL(RPAD($str1, 7, $str3), $str2)", "Hellohe")
testSqlApi(s"IFNULL(REPEAT($str1, 2), $str2)", "HelloHello")
- testSqlApi(s"IFNULL(REVERSE($str1), $str2)", "ol")
+ testSqlApi(s"IFNULL(REVERSE($str1), $str2)", "olleH")
testSqlApi(s"IFNULL(REPLACE($str3, ' ', '_'), $str2)", "hello_world")
- testSqlApi(s"IFNULL(SPLIT_INDEX($str3, ' ', 1), $str2)", "wo")
+ testSqlApi(s"IFNULL(SPLIT_INDEX($str3, ' ', 1), $str2)", "world")
testSqlApi(s"IFNULL(MD5($str1), $str2)",
"8b1a9953c4611296a827abf8c47804d7")
testSqlApi(s"IFNULL(SHA1($str1), $str2)",
"f7ff9e8b7bb2e09b70935a5d785e0cc5d9d0abf0")
testSqlApi(
@@ -2822,7 +2822,7 @@ class ScalarFunctionsTest extends ScalarTypesTestBase {
testSqlApi(
s"IFNULL(SHA2($str1, 256), $str2)",
"185f8db32271fe25f561a6fc938b2e264306ec304eda518007d1764826381969")
- testSqlApi(s"IFNULL(PARSE_URL($url, 'HOST'), $str2)", "ho")
+ testSqlApi(s"IFNULL(PARSE_URL($url, 'HOST'), $str2)", "host")
testSqlApi(s"IFNULL(FROM_BASE64($base64), $str2)", "hello world")
testSqlApi(s"IFNULL(TO_BASE64($str3), $str2)", "aGVsbG8gd29ybGQ=")
testSqlApi(s"IFNULL(CHR(65), $str2)", "A")
@@ -2834,7 +2834,7 @@ class ScalarFunctionsTest extends ScalarTypesTestBase {
testSqlApi(s"IFNULL(RTRIM($str4), $str2)", " hello")
testSqlApi(s"IFNULL($str1 || $str2, $str2)", "HelloHi")
testSqlApi(s"IFNULL(SUBSTRING(UUID(), 9, 1), $str2)", "-")
- testSqlApi(s"IFNULL(DECODE(ENCODE($str1, 'utf-8'), 'utf-8'), $str2)", "He")
+ testSqlApi(s"IFNULL(DECODE(ENCODE($str1, 'utf-8'), 'utf-8'), $str2)",
"Hello")
testSqlApi(s"IFNULL(CAST(DATE '2021-04-06' AS VARCHAR(10)), $str2)",
"2021-04-06")
testSqlApi(s"IFNULL(CAST(TIME '11:05:30' AS VARCHAR(8)), $str2)",
"11:05:30")
diff --git
a/flink-table/flink-table-planner/src/test/scala/org/apache/flink/table/planner/runtime/batch/sql/CalcITCase.scala
b/flink-table/flink-table-planner/src/test/scala/org/apache/flink/table/planner/runtime/batch/sql/CalcITCase.scala
index 39062dd3481..011475eb268 100644
---
a/flink-table/flink-table-planner/src/test/scala/org/apache/flink/table/planner/runtime/batch/sql/CalcITCase.scala
+++
b/flink-table/flink-table-planner/src/test/scala/org/apache/flink/table/planner/runtime/batch/sql/CalcITCase.scala
@@ -2333,4 +2333,10 @@ class CalcITCase extends BatchTestBase {
Seq(row(2.0), row(2.0), row(2.0))
)
}
+
+ @Test
+ def testIfNull(): Unit = {
+ // reported in FLINK-35832
+ checkResult("SELECT IFNULL(JSON_VALUE('{\"a\":16}','$.a'),'0')",
Seq(row("16")));
+ }
}
diff --git
a/flink-table/flink-table-planner/src/test/scala/org/apache/flink/table/planner/runtime/stream/sql/CalcITCase.scala
b/flink-table/flink-table-planner/src/test/scala/org/apache/flink/table/planner/runtime/stream/sql/CalcITCase.scala
index 62e2406a371..076b00411cb 100644
---
a/flink-table/flink-table-planner/src/test/scala/org/apache/flink/table/planner/runtime/stream/sql/CalcITCase.scala
+++
b/flink-table/flink-table-planner/src/test/scala/org/apache/flink/table/planner/runtime/stream/sql/CalcITCase.scala
@@ -28,7 +28,7 @@ import
org.apache.flink.table.api.config.ExecutionConfigOptions
import
org.apache.flink.table.api.config.ExecutionConfigOptions.LegacyCastBehaviour
import org.apache.flink.table.api.internal.TableEnvironmentInternal
import org.apache.flink.table.catalog.CatalogDatabaseImpl
-import org.apache.flink.table.data.{GenericRowData, MapData, RowData}
+import org.apache.flink.table.data.{GenericRowData, MapData}
import org.apache.flink.table.planner.factories.TestValuesTableFactory
import org.apache.flink.table.planner.runtime.utils._
import org.apache.flink.table.planner.runtime.utils.BatchTestBase.row
@@ -810,4 +810,16 @@ class CalcITCase extends StreamingTestBase {
val expected = List("2.0", "2.0", "2.0")
assertThat(sink.getAppendResults.sorted).isEqualTo(expected.sorted)
}
+
+ @Test
+ def testIfNull(): Unit = {
+ // reported in FLINK-35832
+ val result = tEnv.sqlQuery("SELECT
IFNULL(JSON_VALUE('{\"a\":16}','$.a'),'0')")
+ var sink = new TestingAppendSink
+ tEnv.toDataStream(result, DataTypes.ROW(DataTypes.STRING())).addSink(sink)
+ env.execute()
+
+ val expected = List("16")
+ assertThat(sink.getAppendResults.sorted).isEqualTo(expected.sorted)
+ }
}