Repository: flink Updated Branches: refs/heads/master 728c936dd -> d5c320c3b
[FLINK-5586] [table] Extend TableProgramsClusterTestBase for object reuse mode This closes #3339. Project: http://git-wip-us.apache.org/repos/asf/flink/repo Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/d5c320c3 Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/d5c320c3 Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/d5c320c3 Branch: refs/heads/master Commit: d5c320c3b6ee7b99bb2fd72500a0caa7fea24083 Parents: 728c936 Author: Kurt Young <[email protected]> Authored: Fri Feb 17 17:02:23 2017 +0800 Committer: Kurt Young <[email protected]> Committed: Sat Mar 4 09:11:02 2017 +0800 ---------------------------------------------------------------------- .../main/java/org/apache/flink/types/Row.java | 14 ++++++ .../api/java/batch/sql/GroupingSetsITCase.java | 4 +- .../table/api/scala/batch/sql/SortITCase.scala | 33 +++++++++++--- .../api/scala/batch/table/SortITCase.scala | 47 +++++++++++++++++--- .../utils/TableProgramsClusterTestBase.scala | 20 ++++++++- .../utils/TableProgramsCollectionTestBase.scala | 18 +++++++- .../batch/utils/TableProgramsTestBase.scala | 9 ---- .../runtime/dataset/DataSetCalcITCase.scala | 6 ++- .../DataSetUserDefinedFunctionITCase.scala | 6 ++- .../dataset/DataSetWindowAggregateITCase.scala | 7 ++- .../test/util/MultipleProgramsTestBase.java | 12 ++--- 11 files changed, 139 insertions(+), 37 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/flink/blob/d5c320c3/flink-core/src/main/java/org/apache/flink/types/Row.java ---------------------------------------------------------------------- diff --git a/flink-core/src/main/java/org/apache/flink/types/Row.java b/flink-core/src/main/java/org/apache/flink/types/Row.java index f9a5add..0b2120f 100644 --- a/flink-core/src/main/java/org/apache/flink/types/Row.java +++ b/flink-core/src/main/java/org/apache/flink/types/Row.java @@ -139,4 +139,18 @@ public class Row implements Serializable{ } return row; } + + 
/** + * Creates a new Row which is copied from another row. + * + * @param row The row being copied. + * @return The cloned new Row + */ + public static Row copy(Row row) { + Row ret = new Row(row.getArity()); + for (int i = 0; i < row.getArity(); ++i) { + ret.setField(i, row.getField(i)); + } + return ret; + } } http://git-wip-us.apache.org/repos/asf/flink/blob/d5c320c3/flink-libraries/flink-table/src/test/java/org/apache/flink/table/api/java/batch/sql/GroupingSetsITCase.java ---------------------------------------------------------------------- diff --git a/flink-libraries/flink-table/src/test/java/org/apache/flink/table/api/java/batch/sql/GroupingSetsITCase.java b/flink-libraries/flink-table/src/test/java/org/apache/flink/table/api/java/batch/sql/GroupingSetsITCase.java index 54f7da7..3f611d5 100644 --- a/flink-libraries/flink-table/src/test/java/org/apache/flink/table/api/java/batch/sql/GroupingSetsITCase.java +++ b/flink-libraries/flink-table/src/test/java/org/apache/flink/table/api/java/batch/sql/GroupingSetsITCase.java @@ -50,8 +50,8 @@ public class GroupingSetsITCase extends TableProgramsClusterTestBase { private final static String TABLE_WITH_NULLS_NAME = "MyTableWithNulls"; private BatchTableEnvironment tableEnv; - public GroupingSetsITCase(TableConfigMode tableConfigMode) { - super(tableConfigMode); + public GroupingSetsITCase(TestExecutionMode mode, TableConfigMode tableConfigMode) { + super(mode, tableConfigMode); } @Before http://git-wip-us.apache.org/repos/asf/flink/blob/d5c320c3/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/api/scala/batch/sql/SortITCase.scala ---------------------------------------------------------------------- diff --git a/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/api/scala/batch/sql/SortITCase.scala b/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/api/scala/batch/sql/SortITCase.scala index c577797..bc04cc7 100644 --- 
a/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/api/scala/batch/sql/SortITCase.scala +++ b/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/api/scala/batch/sql/SortITCase.scala @@ -25,6 +25,7 @@ import org.apache.flink.table.api.scala.batch.utils.SortTestUtils._ import org.apache.flink.table.api.scala.batch.utils.TableProgramsClusterTestBase import org.apache.flink.table.api.scala.batch.utils.TableProgramsTestBase.TableConfigMode import org.apache.flink.table.api.{TableEnvironment, TableException} +import org.apache.flink.test.util.MultipleProgramsTestBase.TestExecutionMode import org.apache.flink.test.util.TestBaseUtils import org.apache.flink.types.Row import org.junit._ @@ -32,9 +33,11 @@ import org.junit.runner.RunWith import org.junit.runners.Parameterized import scala.collection.JavaConverters._ +import scala.collection.mutable @RunWith(classOf[Parameterized]) -class SortITCase(configMode: TableConfigMode) extends TableProgramsClusterTestBase(configMode) { +class SortITCase(mode: TestExecutionMode, configMode: TableConfigMode) + extends TableProgramsClusterTestBase(mode, configMode) { private def getExecutionEnvironment = { val env = ExecutionEnvironment.getExecutionEnvironment @@ -59,7 +62,12 @@ class SortITCase(configMode: TableConfigMode) extends TableProgramsClusterTestBa val expected = sortExpectedly(tupleDataSetStrings) // squash all rows inside a partition into one element - val results = tEnv.sql(sqlQuery).toDataSet[Row].mapPartition(rows => Seq(rows.toSeq)).collect() + val results = tEnv.sql(sqlQuery).toDataSet[Row].mapPartition(rows => { + // the rows need to be copied in object reuse mode + val copied = new mutable.ArrayBuffer[Row] + rows.foreach(r => copied += Row.copy(r)) + Seq(copied) + }).collect() def rowOrdering = Ordering.by((r : Row) => { // ordering for this tuple will fall into the previous defined tupleOrdering, @@ -91,7 +99,12 @@ class SortITCase(configMode: TableConfigMode) extends 
TableProgramsClusterTestBa val expected = sortExpectedly(tupleDataSetStrings, 2, 21) // squash all rows inside a partition into one element - val results = tEnv.sql(sqlQuery).toDataSet[Row].mapPartition(rows => Seq(rows.toSeq)).collect() + val results = tEnv.sql(sqlQuery).toDataSet[Row].mapPartition(rows => { + // the rows need to be copied in object reuse mode + val copied = new mutable.ArrayBuffer[Row] + rows.foreach(r => copied += Row.copy(r)) + Seq(copied) + }).collect() val result = results. filterNot(_.isEmpty) @@ -117,7 +130,12 @@ class SortITCase(configMode: TableConfigMode) extends TableProgramsClusterTestBa val expected = sortExpectedly(tupleDataSetStrings, 2, 7) // squash all rows inside a partition into one element - val results = tEnv.sql(sqlQuery).toDataSet[Row].mapPartition(rows => Seq(rows.toSeq)).collect() + val results = tEnv.sql(sqlQuery).toDataSet[Row].mapPartition(rows => { + // the rows need to be copied in object reuse mode + val copied = new mutable.ArrayBuffer[Row] + rows.foreach(r => copied += Row.copy(r)) + Seq(copied) + }).collect() val result = results .filterNot(_.isEmpty) @@ -143,7 +161,12 @@ class SortITCase(configMode: TableConfigMode) extends TableProgramsClusterTestBa val expected = sortExpectedly(tupleDataSetStrings, 0, 5) // squash all rows inside a partition into one element - val results = tEnv.sql(sqlQuery).toDataSet[Row].mapPartition(rows => Seq(rows.toSeq)).collect() + val results = tEnv.sql(sqlQuery).toDataSet[Row].mapPartition(rows => { + // the rows need to be copied in object reuse mode + val copied = new mutable.ArrayBuffer[Row] + rows.foreach(r => copied += Row.copy(r)) + Seq(copied) + }).collect() def rowOrdering = Ordering.by((r : Row) => { // ordering for this tuple will fall into the previous defined tupleOrdering, http://git-wip-us.apache.org/repos/asf/flink/blob/d5c320c3/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/api/scala/batch/table/SortITCase.scala 
---------------------------------------------------------------------- diff --git a/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/api/scala/batch/table/SortITCase.scala b/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/api/scala/batch/table/SortITCase.scala index a84d8a9..09d3c04 100644 --- a/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/api/scala/batch/table/SortITCase.scala +++ b/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/api/scala/batch/table/SortITCase.scala @@ -25,6 +25,7 @@ import org.apache.flink.table.api.scala._ import org.apache.flink.table.api.scala.batch.utils.SortTestUtils._ import org.apache.flink.table.api.scala.batch.utils.TableProgramsClusterTestBase import org.apache.flink.table.api.scala.batch.utils.TableProgramsTestBase.TableConfigMode +import org.apache.flink.test.util.MultipleProgramsTestBase.TestExecutionMode import org.apache.flink.test.util.TestBaseUtils import org.apache.flink.types.Row import org.junit._ @@ -32,9 +33,11 @@ import org.junit.runner.RunWith import org.junit.runners.Parameterized import scala.collection.JavaConverters._ +import scala.collection.mutable @RunWith(classOf[Parameterized]) -class SortITCase(configMode: TableConfigMode) extends TableProgramsClusterTestBase(configMode) { +class SortITCase(mode: TestExecutionMode, configMode: TableConfigMode) + extends TableProgramsClusterTestBase(mode, configMode) { private def getExecutionEnvironment = { val env = ExecutionEnvironment.getExecutionEnvironment @@ -56,7 +59,12 @@ class SortITCase(configMode: TableConfigMode) extends TableProgramsClusterTestBa val expected = sortExpectedly(tupleDataSetStrings) // squash all rows inside a partition into one element - val results = t.toDataSet[Row].mapPartition(rows => Seq(rows.toSeq)).collect() + val results = t.toDataSet[Row].mapPartition(rows => { + // the rows need to be copied in object reuse mode + val copied = new mutable.ArrayBuffer[Row] + 
rows.foreach(r => copied += Row.copy(r)) + Seq(copied) + }).collect() val result = results .filterNot(_.isEmpty) @@ -79,7 +87,12 @@ class SortITCase(configMode: TableConfigMode) extends TableProgramsClusterTestBa val expected = sortExpectedly(tupleDataSetStrings) // squash all rows inside a partition into one element - val results = t.toDataSet[Row].mapPartition(rows => Seq(rows.toSeq)).collect() + val results = t.toDataSet[Row].mapPartition(rows => { + // the rows need to be copied in object reuse mode + val copied = new mutable.ArrayBuffer[Row] + rows.foreach(r => copied += Row.copy(r)) + Seq(copied) + }).collect() val result = results .filterNot(_.isEmpty) @@ -102,7 +115,12 @@ class SortITCase(configMode: TableConfigMode) extends TableProgramsClusterTestBa val expected = sortExpectedly(tupleDataSetStrings) // squash all rows inside a partition into one element - val results = t.toDataSet[Row].mapPartition(rows => Seq(rows.toSeq)).collect() + val results = t.toDataSet[Row].mapPartition(rows => { + // the rows need to be copied in object reuse mode + val copied = new mutable.ArrayBuffer[Row] + rows.foreach(r => copied += Row.copy(r)) + Seq(copied) + }).collect() def rowOrdering = Ordering.by((r : Row) => { // ordering for this tuple will fall into the previous defined tupleOrdering, @@ -131,7 +149,12 @@ class SortITCase(configMode: TableConfigMode) extends TableProgramsClusterTestBa val expected = sortExpectedly(tupleDataSetStrings, 3, 21) // squash all rows inside a partition into one element - val results = t.toDataSet[Row].mapPartition(rows => Seq(rows.toSeq)).collect() + val results = t.toDataSet[Row].mapPartition(rows => { + // the rows need to be copied in object reuse mode + val copied = new mutable.ArrayBuffer[Row] + rows.foreach(r => copied += Row.copy(r)) + Seq(copied) + }).collect() val result = results .filterNot(_.isEmpty) @@ -154,7 +177,12 @@ class SortITCase(configMode: TableConfigMode) extends TableProgramsClusterTestBa val expected = 
sortExpectedly(tupleDataSetStrings, 3, 8) // squash all rows inside a partition into one element - val results = t.toDataSet[Row].mapPartition(rows => Seq(rows.toSeq)).collect() + val results = t.toDataSet[Row].mapPartition(rows => { + // the rows need to be copied in object reuse mode + val copied = new mutable.ArrayBuffer[Row] + rows.foreach(r => copied += Row.copy(r)) + Seq(copied) + }).collect() val result = results .filterNot(_.isEmpty) @@ -177,7 +205,12 @@ class SortITCase(configMode: TableConfigMode) extends TableProgramsClusterTestBa val expected = sortExpectedly(tupleDataSetStrings, 0, 5) // squash all rows inside a partition into one element - val results = t.toDataSet[Row].mapPartition(rows => Seq(rows.toSeq)).collect() + val results = t.toDataSet[Row].mapPartition(rows => { + // the rows need to be copied in object reuse mode + val copied = new mutable.ArrayBuffer[Row] + rows.foreach(r => copied += Row.copy(r)) + Seq(copied) + }).collect() implicit def rowOrdering = Ordering.by((r : Row) => r.getField(0).asInstanceOf[Int]) http://git-wip-us.apache.org/repos/asf/flink/blob/d5c320c3/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/api/scala/batch/utils/TableProgramsClusterTestBase.scala ---------------------------------------------------------------------- diff --git a/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/api/scala/batch/utils/TableProgramsClusterTestBase.scala b/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/api/scala/batch/utils/TableProgramsClusterTestBase.scala index b82ea9f..9313b5b 100644 --- a/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/api/scala/batch/utils/TableProgramsClusterTestBase.scala +++ b/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/api/scala/batch/utils/TableProgramsClusterTestBase.scala @@ -18,8 +18,13 @@ package org.apache.flink.table.api.scala.batch.utils +import java.util + import 
org.apache.flink.table.api.scala.batch.utils.TableProgramsTestBase.TableConfigMode import org.apache.flink.test.util.MultipleProgramsTestBase.TestExecutionMode +import org.junit.runners.Parameterized + +import scala.collection.JavaConversions._ /** * This test base provides full cluster-like integration tests for batch programs. Only runtime @@ -27,6 +32,19 @@ import org.apache.flink.test.util.MultipleProgramsTestBase.TestExecutionMode * (e.g. [[org.apache.flink.table.runtime.dataset.DataSetWindowAggregateITCase]]) */ class TableProgramsClusterTestBase( + executionMode: TestExecutionMode, tableConfigMode: TableConfigMode) - extends TableProgramsTestBase(TestExecutionMode.CLUSTER, tableConfigMode) { + extends TableProgramsTestBase(executionMode, tableConfigMode) { +} + +object TableProgramsClusterTestBase { + + @Parameterized.Parameters(name = "Execution mode = {0}, Table config = {1}") + def parameters(): util.Collection[Array[java.lang.Object]] = { + Seq[Array[AnyRef]]( + Array(TestExecutionMode.CLUSTER, TableProgramsTestBase.DEFAULT), + Array(TestExecutionMode.CLUSTER_OBJECT_REUSE, TableProgramsTestBase.DEFAULT) + ) + } + } http://git-wip-us.apache.org/repos/asf/flink/blob/d5c320c3/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/api/scala/batch/utils/TableProgramsCollectionTestBase.scala ---------------------------------------------------------------------- diff --git a/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/api/scala/batch/utils/TableProgramsCollectionTestBase.scala b/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/api/scala/batch/utils/TableProgramsCollectionTestBase.scala index ba0ea61..cd7c12d 100644 --- a/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/api/scala/batch/utils/TableProgramsCollectionTestBase.scala +++ b/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/api/scala/batch/utils/TableProgramsCollectionTestBase.scala @@ -18,8 +18,13 @@ package 
org.apache.flink.table.api.scala.batch.utils +import java.util + import org.apache.flink.table.api.scala.batch.utils.TableProgramsTestBase.TableConfigMode import org.apache.flink.test.util.MultipleProgramsTestBase.TestExecutionMode +import org.junit.runners.Parameterized + +import scala.collection.JavaConversions._ /** * This test base provides lightweight integration tests for batch programs. However, it does @@ -27,6 +32,15 @@ import org.apache.flink.test.util.MultipleProgramsTestBase.TestExecutionMode * use [[TableProgramsClusterTestBase]]. */ class TableProgramsCollectionTestBase( - tableConfigMode: TableConfigMode) - extends TableProgramsTestBase(TestExecutionMode.COLLECTION, tableConfigMode) { + tableConfigMode: TableConfigMode) + extends TableProgramsTestBase(TestExecutionMode.COLLECTION, tableConfigMode) { +} + +object TableProgramsCollectionTestBase { + + @Parameterized.Parameters(name = "Table config = {0}") + def parameters(): util.Collection[Array[java.lang.Object]] = { + Seq[Array[AnyRef]](Array(TableProgramsTestBase.DEFAULT)) + } + } http://git-wip-us.apache.org/repos/asf/flink/blob/d5c320c3/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/api/scala/batch/utils/TableProgramsTestBase.scala ---------------------------------------------------------------------- diff --git a/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/api/scala/batch/utils/TableProgramsTestBase.scala b/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/api/scala/batch/utils/TableProgramsTestBase.scala index 586d716..cf9d947 100644 --- a/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/api/scala/batch/utils/TableProgramsTestBase.scala +++ b/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/api/scala/batch/utils/TableProgramsTestBase.scala @@ -18,15 +18,10 @@ package org.apache.flink.table.api.scala.batch.utils -import java.util - import org.apache.flink.table.api.TableConfig import 
org.apache.flink.table.api.scala.batch.utils.TableProgramsTestBase.{NO_NULL, TableConfigMode} import org.apache.flink.test.util.MultipleProgramsTestBase import org.apache.flink.test.util.MultipleProgramsTestBase.TestExecutionMode -import org.junit.runners.Parameterized - -import scala.collection.JavaConversions._ class TableProgramsTestBase( mode: TestExecutionMode, @@ -50,8 +45,4 @@ object TableProgramsTestBase { val DEFAULT = TableConfigMode(nullCheck = true) val NO_NULL = TableConfigMode(nullCheck = false) - @Parameterized.Parameters(name = "Table config = {0}") - def parameters(): util.Collection[Array[java.lang.Object]] = { - Seq[Array[AnyRef]](Array(DEFAULT)) - } } http://git-wip-us.apache.org/repos/asf/flink/blob/d5c320c3/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/runtime/dataset/DataSetCalcITCase.scala ---------------------------------------------------------------------- diff --git a/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/runtime/dataset/DataSetCalcITCase.scala b/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/runtime/dataset/DataSetCalcITCase.scala index f0b3b44..dc6ab1c 100644 --- a/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/runtime/dataset/DataSetCalcITCase.scala +++ b/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/runtime/dataset/DataSetCalcITCase.scala @@ -26,6 +26,7 @@ import org.apache.flink.table.api.scala.batch.utils.TableProgramsClusterTestBase import org.apache.flink.table.api.scala.batch.utils.TableProgramsTestBase.TableConfigMode import org.apache.flink.table.expressions.utils.{RichFunc1, RichFunc2, RichFunc3} import org.apache.flink.table.utils._ +import org.apache.flink.test.util.MultipleProgramsTestBase.TestExecutionMode import org.apache.flink.test.util.TestBaseUtils import org.apache.flink.types.Row import org.junit.Test @@ -36,8 +37,9 @@ import scala.collection.JavaConverters._ @RunWith(classOf[Parameterized]) class 
DataSetCalcITCase( - configMode: TableConfigMode) - extends TableProgramsClusterTestBase(configMode) { + mode: TestExecutionMode, + configMode: TableConfigMode) + extends TableProgramsClusterTestBase(mode, configMode) { @Test def testUserDefinedScalarFunctionWithParameter(): Unit = { http://git-wip-us.apache.org/repos/asf/flink/blob/d5c320c3/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/runtime/dataset/DataSetUserDefinedFunctionITCase.scala ---------------------------------------------------------------------- diff --git a/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/runtime/dataset/DataSetUserDefinedFunctionITCase.scala b/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/runtime/dataset/DataSetUserDefinedFunctionITCase.scala index 3d20803..33b2439 100644 --- a/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/runtime/dataset/DataSetUserDefinedFunctionITCase.scala +++ b/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/runtime/dataset/DataSetUserDefinedFunctionITCase.scala @@ -28,6 +28,7 @@ import org.apache.flink.table.api.scala.batch.utils.TableProgramsTestBase.TableC import org.apache.flink.table.api.scala.batch.utils.TableProgramsClusterTestBase import org.apache.flink.table.expressions.utils.{Func13, RichFunc2} import org.apache.flink.table.utils._ +import org.apache.flink.test.util.MultipleProgramsTestBase.TestExecutionMode import org.apache.flink.test.util.TestBaseUtils import org.apache.flink.types.Row import org.junit.Test @@ -39,8 +40,9 @@ import scala.collection.mutable @RunWith(classOf[Parameterized]) class DataSetUserDefinedFunctionITCase( - configMode: TableConfigMode) - extends TableProgramsClusterTestBase(configMode) { + mode: TestExecutionMode, + configMode: TableConfigMode) + extends TableProgramsClusterTestBase(mode, configMode) { @Test def testCrossJoin(): Unit = { 
http://git-wip-us.apache.org/repos/asf/flink/blob/d5c320c3/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/runtime/dataset/DataSetWindowAggregateITCase.scala ---------------------------------------------------------------------- diff --git a/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/runtime/dataset/DataSetWindowAggregateITCase.scala b/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/runtime/dataset/DataSetWindowAggregateITCase.scala index 882f4b6..d57f4f7 100644 --- a/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/runtime/dataset/DataSetWindowAggregateITCase.scala +++ b/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/runtime/dataset/DataSetWindowAggregateITCase.scala @@ -31,12 +31,15 @@ import org.junit._ import org.junit.runner.RunWith import org.junit.runners.Parameterized import org.apache.flink.table.api.ValidationException +import org.apache.flink.test.util.MultipleProgramsTestBase.TestExecutionMode import scala.collection.JavaConverters._ @RunWith(classOf[Parameterized]) -class DataSetWindowAggregateITCase(configMode: TableConfigMode) - extends TableProgramsClusterTestBase(configMode) { +class DataSetWindowAggregateITCase( + mode: TestExecutionMode, + configMode: TableConfigMode) + extends TableProgramsClusterTestBase(mode, configMode) { val data = List( (1L, 1, 1d, 1f, new BigDecimal("1"), "Hi"), http://git-wip-us.apache.org/repos/asf/flink/blob/d5c320c3/flink-test-utils-parent/flink-test-utils/src/main/java/org/apache/flink/test/util/MultipleProgramsTestBase.java ---------------------------------------------------------------------- diff --git a/flink-test-utils-parent/flink-test-utils/src/main/java/org/apache/flink/test/util/MultipleProgramsTestBase.java b/flink-test-utils-parent/flink-test-utils/src/main/java/org/apache/flink/test/util/MultipleProgramsTestBase.java index 4e83245..2043cd0 100644 --- 
a/flink-test-utils-parent/flink-test-utils/src/main/java/org/apache/flink/test/util/MultipleProgramsTestBase.java +++ b/flink-test-utils-parent/flink-test-utils/src/main/java/org/apache/flink/test/util/MultipleProgramsTestBase.java @@ -62,7 +62,8 @@ public class MultipleProgramsTestBase extends TestBaseUtils { */ public enum TestExecutionMode { CLUSTER, - COLLECTION + CLUSTER_OBJECT_REUSE, + COLLECTION, } // ------------------------------------------------------------------------ @@ -85,12 +86,13 @@ public class MultipleProgramsTestBase extends TestBaseUtils { switch(mode){ case CLUSTER: - TestEnvironment clusterEnv = new TestEnvironment(cluster, 4); - clusterEnv.setAsContext(); + new TestEnvironment(cluster, 4).setAsContext(); + break; + case CLUSTER_OBJECT_REUSE: + new TestEnvironment(cluster, 4, true).setAsContext(); break; case COLLECTION: - CollectionTestEnvironment collectionEnv = new CollectionTestEnvironment(); - collectionEnv.setAsContext(); + new CollectionTestEnvironment().setAsContext(); break; } }
