[hotfix] [tests] Fix mini cluster usage and logging/printing in CustomDistributionITCase
Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/addad1af
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/addad1af
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/addad1af

Branch: refs/heads/flip-6
Commit: addad1af453a088c559db234370db565a35fbc11
Parents: 635c869
Author: Stephan Ewen <se...@apache.org>
Authored: Wed Aug 24 21:02:09 2016 +0200
Committer: Stephan Ewen <se...@apache.org>
Committed: Wed Aug 24 21:19:04 2016 +0200

----------------------------------------------------------------------
 .../CustomDistributionITCase.java | 110 +++++++++++--------
 1 file changed, 64 insertions(+), 46 deletions(-)
----------------------------------------------------------------------

http://git-wip-us.apache.org/repos/asf/flink/blob/addad1af/flink-tests/src/test/java/org/apache/flink/test/javaApiOperators/CustomDistributionITCase.java
----------------------------------------------------------------------
diff --git a/flink-tests/src/test/java/org/apache/flink/test/javaApiOperators/CustomDistributionITCase.java b/flink-tests/src/test/java/org/apache/flink/test/javaApiOperators/CustomDistributionITCase.java
index c6bc08e..ca2c156 100644
--- a/flink-tests/src/test/java/org/apache/flink/test/javaApiOperators/CustomDistributionITCase.java
+++ b/flink-tests/src/test/java/org/apache/flink/test/javaApiOperators/CustomDistributionITCase.java
@@ -30,30 +30,60 @@ import org.apache.flink.api.java.utils.DataSetUtils;
 import org.apache.flink.core.memory.DataInputView;
 import org.apache.flink.core.memory.DataOutputView;
 import org.apache.flink.test.javaApiOperators.util.CollectionDataSets;
+import org.apache.flink.test.util.ForkableFlinkMiniCluster;
+import org.apache.flink.test.util.TestBaseUtils;
+import org.apache.flink.test.util.TestEnvironment;
 import org.apache.flink.util.Collector;
-import org.junit.Test;
+import org.apache.flink.util.TestLogger;
+import org.junit.AfterClass;
+import 
org.junit.Before; +import org.junit.BeforeClass; +import org.junit.Test; import java.io.IOException; import static org.junit.Assert.fail; +@SuppressWarnings("serial") +public class CustomDistributionITCase extends TestLogger { -public class CustomDistributionITCase { + // ------------------------------------------------------------------------ + // The mini cluster that is shared across tests + // ------------------------------------------------------------------------ - @Test - public void testPartitionWithDistribution1() throws Exception{ - /* - * Test the record partitioned rightly with one field according to the customized data distribution - */ + private static ForkableFlinkMiniCluster cluster; - ExecutionEnvironment env = ExecutionEnvironment.createLocalEnvironment(); + @BeforeClass + public static void setup() throws Exception { + cluster = TestBaseUtils.startCluster(1, 8, false, false, true); + } - DataSet<Tuple3<Integer, Long, String>> input = CollectionDataSets.get3TupleDataSet(env); + @AfterClass + public static void teardown() throws Exception { + TestBaseUtils.stopCluster(cluster, TestBaseUtils.DEFAULT_TIMEOUT); + } + + @Before + public void prepare() { + TestEnvironment clusterEnv = new TestEnvironment(cluster, 1); + clusterEnv.setAsContext(); + } + + // ------------------------------------------------------------------------ + + /** + * Test the record partitioned rightly with one field according to the customized data distribution + */ + @Test + public void testPartitionWithDistribution1() throws Exception { final TestDataDist1 dist = new TestDataDist1(); + final ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment(); env.setParallelism(dist.getParallelism()); + DataSet<Tuple3<Integer, Long, String>> input = CollectionDataSets.get3TupleDataSet(env); + DataSet<Boolean> result = DataSetUtils .partitionByRange(input, dist, 0) .mapPartition(new RichMapPartitionFunction<Tuple3<Integer, Long, String>, Boolean>() { @@ -96,13 +126,15 @@ 
public class CustomDistributionITCase { env.execute(); } + /** + * Test the record partitioned rightly with two fields according to the customized data distribution + */ @Test - public void testRangeWithDistribution2() throws Exception{ - /* - * Test the record partitioned rightly with two fields according to the customized data distribution - */ + public void testRangeWithDistribution2() throws Exception { + final TestDataDist2 dist = new TestDataDist2(); - ExecutionEnvironment env = ExecutionEnvironment.createLocalEnvironment(); + final ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment(); + env.setParallelism(dist.getParallelism()); DataSet<Tuple3<Integer, Integer, String>> input = env.fromElements( new Tuple3<>(1, 5, "Hi"), @@ -122,10 +154,6 @@ public class CustomDistributionITCase { new Tuple3<>(5, 3, "Hi Java again") ); - final TestDataDist2 dist = new TestDataDist2(); - - env.setParallelism(dist.getParallelism()); - DataSet<Boolean> result = DataSetUtils .partitionByRange(input, dist, 0, 1) .mapPartition(new RichMapPartitionFunction<Tuple3<Integer, Integer, String>, Boolean>() { @@ -175,18 +203,18 @@ public class CustomDistributionITCase { env.execute(); } + /* + * Test the number of partition keys less than the number of distribution fields + */ @Test - public void testPartitionKeyLessDistribution() throws Exception{ - /* - * Test the number of partition keys less than the number of distribution fields - */ - ExecutionEnvironment env = ExecutionEnvironment.createLocalEnvironment(); - - DataSet<Tuple3<Integer, Long, String>> input = CollectionDataSets.get3TupleDataSet(env); + public void testPartitionKeyLessDistribution() throws Exception { final TestDataDist2 dist = new TestDataDist2(); + final ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment(); env.setParallelism(dist.getParallelism()); + DataSet<Tuple3<Integer, Long, String>> input = CollectionDataSets.get3TupleDataSet(env); + DataSet<Boolean> result = 
DataSetUtils .partitionByRange(input, dist, 0) .mapPartition(new RichMapPartitionFunction<Tuple3<Integer, Long, String>, Boolean>() { @@ -229,19 +257,17 @@ public class CustomDistributionITCase { env.execute(); } + /* + * Test the number of partition keys larger than the number of distribution fields + */ @Test(expected = IllegalArgumentException.class) - public void testPartitionMoreThanDistribution() throws Exception{ - /* - * Test the number of partition keys larger than the number of distribution fields - */ + public void testPartitionMoreThanDistribution() throws Exception { + final TestDataDist2 dist = new TestDataDist2(); - ExecutionEnvironment env = ExecutionEnvironment.createLocalEnvironment(); + ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment(); DataSet<Tuple3<Integer, Long, String>> input = CollectionDataSets.get3TupleDataSet(env); - final TestDataDist2 dist = new TestDataDist2(); - - DataSet<Tuple3<Integer, Long, String>> result = DataSetUtils - .partitionByRange(input, dist, 0, 1, 2); + DataSetUtils.partitionByRange(input, dist, 0, 1, 2); } /** @@ -278,14 +304,10 @@ public class CustomDistributionITCase { } @Override - public void write(DataOutputView out) throws IOException { - - } + public void write(DataOutputView out) throws IOException {} @Override - public void read(DataInputView in) throws IOException { - - } + public void read(DataInputView in) throws IOException {} } /** @@ -323,13 +345,9 @@ public class CustomDistributionITCase { } @Override - public void write(DataOutputView out) throws IOException { - - } + public void write(DataOutputView out) throws IOException {} @Override - public void read(DataInputView in) throws IOException { - - } + public void read(DataInputView in) throws IOException {} } }