Repository: flink
Updated Branches:
  refs/heads/master b1913a4af -> dba7d7da9


[FLINK-5268] Split TableProgramsTestBase into TableProgramsCollectionTestBase 
and TableProgramsClusterTestBase

This closes #3099.


Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/dba7d7da
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/dba7d7da
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/dba7d7da

Branch: refs/heads/master
Commit: dba7d7da9b81fce21c13093634118a863a2e2bcc
Parents: b1913a4
Author: mtunique <oatg...@gmail.com>
Authored: Thu Jan 12 17:30:57 2017 +0800
Committer: twalthr <twal...@apache.org>
Committed: Tue Jan 17 13:41:12 2017 +0100

----------------------------------------------------------------------
 .../api/java/batch/TableEnvironmentITCase.java  | 15 ++++-----
 .../table/api/java/batch/TableSourceITCase.java |  8 ++---
 .../api/java/batch/sql/GroupingSetsITCase.java  | 18 ++++++-----
 .../table/api/java/batch/sql/SqlITCase.java     |  8 ++---
 .../scala/batch/TableEnvironmentITCase.scala    | 14 ++++-----
 .../table/api/scala/batch/TableSinkITCase.scala |  6 ++--
 .../api/scala/batch/TableSourceITCase.scala     |  7 ++---
 .../scala/batch/sql/AggregationsITCase.scala    |  8 ++---
 .../table/api/scala/batch/sql/CalcITCase.scala  | 12 +++-----
 .../table/api/scala/batch/sql/JoinITCase.scala  |  6 ++--
 .../scala/batch/sql/SetOperatorsITCase.scala    |  6 ++--
 .../table/api/scala/batch/sql/SortITCase.scala  |  6 ++--
 .../scala/batch/sql/TableWithSQLITCase.scala    |  6 ++--
 .../scala/batch/table/AggregationsITCase.scala  |  6 ++--
 .../api/scala/batch/table/CalcITCase.scala      | 12 +++-----
 .../api/scala/batch/table/CastingITCase.scala   | 11 +++----
 .../api/scala/batch/table/JoinITCase.scala      |  5 ++-
 .../scala/batch/table/SetOperatorsITCase.scala  |  6 ++--
 .../api/scala/batch/table/SortITCase.scala      |  8 ++---
 .../utils/TableProgramsClusterTestBase.scala    | 32 ++++++++++++++++++++
 .../utils/TableProgramsCollectionTestBase.scala | 32 ++++++++++++++++++++
 .../batch/utils/TableProgramsTestBase.scala     |  4 +--
 .../dataset/DataSetCorrelateITCase.scala        |  6 ++--
 .../dataset/DataSetWindowAggregateITCase.scala  |  9 ++----
 24 files changed, 142 insertions(+), 109 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/flink/blob/dba7d7da/flink-libraries/flink-table/src/test/java/org/apache/flink/table/api/java/batch/TableEnvironmentITCase.java
----------------------------------------------------------------------
diff --git 
a/flink-libraries/flink-table/src/test/java/org/apache/flink/table/api/java/batch/TableEnvironmentITCase.java
 
b/flink-libraries/flink-table/src/test/java/org/apache/flink/table/api/java/batch/TableEnvironmentITCase.java
index b9136d3..e84c906 100644
--- 
a/flink-libraries/flink-table/src/test/java/org/apache/flink/table/api/java/batch/TableEnvironmentITCase.java
+++ 
b/flink-libraries/flink-table/src/test/java/org/apache/flink/table/api/java/batch/TableEnvironmentITCase.java
@@ -33,8 +33,9 @@ import org.apache.flink.api.java.tuple.Tuple3;
 import org.apache.flink.api.java.tuple.Tuple4;
 import org.apache.flink.api.java.tuple.Tuple5;
 import org.apache.flink.api.java.typeutils.TupleTypeInfo;
-import org.apache.flink.types.Row;
+import 
org.apache.flink.table.api.scala.batch.utils.TableProgramsCollectionTestBase;
 import org.apache.flink.table.api.scala.batch.utils.TableProgramsTestBase;
+import org.apache.flink.types.Row;
 import org.apache.flink.table.calcite.CalciteConfig;
 import org.apache.flink.table.calcite.CalciteConfigBuilder;
 import org.apache.flink.table.api.Table;
@@ -46,17 +47,17 @@ import org.junit.runner.RunWith;
 import org.junit.runners.Parameterized;
 
 @RunWith(Parameterized.class)
