[FLINK-7909] Replace StreamingMultipleProgramsTestBase by AbstractTestBase The AbstractTestBase fully subsumes the functionality of the StreamingMultipleProgramsTestBase since it now is the most general test base for streaming and batch jobs. As a consequence, we can safely remove the StreamingMultipleProgramsTestBase and let all corresponding tests extend from AbstractTestBase.
This closes #4896. Project: http://git-wip-us.apache.org/repos/asf/flink/repo Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/b90210e3 Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/b90210e3 Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/b90210e3 Branch: refs/heads/master Commit: b90210e3712a54ad85a33dfc308a03e0c4a2a250 Parents: 3c5c832 Author: Till Rohrmann <[email protected]> Authored: Tue Oct 24 16:20:15 2017 +0200 Committer: Till Rohrmann <[email protected]> Committed: Tue Jan 9 08:05:51 2018 +0100 ---------------------------------------------------------------------- .../ElasticsearchSinkTestBase.java | 8 ++- .../connectors/fs/RollingSinkITCase.java | 22 +++++-- .../connectors/fs/RollingSinkSecuredITCase.java | 62 +++++++++---------- .../mapred/HadoopIOFormatsITCase.java | 22 +++---- .../mapred/HadoopMapredITCase.java | 1 - .../mapreduce/HadoopInputOutputITCase.java | 1 - .../apache/flink/storm/split/SplitITCase.java | 4 +- .../examples/windowing/TopSpeedWindowing.java | 1 - .../streaming/test/StreamingExamplesITCase.java | 4 +- .../TopSpeedWindowingExampleITCase.java | 50 +++++++++------ .../socket/SocketWindowWordCountITCase.java | 4 +- .../examples/StreamingExamplesITCase.scala | 22 +++---- .../java/org/apache/flink/cep/CEPITCase.java | 4 +- .../table/runtime/stream/sql/JavaSqlITCase.java | 4 +- .../flink/table/api/stream/ExplainTest.scala | 4 +- .../UnsupportedOpsValidationTest.scala | 4 +- .../runtime/stream/TimeAttributesITCase.scala | 10 +-- .../runtime/stream/sql/TableSourceITCase.scala | 4 +- .../table/runtime/stream/table/CalcITCase.scala | 9 ++- .../runtime/stream/table/CorrelateITCase.scala | 4 +- .../stream/table/GroupWindowITCase.scala | 10 +-- .../stream/table/SetOperatorsITCase.scala | 4 +- .../runtime/stream/table/TableSinkITCase.scala | 7 +-- .../stream/table/TableSourceITCase.scala | 6 +- .../utils/StreamingWithStateTestBase.scala | 4 +- .../flink/streaming/api/DataStreamTest.java | 3 +- .../scala/api/CsvOutputFormatITCase.java | 4 +- .../scala/api/TextOutputFormatITCase.java | 4 +- .../streaming/api/scala/CoGroupJoinITCase.scala | 7 +-- .../streaming/api/scala/DataStreamTest.scala | 28 ++++++--- .../streaming/api/scala/SideOutputITCase.scala | 4 +- .../api/scala/TimeWindowTranslationTest.scala | 4 +- .../streaming/api/scala/WindowFoldITCase.scala | 6 +- .../api/scala/WindowReduceITCase.scala | 6 +- .../util/StreamingMultipleProgramsTestBase.java | 64 -------------------- .../flink/test/util/AbstractTestBase.java | 14 ++--- .../flink/test/util/MiniClusterResource.java | 11 ++-- .../test/util/MultipleProgramsTestBase.java | 48 ++++----------- .../apache/flink/test/util/TestBaseUtils.java | 2 +- .../CoStreamCheckpointingITCase.java | 4 +- .../StreamCheckpointNotifierITCase.java | 4 +- .../test/state/ManualWindowSpeedITCase.java | 4 +- .../streaming/api/StreamingOperatorsITCase.java | 4 +- .../runtime/ChainedRuntimeContextITCase.java | 4 +- .../streaming/runtime/CoGroupJoinITCase.java | 4 +- .../test/streaming/runtime/CoStreamITCase.java | 4 +- .../streaming/runtime/DataStreamPojoITCase.java | 4 +- .../streaming/runtime/DirectedOutputITCase.java | 4 +- .../test/streaming/runtime/IterateITCase.java | 26 ++++---- .../streaming/runtime/OutputSplitterITCase.java | 4 +- .../streaming/runtime/PartitionerITCase.java | 4 +- .../streaming/runtime/SelfConnectionITCase.java | 4 +- .../streaming/runtime/SideOutputITCase.java | 4 +- .../streaming/runtime/StateBackendITCase.java | 4 +- .../runtime/StreamTaskTimerITCase.java | 4 +- .../streaming/runtime/WindowFoldITCase.java | 4 +- .../sessionwindows/SessionWindowITCase.java | 4 +- 57 files changed, 253 insertions(+), 321 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/flink/blob/b90210e3/flink-connectors/flink-connector-elasticsearch-base/src/test/java/org/apache/flink/streaming/connectors/elasticsearch/ElasticsearchSinkTestBase.java ---------------------------------------------------------------------- diff --git a/flink-connectors/flink-connector-elasticsearch-base/src/test/java/org/apache/flink/streaming/connectors/elasticsearch/ElasticsearchSinkTestBase.java b/flink-connectors/flink-connector-elasticsearch-base/src/test/java/org/apache/flink/streaming/connectors/elasticsearch/ElasticsearchSinkTestBase.java index 297bc5d..b90e8ed 100644 --- a/flink-connectors/flink-connector-elasticsearch-base/src/test/java/org/apache/flink/streaming/connectors/elasticsearch/ElasticsearchSinkTestBase.java +++ b/flink-connectors/flink-connector-elasticsearch-base/src/test/java/org/apache/flink/streaming/connectors/elasticsearch/ElasticsearchSinkTestBase.java @@ -22,7 +22,7 @@ import org.apache.flink.runtime.client.JobExecutionException; import org.apache.flink.streaming.api.datastream.DataStreamSource; import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment; import org.apache.flink.streaming.connectors.elasticsearch.testutils.SourceSinkDataTestKit; -import org.apache.flink.streaming.util.StreamingMultipleProgramsTestBase; +import org.apache.flink.test.util.AbstractTestBase; import org.apache.flink.util.InstantiationUtil; import org.elasticsearch.client.Client; @@ -31,6 +31,8 @@ import org.junit.AfterClass; import org.junit.BeforeClass; import org.junit.ClassRule; import org.junit.rules.TemporaryFolder; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; import java.net.InetSocketAddress; import java.util.Collections; @@ -44,7 +46,9 @@ import static org.junit.Assert.fail; /** * Environment preparation and suite of tests for version-specific {@link ElasticsearchSinkBase} implementations. */ -public abstract class ElasticsearchSinkTestBase extends StreamingMultipleProgramsTestBase { +public abstract class ElasticsearchSinkTestBase extends AbstractTestBase { + + private static final Logger LOG = LoggerFactory.getLogger(ElasticsearchSinkTestBase.class); protected static final String CLUSTER_NAME = "test-cluster"; http://git-wip-us.apache.org/repos/asf/flink/blob/b90210e3/flink-connectors/flink-connector-filesystem/src/test/java/org/apache/flink/streaming/connectors/fs/RollingSinkITCase.java ---------------------------------------------------------------------- diff --git a/flink-connectors/flink-connector-filesystem/src/test/java/org/apache/flink/streaming/connectors/fs/RollingSinkITCase.java b/flink-connectors/flink-connector-filesystem/src/test/java/org/apache/flink/streaming/connectors/fs/RollingSinkITCase.java index 10d1846..78f643f 100644 --- a/flink-connectors/flink-connector-filesystem/src/test/java/org/apache/flink/streaming/connectors/fs/RollingSinkITCase.java +++ b/flink-connectors/flink-connector-filesystem/src/test/java/org/apache/flink/streaming/connectors/fs/RollingSinkITCase.java @@ -32,9 +32,10 @@ import org.apache.flink.streaming.runtime.streamrecord.StreamRecord; import org.apache.flink.streaming.runtime.tasks.OperatorStateHandles; import org.apache.flink.streaming.util.AbstractStreamOperatorTestHarness; import org.apache.flink.streaming.util.OneInputStreamOperatorTestHarness; -import org.apache.flink.streaming.util.StreamingMultipleProgramsTestBase; +import org.apache.flink.test.util.MiniClusterResource; import org.apache.flink.util.Collector; import org.apache.flink.util.NetUtils; +import org.apache.flink.util.TestLogger; import org.apache.avro.Schema; import org.apache.avro.Schema.Type; @@ -83,13 +84,14 @@ import java.util.Map; * @deprecated should be removed with the {@link RollingSink}. */ @Deprecated -public class RollingSinkITCase extends StreamingMultipleProgramsTestBase { +public class RollingSinkITCase extends TestLogger { protected static final Logger LOG = LoggerFactory.getLogger(RollingSinkITCase.class); @ClassRule public static TemporaryFolder tempFolder = new TemporaryFolder(); + protected static MiniClusterResource miniClusterResource; protected static MiniDFSCluster hdfsCluster; protected static org.apache.hadoop.fs.FileSystem dfs; protected static String hdfsURI; @@ -98,7 +100,7 @@ public class RollingSinkITCase extends StreamingMultipleProgramsTestBase { protected static File dataDir; @BeforeClass - public static void createHDFS() throws IOException { + public static void setup() throws Exception { LOG.info("In RollingSinkITCase: Starting MiniDFSCluster "); @@ -113,12 +115,22 @@ public class RollingSinkITCase extends StreamingMultipleProgramsTestBase { hdfsURI = "hdfs://" + NetUtils.hostAndPortToUrlString(hdfsCluster.getURI().getHost(), hdfsCluster.getNameNodePort()) + "/"; + + miniClusterResource = new MiniClusterResource( + new MiniClusterResource.MiniClusterResourceConfiguration( + new org.apache.flink.configuration.Configuration(), + 1, + 4)); } @AfterClass - public static void destroyHDFS() { + public static void teardown() throws Exception { LOG.info("In RollingSinkITCase: tearing down MiniDFSCluster "); hdfsCluster.shutdown(); + + if (miniClusterResource != null) { + miniClusterResource.after(); + } } /** @@ -926,6 +938,8 @@ public class RollingSinkITCase extends StreamingMultipleProgramsTestBase { } private static class StreamWriterWithConfigCheck<T> extends StringWriter<T> { + private static final long serialVersionUID = 761584896826819477L; + private String key; private String expect; public StreamWriterWithConfigCheck(String key, String expect) { http://git-wip-us.apache.org/repos/asf/flink/blob/b90210e3/flink-connectors/flink-connector-filesystem/src/test/java/org/apache/flink/streaming/connectors/fs/RollingSinkSecuredITCase.java ---------------------------------------------------------------------- diff --git a/flink-connectors/flink-connector-filesystem/src/test/java/org/apache/flink/streaming/connectors/fs/RollingSinkSecuredITCase.java b/flink-connectors/flink-connector-filesystem/src/test/java/org/apache/flink/streaming/connectors/fs/RollingSinkSecuredITCase.java index 7595ac0..b76d087 100644 --- a/flink-connectors/flink-connector-filesystem/src/test/java/org/apache/flink/streaming/connectors/fs/RollingSinkSecuredITCase.java +++ b/flink-connectors/flink-connector-filesystem/src/test/java/org/apache/flink/streaming/connectors/fs/RollingSinkSecuredITCase.java @@ -25,7 +25,7 @@ import org.apache.flink.configuration.HighAvailabilityOptions; import org.apache.flink.configuration.SecurityOptions; import org.apache.flink.runtime.security.SecurityConfiguration; import org.apache.flink.runtime.security.modules.HadoopModule; -import org.apache.flink.streaming.util.TestStreamEnvironment; +import org.apache.flink.test.util.MiniClusterResource; import org.apache.flink.test.util.SecureTestEnvironment; import org.apache.flink.test.util.TestBaseUtils; import org.apache.flink.test.util.TestingSecurityContext; @@ -45,7 +45,6 @@ import org.slf4j.LoggerFactory; import java.io.File; import java.io.FileWriter; -import java.io.IOException; import java.util.Collections; import java.util.HashMap; import java.util.Map; @@ -94,19 +93,7 @@ public class RollingSinkSecuredITCase extends RollingSinkITCase { * and out-of-order sequence for secure cluster */ @BeforeClass - public static void setup() throws Exception {} - - @AfterClass - public static void teardown() throws Exception {} - - @BeforeClass - public static void createHDFS() throws IOException {} - - @AfterClass - public static void destroyHDFS() {} - - @BeforeClass - public static void startSecureCluster() throws Exception { + public static void setup() throws Exception { skipIfHadoopVersionIsNotAppropriate(); @@ -158,20 +145,29 @@ public class RollingSinkSecuredITCase extends RollingSinkITCase { + NetUtils.hostAndPortToUrlString(hdfsCluster.getURI().getHost(), hdfsCluster.getNameNodePort()) + "/"; - startSecureFlinkClusterWithRecoveryModeEnabled(); + Configuration configuration = startSecureFlinkClusterWithRecoveryModeEnabled(); + + miniClusterResource = new MiniClusterResource(new MiniClusterResource.MiniClusterResourceConfiguration( + configuration, + 1, + 4)); + + miniClusterResource.before(); } @AfterClass - public static void teardownSecureCluster() throws Exception { + public static void teardown() throws Exception { LOG.info("tearing down secure cluster environment"); - TestStreamEnvironment.unsetAsContext(); - stopCluster(cluster, TestBaseUtils.DEFAULT_TIMEOUT); - if (hdfsCluster != null) { hdfsCluster.shutdown(); } + if (miniClusterResource != null) { + miniClusterResource.after(); + miniClusterResource = null; + } + SecureTestEnvironment.cleanup(); } @@ -208,30 +204,26 @@ public class RollingSinkSecuredITCase extends RollingSinkITCase { conf.set(DFS_DATANODE_HTTP_ADDRESS_KEY, "localhost:1003"); } - private static void startSecureFlinkClusterWithRecoveryModeEnabled() { + private static Configuration startSecureFlinkClusterWithRecoveryModeEnabled() { try { LOG.info("Starting Flink and ZK in secure mode"); dfs.mkdirs(new Path("/flink/checkpoints")); dfs.mkdirs(new Path("/flink/recovery")); - org.apache.flink.configuration.Configuration config = new org.apache.flink.configuration.Configuration(); - - config.setInteger(ConfigConstants.LOCAL_NUMBER_TASK_MANAGER, 1); - config.setInteger(ConfigConstants.TASK_MANAGER_NUM_TASK_SLOTS, DEFAULT_PARALLELISM); - config.setBoolean(ConfigConstants.LOCAL_START_WEBSERVER, false); - config.setInteger(ConfigConstants.LOCAL_NUMBER_JOB_MANAGER, 3); - config.setString(HighAvailabilityOptions.HA_MODE, "zookeeper"); - config.setString(CoreOptions.STATE_BACKEND, "filesystem"); - config.setString(HighAvailabilityOptions.HA_ZOOKEEPER_CHECKPOINTS_PATH, hdfsURI + "/flink/checkpoints"); - config.setString(HighAvailabilityOptions.HA_STORAGE_PATH, hdfsURI + "/flink/recovery"); - config.setString("state.backend.fs.checkpointdir", hdfsURI + "/flink/checkpoints"); + final Configuration result = new Configuration(); - SecureTestEnvironment.populateFlinkSecureConfigurations(config); + result.setBoolean(ConfigConstants.LOCAL_START_WEBSERVER, false); + result.setInteger(ConfigConstants.LOCAL_NUMBER_JOB_MANAGER, 3); + result.setString(HighAvailabilityOptions.HA_MODE, "zookeeper"); + result.setString(CoreOptions.STATE_BACKEND, "filesystem"); + result.setString(HighAvailabilityOptions.HA_ZOOKEEPER_CHECKPOINTS_PATH, hdfsURI + "/flink/checkpoints"); + result.setString(HighAvailabilityOptions.HA_STORAGE_PATH, hdfsURI + "/flink/recovery"); + result.setString("state.backend.fs.checkpointdir", hdfsURI + "/flink/checkpoints"); - cluster = TestBaseUtils.startCluster(config, false); - TestStreamEnvironment.setAsContext(cluster, DEFAULT_PARALLELISM); + SecureTestEnvironment.populateFlinkSecureConfigurations(result); + return result; } catch (Exception e) { throw new RuntimeException(e); } http://git-wip-us.apache.org/repos/asf/flink/blob/b90210e3/flink-connectors/flink-hadoop-compatibility/src/test/java/org/apache/flink/test/hadoopcompatibility/mapred/HadoopIOFormatsITCase.java ---------------------------------------------------------------------- diff --git a/flink-connectors/flink-hadoop-compatibility/src/test/java/org/apache/flink/test/hadoopcompatibility/mapred/HadoopIOFormatsITCase.java b/flink-connectors/flink-hadoop-compatibility/src/test/java/org/apache/flink/test/hadoopcompatibility/mapred/HadoopIOFormatsITCase.java index 46102a2..753d813 100644 --- a/flink-connectors/flink-hadoop-compatibility/src/test/java/org/apache/flink/test/hadoopcompatibility/mapred/HadoopIOFormatsITCase.java +++ b/flink-connectors/flink-hadoop-compatibility/src/test/java/org/apache/flink/test/hadoopcompatibility/mapred/HadoopIOFormatsITCase.java @@ -23,9 +23,7 @@ import org.apache.flink.api.java.DataSet; import org.apache.flink.api.java.ExecutionEnvironment; import org.apache.flink.api.java.hadoop.mapred.HadoopInputFormat; import org.apache.flink.api.java.tuple.Tuple2; -import org.apache.flink.configuration.Configuration; import org.apache.flink.test.util.JavaProgramTestBase; -import org.apache.flink.test.util.TestBaseUtils; import org.apache.flink.util.OperatingSystem; import org.apache.hadoop.fs.FileSystem; @@ -44,11 +42,9 @@ import org.junit.runners.Parameterized; import org.junit.runners.Parameterized.Parameters; import java.io.File; -import java.io.FileNotFoundException; -import java.io.IOException; import java.net.URI; +import java.util.ArrayList; import java.util.Collection; -import java.util.LinkedList; /** * Integration tests for Hadoop IO formats. @@ -58,14 +54,14 @@ public class HadoopIOFormatsITCase extends JavaProgramTestBase { private static final int NUM_PROGRAMS = 2; - private int curProgId = config.getInteger("ProgramId", -1); + private final int curProgId; private String[] resultPath; private String[] expectedResult; private String sequenceFileInPath; private String sequenceFileInPathNull; - public HadoopIOFormatsITCase(Configuration config) { - super(config); + public HadoopIOFormatsITCase(int curProgId) { + this.curProgId = curProgId; } @Before @@ -143,17 +139,15 @@ public class HadoopIOFormatsITCase extends JavaProgramTestBase { } @Parameters - public static Collection<Object[]> getConfigurations() throws FileNotFoundException, IOException { + public static Collection<Object[]> getConfigurations() { - LinkedList<Configuration> tConfigs = new LinkedList<Configuration>(); + Collection<Object[]> programIds = new ArrayList<>(NUM_PROGRAMS); for (int i = 1; i <= NUM_PROGRAMS; i++) { - Configuration config = new Configuration(); - config.setInteger("ProgramId", i); - tConfigs.add(config); + programIds.add(new Object[]{i}); } - return TestBaseUtils.toParameterList(tConfigs); + return programIds; } private static class HadoopIOFormatPrograms { http://git-wip-us.apache.org/repos/asf/flink/blob/b90210e3/flink-connectors/flink-hadoop-compatibility/src/test/java/org/apache/flink/test/hadoopcompatibility/mapred/HadoopMapredITCase.java ---------------------------------------------------------------------- diff --git a/flink-connectors/flink-hadoop-compatibility/src/test/java/org/apache/flink/test/hadoopcompatibility/mapred/HadoopMapredITCase.java b/flink-connectors/flink-hadoop-compatibility/src/test/java/org/apache/flink/test/hadoopcompatibility/mapred/HadoopMapredITCase.java index 145eaaa..db2ad8e 100644 --- a/flink-connectors/flink-hadoop-compatibility/src/test/java/org/apache/flink/test/hadoopcompatibility/mapred/HadoopMapredITCase.java +++ b/flink-connectors/flink-hadoop-compatibility/src/test/java/org/apache/flink/test/hadoopcompatibility/mapred/HadoopMapredITCase.java @@ -44,7 +44,6 @@ public class HadoopMapredITCase extends JavaProgramTestBase { protected void preSubmit() throws Exception { textPath = createTempFile("text.txt", WordCountData.TEXT); resultPath = getTempDirPath("result"); - this.setParallelism(4); } @Override http://git-wip-us.apache.org/repos/asf/flink/blob/b90210e3/flink-connectors/flink-hadoop-compatibility/src/test/java/org/apache/flink/test/hadoopcompatibility/mapreduce/HadoopInputOutputITCase.java ---------------------------------------------------------------------- diff --git a/flink-connectors/flink-hadoop-compatibility/src/test/java/org/apache/flink/test/hadoopcompatibility/mapreduce/HadoopInputOutputITCase.java b/flink-connectors/flink-hadoop-compatibility/src/test/java/org/apache/flink/test/hadoopcompatibility/mapreduce/HadoopInputOutputITCase.java index a23a50d..783a5a6 100644 --- a/flink-connectors/flink-hadoop-compatibility/src/test/java/org/apache/flink/test/hadoopcompatibility/mapreduce/HadoopInputOutputITCase.java +++ b/flink-connectors/flink-hadoop-compatibility/src/test/java/org/apache/flink/test/hadoopcompatibility/mapreduce/HadoopInputOutputITCase.java @@ -46,7 +46,6 @@ public class HadoopInputOutputITCase extends JavaProgramTestBase { protected void preSubmit() throws Exception { textPath = createTempFile("text.txt", WordCountData.TEXT); resultPath = getTempDirPath("result"); - this.setParallelism(4); } @Override http://git-wip-us.apache.org/repos/asf/flink/blob/b90210e3/flink-contrib/flink-storm-examples/src/test/java/org/apache/flink/storm/split/SplitITCase.java ---------------------------------------------------------------------- diff --git a/flink-contrib/flink-storm-examples/src/test/java/org/apache/flink/storm/split/SplitITCase.java b/flink-contrib/flink-storm-examples/src/test/java/org/apache/flink/storm/split/SplitITCase.java index d53493c..7152cf2 100644 --- a/flink-contrib/flink-storm-examples/src/test/java/org/apache/flink/storm/split/SplitITCase.java +++ b/flink-contrib/flink-storm-examples/src/test/java/org/apache/flink/storm/split/SplitITCase.java @@ -19,7 +19,7 @@ package org.apache.flink.storm.split; import org.apache.flink.storm.split.SpoutSplitExample.Enrich; import org.apache.flink.storm.split.operators.VerifyAndEnrichBolt; -import org.apache.flink.streaming.util.StreamingMultipleProgramsTestBase; +import org.apache.flink.test.util.AbstractTestBase; import org.junit.After; import org.junit.Assert; @@ -32,7 +32,7 @@ import java.io.IOException; /** * Tests for split examples. */ -public class SplitITCase extends StreamingMultipleProgramsTestBase { +public class SplitITCase extends AbstractTestBase { private String output; http://git-wip-us.apache.org/repos/asf/flink/blob/b90210e3/flink-examples/flink-examples-streaming/src/main/java/org/apache/flink/streaming/examples/windowing/TopSpeedWindowing.java ---------------------------------------------------------------------- diff --git a/flink-examples/flink-examples-streaming/src/main/java/org/apache/flink/streaming/examples/windowing/TopSpeedWindowing.java b/flink-examples/flink-examples-streaming/src/main/java/org/apache/flink/streaming/examples/windowing/TopSpeedWindowing.java index 7543bab..ee06cd4 100644 --- a/flink-examples/flink-examples-streaming/src/main/java/org/apache/flink/streaming/examples/windowing/TopSpeedWindowing.java +++ b/flink-examples/flink-examples-streaming/src/main/java/org/apache/flink/streaming/examples/windowing/TopSpeedWindowing.java @@ -55,7 +55,6 @@ public class TopSpeedWindowing { final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment(); env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime); env.getConfig().setGlobalJobParameters(params); - env.setParallelism(1); @SuppressWarnings({"rawtypes", "serial"}) DataStream<Tuple4<Integer, Integer, Double, Long>> carData; http://git-wip-us.apache.org/repos/asf/flink/blob/b90210e3/flink-examples/flink-examples-streaming/src/test/java/org/apache/flink/streaming/test/StreamingExamplesITCase.java ---------------------------------------------------------------------- diff --git a/flink-examples/flink-examples-streaming/src/test/java/org/apache/flink/streaming/test/StreamingExamplesITCase.java b/flink-examples/flink-examples-streaming/src/test/java/org/apache/flink/streaming/test/StreamingExamplesITCase.java index 4c47d59..cfe899e 100644 --- a/flink-examples/flink-examples-streaming/src/test/java/org/apache/flink/streaming/test/StreamingExamplesITCase.java +++ b/flink-examples/flink-examples-streaming/src/test/java/org/apache/flink/streaming/test/StreamingExamplesITCase.java @@ -29,8 +29,8 @@ import org.apache.flink.streaming.examples.ml.util.IncrementalLearningSkeletonDa import org.apache.flink.streaming.examples.twitter.util.TwitterExampleData; import org.apache.flink.streaming.examples.windowing.util.SessionWindowingData; import org.apache.flink.streaming.test.examples.join.WindowJoinData; -import org.apache.flink.streaming.util.StreamingMultipleProgramsTestBase; import org.apache.flink.test.testdata.WordCountData; +import org.apache.flink.test.util.AbstractTestBase; import org.apache.commons.io.FileUtils; import org.junit.Test; @@ -40,7 +40,7 @@ import java.io.File; /** * Integration test for streaming programs in Java examples. */ -public class StreamingExamplesITCase extends StreamingMultipleProgramsTestBase { +public class StreamingExamplesITCase extends AbstractTestBase { @Test public void testIterateExample() throws Exception { http://git-wip-us.apache.org/repos/asf/flink/blob/b90210e3/flink-examples/flink-examples-streaming/src/test/java/org/apache/flink/streaming/test/examples/windowing/TopSpeedWindowingExampleITCase.java ---------------------------------------------------------------------- diff --git a/flink-examples/flink-examples-streaming/src/test/java/org/apache/flink/streaming/test/examples/windowing/TopSpeedWindowingExampleITCase.java b/flink-examples/flink-examples-streaming/src/test/java/org/apache/flink/streaming/test/examples/windowing/TopSpeedWindowingExampleITCase.java index c2f3164..320dd5f 100644 --- a/flink-examples/flink-examples-streaming/src/test/java/org/apache/flink/streaming/test/examples/windowing/TopSpeedWindowingExampleITCase.java +++ b/flink-examples/flink-examples-streaming/src/test/java/org/apache/flink/streaming/test/examples/windowing/TopSpeedWindowingExampleITCase.java @@ -17,33 +17,47 @@ package org.apache.flink.streaming.test.examples.windowing; +import org.apache.flink.configuration.Configuration; import org.apache.flink.streaming.examples.windowing.TopSpeedWindowing; import org.apache.flink.streaming.examples.windowing.util.TopSpeedWindowingExampleData; -import org.apache.flink.streaming.util.StreamingProgramTestBase; +import org.apache.flink.test.util.MiniClusterResource; +import org.apache.flink.util.FileUtils; +import org.apache.flink.util.TestLogger; + +import org.junit.ClassRule; +import org.junit.Test; +import org.junit.rules.TemporaryFolder; + +import java.io.File; + +import static org.apache.flink.test.util.TestBaseUtils.compareResultsByLinesInMemory; /** * Tests for {@link TopSpeedWindowing}. */ -public class TopSpeedWindowingExampleITCase extends StreamingProgramTestBase { +public class TopSpeedWindowingExampleITCase extends TestLogger { - protected String textPath; - protected String resultPath; + @ClassRule + public static TemporaryFolder temporaryFolder = new TemporaryFolder(); - @Override - protected void preSubmit() throws Exception { - textPath = createTempFile("text.txt", TopSpeedWindowingExampleData.CAR_DATA); - resultPath = getTempDirPath("result"); - } + @ClassRule + public static MiniClusterResource miniClusterResource = new MiniClusterResource( + new MiniClusterResource.MiniClusterResourceConfiguration( + new Configuration(), + 1, + 1)); - @Override - protected void postSubmit() throws Exception { - compareResultsByLinesInMemory(TopSpeedWindowingExampleData.TOP_SPEEDS, resultPath); - } + @Test + public void testTopSpeedWindowingExampleITCase() throws Exception { + File inputFile = temporaryFolder.newFile(); + FileUtils.writeFileUtf8(inputFile, TopSpeedWindowingExampleData.CAR_DATA); + + final String resultPath = temporaryFolder.newFolder().toURI().toString(); - @Override - protected void testProgram() throws Exception { - TopSpeedWindowing.main(new String[]{ - "--input", textPath, - "--output", resultPath}); + TopSpeedWindowing.main(new String[] { + "--input", inputFile.getAbsolutePath(), + "--output", resultPath}); + + compareResultsByLinesInMemory(TopSpeedWindowingExampleData.TOP_SPEEDS, resultPath); } } http://git-wip-us.apache.org/repos/asf/flink/blob/b90210e3/flink-examples/flink-examples-streaming/src/test/java/org/apache/flink/streaming/test/socket/SocketWindowWordCountITCase.java ---------------------------------------------------------------------- diff --git a/flink-examples/flink-examples-streaming/src/test/java/org/apache/flink/streaming/test/socket/SocketWindowWordCountITCase.java b/flink-examples/flink-examples-streaming/src/test/java/org/apache/flink/streaming/test/socket/SocketWindowWordCountITCase.java index a09b22e..91ee9bf 100644 --- a/flink-examples/flink-examples-streaming/src/test/java/org/apache/flink/streaming/test/socket/SocketWindowWordCountITCase.java +++ b/flink-examples/flink-examples-streaming/src/test/java/org/apache/flink/streaming/test/socket/SocketWindowWordCountITCase.java @@ -19,8 +19,8 @@ package org.apache.flink.streaming.test.socket; import org.apache.flink.configuration.ConfigConstants; import org.apache.flink.streaming.examples.socket.SocketWindowWordCount; -import org.apache.flink.streaming.util.StreamingMultipleProgramsTestBase; import org.apache.flink.test.testdata.WordCountData; +import org.apache.flink.test.util.AbstractTestBase; import org.junit.Test; @@ -38,7 +38,7 @@ import static org.junit.Assert.fail; /** * Tests for {@link SocketWindowWordCount}. */ -public class SocketWindowWordCountITCase extends StreamingMultipleProgramsTestBase { +public class SocketWindowWordCountITCase extends AbstractTestBase { @Test public void testJavaProgram() throws Exception { http://git-wip-us.apache.org/repos/asf/flink/blob/b90210e3/flink-examples/flink-examples-streaming/src/test/scala/org/apache/flink/streaming/scala/examples/StreamingExamplesITCase.scala ---------------------------------------------------------------------- diff --git a/flink-examples/flink-examples-streaming/src/test/scala/org/apache/flink/streaming/scala/examples/StreamingExamplesITCase.scala b/flink-examples/flink-examples-streaming/src/test/scala/org/apache/flink/streaming/scala/examples/StreamingExamplesITCase.scala index 24d1444..7407294 100644 --- a/flink-examples/flink-examples-streaming/src/test/scala/org/apache/flink/streaming/scala/examples/StreamingExamplesITCase.scala +++ b/flink-examples/flink-examples-streaming/src/test/scala/org/apache/flink/streaming/scala/examples/StreamingExamplesITCase.scala @@ -24,7 +24,6 @@ import org.apache.commons.io.FileUtils import org.apache.flink.core.fs.FileSystem.WriteMode import org.apache.flink.streaming.api.TimeCharacteristic import org.apache.flink.streaming.api.scala._ -import org.apache.flink.streaming.api.scala.StreamExecutionEnvironment import org.apache.flink.streaming.examples.iteration.util.IterateExampleData import org.apache.flink.streaming.examples.ml.util.IncrementalLearningSkeletonData import org.apache.flink.streaming.examples.twitter.util.TwitterExampleData @@ -37,7 +36,6 @@ import org.apache.flink.streaming.scala.examples.twitter.TwitterExample import org.apache.flink.streaming.scala.examples.windowing.{SessionWindowing, WindowWordCount} import org.apache.flink.streaming.scala.examples.wordcount.WordCount import org.apache.flink.streaming.test.examples.join.WindowJoinData -import org.apache.flink.streaming.util.StreamingMultipleProgramsTestBase import org.apache.flink.test.testdata.WordCountData import org.apache.flink.test.util.{AbstractTestBase, TestBaseUtils} import org.junit.Test @@ -45,12 +43,12 @@ import org.junit.Test /** * Integration test for streaming programs in Scala examples. */ -class StreamingExamplesITCase extends StreamingMultipleProgramsTestBase { +class StreamingExamplesITCase extends AbstractTestBase { @Test def testIterateExample(): Unit = { - val inputPath = AbstractTestBase.createTempFile("fibonacciInput.txt", IterateExampleData.INPUT_PAIRS) - val resultPath = AbstractTestBase.getTempDirPath("result") + val inputPath = createTempFile("fibonacciInput.txt", IterateExampleData.INPUT_PAIRS) + val resultPath = getTempDirPath("result") // the example is inherently non-deterministic. The iteration timeout of 5000 ms // is frequently not enough to make the test run stable on CI infrastructure @@ -99,14 +97,14 @@ class StreamingExamplesITCase extends StreamingMultipleProgramsTestBase { @Test def testIncrementalLearningSkeleton(): Unit = { - val resultPath = AbstractTestBase.getTempDirPath("result") + val resultPath = getTempDirPath("result") IncrementalLearningSkeleton.main(Array("--output", resultPath)) TestBaseUtils.compareResultsByLinesInMemory(IncrementalLearningSkeletonData.RESULTS, resultPath) } @Test def testTwitterExample(): Unit = { - val resultPath = AbstractTestBase.getTempDirPath("result") + val resultPath = getTempDirPath("result") TwitterExample.main(Array("--output", resultPath)) TestBaseUtils.compareResultsByLinesInMemory( TwitterExampleData.STREAMING_COUNTS_AS_TUPLES, @@ -115,7 +113,7 @@ class StreamingExamplesITCase extends StreamingMultipleProgramsTestBase { @Test def testSessionWindowing(): Unit = { - val resultPath = AbstractTestBase.getTempDirPath("result") + val resultPath = getTempDirPath("result") SessionWindowing.main(Array("--output", resultPath)) TestBaseUtils.compareResultsByLinesInMemory(SessionWindowingData.EXPECTED, resultPath) } @@ -124,8 +122,8 @@ class StreamingExamplesITCase extends StreamingMultipleProgramsTestBase { def testWindowWordCount(): Unit = { val windowSize = "250" val slideSize = "150" - val textPath = AbstractTestBase.createTempFile("text.txt", WordCountData.TEXT) - val resultPath = AbstractTestBase.getTempDirPath("result") + val textPath = createTempFile("text.txt", WordCountData.TEXT) + val resultPath = getTempDirPath("result") WindowWordCount.main(Array( "--input", textPath, @@ -142,8 +140,8 @@ class StreamingExamplesITCase extends StreamingMultipleProgramsTestBase { @Test def testWordCount(): Unit = { - val textPath = AbstractTestBase.createTempFile("text.txt", WordCountData.TEXT) - val resultPath = AbstractTestBase.getTempDirPath("result") + val textPath = createTempFile("text.txt", WordCountData.TEXT) + val resultPath = getTempDirPath("result") WordCount.main(Array( "--input", textPath, http://git-wip-us.apache.org/repos/asf/flink/blob/b90210e3/flink-libraries/flink-cep/src/test/java/org/apache/flink/cep/CEPITCase.java ---------------------------------------------------------------------- diff --git a/flink-libraries/flink-cep/src/test/java/org/apache/flink/cep/CEPITCase.java b/flink-libraries/flink-cep/src/test/java/org/apache/flink/cep/CEPITCase.java index 81b83a3..4f2383a 100644 --- a/flink-libraries/flink-cep/src/test/java/org/apache/flink/cep/CEPITCase.java +++ b/flink-libraries/flink-cep/src/test/java/org/apache/flink/cep/CEPITCase.java @@ -32,7 +32,7 @@ import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment; import org.apache.flink.streaming.api.functions.AssignerWithPunctuatedWatermarks; import org.apache.flink.streaming.api.watermark.Watermark; import org.apache.flink.streaming.api.windowing.time.Time; -import org.apache.flink.streaming.util.StreamingMultipleProgramsTestBase; +import org.apache.flink.test.util.AbstractTestBase; import org.apache.flink.types.Either; import org.junit.After; @@ -48,7 +48,7 @@ import java.util.Map; * End to end tests of both CEP operators and {@link NFA}. */ @SuppressWarnings("serial") -public class CEPITCase extends StreamingMultipleProgramsTestBase { +public class CEPITCase extends AbstractTestBase { private String resultPath; private String expected; http://git-wip-us.apache.org/repos/asf/flink/blob/b90210e3/flink-libraries/flink-table/src/test/java/org/apache/flink/table/runtime/stream/sql/JavaSqlITCase.java ---------------------------------------------------------------------- diff --git a/flink-libraries/flink-table/src/test/java/org/apache/flink/table/runtime/stream/sql/JavaSqlITCase.java b/flink-libraries/flink-table/src/test/java/org/apache/flink/table/runtime/stream/sql/JavaSqlITCase.java index f3d0309..44f89cc 100644 --- a/flink-libraries/flink-table/src/test/java/org/apache/flink/table/runtime/stream/sql/JavaSqlITCase.java +++ b/flink-libraries/flink-table/src/test/java/org/apache/flink/table/runtime/stream/sql/JavaSqlITCase.java @@ -25,12 +25,12 @@ import org.apache.flink.api.java.tuple.Tuple5; import org.apache.flink.api.java.typeutils.RowTypeInfo; import org.apache.flink.streaming.api.datastream.DataStream; import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment; -import org.apache.flink.streaming.util.StreamingMultipleProgramsTestBase; import org.apache.flink.table.api.Table; import org.apache.flink.table.api.TableEnvironment; import org.apache.flink.table.api.java.StreamTableEnvironment; import org.apache.flink.table.runtime.utils.JavaStreamTestData; import org.apache.flink.table.runtime.utils.StreamITCase; +import org.apache.flink.test.util.AbstractTestBase; import org.apache.flink.types.Row; import org.junit.Test; @@ -41,7 +41,7 @@ import java.util.List; /** * Integration tests for streaming SQL. */ -public class JavaSqlITCase extends StreamingMultipleProgramsTestBase { +public class JavaSqlITCase extends AbstractTestBase { @Test public void testRowRegisterRowWithNames() throws Exception { http://git-wip-us.apache.org/repos/asf/flink/blob/b90210e3/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/api/stream/ExplainTest.scala ---------------------------------------------------------------------- diff --git a/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/api/stream/ExplainTest.scala b/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/api/stream/ExplainTest.scala index 820de08..741a3cb 100644 --- a/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/api/stream/ExplainTest.scala +++ b/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/api/stream/ExplainTest.scala @@ -20,13 +20,13 @@ package org.apache.flink.table.api.stream import org.apache.flink.api.scala._ import org.apache.flink.streaming.api.scala.StreamExecutionEnvironment -import org.apache.flink.streaming.util.StreamingMultipleProgramsTestBase import org.apache.flink.table.api.TableEnvironment import org.apache.flink.table.api.scala._ +import org.apache.flink.test.util.AbstractTestBase import org.junit.Assert.assertEquals import org.junit._ -class ExplainTest extends StreamingMultipleProgramsTestBase { +class ExplainTest extends AbstractTestBase { private val testFilePath = this.getClass.getResource("/").getFile http://git-wip-us.apache.org/repos/asf/flink/blob/b90210e3/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/api/stream/table/validation/UnsupportedOpsValidationTest.scala ---------------------------------------------------------------------- diff --git a/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/api/stream/table/validation/UnsupportedOpsValidationTest.scala b/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/api/stream/table/validation/UnsupportedOpsValidationTest.scala index c1ad08c..1de2b1e 100644 --- a/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/api/stream/table/validation/UnsupportedOpsValidationTest.scala +++ b/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/api/stream/table/validation/UnsupportedOpsValidationTest.scala @@ -19,13 +19,13 @@ package org.apache.flink.table.api.stream.table.validation import org.apache.flink.streaming.api.scala.StreamExecutionEnvironment -import org.apache.flink.streaming.util.StreamingMultipleProgramsTestBase import org.apache.flink.table.api.scala._ import org.apache.flink.table.api.{TableEnvironment, ValidationException} import org.apache.flink.table.runtime.utils.StreamTestData +import org.apache.flink.test.util.AbstractTestBase import org.junit.Test -class UnsupportedOpsValidationTest extends StreamingMultipleProgramsTestBase { +class UnsupportedOpsValidationTest extends AbstractTestBase { @Test(expected = classOf[ValidationException]) def testSort(): Unit = { http://git-wip-us.apache.org/repos/asf/flink/blob/b90210e3/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/runtime/stream/TimeAttributesITCase.scala ---------------------------------------------------------------------- diff --git a/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/runtime/stream/TimeAttributesITCase.scala b/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/runtime/stream/TimeAttributesITCase.scala index a301354..c553ee6 100644 --- a/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/runtime/stream/TimeAttributesITCase.scala +++ b/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/runtime/stream/TimeAttributesITCase.scala @@ -18,8 +18,8 @@ package org.apache.flink.table.runtime.stream -import java.math.BigDecimal import java.lang.{Integer => JInt, Long => JLong} +import java.math.BigDecimal import org.apache.flink.api.common.typeinfo.TypeInformation import org.apache.flink.api.java.typeutils.RowTypeInfo @@ -28,15 +28,15 @@ import org.apache.flink.streaming.api.TimeCharacteristic import org.apache.flink.streaming.api.functions.AssignerWithPunctuatedWatermarks import org.apache.flink.streaming.api.scala.StreamExecutionEnvironment import org.apache.flink.streaming.api.watermark.Watermark -import org.apache.flink.streaming.util.StreamingMultipleProgramsTestBase -import org.apache.flink.table.runtime.utils.JavaPojos.Pojo1 import org.apache.flink.table.api.scala._ -import org.apache.flink.table.plan.TimeIndicatorConversionTest.TableFunc import org.apache.flink.table.api.{TableEnvironment, TableSchema, Types} import org.apache.flink.table.expressions.{ExpressionParser, TimeIntervalUnit} +import org.apache.flink.table.plan.TimeIndicatorConversionTest.TableFunc import org.apache.flink.table.runtime.stream.TimeAttributesITCase.{AtomicTimestampWithEqualWatermark, TestPojo, TimestampWithEqualWatermark, TimestampWithEqualWatermarkPojo} +import org.apache.flink.table.runtime.utils.JavaPojos.Pojo1 import org.apache.flink.table.runtime.utils.StreamITCase import org.apache.flink.table.utils.{MemoryTableSinkUtil, TestTableSourceWithTime} +import org.apache.flink.test.util.AbstractTestBase import org.apache.flink.types.Row import org.junit.Assert._ import org.junit.Test @@ -46,7 +46,7 @@ import scala.collection.mutable /** * Tests for access and materialization of time attributes. */ -class TimeAttributesITCase extends StreamingMultipleProgramsTestBase { +class TimeAttributesITCase extends AbstractTestBase { val data = List( (1L, 1, 1d, 1f, new BigDecimal("1"), "Hi"), http://git-wip-us.apache.org/repos/asf/flink/blob/b90210e3/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/runtime/stream/sql/TableSourceITCase.scala ---------------------------------------------------------------------- diff --git a/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/runtime/stream/sql/TableSourceITCase.scala b/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/runtime/stream/sql/TableSourceITCase.scala index 30ada56..246ce2e 100644 --- a/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/runtime/stream/sql/TableSourceITCase.scala +++ b/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/runtime/stream/sql/TableSourceITCase.scala @@ -20,17 +20,17 @@ package org.apache.flink.table.runtime.stream.sql import org.apache.flink.api.scala._ import org.apache.flink.streaming.api.scala.StreamExecutionEnvironment -import org.apache.flink.streaming.util.StreamingMultipleProgramsTestBase import org.apache.flink.table.api.TableEnvironment import org.apache.flink.table.api.scala._ import org.apache.flink.table.runtime.utils.{CommonTestData, StreamITCase} +import org.apache.flink.test.util.AbstractTestBase import org.apache.flink.types.Row import org.junit.Assert._ import org.junit.Test import scala.collection.mutable -class TableSourceITCase extends StreamingMultipleProgramsTestBase { +class TableSourceITCase extends AbstractTestBase { @Test def testCsvTableSource(): Unit = { http://git-wip-us.apache.org/repos/asf/flink/blob/b90210e3/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/runtime/stream/table/CalcITCase.scala ---------------------------------------------------------------------- diff --git a/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/runtime/stream/table/CalcITCase.scala b/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/runtime/stream/table/CalcITCase.scala index 46788f5..a20b626 100644 --- a/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/runtime/stream/table/CalcITCase.scala +++ b/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/runtime/stream/table/CalcITCase.scala @@ -20,20 +20,19 @@ package org.apache.flink.table.runtime.stream.table import org.apache.flink.api.scala._ import org.apache.flink.streaming.api.scala.StreamExecutionEnvironment -import org.apache.flink.streaming.util.StreamingMultipleProgramsTestBase -import org.apache.flink.table.api.scala._ import org.apache.flink.table.api.TableEnvironment +import org.apache.flink.table.api.scala._ import org.apache.flink.table.expressions.Literal import org.apache.flink.table.expressions.utils.{Func13, RichFunc1, RichFunc2, SplitUDF} -import org.apache.flink.table.runtime.utils.{StreamITCase, StreamTestData} -import org.apache.flink.table.runtime.utils.UserDefinedFunctionTestUtils +import org.apache.flink.table.runtime.utils.{StreamITCase, StreamTestData, UserDefinedFunctionTestUtils} +import org.apache.flink.test.util.AbstractTestBase import org.apache.flink.types.Row import org.junit.Assert._ import org.junit.Test import scala.collection.mutable -class CalcITCase extends StreamingMultipleProgramsTestBase { +class CalcITCase extends AbstractTestBase { @Test def testSimpleSelectAll(): Unit = { http://git-wip-us.apache.org/repos/asf/flink/blob/b90210e3/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/runtime/stream/table/CorrelateITCase.scala ---------------------------------------------------------------------- diff --git a/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/runtime/stream/table/CorrelateITCase.scala b/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/runtime/stream/table/CorrelateITCase.scala index 215526d..0f563e6 100644 --- a/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/runtime/stream/table/CorrelateITCase.scala +++ b/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/runtime/stream/table/CorrelateITCase.scala @@ -21,19 +21,19 @@ import java.lang.{Boolean => JBoolean} import org.apache.flink.api.scala._ import org.apache.flink.streaming.api.scala.{DataStream, StreamExecutionEnvironment} -import org.apache.flink.streaming.util.StreamingMultipleProgramsTestBase import org.apache.flink.table.api.scala._ import org.apache.flink.table.api.{TableEnvironment, Types, ValidationException} import org.apache.flink.table.expressions.utils.{Func18, RichFunc2} import org.apache.flink.table.runtime.utils.{StreamITCase, StreamTestData, _} import org.apache.flink.table.utils._ +import org.apache.flink.test.util.AbstractTestBase import org.apache.flink.types.Row import org.junit.Assert._ import org.junit.{Before, Test} import scala.collection.mutable -class CorrelateITCase extends StreamingMultipleProgramsTestBase { +class CorrelateITCase extends AbstractTestBase { val env: StreamExecutionEnvironment = StreamExecutionEnvironment.getExecutionEnvironment val tEnv: StreamTableEnvironment = TableEnvironment.getTableEnvironment(env) http://git-wip-us.apache.org/repos/asf/flink/blob/b90210e3/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/runtime/stream/table/GroupWindowITCase.scala ---------------------------------------------------------------------- diff --git a/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/runtime/stream/table/GroupWindowITCase.scala b/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/runtime/stream/table/GroupWindowITCase.scala index 1eebeee..588cff1 100644 --- a/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/runtime/stream/table/GroupWindowITCase.scala +++ b/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/runtime/stream/table/GroupWindowITCase.scala @@ -22,17 +22,17 @@ import java.math.BigDecimal import org.apache.flink.api.common.time.Time import org.apache.flink.api.scala._ -import org.apache.flink.table.api.scala._ -import org.apache.flink.table.api.{StreamQueryConfig, TableEnvironment} import org.apache.flink.streaming.api.TimeCharacteristic import org.apache.flink.streaming.api.functions.AssignerWithPunctuatedWatermarks import org.apache.flink.streaming.api.scala.StreamExecutionEnvironment import org.apache.flink.streaming.api.watermark.Watermark -import org.apache.flink.streaming.util.StreamingMultipleProgramsTestBase -import org.apache.flink.table.runtime.utils.JavaUserDefinedAggFunctions.{CountDistinct, CountDistinctWithMerge, WeightedAvg, WeightedAvgWithMerge} +import org.apache.flink.table.api.scala._ +import org.apache.flink.table.api.{StreamQueryConfig, TableEnvironment} import org.apache.flink.table.functions.aggfunctions.CountAggFunction import org.apache.flink.table.runtime.stream.table.GroupWindowITCase._ +import org.apache.flink.table.runtime.utils.JavaUserDefinedAggFunctions.{CountDistinct, CountDistinctWithMerge, WeightedAvg, WeightedAvgWithMerge} import org.apache.flink.table.runtime.utils.StreamITCase +import org.apache.flink.test.util.AbstractTestBase import org.apache.flink.types.Row import org.junit.Assert._ import org.junit.Test @@ -43,7 +43,7 @@ import scala.collection.mutable * We only test some aggregations until better testing of constructed DataStream * programs is possible. */ -class GroupWindowITCase extends StreamingMultipleProgramsTestBase { +class GroupWindowITCase extends AbstractTestBase { private val queryConfig = new StreamQueryConfig() queryConfig.withIdleStateRetentionTime(Time.hours(1), Time.hours(2)) http://git-wip-us.apache.org/repos/asf/flink/blob/b90210e3/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/runtime/stream/table/SetOperatorsITCase.scala ---------------------------------------------------------------------- diff --git a/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/runtime/stream/table/SetOperatorsITCase.scala b/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/runtime/stream/table/SetOperatorsITCase.scala index 5e15e14..479bce2 100644 --- a/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/runtime/stream/table/SetOperatorsITCase.scala +++ b/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/runtime/stream/table/SetOperatorsITCase.scala @@ -20,18 +20,18 @@ package org.apache.flink.table.runtime.stream.table import org.apache.flink.api.scala._ import org.apache.flink.streaming.api.scala.StreamExecutionEnvironment -import org.apache.flink.streaming.util.StreamingMultipleProgramsTestBase import org.apache.flink.table.api.TableEnvironment import org.apache.flink.table.api.scala._ import org.apache.flink.table.runtime.utils.CommonTestData.NonPojo import org.apache.flink.table.runtime.utils.{StreamITCase, StreamTestData} +import org.apache.flink.test.util.AbstractTestBase import org.apache.flink.types.Row import org.junit.Assert._ import org.junit.Test import scala.collection.mutable -class SetOperatorsITCase extends StreamingMultipleProgramsTestBase { +class SetOperatorsITCase extends AbstractTestBase { @Test def testUnion(): Unit = { http://git-wip-us.apache.org/repos/asf/flink/blob/b90210e3/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/runtime/stream/table/TableSinkITCase.scala ---------------------------------------------------------------------- diff --git a/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/runtime/stream/table/TableSinkITCase.scala b/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/runtime/stream/table/TableSinkITCase.scala index b44d8ef..f1badee 100644 --- a/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/runtime/stream/table/TableSinkITCase.scala +++ b/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/runtime/stream/table/TableSinkITCase.scala @@ -31,22 +31,21 @@ import org.apache.flink.streaming.api.datastream.DataStream import org.apache.flink.streaming.api.functions.ProcessFunction import org.apache.flink.streaming.api.functions.sink.SinkFunction import org.apache.flink.streaming.api.scala.StreamExecutionEnvironment -import org.apache.flink.streaming.util.StreamingMultipleProgramsTestBase import org.apache.flink.table.api.scala._ import org.apache.flink.table.api.{TableEnvironment, TableException, Types} import org.apache.flink.table.runtime.utils.{StreamITCase, StreamTestData} import org.apache.flink.table.sinks._ import org.apache.flink.table.utils.MemoryTableSinkUtil -import org.apache.flink.test.util.TestBaseUtils +import org.apache.flink.test.util.{AbstractTestBase, TestBaseUtils} import org.apache.flink.types.Row import org.apache.flink.util.Collector import org.junit.Assert._ import org.junit.Test -import scala.collection.mutable import scala.collection.JavaConverters._ +import scala.collection.mutable -class TableSinkITCase extends StreamingMultipleProgramsTestBase { +class TableSinkITCase extends AbstractTestBase { @Test def testInsertIntoRegisteredTableSink(): Unit = { http://git-wip-us.apache.org/repos/asf/flink/blob/b90210e3/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/runtime/stream/table/TableSourceITCase.scala ---------------------------------------------------------------------- diff --git a/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/runtime/stream/table/TableSourceITCase.scala b/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/runtime/stream/table/TableSourceITCase.scala index c9ea30a..d1a88b7 100644 --- a/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/runtime/stream/table/TableSourceITCase.scala +++ b/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/runtime/stream/table/TableSourceITCase.scala @@ -29,12 +29,12 @@ import org.apache.flink.streaming.api.datastream.DataStream import org.apache.flink.streaming.api.environment.{StreamExecutionEnvironment => JExecEnv} import org.apache.flink.streaming.api.functions.ProcessFunction import org.apache.flink.streaming.api.scala.StreamExecutionEnvironment -import org.apache.flink.streaming.util.StreamingMultipleProgramsTestBase -import org.apache.flink.table.api.{TableEnvironment, TableException, TableSchema, Types} import org.apache.flink.table.api.scala._ +import org.apache.flink.table.api.{TableEnvironment, TableException, TableSchema, Types} import org.apache.flink.table.runtime.utils.{CommonTestData, StreamITCase} import org.apache.flink.table.sources.StreamTableSource import org.apache.flink.table.utils._ +import org.apache.flink.test.util.AbstractTestBase import org.apache.flink.types.Row import org.apache.flink.util.Collector import org.junit.Assert._ @@ -43,7 +43,7 @@ import org.junit.Test import scala.collection.JavaConverters._ import scala.collection.mutable -class TableSourceITCase extends StreamingMultipleProgramsTestBase { +class TableSourceITCase extends AbstractTestBase { @Test(expected = classOf[TableException]) def testInvalidDatastreamType(): Unit = { http://git-wip-us.apache.org/repos/asf/flink/blob/b90210e3/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/runtime/utils/StreamingWithStateTestBase.scala ---------------------------------------------------------------------- diff --git a/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/runtime/utils/StreamingWithStateTestBase.scala b/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/runtime/utils/StreamingWithStateTestBase.scala index 8c41f22..5cfab4a 100644 --- a/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/runtime/utils/StreamingWithStateTestBase.scala +++ b/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/runtime/utils/StreamingWithStateTestBase.scala @@ -18,11 +18,11 @@ package org.apache.flink.table.runtime.utils import org.apache.flink.contrib.streaming.state.RocksDBStateBackend -import org.apache.flink.streaming.util.StreamingMultipleProgramsTestBase +import org.apache.flink.test.util.AbstractTestBase import org.junit.Rule import org.junit.rules.TemporaryFolder -class StreamingWithStateTestBase extends StreamingMultipleProgramsTestBase { +class StreamingWithStateTestBase extends AbstractTestBase { val _tempFolder = new TemporaryFolder http://git-wip-us.apache.org/repos/asf/flink/blob/b90210e3/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/DataStreamTest.java ---------------------------------------------------------------------- diff --git a/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/DataStreamTest.java b/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/DataStreamTest.java index b76ade7..6fb06d3 100644 --- a/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/DataStreamTest.java +++ b/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/DataStreamTest.java @@ -70,6 +70,7 @@ import org.apache.flink.streaming.runtime.partitioner.RebalancePartitioner; import org.apache.flink.streaming.runtime.partitioner.ShufflePartitioner; import org.apache.flink.streaming.runtime.partitioner.StreamPartitioner; import org.apache.flink.util.Collector; +import org.apache.flink.util.TestLogger; import org.hamcrest.core.StringStartsWith; import org.junit.Assert; @@ -90,7 +91,7 @@ import static org.junit.Assert.fail; * Tests for {@link DataStream}. */ @SuppressWarnings("serial") -public class DataStreamTest { +public class DataStreamTest extends TestLogger { /** * Tests union functionality. This ensures that self-unions and unions of streams http://git-wip-us.apache.org/repos/asf/flink/blob/b90210e3/flink-streaming-scala/src/test/java/org/apache/flink/streaming/scala/api/CsvOutputFormatITCase.java ---------------------------------------------------------------------- diff --git a/flink-streaming-scala/src/test/java/org/apache/flink/streaming/scala/api/CsvOutputFormatITCase.java b/flink-streaming-scala/src/test/java/org/apache/flink/streaming/scala/api/CsvOutputFormatITCase.java index fb7c765..2311092 100644 --- a/flink-streaming-scala/src/test/java/org/apache/flink/streaming/scala/api/CsvOutputFormatITCase.java +++ b/flink-streaming-scala/src/test/java/org/apache/flink/streaming/scala/api/CsvOutputFormatITCase.java @@ -19,8 +19,8 @@ package org.apache.flink.streaming.scala.api; import org.apache.flink.core.fs.FileSystem; import org.apache.flink.streaming.api.scala.OutputFormatTestPrograms; -import org.apache.flink.streaming.util.StreamingMultipleProgramsTestBase; import org.apache.flink.test.testdata.WordCountData; +import org.apache.flink.test.util.AbstractTestBase; import org.junit.After; import org.junit.Before; @@ -34,7 +34,7 @@ import static org.junit.Assert.fail; /** * IT cases for the {@link org.apache.flink.api.java.io.CsvOutputFormat}. */ -public class CsvOutputFormatITCase extends StreamingMultipleProgramsTestBase { +public class CsvOutputFormatITCase extends AbstractTestBase { protected String resultPath; http://git-wip-us.apache.org/repos/asf/flink/blob/b90210e3/flink-streaming-scala/src/test/java/org/apache/flink/streaming/scala/api/TextOutputFormatITCase.java ---------------------------------------------------------------------- diff --git a/flink-streaming-scala/src/test/java/org/apache/flink/streaming/scala/api/TextOutputFormatITCase.java b/flink-streaming-scala/src/test/java/org/apache/flink/streaming/scala/api/TextOutputFormatITCase.java index 84b81e2..c2e450a 100644 --- a/flink-streaming-scala/src/test/java/org/apache/flink/streaming/scala/api/TextOutputFormatITCase.java +++ b/flink-streaming-scala/src/test/java/org/apache/flink/streaming/scala/api/TextOutputFormatITCase.java @@ -19,8 +19,8 @@ package org.apache.flink.streaming.scala.api; import org.apache.flink.core.fs.FileSystem; import org.apache.flink.streaming.api.scala.OutputFormatTestPrograms; -import org.apache.flink.streaming.util.StreamingMultipleProgramsTestBase; import org.apache.flink.test.testdata.WordCountData; +import org.apache.flink.test.util.AbstractTestBase; import org.junit.After; import org.junit.Before; @@ -34,7 +34,7 @@ import static org.junit.Assert.fail; /** * IT cases for the {@link org.apache.flink.api.java.io.TextOutputFormat}. */ -public class TextOutputFormatITCase extends StreamingMultipleProgramsTestBase { +public class TextOutputFormatITCase extends AbstractTestBase { protected String resultPath; http://git-wip-us.apache.org/repos/asf/flink/blob/b90210e3/flink-streaming-scala/src/test/scala/org/apache/flink/streaming/api/scala/CoGroupJoinITCase.scala ---------------------------------------------------------------------- diff --git a/flink-streaming-scala/src/test/scala/org/apache/flink/streaming/api/scala/CoGroupJoinITCase.scala b/flink-streaming-scala/src/test/scala/org/apache/flink/streaming/api/scala/CoGroupJoinITCase.scala index fddbe00..5412e8e 100644 --- a/flink-streaming-scala/src/test/scala/org/apache/flink/streaming/api/scala/CoGroupJoinITCase.scala +++ b/flink-streaming-scala/src/test/scala/org/apache/flink/streaming/api/scala/CoGroupJoinITCase.scala @@ -27,14 +27,13 @@ import org.apache.flink.streaming.api.functions.source.SourceFunction import org.apache.flink.streaming.api.watermark.Watermark import org.apache.flink.streaming.api.windowing.assigners.TumblingEventTimeWindows import org.apache.flink.streaming.api.windowing.time.Time -import org.apache.flink.streaming.util.StreamingMultipleProgramsTestBase - -import org.junit.Test +import org.apache.flink.test.util.AbstractTestBase import org.junit.Assert._ +import org.junit.Test import scala.collection.mutable -class CoGroupJoinITCase extends StreamingMultipleProgramsTestBase { +class CoGroupJoinITCase extends AbstractTestBase { @Test def testCoGroup(): Unit = { http://git-wip-us.apache.org/repos/asf/flink/blob/b90210e3/flink-streaming-scala/src/test/scala/org/apache/flink/streaming/api/scala/DataStreamTest.scala ---------------------------------------------------------------------- diff --git a/flink-streaming-scala/src/test/scala/org/apache/flink/streaming/api/scala/DataStreamTest.scala b/flink-streaming-scala/src/test/scala/org/apache/flink/streaming/api/scala/DataStreamTest.scala index 60c609d..6158c8e 100644 --- a/flink-streaming-scala/src/test/scala/org/apache/flink/streaming/api/scala/DataStreamTest.scala +++ b/flink-streaming-scala/src/test/scala/org/apache/flink/streaming/api/scala/DataStreamTest.scala @@ -21,7 +21,6 @@ package org.apache.flink.streaming.api.scala import java.lang import org.apache.flink.api.common.functions._ -import org.apache.flink.api.common.operators.ResourceSpec import org.apache.flink.api.java.typeutils.TypeExtractor import org.apache.flink.streaming.api.collector.selector.OutputSelector import org.apache.flink.streaming.api.functions.ProcessFunction @@ -32,12 +31,12 @@ import org.apache.flink.streaming.api.windowing.assigners.GlobalWindows import org.apache.flink.streaming.api.windowing.triggers.{CountTrigger, PurgingTrigger} import org.apache.flink.streaming.api.windowing.windows.GlobalWindow import org.apache.flink.streaming.runtime.partitioner._ -import org.apache.flink.streaming.util.StreamingMultipleProgramsTestBase +import org.apache.flink.test.util.AbstractTestBase import org.apache.flink.util.Collector import org.junit.Assert._ import org.junit.Test -class DataStreamTest extends StreamingMultipleProgramsTestBase { +class DataStreamTest extends AbstractTestBase { @Test def testNaming(): Unit = { @@ -242,7 +241,8 @@ class DataStreamTest extends StreamingMultipleProgramsTestBase { */ @Test def testParallelism() { - val env: StreamExecutionEnvironment = StreamExecutionEnvironment.createLocalEnvironment(10) + val env: StreamExecutionEnvironment = StreamExecutionEnvironment.getExecutionEnvironment + val parallelism = env.getParallelism val src = env.fromElements(new Tuple2[Long, Long](0L, 0L)) val map = src.map(x => (0L, 0L)) @@ -255,9 +255,12 @@ class DataStreamTest extends StreamingMultipleProgramsTestBase { val sink = map.addSink(x => {}) assert(1 == env.getStreamGraph.getStreamNode(src.getId).getParallelism) - assert(10 == env.getStreamGraph.getStreamNode(map.getId).getParallelism) + assert(parallelism == env.getStreamGraph.getStreamNode(map.getId).getParallelism) assert(1 == env.getStreamGraph.getStreamNode(windowed.getId).getParallelism) - assert(10 == env.getStreamGraph.getStreamNode(sink.getTransformation.getId).getParallelism) + assert(parallelism == env + .getStreamGraph + .getStreamNode(sink.getTransformation.getId) + .getParallelism) try { src.setParallelism(3) @@ -268,18 +271,23 @@ class DataStreamTest extends StreamingMultipleProgramsTestBase { } } - env.setParallelism(7) + val newParallelism = parallelism - 1 + + env.setParallelism(newParallelism) // the parallelism does not change since some windowing code takes the parallelism from // input operations and that cannot change dynamically assert(1 == env.getStreamGraph.getStreamNode(src.getId).getParallelism) - assert(10 == env.getStreamGraph.getStreamNode(map.getId).getParallelism) + assert(parallelism == env.getStreamGraph.getStreamNode(map.getId).getParallelism) assert(1 == env.getStreamGraph.getStreamNode(windowed.getId).getParallelism) - assert(10 == env.getStreamGraph.getStreamNode(sink.getTransformation.getId).getParallelism) + assert(parallelism == env + .getStreamGraph + .getStreamNode(sink.getTransformation.getId) + .getParallelism) val parallelSource = env.generateSequence(0, 0) parallelSource.print() - assert(7 == env.getStreamGraph.getStreamNode(parallelSource.getId).getParallelism) + assert(newParallelism == env.getStreamGraph.getStreamNode(parallelSource.getId).getParallelism) parallelSource.setParallelism(3) assert(3 == env.getStreamGraph.getStreamNode(parallelSource.getId).getParallelism) http://git-wip-us.apache.org/repos/asf/flink/blob/b90210e3/flink-streaming-scala/src/test/scala/org/apache/flink/streaming/api/scala/SideOutputITCase.scala ---------------------------------------------------------------------- diff --git a/flink-streaming-scala/src/test/scala/org/apache/flink/streaming/api/scala/SideOutputITCase.scala b/flink-streaming-scala/src/test/scala/org/apache/flink/streaming/api/scala/SideOutputITCase.scala index 8e66171..e844928 100644 --- a/flink-streaming-scala/src/test/scala/org/apache/flink/streaming/api/scala/SideOutputITCase.scala +++ b/flink-streaming-scala/src/test/scala/org/apache/flink/streaming/api/scala/SideOutputITCase.scala @@ -27,8 +27,8 @@ import org.apache.flink.streaming.api.watermark.Watermark import org.apache.flink.streaming.api.windowing.assigners.TumblingEventTimeWindows import org.apache.flink.streaming.api.windowing.time.Time import org.apache.flink.streaming.api.windowing.windows.TimeWindow -import org.apache.flink.streaming.util.StreamingMultipleProgramsTestBase import org.apache.flink.test.streaming.runtime.util.TestListResultSink +import org.apache.flink.test.util.AbstractTestBase import org.apache.flink.util.Collector import org.junit.Assert._ import org.junit.Test @@ -36,7 +36,7 @@ import org.junit.Test /** * Integration test for streaming programs using side outputs. */ -class SideOutputITCase extends StreamingMultipleProgramsTestBase { +class SideOutputITCase extends AbstractTestBase { /** * Test ProcessFunction side output. http://git-wip-us.apache.org/repos/asf/flink/blob/b90210e3/flink-streaming-scala/src/test/scala/org/apache/flink/streaming/api/scala/TimeWindowTranslationTest.scala ---------------------------------------------------------------------- diff --git a/flink-streaming-scala/src/test/scala/org/apache/flink/streaming/api/scala/TimeWindowTranslationTest.scala b/flink-streaming-scala/src/test/scala/org/apache/flink/streaming/api/scala/TimeWindowTranslationTest.scala index 35a56d7..db0fb71 100644 --- a/flink-streaming-scala/src/test/scala/org/apache/flink/streaming/api/scala/TimeWindowTranslationTest.scala +++ b/flink-streaming-scala/src/test/scala/org/apache/flink/streaming/api/scala/TimeWindowTranslationTest.scala @@ -30,7 +30,7 @@ import org.apache.flink.streaming.api.windowing.time.Time import org.apache.flink.streaming.api.windowing.triggers.EventTimeTrigger import org.apache.flink.streaming.api.windowing.windows.TimeWindow import org.apache.flink.streaming.runtime.operators.windowing.WindowOperator -import org.apache.flink.streaming.util.StreamingMultipleProgramsTestBase +import org.apache.flink.test.util.AbstractTestBase import org.apache.flink.util.Collector import org.junit.Assert._ import org.junit.Test @@ -39,7 +39,7 @@ import org.junit.Test * These tests verify that the api calls on [[WindowedStream]] that use the "time" shortcut * instantiate the correct window operator. */ -class TimeWindowTranslationTest extends StreamingMultipleProgramsTestBase { +class TimeWindowTranslationTest extends AbstractTestBase { /** * Verifies that calls to timeWindow() instantiate a regular http://git-wip-us.apache.org/repos/asf/flink/blob/b90210e3/flink-streaming-scala/src/test/scala/org/apache/flink/streaming/api/scala/WindowFoldITCase.scala ---------------------------------------------------------------------- diff --git a/flink-streaming-scala/src/test/scala/org/apache/flink/streaming/api/scala/WindowFoldITCase.scala b/flink-streaming-scala/src/test/scala/org/apache/flink/streaming/api/scala/WindowFoldITCase.scala index dc38758..ef27685 100644 --- a/flink-streaming-scala/src/test/scala/org/apache/flink/streaming/api/scala/WindowFoldITCase.scala +++ b/flink-streaming-scala/src/test/scala/org/apache/flink/streaming/api/scala/WindowFoldITCase.scala @@ -31,9 +31,9 @@ import org.apache.flink.streaming.api.watermark.Watermark import org.apache.flink.streaming.api.windowing.assigners.TumblingEventTimeWindows import org.apache.flink.streaming.api.windowing.time.Time import org.apache.flink.streaming.api.windowing.windows.TimeWindow -import org.apache.flink.streaming.util.StreamingMultipleProgramsTestBase -import org.junit.{Ignore, Test} +import org.apache.flink.test.util.AbstractTestBase import org.junit.Assert._ +import org.junit.Test import scala.collection.mutable @@ -41,7 +41,7 @@ import scala.collection.mutable * Tests for Folds over windows. These also test whether OutputTypeConfigurable functions * work for windows, because FoldWindowFunction is OutputTypeConfigurable. */ -class WindowFoldITCase extends StreamingMultipleProgramsTestBase { +class WindowFoldITCase extends AbstractTestBase { @Test def testFoldWindow(): Unit = { http://git-wip-us.apache.org/repos/asf/flink/blob/b90210e3/flink-streaming-scala/src/test/scala/org/apache/flink/streaming/api/scala/WindowReduceITCase.scala ---------------------------------------------------------------------- diff --git a/flink-streaming-scala/src/test/scala/org/apache/flink/streaming/api/scala/WindowReduceITCase.scala b/flink-streaming-scala/src/test/scala/org/apache/flink/streaming/api/scala/WindowReduceITCase.scala index ee1dbfd..b2137f5 100644 --- a/flink-streaming-scala/src/test/scala/org/apache/flink/streaming/api/scala/WindowReduceITCase.scala +++ b/flink-streaming-scala/src/test/scala/org/apache/flink/streaming/api/scala/WindowReduceITCase.scala @@ -31,9 +31,9 @@ import org.apache.flink.streaming.api.watermark.Watermark import org.apache.flink.streaming.api.windowing.assigners.TumblingEventTimeWindows import org.apache.flink.streaming.api.windowing.time.Time import org.apache.flink.streaming.api.windowing.windows.TimeWindow -import org.apache.flink.streaming.util.StreamingMultipleProgramsTestBase +import org.apache.flink.test.util.AbstractTestBase import org.junit.Assert._ -import org.junit.{Ignore, Test} +import org.junit.Test import scala.collection.mutable @@ -41,7 +41,7 @@ import scala.collection.mutable * Tests for Folds over windows. These also test whether OutputTypeConfigurable functions * work for windows, because FoldWindowFunction is OutputTypeConfigurable. */ -class WindowReduceITCase extends StreamingMultipleProgramsTestBase { +class WindowReduceITCase extends AbstractTestBase { @Test def testReduceWindow(): Unit = { http://git-wip-us.apache.org/repos/asf/flink/blob/b90210e3/flink-test-utils-parent/flink-test-utils/src/main/java/org/apache/flink/streaming/util/StreamingMultipleProgramsTestBase.java ---------------------------------------------------------------------- diff --git a/flink-test-utils-parent/flink-test-utils/src/main/java/org/apache/flink/streaming/util/StreamingMultipleProgramsTestBase.java b/flink-test-utils-parent/flink-test-utils/src/main/java/org/apache/flink/streaming/util/StreamingMultipleProgramsTestBase.java deleted file mode 100644 index aa6e618..0000000 --- a/flink-test-utils-parent/flink-test-utils/src/main/java/org/apache/flink/streaming/util/StreamingMultipleProgramsTestBase.java +++ /dev/null @@ -1,64 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.flink.streaming.util; - -import org.apache.flink.test.util.AbstractTestBase; - -import org.slf4j.Logger; -import org.slf4j.LoggerFactory; - -/** - * Base class for streaming unit tests that run multiple tests and want to reuse the same - * Flink cluster. This saves a significant amount of time, since the startup and - * shutdown of the Flink clusters (including actor systems, etc) usually dominates - * the execution of the actual tests. - * - * <p>To write a unit test against this test base, simply extend it and add - * one or more regular test methods and retrieve the StreamExecutionEnvironment from - * the context: - * - * <pre> - * {@literal @}Test - * public void someTest() { - * StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment(); - * // test code - * env.execute(); - * } - * - * {@literal @}Test - * public void anotherTest() { - * StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment(); - * // test code - * env.execute(); - * } - * - * </pre> - */ -public class StreamingMultipleProgramsTestBase extends AbstractTestBase { - - // ------------------------------------------------------------------------ - // The mini cluster that is shared across tests - // ------------------------------------------------------------------------ - - protected static final int DEFAULT_PARALLELISM = 4; - - protected static final Logger LOG = LoggerFactory.getLogger(StreamingMultipleProgramsTestBase.class); - - -} http://git-wip-us.apache.org/repos/asf/flink/blob/b90210e3/flink-test-utils-parent/flink-test-utils/src/main/java/org/apache/flink/test/util/AbstractTestBase.java ---------------------------------------------------------------------- diff --git a/flink-test-utils-parent/flink-test-utils/src/main/java/org/apache/flink/test/util/AbstractTestBase.java b/flink-test-utils-parent/flink-test-utils/src/main/java/org/apache/flink/test/util/AbstractTestBase.java index bbd250d..65b351d 100644 --- a/flink-test-utils-parent/flink-test-utils/src/main/java/org/apache/flink/test/util/AbstractTestBase.java +++ b/flink-test-utils-parent/flink-test-utils/src/main/java/org/apache/flink/test/util/AbstractTestBase.java @@ -23,8 +23,6 @@ import org.apache.flink.util.FileUtils; import org.junit.ClassRule; import org.junit.rules.TemporaryFolder; -import org.slf4j.Logger; -import org.slf4j.LoggerFactory; import java.io.File; import java.io.IOException; @@ -42,7 +40,7 @@ import java.io.IOException; * <pre> * {@literal @}Test * public void someTest() { - * StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment(); + * ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment(); * // test code * env.execute(); * } @@ -58,8 +56,6 @@ import java.io.IOException; */ public abstract class AbstractTestBase extends TestBaseUtils { - protected static final Logger LOG = LoggerFactory.getLogger(AbstractTestBase.class); - private static final int DEFAULT_PARALLELISM = 4; @ClassRule @@ -77,17 +73,17 @@ public abstract class AbstractTestBase extends TestBaseUtils { // Temporary File Utilities // -------------------------------------------------------------------------------------------- - public static String getTempDirPath(String dirName) throws IOException { + public String getTempDirPath(String dirName) throws IOException { File f = createAndRegisterTempFile(dirName); return f.toURI().toString(); } - public static String getTempFilePath(String fileName) throws IOException { + public String getTempFilePath(String fileName) throws IOException { File f = createAndRegisterTempFile(fileName); return f.toURI().toString(); } - public static String createTempFile(String fileName, String contents) throws IOException { + public String createTempFile(String fileName, String contents) throws IOException { File f = createAndRegisterTempFile(fileName); if (!f.getParentFile().exists()) { f.getParentFile().mkdirs(); @@ -97,7 +93,7 @@ public abstract class AbstractTestBase extends TestBaseUtils { return f.toURI().toString(); } - public static File createAndRegisterTempFile(String fileName) throws IOException { + public File createAndRegisterTempFile(String fileName) throws IOException { return new File(TEMPORARY_FOLDER.newFolder(), fileName); } } http://git-wip-us.apache.org/repos/asf/flink/blob/b90210e3/flink-test-utils-parent/flink-test-utils/src/main/java/org/apache/flink/test/util/MiniClusterResource.java ---------------------------------------------------------------------- diff --git a/flink-test-utils-parent/flink-test-utils/src/main/java/org/apache/flink/test/util/MiniClusterResource.java b/flink-test-utils-parent/flink-test-utils/src/main/java/org/apache/flink/test/util/MiniClusterResource.java index c8872ac..20dbebb 100644 --- a/flink-test-utils-parent/flink-test-utils/src/main/java/org/apache/flink/test/util/MiniClusterResource.java +++ b/flink-test-utils-parent/flink-test-utils/src/main/java/org/apache/flink/test/util/MiniClusterResource.java @@ -19,6 +19,7 @@ package org.apache.flink.test.util; import org.apache.flink.api.common.time.Time; +import org.apache.flink.configuration.ConfigConstants; import org.apache.flink.configuration.Configuration; import org.apache.flink.runtime.akka.AkkaUtils; import org.apache.flink.runtime.concurrent.FutureUtils; @@ -60,11 +61,13 @@ public class MiniClusterResource extends ExternalResource { @Override public void before() throws Exception { + final Configuration configuration = new Configuration(miniClusterResourceConfiguration.getConfiguration()); + + configuration.setInteger(ConfigConstants.LOCAL_NUMBER_TASK_MANAGER, miniClusterResourceConfiguration.getNumberTaskManagers()); + configuration.setInteger(ConfigConstants.TASK_MANAGER_NUM_TASK_SLOTS, miniClusterResourceConfiguration.numberSlotsPerTaskManager); + localFlinkMiniCluster = TestBaseUtils.startCluster( - miniClusterResourceConfiguration.getNumberTaskManagers(), - miniClusterResourceConfiguration.getNumberSlotsPerTaskManager(), - false, - false, + configuration, true); numberSlots = miniClusterResourceConfiguration.getNumberSlotsPerTaskManager() * miniClusterResourceConfiguration.getNumberTaskManagers(); http://git-wip-us.apache.org/repos/asf/flink/blob/b90210e3/flink-test-utils-parent/flink-test-utils/src/main/java/org/apache/flink/test/util/MultipleProgramsTestBase.java ---------------------------------------------------------------------- diff --git a/flink-test-utils-parent/flink-test-utils/src/main/java/org/apache/flink/test/util/MultipleProgramsTestBase.java b/flink-test-utils-parent/flink-test-utils/src/main/java/org/apache/flink/test/util/MultipleProgramsTestBase.java index 06792ea..10039e6 100644 --- a/flink-test-utils-parent/flink-test-utils/src/main/java/org/apache/flink/test/util/MultipleProgramsTestBase.java +++ b/flink-test-utils-parent/flink-test-utils/src/main/java/org/apache/flink/test/util/MultipleProgramsTestBase.java @@ -18,12 +18,8 @@ package org.apache.flink.test.util; -import org.apache.flink.runtime.minicluster.LocalFlinkMiniCluster; - import org.junit.After; -import org.junit.AfterClass; import org.junit.Before; -import org.junit.BeforeClass; import org.junit.runners.Parameterized; import java.util.Arrays; @@ -57,7 +53,7 @@ import java.util.Collection; * * }</pre> */ -public class MultipleProgramsTestBase extends TestBaseUtils { +public class MultipleProgramsTestBase extends AbstractTestBase { /** * Enum that defines which execution environment to run the next test on: @@ -70,16 +66,6 @@ public class MultipleProgramsTestBase extends TestBaseUtils { } // ------------------------------------------------------------------------ - // The mini cluster that is shared across tests - // ------------------------------------------------------------------------ - - protected static final int DEFAULT_PARALLELISM = 4; - - protected static boolean startWebServer = false; - - protected static LocalFlinkMiniCluster cluster = null; - - // ------------------------------------------------------------------------ protected final TestExecutionMode mode; @@ -93,12 +79,21 @@ public class MultipleProgramsTestBase extends TestBaseUtils { @Before public void setupEnvironment() { + TestEnvironment testEnvironment; switch(mode){ case CLUSTER: - new TestEnvironment(cluster, 4, false).setAsContext(); + // This only works because of the quirks we built in the TestEnvironment. + // We should refactor this in the future!!! + testEnvironment = miniClusterResource.getTestEnvironment(); + testEnvironment.getConfig().disableObjectReuse(); + testEnvironment.setAsContext(); break; case CLUSTER_OBJECT_REUSE: - new TestEnvironment(cluster, 4, true).setAsContext(); + // This only works because of the quirks we built in the TestEnvironment. + // We should refactor this in the future!!! + testEnvironment = miniClusterResource.getTestEnvironment(); + testEnvironment.getConfig().enableObjectReuse(); + testEnvironment.setAsContext(); break; case COLLECTION: new CollectionTestEnvironment().setAsContext(); @@ -120,25 +115,6 @@ public class MultipleProgramsTestBase extends TestBaseUtils { } // ------------------------------------------------------------------------ - // Cluster setup & teardown - // ------------------------------------------------------------------------ - - @BeforeClass - public static void setup() throws Exception { - cluster = TestBaseUtils.startCluster( - 1, - DEFAULT_PARALLELISM, - startWebServer, - false, - true); - } - - @AfterClass - public static void teardown() throws Exception { - stopCluster(cluster, TestBaseUtils.DEFAULT_TIMEOUT); - } - - // ------------------------------------------------------------------------ // Parametrization lets the tests run in cluster and collection mode // ------------------------------------------------------------------------ http://git-wip-us.apache.org/repos/asf/flink/blob/b90210e3/flink-test-utils-parent/flink-test-utils/src/main/java/org/apache/flink/test/util/TestBaseUtils.java ---------------------------------------------------------------------- diff --git a/flink-test-utils-parent/flink-test-utils/src/main/java/org/apache/flink/test/util/TestBaseUtils.java b/flink-test-utils-parent/flink-test-utils/src/main/java/org/apache/flink/test/util/TestBaseUtils.java index d2237ad..5a96326 100644 --- a/flink-test-utils-parent/flink-test-utils/src/main/java/org/apache/flink/test/util/TestBaseUtils.java +++ b/flink-test-utils-parent/flink-test-utils/src/main/java/org/apache/flink/test/util/TestBaseUtils.java @@ -459,7 +459,7 @@ public class TestBaseUtils extends TestLogger { throw new IllegalArgumentException("This path does not denote a local file."); } } catch (URISyntaxException | NullPointerException e) { - throw new IllegalArgumentException("This path does not describe a valid local file URI."); + throw new IllegalArgumentException("This path does not describe a valid local file URI.", e); } } http://git-wip-us.apache.org/repos/asf/flink/blob/b90210e3/flink-tests/src/test/java/org/apache/flink/test/checkpointing/CoStreamCheckpointingITCase.java ---------------------------------------------------------------------- diff --git a/flink-tests/src/test/java/org/apache/flink/test/checkpointing/CoStreamCheckpointingITCase.java b/flink-tests/src/test/java/org/apache/flink/test/checkpointing/CoStreamCheckpointingITCase.java index d224905..b207de8 100644 --- a/flink-tests/src/test/java/org/apache/flink/test/checkpointing/CoStreamCheckpointingITCase.java +++ b/flink-tests/src/test/java/org/apache/flink/test/checkpointing/CoStreamCheckpointingITCase.java @@ -30,7 +30,7 @@ import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment; import org.apache.flink.streaming.api.functions.co.RichCoFlatMapFunction; import org.apache.flink.streaming.api.functions.sink.SinkFunction; import org.apache.flink.streaming.api.functions.source.RichParallelSourceFunction; -import org.apache.flink.streaming.util.StreamingMultipleProgramsTestBase; +import org.apache.flink.test.util.AbstractTestBase; import org.apache.flink.test.util.TestUtils; import org.apache.flink.util.Collector; @@ -57,7 +57,7 @@ import static org.junit.Assert.assertTrue; * state reflects the "exactly once" semantics. */ @SuppressWarnings({"serial", "deprecation"}) -public class CoStreamCheckpointingITCase extends StreamingMultipleProgramsTestBase { +public class CoStreamCheckpointingITCase extends AbstractTestBase { private static final long NUM_STRINGS = 10_000L; private static final int PARALLELISM = 4; http://git-wip-us.apache.org/repos/asf/flink/blob/b90210e3/flink-tests/src/test/java/org/apache/flink/test/checkpointing/StreamCheckpointNotifierITCase.java ---------------------------------------------------------------------- diff --git a/flink-tests/src/test/java/org/apache/flink/test/checkpointing/StreamCheckpointNotifierITCase.java b/flink-tests/src/test/java/org/apache/flink/test/checkpointing/StreamCheckpointNotifierITCase.java index 16d8b54..7b058a0 100644 --- a/flink-tests/src/test/java/org/apache/flink/test/checkpointing/StreamCheckpointNotifierITCase.java +++ b/flink-tests/src/test/java/org/apache/flink/test/checkpointing/StreamCheckpointNotifierITCase.java @@ -34,7 +34,7 @@ import org.apache.flink.streaming.api.functions.source.ParallelSourceFunction; import org.apache.flink.streaming.api.functions.source.RichSourceFunction; import org.apache.flink.streaming.api.operators.OneInputStreamOperator; import org.apache.flink.streaming.api.operators.TwoInputStreamOperator; -import org.apache.flink.streaming.util.StreamingMultipleProgramsTestBase; +import org.apache.flink.test.util.AbstractTestBase; import org.apache.flink.util.Collector; import org.junit.Test; @@ -69,7 +69,7 @@ import static org.junit.Assert.fail; * successfully completed checkpoint. */ @SuppressWarnings("serial") -public class StreamCheckpointNotifierITCase extends StreamingMultipleProgramsTestBase { +public class StreamCheckpointNotifierITCase extends AbstractTestBase { private static final Logger LOG = LoggerFactory.getLogger(StreamCheckpointNotifierITCase.class);
