[hotfix] [tests] Share proper test mini cluster for tests in DistributedCacheTest


Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/06216334
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/06216334
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/06216334

Branch: refs/heads/release-1.3
Commit: 062163344f8785733503837b83a4d063baa3c4cf
Parents: 1b21737
Author: Stephan Ewen <[email protected]>
Authored: Wed May 10 20:57:08 2017 +0200
Committer: Stephan Ewen <[email protected]>
Committed: Wed May 10 21:08:53 2017 +0200

----------------------------------------------------------------------
 .../distributedCache/DistributedCacheTest.java  | 56 ++++++++++++++++----
 1 file changed, 46 insertions(+), 10 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/flink/blob/06216334/flink-tests/src/test/java/org/apache/flink/test/distributedCache/DistributedCacheTest.java
----------------------------------------------------------------------
diff --git a/flink-tests/src/test/java/org/apache/flink/test/distributedCache/DistributedCacheTest.java b/flink-tests/src/test/java/org/apache/flink/test/distributedCache/DistributedCacheTest.java
index 19bcf76..21aa40a 100644
--- a/flink-tests/src/test/java/org/apache/flink/test/distributedCache/DistributedCacheTest.java
+++ b/flink-tests/src/test/java/org/apache/flink/test/distributedCache/DistributedCacheTest.java
@@ -22,19 +22,30 @@ import org.apache.flink.api.common.functions.RichFlatMapFunction;
 import org.apache.flink.api.java.ExecutionEnvironment;
 import org.apache.flink.api.java.tuple.Tuple1;
 import org.apache.flink.configuration.Configuration;
+import org.apache.flink.runtime.minicluster.LocalFlinkMiniCluster;
 import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
-import org.apache.flink.streaming.util.StreamingMultipleProgramsTestBase;
+import org.apache.flink.streaming.util.TestStreamEnvironment;
+import org.apache.flink.test.util.AbstractTestBase;
+import org.apache.flink.test.util.TestBaseUtils;
+import org.apache.flink.test.util.TestEnvironment;
 import org.apache.flink.util.Collector;
+
+import org.junit.AfterClass;
+import org.junit.BeforeClass;
 import org.junit.Test;
 
-import static org.junit.Assert.assertTrue;
+import java.io.BufferedReader;
+import java.io.File;
+import java.io.FileReader;
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.List;
 
-import java.io.*;
+import static org.junit.Assert.assertTrue;
 
-import java.util.*;
 
+public class DistributedCacheTest extends AbstractTestBase {
 
-public class DistributedCacheTest extends StreamingMultipleProgramsTestBase {
        public static final String data
                        = "machen\n"
                        + "zeit\n"
@@ -42,6 +53,31 @@ public class DistributedCacheTest extends StreamingMultipleProgramsTestBase {
                        + "keiner\n"
                        + "meine\n";
 
+       private static final int PARALLELISM = 4;
+
+       private static LocalFlinkMiniCluster cluster;
+
+       @BeforeClass
+       public static void setup() throws Exception {
+               cluster = TestBaseUtils.startCluster(1, PARALLELISM, false, false, true);
+               TestStreamEnvironment.setAsContext(cluster, PARALLELISM);
+               TestEnvironment.setAsContext(cluster, PARALLELISM);
+       }
+
+       @AfterClass
+       public static void teardown() throws Exception {
+               TestStreamEnvironment.unsetAsContext();
+               TestEnvironment.unsetAsContext();
+               TestBaseUtils.stopCluster(cluster, TestBaseUtils.DEFAULT_TIMEOUT);
+       }
+
+       // ------------------------------------------------------------------------
+
+       public DistributedCacheTest() {
+               super(new Configuration());
+       }
+       
+       // ------------------------------------------------------------------------
 
        @Test
        public void testStreamingDistributedCache() throws Exception {
@@ -68,12 +104,12 @@ public class DistributedCacheTest extends StreamingMultipleProgramsTestBase {
                @Override
                public void open(Configuration conf) throws IOException {
                        File file = getRuntimeContext().getDistributedCache().getFile("cache_test");
-                       BufferedReader reader = new BufferedReader(new FileReader(file));
-                       String tempString;
-                       while ((tempString = reader.readLine()) != null) {
-                               wordList.add(tempString);
+                       try (BufferedReader reader = new BufferedReader(new FileReader(file))) {
+                               String tempString;
+                               while ((tempString= reader.readLine()) != null) {
+                                       wordList.add(tempString);
+                               }
                        }
-                       reader.close();
                }
 
                @Override

Reply via email to