[hotfix] [tests] Fix mini cluster usage and logging/printing in 
CustomDistributionITCase


Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/addad1af
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/addad1af
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/addad1af

Branch: refs/heads/flip-6
Commit: addad1af453a088c559db234370db565a35fbc11
Parents: 635c869
Author: Stephan Ewen <se...@apache.org>
Authored: Wed Aug 24 21:02:09 2016 +0200
Committer: Stephan Ewen <se...@apache.org>
Committed: Wed Aug 24 21:19:04 2016 +0200

----------------------------------------------------------------------
 .../CustomDistributionITCase.java               | 110 +++++++++++--------
 1 file changed, 64 insertions(+), 46 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/flink/blob/addad1af/flink-tests/src/test/java/org/apache/flink/test/javaApiOperators/CustomDistributionITCase.java
----------------------------------------------------------------------
diff --git 
a/flink-tests/src/test/java/org/apache/flink/test/javaApiOperators/CustomDistributionITCase.java
 
b/flink-tests/src/test/java/org/apache/flink/test/javaApiOperators/CustomDistributionITCase.java
index c6bc08e..ca2c156 100644
--- 
a/flink-tests/src/test/java/org/apache/flink/test/javaApiOperators/CustomDistributionITCase.java
+++ 
b/flink-tests/src/test/java/org/apache/flink/test/javaApiOperators/CustomDistributionITCase.java
@@ -30,30 +30,60 @@ import org.apache.flink.api.java.utils.DataSetUtils;
 import org.apache.flink.core.memory.DataInputView;
 import org.apache.flink.core.memory.DataOutputView;
 import org.apache.flink.test.javaApiOperators.util.CollectionDataSets;
+import org.apache.flink.test.util.ForkableFlinkMiniCluster;
+import org.apache.flink.test.util.TestBaseUtils;
+import org.apache.flink.test.util.TestEnvironment;
 import org.apache.flink.util.Collector;
-import org.junit.Test;
+import org.apache.flink.util.TestLogger;
 
+import org.junit.AfterClass;
+import org.junit.Before;
+import org.junit.BeforeClass;
+import org.junit.Test;
 
 import java.io.IOException;
 
 import static org.junit.Assert.fail;
 
