This is an automated email from the ASF dual-hosted git repository. jark pushed a commit to branch release-1.11 in repository https://gitbox.apache.org/repos/asf/flink.git
commit 053f03c8ea1c7e1e93d41e87606799f4f844c719 Author: wangxlong <[email protected]> AuthorDate: Wed Oct 28 14:03:43 2020 +0800 [FLINK-19587][table-planner-blink] Fix error result when casting binary as varchar This closes #13612 --- .../planner/codegen/calls/ScalarOperatorGens.scala | 2 +- .../planner/expressions/ScalarOperatorsTest.scala | 42 +++++++++++---- .../utils/ScalarOperatorsTestBase.scala | 62 +++++++++++++--------- 3 files changed, 68 insertions(+), 38 deletions(-) diff --git a/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/planner/codegen/calls/ScalarOperatorGens.scala b/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/planner/codegen/calls/ScalarOperatorGens.scala index e5e229c..b0a7f44 100644 --- a/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/planner/codegen/calls/ScalarOperatorGens.scala +++ b/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/planner/codegen/calls/ScalarOperatorGens.scala @@ -959,7 +959,7 @@ object ScalarOperatorGens { generateCastArrayToString(ctx, operand, operand.resultType.asInstanceOf[ArrayType]) // Byte array -> String UTF-8 - case (VARBINARY, VARCHAR | CHAR) => + case (BINARY | VARBINARY, VARCHAR | CHAR) => val charset = classOf[StandardCharsets].getCanonicalName generateStringResultCallIfArgsNotNull(ctx, Seq(operand)) { terms => s"(new String(${terms.head}, $charset.UTF_8))" diff --git a/flink-table/flink-table-planner-blink/src/test/scala/org/apache/flink/table/planner/expressions/ScalarOperatorsTest.scala b/flink-table/flink-table-planner-blink/src/test/scala/org/apache/flink/table/planner/expressions/ScalarOperatorsTest.scala index 4f6cbe4..de0899f 100644 --- a/flink-table/flink-table-planner-blink/src/test/scala/org/apache/flink/table/planner/expressions/ScalarOperatorsTest.scala +++ b/flink-table/flink-table-planner-blink/src/test/scala/org/apache/flink/table/planner/expressions/ScalarOperatorsTest.scala @@ -32,7 +32,7 @@ class ScalarOperatorsTest extends ScalarOperatorsTestBase { ) testSqlApi( - "CAST(f0 AS DECIMAL) IN (42.0, 2.00, 3.01, 1.000000)", // SQL would downcast otherwise + "CAST (f0 AS DECIMAL) IN (42.0, 2.00, 3.01, 1.000000)", // SQL would downcast otherwise "true" ) @@ -63,6 +63,32 @@ class ScalarOperatorsTest extends ScalarOperatorsTestBase { } @Test + def testCast(): Unit = { + + // binary -> varchar + testSqlApi( + "CAST (f18 as varchar)", + "hello world") + testSqlApi( + "CAST (CAST (x'68656C6C6F20636F6465' as binary) as varchar)", + "hello code") + + // varbinary -> varchar + testSqlApi( + "CAST (f19 as varchar)", + "hello flink") + testSqlApi( + "CAST (CAST (x'68656C6C6F2063617374' as varbinary) as varchar)", + "hello cast") + + // null case + testSqlApi("CAST (NULL AS INT)", "null") + testSqlApi( + "CAST (NULL AS VARCHAR) = ''", + "null") + } + + @Test def testOtherExpressions(): Unit = { // nested field null type @@ -88,12 +114,6 @@ class ScalarOperatorsTest extends ScalarOperatorsTestBase { "tRuE", "true") - // null - testSqlApi("CAST(NULL AS INT)", "null") - testSqlApi( - "CAST(NULL AS VARCHAR) = ''", - "null") - // case when testSqlApi("CASE 11 WHEN 1 THEN 'a' ELSE 'b' END", "b") testSqlApi("CASE 2 WHEN 1 THEN 'a' ELSE 'b' END", "b") @@ -116,14 +136,14 @@ class ScalarOperatorsTest extends ScalarOperatorsTestBase { testSqlApi("CASE WHEN 'a'='a' THEN 1 END", "1") testSqlApi("CASE 2 WHEN 1 THEN 'a' WHEN 2 THEN 'bcd' END", "bcd") testSqlApi("CASE 1 WHEN 1 THEN 'a' WHEN 2 THEN 'bcd' END", "a") - testSqlApi("CASE 1 WHEN 1 THEN cast('a' as varchar(1)) WHEN 2 THEN " + - "cast('bcd' as varchar(3)) END", "a") + testSqlApi("CASE 1 WHEN 1 THEN CAST ('a' as varchar(1)) WHEN 2 THEN " + + "CAST ('bcd' as varchar(3)) END", "a") testSqlApi("CASE f2 WHEN 1 THEN 11 WHEN 2 THEN 4 ELSE NULL END", "11") testSqlApi("CASE f7 WHEN 1 THEN 11 WHEN 2 THEN 4 ELSE NULL END", "null") testSqlApi("CASE 42 WHEN 1 THEN 'a' WHEN 2 THEN 'bcd' END", "null") testSqlApi("CASE 1 WHEN 1 THEN true WHEN 2 THEN false ELSE NULL END", "true") - testSqlApi("CASE WHEN f2 = 1 THEN CAST('' as INT) ELSE 0 END", "null") - testSqlApi("IF(true, CAST('non-numeric' AS BIGINT), 0)", "null") + testSqlApi("CASE WHEN f2 = 1 THEN CAST ('' as INT) ELSE 0 END", "null") + testSqlApi("IF(true, CAST ('non-numeric' AS BIGINT), 0)", "null") } } diff --git a/flink-table/flink-table-planner-blink/src/test/scala/org/apache/flink/table/planner/expressions/utils/ScalarOperatorsTestBase.scala b/flink-table/flink-table-planner-blink/src/test/scala/org/apache/flink/table/planner/expressions/utils/ScalarOperatorsTestBase.scala index d24f28c..8f9dc7a 100644 --- a/flink-table/flink-table-planner-blink/src/test/scala/org/apache/flink/table/planner/expressions/utils/ScalarOperatorsTestBase.scala +++ b/flink-table/flink-table-planner-blink/src/test/scala/org/apache/flink/table/planner/expressions/utils/ScalarOperatorsTestBase.scala @@ -18,18 +18,17 @@ package org.apache.flink.table.planner.expressions.utils -import org.apache.flink.api.common.typeinfo.Types -import org.apache.flink.api.java.typeutils.RowTypeInfo +import org.apache.flink.table.api.DataTypes import org.apache.flink.table.data.DecimalDataUtils import org.apache.flink.table.functions.ScalarFunction import org.apache.flink.table.planner.utils.DateTimeTestUtil._ -import org.apache.flink.table.runtime.typeutils.DecimalDataTypeInfo +import org.apache.flink.table.types.AbstractDataType import org.apache.flink.types.Row abstract class ScalarOperatorsTestBase extends ExpressionTestBase { override def testData: Row = { - val testData = new Row(18) + val testData = new Row(20) testData.setField(0, 1: Byte) testData.setField(1, 1: Short) testData.setField(2, 1) @@ -46,34 +45,45 @@ abstract class ScalarOperatorsTestBase extends ExpressionTestBase { testData.setField(13, Row.of("foo", null)) testData.setField(14, null) testData.setField(15, localDate("1996-11-10")) - testData.setField(16, DecimalDataUtils.castFrom("0.00000000", 19, 8)) - testData.setField(17, DecimalDataUtils.castFrom("10.0", 19, 1)) + testData.setField(16, + DecimalDataUtils.castFrom("0.00000000", 19, 8).toBigDecimal) + testData.setField(17, + DecimalDataUtils.castFrom("10.0", 19, 1).toBigDecimal) + testData.setField(18, "hello world".getBytes()) + testData.setField(19, "hello flink".getBytes()) testData } - override def typeInfo: RowTypeInfo = { - new RowTypeInfo( - /* 0 */ Types.BYTE, - /* 1 */ Types.SHORT, - /* 2 */ Types.INT, - /* 3 */ Types.LONG, - /* 4 */ Types.FLOAT, - /* 5 */ Types.DOUBLE, - /* 6 */ Types.BOOLEAN, - /* 7 */ Types.DOUBLE, - /* 8 */ Types.INT, - /* 9 */ Types.INT, - /* 10 */ Types.STRING, - /* 11 */ Types.BOOLEAN, - /* 12 */ Types.BOOLEAN, - /* 13 */ Types.ROW(Types.STRING, Types.STRING), - /* 14 */ Types.STRING, - /* 15 */ Types.LOCAL_DATE, - /* 16 */ DecimalDataTypeInfo.of(19, 8), - /* 17 */ DecimalDataTypeInfo.of(19, 1) + override def testDataType: AbstractDataType[_] = { + DataTypes.ROW( + DataTypes.FIELD("f0", DataTypes.TINYINT()), + DataTypes.FIELD("f1", DataTypes.SMALLINT()), + DataTypes.FIELD("f2", DataTypes.INT()), + DataTypes.FIELD("f3", DataTypes.BIGINT()), + DataTypes.FIELD("f4", DataTypes.FLOAT()), + DataTypes.FIELD("f5", DataTypes.DOUBLE()), + DataTypes.FIELD("f6", DataTypes.BOOLEAN()), + DataTypes.FIELD("f7", DataTypes.DOUBLE()), + DataTypes.FIELD("f8", DataTypes.INT()), + DataTypes.FIELD("f9", DataTypes.INT()), + DataTypes.FIELD("f10", DataTypes.STRING()), + DataTypes.FIELD("f11", DataTypes.BOOLEAN()), + DataTypes.FIELD("f12", DataTypes.BOOLEAN()), + DataTypes.FIELD("f13", DataTypes.ROW( + DataTypes.FIELD("f0", DataTypes.STRING()), + DataTypes.FIELD("f1", DataTypes.STRING())) + ), + DataTypes.FIELD("f14", DataTypes.STRING()), + DataTypes.FIELD("f15", DataTypes.DATE()), + DataTypes.FIELD("f16", DataTypes.DECIMAL(19, 8)), + DataTypes.FIELD("f17", DataTypes.DECIMAL(19, 1)), + DataTypes.FIELD("f18", DataTypes.BINARY(200)), + DataTypes.FIELD("f19", DataTypes.VARBINARY(200)) ) } + override def containsLegacyTypes: Boolean = false + override def functions: Map[String, ScalarFunction] = Map( "shouldNotExecuteFunc" -> ShouldNotExecuteFunc )
