Repository: flink
Updated Branches:
  refs/heads/tableOnCalcite 7233c241d -> 1c53c873a


[FLINK-3226] Casting support for arithmetic operators


Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/9d765d08
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/9d765d08
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/9d765d08

Branch: refs/heads/tableOnCalcite
Commit: 9d765d0833cabfa6edd4097668b6ea7bd182ad76
Parents: 7233c24
Author: twalthr <twal...@apache.org>
Authored: Sat Feb 13 12:38:12 2016 +0100
Committer: Fabian Hueske <fhue...@apache.org>
Committed: Tue Feb 16 10:02:24 2016 +0100

----------------------------------------------------------------------
 .../api/table/codegen/OperatorCodeGen.scala     | 38 ++++++++++++--
 .../api/java/table/test/CastingITCase.java      | 55 ++++++++++----------
 .../api/scala/table/test/CastingITCase.scala    | 47 +++++++++--------
 3 files changed, 87 insertions(+), 53 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/flink/blob/9d765d08/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/codegen/OperatorCodeGen.scala
----------------------------------------------------------------------
diff --git 
a/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/codegen/OperatorCodeGen.scala
 
b/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/codegen/OperatorCodeGen.scala
index 8402569..0f8083e 100644
--- 
a/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/codegen/OperatorCodeGen.scala
+++ 
b/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/codegen/OperatorCodeGen.scala
@@ -18,20 +18,52 @@
 package org.apache.flink.api.table.codegen
 
 import org.apache.flink.api.common.typeinfo.BasicTypeInfo.BOOLEAN_TYPE_INFO
