Repository: flink
Updated Branches:
  refs/heads/master 31f63178d -> 76a40d59e


[FLINK-1851] [tableAPI] Add support for casting in Table Expression Parser
- Also fix code generation for casting between primitives.
- Extends documentation for TableAPI expressions

This closes #592


Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/7bea9015
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/7bea9015
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/7bea9015

Branch: refs/heads/master
Commit: 7bea90150309156536558d94cb5c244369af81b9
Parents: 31f6317
Author: Aljoscha Krettek <aljoscha.kret...@gmail.com>
Authored: Thu Apr 9 11:36:16 2015 +0200
Committer: Fabian Hueske <fhue...@apache.org>
Committed: Wed Sep 16 16:15:58 2015 +0200

----------------------------------------------------------------------
 docs/libs/table.md                              | 55 +++++++++++++++-
 .../api/common/typeinfo/BasicTypeInfo.java      |  2 +-
 .../org/apache/flink/api/table/Table.scala      |  4 ++
 .../table/codegen/ExpressionCodeGenerator.scala | 69 +++++++++++++++++---
 .../expressions/analysis/InsertAutoCasts.scala  | 12 ++--
 .../flink/api/table/expressions/cast.scala      | 14 +++-
 .../api/table/parser/ExpressionParser.scala     | 18 ++++-
 .../api/java/table/test/CastingITCase.java      | 23 +++++++
 .../api/scala/table/test/CastingITCase.scala    | 29 ++++++--
 9 files changed, 198 insertions(+), 28 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/flink/blob/7bea9015/docs/libs/table.md
----------------------------------------------------------------------
diff --git a/docs/libs/table.md b/docs/libs/table.md
index ac78f85..b59450e 100644
--- a/docs/libs/table.md
+++ b/docs/libs/table.md
@@ -123,7 +123,58 @@ DataSet<WC> result = tableEnv.toDataSet(filtered, WC.class);
 When using Java, the embedded DSL for specifying expressions cannot be used. Only String expressions
 are supported. They support exactly the same feature set as the expression DSL.
 
-Please refer to the Javadoc for a full list of supported operations and a description of the
-expression syntax. 
+## Expression Syntax
 
+A `Table` supports the following operations: `select`, `where`, `groupBy`, and `join` (plus `filter` as
+an alias for `where`). These are also documented in the [Javadoc](http://flink.apache.org/docs/latest/api/java/org/apache/flink/api/table/Table.html)
+of `Table`.
+
+Some of these expect an expression. These can either be specified using an embedded Scala DSL or
+a String expression. Please refer to the examples above to learn how expressions can be
+formulated.
+
+This is the complete EBNF grammar for expressions:
+
+{% highlight ebnf %}
+
+expression = single expression , { "," , single expression } ;
+
+single expression = alias | logic ;
+
+alias = logic | logic , "AS" , field reference ;
+
+logic = comparison , [ ( "&&" | "||" ) , comparison ] ;
+
+comparison = term , [ ( "=" | "!=" | ">" | ">=" | "<" | "<=" ) , term ] ;
+
+term = product , [ ( "+" | "-" ) , product ] ;
+
+product = binary bitwise , [ ( "*" | "/" | "%" ) , binary bitwise ] ;
+
+binary bitwise = unary , [ ( "&" | "|" | "^" ) , unary ] ;
+
+unary = [ "!" | "-" | "~" ] , suffix ;
+
+suffix = atom | aggregation | cast | as | substring ;
+
+aggregation = atom , [ ".sum" | ".min" | ".max" | ".count" | ".avg" ] ;
+
+cast = atom , ".cast(" , data type , ")" ;
+
+data type = "BYTE" | "SHORT" | "INT" | "LONG" | "FLOAT" | "DOUBLE" | "BOOL" | "BOOLEAN" | "STRING" ;
+
+as = atom , ".as(" , field reference , ")" ;
+
+substring = atom , ".substring(" , substring start , ["," substring end] , ")" ;
+
+substring start = single expression ;
+
+substring end = single expression ;
+
+atom = ( "(" , single expression , ")" ) | literal | field reference ;
+
+{% endhighlight %}
+
+Here, `literal` is a valid Java literal and `field reference` specifies a column in the data. The
+column names follow Java identifier syntax.
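
As a concrete illustration of the grammar above, here is a minimal sketch of a Java program that uses the new cast rule in a String expression. It is not part of this commit; it assumes the Java TableEnvironment from org.apache.flink.api.java.table (as used in the tests further down) and the default Tuple field names f0, f1:

import org.apache.flink.api.java.DataSet;
import org.apache.flink.api.java.ExecutionEnvironment;
import org.apache.flink.api.java.table.TableEnvironment;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.api.table.Row;
import org.apache.flink.api.table.Table;

public class CastExpressionExample {
	public static void main(String[] args) throws Exception {
		ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
		TableEnvironment tableEnv = new TableEnvironment();

		DataSet<Tuple2<String, String>> input =
				env.fromElements(new Tuple2<String, String>("42", "true"));

		Table table = tableEnv.fromDataSet(input);

		// Each field access below uses the cast rule of the grammar above.
		Table result = table.select("f0.cast(INT), f0.cast(DOUBLE), f1.cast(BOOL)");

		DataSet<Row> ds = tableEnv.toDataSet(result, Row.class);
		ds.print();
	}
}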
 

http://git-wip-us.apache.org/repos/asf/flink/blob/7bea9015/flink-core/src/main/java/org/apache/flink/api/common/typeinfo/BasicTypeInfo.java
----------------------------------------------------------------------
diff --git a/flink-core/src/main/java/org/apache/flink/api/common/typeinfo/BasicTypeInfo.java b/flink-core/src/main/java/org/apache/flink/api/common/typeinfo/BasicTypeInfo.java
index 07ca478..c622151 100644
--- a/flink-core/src/main/java/org/apache/flink/api/common/typeinfo/BasicTypeInfo.java
+++ b/flink-core/src/main/java/org/apache/flink/api/common/typeinfo/BasicTypeInfo.java
@@ -91,7 +91,7 @@ public class BasicTypeInfo<T> extends TypeInformation<T> implements AtomicType<T
         * Returns whether this type should be automatically casted to
         * the target type in an arithmetic operation.
         */
-       public boolean canCastTo(BasicTypeInfo<?> to) {
+       public boolean shouldAutocastTo(BasicTypeInfo<?> to) {
                for (Class<?> possibleTo: possibleCastTargetTypes) {
                        if (possibleTo.equals(to.getTypeClass())) {
                                return true;
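
For reference, a minimal standalone sketch (hypothetical demo class, not part of the commit) of what the renamed check expresses; the expected results follow from the cast target types declared for the numeric types in this file:

import org.apache.flink.api.common.typeinfo.BasicTypeInfo;

public class AutocastCheckDemo {
	public static void main(String[] args) {
		// Widening INT -> LONG is applied automatically in arithmetic expressions.
		System.out.println(
			BasicTypeInfo.INT_TYPE_INFO.shouldAutocastTo(BasicTypeInfo.LONG_TYPE_INFO));  // true
		// The narrowing direction is not; it requires an explicit .cast(...) in the Table API.
		System.out.println(
			BasicTypeInfo.LONG_TYPE_INFO.shouldAutocastTo(BasicTypeInfo.INT_TYPE_INFO)); // false
	}
}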

http://git-wip-us.apache.org/repos/asf/flink/blob/7bea9015/flink-staging/flink-table/src/main/scala/org/apache/flink/api/table/Table.scala
----------------------------------------------------------------------
diff --git a/flink-staging/flink-table/src/main/scala/org/apache/flink/api/table/Table.scala b/flink-staging/flink-table/src/main/scala/org/apache/flink/api/table/Table.scala
index fdb125b..effc325 100644
--- a/flink-staging/flink-table/src/main/scala/org/apache/flink/api/table/Table.scala
+++ b/flink-staging/flink-table/src/main/scala/org/apache/flink/api/table/Table.scala
@@ -41,6 +41,10 @@ import org.apache.flink.api.table.plan._
  *   val table2 = ...
  *   val set = table2.toDataSet[MyType]
  * }}}
+ *
+ * Operations such as [[join]], [[select]], [[where]] and [[groupBy]] either take arguments
+ * in a Scala DSL or as an expression String. Please refer to the documentation for the expression
+ * syntax.
  */
 case class Table(private[flink] val operation: PlanNode) {
 

http://git-wip-us.apache.org/repos/asf/flink/blob/7bea9015/flink-staging/flink-table/src/main/scala/org/apache/flink/api/table/codegen/ExpressionCodeGenerator.scala
----------------------------------------------------------------------
diff --git a/flink-staging/flink-table/src/main/scala/org/apache/flink/api/table/codegen/ExpressionCodeGenerator.scala b/flink-staging/flink-table/src/main/scala/org/apache/flink/api/table/codegen/ExpressionCodeGenerator.scala
index e109574..43396d9 100644
--- a/flink-staging/flink-table/src/main/scala/org/apache/flink/api/table/codegen/ExpressionCodeGenerator.scala
+++ b/flink-staging/flink-table/src/main/scala/org/apache/flink/api/table/codegen/ExpressionCodeGenerator.scala
@@ -249,7 +249,7 @@ abstract class ExpressionCodeGenerator[R](
           s"""
             |boolean $nullTerm = ${childGen.nullTerm};
             |$resultTpe $resultTerm;
-            |if ($nullTerm == null) {
+            |if ($nullTerm) {
             |  $resultTerm = null;
             |} else {
             |  $resultTerm = "" + ${childGen.resultTerm};
@@ -262,8 +262,12 @@ abstract class ExpressionCodeGenerator[R](
         }
         childGen.code + castCode
 
-      case expressions.Cast(child: Expression, tpe: BasicTypeInfo[_]) =>
+      case expressions.Cast(child: Expression, tpe: BasicTypeInfo[_])
+        if child.typeInfo == BasicTypeInfo.STRING_TYPE_INFO =>
         val childGen = generateExpression(child)
+        val fromTpe = typeTermForTypeInfoForCast(child.typeInfo)
+        val toTpe = typeTermForTypeInfoForCast(tpe)
+
         val castCode = if (nullCheck) {
           s"""
             |boolean $nullTerm = ${childGen.nullTerm};
@@ -276,6 +280,27 @@ abstract class ExpressionCodeGenerator[R](
             |  ${tpe.getTypeClass.getCanonicalName}.valueOf(${childGen.resultTerm});
           """.stripMargin
         }
+
+        childGen.code + castCode
+
+      case expressions.Cast(child: Expression, tpe: BasicTypeInfo[_])
+          if child.typeInfo.isBasicType =>
+        val childGen = generateExpression(child)
+        val fromTpe = typeTermForTypeInfoForCast(child.typeInfo)
+        val toTpe = typeTermForTypeInfoForCast(tpe)
+        val castCode = if (nullCheck) {
+          s"""
+            |boolean $nullTerm = ${childGen.nullTerm};
+            |$resultTpe $resultTerm;
+            |if ($nullTerm) {
+            |  $resultTerm = null;
+            |} else {
+            |  $resultTerm = ($toTpe)($fromTpe) ${childGen.resultTerm};
+            |}
+          """.stripMargin
+        } else {
+          s"$resultTpe $resultTerm = ($toTpe)($fromTpe) ${childGen.resultTerm};\n"
+        }
         childGen.code + castCode
 
       case ResolvedFieldReference(fieldName, fieldTpe: TypeInformation[_]) =>
@@ -589,14 +614,38 @@ abstract class ExpressionCodeGenerator[R](
 
   protected def typeTermForTypeInfo(tpe: TypeInformation[_]): String = tpe match {
 
-//    case BasicTypeInfo.INT_TYPE_INFO => "int"
-//    case BasicTypeInfo.LONG_TYPE_INFO => "long"
-//    case BasicTypeInfo.SHORT_TYPE_INFO => "short"
-//    case BasicTypeInfo.BYTE_TYPE_INFO => "byte"
-//    case BasicTypeInfo.FLOAT_TYPE_INFO => "float"
-//    case BasicTypeInfo.DOUBLE_TYPE_INFO => "double"
-//    case BasicTypeInfo.BOOLEAN_TYPE_INFO => "boolean"
-//    case BasicTypeInfo.CHAR_TYPE_INFO => "char"
+    // From PrimitiveArrayTypeInfo we would get class "int[]", scala reflections
+    // does not seem to like this, so we manually give the correct type here.
+    case PrimitiveArrayTypeInfo.INT_PRIMITIVE_ARRAY_TYPE_INFO => "int[]"
+    case PrimitiveArrayTypeInfo.LONG_PRIMITIVE_ARRAY_TYPE_INFO => "long[]"
+    case PrimitiveArrayTypeInfo.SHORT_PRIMITIVE_ARRAY_TYPE_INFO => "short[]"
+    case PrimitiveArrayTypeInfo.BYTE_PRIMITIVE_ARRAY_TYPE_INFO => "byte[]"
+    case PrimitiveArrayTypeInfo.FLOAT_PRIMITIVE_ARRAY_TYPE_INFO => "float[]"
+    case PrimitiveArrayTypeInfo.DOUBLE_PRIMITIVE_ARRAY_TYPE_INFO => "double[]"
+    case PrimitiveArrayTypeInfo.BOOLEAN_PRIMITIVE_ARRAY_TYPE_INFO => "boolean[]"
+    case PrimitiveArrayTypeInfo.CHAR_PRIMITIVE_ARRAY_TYPE_INFO => "char[]"
+
+    case _ =>
+      tpe.getTypeClass.getCanonicalName
+
+  }
+
+  // when casting we first need to unbox Primitives, for example,
+  // float a = 1.0f;
+  // byte b = (byte) a;
+  // works, but for boxed types we need this:
+  // Float a = 1.0f;
+  // Byte b = (byte)(float) a;
+  protected def typeTermForTypeInfoForCast(tpe: TypeInformation[_]): String = tpe match {
+
+    case BasicTypeInfo.INT_TYPE_INFO => "int"
+    case BasicTypeInfo.LONG_TYPE_INFO => "long"
+    case BasicTypeInfo.SHORT_TYPE_INFO => "short"
+    case BasicTypeInfo.BYTE_TYPE_INFO => "byte"
+    case BasicTypeInfo.FLOAT_TYPE_INFO => "float"
+    case BasicTypeInfo.DOUBLE_TYPE_INFO => "double"
+    case BasicTypeInfo.BOOLEAN_TYPE_INFO => "boolean"
+    case BasicTypeInfo.CHAR_TYPE_INFO => "char"
 
     // From PrimitiveArrayTypeInfo we would get class "int[]", scala reflections
     // does not seem to like this, so we manually give the correct type here.
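
The comment above the new method is easiest to see in plain Java. A small standalone sketch (not part of the commit) of why the generated cast goes through the source primitive type first:

public class BoxedCastDemo {
	public static void main(String[] args) {
		float a = 1.0f;
		byte b = (byte) a;              // primitive source: a single narrowing cast compiles

		Float boxedA = 1.0f;
		// byte c = (byte) boxedA;      // does not compile: Float cannot be cast directly to byte
		byte c = (byte) (float) boxedA; // unbox to float first, then narrow, as the generated code does
		System.out.println(b + " " + c);
	}
}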

http://git-wip-us.apache.org/repos/asf/flink/blob/7bea9015/flink-staging/flink-table/src/main/scala/org/apache/flink/api/table/expressions/analysis/InsertAutoCasts.scala
----------------------------------------------------------------------
diff --git a/flink-staging/flink-table/src/main/scala/org/apache/flink/api/table/expressions/analysis/InsertAutoCasts.scala b/flink-staging/flink-table/src/main/scala/org/apache/flink/api/table/expressions/analysis/InsertAutoCasts.scala
index af8de38..0fdcab6 100644
--- a/flink-staging/flink-table/src/main/scala/org/apache/flink/api/table/expressions/analysis/InsertAutoCasts.scala
+++ b/flink-staging/flink-table/src/main/scala/org/apache/flink/api/table/expressions/analysis/InsertAutoCasts.scala
@@ -33,10 +33,10 @@ class InsertAutoCasts extends Rule[Expression] {
       case plus@Plus(o1, o2) =>
         // Plus is special case since we can cast anything to String for String concat
         if (o1.typeInfo != o2.typeInfo && o1.typeInfo.isBasicType && o2.typeInfo.isBasicType) {
-          if (o1.typeInfo.asInstanceOf[BasicTypeInfo[_]].canCastTo(
+          if (o1.typeInfo.asInstanceOf[BasicTypeInfo[_]].shouldAutocastTo(
             o2.typeInfo.asInstanceOf[BasicTypeInfo[_]])) {
             Plus(Cast(o1, o2.typeInfo), o2)
-          } else if (o2.typeInfo.asInstanceOf[BasicTypeInfo[_]].canCastTo(
+          } else if (o2.typeInfo.asInstanceOf[BasicTypeInfo[_]].shouldAutocastTo(
             o1.typeInfo.asInstanceOf[BasicTypeInfo[_]])) {
             Plus(o1, Cast(o2, o1.typeInfo))
           } else if (o1.typeInfo == BasicTypeInfo.STRING_TYPE_INFO) {
@@ -55,10 +55,10 @@ class InsertAutoCasts extends Rule[Expression] {
         val o1 = ba.left
         val o2 = ba.right
         if (o1.typeInfo != o2.typeInfo && o1.typeInfo.isBasicType && o2.typeInfo.isBasicType) {
-          if (o1.typeInfo.asInstanceOf[BasicTypeInfo[_]].canCastTo(
+          if (o1.typeInfo.asInstanceOf[BasicTypeInfo[_]].shouldAutocastTo(
             o2.typeInfo.asInstanceOf[BasicTypeInfo[_]])) {
             ba.makeCopy(Seq(Cast(o1, o2.typeInfo), o2))
-          } else if (o2.typeInfo.asInstanceOf[BasicTypeInfo[_]].canCastTo(
+          } else if (o2.typeInfo.asInstanceOf[BasicTypeInfo[_]].shouldAutocastTo(
             o1.typeInfo.asInstanceOf[BasicTypeInfo[_]])) {
             ba.makeCopy(Seq(o1, Cast(o2, o1.typeInfo)))
           } else {
@@ -73,10 +73,10 @@ class InsertAutoCasts extends Rule[Expression] {
         val o2 = ba.right
         if (o1.typeInfo != o2.typeInfo && o1.typeInfo.isInstanceOf[IntegerTypeInfo[_]] &&
           o2.typeInfo.isInstanceOf[IntegerTypeInfo[_]]) {
-          if (o1.typeInfo.asInstanceOf[BasicTypeInfo[_]].canCastTo(
+          if (o1.typeInfo.asInstanceOf[BasicTypeInfo[_]].shouldAutocastTo(
             o2.typeInfo.asInstanceOf[BasicTypeInfo[_]])) {
             ba.makeCopy(Seq(Cast(o1, o2.typeInfo), o2))
-          } else if (o2.typeInfo.asInstanceOf[BasicTypeInfo[_]].canCastTo(
+          } else if (o2.typeInfo.asInstanceOf[BasicTypeInfo[_]].shouldAutocastTo(
             o1.typeInfo.asInstanceOf[BasicTypeInfo[_]])) {
             ba.makeCopy(Seq(o1, Cast(o2, o1.typeInfo)))
           } else {
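
To make the effect of the rule concrete, here is a hypothetical end-to-end sketch (the class name and data are illustrative; it assumes the same Java Table API used in the tests further down): adding an Integer field to a Double field needs no explicit cast, because the rule wraps the Integer side in a Cast to DOUBLE.

import java.util.List;

import org.apache.flink.api.java.DataSet;
import org.apache.flink.api.java.ExecutionEnvironment;
import org.apache.flink.api.java.table.TableEnvironment;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.api.table.Row;
import org.apache.flink.api.table.Table;

public class AutoCastDemo {
	public static void main(String[] args) throws Exception {
		ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
		TableEnvironment tableEnv = new TableEnvironment();

		DataSet<Tuple2<Integer, Double>> input =
				env.fromElements(new Tuple2<Integer, Double>(1, 2.5));

		Table table = tableEnv.fromDataSet(input);

		// f0 is INT, f1 is DOUBLE: the rule rewrites "f0 + f1" to Cast(f0, DOUBLE) + f1.
		Table result = table.select("f0 + f1");

		List<Row> rows = tableEnv.toDataSet(result, Row.class).collect();
		System.out.println(rows); // expected: a single row containing 3.5
	}
}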

http://git-wip-us.apache.org/repos/asf/flink/blob/7bea9015/flink-staging/flink-table/src/main/scala/org/apache/flink/api/table/expressions/cast.scala
----------------------------------------------------------------------
diff --git a/flink-staging/flink-table/src/main/scala/org/apache/flink/api/table/expressions/cast.scala b/flink-staging/flink-table/src/main/scala/org/apache/flink/api/table/expressions/cast.scala
index 31dfdb6..9fae862 100644
--- a/flink-staging/flink-table/src/main/scala/org/apache/flink/api/table/expressions/cast.scala
+++ b/flink-staging/flink-table/src/main/scala/org/apache/flink/api/table/expressions/cast.scala
@@ -17,8 +17,18 @@
  */
 package org.apache.flink.api.table.expressions
 
-import org.apache.flink.api.common.typeinfo.TypeInformation
+import org.apache.flink.api.common.typeinfo.{BasicTypeInfo, TypeInformation}
+import org.apache.flink.api.table.ExpressionException
 
 case class Cast(child: Expression, tpe: TypeInformation[_]) extends UnaryExpression {
-  def typeInfo = tpe
+  def typeInfo = tpe match {
+    case BasicTypeInfo.STRING_TYPE_INFO => tpe
+
+    case b if b.isBasicType && child.typeInfo.isBasicType => tpe
+
+    case _ => throw new ExpressionException(
+      s"Invalid cast: $this. Casts are only valid between primitive types.")
+  }
+
+  override def toString = s"$child.cast($tpe)"
 }

http://git-wip-us.apache.org/repos/asf/flink/blob/7bea9015/flink-staging/flink-table/src/main/scala/org/apache/flink/api/table/parser/ExpressionParser.scala
----------------------------------------------------------------------
diff --git a/flink-staging/flink-table/src/main/scala/org/apache/flink/api/table/parser/ExpressionParser.scala b/flink-staging/flink-table/src/main/scala/org/apache/flink/api/table/parser/ExpressionParser.scala
index 7bad7fe..075f070 100644
--- a/flink-staging/flink-table/src/main/scala/org/apache/flink/api/table/parser/ExpressionParser.scala
+++ b/flink-staging/flink-table/src/main/scala/org/apache/flink/api/table/parser/ExpressionParser.scala
@@ -17,6 +17,7 @@
  */
 package org.apache.flink.api.table.parser
 
+import org.apache.flink.api.common.typeinfo.{BasicTypeInfo, TypeInformation}
 import org.apache.flink.api.table.ExpressionException
 import org.apache.flink.api.table.plan.As
 import org.apache.flink.api.table.expressions._
@@ -27,8 +28,8 @@ import scala.util.parsing.combinator.{PackratParsers, JavaTokenParsers}
  * Parser for expressions inside a String. This parses exactly the same expressions that
  * would be accepted by the Scala Expression DSL.
  *
- * See [[org.apache.flink.api.scala.expressions.ImplicitExpressionConversions]] and
- * [[org.apache.flink.api.scala.expressions.ImplicitExpressionOperations]] for the constructs
+ * See [[org.apache.flink.api.scala.table.ImplicitExpressionConversions]] and
+ * [[org.apache.flink.api.scala.table.ImplicitExpressionOperations]] for the constructs
  * available in the Scala Expression DSL. This parser must be kept in sync with the Scala DSL
  * defined in the above files.
  */
@@ -107,6 +108,17 @@ object ExpressionParser extends JavaTokenParsers with PackratParsers {
   lazy val avg: PackratParser[Expression] =
     (atom <~ ".avg" ^^ { e => Avg(e) }) | (AVG ~ "(" ~> atom <~ ")" ^^ { e => Avg(e) })
 
+  lazy val cast: PackratParser[Expression] =
+    atom <~ ".cast(BYTE)" ^^ { e => Cast(e, BasicTypeInfo.BYTE_TYPE_INFO) } |
+    atom <~ ".cast(SHORT)" ^^ { e => Cast(e, BasicTypeInfo.SHORT_TYPE_INFO) } |
+    atom <~ ".cast(INT)" ^^ { e => Cast(e, BasicTypeInfo.INT_TYPE_INFO) } |
+    atom <~ ".cast(LONG)" ^^ { e => Cast(e, BasicTypeInfo.LONG_TYPE_INFO) } |
+    atom <~ ".cast(FLOAT)" ^^ { e => Cast(e, BasicTypeInfo.FLOAT_TYPE_INFO) } |
+    atom <~ ".cast(DOUBLE)" ^^ { e => Cast(e, BasicTypeInfo.DOUBLE_TYPE_INFO) } |
+    atom <~ ".cast(BOOL)" ^^ { e => Cast(e, BasicTypeInfo.BOOLEAN_TYPE_INFO) } |
+    atom <~ ".cast(BOOLEAN)" ^^ { e => Cast(e, BasicTypeInfo.BOOLEAN_TYPE_INFO) } |
+    atom <~ ".cast(STRING)" ^^ { e => Cast(e, BasicTypeInfo.STRING_TYPE_INFO) }
+
   lazy val as: PackratParser[Expression] = atom ~ ".as(" ~ fieldReference ~ ")" ^^ {
     case e ~ _ ~ as ~ _ => Naming(e, as.name)
   }
@@ -125,7 +137,7 @@ object ExpressionParser extends JavaTokenParsers with PackratParsers {
 
   lazy val suffix =
     isNull | isNotNull |
-      abs | sum | min | max | count | avg |
+      abs | sum | min | max | count | avg | cast |
       substring | substringWithoutEnd | atom
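
Since cast is now one alternative in the suffix chain, a casted atom can be used wherever the grammar allows an atom, for example inside a predicate. A hypothetical usage sketch (again assuming the Java Table API from the tests below, including the documented where operation):

import org.apache.flink.api.java.DataSet;
import org.apache.flink.api.java.ExecutionEnvironment;
import org.apache.flink.api.java.table.TableEnvironment;
import org.apache.flink.api.java.tuple.Tuple1;
import org.apache.flink.api.table.Row;
import org.apache.flink.api.table.Table;

public class CastInPredicateDemo {
	public static void main(String[] args) throws Exception {
		ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
		TableEnvironment tableEnv = new TableEnvironment();

		DataSet<Tuple1<String>> input =
				env.fromElements(new Tuple1<String>("3"), new Tuple1<String>("7"));

		Table table = tableEnv.fromDataSet(input);

		// The casted atom feeds into the comparison rule: f0.cast(INT) > 5
		Table filtered = table.where("f0.cast(INT) > 5");

		DataSet<Row> ds = tableEnv.toDataSet(filtered, Row.class);
		ds.print(); // expected: only the row built from "7"
	}
}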
 
 

http://git-wip-us.apache.org/repos/asf/flink/blob/7bea9015/flink-staging/flink-table/src/test/java/org/apache/flink/api/java/table/test/CastingITCase.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-table/src/test/java/org/apache/flink/api/java/table/test/CastingITCase.java b/flink-staging/flink-table/src/test/java/org/apache/flink/api/java/table/test/CastingITCase.java
index 8b60ed1..0636dfa 100644
--- a/flink-staging/flink-table/src/test/java/org/apache/flink/api/java/table/test/CastingITCase.java
+++ b/flink-staging/flink-table/src/test/java/org/apache/flink/api/java/table/test/CastingITCase.java
@@ -18,6 +18,7 @@
 
 package org.apache.flink.api.java.table.test;
 
+import org.apache.flink.api.java.tuple.Tuple3;
 import org.apache.flink.api.table.Table;
 import org.apache.flink.api.table.Row;
 import org.apache.flink.api.java.DataSet;
@@ -128,5 +129,27 @@ public class CastingITCase extends MultipleProgramsTestBase {
 
                expected = "2,2,2,2,2.0,2.0,Hello";
        }
+
+       @Test
+       public void testCastFromString() throws Exception {
+               ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
+               TableEnvironment tableEnv = new TableEnvironment();
+
+               DataSource<Tuple3<String, String, String>> input =
+                               env.fromElements(new Tuple3<String, String, String>("1", "true", "2.0"));
+
+               Table table =
+                               tableEnv.fromDataSet(input);
+
+               Table result = table.select(
+                               "f0.cast(BYTE), f0.cast(SHORT), f0.cast(INT), f0.cast(LONG), f2.cast(DOUBLE), f2.cast(FLOAT), f1.cast(BOOL)");
+
+               DataSet<Row> ds = tableEnv.toDataSet(result, Row.class);
+               ds.writeAsText(resultPath, FileSystem.WriteMode.OVERWRITE);
+
+               env.execute();
+
+               expected = "1,1,1,1,2.0,2.0,true\n";
+       }
 }
 

http://git-wip-us.apache.org/repos/asf/flink/blob/7bea9015/flink-staging/flink-table/src/test/scala/org/apache/flink/api/scala/table/test/CastingITCase.scala
----------------------------------------------------------------------
diff --git a/flink-staging/flink-table/src/test/scala/org/apache/flink/api/scala/table/test/CastingITCase.scala b/flink-staging/flink-table/src/test/scala/org/apache/flink/api/scala/table/test/CastingITCase.scala
index 736cf68..524d75a 100644
--- a/flink-staging/flink-table/src/test/scala/org/apache/flink/api/scala/table/test/CastingITCase.scala
+++ b/flink-staging/flink-table/src/test/scala/org/apache/flink/api/scala/table/test/CastingITCase.scala
@@ -18,15 +18,17 @@
 
 package org.apache.flink.api.scala.table.test
 
+import org.junit._
+import org.junit.rules.TemporaryFolder
+import org.junit.runner.RunWith
+import org.junit.runners.Parameterized
+
+import org.apache.flink.api.common.typeinfo.BasicTypeInfo
 import org.apache.flink.api.scala._
 import org.apache.flink.api.scala.table._
 import org.apache.flink.core.fs.FileSystem.WriteMode
 import org.apache.flink.test.util.{TestBaseUtils, MultipleProgramsTestBase}
 import org.apache.flink.test.util.MultipleProgramsTestBase.TestExecutionMode
-import org.junit._
-import org.junit.rules.TemporaryFolder
-import org.junit.runner.RunWith
-import org.junit.runners.Parameterized
 
 @RunWith(classOf[Parameterized])
 class CastingITCase(mode: TestExecutionMode) extends MultipleProgramsTestBase(mode) {
@@ -89,4 +91,23 @@ class CastingITCase(mode: TestExecutionMode) extends MultipleProgramsTestBase(mo
     expected = "2,2,2,2,2.0,2.0"
   }
 
+  @Test
+  def testCastFromString: Unit = {
+
+    val env = ExecutionEnvironment.getExecutionEnvironment
+    val ds = env.fromElements(("1", "true", "2.0")).toTable
+      .select(
+        '_1.cast(BasicTypeInfo.BYTE_TYPE_INFO),
+        '_1.cast(BasicTypeInfo.SHORT_TYPE_INFO),
+        '_1.cast(BasicTypeInfo.INT_TYPE_INFO),
+        '_1.cast(BasicTypeInfo.LONG_TYPE_INFO),
+        '_3.cast(BasicTypeInfo.DOUBLE_TYPE_INFO),
+        '_3.cast(BasicTypeInfo.FLOAT_TYPE_INFO),
+        '_2.cast(BasicTypeInfo.BOOLEAN_TYPE_INFO))
+
+    ds.writeAsText(resultPath, WriteMode.OVERWRITE)
+    env.execute()
+    expected = "1,1,1,1,2.0,2.0,true\n"
+  }
+
 }
