[FLINK-5084] [table] Replace Java Table API integration tests by unit tests

This closes #2977.


Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/649cf054
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/649cf054
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/649cf054

Branch: refs/heads/master
Commit: 649cf054e692ea7ffba3125377300fa0496908e7
Parents: 614acc3
Author: mtunique <[email protected]>
Authored: Fri Dec 9 13:23:36 2016 +0800
Committer: twalthr <[email protected]>
Committed: Tue Jan 10 11:54:23 2017 +0100

----------------------------------------------------------------------
 .../flink/table/api/java/batch/ExplainTest.java | 160 --------
 .../java/batch/table/AggregationsITCase.java    | 380 ------------------
 .../table/api/java/batch/table/CalcITCase.java  | 324 ----------------
 .../api/java/batch/table/CastingITCase.java     | 140 -------
 .../table/api/java/batch/table/JoinITCase.java  | 207 ----------
 .../table/api/scala/batch/sql/CalcITCase.scala  |   2 +-
 .../scala/batch/table/AggregationsITCase.scala  |  74 +---
 .../api/scala/batch/table/CalcITCase.scala      | 101 ++---
 .../api/scala/batch/table/CastingITCase.scala   | 104 +++++
 .../api/scala/batch/table/JoinITCase.scala      | 113 +-----
 .../scala/batch/table/SetOperatorsITCase.scala  |  92 +----
 .../api/scala/batch/table/SortITCase.scala      |  11 -
 .../AggregationsStringExpressionTest.scala      | 342 ++++++++++++++++
 .../stringexpr/CalcStringExpressionTest.scala   | 386 +++++++++++++++++++
 .../CastingStringExpressionTest.scala           | 121 ++++++
 .../stringexpr/JoinStringExpressionTest.scala   | 276 +++++++++++++
 .../validation/AggregationsValidationTest.scala | 139 +++++++
 .../table/validation/CalcValidationTest.scala   | 138 +++++++
 .../table/validation/JoinValidationTest.scala   | 188 +++++++++
 .../validation/SetOperatorsValidationTest.scala | 119 ++++++
 .../table/validation/SortValidationTest.scala   |  47 +++
 .../batch/utils/LogicalPlanFormatUtils.scala    |  35 ++
 22 files changed, 1931 insertions(+), 1568 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/flink/blob/649cf054/flink-libraries/flink-table/src/test/java/org/apache/flink/table/api/java/batch/ExplainTest.java
----------------------------------------------------------------------
diff --git 
a/flink-libraries/flink-table/src/test/java/org/apache/flink/table/api/java/batch/ExplainTest.java
 
b/flink-libraries/flink-table/src/test/java/org/apache/flink/table/api/java/batch/ExplainTest.java
deleted file mode 100644
index 114579f..0000000
--- 
a/flink-libraries/flink-table/src/test/java/org/apache/flink/table/api/java/batch/ExplainTest.java
+++ /dev/null
@@ -1,160 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.table.api.java.batch;
-
-import org.apache.flink.api.java.DataSet;
-import org.apache.flink.api.java.ExecutionEnvironment;
-import org.apache.flink.table.api.java.BatchTableEnvironment;
-import org.apache.flink.api.java.tuple.Tuple2;
-import org.apache.flink.table.api.Table;
-import org.apache.flink.table.api.TableEnvironment;
-import org.apache.flink.test.util.MultipleProgramsTestBase;
-import org.junit.Test;
-
-import java.io.File;
-import java.util.Scanner;
-
-import static org.junit.Assert.assertEquals;
-
-public class ExplainTest extends MultipleProgramsTestBase {
-
-       public ExplainTest() {
-               super(TestExecutionMode.CLUSTER);
-       }
-
-       private static String testFilePath = 
ExplainTest.class.getResource("/").getFile();
-
-       @Test
-       public void testFilterWithoutExtended() throws Exception {
-               ExecutionEnvironment env = 
ExecutionEnvironment.getExecutionEnvironment();
-               BatchTableEnvironment tableEnv = 
TableEnvironment.getTableEnvironment(env);
-
-               DataSet<Tuple2<Integer, String>> input = env.fromElements(new 
Tuple2<>(1,"d"));
-               Table table = tableEnv
-                       .fromDataSet(input, "a, b")
-                       .filter("a % 2 = 0");
-
-               String result = tableEnv.explain(table).replaceAll("\\r\\n", 
"\n");
-               try (Scanner scanner = new Scanner(new File(testFilePath +
-                       "../../src/test/scala/resources/testFilter0.out"))){
-                       String source = 
scanner.useDelimiter("\\A").next().replaceAll("\\r\\n", "\n");
-                       assertEquals(source, result);
-               }
-       }
-
-       @Test
-       public void testFilterWithExtended() throws Exception {
-               ExecutionEnvironment env = 
ExecutionEnvironment.getExecutionEnvironment();
-               BatchTableEnvironment tableEnv = 
TableEnvironment.getTableEnvironment(env);
-
-               DataSet<Tuple2<Integer, String>> input = env.fromElements(new 
Tuple2<>(1,"d"));
-               Table table = tableEnv
-                       .fromDataSet(input, "a, b")
-                       .filter("a % 2 = 0");
-
-               String result = tableEnv.explain(table, 
true).replaceAll("\\r\\n", "\n");
-               try (Scanner scanner = new Scanner(new File(testFilePath +
-                       "../../src/test/scala/resources/testFilter1.out"))){
-                       String source = 
scanner.useDelimiter("\\A").next().replaceAll("\\r\\n", "\n");
-                       assertEquals(source, result);
-               }
-       }
-
-       @Test
-       public void testJoinWithoutExtended() throws Exception {
-               ExecutionEnvironment env = 
ExecutionEnvironment.getExecutionEnvironment();
-               BatchTableEnvironment tableEnv = 
TableEnvironment.getTableEnvironment(env);
-
-               DataSet<Tuple2<Integer, String>> input1 = env.fromElements(new 
Tuple2<>(1,"d"));
-               DataSet<Tuple2<Integer, String>> input2 = env.fromElements(new 
Tuple2<>(1,"d"));
-               Table table1 = tableEnv.fromDataSet(input1, "a, b");
-               Table table2 = tableEnv.fromDataSet(input2, "c, d");
-               Table table = table1
-                       .join(table2)
-                       .where("b = d")
-                       .select("a, c");
-
-               String result = tableEnv.explain(table).replaceAll("\\r\\n", 
"\n");
-               try (Scanner scanner = new Scanner(new File(testFilePath +
-                       "../../src/test/scala/resources/testJoin0.out"))){
-                       String source = 
scanner.useDelimiter("\\A").next().replaceAll("\\r\\n", "\n");
-                       assertEquals(source, result);
-               }
-       }
-
-       @Test
-       public void testJoinWithExtended() throws Exception {
-               ExecutionEnvironment env = 
ExecutionEnvironment.getExecutionEnvironment();
-               BatchTableEnvironment tableEnv = 
TableEnvironment.getTableEnvironment(env);
-
-               DataSet<Tuple2<Integer, String>> input1 = env.fromElements(new 
Tuple2<>(1,"d"));
-               DataSet<Tuple2<Integer, String>> input2 = env.fromElements(new 
Tuple2<>(1,"d"));
-               Table table1 = tableEnv.fromDataSet(input1, "a, b");
-               Table table2 = tableEnv.fromDataSet(input2, "c, d");
-               Table table = table1
-                       .join(table2)
-                       .where("b = d")
-                       .select("a, c");
-
-               String result = tableEnv.explain(table, 
true).replaceAll("\\r\\n", "\n");
-               try (Scanner scanner = new Scanner(new File(testFilePath +
-                       "../../src/test/scala/resources/testJoin1.out"))){
-                       String source = 
scanner.useDelimiter("\\A").next().replaceAll("\\r\\n", "\n");
-                       assertEquals(source, result);
-               }
-       }
-
-       @Test
-       public void testUnionWithoutExtended() throws Exception {
-               ExecutionEnvironment env = 
ExecutionEnvironment.getExecutionEnvironment();
-               BatchTableEnvironment tableEnv = 
TableEnvironment.getTableEnvironment(env);
-
-               DataSet<Tuple2<Integer, String>> input1 = env.fromElements(new 
Tuple2<>(1,"d"));
-               DataSet<Tuple2<Integer, String>> input2 = env.fromElements(new 
Tuple2<>(1,"d"));
-               Table table1 = tableEnv.fromDataSet(input1, "count, word");
-               Table table2 = tableEnv.fromDataSet(input2, "count, word");
-               Table table = table1.unionAll(table2);
-
-               String result = tableEnv.explain(table).replaceAll("\\r\\n", 
"\n");
-               try (Scanner scanner = new Scanner(new File(testFilePath +
-                       "../../src/test/scala/resources/testUnion0.out"))){
-                       String source = 
scanner.useDelimiter("\\A").next().replaceAll("\\r\\n", "\n");
-                       assertEquals(source, result);
-               }
-       }
-
-       @Test
-       public void testUnionWithExtended() throws Exception {
-               ExecutionEnvironment env = 
ExecutionEnvironment.getExecutionEnvironment();
-               BatchTableEnvironment tableEnv = 
TableEnvironment.getTableEnvironment(env);
-
-               DataSet<Tuple2<Integer, String>> input1 = env.fromElements(new 
Tuple2<>(1,"d"));
-               DataSet<Tuple2<Integer, String>> input2 = env.fromElements(new 
Tuple2<>(1,"d"));
-               Table table1 = tableEnv.fromDataSet(input1, "count, word");
-               Table table2 = tableEnv.fromDataSet(input2, "count, word");
-               Table table = table1.unionAll(table2);
-
-               String result = tableEnv.explain(table, 
true).replaceAll("\\r\\n", "\n");
-               try (Scanner scanner = new Scanner(new File(testFilePath +
-                       "../../src/test/scala/resources/testUnion1.out"))){
-                       String source = 
scanner.useDelimiter("\\A").next().replaceAll("\\r\\n", "\n");
-                       assertEquals(source, result);
-               }
-       }
-}

http://git-wip-us.apache.org/repos/asf/flink/blob/649cf054/flink-libraries/flink-table/src/test/java/org/apache/flink/table/api/java/batch/table/AggregationsITCase.java
----------------------------------------------------------------------
diff --git 
a/flink-libraries/flink-table/src/test/java/org/apache/flink/table/api/java/batch/table/AggregationsITCase.java
 
