[FLINK-3631] [tableAPI] Check type compatibility for comparison expressions in 
CodeGenerator

This closes #1823


Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/22d327f0
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/22d327f0
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/22d327f0

Branch: refs/heads/master
Commit: 22d327f0409665c64588a4cb81acef97b481fb37
Parents: 7785288
Author: ramkrishna <[email protected]>
Authored: Mon Mar 21 22:03:42 2016 +0530
Committer: Fabian Hueske <[email protected]>
Committed: Tue Mar 22 13:53:58 2016 +0100

----------------------------------------------------------------------
 .../flink/api/table/codegen/CodeGenerator.scala | 14 ++++++++
 .../table/test/StringExpressionsITCase.java     | 36 ++++++++++++++++++++
 2 files changed, 50 insertions(+)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/flink/blob/22d327f0/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/codegen/CodeGenerator.scala
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/codegen/CodeGenerator.scala b/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/codegen/CodeGenerator.scala
index c7d0ed9..f213d4c 100644
--- a/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/codegen/CodeGenerator.scala
+++ b/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/codegen/CodeGenerator.scala
@@ -652,31 +652,37 @@ class CodeGenerator(
       case EQUALS =>
         val left = operands.head
         val right = operands(1)
+        checkNumericOrString(left, right)
         generateEquals(nullCheck, left, right)
 
       case NOT_EQUALS =>
         val left = operands.head
         val right = operands(1)
+        checkNumericOrString(left, right)
         generateNotEquals(nullCheck, left, right)
 
       case GREATER_THAN =>
         val left = operands.head
         val right = operands(1)
+        checkNumericOrString(left, right)
         generateComparison(">", nullCheck, left, right)
 
       case GREATER_THAN_OR_EQUAL =>
         val left = operands.head
         val right = operands(1)
+        checkNumericOrString(left, right)
         generateComparison(">=", nullCheck, left, right)
 
       case LESS_THAN =>
         val left = operands.head
         val right = operands(1)
+        checkNumericOrString(left, right)
         generateComparison("<", nullCheck, left, right)
 
       case LESS_THAN_OR_EQUAL =>
         val left = operands.head
         val right = operands(1)
+        checkNumericOrString(left, right)
         generateComparison("<=", nullCheck, left, right)
 
       case IS_NULL =>
@@ -736,6 +742,14 @@ class CodeGenerator(
     }
   }
 
+  def checkNumericOrString(left: GeneratedExpression, right: GeneratedExpression): Unit = {
+    if (isNumeric(left)) {
+      requireNumeric(right)
+    } else if (isString(left)) {
+      requireString(right)
+    }
+  }
+
   override def visitOver(over: RexOver): GeneratedExpression = ???
 
   // ----------------------------------------------------------------------------------------------

http://git-wip-us.apache.org/repos/asf/flink/blob/22d327f0/flink-libraries/flink-table/src/test/java/org/apache/flink/api/java/table/test/StringExpressionsITCase.java
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-table/src/test/java/org/apache/flink/api/java/table/test/StringExpressionsITCase.java b/flink-libraries/flink-table/src/test/java/org/apache/flink/api/java/table/test/StringExpressionsITCase.java
index d95a5f6..65f0470 100644
--- a/flink-libraries/flink-table/src/test/java/org/apache/flink/api/java/table/test/StringExpressionsITCase.java
+++ b/flink-libraries/flink-table/src/test/java/org/apache/flink/api/java/table/test/StringExpressionsITCase.java
@@ -18,6 +18,9 @@
 
 package org.apache.flink.api.java.table.test;
 
+
+import org.apache.flink.api.java.tuple.Tuple3;
+import org.apache.flink.test.javaApiOperators.util.CollectionDataSets;
 import org.apache.flink.api.table.Row;
 import org.apache.flink.api.table.Table;
 import org.apache.flink.api.table.codegen.CodeGenException;
@@ -117,4 +120,37 @@ public class StringExpressionsITCase extends MultipleProgramsTestBase {
                DataSet<Row> resultSet = tableEnv.toDataSet(result, Row.class);
                resultSet.collect();
        }
+
+       @Test(expected = CodeGenException.class)
+       public void testGeneratedCodeForStringComparison() throws Exception {
+               ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
+               TableEnvironment tableEnv = new TableEnvironment();
+               DataSet<Tuple3<Integer, Long, String>> tupleDataSet = CollectionDataSets.get3TupleDataSet(env);
+               Table in = tableEnv.fromDataSet(tupleDataSet, "a, b, c");
+               // Must fail because the comparison here is between Integer(column 'a') and (String 'Fred')
+               Table res = in.filter("a = 'Fred'" );
+               DataSet<Row> resultSet = tableEnv.toDataSet(res, Row.class);
+       }
+
+       @Test(expected = CodeGenException.class)
+       public void testGeneratedCodeForIntegerEqualsComparison() throws Exception {
+               ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
+               TableEnvironment tableEnv = new TableEnvironment();
+               DataSet<Tuple3<Integer, Long, String>> tupleDataSet = CollectionDataSets.get3TupleDataSet(env);
+               Table in = tableEnv.fromDataSet(tupleDataSet, "a, b, c");
+               // Must fail because the comparison here is between String(column 'c') and (Integer 10)
+               Table res = in.filter("c = 10" );
+               DataSet<Row> resultSet = tableEnv.toDataSet(res, Row.class);
+       }
+
+       @Test(expected = CodeGenException.class)
+       public void testGeneratedCodeForIntegerGreaterComparison() throws Exception {
+               ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
+               TableEnvironment tableEnv = new TableEnvironment();
+               DataSet<Tuple3<Integer, Long, String>> tupleDataSet = CollectionDataSets.get3TupleDataSet(env);
+               Table in = tableEnv.fromDataSet(tupleDataSet, "a, b, c");
+               // Must fail because the comparison here is between String(column 'c') and (Integer 10)
+               Table res = in.filter("c > 10" );
+               DataSet<Row> resultSet = tableEnv.toDataSet(res, Row.class);
+       }
 }

Reply via email to