This is an automated email from the ASF dual-hosted git repository.

godfrey pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/flink.git


The following commit(s) were added to refs/heads/master by this push:
     new 25dc6ea  [FLINK-24492][table-planner] Fix incorrect implicit type 
conversion between numeric and (var)char
25dc6ea is described below

commit 25dc6eacd38bd7a70883fc8e54f0bec8dc5861c0
Author: xuyang <[email protected]>
AuthorDate: Sat Oct 9 18:08:47 2021 +0800

    [FLINK-24492][table-planner] Fix incorrect implicit type conversion between 
numeric and (var)char
    
    This closes #17444
---
 .../planner/codegen/calls/ScalarOperatorGens.scala |  22 +
 .../logical/JoinConditionTypeCoerceRule.scala      |   4 +-
 .../stream/sql/ImplicitTypeConversionITCase.java   | 552 +++++++++++++++++++++
 .../planner/expressions/SqlExpressionTest.scala    | 158 ++++++
 .../plan/batch/sql/join/LookupJoinTest.scala       |   4 +-
 .../plan/stream/sql/join/LookupJoinTest.scala      |  15 +-
 6 files changed, 747 insertions(+), 8 deletions(-)

diff --git 
a/flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/planner/codegen/calls/ScalarOperatorGens.scala
 
b/flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/planner/codegen/calls/ScalarOperatorGens.scala
index 6e7d6a5..60ebc1e 100644
--- 
a/flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/planner/codegen/calls/ScalarOperatorGens.scala
+++ 
b/flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/planner/codegen/calls/ScalarOperatorGens.scala
@@ -413,11 +413,32 @@ object ScalarOperatorGens {
     }
   }
 