b/flink-libraries/flink-table/src/test/java/org/apache/flink/table/api/java/batch/table/AggregationsITCase.java
deleted file mode 100644
index d37ebb5..0000000
--- 
a/flink-libraries/flink-table/src/test/java/org/apache/flink/table/api/java/batch/table/AggregationsITCase.java
+++ /dev/null
@@ -1,380 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.flink.table.api.java.batch.table;
-
-import java.io.Serializable;
-import java.util.List;
-import org.apache.flink.api.common.functions.MapFunction;
-import org.apache.flink.api.common.operators.Order;
-import org.apache.flink.api.java.DataSet;
-import org.apache.flink.api.java.ExecutionEnvironment;
-import org.apache.flink.api.java.operators.DataSource;
-import org.apache.flink.table.api.java.BatchTableEnvironment;
-import org.apache.flink.api.java.tuple.Tuple2;
-import org.apache.flink.api.java.tuple.Tuple3;
-import org.apache.flink.api.java.tuple.Tuple5;
-import org.apache.flink.api.java.tuple.Tuple7;
-import org.apache.flink.types.Row;
-import org.apache.flink.table.api.scala.batch.utils.TableProgramsTestBase;
-import org.apache.flink.table.api.Table;
-import org.apache.flink.table.api.TableEnvironment;
-import org.apache.flink.table.api.ValidationException;
-import org.apache.flink.table.examples.java.WordCountTable.WC;
-import org.apache.flink.test.javaApiOperators.util.CollectionDataSets;
-import org.junit.Test;
-import org.junit.runner.RunWith;
-import org.junit.runners.Parameterized;
-
-@RunWith(Parameterized.class)
-public class AggregationsITCase extends TableProgramsTestBase {
-
-       public AggregationsITCase(TestExecutionMode mode, TableConfigMode 
configMode){
-               super(mode, configMode);
-       }
-
-       @Test
-       public void testAggregationTypes() throws Exception {
-               ExecutionEnvironment env = 
ExecutionEnvironment.getExecutionEnvironment();
-               BatchTableEnvironment tableEnv = 
TableEnvironment.getTableEnvironment(env, config());
-
-               Table table = 
tableEnv.fromDataSet(CollectionDataSets.get3TupleDataSet(env));
-
-               Table result = table.select("f0.sum, f0.min, f0.max, f0.count, 
f0.avg");
-
-               DataSet<Row> ds = tableEnv.toDataSet(result, Row.class);
-               List<Row> results = ds.collect();
-               String expected = "231,1,21,21,11";
-               compareResultAsText(results, expected);
-       }
-
-       @Test(expected = ValidationException.class)
-       public void testAggregationOnNonExistingField() throws Exception {
-               ExecutionEnvironment env = 
ExecutionEnvironment.getExecutionEnvironment();
-               BatchTableEnvironment tableEnv = 
TableEnvironment.getTableEnvironment(env, config());
-
-               Table table =
-                               
tableEnv.fromDataSet(CollectionDataSets.get3TupleDataSet(env));
-
-               Table result =
-                               table.select("foo.avg");
-
-               DataSet<Row> ds = tableEnv.toDataSet(result, Row.class);
-               List<Row> results = ds.collect();
-               String expected = "";
-               compareResultAsText(results, expected);
-       }
-
-       @Test
-       public void testWorkingAggregationDataTypes() throws Exception {
-               ExecutionEnvironment env = 
ExecutionEnvironment.getExecutionEnvironment();
-               BatchTableEnvironment tableEnv = 
TableEnvironment.getTableEnvironment(env, config());
-
-               DataSource<Tuple7<Byte, Short, Integer, Long, Float, Double, 
String>> input =
-                               env.fromElements(
-                                               new Tuple7<>((byte) 1, (short) 
1, 1, 1L, 1.0f, 1.0d, "Hello"),
-                                               new Tuple7<>((byte) 2, (short) 
2, 2, 2L, 2.0f, 2.0d, "Ciao"));
-
-               Table table = tableEnv.fromDataSet(input);
-
-               Table result =
-                               table.select("f0.avg, f1.avg, f2.avg, f3.avg, 
f4.avg, f5.avg, f6.count");
-
-               DataSet<Row> ds = tableEnv.toDataSet(result, Row.class);
-               List<Row> results = ds.collect();
-               String expected = "1,1,1,1,1.5,1.5,2";
-               compareResultAsText(results, expected);
-       }
-
-       @Test
-       public void testAggregationWithArithmetic() throws Exception {
-               ExecutionEnvironment env = 
ExecutionEnvironment.getExecutionEnvironment();
-               BatchTableEnvironment tableEnv = 
TableEnvironment.getTableEnvironment(env, config());
-
-               DataSource<Tuple2<Float, String>> input =
-                               env.fromElements(
-                                               new Tuple2<>(1f, "Hello"),
-                                               new Tuple2<>(2f, "Ciao"));
-
-               Table table =
-                               tableEnv.fromDataSet(input);
-
-               Table result =
-                               table.select("(f0 + 2).avg + 2, f1.count + 5");
-
-               DataSet<Row> ds = tableEnv.toDataSet(result, Row.class);
-               List<Row> results = ds.collect();
-               String expected = "5.5,7";
-               compareResultAsText(results, expected);
-       }
-
-       @Test
-       public void testAggregationWithTwoCount() throws Exception {
-               ExecutionEnvironment env = 
ExecutionEnvironment.getExecutionEnvironment();
-               BatchTableEnvironment tableEnv = 
TableEnvironment.getTableEnvironment(env, config());
-
-               DataSource<Tuple2<Float, String>> input =
-                       env.fromElements(
-                               new Tuple2<>(1f, "Hello"),
-                               new Tuple2<>(2f, "Ciao"));
-
-               Table table =
-                       tableEnv.fromDataSet(input);
-
-               Table result =
-                       table.select("f0.count, f1.count");
-
-               DataSet<Row> ds = tableEnv.toDataSet(result, Row.class);
-               List<Row> results = ds.collect();
-               String expected = "2,2";
-               compareResultAsText(results, expected);
-       }
-
-       @Test(expected = ValidationException.class)
-       public void testNonWorkingDataTypes() throws Exception {
-               ExecutionEnvironment env = 
ExecutionEnvironment.getExecutionEnvironment();
-               BatchTableEnvironment tableEnv = 
TableEnvironment.getTableEnvironment(env, config());
-
-               DataSource<Tuple2<Float, String>> input = env.fromElements(new 
Tuple2<>(1f, "Hello"));
-
-               Table table =
-                               tableEnv.fromDataSet(input);
-
-               Table result =
-                               // Must fail. Cannot compute SUM aggregate on 
String field.
-                               table.select("f1.sum");
-
-               DataSet<Row> ds = tableEnv.toDataSet(result, Row.class);
-               List<Row> results = ds.collect();
-               String expected = "";
-               compareResultAsText(results, expected);
-       }
-
-       @Test(expected = ValidationException.class)
-       public void testNoNestedAggregation() throws Exception {
-               ExecutionEnvironment env = 
ExecutionEnvironment.getExecutionEnvironment();
-               BatchTableEnvironment tableEnv = 
TableEnvironment.getTableEnvironment(env, config());
-
-               DataSource<Tuple2<Float, String>> input = env.fromElements(new 
Tuple2<>(1f, "Hello"));
-
-               Table table =
-                               tableEnv.fromDataSet(input);
-
-               Table result =
-                               // Must fail. Aggregation on aggregation not 
allowed.
-                               table.select("f0.sum.sum");
-
-               DataSet<Row> ds = tableEnv.toDataSet(result, Row.class);
-               List<Row> results = ds.collect();
-               String expected = "";
-               compareResultAsText(results, expected);
-       }
-
-       @Test(expected = ValidationException.class)
-       public void testGroupingOnNonExistentField() throws Exception {
-               ExecutionEnvironment env = 
ExecutionEnvironment.getExecutionEnvironment();
-               BatchTableEnvironment tableEnv = 
TableEnvironment.getTableEnvironment(env, config());
-
-               DataSet<Tuple3<Integer, Long, String>> input = 
CollectionDataSets.get3TupleDataSet(env);
-
-               tableEnv
-                       .fromDataSet(input, "a, b, c")
-                       // must fail. Field foo is not in input
-                       .groupBy("foo")
-                       .select("a.avg");
-       }
-
-       @Test(expected = ValidationException.class)
-       public void testGroupingInvalidSelection() throws Exception {
-               ExecutionEnvironment env = 
ExecutionEnvironment.getExecutionEnvironment();
-               BatchTableEnvironment tableEnv = 
TableEnvironment.getTableEnvironment(env, config());
-
-               DataSet<Tuple3<Integer, Long, String>> input = 
CollectionDataSets.get3TupleDataSet(env);
-
-               tableEnv
-                       .fromDataSet(input, "a, b, c")
-                       .groupBy("a, b")
-                       // must fail. Field c is not a grouping key or 
aggregation
-                       .select("c");
-       }
-
-       @Test
-       public void testGroupedAggregate() throws Exception {
-               ExecutionEnvironment env = 
ExecutionEnvironment.getExecutionEnvironment();
-               BatchTableEnvironment tableEnv = 
TableEnvironment.getTableEnvironment(env, config());
-
-               DataSet<Tuple3<Integer, Long, String>> input = 
CollectionDataSets.get3TupleDataSet(env);
-               Table table = tableEnv.fromDataSet(input, "a, b, c");
-
-               Table result = table
-                               .groupBy("b").select("b, a.sum");
-
-               DataSet<Row> ds = tableEnv.toDataSet(result, Row.class);
-               List<Row> results = ds.collect();
-               String expected = "1,1\n" + "2,5\n" + "3,15\n" + "4,34\n" + 
"5,65\n" + "6,111\n";
-               compareResultAsText(results, expected);
-       }
-
-       @Test
-       public void testGroupingKeyForwardIfNotUsed() throws Exception {
-               ExecutionEnvironment env = 
ExecutionEnvironment.getExecutionEnvironment();
-               BatchTableEnvironment tableEnv = 
TableEnvironment.getTableEnvironment(env, config());
-
-               DataSet<Tuple3<Integer, Long, String>> input = 
CollectionDataSets.get3TupleDataSet(env);
-               Table table = tableEnv.fromDataSet(input, "a, b, c");
-
-               Table result = table
-                               .groupBy("b").select("a.sum");
-
-               DataSet<Row> ds = tableEnv.toDataSet(result, Row.class);
-               List<Row> results = ds.collect();
-               String expected = "1\n" + "5\n" + "15\n" + "34\n" + "65\n" + 
"111\n";
-               compareResultAsText(results, expected);
-       }
-
-       @Test
-       public void testGroupNoAggregation() throws Exception {
-
-               ExecutionEnvironment env = 
ExecutionEnvironment.getExecutionEnvironment();
-               BatchTableEnvironment tableEnv = 
TableEnvironment.getTableEnvironment(env, config());
-
-               DataSet<Tuple3<Integer, Long, String>> input = 
CollectionDataSets.get3TupleDataSet(env);
-               Table table = tableEnv.fromDataSet(input, "a, b, c");
-
-               Table result = table
-                       .groupBy("b").select("a.sum as d, b").groupBy("b, 
d").select("b");
-
-               DataSet<Row> ds = tableEnv.toDataSet(result, Row.class);
-               String expected = "1\n" + "2\n" + "3\n" + "4\n" + "5\n" + "6\n";
-               List<Row> results = ds.collect();
-               compareResultAsText(results, expected);
-       }
-
-       @Test
-       public void testPojoAggregation() throws Exception {
-               ExecutionEnvironment env = 
ExecutionEnvironment.getExecutionEnvironment();
-               BatchTableEnvironment tableEnv = 
TableEnvironment.getTableEnvironment(env, config());
-               DataSet<WC> input = env.fromElements(
-                               new WC("Hello", 1),
-                               new WC("Ciao", 1),
-                               new WC("Hello", 1),
-                               new WC("Hola", 1),
-                               new WC("Hola", 1));
-
-               Table table = tableEnv.fromDataSet(input);
-
-               Table filtered = table
-                               .groupBy("word")
-                               .select("word.count as frequency, word")
-                               .filter("frequency = 2");
-
-               List<String> result = tableEnv.toDataSet(filtered, WC.class)
-                               .map(new MapFunction<WC, String>() {
-                                       public String map(WC value) throws 
Exception {
-                                               return value.word;
-                                       }
-                               }).collect();
-               String expected = "Hello\n" + "Hola";
-               compareResultAsText(result, expected);
-       }
-
-       @Test
-       public void testPojoGrouping() throws Exception {
-               ExecutionEnvironment env = 
ExecutionEnvironment.getExecutionEnvironment();
-
-               DataSet<Tuple3<String, Double, String>> data = env.fromElements(
-                       new Tuple3<>("A", 23.0, "Z"),
-                       new Tuple3<>("A", 24.0, "Y"),
-                       new Tuple3<>("B", 1.0, "Z"));
-
-               BatchTableEnvironment tableEnv = 
TableEnvironment.getTableEnvironment(env, config());
-
-               Table table = tableEnv
-                       .fromDataSet(data, "groupMe, value, name")
-                       .select("groupMe, value, name")
-                       .where("groupMe != 'B'");
-
-               DataSet<MyPojo> myPojos = tableEnv.toDataSet(table, 
MyPojo.class);
-
-               DataSet<MyPojo> result = myPojos.groupBy("groupMe")
-                       .sortGroup("value", Order.DESCENDING)
-                       .first(1);
-
-               List<MyPojo> resultList = result.collect();
-               compareResultAsText(resultList, "A,24.0,Y");
-       }
-
-       @Test
-       public void testDistinct() throws Exception {
-               ExecutionEnvironment env = 
ExecutionEnvironment.getExecutionEnvironment();
-               BatchTableEnvironment tableEnv = 
TableEnvironment.getTableEnvironment(env, config());
-
-               DataSet<Tuple3<Integer, Long, String>> input = 
CollectionDataSets.get3TupleDataSet(env);
-
-               Table table = tableEnv.fromDataSet(input, "a, b, c");
-
-               Table distinct = table.select("b").distinct();
-
-               DataSet<Row> ds = tableEnv.toDataSet(distinct, Row.class);
-               List<Row> results = ds.collect();
-               String expected = "1\n" + "2\n" + "3\n"+ "4\n"+ "5\n"+ "6\n";
-               compareResultAsText(results, expected);
-       }
-
-       @Test
-       public void testDistinctAfterAggregate() throws Exception {
-               ExecutionEnvironment env = 
ExecutionEnvironment.getExecutionEnvironment();
-               BatchTableEnvironment tableEnv = 
TableEnvironment.getTableEnvironment(env, config());
-
-               DataSet<Tuple5<Integer, Long, Integer, String, Long>> input = 
CollectionDataSets.get5TupleDataSet(env);
-
-               Table table = tableEnv.fromDataSet(input, "a, b, c, d, e");
-
-               Table distinct = table.groupBy("a, e").select("e").distinct();
-
-               DataSet<Row> ds = tableEnv.toDataSet(distinct, Row.class);
-               List<Row> results = ds.collect();
-               String expected = "1\n" + "2\n" + "3\n";
-               compareResultAsText(results, expected);
-       }
-
-       // 
--------------------------------------------------------------------------------------------
-
-       public static class MyPojo implements Serializable {
-               private static final long serialVersionUID = 
8741918940120107213L;
-
-               public String groupMe;
-               public double value;
-               public String name;
-
-               public MyPojo() {
-                       // for serialization
-               }
-
-               public MyPojo(String groupMe, double value, String name) {
-                       this.groupMe = groupMe;
-                       this.value = value;
-                       this.name = name;
-               }
-
-               @Override
-               public String toString() {
-                       return groupMe + "," + value + "," + name;
-               }
-       }
-}
-

http://git-wip-us.apache.org/repos/asf/flink/blob/649cf054/flink-libraries/flink-table/src/test/java/org/apache/flink/table/api/java/batch/table/CalcITCase.java
----------------------------------------------------------------------
diff --git 
a/flink-libraries/flink-table/src/test/java/org/apache/flink/table/api/java/batch/table/CalcITCase.java
 
b/flink-libraries/flink-table/src/test/java/org/apache/flink/table/api/java/batch/table/CalcITCase.java
deleted file mode 100644
index b1ef563..0000000
--- 
a/flink-libraries/flink-table/src/test/java/org/apache/flink/table/api/java/batch/table/CalcITCase.java
+++ /dev/null
@@ -1,324 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.table.api.java.batch.table;
-
-import java.util.Arrays;
-import java.util.Collection;
-import org.apache.flink.api.java.operators.DataSource;
-import org.apache.flink.types.Row;
-import org.apache.flink.table.api.scala.batch.utils.TableProgramsTestBase;
-import org.apache.flink.table.api.Table;
-import org.apache.flink.api.java.DataSet;
-import org.apache.flink.api.java.ExecutionEnvironment;
-import org.apache.flink.table.api.java.BatchTableEnvironment;
-import org.apache.flink.api.java.tuple.Tuple3;
-import org.apache.flink.table.api.TableEnvironment;
-import org.apache.flink.table.api.ValidationException;
-import org.apache.flink.table.functions.ScalarFunction;
-import org.apache.flink.test.javaApiOperators.util.CollectionDataSets;
-import org.junit.Test;
-import org.junit.runner.RunWith;
-import org.junit.runners.Parameterized;
-
-import java.util.List;
-
-@RunWith(Parameterized.class)
-public class CalcITCase extends TableProgramsTestBase {
-
-       public CalcITCase(TestExecutionMode mode, TableConfigMode configMode){
-               super(mode, configMode);
-       }
-
-       @Parameterized.Parameters(name = "Execution mode = {0}, Table config = 
{1}")
-       public static Collection<Object[]> parameters() {
-               return Arrays.asList(new Object[][] {
-                       { TestExecutionMode.COLLECTION, 
TableProgramsTestBase.DEFAULT() },
-                       { TestExecutionMode.COLLECTION, 
TableProgramsTestBase.NO_NULL() }
-               });
-       }
-
-       @Test
-       public void testSimpleSelectAllWithAs() throws Exception {
-               ExecutionEnvironment env = 
ExecutionEnvironment.getExecutionEnvironment();
-               BatchTableEnvironment tableEnv = 
TableEnvironment.getTableEnvironment(env, config());
-
-               DataSet<Tuple3<Integer, Long, String>> ds = 
CollectionDataSets.get3TupleDataSet(env);
-               Table in = tableEnv.fromDataSet(ds, "a,b,c");
-
-               Table result = in
-                               .select("a, b, c");
-
-               DataSet<Row> resultSet = tableEnv.toDataSet(result, Row.class);
-               List<Row> results = resultSet.collect();
-               String expected = "1,1,Hi\n" + "2,2,Hello\n" + "3,2,Hello 
world\n" +
-                       "4,3,Hello world, how are you?\n" + "5,3,I am fine.\n" 
+ "6,3,Luke Skywalker\n" +
-                       "7,4,Comment#1\n" + "8,4,Comment#2\n" + 
"9,4,Comment#3\n" + "10,4,Comment#4\n" +
-                       "11,5,Comment#5\n" + "12,5,Comment#6\n" + 
"13,5,Comment#7\n" +
-                       "14,5,Comment#8\n" + "15,5,Comment#9\n" + 
"16,6,Comment#10\n" +
-                       "17,6,Comment#11\n" + "18,6,Comment#12\n" + 
"19,6,Comment#13\n" +
-                       "20,6,Comment#14\n" + "21,6,Comment#15\n";
-               compareResultAsText(results, expected);
-       }
-
-       @Test
-       public void testSimpleSelectWithNaming() throws Exception {
-               ExecutionEnvironment env = 
ExecutionEnvironment.getExecutionEnvironment();
-               BatchTableEnvironment tableEnv = 
TableEnvironment.getTableEnvironment(env, config());
-
-               DataSet<Tuple3<Integer, Long, String>> ds = 
CollectionDataSets.get3TupleDataSet(env);
-               Table in = tableEnv.fromDataSet(ds);
-
-               Table result = in
-                               .select("f0 as a, f1 as b")
-                               .select("a, b");
-
-               DataSet<Row> resultSet = tableEnv.toDataSet(result, Row.class);
-               List<Row> results = resultSet.collect();
-               String expected = "1,1\n" + "2,2\n" + "3,2\n" + "4,3\n" + 
"5,3\n" + "6,3\n" + "7,4\n" +
-                               "8,4\n" + "9,4\n" + "10,4\n" + "11,5\n" + 
"12,5\n" + "13,5\n" + "14,5\n" + "15,5\n" +
-                               "16,6\n" + "17,6\n" + "18,6\n" + "19,6\n" + 
"20,6\n" + "21,6\n";
-               compareResultAsText(results, expected);
-       }
-
-       @Test
-       public void testSimpleSelectRenameAll() throws Exception {
-               ExecutionEnvironment env = 
ExecutionEnvironment.getExecutionEnvironment();
-               BatchTableEnvironment tableEnv = 
TableEnvironment.getTableEnvironment(env, config());
-
-               DataSet<Tuple3<Integer, Long, String>> ds = 
CollectionDataSets.get3TupleDataSet(env);
-               Table in = tableEnv.fromDataSet(ds);
-
-               Table result = in
-                       .select("f0 as a, f1 as b, f2 as c")
-                       .select("a, b");
-
-               DataSet<Row> resultSet = tableEnv.toDataSet(result, Row.class);
-               List<Row> results = resultSet.collect();
-               String expected = "1,1\n" + "2,2\n" + "3,2\n" + "4,3\n" + 
"5,3\n" + "6,3\n" + "7,4\n" +
-                       "8,4\n" + "9,4\n" + "10,4\n" + "11,5\n" + "12,5\n" + 
"13,5\n" + "14,5\n" + "15,5\n" +
-                       "16,6\n" + "17,6\n" + "18,6\n" + "19,6\n" + "20,6\n" + 
"21,6\n";
-               compareResultAsText(results, expected);
-       }
-
-       @Test(expected = ValidationException.class)
-       public void testSelectInvalidField() throws Exception {
-               ExecutionEnvironment env = 
ExecutionEnvironment.getExecutionEnvironment();
-               BatchTableEnvironment tableEnv = 
TableEnvironment.getTableEnvironment(env, config());
-
-               DataSet<Tuple3<Integer, Long, String>> ds = 
CollectionDataSets.get3TupleDataSet(env);
-
-               tableEnv.fromDataSet(ds, "a, b, c")
-                       // Must fail. Field foo does not exist
-                       .select("a + 1, foo + 2");
-       }
-
-       @Test(expected = ValidationException.class)
-       public void testSelectAmbiguousFieldNames() throws Exception {
-               ExecutionEnvironment env = 
ExecutionEnvironment.getExecutionEnvironment();
-               BatchTableEnvironment tableEnv = 
TableEnvironment.getTableEnvironment(env, config());
-
-               DataSet<Tuple3<Integer, Long, String>> ds = 
CollectionDataSets.get3TupleDataSet(env);
-
-               tableEnv.fromDataSet(ds, "a, b, c")
-                       // Must fail. Field foo does not exist
-                       .select("a + 1 as foo, b + 2 as foo");
-       }
-
-       @Test
-       public void testSelectStar() throws Exception {
-               ExecutionEnvironment env = 
ExecutionEnvironment.getExecutionEnvironment();
-               BatchTableEnvironment tableEnv = 
TableEnvironment.getTableEnvironment(env, config());
-
-               DataSet<Tuple3<Integer, Long, String>> ds = 
CollectionDataSets.get3TupleDataSet(env);
-               Table in = tableEnv.fromDataSet(ds, "a,b,c");
-
-               Table result = in
-                       .select("*");
-
-               DataSet<Row> resultSet = tableEnv.toDataSet(result, Row.class);
-               List<Row> results = resultSet.collect();
-               String expected = "1,1,Hi\n" + "2,2,Hello\n" + "3,2,Hello 
world\n" +
-                                 "4,3,Hello world, how are you?\n" + "5,3,I am 
fine.\n" + "6,3,Luke Skywalker\n" +
-                                 "7,4,Comment#1\n" + "8,4,Comment#2\n" + 
"9,4,Comment#3\n" + "10,4,Comment#4\n" +
-                                 "11,5,Comment#5\n" + "12,5,Comment#6\n" + 
"13,5,Comment#7\n" +
-                                 "14,5,Comment#8\n" + "15,5,Comment#9\n" + 
"16,6,Comment#10\n" +
-                                 "17,6,Comment#11\n" + "18,6,Comment#12\n" + 
"19,6,Comment#13\n" +
-                                 "20,6,Comment#14\n" + "21,6,Comment#15\n";
-               compareResultAsText(results, expected);
-       }
-
-       @Test
-       public void testAllRejectingFilter() throws Exception {
-               ExecutionEnvironment env = 
ExecutionEnvironment.getExecutionEnvironment();
-               BatchTableEnvironment tableEnv = 
TableEnvironment.getTableEnvironment(env, config());
-
-               DataSet<Tuple3<Integer, Long, String>> input = 
CollectionDataSets.get3TupleDataSet(env);
-               Table table = tableEnv.fromDataSet(input, "a, b, c");
-
-               Table result = table
-                               .filter("false");
-
-               DataSet<Row> ds = tableEnv.toDataSet(result, Row.class);
-               List<Row> results = ds.collect();
-               String expected = "\n";
-               compareResultAsText(results, expected);
-       }
-
-       @Test
-       public void testAllPassingFilter() throws Exception {
-               ExecutionEnvironment env = 
ExecutionEnvironment.getExecutionEnvironment();
-               BatchTableEnvironment tableEnv = 
TableEnvironment.getTableEnvironment(env, config());
-
-               DataSet<Tuple3<Integer, Long, String>> input = 
CollectionDataSets.get3TupleDataSet(env);
-               Table table = tableEnv.fromDataSet(input, "a, b, c");
-
-               Table result = table
-                               .filter("true");
-
-               DataSet<Row> ds = tableEnv.toDataSet(result, Row.class);
-               List<Row> results = ds.collect();
-               String expected = "1,1,Hi\n" + "2,2,Hello\n" + "3,2,Hello 
world\n" +
-                       "4,3,Hello world, how are you?\n" + "5,3,I am fine.\n" 
+ "6,3,Luke Skywalker\n" +
-                       "7,4,Comment#1\n" + "8,4,Comment#2\n" + 
"9,4,Comment#3\n" + "10,4,Comment#4\n" +
-                       "11,5,Comment#5\n" + "12,5,Comment#6\n" + 
"13,5,Comment#7\n" +
-                       "14,5,Comment#8\n" + "15,5,Comment#9\n" + 
"16,6,Comment#10\n" +
-                       "17,6,Comment#11\n" + "18,6,Comment#12\n" + 
"19,6,Comment#13\n" +
-                       "20,6,Comment#14\n" + "21,6,Comment#15\n";
-               compareResultAsText(results, expected);
-       }
-
-       @Test
-       public void testFilterOnIntegerTupleField() throws Exception {
-               ExecutionEnvironment env = 
ExecutionEnvironment.getExecutionEnvironment();
-               BatchTableEnvironment tableEnv = 
TableEnvironment.getTableEnvironment(env, config());
-
-               DataSet<Tuple3<Integer, Long, String>> input = 
CollectionDataSets.get3TupleDataSet(env);
-               Table table = tableEnv.fromDataSet(input, "a, b, c");
-
-               Table result = table
-                               .filter(" a % 2 = 0 ");
-
-               DataSet<Row> ds = tableEnv.toDataSet(result, Row.class);
-               List<Row> results = ds.collect();
-               String expected = "2,2,Hello\n" + "4,3,Hello world, how are 
you?\n" + "6,3,Luke Skywalker\n" + "8,4," +
-                               "Comment#2\n" + "10,4,Comment#4\n" + 
"12,5,Comment#6\n" + "14,5,Comment#8\n" + "16,6," +
-                               "Comment#10\n" + "18,6,Comment#12\n" + 
"20,6,Comment#14\n";
-               compareResultAsText(results, expected);
-       }
-
-       @Test
-       public void testNotEquals() throws Exception {
-               ExecutionEnvironment env = 
ExecutionEnvironment.getExecutionEnvironment();
-               BatchTableEnvironment tableEnv = 
TableEnvironment.getTableEnvironment(env, config());
-
-               DataSet<Tuple3<Integer, Long, String>> input = 
CollectionDataSets.get3TupleDataSet(env);
-               Table table = tableEnv.fromDataSet(input, "a, b, c");
-
-               Table result = table
-                               .filter("!( a % 2 <> 0 ) ");
-
-               DataSet<Row> ds = tableEnv.toDataSet(result, Row.class);
-               List<Row> results = ds.collect();
-               String expected = "2,2,Hello\n" + "4,3,Hello world, how are 
you?\n" + "6,3,Luke Skywalker\n" + "8,4," +
-                               "Comment#2\n" + "10,4,Comment#4\n" + 
"12,5,Comment#6\n" + "14,5,Comment#8\n" + "16,6," +
-                               "Comment#10\n" + "18,6,Comment#12\n" + 
"20,6,Comment#14\n";
-               compareResultAsText(results, expected);
-       }
-
-       @Test
-       public void testDisjunctivePreds() throws Exception {
-               ExecutionEnvironment env = 
ExecutionEnvironment.getExecutionEnvironment();
-               BatchTableEnvironment tableEnv = 
TableEnvironment.getTableEnvironment(env, config());
-
-               DataSet<Tuple3<Integer, Long, String>> input = 
CollectionDataSets.get3TupleDataSet(env);
-               Table table = tableEnv.fromDataSet(input, "a, b, c");
-
-               Table result = table
-                       .filter("a < 2 || a > 20");
-
-               DataSet<Row> ds = tableEnv.toDataSet(result, Row.class);
-               List<Row> results = ds.collect();
-               String expected = "1,1,Hi\n" + "21,6,Comment#15\n";
-               compareResultAsText(results, expected);
-       }
-
-       @Test
-       public void testIntegerBiggerThan128() throws Exception {
-               ExecutionEnvironment env = 
ExecutionEnvironment.getExecutionEnvironment();
-               BatchTableEnvironment tableEnv = 
TableEnvironment.getTableEnvironment(env, config());
-
-               DataSet<Tuple3<Integer, Long, String>> input = 
env.fromElements(new Tuple3<>(300, 1L, "Hello"));
-               Table table = tableEnv.fromDataSet(input, "a, b, c");
-
-               Table result = table
-                       .filter("a = 300 ");
-
-               DataSet<Row> ds = tableEnv.toDataSet(result, Row.class);
-               List<Row> results = ds.collect();
-               String expected = "300,1,Hello\n";
-               compareResultAsText(results, expected);
-       }
-
-       @Test(expected = ValidationException.class)
-       public void testFilterInvalidField() throws Exception {
-               ExecutionEnvironment env = 
ExecutionEnvironment.getExecutionEnvironment();
-               BatchTableEnvironment tableEnv = 
TableEnvironment.getTableEnvironment(env, config());
-
-               DataSet<Tuple3<Integer, Long, String>> input = 
CollectionDataSets.get3TupleDataSet(env);
-               Table table = tableEnv.fromDataSet(input, "a, b, c");
-
-               table
-                       // Must fail. Field foo does not exist.
-                       .filter("foo = 17");
-       }
-
-       public static class OldHashCode extends ScalarFunction {
-               public int eval(String s) {
-                       return -1;
-               }
-       }
-
-       public static class HashCode extends ScalarFunction {
-               public int eval(String s) {
-                       return s.hashCode();
-               }
-       }
-
-       @Test
-       public void testUserDefinedScalarFunction() throws Exception {
-               ExecutionEnvironment env = 
ExecutionEnvironment.getExecutionEnvironment();
-               BatchTableEnvironment tableEnv = 
TableEnvironment.getTableEnvironment(env, config());
-
-               tableEnv.registerFunction("hashCode", new OldHashCode());
-               tableEnv.registerFunction("hashCode", new HashCode());
-
-               DataSource<String> input = env.fromElements("a", "b", "c");
-
-               Table table = tableEnv.fromDataSet(input, "text");
-
-               Table result = table.select("text.hashCode()");
-
-               DataSet<Integer> ds = tableEnv.toDataSet(result, Integer.class);
-               List<Integer> results = ds.collect();
-               String expected = "97\n98\n99";
-               compareResultAsText(results, expected);
-       }
-
-}
-

http://git-wip-us.apache.org/repos/asf/flink/blob/649cf054/flink-libraries/flink-table/src/test/java/org/apache/flink/table/api/java/batch/table/CastingITCase.java
----------------------------------------------------------------------
diff --git 
a/flink-libraries/flink-table/src/test/java/org/apache/flink/table/api/java/batch/table/CastingITCase.java
 
b/flink-libraries/flink-table/src/test/java/org/apache/flink/table/api/java/batch/table/CastingITCase.java
deleted file mode 100644
index b1bb6e8..0000000
--- 
a/flink-libraries/flink-table/src/test/java/org/apache/flink/table/api/java/batch/table/CastingITCase.java
+++ /dev/null
@@ -1,140 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.table.api.java.batch.table;
-
-import java.util.List;
-import org.apache.flink.api.java.DataSet;
-import org.apache.flink.api.java.ExecutionEnvironment;
-import org.apache.flink.api.java.operators.DataSource;
-import org.apache.flink.table.api.java.BatchTableEnvironment;
-import org.apache.flink.api.java.tuple.Tuple3;
-import org.apache.flink.api.java.tuple.Tuple4;
-import org.apache.flink.api.java.tuple.Tuple6;
-import org.apache.flink.api.java.tuple.Tuple8;
-import org.apache.flink.types.Row;
-import org.apache.flink.table.api.scala.batch.utils.TableProgramsTestBase;
-import org.apache.flink.table.api.Table;
-import org.apache.flink.table.api.TableEnvironment;
-import org.junit.Test;
-import org.junit.runner.RunWith;
-import org.junit.runners.Parameterized;
-
-@RunWith(Parameterized.class)
-public class CastingITCase extends TableProgramsTestBase {
-
-       public CastingITCase(TestExecutionMode mode, TableConfigMode 
configMode){
-               super(mode, configMode);
-       }
-
-       @Test
-       public void testNumericAutocastInArithmetic() throws Exception {
-               ExecutionEnvironment env = 
ExecutionEnvironment.getExecutionEnvironment();
-               BatchTableEnvironment tableEnv = 
TableEnvironment.getTableEnvironment(env, config());
-
-               DataSource<Tuple8<Byte, Short, Integer, Long, Float, Double, 
Long, Double>> input =
-                               env.fromElements(new Tuple8<>((byte) 1, (short) 
1, 1, 1L, 1.0f, 1.0d, 1L, 1001.1));
-
-               Table table =
-                               tableEnv.fromDataSet(input);
-
-               Table result = table.select("f0 + 1, f1 +" +
-                               " 1, f2 + 1L, f3 + 1.0f, f4 + 1.0d, f5 + 1, f6 
+ 1.0d, f7 + f0");
-
-               DataSet<Row> ds = tableEnv.toDataSet(result, Row.class);
-               List<Row> results = ds.collect();
-               String expected = "2,2,2,2.0,2.0,2.0,2.0,1002.1";
-               compareResultAsText(results, expected);
-       }
-
-       @Test
-       public void testNumericAutocastInComparison() throws Exception {
-               ExecutionEnvironment env = 
ExecutionEnvironment.getExecutionEnvironment();
-               BatchTableEnvironment tableEnv = 
TableEnvironment.getTableEnvironment(env, config());
-
-               DataSource<Tuple6<Byte, Short, Integer, Long, Float, Double>> 
input =
-                               env.fromElements(
-                                               new Tuple6<>((byte) 1, (short) 
1, 1, 1L, 1.0f, 1.0d),
-                                               new Tuple6<>((byte) 2, (short) 
2, 2, 2L, 2.0f, 2.0d));
-
-               Table table =
-                               tableEnv.fromDataSet(input, "a,b,c,d,e,f");
-
-               Table result = table
-                               .filter("a > 1 && b > 1 && c > 1L && d > 1.0f 
&& e > 1.0d && f > 1");
-
-               DataSet<Row> ds = tableEnv.toDataSet(result, Row.class);
-               List<Row> results = ds.collect();
-               String expected = "2,2,2,2,2.0,2.0";
-               compareResultAsText(results, expected);
-       }
-
-       @Test
-       public void testCasting() throws Exception {
-               ExecutionEnvironment env = 
ExecutionEnvironment.getExecutionEnvironment();
-               BatchTableEnvironment tableEnv = 
TableEnvironment.getTableEnvironment(env, config());
-
-               DataSource<Tuple4<Integer, Double, Long, Boolean>> input =
-                               env.fromElements(new Tuple4<>(1, 0.0, 1L, 
true));
-
-               Table table =
-                               tableEnv.fromDataSet(input);
-
-               Table result = table.select(
-                               // * -> String
-                               "f0.cast(STRING), f1.cast(STRING), 
f2.cast(STRING), f3.cast(STRING)," +
-                               // NUMERIC TYPE -> Boolean
-                               "f0.cast(BOOL), f1.cast(BOOL), f2.cast(BOOL)," +
-                               // NUMERIC TYPE -> NUMERIC TYPE
-                               "f0.cast(DOUBLE), f1.cast(INT), 
f2.cast(SHORT)," +
-                               // Boolean -> NUMERIC TYPE
-                               "f3.cast(DOUBLE)," +
-                               // identity casting
-                               "f0.cast(INT), f1.cast(DOUBLE), f2.cast(LONG), 
f3.cast(BOOL)");
-
-               DataSet<Row> ds = tableEnv.toDataSet(result, Row.class);
-               List<Row> results = ds.collect();
-               String expected = "1,0.0,1,true," +
-                       "true,false,true," +
-                       "1.0,0,1," +
-                       "1.0," +
-                       "1,0.0,1,true\n";
-               compareResultAsText(results, expected);
-       }
-
-       @Test
-       public void testCastFromString() throws Exception {
-               ExecutionEnvironment env = 
ExecutionEnvironment.getExecutionEnvironment();
-               BatchTableEnvironment tableEnv = 
TableEnvironment.getTableEnvironment(env, config());
-
-               DataSource<Tuple3<String, String, String>> input =
-                               env.fromElements(new Tuple3<>("1", "true", 
"2.0"));
-
-               Table table =
-                               tableEnv.fromDataSet(input);
-
-               Table result = table.select(
-                               "f0.cast(BYTE), f0.cast(SHORT), f0.cast(INT), 
f0.cast(LONG), f2.cast(DOUBLE), f2.cast(FLOAT), f1.cast(BOOL)");
-
-               DataSet<Row> ds = tableEnv.toDataSet(result, Row.class);
-               List<Row> results = ds.collect();
-               String expected = "1,1,1,1,2.0,2.0,true\n";
-               compareResultAsText(results, expected);
-       }
-}
-

http://git-wip-us.apache.org/repos/asf/flink/blob/649cf054/flink-libraries/flink-table/src/test/java/org/apache/flink/table/api/java/batch/table/JoinITCase.java
----------------------------------------------------------------------
diff --git 
a/flink-libraries/flink-table/src/test/java/org/apache/flink/table/api/java/batch/table/JoinITCase.java
 
b/flink-libraries/flink-table/src/test/java/org/apache/flink/table/api/java/batch/table/JoinITCase.java
deleted file mode 100644
index a916998..0000000
--- 
a/flink-libraries/flink-table/src/test/java/org/apache/flink/table/api/java/batch/table/JoinITCase.java
+++ /dev/null
@@ -1,207 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.table.api.java.batch.table;
-
-import java.util.List;
-import org.apache.flink.api.java.DataSet;
-import org.apache.flink.api.java.ExecutionEnvironment;
-import org.apache.flink.table.api.java.BatchTableEnvironment;
-import org.apache.flink.api.java.tuple.Tuple3;
-import org.apache.flink.api.java.tuple.Tuple5;
-import org.apache.flink.types.Row;
-import org.apache.flink.table.api.scala.batch.utils.TableProgramsTestBase;
-import org.apache.flink.table.api.Table;
-import org.apache.flink.table.api.TableEnvironment;
-import org.apache.flink.table.api.ValidationException;
-import org.apache.flink.test.javaApiOperators.util.CollectionDataSets;
-import org.junit.Test;
-import org.junit.runner.RunWith;
-import org.junit.runners.Parameterized;
-
-
-@RunWith(Parameterized.class)
-public class JoinITCase extends TableProgramsTestBase {
-
-       public JoinITCase(TestExecutionMode mode, TableConfigMode configMode){
-               super(mode, configMode);
-       }
-
-       @Test
-       public void testJoin() throws Exception {
-               ExecutionEnvironment env = 
ExecutionEnvironment.getExecutionEnvironment();
-               BatchTableEnvironment tableEnv = 
TableEnvironment.getTableEnvironment(env);
-
-               DataSet<Tuple3<Integer, Long, String>> ds1 = 
CollectionDataSets.getSmall3TupleDataSet(env);
-               DataSet<Tuple5<Integer, Long, Integer, String, Long>> ds2 = 
CollectionDataSets.get5TupleDataSet(env);
-
-               Table in1 = tableEnv.fromDataSet(ds1, "a, b, c");
-               Table in2 = tableEnv.fromDataSet(ds2, "d, e, f, g, h");
-
-               Table result = in1.join(in2).where("b === e").select("c, g");
-
-               DataSet<Row> ds = tableEnv.toDataSet(result, Row.class);
-               List<Row> results = ds.collect();
-               String expected = "Hi,Hallo\n" + "Hello,Hallo Welt\n" + "Hello 
world,Hallo Welt\n";
-               compareResultAsText(results, expected);
-       }
-
-       @Test
-       public void testJoinWithFilter() throws Exception {
-               ExecutionEnvironment env = 
ExecutionEnvironment.getExecutionEnvironment();
-               BatchTableEnvironment tableEnv = 
TableEnvironment.getTableEnvironment(env);
-
-               DataSet<Tuple3<Integer, Long, String>> ds1 = 
CollectionDataSets.getSmall3TupleDataSet(env);
-               DataSet<Tuple5<Integer, Long, Integer, String, Long>> ds2 = 
CollectionDataSets.get5TupleDataSet(env);
-
-               Table in1 = tableEnv.fromDataSet(ds1, "a, b, c");
-               Table in2 = tableEnv.fromDataSet(ds2, "d, e, f, g, h");
-
-               Table result = in1.join(in2).where("b === e && b < 
2").select("c, g");
-
-               DataSet<Row> ds = tableEnv.toDataSet(result, Row.class);
-               List<Row> results = ds.collect();
-               String expected = "Hi,Hallo\n";
-               compareResultAsText(results, expected);
-       }
-
-       @Test
-       public void testJoinWithJoinFilter() throws Exception {
-               ExecutionEnvironment env = 
ExecutionEnvironment.getExecutionEnvironment();
-               BatchTableEnvironment tableEnv = 
TableEnvironment.getTableEnvironment(env);
-
-               DataSet<Tuple3<Integer, Long, String>> ds1 = 
CollectionDataSets.get3TupleDataSet(env);
-               DataSet<Tuple5<Integer, Long, Integer, String, Long>> ds2 = 
CollectionDataSets.get5TupleDataSet(env);
-
-               Table in1 = tableEnv.fromDataSet(ds1, "a, b, c");
-               Table in2 = tableEnv.fromDataSet(ds2, "d, e, f, g, h");
-
-               Table result = in1.join(in2).where("b === e && a < 6 && h < 
b").select("c, g");
-
-               DataSet<Row> ds = tableEnv.toDataSet(result, Row.class);
-               List<Row> results = ds.collect();
-               String expected = "Hello world, how are you?,Hallo Welt wie\n" +
-                               "I am fine.,Hallo Welt wie\n";
-               compareResultAsText(results, expected);
-       }
-
-       @Test
-       public void testJoinWithMultipleKeys() throws Exception {
-               ExecutionEnvironment env = 
ExecutionEnvironment.getExecutionEnvironment();
-               BatchTableEnvironment tableEnv = 
TableEnvironment.getTableEnvironment(env);
-
-               DataSet<Tuple3<Integer, Long, String>> ds1 = 
CollectionDataSets.get3TupleDataSet(env);
-               DataSet<Tuple5<Integer, Long, Integer, String, Long>> ds2 = 
CollectionDataSets.get5TupleDataSet(env);
-
-               Table in1 = tableEnv.fromDataSet(ds1, "a, b, c");
-               Table in2 = tableEnv.fromDataSet(ds2, "d, e, f, g, h");
-
-               Table result = in1.join(in2).where("a === d && b === 
h").select("c, g");
-
-               DataSet<Row> ds = tableEnv.toDataSet(result, Row.class);
-               List<Row> results = ds.collect();
-               String expected = "Hi,Hallo\n" + "Hello,Hallo Welt\n" + "Hello 
world,Hallo Welt wie gehts?\n" +
-                               "Hello world,ABC\n" + "I am fine.,HIJ\n" + "I 
am fine.,IJK\n";
-               compareResultAsText(results, expected);
-       }
-
-       @Test(expected = ValidationException.class)
-       public void testJoinNonExistingKey() throws Exception {
-               ExecutionEnvironment env = 
ExecutionEnvironment.getExecutionEnvironment();
-               BatchTableEnvironment tableEnv = 
TableEnvironment.getTableEnvironment(env);
-
-               DataSet<Tuple3<Integer, Long, String>> ds1 = 
CollectionDataSets.getSmall3TupleDataSet(env);
-               DataSet<Tuple5<Integer, Long, Integer, String, Long>> ds2 = 
CollectionDataSets.get5TupleDataSet(env);
-
-               Table in1 = tableEnv.fromDataSet(ds1, "a, b, c");
-               Table in2 = tableEnv.fromDataSet(ds2, "d, e, f, g, h");
-
-               // Must fail. Field foo does not exist.
-               in1.join(in2).where("foo === e").select("c, g");
-       }
-
-       @Test(expected = ValidationException.class)
-       public void testJoinWithNonMatchingKeyTypes() throws Exception {
-               ExecutionEnvironment env = 
ExecutionEnvironment.getExecutionEnvironment();
-               BatchTableEnvironment tableEnv = 
TableEnvironment.getTableEnvironment(env);
-
-               DataSet<Tuple3<Integer, Long, String>> ds1 = 
CollectionDataSets.getSmall3TupleDataSet(env);
-               DataSet<Tuple5<Integer, Long, Integer, String, Long>> ds2 = 
CollectionDataSets.get5TupleDataSet(env);
-
-               Table in1 = tableEnv.fromDataSet(ds1, "a, b, c");
-               Table in2 = tableEnv.fromDataSet(ds2, "d, e, f, g, h");
-
-               Table result = in1.join(in2)
-                       // Must fail. Types of join fields are not compatible 
(Integer and String)
-                       .where("a === g").select("c, g");
-
-               tableEnv.toDataSet(result, Row.class).collect();
-       }
-
-       @Test(expected = ValidationException.class)
-       public void testJoinWithAmbiguousFields() throws Exception {
-               ExecutionEnvironment env = 
ExecutionEnvironment.getExecutionEnvironment();
-               BatchTableEnvironment tableEnv = 
TableEnvironment.getTableEnvironment(env);
-
-               DataSet<Tuple3<Integer, Long, String>> ds1 = 
CollectionDataSets.getSmall3TupleDataSet(env);
-               DataSet<Tuple5<Integer, Long, Integer, String, Long>> ds2 = 
CollectionDataSets.get5TupleDataSet(env);
-
-               Table in1 = tableEnv.fromDataSet(ds1, "a, b, c");
-               Table in2 = tableEnv.fromDataSet(ds2, "d, e, f, g, c");
-
-               // Must fail. Join input have overlapping field names.
-               in1.join(in2).where("a === d").select("c, g");
-       }
-
-       @Test
-       public void testJoinWithAggregation() throws Exception {
-               ExecutionEnvironment env = 
ExecutionEnvironment.getExecutionEnvironment();
-               BatchTableEnvironment tableEnv = 
TableEnvironment.getTableEnvironment(env);
-
-               DataSet<Tuple3<Integer, Long, String>> ds1 = 
CollectionDataSets.getSmall3TupleDataSet(env);
-               DataSet<Tuple5<Integer, Long, Integer, String, Long>> ds2 = 
CollectionDataSets.get5TupleDataSet(env);
-
-               Table in1 = tableEnv.fromDataSet(ds1, "a, b, c");
-               Table in2 = tableEnv.fromDataSet(ds2, "d, e, f, g, h");
-
-               Table result = in1
-                               .join(in2).where("a === d").select("g.count");
-
-               DataSet<Row> ds = tableEnv.toDataSet(result, Row.class);
-               List<Row> results = ds.collect();
-               String expected = "6";
-               compareResultAsText(results, expected);
-       }
-
-       @Test(expected = ValidationException.class)
-       public void testJoinTablesFromDifferentEnvs() {
-               ExecutionEnvironment env = 
ExecutionEnvironment.getExecutionEnvironment();
-               BatchTableEnvironment tEnv1 = 
TableEnvironment.getTableEnvironment(env);
-               BatchTableEnvironment tEnv2 = 
TableEnvironment.getTableEnvironment(env);
-
-               DataSet<Tuple3<Integer, Long, String>> ds1 = 
CollectionDataSets.getSmall3TupleDataSet(env);
-               DataSet<Tuple5<Integer, Long, Integer, String, Long>> ds2 = 
CollectionDataSets.get5TupleDataSet(env);
-
-               Table in1 = tEnv1.fromDataSet(ds1, "a, b, c");
-               Table in2 = tEnv2.fromDataSet(ds2, "d, e, f, g, h");
-
-               // Must fail. Tables are bound to different TableEnvironments.
-               in1.join(in2).where("a === d").select("g.count");
-       }
-
-}

http://git-wip-us.apache.org/repos/asf/flink/blob/649cf054/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/api/scala/batch/sql/CalcITCase.scala
----------------------------------------------------------------------
diff --git 
a/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/api/scala/batch/sql/CalcITCase.scala
 
b/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/api/scala/batch/sql/CalcITCase.scala
index f3f554b..a6e5c56 100644
--- 
a/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/api/scala/batch/sql/CalcITCase.scala
+++ 
b/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/api/scala/batch/sql/CalcITCase.scala
@@ -298,7 +298,7 @@ class CalcITCase(
     val tEnv = TableEnvironment.getTableEnvironment(env, config)
 
     tEnv.registerFunction("hashCode",
-      new org.apache.flink.table.api.java.batch.table.CalcITCase.OldHashCode)
+      org.apache.flink.table.api.scala.batch.table.OldHashCode)
     tEnv.registerFunction("hashCode", MyHashCode)
 
     val ds = env.fromElements("a", "b", "c")

http://git-wip-us.apache.org/repos/asf/flink/blob/649cf054/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/api/scala/batch/table/AggregationsITCase.scala
----------------------------------------------------------------------
diff --git 
a/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/api/scala/batch/table/AggregationsITCase.scala
 
b/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/api/scala/batch/table/AggregationsITCase.scala
index a98c258..94c2a5c 100644
--- 
a/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/api/scala/batch/table/AggregationsITCase.scala
+++ 
b/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/api/scala/batch/table/AggregationsITCase.scala
@@ -24,7 +24,7 @@ import 
org.apache.flink.table.api.scala.batch.utils.TableProgramsTestBase.TableC
 import org.apache.flink.table.api.scala._
 import org.apache.flink.api.scala.util.CollectionDataSets
 import org.apache.flink.types.Row
-import org.apache.flink.table.api.{TableEnvironment, ValidationException}
+import org.apache.flink.table.api.TableEnvironment
 import org.apache.flink.table.examples.scala.WordCountTable.{WC => MyWC}
 import org.apache.flink.test.util.MultipleProgramsTestBase.TestExecutionMode
 import org.apache.flink.test.util.TestBaseUtils
@@ -54,17 +54,6 @@ class AggregationsITCase(
     TestBaseUtils.compareResultAsText(results.asJava, expected)
   }
 
-  @Test(expected = classOf[ValidationException])
-  def testAggregationOnNonExistingField(): Unit = {
-
-    val env = ExecutionEnvironment.getExecutionEnvironment
-    val tEnv = TableEnvironment.getTableEnvironment(env, config)
-
-    val t = CollectionDataSets.get3TupleDataSet(env).toTable(tEnv)
-      // Must fail. Field 'foo does not exist.
-      .select('foo.avg)
-  }
-
   @Test
   def testWorkingAggregationDataTypes(): Unit = {
 
@@ -142,30 +131,6 @@ class AggregationsITCase(
     TestBaseUtils.compareResultAsText(result.asJava, expected)
   }
 
-  @Test(expected = classOf[ValidationException])
-  def testNonWorkingAggregationDataTypes(): Unit = {
-
-    val env = ExecutionEnvironment.getExecutionEnvironment
-    val tEnv = TableEnvironment.getTableEnvironment(env, config)
-
-    val t = env.fromElements(("Hello", 1)).toTable(tEnv)
-      // Must fail. Field '_1 is not a numeric type.
-      .select('_1.sum)
-
-    t.collect()
-  }
-
-  @Test(expected = classOf[ValidationException])
-  def testNoNestedAggregations(): Unit = {
-
-    val env = ExecutionEnvironment.getExecutionEnvironment
-    val tEnv = TableEnvironment.getTableEnvironment(env, config)
-
-    val t = env.fromElements(("Hello", 1)).toTable(tEnv)
-      // Must fail. Sum aggregation can not be chained.
-      .select('_2.sum.sum)
-  }
-
   @Test
   def testSQLStyleAggregations(): Unit = {
 
@@ -236,30 +201,6 @@ class AggregationsITCase(
     TestBaseUtils.compareResultAsText(results.asJava, expected)
   }
 
-  @Test(expected = classOf[ValidationException])
-  def testGroupingOnNonExistentField(): Unit = {
-
-    val env = ExecutionEnvironment.getExecutionEnvironment
-    val tEnv = TableEnvironment.getTableEnvironment(env, config)
-
-    val t = CollectionDataSets.get3TupleDataSet(env).toTable(tEnv, 'a, 'b, 'c)
-      // must fail. '_foo not a valid field
-      .groupBy('_foo)
-      .select('a.avg)
-  }
-
-  @Test(expected = classOf[ValidationException])
-  def testGroupingInvalidSelection(): Unit = {
-
-    val env = ExecutionEnvironment.getExecutionEnvironment
-    val tEnv = TableEnvironment.getTableEnvironment(env, config)
-
-    val t = CollectionDataSets.get3TupleDataSet(env).toTable(tEnv, 'a, 'b, 'c)
-      .groupBy('a, 'b)
-      // must fail. 'c is not a grouping key or aggregation
-      .select('c)
-  }
-
   @Test
   def testGroupedAggregate(): Unit = {
 
@@ -360,9 +301,9 @@ class AggregationsITCase(
     val tEnv = TableEnvironment.getTableEnvironment(env, config)
 
     val t = CollectionDataSets.get3TupleDataSet(env).toTable(tEnv, 'a, 'b, 'c)
-        .select('b, 4 as 'four, 'a)
-        .groupBy('b, 'four)
-        .select('four, 'a.sum)
+      .select('b, 4 as 'four, 'a)
+      .groupBy('b, 'four)
+      .select('four, 'a.sum)
 
     val expected = "4,1\n" + "4,5\n" + "4,15\n" + "4,34\n" + "4,65\n" + 
"4,111\n"
     val results = t.toDataSet[Row].collect()
@@ -376,11 +317,11 @@ class AggregationsITCase(
     val tEnv = TableEnvironment.getTableEnvironment(env, config)
 
     val t = CollectionDataSets.get5TupleDataSet(env).toTable(tEnv, 'a, 'b, 'c, 
'd, 'e)
-        .groupBy('e, 'b % 3)
-        .select('c.min, 'e, 'a.avg, 'd.count)
+      .groupBy('e, 'b % 3)
+      .select('c.min, 'e, 'a.avg, 'd.count)
 
     val expected = "0,1,1,1\n" + "3,2,3,3\n" + "7,1,4,2\n" + "14,2,5,1\n" +
-        "5,3,4,2\n" + "2,1,3,2\n" + "1,2,3,3\n" + "12,3,5,1"
+      "5,3,4,2\n" + "2,1,3,2\n" + "1,2,3,3\n" + "12,3,5,1"
     val results = t.toDataSet[Row].collect()
     TestBaseUtils.compareResultAsText(results.asJava, expected)
   }
@@ -402,4 +343,3 @@ class AggregationsITCase(
   }
 
 }
-

http://git-wip-us.apache.org/repos/asf/flink/blob/649cf054/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/api/scala/batch/table/CalcITCase.scala
----------------------------------------------------------------------
diff --git 
a/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/api/scala/batch/table/CalcITCase.scala
 
b/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/api/scala/batch/table/CalcITCase.scala
index bc4f4bd..164e834 100644
--- 
a/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/api/scala/batch/table/CalcITCase.scala
+++ 
b/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/api/scala/batch/table/CalcITCase.scala
@@ -27,11 +27,11 @@ import 
org.apache.flink.table.api.scala.batch.utils.TableProgramsTestBase.TableC
 import org.apache.flink.table.api.scala._
 import org.apache.flink.api.scala.util.CollectionDataSets
 import org.apache.flink.types.Row
-import org.apache.flink.table.api.{TableEnvironment, TableException, 
ValidationException}
+import org.apache.flink.table.api.TableEnvironment
 import org.apache.flink.table.expressions.Literal
+import org.apache.flink.table.functions.ScalarFunction
 import org.apache.flink.test.util.MultipleProgramsTestBase.TestExecutionMode
 import org.apache.flink.test.util.TestBaseUtils
-import org.junit.Assert._
 import org.junit._
 import org.junit.runner.RunWith
 import org.junit.runners.Parameterized
@@ -110,36 +110,6 @@ class CalcITCase(
     TestBaseUtils.compareResultAsText(results.asJava, expected)
   }
 
-  @Test(expected = classOf[ValidationException])
-  def testSelectInvalidFieldFields(): Unit = {
-    val env = ExecutionEnvironment.getExecutionEnvironment
-    val tEnv = TableEnvironment.getTableEnvironment(env, config)
-
-    CollectionDataSets.get3TupleDataSet(env).toTable(tEnv, 'a, 'b, 'c)
-      // must fail. Field 'foo does not exist
-      .select('a, 'foo)
-  }
-
-  @Test(expected = classOf[ValidationException])
-  def testSelectAmbiguousRenaming(): Unit = {
-    val env = ExecutionEnvironment.getExecutionEnvironment
-    val tEnv = TableEnvironment.getTableEnvironment(env, config)
-
-    CollectionDataSets.get3TupleDataSet(env).toTable(tEnv, 'a, 'b, 'c)
-      // must fail. 'a and 'b are both renamed to 'foo
-      .select('a + 1 as 'foo, 'b + 2 as 'foo).toDataSet[Row].print()
-  }
-
-  @Test(expected = classOf[ValidationException])
-  def testSelectAmbiguousRenaming2(): Unit = {
-    val env = ExecutionEnvironment.getExecutionEnvironment
-    val tEnv = TableEnvironment.getTableEnvironment(env, config)
-
-    CollectionDataSets.get3TupleDataSet(env).toTable(tEnv, 'a, 'b, 'c)
-      // must fail. 'a and 'b are both renamed to 'a
-      .select('a, 'b as 'a).toDataSet[Row].print()
-  }
-
   @Test
   def testSelectStar(): Unit = {
     val env = ExecutionEnvironment.getExecutionEnvironment
@@ -158,41 +128,6 @@ class CalcITCase(
   }
 
   @Test
-  def testAliasStarException(): Unit = {
-    val env = ExecutionEnvironment.getExecutionEnvironment
-    val tEnv = TableEnvironment.getTableEnvironment(env, config)
-
-    try {
-      CollectionDataSets.get3TupleDataSet(env).toTable(tEnv, '*, 'b, 'c)
-      fail("TableException expected")
-    } catch {
-      case _: TableException => //ignore
-    }
-
-    try {
-      CollectionDataSets.get3TupleDataSet(env).toTable(tEnv)
-        .select('_1 as '*, '_2 as 'b, '_1 as 'c)
-      fail("ValidationException expected")
-    } catch {
-      case _: ValidationException => //ignore
-    }
-
-    try {
-      CollectionDataSets.get3TupleDataSet(env).toTable(tEnv).as('*, 'b, 'c)
-      fail("ValidationException expected")
-    } catch {
-      case _: ValidationException => //ignore
-    }
-
-    try {
-      CollectionDataSets.get3TupleDataSet(env).toTable(tEnv, 'a, 'b, 
'c).select('*, 'b)
-      fail("ValidationException expected")
-    } catch {
-      case _: ValidationException => //ignore
-    }
-  }
-
-  @Test
   def testAllRejectingFilter(): Unit = {
     val env = ExecutionEnvironment.getExecutionEnvironment
     val tEnv = TableEnvironment.getTableEnvironment(env, config)
@@ -326,17 +261,6 @@ class CalcITCase(
     TestBaseUtils.compareResultAsText(results.asJava, expected)
   }
 
-  @Test(expected = classOf[ValidationException])
-  def testFilterInvalidFieldName(): Unit = {
-    val env = ExecutionEnvironment.getExecutionEnvironment
-    val tEnv = TableEnvironment.getTableEnvironment(env, config)
-
-    val ds = CollectionDataSets.get3TupleDataSet(env).toTable(tEnv, 'a, 'b, 'c)
-
-    // must fail. Field 'foo does not exist
-    ds.filter( 'foo === 2 )
-  }
-
   @Test
   def testSimpleCalc(): Unit = {
     val env = ExecutionEnvironment.getExecutionEnvironment
@@ -425,6 +349,19 @@ class CalcITCase(
     val results = t.toDataSet[Row].collect()
     TestBaseUtils.compareResultAsText(results.asJava, expected)
   }
+
+  @Test
+  def testUserDefinedScalarFunction() {
+    val env = ExecutionEnvironment.getExecutionEnvironment
+    val tableEnv = TableEnvironment.getTableEnvironment(env, config)
+    tableEnv.registerFunction("hashCode", OldHashCode)
+    tableEnv.registerFunction("hashCode", HashCode)
+    val table = env.fromElements("a", "b", "c").toTable(tableEnv, 'text)
+    val result = table.select("text.hashCode()")
+    val results = result.toDataSet[Row].collect()
+    val expected = "97\n98\n99"
+    TestBaseUtils.compareResultAsText(results.asJava, expected)
+  }
 }
 
 object CalcITCase {
@@ -436,3 +373,11 @@ object CalcITCase {
       Array(TestExecutionMode.COLLECTION, 
TableProgramsTestBase.NO_NULL)).asJava
   }
 }
+
+object HashCode extends ScalarFunction {
+  def eval(s: String): Int = s.hashCode
+}
+
+object OldHashCode extends ScalarFunction {
+  def eval(s: String): Int = -1
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/649cf054/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/api/scala/batch/table/CastingITCase.scala
----------------------------------------------------------------------
diff --git 
a/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/api/scala/batch/table/CastingITCase.scala
 
b/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/api/scala/batch/table/CastingITCase.scala
new file mode 100644
index 0000000..18d3333
--- /dev/null
+++ 
b/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/api/scala/batch/table/CastingITCase.scala
@@ -0,0 +1,104 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.api.scala.batch.table
+
+import org.apache.flink.api.scala._
+import org.apache.flink.table.api.scala._
+import org.apache.flink.table.api.TableEnvironment
+import org.apache.flink.table.api.Types._
+import org.apache.flink.test.util.TestBaseUtils.compareResultAsText
+import org.apache.flink.types.Row
+import org.junit._
+
+import scala.collection.JavaConverters._
+
+class CastingITCase {
+
+  @Test
+  def testNumericAutocastInArithmetic() {
+
+    val env = ExecutionEnvironment.getExecutionEnvironment
+    val tableEnv = TableEnvironment.getTableEnvironment(env)
+
+    val table = env.fromElements(
+      (1.toByte, 1.toShort, 1, 1L, 1.0f, 1.0d, 1L, 1001.1)).toTable(tableEnv)
+      .select('_1 + 1, '_2 + 1, '_3 + 1L, '_4 + 1.0f,
+        '_5 + 1.0d, '_6 + 1, '_7 + 1.0d, '_8 + '_1)
+
+    val results = table.toDataSet[Row].collect()
+    val expected = "2,2,2,2.0,2.0,2.0,2.0,1002.1"
+    compareResultAsText(results.asJava, expected)
+  }
+
+  @Test
+  @throws[Exception]
+  def testNumericAutocastInComparison() {
+    val env: ExecutionEnvironment = 
ExecutionEnvironment.getExecutionEnvironment
+    val tableEnv = TableEnvironment.getTableEnvironment(env)
+
+    val table = env.fromElements(
+      (1.toByte, 1.toShort, 1, 1L, 1.0f, 1.0d),
+      (2.toByte, 2.toShort, 2, 2L, 2.0f, 2.0d))
+      .toTable(tableEnv, 'a, 'b, 'c, 'd, 'e, 'f)
+      .filter('a > 1 && 'b > 1 && 'c > 1L && 'd > 1.0f && 'e > 1.0d && 'f > 1)
+
+    val results = table.toDataSet[Row].collect()
+    val expected: String = "2,2,2,2,2.0,2.0"
+    compareResultAsText(results.asJava, expected)
+  }
+
+  @Test
+  @throws[Exception]
+  def testCasting() {
+    val env: ExecutionEnvironment = 
ExecutionEnvironment.getExecutionEnvironment
+    val tableEnv = TableEnvironment.getTableEnvironment(env)
+    val table = env.fromElements((1, 0.0, 1L, true)).toTable(tableEnv)
+      .select(
+        // * -> String
+      '_1.cast(STRING), '_2.cast(STRING), '_3.cast(STRING), '_4.cast(STRING),
+        // NUMERIC TYPE -> Boolean
+      '_1.cast(BOOLEAN), '_2.cast(BOOLEAN), '_3.cast(BOOLEAN),
+        // NUMERIC TYPE -> NUMERIC TYPE
+      '_1.cast(DOUBLE), '_2.cast(INT), '_3.cast(SHORT),
+        // Boolean -> NUMERIC TYPE
+      '_4.cast(DOUBLE),
+        // identity casting
+      '_1.cast(INT), '_2.cast(DOUBLE), '_3.cast(LONG), '_4.cast(BOOLEAN))
+
+    val results = table.toDataSet[Row].collect()
+    val expected = "1,0.0,1,true," + "true,false,true," +
+      "1.0,0,1," + "1.0," + "1,0.0,1,true\n"
+    compareResultAsText(results.asJava, expected)
+  }
+
+  @Test
+  @throws[Exception]
+  def testCastFromString() {
+    val env: ExecutionEnvironment = 
ExecutionEnvironment.getExecutionEnvironment
+    val tableEnv = TableEnvironment.getTableEnvironment(env)
+    val table = env.fromElements(("1", "true", "2.0")).toTable(tableEnv)
+      .select('_1.cast(BYTE), '_1.cast(SHORT), '_1.cast(INT), '_1.cast(LONG),
+        '_3.cast(DOUBLE), '_3.cast(FLOAT), '_2.cast(BOOLEAN))
+
+    val results = table.toDataSet[Row].collect()
+    val expected = "1,1,1,1,2.0,2.0,true\n"
+    compareResultAsText(results.asJava, expected)
+  }
+}
+

http://git-wip-us.apache.org/repos/asf/flink/blob/649cf054/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/api/scala/batch/table/JoinITCase.scala
----------------------------------------------------------------------
diff --git 
a/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/api/scala/batch/table/JoinITCase.scala
 
b/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/api/scala/batch/table/JoinITCase.scala
index ce16ada..277db4c 100644
--- 
a/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/api/scala/batch/table/JoinITCase.scala
+++ 
b/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/api/scala/batch/table/JoinITCase.scala
@@ -103,80 +103,12 @@ class JoinITCase(
     TestBaseUtils.compareResultAsText(results.asJava, expected)
   }
 
-  @Test(expected = classOf[ValidationException])
-  def testJoinNonExistingKey(): Unit = {
-    val env: ExecutionEnvironment = 
ExecutionEnvironment.getExecutionEnvironment
-    val tEnv = TableEnvironment.getTableEnvironment(env, config)
-
-    val ds1 = CollectionDataSets.getSmall3TupleDataSet(env).toTable(tEnv, 'a, 
'b, 'c)
-    val ds2 = CollectionDataSets.get5TupleDataSet(env).toTable(tEnv, 'd, 'e, 
'f, 'g, 'h)
-
-    ds1.join(ds2)
-      // must fail. Field 'foo does not exist
-      .where('foo === 'e)
-      .select('c, 'g)
-  }
-
-  @Test(expected = classOf[ValidationException])
-  def testJoinWithNonMatchingKeyTypes(): Unit = {
-    val env: ExecutionEnvironment = 
ExecutionEnvironment.getExecutionEnvironment
-    val tEnv = TableEnvironment.getTableEnvironment(env, config)
-
-    val ds1 = CollectionDataSets.getSmall3TupleDataSet(env).toTable(tEnv, 'a, 
'b, 'c)
-    val ds2 = CollectionDataSets.get5TupleDataSet(env).toTable(tEnv, 'd, 'e, 
'f, 'g, 'h)
-
-    ds1.join(ds2)
-      // must fail. Field 'a is Int, and 'g is String
-      .where('a === 'g)
-      .select('c, 'g).collect()
-  }
-
-  @Test(expected = classOf[ValidationException])
-  def testJoinWithAmbiguousFields(): Unit = {
-    val env: ExecutionEnvironment = 
ExecutionEnvironment.getExecutionEnvironment
-    val tEnv = TableEnvironment.getTableEnvironment(env, config)
-
-    val ds1 = CollectionDataSets.getSmall3TupleDataSet(env).toTable(tEnv, 'a, 
'b, 'c)
-    val ds2 = CollectionDataSets.get5TupleDataSet(env).toTable(tEnv, 'd, 'e, 
'f, 'g, 'c)
-
-    ds1.join(ds2)
-      // must fail. Both inputs share the same field 'c
-      .where('a === 'd)
-      .select('c, 'g)
-  }
-
-  @Test(expected = classOf[TableException])
-  def testNoEqualityJoinPredicate1(): Unit = {
-    val env: ExecutionEnvironment = 
ExecutionEnvironment.getExecutionEnvironment
-    val tEnv = TableEnvironment.getTableEnvironment(env, config)
-
-    val ds1 = CollectionDataSets.getSmall3TupleDataSet(env).toTable(tEnv, 'a, 
'b, 'c)
-    val ds2 = CollectionDataSets.get5TupleDataSet(env).toTable(tEnv, 'd, 'e, 
'f, 'g, 'h)
-
-    ds1.join(ds2)
-      // must fail. No equality join predicate
-      .where('d === 'f)
-      .select('c, 'g).collect()
-  }
-
-  @Test(expected = classOf[TableException])
-  def testNoEqualityJoinPredicate2(): Unit = {
-    val env: ExecutionEnvironment = 
ExecutionEnvironment.getExecutionEnvironment
-    val tEnv = TableEnvironment.getTableEnvironment(env, config)
-
-    val ds1 = CollectionDataSets.getSmall3TupleDataSet(env).toTable(tEnv, 'a, 
'b, 'c)
-    val ds2 = CollectionDataSets.get5TupleDataSet(env).toTable(tEnv, 'd, 'e, 
'f, 'g, 'h)
-
-    ds1.join(ds2)
-      // must fail. No equality join predicate
-      .where('a < 'd)
-      .select('c, 'g).collect()
-  }
-
   @Test
   def testJoinWithAggregation(): Unit = {
     val env: ExecutionEnvironment = 
ExecutionEnvironment.getExecutionEnvironment
     val tEnv = TableEnvironment.getTableEnvironment(env, config)
+    // create a second, independent table env so that both generate the same temporary table ids
+    val tEnv2 = TableEnvironment.getTableEnvironment(env, config)
 
     val ds1 = CollectionDataSets.getSmall3TupleDataSet(env).toTable(tEnv, 'a, 
'b, 'c)
     val ds2 = CollectionDataSets.get5TupleDataSet(env).toTable(tEnv, 'd, 'e, 
'f, 'g, 'h)
@@ -262,19 +194,6 @@ class JoinITCase(
     TestBaseUtils.compareResultAsText(results.asJava, expected)
   }
 
-  @Test(expected = classOf[ValidationException])
-  def testJoinTablesFromDifferentEnvs(): Unit = {
-    val env: ExecutionEnvironment = 
ExecutionEnvironment.getExecutionEnvironment
-    val tEnv1 = TableEnvironment.getTableEnvironment(env, config)
-    val tEnv2 = TableEnvironment.getTableEnvironment(env, config)
-
-    val ds1 = CollectionDataSets.getSmall3TupleDataSet(env).toTable(tEnv1, 'a, 
'b, 'c)
-    val ds2 = CollectionDataSets.get5TupleDataSet(env).toTable(tEnv2, 'd, 'e, 
'f, 'g, 'h)
-
-    // Must fail. Tables are bound to different TableEnvironments.
-    ds1.join(ds2).where('b === 'e).select('c, 'g)
-  }
-
   @Test
   def testLeftJoinWithMultipleKeys(): Unit = {
     val env: ExecutionEnvironment = 
ExecutionEnvironment.getExecutionEnvironment
@@ -297,30 +216,6 @@ class JoinITCase(
     TestBaseUtils.compareResultAsText(results.asJava, expected)
   }
 
-  @Test(expected = classOf[ValidationException])
-  def testNoJoinCondition(): Unit = {
-    val env: ExecutionEnvironment = 
ExecutionEnvironment.getExecutionEnvironment
-    val tEnv = TableEnvironment.getTableEnvironment(env, config)
-    tEnv.getConfig.setNullCheck(true)
-
-    val ds1 = CollectionDataSets.get3TupleDataSet(env).toTable(tEnv, 'a, 'b, 
'c)
-    val ds2 = CollectionDataSets.get5TupleDataSet(env).toTable(tEnv, 'd, 'e, 
'f, 'g, 'h)
-
-    val joinT = ds2.leftOuterJoin(ds1, 'b === 'd && 'b < 3).select('c, 'g)
-  }
-
-  @Test(expected = classOf[ValidationException])
-  def testNoEquiJoin(): Unit = {
-    val env: ExecutionEnvironment = 
ExecutionEnvironment.getExecutionEnvironment
-    val tEnv = TableEnvironment.getTableEnvironment(env, config)
-    tEnv.getConfig.setNullCheck(true)
-
-    val ds1 = CollectionDataSets.get3TupleDataSet(env).toTable(tEnv, 'a, 'b, 
'c)
-    val ds2 = CollectionDataSets.get5TupleDataSet(env).toTable(tEnv, 'd, 'e, 
'f, 'g, 'h)
-
-    val joinT = ds2.leftOuterJoin(ds1, 'b < 'd).select('c, 'g)
-  }
-
   @Test
   def testRightJoinWithMultipleKeys(): Unit = {
     val env: ExecutionEnvironment = 
ExecutionEnvironment.getExecutionEnvironment
@@ -330,7 +225,7 @@ class JoinITCase(
     val ds1 = CollectionDataSets.get3TupleDataSet(env).toTable(tEnv, 'a, 'b, 
'c)
     val ds2 = CollectionDataSets.get5TupleDataSet(env).toTable(tEnv, 'd, 'e, 
'f, 'g, 'h)
 
-    val joinT = ds1.rightOuterJoin(ds2, "a = d && b = h").select('c, 'g)
+    val joinT = ds1.rightOuterJoin(ds2, 'a === 'd && 'b === 'h).select('c, 'g)
 
     val expected = "Hi,Hallo\n" + "Hello,Hallo Welt\n" + "null,Hallo Welt 
wie\n" +
       "Hello world,Hallo Welt wie gehts?\n" + "Hello world,ABC\n" + 
"null,BCD\n" + "null,CDE\n" +
@@ -349,7 +244,7 @@ class JoinITCase(
     val ds1 = CollectionDataSets.get3TupleDataSet(env).toTable(tEnv, 'a, 'b, 
'c)
     val ds2 = CollectionDataSets.get5TupleDataSet(env).toTable(tEnv, 'd, 'e, 
'f, 'g, 'h)
 
-    val joinT = ds1.rightOuterJoin(ds2, "a = d && b < h").select('c, 'g)
+    val joinT = ds1.rightOuterJoin(ds2, 'a === 'd && 'b < 'h).select('c, 'g)
 
     val expected = "Hello world,BCD\n"
     val results = joinT.toDataSet[Row].collect()

http://git-wip-us.apache.org/repos/asf/flink/blob/649cf054/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/api/scala/batch/table/SetOperatorsITCase.scala
----------------------------------------------------------------------
diff --git 
a/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/api/scala/batch/table/SetOperatorsITCase.scala
 
b/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/api/scala/batch/table/SetOperatorsITCase.scala
index e369250..4e02a98 100644
--- 
a/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/api/scala/batch/table/SetOperatorsITCase.scala
+++ 
b/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/api/scala/batch/table/SetOperatorsITCase.scala
@@ -24,7 +24,7 @@ import 
org.apache.flink.table.api.scala.batch.utils.TableProgramsTestBase.TableC
 import org.apache.flink.table.api.scala._
 import org.apache.flink.api.scala.util.CollectionDataSets
 import org.apache.flink.types.Row
-import org.apache.flink.table.api.{TableEnvironment, ValidationException}
+import org.apache.flink.table.api.TableEnvironment
 import org.apache.flink.test.util.MultipleProgramsTestBase.TestExecutionMode
 import org.apache.flink.test.util.TestBaseUtils
 import org.junit._
@@ -105,44 +105,6 @@ class SetOperatorsITCase(
     TestBaseUtils.compareResultAsText(results.asJava, expected)
   }
 
-  @Test(expected = classOf[ValidationException])
-  def testUnionDifferentColumnSize(): Unit = {
-    val env: ExecutionEnvironment = 
ExecutionEnvironment.getExecutionEnvironment
-    val tEnv = TableEnvironment.getTableEnvironment(env, config)
-
-    val ds1 = CollectionDataSets.getSmall3TupleDataSet(env).toTable(tEnv, 'a, 
'b, 'c)
-    val ds2 = CollectionDataSets.get5TupleDataSet(env).toTable(tEnv, 'a, 'b, 
'd, 'c, 'e)
-
-    // must fail. Union inputs have different column size.
-    ds1.unionAll(ds2)
-  }
-
-  @Test(expected = classOf[ValidationException])
-  def testUnionDifferentFieldTypes(): Unit = {
-    val env: ExecutionEnvironment = 
ExecutionEnvironment.getExecutionEnvironment
-    val tEnv = TableEnvironment.getTableEnvironment(env, config)
-
-    val ds1 = CollectionDataSets.getSmall3TupleDataSet(env).toTable(tEnv, 'a, 
'b, 'c)
-    val ds2 = CollectionDataSets.get5TupleDataSet(env).toTable(tEnv, 'a, 'b, 
'c, 'd, 'e)
-      .select('a, 'b, 'c)
-
-    // must fail. Union inputs have different field types.
-    ds1.unionAll(ds2)
-  }
-
-  @Test(expected = classOf[ValidationException])
-  def testUnionTablesFromDifferentEnvs(): Unit = {
-    val env: ExecutionEnvironment = 
ExecutionEnvironment.getExecutionEnvironment
-    val tEnv1 = TableEnvironment.getTableEnvironment(env, config)
-    val tEnv2 = TableEnvironment.getTableEnvironment(env, config)
-
-    val ds1 = CollectionDataSets.getSmall3TupleDataSet(env).toTable(tEnv1, 'a, 
'b, 'c)
-    val ds2 = CollectionDataSets.getSmall3TupleDataSet(env).toTable(tEnv2, 'a, 
'b, 'c)
-
-    // Must fail. Tables are bound to different TableEnvironments.
-    ds1.unionAll(ds2).select('c)
-  }
-
   @Test
   def testMinusAll(): Unit = {
     val env: ExecutionEnvironment = 
ExecutionEnvironment.getExecutionEnvironment
@@ -178,19 +140,6 @@ class SetOperatorsITCase(
     TestBaseUtils.compareResultAsText(results.asJava, expected)
   }
 
-  @Test(expected = classOf[ValidationException])
-  def testMinusDifferentFieldTypes(): Unit = {
-    val env: ExecutionEnvironment = 
ExecutionEnvironment.getExecutionEnvironment
-    val tEnv = TableEnvironment.getTableEnvironment(env, config)
-
-    val ds1 = CollectionDataSets.getSmall3TupleDataSet(env).toTable(tEnv, 'a, 
'b, 'c)
-    val ds2 = CollectionDataSets.get5TupleDataSet(env).toTable(tEnv, 'a, 'b, 
'c, 'd, 'e)
-      .select('a, 'b, 'c)
-
-    // must fail. Minus inputs have different field types.
-    ds1.minus(ds2)
-  }
-
   @Test
   def testMinusDifferentFieldNames(): Unit = {
     val env: ExecutionEnvironment = 
ExecutionEnvironment.getExecutionEnvironment
@@ -207,19 +156,6 @@ class SetOperatorsITCase(
     TestBaseUtils.compareResultAsText(results.asJava, expected)
   }
 
-  @Test(expected = classOf[ValidationException])
-  def testMinusAllTablesFromDifferentEnvs(): Unit = {
-    val env: ExecutionEnvironment = 
ExecutionEnvironment.getExecutionEnvironment
-    val tEnv1 = TableEnvironment.getTableEnvironment(env, config)
-    val tEnv2 = TableEnvironment.getTableEnvironment(env, config)
-
-    val ds1 = CollectionDataSets.getSmall3TupleDataSet(env).toTable(tEnv1, 'a, 
'b, 'c)
-    val ds2 = CollectionDataSets.getSmall3TupleDataSet(env).toTable(tEnv2, 'a, 
'b, 'c)
-
-    // Must fail. Tables are bound to different TableEnvironments.
-    ds1.minusAll(ds2).select('c)
-  }
-
   @Test
   def testIntersect(): Unit = {
     val env: ExecutionEnvironment = 
ExecutionEnvironment.getExecutionEnvironment
@@ -275,32 +211,6 @@ class SetOperatorsITCase(
     TestBaseUtils.compareResultAsText(results.asJava, expected)
   }
 
-  @Test(expected = classOf[ValidationException])
-  def testIntersectWithDifferentFieldTypes(): Unit = {
-    val env: ExecutionEnvironment = 
ExecutionEnvironment.getExecutionEnvironment
-    val tEnv = TableEnvironment.getTableEnvironment(env, config)
-
-    val ds1 = CollectionDataSets.getSmall3TupleDataSet(env).toTable(tEnv, 'a, 
'b, 'c)
-    val ds2 = CollectionDataSets.get5TupleDataSet(env).toTable(tEnv, 'a, 'b, 
'c, 'd, 'e)
-      .select('a, 'b, 'c)
-
-    // must fail. Intersect inputs have different field types.
-    ds1.intersect(ds2)
-  }
-
-  @Test(expected = classOf[ValidationException])
-  def testIntersectTablesFromDifferentEnvs(): Unit = {
-    val env: ExecutionEnvironment = 
ExecutionEnvironment.getExecutionEnvironment
-    val tEnv1 = TableEnvironment.getTableEnvironment(env, config)
-    val tEnv2 = TableEnvironment.getTableEnvironment(env, config)
-
-    val ds1 = CollectionDataSets.getSmall3TupleDataSet(env).toTable(tEnv1, 'a, 
'b, 'c)
-    val ds2 = CollectionDataSets.getSmall3TupleDataSet(env).toTable(tEnv2, 'a, 
'b, 'c)
-
-    // Must fail. Tables are bound to different TableEnvironments.
-    ds1.intersect(ds2).select('c)
-  }
-
   @Test
   def testIntersectWithScalarExpression(): Unit = {
     val env: ExecutionEnvironment = 
ExecutionEnvironment.getExecutionEnvironment

http://git-wip-us.apache.org/repos/asf/flink/blob/649cf054/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/api/scala/batch/table/SortITCase.scala
----------------------------------------------------------------------
diff --git 
a/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/api/scala/batch/table/SortITCase.scala
 
b/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/api/scala/batch/table/SortITCase.scala
index 3cbc2c8..2991aaa 100644
--- 
a/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/api/scala/batch/table/SortITCase.scala
+++ 
b/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/api/scala/batch/table/SortITCase.scala
@@ -172,15 +172,4 @@ class SortITCase(
     TestBaseUtils.compareOrderedResultAsText(result.asJava, expected)
   }
 
-  @Test(expected = classOf[ValidationException])
-  def testFetchWithoutOrder(): Unit = {
-    val env = getExecutionEnvironment
-    val tEnv = TableEnvironment.getTableEnvironment(env, config)
-
-    val ds = CollectionDataSets.get3TupleDataSet(env)
-    val t = ds.toTable(tEnv).limit(0, 5)
-
-    t.toDataSet[Row].collect()
-  }
-
 }

Reply via email to