-import org.apache.flink.api.common.typeinfo.TypeInformation
+import org.apache.flink.api.common.typeinfo.{NumericTypeInfo, TypeInformation}
 import org.apache.flink.api.table.codegen.CodeGenUtils._
 
 object OperatorCodeGen {
 
-   def generateArithmeticOperator(
+  def generateArithmeticOperator(
       operator: String,
       nullCheck: Boolean,
       resultType: TypeInformation[_],
       left: GeneratedExpression,
       right: GeneratedExpression)
     : GeneratedExpression = {
-    generateOperatorIfNotNull(nullCheck, resultType, left, right) {
+    // String arithmetic // TODO rework
+    if (isString(left)) {
+      generateOperatorIfNotNull(nullCheck, resultType, left, right) {
       (leftTerm, rightTerm) => s"$leftTerm $operator $rightTerm"
+      }
+    }
+    // Numeric arithmetic
+    else if (isNumeric(left) && isNumeric(right)) {
+      val leftType = left.resultType.asInstanceOf[NumericTypeInfo[_]]
+      val rightType = right.resultType.asInstanceOf[NumericTypeInfo[_]]
+      val resultTypeTerm = primitiveTypeTermForTypeInfo(resultType)
+
+      generateOperatorIfNotNull(nullCheck, resultType, left, right) {
+      (leftTerm, rightTerm) =>
+        // no casting required
+        if (leftType == resultType && rightType == resultType) {
+          s"$leftTerm $operator $rightTerm"
+        }
+        // left needs casting
+        else if (leftType != resultType && rightType == resultType) {
+          s"(($resultTypeTerm) $leftTerm) $operator $rightTerm"
+        }
+        // right needs casting
+        else if (leftType == resultType && rightType != resultType) {
+          s"$leftTerm $operator (($resultTypeTerm) $rightTerm)"
+        }
+        // both sides need casting
+        else {
+          s"(($resultTypeTerm) $leftTerm) $operator (($resultTypeTerm) 
$rightTerm)"
+        }
+      }
+    }
+    else {
+      throw new CodeGenException("Unsupported arithmetic operation.")
     }
   }
 

http://git-wip-us.apache.org/repos/asf/flink/blob/9d765d08/flink-libraries/flink-table/src/test/java/org/apache/flink/api/java/table/test/CastingITCase.java
----------------------------------------------------------------------
diff --git 
a/flink-libraries/flink-table/src/test/java/org/apache/flink/api/java/table/test/CastingITCase.java
 
b/flink-libraries/flink-table/src/test/java/org/apache/flink/api/java/table/test/CastingITCase.java
index 957c093..e5b5f58 100644
--- 
a/flink-libraries/flink-table/src/test/java/org/apache/flink/api/java/table/test/CastingITCase.java
+++ 
b/flink-libraries/flink-table/src/test/java/org/apache/flink/api/java/table/test/CastingITCase.java
@@ -18,77 +18,76 @@
 
 package org.apache.flink.api.java.table.test;
 
+import java.util.List;
+import org.apache.flink.api.java.DataSet;
+import org.apache.flink.api.java.ExecutionEnvironment;
+import org.apache.flink.api.java.operators.DataSource;
+import org.apache.flink.api.java.table.TableEnvironment;
 import org.apache.flink.api.java.tuple.Tuple2;
 import org.apache.flink.api.java.tuple.Tuple3;
 import org.apache.flink.api.java.tuple.Tuple4;
-import org.apache.flink.api.table.Table;
+import org.apache.flink.api.java.tuple.Tuple6;
+import org.apache.flink.api.java.tuple.Tuple7;
+import org.apache.flink.api.java.tuple.Tuple8;
 import org.apache.flink.api.table.Row;
+import org.apache.flink.api.table.Table;
 import org.apache.flink.api.table.codegen.CodeGenException;
-import org.apache.flink.api.java.DataSet;
-import org.apache.flink.api.java.ExecutionEnvironment;
-import org.apache.flink.api.java.table.TableEnvironment;
-import org.apache.flink.api.java.operators.DataSource;
-import org.apache.flink.api.java.tuple.Tuple7;
-import org.apache.flink.test.util.MultipleProgramsTestBase;
-import org.junit.Ignore;
+import org.apache.flink.api.table.test.TableProgramsTestBase;
 import org.junit.Test;
 import org.junit.runner.RunWith;
 import org.junit.runners.Parameterized;
 
-import scala.NotImplementedError;
-
-import java.util.List;
-
 @RunWith(Parameterized.class)
-public class CastingITCase extends MultipleProgramsTestBase {
+public class CastingITCase extends TableProgramsTestBase {
 
-       public CastingITCase(TestExecutionMode mode){
-               super(mode);
+       public CastingITCase(TestExecutionMode mode, TableConfigMode 
configMode){
+               super(mode, configMode);
        }
 
-       @Ignore
-       @Test(expected = NotImplementedError.class)
+       @Test
        public void testNumericAutocastInArithmetic() throws Exception {
                ExecutionEnvironment env = 
ExecutionEnvironment.getExecutionEnvironment();
-               TableEnvironment tableEnv = new TableEnvironment();
+               TableEnvironment tableEnv = getJavaTableEnvironment();
 
-               DataSource<Tuple7<Byte, Short, Integer, Long, Float, Double, 
String>> input =
-                               env.fromElements(new Tuple7<>((byte) 1, (short) 
1, 1, 1L, 1.0f, 1.0d, "Hello"));
+               DataSource<Tuple8<Byte, Short, Integer, Long, Float, Double, 
Long, Double>> input =
+                               env.fromElements(new Tuple8<>((byte) 1, (short) 
1, 1, 1L, 1.0f, 1.0d, 1L, 1001.1));
 
                Table table =
                                tableEnv.fromDataSet(input);
 
                Table result = table.select("f0 + 1, f1 +" +
-                               " 1, f2 + 1L, f3 + 1.0f, f4 + 1.0d, f5 + 1");
+                               " 1, f2 + 1L, f3 + 1.0f, f4 + 1.0d, f5 + 1, f6 
+ 1.0d, f7 + f0");
 
                DataSet<Row> ds = tableEnv.toDataSet(result, Row.class);
                List<Row> results = ds.collect();
-               String expected = "2,2,2,2.0,2.0,2.0";
+               String expected = "2,2,2,2.0,2.0,2.0,2.0,1002.1";
                compareResultAsText(results, expected);
        }
 
        @Test
        public void testNumericAutocastInComparison() throws Exception {
                ExecutionEnvironment env = 
ExecutionEnvironment.getExecutionEnvironment();
-               TableEnvironment tableEnv = new TableEnvironment();
+               TableEnvironment tableEnv = getJavaTableEnvironment();
 
-               DataSource<Tuple7<Byte, Short, Integer, Long, Float, Double, 
String>> input =
+               DataSource<Tuple6<Byte, Short, Integer, Long, Float, Double>> 
input =
                                env.fromElements(
-                                               new Tuple7<>((byte) 1, (short) 
1, 1, 1L, 1.0f, 1.0d, "Hello"),
-                                               new Tuple7<>((byte) 2, (short) 
2, 2, 2L, 2.0f, 2.0d, "Hello"));
+                                               new Tuple6<>((byte) 1, (short) 
1, 1, 1L, 1.0f, 1.0d),
+                                               new Tuple6<>((byte) 2, (short) 
2, 2, 2L, 2.0f, 2.0d));
 
                Table table =
-                               tableEnv.fromDataSet(input, "a,b,c,d,e,f,g");
+                               tableEnv.fromDataSet(input, "a,b,c,d,e,f");
 
                Table result = table
                                .filter("a > 1 && b > 1 && c > 1L && d > 1.0f 
&& e > 1.0d && f > 1");
 
                DataSet<Row> ds = tableEnv.toDataSet(result, Row.class);
                List<Row> results = ds.collect();
-               String expected = "2,2,2,2,2.0,2.0,Hello";
+               String expected = "2,2,2,2,2.0,2.0";
                compareResultAsText(results, expected);
        }
 
+       // TODO support advanced String operations
+
        @Test(expected = CodeGenException.class)
        public void testCastFromString() throws Exception {
                ExecutionEnvironment env = 
ExecutionEnvironment.getExecutionEnvironment();

http://git-wip-us.apache.org/repos/asf/flink/blob/9d765d08/flink-libraries/flink-table/src/test/scala/org/apache/flink/api/scala/table/test/CastingITCase.scala
----------------------------------------------------------------------
diff --git 
a/flink-libraries/flink-table/src/test/scala/org/apache/flink/api/scala/table/test/CastingITCase.scala
 
b/flink-libraries/flink-table/src/test/scala/org/apache/flink/api/scala/table/test/CastingITCase.scala
index d6a853d..6121cb6 100644
--- 
a/flink-libraries/flink-table/src/test/scala/org/apache/flink/api/scala/table/test/CastingITCase.scala
+++ 
b/flink-libraries/flink-table/src/test/scala/org/apache/flink/api/scala/table/test/CastingITCase.scala
@@ -19,45 +19,33 @@
 package org.apache.flink.api.scala.table.test
 
 import java.util.Date
-import org.junit._
-import org.junit.runner.RunWith
-import org.junit.runners.Parameterized
+
 import org.apache.flink.api.common.typeinfo.BasicTypeInfo
 import org.apache.flink.api.scala._
 import org.apache.flink.api.scala.table._
 import org.apache.flink.api.table.Row
-import org.apache.flink.test.util.{TestBaseUtils, MultipleProgramsTestBase}
+import org.apache.flink.api.table.codegen.CodeGenException
 import org.apache.flink.test.util.MultipleProgramsTestBase.TestExecutionMode
+import org.apache.flink.test.util.{MultipleProgramsTestBase, TestBaseUtils}
+import org.junit._
+import org.junit.runner.RunWith
+import org.junit.runners.Parameterized
+
 import scala.collection.JavaConverters._
-import org.apache.flink.api.table.codegen.CodeGenException
 
 @RunWith(classOf[Parameterized])
 class CastingITCase(mode: TestExecutionMode) extends 
MultipleProgramsTestBase(mode) {
 
-  @Ignore // String autocasting not yet supported
   @Test
-  def testAutoCastToString(): Unit = {
-
-    val env = ExecutionEnvironment.getExecutionEnvironment
-    val t = env.fromElements((1: Byte, 1: Short, 1, 1L, 1.0f, 1.0d, new 
Date(0))).toTable
-      .select('_1 + "b", '_2 + "s", '_3 + "i", '_4 + "L", '_5 + "f", '_6 + 
"d", '_7 + "Date")
-
-    val expected = "1b,1s,1i,1L,1.0f,1.0d,1970-01-01 00:00:00.000Date"
-    val results = t.toDataSet[Row].collect()
-    TestBaseUtils.compareResultAsText(results.asJava, expected)
-  }
-
-  @Ignore // gives different types of exceptions for cluster and collection 
modes
-  @Test(expected = classOf[NotImplementedError])
   def testNumericAutoCastInArithmetic(): Unit = {
 
     // don't test everything, just some common cast directions
 
     val env = ExecutionEnvironment.getExecutionEnvironment
-    val t = env.fromElements((1: Byte, 1: Short, 1, 1L, 1.0f, 1.0d)).toTable
-      .select('_1 + 1, '_2 + 1, '_3 + 1L, '_4 + 1.0f, '_5 + 1.0d, '_6 + 1)
+    val t = env.fromElements((1: Byte, 1: Short, 1, 1L, 1.0f, 1.0d, 1L, 
1001.1)).toTable
+      .select('_1 + 1, '_2 + 1, '_3 + 1L, '_4 + 1.0f, '_5 + 1.0d, '_6 + 1, '_7 
+ 1.0d, '_8 + '_1)
 
-    val expected = "2,2,2,2.0,2.0,2.0"
+    val expected = "2,2,2,2.0,2.0,2.0,2.0,1002.1"
     val results = t.toDataSet[Row].collect()
     TestBaseUtils.compareResultAsText(results.asJava, expected)
   }
@@ -78,6 +66,21 @@ class CastingITCase(mode: TestExecutionMode) extends 
MultipleProgramsTestBase(mo
     TestBaseUtils.compareResultAsText(results.asJava, expected)
   }
 
+  // TODO support advanced String operations
+
+  @Ignore
+  @Test
+  def testAutoCastToString(): Unit = {
+
+    val env = ExecutionEnvironment.getExecutionEnvironment
+    val t = env.fromElements((1: Byte, 1: Short, 1, 1L, 1.0f, 1.0d, new 
Date(0))).toTable
+      .select('_1 + "b", '_2 + "s", '_3 + "i", '_4 + "L", '_5 + "f", '_6 + 
"d", '_7 + "Date")
+
+    val expected = "1b,1s,1i,1L,1.0f,1.0d,1970-01-01 00:00:00.000Date"
+    val results = t.toDataSet[Row].collect()
+    TestBaseUtils.compareResultAsText(results.asJava, expected)
+  }
+
   @Test(expected = classOf[CodeGenException])
   def testCastFromString: Unit = {
 

Reply via email to