-public class TableEnvironmentITCase extends TableProgramsTestBase {
+public class TableEnvironmentITCase extends TableProgramsCollectionTestBase {
 
-       public TableEnvironmentITCase(TestExecutionMode mode, TableConfigMode 
configMode) {
-               super(mode, configMode);
+       public TableEnvironmentITCase(TableConfigMode configMode) {
+               super(configMode);
        }
 
-       @Parameterized.Parameters(name = "Execution mode = {0}, Table config = 
{1}")
+       @Parameterized.Parameters(name = "Table config = {0}")
        public static Collection<Object[]> parameters() {
                return Arrays.asList(new Object[][] {
-                       { TestExecutionMode.COLLECTION, 
TableProgramsTestBase.DEFAULT() },
-                       { TestExecutionMode.COLLECTION, 
TableProgramsTestBase.EFFICIENT() }
+                       { TableProgramsTestBase.DEFAULT() },
+                       { TableProgramsTestBase.EFFICIENT() }
                });
        }
 

http://git-wip-us.apache.org/repos/asf/flink/blob/dba7d7da/flink-libraries/flink-table/src/test/java/org/apache/flink/table/api/java/batch/TableSourceITCase.java
----------------------------------------------------------------------
diff --git 
a/flink-libraries/flink-table/src/test/java/org/apache/flink/table/api/java/batch/TableSourceITCase.java
 
b/flink-libraries/flink-table/src/test/java/org/apache/flink/table/api/java/batch/TableSourceITCase.java
index d67725e..becd870 100644
--- 
a/flink-libraries/flink-table/src/test/java/org/apache/flink/table/api/java/batch/TableSourceITCase.java
+++ 
b/flink-libraries/flink-table/src/test/java/org/apache/flink/table/api/java/batch/TableSourceITCase.java
@@ -23,7 +23,7 @@ import org.apache.flink.api.java.ExecutionEnvironment;
 import org.apache.flink.table.api.Table;
 import org.apache.flink.table.api.TableEnvironment;
 import org.apache.flink.table.api.java.BatchTableEnvironment;
-import org.apache.flink.table.api.scala.batch.utils.TableProgramsTestBase;
+import 
org.apache.flink.table.api.scala.batch.utils.TableProgramsCollectionTestBase;
 import org.apache.flink.table.sources.BatchTableSource;
 import org.apache.flink.table.utils.CommonTestData;
 import org.apache.flink.types.Row;
@@ -34,10 +34,10 @@ import org.junit.runners.Parameterized;
 import java.util.List;
 
 @RunWith(Parameterized.class)
-public class TableSourceITCase extends TableProgramsTestBase {
+public class TableSourceITCase extends TableProgramsCollectionTestBase {
 
-       public TableSourceITCase(TestExecutionMode mode, TableConfigMode 
configMode) {
-               super(mode, configMode);
+       public TableSourceITCase(TableConfigMode configMode) {
+               super(configMode);
        }
 
        @Test

http://git-wip-us.apache.org/repos/asf/flink/blob/dba7d7da/flink-libraries/flink-table/src/test/java/org/apache/flink/table/api/java/batch/sql/GroupingSetsITCase.java
----------------------------------------------------------------------
diff --git 
a/flink-libraries/flink-table/src/test/java/org/apache/flink/table/api/java/batch/sql/GroupingSetsITCase.java
 
b/flink-libraries/flink-table/src/test/java/org/apache/flink/table/api/java/batch/sql/GroupingSetsITCase.java
index f7111f7..54f7da7 100644
--- 
a/flink-libraries/flink-table/src/test/java/org/apache/flink/table/api/java/batch/sql/GroupingSetsITCase.java
+++ 
b/flink-libraries/flink-table/src/test/java/org/apache/flink/table/api/java/batch/sql/GroupingSetsITCase.java
@@ -18,6 +18,8 @@
 
 package org.apache.flink.table.api.java.batch.sql;
 
+import java.util.Comparator;
+import java.util.List;
 import org.apache.flink.api.common.functions.MapFunction;
 import org.apache.flink.api.java.DataSet;
 import org.apache.flink.api.java.ExecutionEnvironment;
@@ -27,7 +29,7 @@ import org.apache.flink.table.api.Table;
 import org.apache.flink.table.api.TableConfig;
 import org.apache.flink.table.api.TableEnvironment;
 import org.apache.flink.table.api.java.BatchTableEnvironment;
-import org.apache.flink.table.api.scala.batch.utils.TableProgramsTestBase;
+import 
org.apache.flink.table.api.scala.batch.utils.TableProgramsClusterTestBase;
 import org.apache.flink.test.javaApiOperators.util.CollectionDataSets;
 import org.apache.flink.test.util.TestBaseUtils;
 import org.apache.flink.types.Row;
@@ -36,18 +38,20 @@ import org.junit.Test;
 import org.junit.runner.RunWith;
 import org.junit.runners.Parameterized;
 
-import java.util.Comparator;
-import java.util.List;
-
+/**
+ * This test should be replaced by a DataSetAggregateITCase.
+ * We should only perform logical unit tests here.
+ * Until then, we perform a cluster test here.
+ */
 @RunWith(Parameterized.class)
-public class GroupingSetsITCase extends TableProgramsTestBase {
+public class GroupingSetsITCase extends TableProgramsClusterTestBase {
 
        private final static String TABLE_NAME = "MyTable";
        private final static String TABLE_WITH_NULLS_NAME = "MyTableWithNulls";
        private BatchTableEnvironment tableEnv;
 
-       public GroupingSetsITCase(TestExecutionMode mode, TableConfigMode 
tableConfigMode) {
-               super(mode, tableConfigMode);
+       public GroupingSetsITCase(TableConfigMode tableConfigMode) {
+               super(tableConfigMode);
        }
 
        @Before

http://git-wip-us.apache.org/repos/asf/flink/blob/dba7d7da/flink-libraries/flink-table/src/test/java/org/apache/flink/table/api/java/batch/sql/SqlITCase.java
----------------------------------------------------------------------
diff --git 
a/flink-libraries/flink-table/src/test/java/org/apache/flink/table/api/java/batch/sql/SqlITCase.java
 
b/flink-libraries/flink-table/src/test/java/org/apache/flink/table/api/java/batch/sql/SqlITCase.java
index 433410c..5ba67dd 100644
--- 
a/flink-libraries/flink-table/src/test/java/org/apache/flink/table/api/java/batch/sql/SqlITCase.java
+++ 
b/flink-libraries/flink-table/src/test/java/org/apache/flink/table/api/java/batch/sql/SqlITCase.java
@@ -24,7 +24,7 @@ import org.apache.flink.table.api.java.BatchTableEnvironment;
 import org.apache.flink.api.java.tuple.Tuple3;
 import org.apache.flink.api.java.tuple.Tuple5;
 import org.apache.flink.types.Row;
-import org.apache.flink.table.api.scala.batch.utils.TableProgramsTestBase;
+import 
org.apache.flink.table.api.scala.batch.utils.TableProgramsCollectionTestBase;
 import org.apache.flink.table.api.Table;
 import org.apache.flink.table.api.TableEnvironment;
 import org.apache.flink.test.javaApiOperators.util.CollectionDataSets;
@@ -35,10 +35,10 @@ import org.junit.runners.Parameterized;
 import java.util.List;
 
 @RunWith(Parameterized.class)
-public class SqlITCase extends TableProgramsTestBase {
+public class SqlITCase extends TableProgramsCollectionTestBase {
 
-       public SqlITCase(TestExecutionMode mode, TableConfigMode configMode) {
-               super(mode, configMode);
+       public SqlITCase(TableConfigMode configMode) {
+               super(configMode);
        }
 
        @Test

http://git-wip-us.apache.org/repos/asf/flink/blob/dba7d7da/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/api/scala/batch/TableEnvironmentITCase.scala
----------------------------------------------------------------------
diff --git 
a/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/api/scala/batch/TableEnvironmentITCase.scala
 
b/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/api/scala/batch/TableEnvironmentITCase.scala
index 961e575..2b00cc9 100644
--- 
a/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/api/scala/batch/TableEnvironmentITCase.scala
+++ 
b/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/api/scala/batch/TableEnvironmentITCase.scala
@@ -21,13 +21,12 @@ package org.apache.flink.table.api.scala.batch
 import java.util
 
 import org.apache.flink.api.scala._
-import org.apache.flink.table.api.scala.batch.utils.TableProgramsTestBase
-import 
org.apache.flink.table.api.scala.batch.utils.TableProgramsTestBase.TableConfigMode
 import org.apache.flink.table.api.scala._
 import org.apache.flink.api.scala.util.CollectionDataSets
+import 
org.apache.flink.table.api.scala.batch.utils.{TableProgramsCollectionTestBase, 
TableProgramsTestBase}
+import 
org.apache.flink.table.api.scala.batch.utils.TableProgramsTestBase.TableConfigMode
 import org.apache.flink.types.Row
 import org.apache.flink.table.api.{TableEnvironment, TableException}
-import org.apache.flink.test.util.MultipleProgramsTestBase.TestExecutionMode
 import org.apache.flink.test.util.TestBaseUtils
 import org.junit._
 import org.junit.runner.RunWith
@@ -37,9 +36,8 @@ import scala.collection.JavaConverters._
 
 @RunWith(classOf[Parameterized])
 class TableEnvironmentITCase(
-    mode: TestExecutionMode,
     configMode: TableConfigMode)
-  extends TableProgramsTestBase(mode, configMode) {
+  extends TableProgramsCollectionTestBase(configMode) {
 
   @Test
   def testSimpleRegister(): Unit = {
@@ -260,11 +258,11 @@ class TableEnvironmentITCase(
 
 object TableEnvironmentITCase {
 
-  @Parameterized.Parameters(name = "Execution mode = {0}, Table config = {1}")
+  @Parameterized.Parameters(name = "Table config = {0}")
   def parameters(): util.Collection[Array[java.lang.Object]] = {
     Seq[Array[AnyRef]](
-      Array(TestExecutionMode.COLLECTION, TableProgramsTestBase.DEFAULT),
-      Array(TestExecutionMode.COLLECTION, 
TableProgramsTestBase.EFFICIENT)).asJava
+      Array(TableProgramsTestBase.DEFAULT),
+      Array(TableProgramsTestBase.EFFICIENT)).asJava
   }
 }
 

http://git-wip-us.apache.org/repos/asf/flink/blob/dba7d7da/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/api/scala/batch/TableSinkITCase.scala
----------------------------------------------------------------------
diff --git 
a/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/api/scala/batch/TableSinkITCase.scala
 
b/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/api/scala/batch/TableSinkITCase.scala
index 8bc7874..8fca992 100644
--- 
a/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/api/scala/batch/TableSinkITCase.scala
+++ 
b/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/api/scala/batch/TableSinkITCase.scala
@@ -21,13 +21,12 @@ package org.apache.flink.table.api.scala.batch
 import java.io.File
 
 import org.apache.flink.api.scala.{ExecutionEnvironment, _}
-import org.apache.flink.table.api.scala.batch.utils.TableProgramsTestBase
+import 
org.apache.flink.table.api.scala.batch.utils.TableProgramsCollectionTestBase
 import 
org.apache.flink.table.api.scala.batch.utils.TableProgramsTestBase.TableConfigMode
 import org.apache.flink.table.api.scala._
 import org.apache.flink.api.scala.util.CollectionDataSets
 import org.apache.flink.table.api.TableEnvironment
 import org.apache.flink.table.sinks.CsvTableSink
-import org.apache.flink.test.util.MultipleProgramsTestBase.TestExecutionMode
 import org.apache.flink.test.util.TestBaseUtils
 import org.junit.Test
 import org.junit.runner.RunWith
@@ -36,9 +35,8 @@ import org.junit.runners.Parameterized
 
 @RunWith(classOf[Parameterized])
 class TableSinkITCase(
-    mode: TestExecutionMode,
     configMode: TableConfigMode)
-  extends TableProgramsTestBase(mode, configMode) {
+  extends TableProgramsCollectionTestBase(configMode) {
 
   @Test
   def testBatchTableSink(): Unit = {

http://git-wip-us.apache.org/repos/asf/flink/blob/dba7d7da/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/api/scala/batch/TableSourceITCase.scala
----------------------------------------------------------------------
diff --git 
a/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/api/scala/batch/TableSourceITCase.scala
 
b/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/api/scala/batch/TableSourceITCase.scala
index f5ab352..70f4345 100644
--- 
a/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/api/scala/batch/TableSourceITCase.scala
+++ 
b/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/api/scala/batch/TableSourceITCase.scala
@@ -19,14 +19,12 @@
 package org.apache.flink.table.api.scala.batch
 
 import org.apache.flink.api.scala.ExecutionEnvironment
-import org.apache.flink.table.api.scala.batch.utils.TableProgramsTestBase
+import 
org.apache.flink.table.api.scala.batch.utils.TableProgramsCollectionTestBase
 import 
org.apache.flink.table.api.scala.batch.utils.TableProgramsTestBase.TableConfigMode
 import org.apache.flink.table.api.scala._
 import org.apache.flink.table.api.TableEnvironment
 import org.apache.flink.table.utils.CommonTestData
-import org.apache.flink.test.util.MultipleProgramsTestBase.TestExecutionMode
 import org.apache.flink.test.util.TestBaseUtils
-import org.apache.flink.types.Row
 import org.junit.Test
 import org.junit.runner.RunWith
 import org.junit.runners.Parameterized
@@ -35,9 +33,8 @@ import scala.collection.JavaConverters._
 
 @RunWith(classOf[Parameterized])
 class TableSourceITCase(
-    mode: TestExecutionMode,
     configMode: TableConfigMode)
-  extends TableProgramsTestBase(mode, configMode) {
+  extends TableProgramsCollectionTestBase(configMode) {
 
   @Test
   def testCsvTableSource(): Unit = {

http://git-wip-us.apache.org/repos/asf/flink/blob/dba7d7da/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/api/scala/batch/sql/AggregationsITCase.scala
----------------------------------------------------------------------
diff --git 
a/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/api/scala/batch/sql/AggregationsITCase.scala
 
b/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/api/scala/batch/sql/AggregationsITCase.scala
index d6f2b7b..d7e429c 100644
--- 
a/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/api/scala/batch/sql/AggregationsITCase.scala
+++ 
b/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/api/scala/batch/sql/AggregationsITCase.scala
@@ -19,13 +19,12 @@
 package org.apache.flink.table.api.scala.batch.sql
 
 import org.apache.flink.api.scala._
-import org.apache.flink.table.api.scala.batch.utils.TableProgramsTestBase
+import 
org.apache.flink.table.api.scala.batch.utils.TableProgramsCollectionTestBase
 import org.apache.flink.table.api.scala._
 import org.apache.flink.api.scala.util.CollectionDataSets
-import TableProgramsTestBase.TableConfigMode
+import 
org.apache.flink.table.api.scala.batch.utils.TableProgramsTestBase.TableConfigMode
 import org.apache.flink.types.Row
 import org.apache.flink.table.api.{TableEnvironment, TableException}
-import org.apache.flink.test.util.MultipleProgramsTestBase.TestExecutionMode
 import org.apache.flink.test.util.TestBaseUtils
 import org.junit._
 import org.junit.runner.RunWith
@@ -35,9 +34,8 @@ import scala.collection.JavaConverters._
 
 @RunWith(classOf[Parameterized])
 class AggregationsITCase(
-    mode: TestExecutionMode,
     configMode: TableConfigMode)
-  extends TableProgramsTestBase(mode, configMode) {
+  extends TableProgramsCollectionTestBase(configMode) {
 
   @Test
   def testAggregationTypes(): Unit = {

http://git-wip-us.apache.org/repos/asf/flink/blob/dba7d7da/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/api/scala/batch/sql/CalcITCase.scala
----------------------------------------------------------------------
diff --git 
a/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/api/scala/batch/sql/CalcITCase.scala
 
b/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/api/scala/batch/sql/CalcITCase.scala
index a6e5c56..3710642 100644
--- 
a/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/api/scala/batch/sql/CalcITCase.scala
+++ 
b/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/api/scala/batch/sql/CalcITCase.scala
@@ -24,13 +24,12 @@ import java.util
 
 import org.apache.flink.api.scala._
 import org.apache.flink.table.api.scala.batch.sql.FilterITCase.MyHashCode
-import org.apache.flink.table.api.scala.batch.utils.TableProgramsTestBase
+import 
org.apache.flink.table.api.scala.batch.utils.{TableProgramsCollectionTestBase, 
TableProgramsTestBase}
 import 
org.apache.flink.table.api.scala.batch.utils.TableProgramsTestBase.TableConfigMode
 import org.apache.flink.table.api.scala._
 import org.apache.flink.api.scala.util.CollectionDataSets
 import org.apache.flink.table.api.{TableEnvironment, ValidationException}
 import org.apache.flink.table.functions.ScalarFunction
-import org.apache.flink.test.util.MultipleProgramsTestBase.TestExecutionMode
 import org.apache.flink.test.util.TestBaseUtils
 import org.apache.flink.types.Row
 import org.junit._
@@ -41,9 +40,8 @@ import scala.collection.JavaConverters._
 
 @RunWith(classOf[Parameterized])
 class CalcITCase(
-    mode: TestExecutionMode,
     configMode: TableConfigMode)
-  extends TableProgramsTestBase(mode, configMode) {
+  extends TableProgramsCollectionTestBase(configMode) {
 
   @Test
   def testSelectStarFromTable(): Unit = {
@@ -320,10 +318,10 @@ object FilterITCase {
 
 object CalcITCase {
 
-  @Parameterized.Parameters(name = "Execution mode = {0}, Table config = {1}")
+  @Parameterized.Parameters(name = "Table config = {0}")
   def parameters(): util.Collection[Array[java.lang.Object]] = {
     Seq[Array[AnyRef]](
-      Array(TestExecutionMode.COLLECTION, TableProgramsTestBase.DEFAULT),
-      Array(TestExecutionMode.COLLECTION, 
TableProgramsTestBase.NO_NULL)).asJava
+      Array(TableProgramsTestBase.DEFAULT),
+      Array(TableProgramsTestBase.NO_NULL)).asJava
   }
 }

http://git-wip-us.apache.org/repos/asf/flink/blob/dba7d7da/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/api/scala/batch/sql/JoinITCase.scala
----------------------------------------------------------------------
diff --git 
a/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/api/scala/batch/sql/JoinITCase.scala
 
b/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/api/scala/batch/sql/JoinITCase.scala
index 96beea5..d07c282 100644
--- 
a/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/api/scala/batch/sql/JoinITCase.scala
+++ 
b/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/api/scala/batch/sql/JoinITCase.scala
@@ -19,12 +19,11 @@
 package org.apache.flink.table.api.scala.batch.sql
 
 import org.apache.flink.api.scala._
-import org.apache.flink.table.api.scala.batch.utils.TableProgramsTestBase
+import 
org.apache.flink.table.api.scala.batch.utils.TableProgramsCollectionTestBase
 import 
org.apache.flink.table.api.scala.batch.utils.TableProgramsTestBase.TableConfigMode
 import org.apache.flink.table.api.scala._
 import org.apache.flink.api.scala.util.CollectionDataSets
 import org.apache.flink.table.api.{TableEnvironment, TableException, 
ValidationException}
-import org.apache.flink.test.util.MultipleProgramsTestBase.TestExecutionMode
 import org.apache.flink.test.util.TestBaseUtils
 import org.apache.flink.types.Row
 import org.junit._
@@ -35,9 +34,8 @@ import scala.collection.JavaConverters._
 
 @RunWith(classOf[Parameterized])
 class JoinITCase(
-    mode: TestExecutionMode,
     configMode: TableConfigMode)
-  extends TableProgramsTestBase(mode, configMode) {
+  extends TableProgramsCollectionTestBase(configMode) {
 
   @Test
   def testJoin(): Unit = {

http://git-wip-us.apache.org/repos/asf/flink/blob/dba7d7da/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/api/scala/batch/sql/SetOperatorsITCase.scala
----------------------------------------------------------------------
diff --git 
a/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/api/scala/batch/sql/SetOperatorsITCase.scala
 
b/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/api/scala/batch/sql/SetOperatorsITCase.scala
index cc44e7a..dd99114 100644
--- 
a/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/api/scala/batch/sql/SetOperatorsITCase.scala
+++ 
b/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/api/scala/batch/sql/SetOperatorsITCase.scala
@@ -19,13 +19,12 @@
 package org.apache.flink.table.api.scala.batch.sql
 
 import org.apache.flink.api.scala._
-import org.apache.flink.table.api.scala.batch.utils.TableProgramsTestBase
+import 
org.apache.flink.table.api.scala.batch.utils.TableProgramsCollectionTestBase
 import 
org.apache.flink.table.api.scala.batch.utils.TableProgramsTestBase.TableConfigMode
 import org.apache.flink.table.api.scala._
 import org.apache.flink.api.scala.util.CollectionDataSets
 import org.apache.flink.types.Row
 import org.apache.flink.table.api.TableEnvironment
-import org.apache.flink.test.util.MultipleProgramsTestBase.TestExecutionMode
 import org.apache.flink.test.util.TestBaseUtils
 import org.junit._
 import org.junit.runner.RunWith
@@ -37,9 +36,8 @@ import scala.util.Random
 
 @RunWith(classOf[Parameterized])
 class SetOperatorsITCase(
-    mode: TestExecutionMode,
     configMode: TableConfigMode)
-  extends TableProgramsTestBase(mode, configMode) {
+  extends TableProgramsCollectionTestBase(configMode) {
 
   @Test
   def testUnionAll(): Unit = {

http://git-wip-us.apache.org/repos/asf/flink/blob/dba7d7da/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/api/scala/batch/sql/SortITCase.scala
----------------------------------------------------------------------
diff --git 
a/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/api/scala/batch/sql/SortITCase.scala
 
b/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/api/scala/batch/sql/SortITCase.scala
index 0f46e9b..43847dc 100644
--- 
a/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/api/scala/batch/sql/SortITCase.scala
+++ 
b/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/api/scala/batch/sql/SortITCase.scala
@@ -19,7 +19,7 @@
 package org.apache.flink.table.api.scala.batch.sql
 
 import org.apache.flink.api.scala.ExecutionEnvironment
-import org.apache.flink.table.api.scala.batch.utils.TableProgramsTestBase
+import 
org.apache.flink.table.api.scala.batch.utils.TableProgramsCollectionTestBase
 import 
org.apache.flink.table.api.scala.batch.utils.TableProgramsTestBase.TableConfigMode
 import org.apache.flink.table.api.scala.batch.utils.SortTestUtils._
 import org.apache.flink.api.scala.util.CollectionDataSets
@@ -27,7 +27,6 @@ import org.apache.flink.table.api.scala._
 import org.apache.flink.api.scala._
 import org.apache.flink.types.Row
 import org.apache.flink.table.api.{TableEnvironment, TableException}
-import org.apache.flink.test.util.MultipleProgramsTestBase.TestExecutionMode
 import org.apache.flink.test.util.TestBaseUtils
 import org.junit._
 import org.junit.runner.RunWith
@@ -37,9 +36,8 @@ import scala.collection.JavaConverters._
 
 @RunWith(classOf[Parameterized])
 class SortITCase(
-    mode: TestExecutionMode,
     configMode: TableConfigMode)
-  extends TableProgramsTestBase(mode, configMode) {
+  extends TableProgramsCollectionTestBase(configMode) {
 
   @Test
   def testOrderByMultipleFieldsWithSql(): Unit = {

http://git-wip-us.apache.org/repos/asf/flink/blob/dba7d7da/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/api/scala/batch/sql/TableWithSQLITCase.scala
----------------------------------------------------------------------
diff --git 
a/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/api/scala/batch/sql/TableWithSQLITCase.scala
 
b/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/api/scala/batch/sql/TableWithSQLITCase.scala
index aabc62a..89bf783 100644
--- 
a/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/api/scala/batch/sql/TableWithSQLITCase.scala
+++ 
b/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/api/scala/batch/sql/TableWithSQLITCase.scala
@@ -19,13 +19,12 @@
 package org.apache.flink.table.api.scala.batch.sql
 
 import org.apache.flink.api.scala._
-import org.apache.flink.table.api.scala.batch.utils.TableProgramsTestBase
+import 
org.apache.flink.table.api.scala.batch.utils.TableProgramsCollectionTestBase
 import 
org.apache.flink.table.api.scala.batch.utils.TableProgramsTestBase.TableConfigMode
 import org.apache.flink.table.api.scala._
 import org.apache.flink.api.scala.util.CollectionDataSets
 import org.apache.flink.types.Row
 import org.apache.flink.table.api.TableEnvironment
-import org.apache.flink.test.util.MultipleProgramsTestBase.TestExecutionMode
 import org.apache.flink.test.util.TestBaseUtils
 import org.junit._
 import org.junit.runner.RunWith
@@ -35,9 +34,8 @@ import scala.collection.JavaConverters._
 
 @RunWith(classOf[Parameterized])
 class TableWithSQLITCase(
-    mode: TestExecutionMode,
     configMode: TableConfigMode)
-  extends TableProgramsTestBase(mode, configMode) {
+  extends TableProgramsCollectionTestBase(configMode) {
 
   @Test
   def testSQLTable(): Unit = {

http://git-wip-us.apache.org/repos/asf/flink/blob/dba7d7da/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/api/scala/batch/table/AggregationsITCase.scala
----------------------------------------------------------------------
diff --git 
a/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/api/scala/batch/table/AggregationsITCase.scala
 
b/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/api/scala/batch/table/AggregationsITCase.scala
index 62c70a2..22b7f0f 100644
--- 
a/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/api/scala/batch/table/AggregationsITCase.scala
+++ 
b/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/api/scala/batch/table/AggregationsITCase.scala
@@ -19,14 +19,13 @@
 package org.apache.flink.table.api.scala.batch.table
 
 import org.apache.flink.api.scala._
-import org.apache.flink.table.api.scala.batch.utils.TableProgramsTestBase
+import 
org.apache.flink.table.api.scala.batch.utils.TableProgramsCollectionTestBase
 import 
org.apache.flink.table.api.scala.batch.utils.TableProgramsTestBase.TableConfigMode
 import org.apache.flink.table.api.scala._
 import org.apache.flink.api.scala.util.CollectionDataSets
 import org.apache.flink.types.Row
 import org.apache.flink.table.api.TableEnvironment
 import org.apache.flink.table.examples.scala.WordCountTable.{WC => MyWC}
-import org.apache.flink.test.util.MultipleProgramsTestBase.TestExecutionMode
 import org.apache.flink.test.util.TestBaseUtils
 import org.junit._
 import org.junit.runner.RunWith
@@ -36,9 +35,8 @@ import scala.collection.JavaConverters._
 
 @RunWith(classOf[Parameterized])
 class AggregationsITCase(
-    mode: TestExecutionMode,
     configMode: TableConfigMode)
-  extends TableProgramsTestBase(mode, configMode) {
+  extends TableProgramsCollectionTestBase(configMode) {
 
   @Test
   def testAggregationTypes(): Unit = {

http://git-wip-us.apache.org/repos/asf/flink/blob/dba7d7da/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/api/scala/batch/table/CalcITCase.scala
----------------------------------------------------------------------
diff --git 
a/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/api/scala/batch/table/CalcITCase.scala
 
b/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/api/scala/batch/table/CalcITCase.scala
index 164e834..2f853f3 100644
--- 
a/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/api/scala/batch/table/CalcITCase.scala
+++ 
b/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/api/scala/batch/table/CalcITCase.scala
@@ -22,7 +22,7 @@ import java.sql.{Date, Time, Timestamp}
 import java.util
 
 import org.apache.flink.api.scala._
-import org.apache.flink.table.api.scala.batch.utils.TableProgramsTestBase
+import 
org.apache.flink.table.api.scala.batch.utils.{TableProgramsCollectionTestBase, 
TableProgramsTestBase}
 import 
org.apache.flink.table.api.scala.batch.utils.TableProgramsTestBase.TableConfigMode
 import org.apache.flink.table.api.scala._
 import org.apache.flink.api.scala.util.CollectionDataSets
@@ -30,7 +30,6 @@ import org.apache.flink.types.Row
 import org.apache.flink.table.api.TableEnvironment
 import org.apache.flink.table.expressions.Literal
 import org.apache.flink.table.functions.ScalarFunction
-import org.apache.flink.test.util.MultipleProgramsTestBase.TestExecutionMode
 import org.apache.flink.test.util.TestBaseUtils
 import org.junit._
 import org.junit.runner.RunWith
@@ -40,9 +39,8 @@ import scala.collection.JavaConverters._
 
 @RunWith(classOf[Parameterized])
 class CalcITCase(
-    mode: TestExecutionMode,
     configMode: TableConfigMode)
-  extends TableProgramsTestBase(mode, configMode) {
+  extends TableProgramsCollectionTestBase(configMode) {
 
   @Test
   def testSimpleSelectAll(): Unit = {
@@ -366,11 +364,11 @@ class CalcITCase(
 
 object CalcITCase {
 
-  @Parameterized.Parameters(name = "Execution mode = {0}, Table config = {1}")
+  @Parameterized.Parameters(name = "Table config = {0}")
   def parameters(): util.Collection[Array[java.lang.Object]] = {
     Seq[Array[AnyRef]](
-      Array(TestExecutionMode.COLLECTION, TableProgramsTestBase.DEFAULT),
-      Array(TestExecutionMode.COLLECTION, 
TableProgramsTestBase.NO_NULL)).asJava
+      Array(TableProgramsTestBase.DEFAULT),
+      Array(TableProgramsTestBase.NO_NULL)).asJava
   }
 }
 

http://git-wip-us.apache.org/repos/asf/flink/blob/dba7d7da/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/api/scala/batch/table/CastingITCase.scala
----------------------------------------------------------------------
diff --git 
a/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/api/scala/batch/table/CastingITCase.scala
 
b/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/api/scala/batch/table/CastingITCase.scala
index 2f8e9d2..0076b8e 100644
--- 
a/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/api/scala/batch/table/CastingITCase.scala
+++ 
b/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/api/scala/batch/table/CastingITCase.scala
@@ -19,12 +19,11 @@
 package org.apache.flink.table.api.scala.batch.table
 
 import org.apache.flink.api.scala._
-import org.apache.flink.table.api.scala._
 import org.apache.flink.table.api.TableEnvironment
 import org.apache.flink.table.api.Types._
-import org.apache.flink.table.api.scala.batch.utils.TableProgramsTestBase
+import org.apache.flink.table.api.scala._
+import 
org.apache.flink.table.api.scala.batch.utils.TableProgramsCollectionTestBase
 import 
org.apache.flink.table.api.scala.batch.utils.TableProgramsTestBase.TableConfigMode
-import org.apache.flink.test.util.MultipleProgramsTestBase.TestExecutionMode
 import org.apache.flink.test.util.TestBaseUtils.compareResultAsText
 import org.apache.flink.types.Row
 import org.junit._
@@ -34,10 +33,8 @@ import org.junit.runners.Parameterized
 import scala.collection.JavaConverters._
 
 @RunWith(classOf[Parameterized])
-class CastingITCase(
-    mode: TestExecutionMode,
-    configMode: TableConfigMode)
-  extends TableProgramsTestBase(mode, configMode) {
+class CastingITCase(configMode: TableConfigMode)
+  extends TableProgramsCollectionTestBase(configMode) {
 
   @Test
   def testNumericAutocastInArithmetic() {

http://git-wip-us.apache.org/repos/asf/flink/blob/dba7d7da/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/api/scala/batch/table/JoinITCase.scala
----------------------------------------------------------------------
diff --git 
a/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/api/scala/batch/table/JoinITCase.scala
 
b/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/api/scala/batch/table/JoinITCase.scala
index 277db4c..3305949 100644
--- 
a/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/api/scala/batch/table/JoinITCase.scala
+++ 
b/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/api/scala/batch/table/JoinITCase.scala
@@ -19,7 +19,7 @@
 package org.apache.flink.table.api.scala.batch.table
 
 import org.apache.flink.api.scala._
-import org.apache.flink.table.api.scala.batch.utils.TableProgramsTestBase
+import 
org.apache.flink.table.api.scala.batch.utils.TableProgramsCollectionTestBase
 import 
org.apache.flink.table.api.scala.batch.utils.TableProgramsTestBase.TableConfigMode
 import org.apache.flink.table.api.scala._
 import org.apache.flink.api.scala.util.CollectionDataSets
@@ -36,9 +36,8 @@ import scala.collection.JavaConverters._
 
 @RunWith(classOf[Parameterized])
 class JoinITCase(
-    mode: TestExecutionMode,
     configMode: TableConfigMode)
-  extends TableProgramsTestBase(mode, configMode) {
+  extends TableProgramsCollectionTestBase(configMode) {
 
   @Test
   def testJoin(): Unit = {

http://git-wip-us.apache.org/repos/asf/flink/blob/dba7d7da/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/api/scala/batch/table/SetOperatorsITCase.scala
----------------------------------------------------------------------
diff --git 
a/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/api/scala/batch/table/SetOperatorsITCase.scala
 
b/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/api/scala/batch/table/SetOperatorsITCase.scala
index 4e02a98..67da532 100644
--- 
a/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/api/scala/batch/table/SetOperatorsITCase.scala
+++ 
b/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/api/scala/batch/table/SetOperatorsITCase.scala
@@ -19,13 +19,12 @@
 package org.apache.flink.table.api.scala.batch.table
 
 import org.apache.flink.api.scala._
-import org.apache.flink.table.api.scala.batch.utils.TableProgramsTestBase
+import 
org.apache.flink.table.api.scala.batch.utils.TableProgramsCollectionTestBase
 import 
org.apache.flink.table.api.scala.batch.utils.TableProgramsTestBase.TableConfigMode
 import org.apache.flink.table.api.scala._
 import org.apache.flink.api.scala.util.CollectionDataSets
 import org.apache.flink.types.Row
 import org.apache.flink.table.api.TableEnvironment
-import org.apache.flink.test.util.MultipleProgramsTestBase.TestExecutionMode
 import org.apache.flink.test.util.TestBaseUtils
 import org.junit._
 import org.junit.runner.RunWith
@@ -37,9 +36,8 @@ import scala.util.Random
 
 @RunWith(classOf[Parameterized])
 class SetOperatorsITCase(
-    mode: TestExecutionMode,
     configMode: TableConfigMode)
-  extends TableProgramsTestBase(mode, configMode) {
+  extends TableProgramsCollectionTestBase(configMode) {
 
   @Test
   def testUnionAll(): Unit = {

http://git-wip-us.apache.org/repos/asf/flink/blob/dba7d7da/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/api/scala/batch/table/SortITCase.scala
----------------------------------------------------------------------
diff --git 
a/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/api/scala/batch/table/SortITCase.scala
 
b/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/api/scala/batch/table/SortITCase.scala
index 2991aaa..6fe7624 100644
--- 
a/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/api/scala/batch/table/SortITCase.scala
+++ 
b/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/api/scala/batch/table/SortITCase.scala
@@ -19,14 +19,13 @@
 package org.apache.flink.table.api.scala.batch.table
 
 import org.apache.flink.table.api.scala.batch.utils.SortTestUtils._
-import org.apache.flink.table.api.scala.batch.utils.TableProgramsTestBase
+import 
org.apache.flink.table.api.scala.batch.utils.TableProgramsCollectionTestBase
 import 
org.apache.flink.table.api.scala.batch.utils.TableProgramsTestBase.TableConfigMode
 import org.apache.flink.table.api.scala._
 import org.apache.flink.api.scala.util.CollectionDataSets
 import org.apache.flink.api.scala.{ExecutionEnvironment, _}
 import org.apache.flink.types.Row
-import org.apache.flink.table.api.{TableEnvironment, ValidationException}
-import org.apache.flink.test.util.MultipleProgramsTestBase.TestExecutionMode
+import org.apache.flink.table.api.TableEnvironment
 import org.apache.flink.test.util.TestBaseUtils
 import org.junit._
 import org.junit.runner.RunWith
@@ -36,9 +35,8 @@ import scala.collection.JavaConverters._
 
 @RunWith(classOf[Parameterized])
 class SortITCase(
-    mode: TestExecutionMode,
     configMode: TableConfigMode)
-  extends TableProgramsTestBase(mode, configMode) {
+  extends TableProgramsCollectionTestBase(configMode) {
 
   def getExecutionEnvironment = {
     val env = ExecutionEnvironment.getExecutionEnvironment

http://git-wip-us.apache.org/repos/asf/flink/blob/dba7d7da/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/api/scala/batch/utils/TableProgramsClusterTestBase.scala
----------------------------------------------------------------------
diff --git 
a/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/api/scala/batch/utils/TableProgramsClusterTestBase.scala
 
b/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/api/scala/batch/utils/TableProgramsClusterTestBase.scala
new file mode 100644
index 0000000..b82ea9f
--- /dev/null
+++ 
b/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/api/scala/batch/utils/TableProgramsClusterTestBase.scala
@@ -0,0 +1,32 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.api.scala.batch.utils
+
+import 
org.apache.flink.table.api.scala.batch.utils.TableProgramsTestBase.TableConfigMode
+import org.apache.flink.test.util.MultipleProgramsTestBase.TestExecutionMode
+
+/**
+  * This test base provides full cluster-like integration tests for batch 
programs. Only runtime
+  * operator tests should use this test base as they are expensive.
+  * (e.g. 
[[org.apache.flink.table.runtime.dataset.DataSetWindowAggregateITCase]])
+  */
+class TableProgramsClusterTestBase(
+    tableConfigMode: TableConfigMode)
+  extends TableProgramsTestBase(TestExecutionMode.CLUSTER, tableConfigMode) {
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/dba7d7da/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/api/scala/batch/utils/TableProgramsCollectionTestBase.scala
----------------------------------------------------------------------
diff --git 
a/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/api/scala/batch/utils/TableProgramsCollectionTestBase.scala
 
b/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/api/scala/batch/utils/TableProgramsCollectionTestBase.scala
new file mode 100644
index 0000000..ba0ea61
--- /dev/null
+++ 
b/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/api/scala/batch/utils/TableProgramsCollectionTestBase.scala
@@ -0,0 +1,32 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.api.scala.batch.utils
+
+import 
org.apache.flink.table.api.scala.batch.utils.TableProgramsTestBase.TableConfigMode
+import org.apache.flink.test.util.MultipleProgramsTestBase.TestExecutionMode
+
+/**
+  * This test base provides lightweight integration tests for batch programs. 
However, it does
+  * not test everything (e.g. combiners). Runtime operator tests should
+  * use [[TableProgramsClusterTestBase]].
+  */
+class TableProgramsCollectionTestBase(
+  tableConfigMode: TableConfigMode)
+  extends TableProgramsTestBase(TestExecutionMode.COLLECTION, tableConfigMode) 
{
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/dba7d7da/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/api/scala/batch/utils/TableProgramsTestBase.scala
----------------------------------------------------------------------
diff --git 
a/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/api/scala/batch/utils/TableProgramsTestBase.scala
 
b/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/api/scala/batch/utils/TableProgramsTestBase.scala
index 45315d6..a699068 100644
--- 
a/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/api/scala/batch/utils/TableProgramsTestBase.scala
+++ 
b/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/api/scala/batch/utils/TableProgramsTestBase.scala
@@ -53,8 +53,8 @@ object TableProgramsTestBase {
   val NO_NULL = TableConfigMode(nullCheck = false, efficientTypes = false)
   val EFFICIENT = TableConfigMode(nullCheck = false, efficientTypes = true)
 
-  @Parameterized.Parameters(name = "Execution mode = {0}, Table config = {1}")
+  @Parameterized.Parameters(name = "Table config = {0}")
   def parameters(): util.Collection[Array[java.lang.Object]] = {
-    Seq[Array[AnyRef]](Array(TestExecutionMode.COLLECTION, DEFAULT))
+    Seq[Array[AnyRef]](Array(DEFAULT))
   }
 }

http://git-wip-us.apache.org/repos/asf/flink/blob/dba7d7da/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/runtime/dataset/DataSetCorrelateITCase.scala
----------------------------------------------------------------------
diff --git 
a/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/runtime/dataset/DataSetCorrelateITCase.scala
 
b/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/runtime/dataset/DataSetCorrelateITCase.scala
index 550669e..783a457 100644
--- 
a/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/runtime/dataset/DataSetCorrelateITCase.scala
+++ 
b/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/runtime/dataset/DataSetCorrelateITCase.scala
@@ -19,12 +19,11 @@ package org.apache.flink.table.runtime.dataset
 
 import org.apache.flink.api.scala._
 import org.apache.flink.types.Row
-import org.apache.flink.table.api.scala.batch.utils.TableProgramsTestBase
+import 
org.apache.flink.table.api.scala.batch.utils.{TableProgramsClusterTestBase, 
TableProgramsCollectionTestBase}
 import 
org.apache.flink.table.api.scala.batch.utils.TableProgramsTestBase.TableConfigMode
 import org.apache.flink.table.api.scala._
 import org.apache.flink.table.api.TableEnvironment
 import org.apache.flink.table.utils._
-import org.apache.flink.test.util.MultipleProgramsTestBase.TestExecutionMode
 import org.apache.flink.test.util.TestBaseUtils
 import org.junit.Test
 import org.junit.runner.RunWith
@@ -35,9 +34,8 @@ import scala.collection.mutable
 
 @RunWith(classOf[Parameterized])
 class DataSetCorrelateITCase(
-  mode: TestExecutionMode,
   configMode: TableConfigMode)
-  extends TableProgramsTestBase(mode, configMode) {
+  extends TableProgramsClusterTestBase(configMode) {
 
   @Test
   def testCrossJoin(): Unit = {

http://git-wip-us.apache.org/repos/asf/flink/blob/dba7d7da/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/runtime/dataset/DataSetWindowAggregateITCase.scala
----------------------------------------------------------------------
diff --git 
a/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/runtime/dataset/DataSetWindowAggregateITCase.scala
 
b/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/runtime/dataset/DataSetWindowAggregateITCase.scala
index fbdbec4..21aec05 100644
--- 
a/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/runtime/dataset/DataSetWindowAggregateITCase.scala
+++ 
b/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/runtime/dataset/DataSetWindowAggregateITCase.scala
@@ -21,9 +21,8 @@ package org.apache.flink.table.runtime.dataset
 import org.apache.flink.api.scala._
 import org.apache.flink.table.api.TableEnvironment
 import org.apache.flink.table.api.scala._
-import org.apache.flink.table.api.scala.batch.utils.TableProgramsTestBase
+import 
org.apache.flink.table.api.scala.batch.utils.TableProgramsClusterTestBase
 import 
org.apache.flink.table.api.scala.batch.utils.TableProgramsTestBase.TableConfigMode
-import org.apache.flink.test.util.MultipleProgramsTestBase.TestExecutionMode
 import org.apache.flink.test.util.TestBaseUtils
 import org.apache.flink.types.Row
 import org.junit._
@@ -33,10 +32,8 @@ import org.junit.runners.Parameterized
 import scala.collection.JavaConverters._
 
 @RunWith(classOf[Parameterized])
-class DataSetWindowAggregateITCase(
-    mode: TestExecutionMode,
-    configMode: TableConfigMode)
-  extends TableProgramsTestBase(mode, configMode) {
+class DataSetWindowAggregateITCase(configMode: TableConfigMode)
+  extends TableProgramsClusterTestBase(configMode) {
 
   val data = List(
     (1L, 1, "Hi"),

Reply via email to