This is an automated email from the ASF dual-hosted git repository.
godfrey pushed a commit to branch release-1.14
in repository https://gitbox.apache.org/repos/asf/flink.git
The following commit(s) were added to refs/heads/release-1.14 by this push:
new 5fcc4a1 [FLINK-24492][table-planner] Fix incorrect implicit type
conversion between numeric and (var)char
5fcc4a1 is described below
commit 5fcc4a10eaabceeb131ddeb29379bd1c73b1303a
Author: xuyang <[email protected]>
AuthorDate: Sat Oct 9 18:08:47 2021 +0800
[FLINK-24492][table-planner] Fix incorrect implicit type conversion between
numeric and (var)char
This closes #17444
(cherry picked from commit 25dc6eacd38bd7a70883fc8e54f0bec8dc5861c0)
---
.../planner/codegen/calls/ScalarOperatorGens.scala | 22 +
.../logical/JoinConditionTypeCoerceRule.scala | 4 +-
.../stream/sql/ImplicitTypeConversionITCase.java | 552 +++++++++++++++++++++
.../planner/expressions/SqlExpressionTest.scala | 158 ++++++
.../plan/batch/sql/join/LookupJoinTest.scala | 4 +-
.../plan/stream/sql/join/LookupJoinTest.scala | 15 +-
6 files changed, 747 insertions(+), 8 deletions(-)
diff --git
a/flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/planner/codegen/calls/ScalarOperatorGens.scala
b/flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/planner/codegen/calls/ScalarOperatorGens.scala
index 083a8bd..668b130 100644
---
a/flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/planner/codegen/calls/ScalarOperatorGens.scala
+++
b/flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/planner/codegen/calls/ScalarOperatorGens.scala
@@ -415,11 +415,32 @@ object ScalarOperatorGens {
}
}
+ /**
+ * check the validity of implicit type conversion
+ * See:
https://cwiki.apache.org/confluence/display/FLINK/FLIP-154%3A+SQL+Implicit+Type+Coercion
+ */
+ private def checkImplicitConversionValidity(
+ left: GeneratedExpression,
+ right: GeneratedExpression): Unit = {
+ // TODO: FLIP-154 specifies implicit type conversion between
(var)char and numeric,
+ // but Flink does not support it yet
+ if ((isNumeric(left.resultType) && isCharacterString(right.resultType))
+ || (isNumeric(right.resultType) && isCharacterString(left.resultType))) {
+ throw new CodeGenException(
+ "implicit type conversion between " +
+ s"${left.resultType.getTypeRoot}" +
+ s" and " +
+ s"${right.resultType.getTypeRoot}" +
+ s" is not supported now")
+ }
+ }
+
def generateEquals(
ctx: CodeGeneratorContext,
left: GeneratedExpression,
right: GeneratedExpression)
: GeneratedExpression = {
+ checkImplicitConversionValidity(left,right)
val canEqual = isInteroperable(left.resultType, right.resultType)
if (isCharacterString(left.resultType) &&
isCharacterString(right.resultType)) {
generateOperatorIfNotNull(ctx, new BooleanType(), left, right) {
@@ -519,6 +540,7 @@ object ScalarOperatorGens {
left: GeneratedExpression,
right: GeneratedExpression)
: GeneratedExpression = {
+ checkImplicitConversionValidity(left,right)
if (isCharacterString(left.resultType) &&
isCharacterString(right.resultType)) {
generateOperatorIfNotNull(ctx, new BooleanType(), left, right) {
(leftTerm, rightTerm) => s"!$leftTerm.equals($rightTerm)"
diff --git
a/flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/planner/plan/rules/logical/JoinConditionTypeCoerceRule.scala
b/flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/planner/plan/rules/logical/JoinConditionTypeCoerceRule.scala
index ff00dd2..4c888f8 100644
---
a/flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/planner/plan/rules/logical/JoinConditionTypeCoerceRule.scala
+++
b/flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/planner/plan/rules/logical/JoinConditionTypeCoerceRule.scala
@@ -74,7 +74,9 @@ class JoinConditionTypeCoerceRule extends RelOptRule(
val targetType = typeFactory.leastRestrictive(refList.map(ref =>
ref.getType))
if (targetType == null) {
throw new TableException(
- s"${ref1.getType} and ${ref2.getType} does not have common
type now")
+ s"implicit type conversion between" +
+ s" ${ref1.getType} and ${ref2.getType} " +
+ s"is not supported on join's condition now")
}
newJoinFilters += builder.equals(
rexBuilder.ensureType(targetType, ref1, true),
diff --git
a/flink-table/flink-table-planner/src/test/java/org/apache/flink/table/planner/runtime/stream/sql/ImplicitTypeConversionITCase.java
b/flink-table/flink-table-planner/src/test/java/org/apache/flink/table/planner/runtime/stream/sql/ImplicitTypeConversionITCase.java
new file mode 100644
index 0000000..1739a7b
--- /dev/null
+++
b/flink-table/flink-table-planner/src/test/java/org/apache/flink/table/planner/runtime/stream/sql/ImplicitTypeConversionITCase.java
@@ -0,0 +1,552 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.planner.runtime.stream.sql;
+
+import org.apache.flink.api.common.typeinfo.TypeInformation;
+import org.apache.flink.streaming.api.scala.DataStream;
+import org.apache.flink.table.api.Table;
+import org.apache.flink.table.api.TableException;
+import org.apache.flink.table.api.bridge.scala.DataStreamConversions;
+import org.apache.flink.table.data.DecimalDataUtils;
+import org.apache.flink.table.data.GenericRowData;
+import org.apache.flink.table.data.RowData;
+import org.apache.flink.table.data.StringData;
+import org.apache.flink.table.data.TimestampData;
+import org.apache.flink.table.expressions.UnresolvedReferenceExpression;
+import org.apache.flink.table.planner.codegen.CodeGenException;
+import org.apache.flink.table.planner.runtime.utils.StreamingTestBase;
+import org.apache.flink.table.planner.runtime.utils.TestingAppendRowDataSink;
+import org.apache.flink.table.planner.utils.JavaScalaConversionUtil;
+import org.apache.flink.table.runtime.typeutils.InternalTypeInfo;
+import org.apache.flink.table.types.logical.BigIntType;
+import org.apache.flink.table.types.logical.BooleanType;
+import org.apache.flink.table.types.logical.DateType;
+import org.apache.flink.table.types.logical.DecimalType;
+import org.apache.flink.table.types.logical.DoubleType;
+import org.apache.flink.table.types.logical.FloatType;
+import org.apache.flink.table.types.logical.IntType;
+import org.apache.flink.table.types.logical.SmallIntType;
+import org.apache.flink.table.types.logical.TimeType;
+import org.apache.flink.table.types.logical.TimestampType;
+import org.apache.flink.table.types.logical.TinyIntType;
+import org.apache.flink.table.types.logical.VarCharType;
+import org.apache.flink.table.utils.LegacyRowResource;
+
+import org.junit.Rule;
+import org.junit.Test;
+
+import java.time.LocalDate;
+import java.time.LocalDateTime;
+import java.time.LocalTime;
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.Collections;
+import java.util.List;
+
+import static
org.apache.flink.table.expressions.ApiExpressionUtils.unresolvedRef;
+import static org.junit.Assert.assertEquals;
+
+/** Tests implicit type conversion between different types. */
+public class ImplicitTypeConversionITCase extends StreamingTestBase {
+
+ @Rule public LegacyRowResource usesLegacyRows = LegacyRowResource.INSTANCE;
+
+ private List<String> testSingleTableSqlQueryWithOutputType(
+ String sqlQuery, InternalTypeInfo<RowData> outputType) {
+ GenericRowData rowData = new GenericRowData(14);
+ rowData.setField(0, (byte) 1);
+ rowData.setField(1, (short) 1);
+ rowData.setField(2, 1);
+ rowData.setField(3, (long) 1);
+ rowData.setField(4, DecimalDataUtils.castFrom(1, 1, 0));
+ rowData.setField(5, (float) 1);
+ rowData.setField(6, (double) 1);
+ int date = (int) LocalDate.parse("2001-01-01").toEpochDay();
+ rowData.setField(7, date);
+ int time = (int) (LocalTime.parse("00:00:00").toNanoOfDay() /
1000000L);
+ rowData.setField(8, time);
+ TimestampData timestamp =
+
TimestampData.fromLocalDateTime(LocalDateTime.parse("2001-01-01T00:00:00"));
+ rowData.setField(9, timestamp);
+ rowData.setField(10, StringData.fromString("1"));
+ rowData.setField(11, StringData.fromString("2001-01-01"));
+ rowData.setField(12, StringData.fromString("00:00:00"));
+ rowData.setField(13, StringData.fromString("2001-01-01 00:00:00"));
+ List data = Arrays.asList(rowData);
+
+ TypeInformation<RowData> tpe =
+ InternalTypeInfo.ofFields(
+ new TinyIntType(),
+ new SmallIntType(),
+ new IntType(),
+ new BigIntType(),
+ new DecimalType(1, 0),
+ new FloatType(),
+ new DoubleType(),
+ new DateType(),
+ new TimeType(),
+ new TimestampType(),
+ new VarCharType(),
+ new VarCharType(),
+ new VarCharType(),
+ new VarCharType());
+
+ DataStream ds =
env().fromCollection(JavaScalaConversionUtil.toScala(data), tpe);
+ DataStreamConversions conversions = new DataStreamConversions(ds);
+
+ List<UnresolvedReferenceExpression> fields =
+ Arrays.asList(
+ unresolvedRef("field_tinyint"),
+ unresolvedRef("field_smallint"),
+ unresolvedRef("field_int"),
+ unresolvedRef("field_bigint"),
+ unresolvedRef("field_decimal"),
+ unresolvedRef("field_float"),
+ unresolvedRef("field_double"),
+ unresolvedRef("field_date"),
+ unresolvedRef("field_time"),
+ unresolvedRef("field_timestamp"),
+ unresolvedRef("field_varchar_equals_numeric"),
+ unresolvedRef("field_varchar_equals_date"),
+ unresolvedRef("field_varchar_equals_time"),
+ unresolvedRef("field_varchar_equals_timestamp"));
+
+ Table table = conversions.toTable(tEnv(),
JavaScalaConversionUtil.toScala(fields));
+ tEnv().registerTable("TestTable", table);
+
+ Table resultTable = tEnv().sqlQuery(sqlQuery);
+ DataStream result = tEnv().toAppendStream(resultTable, outputType);
+ TestingAppendRowDataSink sink = new
TestingAppendRowDataSink(outputType);
+ result.addSink(sink);
+ env().execute();
+
+ return new
ArrayList<>(JavaScalaConversionUtil.toJava(sink.getAppendResults()));
+ }
+
+ @Test
+ public void testNumericConversionInFilter() {
+ String sqlQuery =
+ "SELECT field_tinyint, field_smallint, field_int,
field_bigint, "
+ + "field_decimal, field_float, field_double "
+ + "FROM TestTable WHERE "
+ + "field_tinyint = field_smallint AND "
+ + "field_tinyint = field_int AND "
+ + "field_tinyint = field_bigint AND "
+ + "field_tinyint = field_decimal AND "
+ + "field_tinyint = field_float AND "
+ + "field_tinyint = field_double AND "
+ + "field_smallint = field_int AND "
+ + "field_smallint = field_bigint AND "
+ + "field_smallint = field_decimal AND "
+ + "field_smallint = field_float AND "
+ + "field_smallint = field_double AND "
+ + "field_int = field_bigint AND "
+ + "field_int = field_decimal AND "
+ + "field_int = field_float AND "
+ + "field_int = field_double AND "
+ + "field_bigint = field_decimal AND "
+ + "field_bigint = field_float AND "
+ + "field_bigint = field_double AND "
+ + "field_decimal = field_float AND "
+ + "field_decimal = field_double AND "
+ + "field_float = field_double";
+
+ InternalTypeInfo<RowData> outputType =
+ InternalTypeInfo.ofFields(
+ new TinyIntType(),
+ new SmallIntType(),
+ new IntType(),
+ new BigIntType(),
+ new DecimalType(),
+ new FloatType(),
+ new DoubleType());
+
+ List<String> expected = Arrays.asList("+I(1,1,1,1,1,1.0,1.0)");
+
+ List<String> actualResult =
testSingleTableSqlQueryWithOutputType(sqlQuery, outputType);
+ Collections.sort(expected);
+ Collections.sort(actualResult);
+ assertEquals(expected, actualResult);
+ }
+
+ @Test
+ public void testDateAndVarCharConversionInFilter() {
+ String sqlQuery =
+ "SELECT field_date, field_varchar_equals_date FROM TestTable "
+ + "WHERE field_date = field_varchar_equals_date";
+
+ InternalTypeInfo<RowData> outputType =
+ InternalTypeInfo.ofFields(new DateType(), new VarCharType());
+
+ List<String> expected = Arrays.asList("+I(11323,2001-01-01)");
+
+ List<String> actualResult =
testSingleTableSqlQueryWithOutputType(sqlQuery, outputType);
+ Collections.sort(expected);
+ Collections.sort(actualResult);
+ assertEquals(expected, actualResult);
+ }
+
+ @Test
+ public void testTimeAndVarCharConversionInFilter() {
+ String sqlQuery =
+ "SELECT field_time, field_varchar_equals_time FROM TestTable "
+ + "WHERE field_time = field_varchar_equals_time";
+
+ InternalTypeInfo<RowData> outputType =
+ InternalTypeInfo.ofFields(new TimeType(), new VarCharType());
+
+ List<String> expected = Arrays.asList("+I(0,00:00:00)");
+
+ List<String> actualResult =
testSingleTableSqlQueryWithOutputType(sqlQuery, outputType);
+ Collections.sort(expected);
+ Collections.sort(actualResult);
+ assertEquals(expected, actualResult);
+ }
+
+ @Test
+ public void testTimestampAndVarCharConversionInFilter() {
+ String sqlQuery =
+ "SELECT field_timestamp, field_varchar_equals_timestamp FROM
TestTable "
+ + "WHERE field_timestamp =
field_varchar_equals_timestamp";
+
+ InternalTypeInfo<RowData> outputType =
+ InternalTypeInfo.ofFields(new TimestampType(), new
VarCharType());
+
+ List<String> expected = Arrays.asList("+I(2001-01-01T00:00,2001-01-01
00:00:00)");
+
+ List<String> actualResult =
testSingleTableSqlQueryWithOutputType(sqlQuery, outputType);
+ Collections.sort(expected);
+ Collections.sort(actualResult);
+ assertEquals(expected, actualResult);
+ }
+
+ private String getFilterAndProjectionExceptionMessage(List<String> types) {
+ return String.format(
+ "implicit type conversion between " + "%s and %s" + " is not
supported now",
+ types.get(0), types.get(1));
+ }
+
+ private void testSingleTableInvalidImplicitConversionTypes(
+ String sqlQuery, InternalTypeInfo<RowData> outputType,
List<String> types) {
+ expectedException().expect(CodeGenException.class);
+
expectedException().expectMessage(getFilterAndProjectionExceptionMessage(types));
+ testSingleTableSqlQueryWithOutputType(sqlQuery, outputType);
+ }
+
+ @Test
+ public void testInvalidTinyIntAndVarCharConversionInFilter() {
+ String sqlQuery =
+ "SELECT field_tinyint, field_varchar_equals_numeric FROM
TestTable "
+ + "WHERE field_tinyint = field_varchar_equals_numeric";
+
+ InternalTypeInfo<RowData> outputType =
+ InternalTypeInfo.ofFields(new TinyIntType(), new
VarCharType());
+
+ testSingleTableInvalidImplicitConversionTypes(
+ sqlQuery, outputType, Arrays.asList("TINYINT", "VARCHAR"));
+ }
+
+ @Test
+ public void testInvalidSmallIntAndVarCharConversionInFilter() {
+ String sqlQuery =
+ "SELECT field_smallint, field_varchar_equals_numeric FROM
TestTable "
+ + "WHERE field_smallint =
field_varchar_equals_numeric";
+
+ InternalTypeInfo<RowData> outputType =
+ InternalTypeInfo.ofFields(new SmallIntType(), new
VarCharType());
+
+ testSingleTableInvalidImplicitConversionTypes(
+ sqlQuery, outputType, Arrays.asList("SMALLINT", "VARCHAR"));
+ }
+
+ @Test
+ public void testInvalidIntAndVarCharConversionInFilter() {
+ String sqlQuery =
+ "SELECT field_int, field_varchar_equals_numeric FROM TestTable
"
+ + "WHERE field_int = field_varchar_equals_numeric";
+
+ InternalTypeInfo<RowData> outputType =
+ InternalTypeInfo.ofFields(new IntType(), new VarCharType());
+
+ testSingleTableInvalidImplicitConversionTypes(
+ sqlQuery, outputType, Arrays.asList("INTEGER", "VARCHAR"));
+ }
+
+ @Test
+ public void testInvalidBigIntAndVarCharConversionInFilter() {
+ String sqlQuery =
+ "SELECT field_bigint, field_varchar_equals_numeric FROM
TestTable "
+ + "WHERE field_bigint = field_varchar_equals_numeric";
+
+ InternalTypeInfo<RowData> outputType =
+ InternalTypeInfo.ofFields(new BigIntType(), new VarCharType());
+
+ testSingleTableInvalidImplicitConversionTypes(
+ sqlQuery, outputType, Arrays.asList("BIGINT", "VARCHAR"));
+ }
+
+ @Test
+ public void testInvalidDecimalAndVarCharConversionInFilter() {
+ String sqlQuery =
+ "SELECT field_decimal, field_varchar_equals_numeric FROM
TestTable "
+ + "WHERE field_decimal = field_varchar_equals_numeric";
+
+ InternalTypeInfo<RowData> outputType =
+ InternalTypeInfo.ofFields(new DecimalType(), new
VarCharType());
+
+ testSingleTableInvalidImplicitConversionTypes(
+ sqlQuery, outputType, Arrays.asList("DECIMAL", "VARCHAR"));
+ }
+
+ @Test
+ public void testInvalidFloatAndVarCharConversionInFilter() {
+ String sqlQuery =
+ "SELECT field_float, field_varchar_equals_numeric FROM
TestTable "
+ + "WHERE field_float = field_varchar_equals_numeric";
+
+ InternalTypeInfo<RowData> outputType =
+ InternalTypeInfo.ofFields(new FloatType(), new VarCharType());
+
+ testSingleTableInvalidImplicitConversionTypes(
+ sqlQuery, outputType, Arrays.asList("FLOAT", "VARCHAR"));
+ }
+
+ @Test
+ public void testInvalidDoubleAndVarCharConversionInFilter() {
+ String sqlQuery =
+ "SELECT field_double, field_varchar_equals_numeric FROM
TestTable "
+ + "WHERE field_double = field_varchar_equals_numeric";
+
+ InternalTypeInfo<RowData> outputType =
+ InternalTypeInfo.ofFields(new DoubleType(), new VarCharType());
+
+ testSingleTableInvalidImplicitConversionTypes(
+ sqlQuery, outputType, Arrays.asList("DOUBLE", "VARCHAR"));
+ }
+
+ @Test
+ public void testFloatAndDoubleConversionInProjection() {
+ String sqlQuery =
+ "SELECT field_float, field_double, field_float = field_double
FROM TestTable";
+
+ InternalTypeInfo<RowData> outputType =
+ InternalTypeInfo.ofFields(new FloatType(), new DoubleType(),
new BooleanType());
+
+ List<String> expected = Arrays.asList("+I(1.0,1.0,true)");
+
+ List<String> actualResult =
testSingleTableSqlQueryWithOutputType(sqlQuery, outputType);
+ Collections.sort(expected);
+ Collections.sort(actualResult);
+ assertEquals(expected, actualResult);
+ }
+
+ @Test
+ public void testDateAndVarCharConversionInProjection() {
+ String sqlQuery =
+ "SELECT field_date, field_varchar_equals_date, "
+ + "field_date = field_varchar_equals_date FROM
TestTable";
+
+ InternalTypeInfo<RowData> outputType =
+ InternalTypeInfo.ofFields(new DateType(), new VarCharType(),
new BooleanType());
+
+ List<String> expected = Arrays.asList("+I(11323,2001-01-01,true)");
+
+ List<String> actualResult =
testSingleTableSqlQueryWithOutputType(sqlQuery, outputType);
+ Collections.sort(expected);
+ Collections.sort(actualResult);
+ assertEquals(expected, actualResult);
+ }
+
+ @Test
+ public void testInvalidDecimalAndVarCharConversionInProjection() {
+ String sqlQuery =
+ "SELECT field_decimal, field_varchar_equals_numeric, "
+ + "field_decimal = field_varchar_equals_numeric FROM
TestTable";
+ InternalTypeInfo<RowData> outputType =
+ InternalTypeInfo.ofFields(new DecimalType(), new
VarCharType(), new BooleanType());
+
+ testSingleTableInvalidImplicitConversionTypes(
+ sqlQuery, outputType, Arrays.asList("DECIMAL", "VARCHAR"));
+ }
+
+ private void registerTableA() {
+ GenericRowData rowDataA = new GenericRowData(6);
+ rowDataA.setField(0, 1);
+ rowDataA.setField(1, 1);
+ rowDataA.setField(2, 1);
+ int date = (int) LocalDate.parse("2001-01-01").toEpochDay();
+ rowDataA.setField(3, date);
+ int time = (int) (LocalTime.parse("00:00:00").toNanoOfDay() /
1000000L);
+ rowDataA.setField(4, time);
+ TimestampData timestamp =
+
TimestampData.fromLocalDateTime(LocalDateTime.parse("2001-01-01T00:00:00"));
+ rowDataA.setField(5, timestamp);
+ List<RowData> dataA = Arrays.asList(rowDataA);
+
+ TypeInformation<RowData> tpeA =
+ InternalTypeInfo.ofFields(
+ new IntType(),
+ new IntType(),
+ new IntType(),
+ new DateType(),
+ new TimeType(),
+ new TimestampType());
+
+ DataStream dsA =
env().fromCollection(JavaScalaConversionUtil.toScala(dataA), tpeA);
+ DataStreamConversions conversions = new DataStreamConversions(dsA);
+
+ List<UnresolvedReferenceExpression> fields =
+ Arrays.asList(
+ unresolvedRef("a1"),
+ unresolvedRef("a2"),
+ unresolvedRef("a3"),
+ unresolvedRef("a4"),
+ unresolvedRef("a5"),
+ unresolvedRef("a6"));
+
+ Table tableA = conversions.toTable(tEnv(),
JavaScalaConversionUtil.toScala(fields));
+
+ tEnv().registerTable("A", tableA);
+ }
+
+ private void registerTableB() {
+ GenericRowData rowDataB = new GenericRowData(6);
+ rowDataB.setField(0, 1);
+ rowDataB.setField(1, (long) 1);
+ rowDataB.setField(2, StringData.fromString("1"));
+ rowDataB.setField(3, StringData.fromString("2001-01-01"));
+ rowDataB.setField(4, StringData.fromString("00:00:00"));
+ rowDataB.setField(5, StringData.fromString("2001-01-01 00:00:00"));
+ List<RowData> dataB = Arrays.asList(rowDataB);
+
+ TypeInformation<RowData> tpeB =
+ InternalTypeInfo.ofFields(
+ new IntType(),
+ new BigIntType(),
+ new VarCharType(),
+ new VarCharType(),
+ new VarCharType(),
+ new VarCharType());
+
+ DataStream dsB =
env().fromCollection(JavaScalaConversionUtil.toScala(dataB), tpeB);
+ DataStreamConversions conversions = new DataStreamConversions(dsB);
+
+ List<UnresolvedReferenceExpression> fields =
+ Arrays.asList(
+ unresolvedRef("b1"),
+ unresolvedRef("b2"),
+ unresolvedRef("b3"),
+ unresolvedRef("b4"),
+ unresolvedRef("b5"),
+ unresolvedRef("b6"));
+
+ Table tableB = conversions.toTable(tEnv(),
JavaScalaConversionUtil.toScala(fields));
+
+ tEnv().registerTable("B", tableB);
+ }
+
+ private void testTwoTableJoinSqlQuery(String sqlQuery,
InternalTypeInfo<RowData> outputType) {
+ registerTableA();
+ registerTableB();
+
+ Table resultTable = tEnv().sqlQuery(sqlQuery);
+ DataStream result = tEnv().toAppendStream(resultTable, outputType);
+ TestingAppendRowDataSink sink = new
TestingAppendRowDataSink(outputType);
+ result.addSink(sink);
+ env().execute();
+
+ List<String> expected = Arrays.asList("+I(1,1)");
+
+ List<String> actualResult =
+ new
ArrayList<>(JavaScalaConversionUtil.toJava(sink.getAppendResults()));
+
+ Collections.sort(expected);
+ Collections.sort(actualResult);
+ assertEquals(expected, actualResult);
+ }
+
+ private void testTwoTableInvalidImplicitConversionTypes(
+ String sqlQuery, InternalTypeInfo<RowData> outputType,
List<String> types) {
+ expectedException().expect(TableException.class);
+ expectedException().expectMessage(getJoinOnExceptionMessage(types));
+ testTwoTableJoinSqlQuery(sqlQuery, outputType);
+ }
+
+ private String getJoinOnExceptionMessage(List<String> types) {
+ return String.format(
+ "implicit type conversion between "
+ + "%s and %s"
+ + " is not supported on join's condition now",
+ types.get(0), types.get(1));
+ }
+
+ @Test
+ public void testIntAndBigIntConversionInJoinOn() {
+ String sqlQuery = "SELECT a1, b1 from A join B on a2 = b2";
+
+ InternalTypeInfo<RowData> outputType =
+ InternalTypeInfo.ofFields(new IntType(), new IntType());
+
+ testTwoTableJoinSqlQuery(sqlQuery, outputType);
+ }
+
+ @Test
+ public void testInvalidIntAndVarCharConversionInJoinOn() {
+ String sqlQuery = "SELECT a1, b1 from A join B on a3 = b3";
+
+ InternalTypeInfo<RowData> outputType =
+ InternalTypeInfo.ofFields(new IntType(), new IntType());
+
+ testTwoTableInvalidImplicitConversionTypes(
+ sqlQuery, outputType, Arrays.asList("INTEGER", "VARCHAR(1)"));
+ }
+
+ @Test
+ public void testInvalidDateAndVarCharConversionInJoinOn() {
+ String sqlQuery = "SELECT a1, b1 from A join B on a4 = b4";
+
+ InternalTypeInfo<RowData> outputType =
+ InternalTypeInfo.ofFields(new IntType(), new IntType());
+
+ testTwoTableInvalidImplicitConversionTypes(
+ sqlQuery, outputType, Arrays.asList("DATE", "VARCHAR(1)"));
+ }
+
+ @Test
+ public void testInvalidTimeAndVarCharConversionInJoinOn() {
+ String sqlQuery = "SELECT a1, b1 from A join B on a5 = b5";
+
+ InternalTypeInfo<RowData> outputType =
+ InternalTypeInfo.ofFields(new IntType(), new IntType());
+
+ testTwoTableInvalidImplicitConversionTypes(
+ sqlQuery, outputType, Arrays.asList("TIME(0)", "VARCHAR(1)"));
+ }
+
+ @Test
+ public void testInvalidTimestampAndVarCharConversionInJoinOn() {
+ String sqlQuery = "SELECT a1, b1 from A join B on a6 = b6";
+
+ InternalTypeInfo<RowData> outputType =
+ InternalTypeInfo.ofFields(new IntType(), new IntType());
+
+ testTwoTableInvalidImplicitConversionTypes(
+ sqlQuery, outputType, Arrays.asList("TIMESTAMP(6)",
"VARCHAR(1)"));
+ }
+}
diff --git
a/flink-table/flink-table-planner/src/test/scala/org/apache/flink/table/planner/expressions/SqlExpressionTest.scala
b/flink-table/flink-table-planner/src/test/scala/org/apache/flink/table/planner/expressions/SqlExpressionTest.scala
index 9c9cefa..d1cf13b 100644
---
a/flink-table/flink-table-planner/src/test/scala/org/apache/flink/table/planner/expressions/SqlExpressionTest.scala
+++
b/flink-table/flink-table-planner/src/test/scala/org/apache/flink/table/planner/expressions/SqlExpressionTest.scala
@@ -19,6 +19,7 @@
package org.apache.flink.table.planner.expressions
import org.apache.flink.api.java.typeutils.RowTypeInfo
+import org.apache.flink.table.planner.codegen.CodeGenException
import org.apache.flink.table.planner.expressions.utils.ExpressionTestBase
import org.apache.flink.types.Row
@@ -76,6 +77,163 @@ class SqlExpressionTest extends ExpressionTestBase {
}
@Test
+ def testValidImplicitTypeConversion: Unit = {
+ // implicit type conversion between tinyint and others
+ testSqlApi("cast(1 as tinyint) = cast(1 as smallint)","true")
+ testSqlApi("cast(1 as tinyint) = cast(1 as int)","true")
+ testSqlApi("cast(1 as tinyint) = cast(1 as bigint)","true")
+ testSqlApi("cast(1 as tinyint) = cast(1 as decimal)","true")
+ testSqlApi("cast(1 as tinyint) = cast(1 as float)","true")
+ testSqlApi("cast(1 as tinyint) = cast(1 as double)","true")
+ testSqlApi("cast(1 as tinyint) = cast(2 as int)","false")
+ testSqlApi("cast(1 as tinyint) = cast(2 as bigint)","false")
+ testSqlApi("cast(1 as tinyint) = cast(2 as decimal)","false")
+ testSqlApi("cast(1 as tinyint) = cast(2.1 as float)","false")
+ testSqlApi("cast(1 as tinyint) = cast(2.1 as double)","false")
+
+ // implicit type conversion between smallint and others
+ testSqlApi("cast(1 as smallint) = cast(1 as int)","true")
+ testSqlApi("cast(1 as smallint) = cast(1 as bigint)","true")
+ testSqlApi("cast(1 as smallint) = cast(1 as decimal)","true")
+ testSqlApi("cast(1 as smallint) = cast(1 as float)","true")
+ testSqlApi("cast(1 as smallint) = cast(1 as double)","true")
+ testSqlApi("cast(1 as smallint) = cast(2 as int)","false")
+ testSqlApi("cast(1 as smallint) = cast(2 as bigint)","false")
+ testSqlApi("cast(1 as smallint) = cast(2 as decimal)","false")
+ testSqlApi("cast(1 as smallint) = cast(2.1 as float)","false")
+ testSqlApi("cast(1 as smallint) = cast(2.1 as double)","false")
+
+ // implicit type conversion between int and others
+ testSqlApi("cast(1 as int) = cast(1 as bigint)","true")
+ testSqlApi("cast(1 as int) = cast(1 as decimal)","true")
+ testSqlApi("cast(1 as int) = cast(1 as float)","true")
+ testSqlApi("cast(1 as int) = cast(1 as double)","true")
+ testSqlApi("cast(1 as int) = cast(2 as bigint)","false")
+ testSqlApi("cast(1 as int) = cast(2 as decimal)","false")
+ testSqlApi("cast(1 as int) = cast(2.1 as float)","false")
+ testSqlApi("cast(1 as int) = cast(2.1 as double)","false")
+
+ // implicit type conversion between bigint and others
+ testSqlApi("cast(1 as bigint) = cast(1 as decimal)","true")
+ testSqlApi("cast(1 as bigint) = cast(1 as float)","true")
+ testSqlApi("cast(1 as bigint) = cast(1 as double)","true")
+ testSqlApi("cast(1 as bigint) = cast(2 as decimal)","false")
+ testSqlApi("cast(1 as bigint) = cast(2.1 as float)","false")
+ testSqlApi("cast(1 as bigint) = cast(2.1 as double)","false")
+
+ // implicit type conversion between decimal and others
+ testSqlApi("cast(1 as decimal) = cast(1 as float)","true")
+ testSqlApi("cast(1 as decimal) = cast(1 as double)","true")
+ testSqlApi("cast(1 as decimal) = cast(2.1 as float)","false")
+ testSqlApi("cast(1 as decimal) = cast(2.1 as double)","false")
+
+ // implicit type conversion between float and others
+ testSqlApi("cast(1 as float) = cast(1 as double)","true")
+ testSqlApi("cast(1 as float) = cast(2.1 as double)","false")
+
+ // implicit type conversion between date and varchar
+ testSqlApi("cast('2001-01-01' as date) = cast('2001-01-01' as
varchar)","true")
+ testSqlApi("cast('2001-01-01' as date) = cast('2001-01-02' as
varchar)","false")
+
+ // implicit type conversion between date and char
+ testSqlApi("cast('2001-01-01' as date) = cast('2001-01-01' as
char)","true")
+ testSqlApi("cast('2001-01-01' as date) = cast('2001-01-02' as
char)","false")
+
+ // implicit type conversion between time and varchar
+ testSqlApi("cast('00:00:00.000000000' as time) = cast('00:00:00.000000000'
as varchar)","true")
+ testSqlApi("cast('00:00:00.000000000' as time) = cast('00:00:01.000000000'
as varchar)","false")
+
+ // implicit type conversion between timestamp and varchar
+ testSqlApi("cast('2001-01-01 12:12:12.123' as timestamp)" +
+ " = cast('2001-01-01 12:12:12.1230' as varchar)","true")
+ testSqlApi("cast('1990-10-14 12:12:12.123' as timestamp)" +
+ " = cast('1990-10-15 12:12:12.123' as varchar)","false")
+ }
+
+ @Test
+ def testInValidImplicitTypeConversion: Unit = {
+ val typeChar = "CHAR"
+ val typeVarChar = "VARCHAR"
+
+ val typeTinyInt = "TINYINT"
+ val typeSmallInt = "SMALLINT"
+ val typeInt = "INTEGER"
+ val typeBigInt = "BIGINT"
+ val typeDecimal = "DECIMAL"
+ val typeFloat = "FLOAT"
+ val typeDouble = "DOUBLE"
+
+ // implicit type conversion between char and others
+ testInvalidImplicitConversionTypes(
+ "'1' = cast(1 as tinyint)",
+ List(typeChar,typeTinyInt))
+
+ testInvalidImplicitConversionTypes(
+ "'1' = cast(1 as smallint)",
+ List(typeChar,typeSmallInt))
+
+ testInvalidImplicitConversionTypes(
+ "'1' = cast(1 as int)",
+ List(typeChar,typeInt))
+
+ testInvalidImplicitConversionTypes(
+ "'1' = cast(1 as bigint)",
+ List(typeChar,typeBigInt))
+
+ testInvalidImplicitConversionTypes(
+ "'1.1' = cast(1.1 as decimal(2, 1))",
+ List(typeChar,typeDecimal))
+
+ testInvalidImplicitConversionTypes(
+ "'1.1' = cast(1.1 as float)",
+ List(typeChar,typeFloat))
+
+ testInvalidImplicitConversionTypes(
+ "'1.1' = cast(1.1 as double)",
+ List(typeChar,typeDouble))
+
+ // implicit type conversion between varchar and others
+ testInvalidImplicitConversionTypes(
+ "cast('1' as varchar) = cast(1 as tinyint)",
+ List(typeVarChar,typeTinyInt))
+
+ testInvalidImplicitConversionTypes(
+ "cast('1' as varchar) = cast(1 as smallint)",
+ List(typeVarChar,typeSmallInt))
+
+ testInvalidImplicitConversionTypes(
+
+ "cast('1' as varchar) = cast(1 as int)",
+ List(typeVarChar,typeInt))
+
+ testInvalidImplicitConversionTypes(
+ "cast('1' as varchar) = cast(1 as bigint)",
+ List(typeVarChar,typeBigInt))
+
+ testInvalidImplicitConversionTypes(
+ "cast('1.1' as varchar) = cast(1.1 as decimal(2, 1))",
+ List(typeVarChar,typeDecimal))
+
+ testInvalidImplicitConversionTypes(
+ "cast('1.1' as varchar) = cast(1.1 as float)",
+ List(typeVarChar,typeFloat))
+
+ testInvalidImplicitConversionTypes(
+ "cast('1.1' as varchar) = cast(1.1 as double)",
+ List(typeVarChar,typeDouble))
+ }
+
+ private def testInvalidImplicitConversionTypes(sql: String, types:
List[String]): Unit ={
+ val expectedExceptionMessage =
+ "implicit type conversion between " +
+ s"${types(0)}" +
+ s" and " +
+ s"${types(1)}" +
+ s" is not supported now"
+ testExpectedSqlException(sql, expectedExceptionMessage,
classOf[CodeGenException])
+ }
+
+ @Test
def testLogicalFunctions(): Unit = {
testSqlApi("TRUE OR FALSE", "true")
testSqlApi("TRUE AND FALSE", "false")
diff --git
a/flink-table/flink-table-planner/src/test/scala/org/apache/flink/table/planner/plan/batch/sql/join/LookupJoinTest.scala
b/flink-table/flink-table-planner/src/test/scala/org/apache/flink/table/planner/plan/batch/sql/join/LookupJoinTest.scala
index 32cd9e1..3dc3e7d 100644
---
a/flink-table/flink-table-planner/src/test/scala/org/apache/flink/table/planner/plan/batch/sql/join/LookupJoinTest.scala
+++
b/flink-table/flink-table-planner/src/test/scala/org/apache/flink/table/planner/plan/batch/sql/join/LookupJoinTest.scala
@@ -201,7 +201,9 @@ class LookupJoinTest(legacyTableSource: Boolean) extends
TableTestBase {
testUtil.replaceBatchProgram(programs)
thrown.expect(classOf[TableException])
- thrown.expectMessage("VARCHAR(2147483647) and INTEGER does not have common
type now")
+ thrown.expectMessage(
+ "implicit type conversion between VARCHAR(2147483647) and INTEGER " +
+ "is not supported on join's condition now")
testUtil.verifyRelPlan("SELECT * FROM MyTable AS T JOIN LookupTable "
+ "FOR SYSTEM_TIME AS OF T.proctime AS D ON T.b = D.id")
diff --git
a/flink-table/flink-table-planner/src/test/scala/org/apache/flink/table/planner/plan/stream/sql/join/LookupJoinTest.scala
b/flink-table/flink-table-planner/src/test/scala/org/apache/flink/table/planner/plan/stream/sql/join/LookupJoinTest.scala
index 64addab..82d8f31 100644
---
a/flink-table/flink-table-planner/src/test/scala/org/apache/flink/table/planner/plan/stream/sql/join/LookupJoinTest.scala
+++
b/flink-table/flink-table-planner/src/test/scala/org/apache/flink/table/planner/plan/stream/sql/join/LookupJoinTest.scala
@@ -18,11 +18,6 @@
package org.apache.flink.table.planner.plan.stream.sql.join
-import _root_.java.lang.{Boolean => JBoolean}
-import _root_.java.sql.Timestamp
-import _root_.java.util
-import _root_.java.util.{ArrayList => JArrayList, Collection => JCollection,
HashMap => JHashMap, List => JList, Map => JMap}
-
import org.apache.flink.api.common.typeinfo.TypeInformation
import org.apache.flink.api.scala._
import org.apache.flink.core.testutils.FlinkMatchers.containsMessage
@@ -40,11 +35,17 @@ import org.apache.flink.table.planner.utils.TableTestBase
import org.apache.flink.table.sources._
import org.apache.flink.table.types.DataType
import org.apache.flink.table.utils.EncodingUtils
+
import org.junit.Assert.{assertThat, assertTrue, fail}
import org.junit.runner.RunWith
import org.junit.runners.Parameterized
import org.junit.{Assume, Before, Test}
+import _root_.java.lang.{Boolean => JBoolean}
+import _root_.java.sql.Timestamp
+import _root_.java.util
+import _root_.java.util.{ArrayList => JArrayList, Collection => JCollection,
HashMap => JHashMap, List => JList, Map => JMap}
+
import _root_.scala.collection.JavaConversions._
/**
@@ -257,7 +258,9 @@ class LookupJoinTest(legacyTableSource: Boolean) extends
TableTestBase with Seri
def testJoinOnDifferentKeyTypes(): Unit = {
// Will do implicit type coercion.
thrown.expect(classOf[TableException])
- thrown.expectMessage("VARCHAR(2147483647) and INTEGER does not have common
type now")
+ thrown.expectMessage(
+ "implicit type conversion between VARCHAR(2147483647) and INTEGER " +
+ "is not supported on join's condition now")
util.verifyExecPlan("SELECT * FROM MyTable AS T JOIN LookupTable "
+ "FOR SYSTEM_TIME AS OF T.proctime AS D ON T.b = D.id")
}