[hotfix] [tests] Share proper test mini cluster for tests in DistributedCacheTest
Project: http://git-wip-us.apache.org/repos/asf/flink/repo Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/06216334 Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/06216334 Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/06216334 Branch: refs/heads/release-1.3 Commit: 062163344f8785733503837b83a4d063baa3c4cf Parents: 1b21737 Author: Stephan Ewen <[email protected]> Authored: Wed May 10 20:57:08 2017 +0200 Committer: Stephan Ewen <[email protected]> Committed: Wed May 10 21:08:53 2017 +0200 ---------------------------------------------------------------------- .../distributedCache/DistributedCacheTest.java | 56 ++++++++++++++++---- 1 file changed, 46 insertions(+), 10 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/flink/blob/06216334/flink-tests/src/test/java/org/apache/flink/test/distributedCache/DistributedCacheTest.java ---------------------------------------------------------------------- diff --git a/flink-tests/src/test/java/org/apache/flink/test/distributedCache/DistributedCacheTest.java b/flink-tests/src/test/java/org/apache/flink/test/distributedCache/DistributedCacheTest.java index 19bcf76..21aa40a 100644 --- a/flink-tests/src/test/java/org/apache/flink/test/distributedCache/DistributedCacheTest.java +++ b/flink-tests/src/test/java/org/apache/flink/test/distributedCache/DistributedCacheTest.java @@ -22,19 +22,30 @@ import org.apache.flink.api.common.functions.RichFlatMapFunction; import org.apache.flink.api.java.ExecutionEnvironment; import org.apache.flink.api.java.tuple.Tuple1; import org.apache.flink.configuration.Configuration; +import org.apache.flink.runtime.minicluster.LocalFlinkMiniCluster; import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment; -import org.apache.flink.streaming.util.StreamingMultipleProgramsTestBase; +import org.apache.flink.streaming.util.TestStreamEnvironment; +import org.apache.flink.test.util.AbstractTestBase; +import org.apache.flink.test.util.TestBaseUtils; +import org.apache.flink.test.util.TestEnvironment; import org.apache.flink.util.Collector; + +import org.junit.AfterClass; +import org.junit.BeforeClass; import org.junit.Test; -import static org.junit.Assert.assertTrue; +import java.io.BufferedReader; +import java.io.File; +import java.io.FileReader; +import java.io.IOException; +import java.util.ArrayList; +import java.util.List; -import java.io.*; +import static org.junit.Assert.assertTrue; -import java.util.*; +public class DistributedCacheTest extends AbstractTestBase { -public class DistributedCacheTest extends StreamingMultipleProgramsTestBase { public static final String data = "machen\n" + "zeit\n" @@ -42,6 +53,31 @@ public class DistributedCacheTest extends StreamingMultipleProgramsTestBase { + "keiner\n" + "meine\n"; + private static final int PARALLELISM = 4; + + private static LocalFlinkMiniCluster cluster; + + @BeforeClass + public static void setup() throws Exception { + cluster = TestBaseUtils.startCluster(1, PARALLELISM, false, false, true); + TestStreamEnvironment.setAsContext(cluster, PARALLELISM); + TestEnvironment.setAsContext(cluster, PARALLELISM); + } + + @AfterClass + public static void teardown() throws Exception { + TestStreamEnvironment.unsetAsContext(); + TestEnvironment.unsetAsContext(); + TestBaseUtils.stopCluster(cluster, TestBaseUtils.DEFAULT_TIMEOUT); + } + + // ------------------------------------------------------------------------ + + public DistributedCacheTest() { + super(new Configuration()); + } + + // ------------------------------------------------------------------------ @Test public void testStreamingDistributedCache() throws Exception { @@ -68,12 +104,12 @@ public class DistributedCacheTest extends StreamingMultipleProgramsTestBase { @Override public void open(Configuration conf) throws IOException { File file = getRuntimeContext().getDistributedCache().getFile("cache_test"); - BufferedReader reader = new BufferedReader(new FileReader(file)); - String tempString; - while ((tempString = reader.readLine()) != null) { - wordList.add(tempString); + try (BufferedReader reader = new BufferedReader(new FileReader(file))) { + String tempString; + while ((tempString= reader.readLine()) != null) { + wordList.add(tempString); + } } - reader.close(); } @Override
