[hotfix] [tests] Share proper test mini cluster for tests in DistributedCacheTest
Project: http://git-wip-us.apache.org/repos/asf/flink/repo Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/e10cdaa1 Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/e10cdaa1 Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/e10cdaa1 Branch: refs/heads/master Commit: e10cdaa11c514d017d9cf92f88184f9e29d51fa9 Parents: 35c0871 Author: Stephan Ewen <[email protected]> Authored: Wed May 10 20:57:08 2017 +0200 Committer: Stephan Ewen <[email protected]> Committed: Thu May 11 14:11:07 2017 +0200 ---------------------------------------------------------------------- .../distributedCache/DistributedCacheTest.java | 56 ++++++++++++++++---- 1 file changed, 46 insertions(+), 10 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/flink/blob/e10cdaa1/flink-tests/src/test/java/org/apache/flink/test/distributedCache/DistributedCacheTest.java ---------------------------------------------------------------------- diff --git a/flink-tests/src/test/java/org/apache/flink/test/distributedCache/DistributedCacheTest.java b/flink-tests/src/test/java/org/apache/flink/test/distributedCache/DistributedCacheTest.java index 19bcf76..21aa40a 100644 --- a/flink-tests/src/test/java/org/apache/flink/test/distributedCache/DistributedCacheTest.java +++ b/flink-tests/src/test/java/org/apache/flink/test/distributedCache/DistributedCacheTest.java @@ -22,19 +22,30 @@ import org.apache.flink.api.common.functions.RichFlatMapFunction; import org.apache.flink.api.java.ExecutionEnvironment; import org.apache.flink.api.java.tuple.Tuple1; import org.apache.flink.configuration.Configuration; +import org.apache.flink.runtime.minicluster.LocalFlinkMiniCluster; import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment; -import org.apache.flink.streaming.util.StreamingMultipleProgramsTestBase; +import org.apache.flink.streaming.util.TestStreamEnvironment; +import org.apache.flink.test.util.AbstractTestBase; +import org.apache.flink.test.util.TestBaseUtils; +import org.apache.flink.test.util.TestEnvironment; import org.apache.flink.util.Collector; + +import org.junit.AfterClass; +import org.junit.BeforeClass; import org.junit.Test; -import static org.junit.Assert.assertTrue; +import java.io.BufferedReader; +import java.io.File; +import java.io.FileReader; +import java.io.IOException; +import java.util.ArrayList; +import java.util.List; -import java.io.*; +import static org.junit.Assert.assertTrue; -import java.util.*; +public class DistributedCacheTest extends AbstractTestBase { -public class DistributedCacheTest extends StreamingMultipleProgramsTestBase { public static final String data = "machen\n" + "zeit\n" @@ -42,6 +53,31 @@ public class DistributedCacheTest extends StreamingMultipleProgramsTestBase { + "keiner\n" + "meine\n"; + private static final int PARALLELISM = 4; + + private static LocalFlinkMiniCluster cluster; + + @BeforeClass + public static void setup() throws Exception { + cluster = TestBaseUtils.startCluster(1, PARALLELISM, false, false, true); + TestStreamEnvironment.setAsContext(cluster, PARALLELISM); + TestEnvironment.setAsContext(cluster, PARALLELISM); + } + + @AfterClass + public static void teardown() throws Exception { + TestStreamEnvironment.unsetAsContext(); + TestEnvironment.unsetAsContext(); + TestBaseUtils.stopCluster(cluster, TestBaseUtils.DEFAULT_TIMEOUT); + } + + // ------------------------------------------------------------------------ + + public DistributedCacheTest() { + super(new Configuration()); + } + + // ------------------------------------------------------------------------ @Test public void testStreamingDistributedCache() throws Exception { @@ -68,12 +104,12 @@ public class DistributedCacheTest extends StreamingMultipleProgramsTestBase { @Override public void open(Configuration conf) throws IOException { File file = getRuntimeContext().getDistributedCache().getFile("cache_test"); - BufferedReader reader = new BufferedReader(new FileReader(file)); - String tempString; - while ((tempString = reader.readLine()) != null) { - wordList.add(tempString); + try (BufferedReader reader = new BufferedReader(new FileReader(file))) { + String tempString; + while ((tempString= reader.readLine()) != null) { + wordList.add(tempString); + } } - reader.close(); } @Override