+@SuppressWarnings("serial")
+public class CustomDistributionITCase extends TestLogger {
 
-public class CustomDistributionITCase {
+       // 
------------------------------------------------------------------------
+       //  The mini cluster that is shared across tests
+       // 
------------------------------------------------------------------------
 
-       @Test
-       public void testPartitionWithDistribution1() throws Exception{
-               /*
-                * Test the record partitioned rightly with one field according 
to the customized data distribution
-                */
+       private static ForkableFlinkMiniCluster cluster;
 
-               ExecutionEnvironment env = 
ExecutionEnvironment.createLocalEnvironment();
+       @BeforeClass
+       public static void setup() throws Exception {
+               cluster = TestBaseUtils.startCluster(1, 8, false, false, true);
+       }
 
-               DataSet<Tuple3<Integer, Long, String>> input = 
CollectionDataSets.get3TupleDataSet(env);
+       @AfterClass
+       public static void teardown() throws Exception {
+               TestBaseUtils.stopCluster(cluster, 
TestBaseUtils.DEFAULT_TIMEOUT);
+       }
+
+       @Before
+       public void prepare() {
+               TestEnvironment clusterEnv = new TestEnvironment(cluster, 1);
+               clusterEnv.setAsContext();
+       }
+
+       // 
------------------------------------------------------------------------
+
+       /**
+        * Tests that records are partitioned correctly on one field according to the 
customized data distribution.
+        */
+       @Test
+       public void testPartitionWithDistribution1() throws Exception {
                final TestDataDist1 dist = new TestDataDist1();
 
+               final ExecutionEnvironment env = 
ExecutionEnvironment.getExecutionEnvironment();
                env.setParallelism(dist.getParallelism());
 
+               DataSet<Tuple3<Integer, Long, String>> input = 
CollectionDataSets.get3TupleDataSet(env);
+
                DataSet<Boolean> result = DataSetUtils
                        .partitionByRange(input, dist, 0)
                        .mapPartition(new 
RichMapPartitionFunction<Tuple3<Integer, Long, String>, Boolean>() {
@@ -96,13 +126,15 @@ public class CustomDistributionITCase {
                env.execute();
        }
 
+       /**
+        * Tests that records are partitioned correctly on two fields according to the 
customized data distribution.
+        */
        @Test
-       public void testRangeWithDistribution2() throws Exception{
-               /*
-                * Test the record partitioned rightly with two fields 
according to the customized data distribution
-                */
+       public void testRangeWithDistribution2() throws Exception {
+               final TestDataDist2 dist = new TestDataDist2();
 
-               ExecutionEnvironment env = 
ExecutionEnvironment.createLocalEnvironment();
+               final ExecutionEnvironment env = 
ExecutionEnvironment.getExecutionEnvironment();
+               env.setParallelism(dist.getParallelism());
 
                DataSet<Tuple3<Integer, Integer, String>> input = 
env.fromElements(
                                                new Tuple3<>(1, 5, "Hi"),
@@ -122,10 +154,6 @@ public class CustomDistributionITCase {
                                                new Tuple3<>(5, 3, "Hi Java 
again")
                        );
 
-               final TestDataDist2 dist = new TestDataDist2();
-
-               env.setParallelism(dist.getParallelism());
-
                DataSet<Boolean> result = DataSetUtils
                        .partitionByRange(input, dist, 0, 1)
                        .mapPartition(new 
RichMapPartitionFunction<Tuple3<Integer, Integer, String>, Boolean>() {
@@ -175,18 +203,18 @@ public class CustomDistributionITCase {
                env.execute();
        }
 
+       /*
+        * Tests partitioning when the number of partition keys is less than the number of 
distribution fields.
+        */
        @Test
-       public void testPartitionKeyLessDistribution() throws Exception{
-               /*
-                * Test the number of partition keys less than the number of 
distribution fields
-                */
-               ExecutionEnvironment env = 
ExecutionEnvironment.createLocalEnvironment();
-
-               DataSet<Tuple3<Integer, Long, String>> input = 
CollectionDataSets.get3TupleDataSet(env);
+       public void testPartitionKeyLessDistribution() throws Exception {
                final TestDataDist2 dist = new TestDataDist2();
 
+               final ExecutionEnvironment env = 
ExecutionEnvironment.getExecutionEnvironment();
                env.setParallelism(dist.getParallelism());
 
+               DataSet<Tuple3<Integer, Long, String>> input = 
CollectionDataSets.get3TupleDataSet(env);
+
                DataSet<Boolean> result = DataSetUtils
                        .partitionByRange(input, dist, 0)
                        .mapPartition(new 
RichMapPartitionFunction<Tuple3<Integer, Long, String>, Boolean>() {
@@ -229,19 +257,17 @@ public class CustomDistributionITCase {
                env.execute();
        }
 
+       /*
+        * Tests that an IllegalArgumentException is thrown when the number of partition keys is larger than the number of 
distribution fields.
+        */
        @Test(expected = IllegalArgumentException.class)
-       public void testPartitionMoreThanDistribution() throws Exception{
-               /*
-                * Test the number of partition keys larger than the number of 
distribution fields
-                */
+       public void testPartitionMoreThanDistribution() throws Exception {
+               final TestDataDist2 dist = new TestDataDist2();
 
-               ExecutionEnvironment env = 
ExecutionEnvironment.createLocalEnvironment();
+               ExecutionEnvironment env = 
ExecutionEnvironment.getExecutionEnvironment();
 
                DataSet<Tuple3<Integer, Long, String>> input = 
CollectionDataSets.get3TupleDataSet(env);
-               final TestDataDist2 dist = new TestDataDist2();
-
-               DataSet<Tuple3<Integer, Long, String>> result = DataSetUtils
-                               .partitionByRange(input, dist, 0, 1, 2);
+               DataSetUtils.partitionByRange(input, dist, 0, 1, 2);
        }
        
        /**
@@ -278,14 +304,10 @@ public class CustomDistributionITCase {
                }
 
                @Override
-               public void write(DataOutputView out) throws IOException {
-                       
-               }
+               public void write(DataOutputView out) throws IOException {}
 
                @Override
-               public void read(DataInputView in) throws IOException {
-                       
-               }
+               public void read(DataInputView in) throws IOException {}
        }
 
        /**
@@ -323,13 +345,9 @@ public class CustomDistributionITCase {
                }
 
                @Override
-               public void write(DataOutputView out) throws IOException {
-                       
-               }
+               public void write(DataOutputView out) throws IOException {}
 
                @Override
-               public void read(DataInputView in) throws IOException {
-                       
-               }
+               public void read(DataInputView in) throws IOException {}
        }
 }

Reply via email to