+  /**
+   * check the validity of implicit type conversion
+   * See: 
https://cwiki.apache.org/confluence/display/FLINK/FLIP-154%3A+SQL+Implicit+Type+Coercion
+   */
+  private def checkImplicitConversionValidity(
+      left: GeneratedExpression,
+      right: GeneratedExpression): Unit = {
+    // TODO: in flip-154, we should support implicit type conversion between 
(var)char and numeric,
+    // but flink has not yet supported now
+    if ((isNumeric(left.resultType) && isCharacterString(right.resultType))
+      || (isNumeric(right.resultType) && isCharacterString(left.resultType))) {
+      throw new CodeGenException(
+        "implicit type conversion between " +
+          s"${left.resultType.getTypeRoot}" +
+          s" and " +
+          s"${right.resultType.getTypeRoot}" +
+          s" is not supported now")
+    }
+  }
+
   def generateEquals(
       ctx: CodeGeneratorContext,
       left: GeneratedExpression,
       right: GeneratedExpression)
     : GeneratedExpression = {
+    checkImplicitConversionValidity(left,right)
     val canEqual = isInteroperable(left.resultType, right.resultType)
     if (isCharacterString(left.resultType) && 
isCharacterString(right.resultType)) {
       generateOperatorIfNotNull(ctx, new BooleanType(), left, right) {
@@ -517,6 +538,7 @@ object ScalarOperatorGens {
       left: GeneratedExpression,
       right: GeneratedExpression)
     : GeneratedExpression = {
+    checkImplicitConversionValidity(left,right)
     if (isCharacterString(left.resultType) && 
isCharacterString(right.resultType)) {
       generateOperatorIfNotNull(ctx, new BooleanType(), left, right) {
         (leftTerm, rightTerm) => s"!$leftTerm.equals($rightTerm)"
diff --git 
a/flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/planner/plan/rules/logical/JoinConditionTypeCoerceRule.scala
 
b/flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/planner/plan/rules/logical/JoinConditionTypeCoerceRule.scala
index ff00dd2..4c888f8 100644
--- 
a/flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/planner/plan/rules/logical/JoinConditionTypeCoerceRule.scala
+++ 
b/flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/planner/plan/rules/logical/JoinConditionTypeCoerceRule.scala
@@ -74,7 +74,9 @@ class JoinConditionTypeCoerceRule extends RelOptRule(
             val targetType = typeFactory.leastRestrictive(refList.map(ref => 
ref.getType))
             if (targetType == null) {
               throw new TableException(
-                s"${ref1.getType} and ${ref2.getType} does not have common 
type now")
+                s"implicit type conversion between" +
+                s" ${ref1.getType} and ${ref2.getType} " +
+                s"is not supported on join's condition now")
             }
             newJoinFilters += builder.equals(
               rexBuilder.ensureType(targetType, ref1, true),
diff --git 
a/flink-table/flink-table-planner/src/test/java/org/apache/flink/table/planner/runtime/stream/sql/ImplicitTypeConversionITCase.java
 
b/flink-table/flink-table-planner/src/test/java/org/apache/flink/table/planner/runtime/stream/sql/ImplicitTypeConversionITCase.java
new file mode 100644
index 0000000..1739a7b
--- /dev/null
+++ 
b/flink-table/flink-table-planner/src/test/java/org/apache/flink/table/planner/runtime/stream/sql/ImplicitTypeConversionITCase.java
@@ -0,0 +1,552 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.planner.runtime.stream.sql;
+
+import org.apache.flink.api.common.typeinfo.TypeInformation;
+import org.apache.flink.streaming.api.scala.DataStream;
+import org.apache.flink.table.api.Table;
+import org.apache.flink.table.api.TableException;
+import org.apache.flink.table.api.bridge.scala.DataStreamConversions;
+import org.apache.flink.table.data.DecimalDataUtils;
+import org.apache.flink.table.data.GenericRowData;
+import org.apache.flink.table.data.RowData;
+import org.apache.flink.table.data.StringData;
+import org.apache.flink.table.data.TimestampData;
+import org.apache.flink.table.expressions.UnresolvedReferenceExpression;
+import org.apache.flink.table.planner.codegen.CodeGenException;
+import org.apache.flink.table.planner.runtime.utils.StreamingTestBase;
+import org.apache.flink.table.planner.runtime.utils.TestingAppendRowDataSink;
+import org.apache.flink.table.planner.utils.JavaScalaConversionUtil;
+import org.apache.flink.table.runtime.typeutils.InternalTypeInfo;
+import org.apache.flink.table.types.logical.BigIntType;
+import org.apache.flink.table.types.logical.BooleanType;
+import org.apache.flink.table.types.logical.DateType;
+import org.apache.flink.table.types.logical.DecimalType;
+import org.apache.flink.table.types.logical.DoubleType;
+import org.apache.flink.table.types.logical.FloatType;
+import org.apache.flink.table.types.logical.IntType;
+import org.apache.flink.table.types.logical.SmallIntType;
+import org.apache.flink.table.types.logical.TimeType;
+import org.apache.flink.table.types.logical.TimestampType;
+import org.apache.flink.table.types.logical.TinyIntType;
+import org.apache.flink.table.types.logical.VarCharType;
+import org.apache.flink.table.utils.LegacyRowResource;
+
+import org.junit.Rule;
+import org.junit.Test;
+
+import java.time.LocalDate;
+import java.time.LocalDateTime;
+import java.time.LocalTime;
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.Collections;
+import java.util.List;
+
+import static 
org.apache.flink.table.expressions.ApiExpressionUtils.unresolvedRef;
+import static org.junit.Assert.assertEquals;
+
+/** test implicit type conversion between different types. */
+public class ImplicitTypeConversionITCase extends StreamingTestBase {
+
+    @Rule public LegacyRowResource usesLegacyRows = LegacyRowResource.INSTANCE;
+
+    private List<String> testSingleTableSqlQueryWithOutputType(
+            String sqlQuery, InternalTypeInfo<RowData> outputType) {
+        GenericRowData rowData = new GenericRowData(14);
+        rowData.setField(0, (byte) 1);
+        rowData.setField(1, (short) 1);
+        rowData.setField(2, 1);
+        rowData.setField(3, (long) 1);
+        rowData.setField(4, DecimalDataUtils.castFrom(1, 1, 0));
+        rowData.setField(5, (float) 1);
+        rowData.setField(6, (double) 1);
+        int date = (int) LocalDate.parse("2001-01-01").toEpochDay();
+        rowData.setField(7, date);
+        int time = (int) (LocalTime.parse("00:00:00").toNanoOfDay() / 
1000000L);
+        rowData.setField(8, time);
+        TimestampData timestamp =
+                
TimestampData.fromLocalDateTime(LocalDateTime.parse("2001-01-01T00:00:00"));
+        rowData.setField(9, timestamp);
+        rowData.setField(10, StringData.fromString("1"));
+        rowData.setField(11, StringData.fromString("2001-01-01"));
+        rowData.setField(12, StringData.fromString("00:00:00"));
+        rowData.setField(13, StringData.fromString("2001-01-01 00:00:00"));
+        List data = Arrays.asList(rowData);
+
+        TypeInformation<RowData> tpe =
+                InternalTypeInfo.ofFields(
+                        new TinyIntType(),
+                        new SmallIntType(),
+                        new IntType(),
+                        new BigIntType(),
+                        new DecimalType(1, 0),
+                        new FloatType(),
+                        new DoubleType(),
+                        new DateType(),
+                        new TimeType(),
+                        new TimestampType(),
+                        new VarCharType(),
+                        new VarCharType(),
+                        new VarCharType(),
+                        new VarCharType());
+
+        DataStream ds = 
env().fromCollection(JavaScalaConversionUtil.toScala(data), tpe);
+        DataStreamConversions conversions = new DataStreamConversions(ds);
+
+        List<UnresolvedReferenceExpression> fields =
+                Arrays.asList(
+                        unresolvedRef("field_tinyint"),
+                        unresolvedRef("field_smallint"),
+                        unresolvedRef("field_int"),
+                        unresolvedRef("field_bigint"),
+                        unresolvedRef("field_decimal"),
+                        unresolvedRef("field_float"),
+                        unresolvedRef("field_double"),
+                        unresolvedRef("field_date"),
+                        unresolvedRef("field_time"),
+                        unresolvedRef("field_timestamp"),
+                        unresolvedRef("field_varchar_equals_numeric"),
+                        unresolvedRef("field_varchar_equals_date"),
+                        unresolvedRef("field_varchar_equals_time"),
+                        unresolvedRef("field_varchar_equals_timestamp"));
+
+        Table table = conversions.toTable(tEnv(), 
JavaScalaConversionUtil.toScala(fields));
+        tEnv().registerTable("TestTable", table);
+
+        Table resultTable = tEnv().sqlQuery(sqlQuery);
+        DataStream result = tEnv().toAppendStream(resultTable, outputType);
+        TestingAppendRowDataSink sink = new 
TestingAppendRowDataSink(outputType);
+        result.addSink(sink);
+        env().execute();
+
+        return new 
ArrayList<>(JavaScalaConversionUtil.toJava(sink.getAppendResults()));
+    }
+
+    @Test
+    public void testNumericConversionInFilter() {
+        String sqlQuery =
+                "SELECT field_tinyint, field_smallint, field_int, 
field_bigint, "
+                        + "field_decimal, field_float, field_double "
+                        + "FROM TestTable WHERE "
+                        + "field_tinyint = field_smallint AND "
+                        + "field_tinyint = field_int AND "
+                        + "field_tinyint = field_bigint AND "
+                        + "field_tinyint = field_decimal AND "
+                        + "field_tinyint = field_float AND "
+                        + "field_tinyint = field_double AND "
+                        + "field_smallint = field_int AND "
+                        + "field_smallint = field_bigint AND "
+                        + "field_smallint = field_decimal AND "
+                        + "field_smallint = field_float AND "
+                        + "field_smallint = field_double AND "
+                        + "field_int = field_bigint AND "
+                        + "field_int = field_decimal AND "
+                        + "field_int = field_float AND "
+                        + "field_int = field_double AND "
+                        + "field_bigint = field_decimal AND "
+                        + "field_bigint = field_float AND "
+                        + "field_bigint = field_double AND "
+                        + "field_decimal = field_float AND "
+                        + "field_decimal = field_double AND "
+                        + "field_float = field_double";
+
+        InternalTypeInfo<RowData> outputType =
+                InternalTypeInfo.ofFields(
+                        new TinyIntType(),
+                        new SmallIntType(),
+                        new IntType(),
+                        new BigIntType(),
+                        new DecimalType(),
+                        new FloatType(),
+                        new DoubleType());
+
+        List<String> expected = Arrays.asList("+I(1,1,1,1,1,1.0,1.0)");
+
+        List<String> actualResult = 
testSingleTableSqlQueryWithOutputType(sqlQuery, outputType);
+        Collections.sort(expected);
+        Collections.sort(actualResult);
+        assertEquals(expected, actualResult);
+    }
+
+    @Test
+    public void testDateAndVarCharConversionInFilter() {
+        String sqlQuery =
+                "SELECT field_date, field_varchar_equals_date FROM TestTable "
+                        + "WHERE field_date = field_varchar_equals_date";
+
+        InternalTypeInfo<RowData> outputType =
+                InternalTypeInfo.ofFields(new DateType(), new VarCharType());
+
+        List<String> expected = Arrays.asList("+I(11323,2001-01-01)");
+
+        List<String> actualResult = 
testSingleTableSqlQueryWithOutputType(sqlQuery, outputType);
+        Collections.sort(expected);
+        Collections.sort(actualResult);
+        assertEquals(expected, actualResult);
+    }
+
+    @Test
+    public void testTimeAndVarCharConversionInFilter() {
+        String sqlQuery =
+                "SELECT field_time, field_varchar_equals_time FROM TestTable "
+                        + "WHERE field_time = field_varchar_equals_time";
+
+        InternalTypeInfo<RowData> outputType =
+                InternalTypeInfo.ofFields(new TimeType(), new VarCharType());
+
+        List<String> expected = Arrays.asList("+I(0,00:00:00)");
+
+        List<String> actualResult = 
testSingleTableSqlQueryWithOutputType(sqlQuery, outputType);
+        Collections.sort(expected);
+        Collections.sort(actualResult);
+        assertEquals(expected, actualResult);
+    }
+
+    @Test
+    public void testTimestampAndVarCharConversionInFilter() {
+        String sqlQuery =
+                "SELECT field_timestamp, field_varchar_equals_timestamp FROM 
TestTable "
+                        + "WHERE field_timestamp = 
field_varchar_equals_timestamp";
+
+        InternalTypeInfo<RowData> outputType =
+                InternalTypeInfo.ofFields(new TimestampType(), new 
VarCharType());
+
+        List<String> expected = Arrays.asList("+I(2001-01-01T00:00,2001-01-01 
00:00:00)");
+
+        List<String> actualResult = 
testSingleTableSqlQueryWithOutputType(sqlQuery, outputType);
+        Collections.sort(expected);
+        Collections.sort(actualResult);
+        assertEquals(expected, actualResult);
+    }
+
+    private String getFilterAndProjectionExceptionMessage(List<String> types) {
+        return String.format(
+                "implicit type conversion between " + "%s and %s" + " is not 
supported now",
+                types.get(0), types.get(1));
+    }
+
+    private void testSingleTableInvalidImplicitConversionTypes(
+            String sqlQuery, InternalTypeInfo<RowData> outputType, 
List<String> types) {
+        expectedException().expect(CodeGenException.class);
+        
expectedException().expectMessage(getFilterAndProjectionExceptionMessage(types));
+        testSingleTableSqlQueryWithOutputType(sqlQuery, outputType);
+    }
+
+    @Test
+    public void testInvalidTinyIntAndVarCharConversionInFilter() {
+        String sqlQuery =
+                "SELECT field_tinyint, field_varchar_equals_numeric FROM 
TestTable "
+                        + "WHERE field_tinyint = field_varchar_equals_numeric";
+
+        InternalTypeInfo<RowData> outputType =
+                InternalTypeInfo.ofFields(new TinyIntType(), new 
VarCharType());
+
+        testSingleTableInvalidImplicitConversionTypes(
+                sqlQuery, outputType, Arrays.asList("TINYINT", "VARCHAR"));
+    }
+
+    @Test
+    public void testInvalidSmallIntAndVarCharConversionInFilter() {
+        String sqlQuery =
+                "SELECT field_smallint, field_varchar_equals_numeric FROM 
TestTable "
+                        + "WHERE field_smallint = 
field_varchar_equals_numeric";
+
+        InternalTypeInfo<RowData> outputType =
+                InternalTypeInfo.ofFields(new SmallIntType(), new 
VarCharType());
+
+        testSingleTableInvalidImplicitConversionTypes(
+                sqlQuery, outputType, Arrays.asList("SMALLINT", "VARCHAR"));
+    }
+
+    @Test
+    public void testInvalidIntAndVarCharConversionInFilter() {
+        String sqlQuery =
+                "SELECT field_int, field_varchar_equals_numeric FROM TestTable 
"
+                        + "WHERE field_int = field_varchar_equals_numeric";
+
+        InternalTypeInfo<RowData> outputType =
+                InternalTypeInfo.ofFields(new IntType(), new VarCharType());
+
+        testSingleTableInvalidImplicitConversionTypes(
+                sqlQuery, outputType, Arrays.asList("INTEGER", "VARCHAR"));
+    }
+
+    @Test
+    public void testInvalidBigIntAndVarCharConversionInFilter() {
+        String sqlQuery =
+                "SELECT field_bigint, field_varchar_equals_numeric FROM 
TestTable "
+                        + "WHERE field_bigint = field_varchar_equals_numeric";
+
+        InternalTypeInfo<RowData> outputType =
+                InternalTypeInfo.ofFields(new BigIntType(), new VarCharType());
+
+        testSingleTableInvalidImplicitConversionTypes(
+                sqlQuery, outputType, Arrays.asList("BIGINT", "VARCHAR"));
+    }
+
+    @Test
+    public void testInvalidDecimalAndVarCharConversionInFilter() {
+        String sqlQuery =
+                "SELECT field_decimal, field_varchar_equals_numeric FROM 
TestTable "
+                        + "WHERE field_decimal = field_varchar_equals_numeric";
+
+        InternalTypeInfo<RowData> outputType =
+                InternalTypeInfo.ofFields(new DecimalType(), new 
VarCharType());
+
+        testSingleTableInvalidImplicitConversionTypes(
+                sqlQuery, outputType, Arrays.asList("DECIMAL", "VARCHAR"));
+    }
+
+    @Test
+    public void testInvalidFloatAndVarCharConversionInFilter() {
+        String sqlQuery =
+                "SELECT field_float, field_varchar_equals_numeric FROM 
TestTable "
+                        + "WHERE field_float = field_varchar_equals_numeric";
+
+        InternalTypeInfo<RowData> outputType =
+                InternalTypeInfo.ofFields(new FloatType(), new VarCharType());
+
+        testSingleTableInvalidImplicitConversionTypes(
+                sqlQuery, outputType, Arrays.asList("FLOAT", "VARCHAR"));
+    }
+
+    @Test
+    public void testInvalidDoubleAndVarCharConversionInFilter() {
+        String sqlQuery =
+                "SELECT field_double, field_varchar_equals_numeric FROM 
TestTable "
+                        + "WHERE field_double = field_varchar_equals_numeric";
+
+        InternalTypeInfo<RowData> outputType =
+                InternalTypeInfo.ofFields(new DoubleType(), new VarCharType());
+
+        testSingleTableInvalidImplicitConversionTypes(
+                sqlQuery, outputType, Arrays.asList("DOUBLE", "VARCHAR"));
+    }
+
+    @Test
+    public void testFloatAndDoubleConversionInProjection() {
+        String sqlQuery =
+                "SELECT field_float, field_double, field_float = field_double 
FROM TestTable";
+
+        InternalTypeInfo<RowData> outputType =
+                InternalTypeInfo.ofFields(new FloatType(), new DoubleType(), 
new BooleanType());
+
+        List<String> expected = Arrays.asList("+I(1.0,1.0,true)");
+
+        List<String> actualResult = 
testSingleTableSqlQueryWithOutputType(sqlQuery, outputType);
+        Collections.sort(expected);
+        Collections.sort(actualResult);
+        assertEquals(expected, actualResult);
+    }
+
+    @Test
+    public void testDateAndVarCharConversionInProjection() {
+        String sqlQuery =
+                "SELECT field_date, field_varchar_equals_date, "
+                        + "field_date = field_varchar_equals_date FROM 
TestTable";
+
+        InternalTypeInfo<RowData> outputType =
+                InternalTypeInfo.ofFields(new DateType(), new VarCharType(), 
new BooleanType());
+
+        List<String> expected = Arrays.asList("+I(11323,2001-01-01,true)");
+
+        List<String> actualResult = 
testSingleTableSqlQueryWithOutputType(sqlQuery, outputType);
+        Collections.sort(expected);
+        Collections.sort(actualResult);
+        assertEquals(expected, actualResult);
+    }
+
+    @Test
+    public void testInvalidDecimalAndVarCharConversionInProjection() {
+        String sqlQuery =
+                "SELECT field_decimal, field_varchar_equals_numeric, "
+                        + "field_decimal = field_varchar_equals_numeric FROM 
TestTable";
+        InternalTypeInfo<RowData> outputType =
+                InternalTypeInfo.ofFields(new DecimalType(), new 
VarCharType(), new BooleanType());
+
+        testSingleTableInvalidImplicitConversionTypes(
+                sqlQuery, outputType, Arrays.asList("DECIMAL", "VARCHAR"));
+    }
+
+    private void registerTableA() {
+        GenericRowData rowDataA = new GenericRowData(6);
+        rowDataA.setField(0, 1);
+        rowDataA.setField(1, 1);
+        rowDataA.setField(2, 1);
+        int date = (int) LocalDate.parse("2001-01-01").toEpochDay();
+        rowDataA.setField(3, date);
+        int time = (int) (LocalTime.parse("00:00:00").toNanoOfDay() / 
1000000L);
+        rowDataA.setField(4, time);
+        TimestampData timestamp =
+                
TimestampData.fromLocalDateTime(LocalDateTime.parse("2001-01-01T00:00:00"));
+        rowDataA.setField(5, timestamp);
+        List<RowData> dataA = Arrays.asList(rowDataA);
+
+        TypeInformation<RowData> tpeA =
+                InternalTypeInfo.ofFields(
+                        new IntType(),
+                        new IntType(),
+                        new IntType(),
+                        new DateType(),
+                        new TimeType(),
+                        new TimestampType());
+
+        DataStream dsA = 
env().fromCollection(JavaScalaConversionUtil.toScala(dataA), tpeA);
+        DataStreamConversions conversions = new DataStreamConversions(dsA);
+
+        List<UnresolvedReferenceExpression> fields =
+                Arrays.asList(
+                        unresolvedRef("a1"),
+                        unresolvedRef("a2"),
+                        unresolvedRef("a3"),
+                        unresolvedRef("a4"),
+                        unresolvedRef("a5"),
+                        unresolvedRef("a6"));
+
+        Table tableA = conversions.toTable(tEnv(), 
JavaScalaConversionUtil.toScala(fields));
+
+        tEnv().registerTable("A", tableA);
+    }
+
+    private void registerTableB() {
+        GenericRowData rowDataB = new GenericRowData(6);
+        rowDataB.setField(0, 1);
+        rowDataB.setField(1, (long) 1);
+        rowDataB.setField(2, StringData.fromString("1"));
+        rowDataB.setField(3, StringData.fromString("2001-01-01"));
+        rowDataB.setField(4, StringData.fromString("00:00:00"));
+        rowDataB.setField(5, StringData.fromString("2001-01-01 00:00:00"));
+        List<RowData> dataB = Arrays.asList(rowDataB);
+
+        TypeInformation<RowData> tpeB =
+                InternalTypeInfo.ofFields(
+                        new IntType(),
+                        new BigIntType(),
+                        new VarCharType(),
+                        new VarCharType(),
+                        new VarCharType(),
+                        new VarCharType());
+
+        DataStream dsB = 
env().fromCollection(JavaScalaConversionUtil.toScala(dataB), tpeB);
+        DataStreamConversions conversions = new DataStreamConversions(dsB);
+
+        List<UnresolvedReferenceExpression> fields =
+                Arrays.asList(
+                        unresolvedRef("b1"),
+                        unresolvedRef("b2"),
+                        unresolvedRef("b3"),
+                        unresolvedRef("b4"),
+                        unresolvedRef("b5"),
+                        unresolvedRef("b6"));
+
+        Table tableB = conversions.toTable(tEnv(), 
JavaScalaConversionUtil.toScala(fields));
+
+        tEnv().registerTable("B", tableB);
+    }
+
+    private void testTwoTableJoinSqlQuery(String sqlQuery, 
InternalTypeInfo<RowData> outputType) {
+        registerTableA();
+        registerTableB();
+
+        Table resultTable = tEnv().sqlQuery(sqlQuery);
+        DataStream result = tEnv().toAppendStream(resultTable, outputType);
+        TestingAppendRowDataSink sink = new 
TestingAppendRowDataSink(outputType);
+        result.addSink(sink);
+        env().execute();
+
+        List<String> expected = Arrays.asList("+I(1,1)");
+
+        List<String> actualResult =
+                new 
ArrayList<>(JavaScalaConversionUtil.toJava(sink.getAppendResults()));
+
+        Collections.sort(expected);
+        Collections.sort(actualResult);
+        assertEquals(expected, actualResult);
+    }
+
+    private void testTwoTableInvalidImplicitConversionTypes(
+            String sqlQuery, InternalTypeInfo<RowData> outputType, 
List<String> types) {
+        expectedException().expect(TableException.class);
+        expectedException().expectMessage(getJoinOnExceptionMessage(types));
+        testTwoTableJoinSqlQuery(sqlQuery, outputType);
+    }
+
+    private String getJoinOnExceptionMessage(List<String> types) {
+        return String.format(
+                "implicit type conversion between "
+                        + "%s and %s"
+                        + " is not supported on join's condition now",
+                types.get(0), types.get(1));
+    }
+
+    @Test
+    public void testIntAndBigIntConversionInJoinOn() {
+        String sqlQuery = "SELECT a1, b1 from A join B on a2 = b2";
+
+        InternalTypeInfo<RowData> outputType =
+                InternalTypeInfo.ofFields(new IntType(), new IntType());
+
+        testTwoTableJoinSqlQuery(sqlQuery, outputType);
+    }
+
+    @Test
+    public void testInvalidIntAndVarCharConversionInJoinOn() {
+        String sqlQuery = "SELECT a1, b1 from A join B on a3 = b3";
+
+        InternalTypeInfo<RowData> outputType =
+                InternalTypeInfo.ofFields(new IntType(), new IntType());
+
+        testTwoTableInvalidImplicitConversionTypes(
+                sqlQuery, outputType, Arrays.asList("INTEGER", "VARCHAR(1)"));
+    }
+
+    @Test
+    public void testInvalidDateAndVarCharConversionInJoinOn() {
+        String sqlQuery = "SELECT a1, b1 from A join B on a4 = b4";
+
+        InternalTypeInfo<RowData> outputType =
+                InternalTypeInfo.ofFields(new IntType(), new IntType());
+
+        testTwoTableInvalidImplicitConversionTypes(
+                sqlQuery, outputType, Arrays.asList("DATE", "VARCHAR(1)"));
+    }
+
+    @Test
+    public void testInvalidTimeAndVarCharConversionInJoinOn() {
+        String sqlQuery = "SELECT a1, b1 from A join B on a5 = b5";
+
+        InternalTypeInfo<RowData> outputType =
+                InternalTypeInfo.ofFields(new IntType(), new IntType());
+
+        testTwoTableInvalidImplicitConversionTypes(
+                sqlQuery, outputType, Arrays.asList("TIME(0)", "VARCHAR(1)"));
+    }
+
+    @Test
+    public void testInvalidTimestampAndVarCharConversionInJoinOn() {
+        String sqlQuery = "SELECT a1, b1 from A join B on a6 = b6";
+
+        InternalTypeInfo<RowData> outputType =
+                InternalTypeInfo.ofFields(new IntType(), new IntType());
+
+        testTwoTableInvalidImplicitConversionTypes(
+                sqlQuery, outputType, Arrays.asList("TIMESTAMP(6)", 
"VARCHAR(1)"));
+    }
+}
diff --git 
a/flink-table/flink-table-planner/src/test/scala/org/apache/flink/table/planner/expressions/SqlExpressionTest.scala
 
b/flink-table/flink-table-planner/src/test/scala/org/apache/flink/table/planner/expressions/SqlExpressionTest.scala
index 9c9cefa..d1cf13b 100644
--- 
a/flink-table/flink-table-planner/src/test/scala/org/apache/flink/table/planner/expressions/SqlExpressionTest.scala
+++ 
b/flink-table/flink-table-planner/src/test/scala/org/apache/flink/table/planner/expressions/SqlExpressionTest.scala
@@ -19,6 +19,7 @@
 package org.apache.flink.table.planner.expressions
 
 import org.apache.flink.api.java.typeutils.RowTypeInfo
+import org.apache.flink.table.planner.codegen.CodeGenException
 import org.apache.flink.table.planner.expressions.utils.ExpressionTestBase
 import org.apache.flink.types.Row
 
@@ -76,6 +77,163 @@ class SqlExpressionTest extends ExpressionTestBase {
   }
 
   @Test
+  def testValidImplicitTypeConversion: Unit = {
+    // implicit type conversion between tinyint and others
+    testSqlApi("cast(1 as tinyint) = cast(1 as smallint)","true")
+    testSqlApi("cast(1 as tinyint) = cast(1 as int)","true")
+    testSqlApi("cast(1 as tinyint) = cast(1 as bigint)","true")
+    testSqlApi("cast(1 as tinyint) = cast(1 as decimal)","true")
+    testSqlApi("cast(1 as tinyint) = cast(1 as float)","true")
+    testSqlApi("cast(1 as tinyint) = cast(1 as double)","true")
+    testSqlApi("cast(1 as tinyint) = cast(2 as int)","false")
+    testSqlApi("cast(1 as tinyint) = cast(2 as bigint)","false")
+    testSqlApi("cast(1 as tinyint) = cast(2 as decimal)","false")
+    testSqlApi("cast(1 as tinyint) = cast(2.1 as float)","false")
+    testSqlApi("cast(1 as tinyint) = cast(2.1 as double)","false")
+
+    // implicit type conversion between smallint and others
+    testSqlApi("cast(1 as smallint) = cast(1 as int)","true")
+    testSqlApi("cast(1 as smallint) = cast(1 as bigint)","true")
+    testSqlApi("cast(1 as smallint) = cast(1 as decimal)","true")
+    testSqlApi("cast(1 as smallint) = cast(1 as float)","true")
+    testSqlApi("cast(1 as smallint) = cast(1 as double)","true")
+    testSqlApi("cast(1 as smallint) = cast(2 as int)","false")
+    testSqlApi("cast(1 as smallint) = cast(2 as bigint)","false")
+    testSqlApi("cast(1 as smallint) = cast(2 as decimal)","false")
+    testSqlApi("cast(1 as smallint) = cast(2.1 as float)","false")
+    testSqlApi("cast(1 as smallint) = cast(2.1 as double)","false")
+
+    // implicit type conversion between int and others
+    testSqlApi("cast(1 as int) = cast(1 as bigint)","true")
+    testSqlApi("cast(1 as int) = cast(1 as decimal)","true")
+    testSqlApi("cast(1 as int) = cast(1 as float)","true")
+    testSqlApi("cast(1 as int) = cast(1 as double)","true")
+    testSqlApi("cast(1 as int) = cast(2 as bigint)","false")
+    testSqlApi("cast(1 as int) = cast(2 as decimal)","false")
+    testSqlApi("cast(1 as int) = cast(2.1 as float)","false")
+    testSqlApi("cast(1 as int) = cast(2.1 as double)","false")
+
+    // implicit type conversion between bigint and others
+    testSqlApi("cast(1 as bigint) = cast(1 as decimal)","true")
+    testSqlApi("cast(1 as bigint) = cast(1 as float)","true")
+    testSqlApi("cast(1 as bigint) = cast(1 as double)","true")
+    testSqlApi("cast(1 as bigint) = cast(2 as decimal)","false")
+    testSqlApi("cast(1 as bigint) = cast(2.1 as float)","false")
+    testSqlApi("cast(1 as bigint) = cast(2.1 as double)","false")
+
+    // implicit type conversion between decimal and others
+    testSqlApi("cast(1 as decimal) = cast(1 as float)","true")
+    testSqlApi("cast(1 as decimal) = cast(1 as double)","true")
+    testSqlApi("cast(1 as decimal) = cast(2.1 as float)","false")
+    testSqlApi("cast(1 as decimal) = cast(2.1 as double)","false")
+
+    // implicit type conversion between float and others
+    testSqlApi("cast(1 as float) = cast(1 as double)","true")
+    testSqlApi("cast(1 as float) = cast(2.1 as double)","false")
+
+    // implicit type conversion between date and varchar
+    testSqlApi("cast('2001-01-01' as date) = cast('2001-01-01' as 
varchar)","true")
+    testSqlApi("cast('2001-01-01' as date) = cast('2001-01-02' as 
varchar)","false")
+
+    // implicit type conversion between date and char
+    testSqlApi("cast('2001-01-01' as date) = cast('2001-01-01' as 
char)","true")
+    testSqlApi("cast('2001-01-01' as date) = cast('2001-01-02' as 
char)","false")
+
+    // implicit type conversion between time and char
+    testSqlApi("cast('00:00:00.000000000' as time) = cast('00:00:00.000000000' 
as varchar)","true")
+    testSqlApi("cast('00:00:00.000000000' as time) = cast('00:00:01.000000000' 
as varchar)","false")
+
+    // implicit type conversion between time and char
+    testSqlApi("cast('2001-01-01 12:12:12.123' as timestamp)" +
+      " = cast('2001-01-01 12:12:12.1230' as varchar)","true")
+    testSqlApi("cast('1990-10-14 12:12:12.123' as timestamp)" +
+      " = cast('1990-10-15 12:12:12.123' as varchar)","false")
+  }
+
+  @Test
+  def testInValidImplicitTypeConversion: Unit = {
+    val typeChar = "CHAR"
+    val typeVarChar = "VARCHAR"
+
+    val typeTinyInt = "TINYINT"
+    val typeSmallInt = "SMALLINT"
+    val typeInt = "INTEGER"
+    val typeBigInt = "BIGINT"
+    val typeDecimal = "DECIMAL"
+    val typeFloat = "FLOAT"
+    val typeDouble = "DOUBLE"
+
+    // implicit type conversion between char and others
+    testInvalidImplicitConversionTypes(
+      "'1' = cast(1 as tinyint)",
+      List(typeChar,typeTinyInt))
+
+    testInvalidImplicitConversionTypes(
+      "'1' = cast(1 as smallint)",
+      List(typeChar,typeSmallInt))
+
+    testInvalidImplicitConversionTypes(
+      "'1' = cast(1 as int)",
+      List(typeChar,typeInt))
+
+    testInvalidImplicitConversionTypes(
+      "'1' = cast(1 as bigint)",
+      List(typeChar,typeBigInt))
+
+    testInvalidImplicitConversionTypes(
+      "'1.1' = cast(1.1 as decimal(2, 1))",
+      List(typeChar,typeDecimal))
+
+    testInvalidImplicitConversionTypes(
+      "'1.1' = cast(1.1 as float)",
+      List(typeChar,typeFloat))
+
+    testInvalidImplicitConversionTypes(
+      "'1.1' = cast(1.1 as double)",
+      List(typeChar,typeDouble))
+
+    // implicit type conversion between varchar and others
+    testInvalidImplicitConversionTypes(
+      "cast('1' as varchar) = cast(1 as tinyint)",
+      List(typeVarChar,typeTinyInt))
+
+    testInvalidImplicitConversionTypes(
+      "cast('1' as varchar) = cast(1 as smallint)",
+      List(typeVarChar,typeSmallInt))
+
+    testInvalidImplicitConversionTypes(
+
+      "cast('1' as varchar) = cast(1 as int)",
+      List(typeVarChar,typeInt))
+
+    testInvalidImplicitConversionTypes(
+      "cast('1' as varchar) = cast(1 as bigint)",
+      List(typeVarChar,typeBigInt))
+
+    testInvalidImplicitConversionTypes(
+      "cast('1.1' as varchar) = cast(1.1 as decimal(2, 1))",
+      List(typeVarChar,typeDecimal))
+
+    testInvalidImplicitConversionTypes(
+      "cast('1.1' as varchar) = cast(1.1 as float)",
+      List(typeVarChar,typeFloat))
+
+    testInvalidImplicitConversionTypes(
+      "cast('1.1' as varchar) = cast(1.1 as double)",
+      List(typeVarChar,typeDouble))
+  }
+
+  private def testInvalidImplicitConversionTypes(sql: String, types: 
List[String]): Unit ={
+    val expectedExceptionMessage =
+      "implicit type conversion between " +
+      s"${types(0)}" +
+      s" and " +
+      s"${types(1)}" +
+      s" is not supported now"
+    testExpectedSqlException(sql, expectedExceptionMessage, 
classOf[CodeGenException])
+  }
+
+  @Test
   def testLogicalFunctions(): Unit = {
     testSqlApi("TRUE OR FALSE", "true")
     testSqlApi("TRUE AND FALSE", "false")
diff --git 
a/flink-table/flink-table-planner/src/test/scala/org/apache/flink/table/planner/plan/batch/sql/join/LookupJoinTest.scala
 
b/flink-table/flink-table-planner/src/test/scala/org/apache/flink/table/planner/plan/batch/sql/join/LookupJoinTest.scala
index 32cd9e1..3dc3e7d 100644
--- 
a/flink-table/flink-table-planner/src/test/scala/org/apache/flink/table/planner/plan/batch/sql/join/LookupJoinTest.scala
+++ 
b/flink-table/flink-table-planner/src/test/scala/org/apache/flink/table/planner/plan/batch/sql/join/LookupJoinTest.scala
@@ -201,7 +201,9 @@ class LookupJoinTest(legacyTableSource: Boolean) extends 
TableTestBase {
     testUtil.replaceBatchProgram(programs)
 
     thrown.expect(classOf[TableException])
-    thrown.expectMessage("VARCHAR(2147483647) and INTEGER does not have common 
type now")
+    thrown.expectMessage(
+      "implicit type conversion between VARCHAR(2147483647) and INTEGER " +
+        "is not supported on join's condition now")
 
     testUtil.verifyRelPlan("SELECT * FROM MyTable AS T JOIN LookupTable "
       + "FOR SYSTEM_TIME AS OF T.proctime AS D ON T.b = D.id")
diff --git 
a/flink-table/flink-table-planner/src/test/scala/org/apache/flink/table/planner/plan/stream/sql/join/LookupJoinTest.scala
 
b/flink-table/flink-table-planner/src/test/scala/org/apache/flink/table/planner/plan/stream/sql/join/LookupJoinTest.scala
index 64addab..82d8f31 100644
--- 
a/flink-table/flink-table-planner/src/test/scala/org/apache/flink/table/planner/plan/stream/sql/join/LookupJoinTest.scala
+++ 
b/flink-table/flink-table-planner/src/test/scala/org/apache/flink/table/planner/plan/stream/sql/join/LookupJoinTest.scala
@@ -18,11 +18,6 @@
 
 package org.apache.flink.table.planner.plan.stream.sql.join
 
-import _root_.java.lang.{Boolean => JBoolean}
-import _root_.java.sql.Timestamp
-import _root_.java.util
-import _root_.java.util.{ArrayList => JArrayList, Collection => JCollection, 
HashMap => JHashMap, List => JList, Map => JMap}
-
 import org.apache.flink.api.common.typeinfo.TypeInformation
 import org.apache.flink.api.scala._
 import org.apache.flink.core.testutils.FlinkMatchers.containsMessage
@@ -40,11 +35,17 @@ import org.apache.flink.table.planner.utils.TableTestBase
 import org.apache.flink.table.sources._
 import org.apache.flink.table.types.DataType
 import org.apache.flink.table.utils.EncodingUtils
+
 import org.junit.Assert.{assertThat, assertTrue, fail}
 import org.junit.runner.RunWith
 import org.junit.runners.Parameterized
 import org.junit.{Assume, Before, Test}
 
+import _root_.java.lang.{Boolean => JBoolean}
+import _root_.java.sql.Timestamp
+import _root_.java.util
+import _root_.java.util.{ArrayList => JArrayList, Collection => JCollection, 
HashMap => JHashMap, List => JList, Map => JMap}
+
 import _root_.scala.collection.JavaConversions._
 
 /**
@@ -257,7 +258,9 @@ class LookupJoinTest(legacyTableSource: Boolean) extends 
TableTestBase with Seri
   def testJoinOnDifferentKeyTypes(): Unit = {
     // Will do implicit type coercion.
     thrown.expect(classOf[TableException])
-    thrown.expectMessage("VARCHAR(2147483647) and INTEGER does not have common 
type now")
+    thrown.expectMessage(
+      "implicit type conversion between VARCHAR(2147483647) and INTEGER " +
+        "is not supported on join's condition now")
     util.verifyExecPlan("SELECT * FROM MyTable AS T JOIN LookupTable "
       + "FOR SYSTEM_TIME AS OF T.proctime AS D ON T.b = D.id")
   }

Reply via email to