[FLINK-8703][tests] Migrate tests to MiniClusterResource (batch #2) This closes #5542.
Project: http://git-wip-us.apache.org/repos/asf/flink/repo Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/3e056b34 Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/3e056b34 Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/3e056b34 Branch: refs/heads/master Commit: 3e056b34f7be817e0d0eee612b6ae44891e33501 Parents: 0ae7364 Author: zentol <[email protected]> Authored: Tue Feb 20 18:02:32 2018 +0100 Committer: zentol <[email protected]> Committed: Mon Feb 26 20:42:00 2018 +0100 ---------------------------------------------------------------------- .../apache/flink/ml/util/FlinkTestBase.scala | 25 ++-- ...ScalaStreamingMultipleProgramsTestBase.scala | 29 ++--- ...tractEventTimeWindowCheckpointingITCase.java | 86 ++++++------- .../AbstractLocalRecoveryITCase.java | 100 ++++++++++++++++ ...ckendEventTimeWindowCheckpointingITCase.java | 5 +- ...ckendEventTimeWindowCheckpointingITCase.java | 6 +- ...ckendEventTimeWindowCheckpointingITCase.java | 6 +- ...ckendEventTimeWindowCheckpointingITCase.java | 5 +- ...ckendEventTimeWindowCheckpointingITCase.java | 5 +- .../checkpointing/LocalRecoveryHeapITCase.java | 33 +++++ .../test/checkpointing/LocalRecoveryITCase.java | 120 ------------------- .../LocalRecoveryRocksDBFullITCase.java | 33 +++++ .../LocalRecoveryRocksDBIncrementalITCase.java | 33 +++++ ...ckendEventTimeWindowCheckpointingITCase.java | 5 +- ...ckendEventTimeWindowCheckpointingITCase.java | 5 +- .../operators/CustomDistributionITCase.java | 42 ++----- .../flink/test/runtime/IPv6HostnamesITCase.java | 47 ++++---- 17 files changed, 311 insertions(+), 274 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/flink/blob/3e056b34/flink-libraries/flink-ml/src/test/scala/org/apache/flink/ml/util/FlinkTestBase.scala ---------------------------------------------------------------------- diff --git a/flink-libraries/flink-ml/src/test/scala/org/apache/flink/ml/util/FlinkTestBase.scala b/flink-libraries/flink-ml/src/test/scala/org/apache/flink/ml/util/FlinkTestBase.scala index c27a2b5..2165152 100644 --- a/flink-libraries/flink-ml/src/test/scala/org/apache/flink/ml/util/FlinkTestBase.scala +++ b/flink-libraries/flink-ml/src/test/scala/org/apache/flink/ml/util/FlinkTestBase.scala @@ -18,8 +18,9 @@ package org.apache.flink.ml.util -import org.apache.flink.runtime.minicluster.LocalFlinkMiniCluster -import org.apache.flink.test.util.{TestBaseUtils, TestEnvironment} +import org.apache.flink.configuration.Configuration +import org.apache.flink.test.util.MiniClusterResource +import org.apache.flink.test.util.MiniClusterResource.MiniClusterResourceConfiguration import org.scalatest.{BeforeAndAfter, Suite} /** Mixin to start and stop a LocalFlinkMiniCluster automatically for Scala based tests. @@ -51,27 +52,21 @@ import org.scalatest.{BeforeAndAfter, Suite} trait FlinkTestBase extends BeforeAndAfter { that: Suite => - var cluster: Option[LocalFlinkMiniCluster] = None + var cluster: Option[MiniClusterResource] = None val parallelism = 4 before { - val cl = TestBaseUtils.startCluster( - 1, - parallelism, - false, - false, - true) - - val clusterEnvironment = new TestEnvironment(cl, parallelism, false) - clusterEnvironment.setAsContext() + val cl = new MiniClusterResource( + new MiniClusterResourceConfiguration(new Configuration(), 1, parallelism) + ) + + cl.before() cluster = Some(cl) } after { - cluster.foreach(c => TestBaseUtils.stopCluster(c, TestBaseUtils.DEFAULT_TIMEOUT)) - - TestEnvironment.unsetAsContext() + cluster.foreach(c => c.after()) } } http://git-wip-us.apache.org/repos/asf/flink/blob/3e056b34/flink-streaming-scala/src/test/scala/org/apache/flink/streaming/api/scala/ScalaStreamingMultipleProgramsTestBase.scala ---------------------------------------------------------------------- diff --git a/flink-streaming-scala/src/test/scala/org/apache/flink/streaming/api/scala/ScalaStreamingMultipleProgramsTestBase.scala b/flink-streaming-scala/src/test/scala/org/apache/flink/streaming/api/scala/ScalaStreamingMultipleProgramsTestBase.scala index d9f727c..e0c5b45 100644 --- a/flink-streaming-scala/src/test/scala/org/apache/flink/streaming/api/scala/ScalaStreamingMultipleProgramsTestBase.scala +++ b/flink-streaming-scala/src/test/scala/org/apache/flink/streaming/api/scala/ScalaStreamingMultipleProgramsTestBase.scala @@ -18,12 +18,10 @@ package org.apache.flink.streaming.api.scala -import org.apache.flink.runtime.minicluster.LocalFlinkMiniCluster -import org.apache.flink.streaming.util.TestStreamEnvironment -import org.apache.flink.test.util.TestBaseUtils - +import org.apache.flink.configuration.Configuration +import org.apache.flink.test.util.MiniClusterResource.MiniClusterResourceConfiguration +import org.apache.flink.test.util.{MiniClusterResource, TestBaseUtils} import org.junit.{After, Before} - import org.scalatest.junit.JUnitSuiteLike trait ScalaStreamingMultipleProgramsTestBase @@ -31,28 +29,21 @@ trait ScalaStreamingMultipleProgramsTestBase with JUnitSuiteLike { val parallelism = 4 - var cluster: Option[LocalFlinkMiniCluster] = None + var cluster: Option[MiniClusterResource] = None @Before def beforeAll(): Unit = { - val cluster = Some( - TestBaseUtils.startCluster( - 1, - parallelism, - false, - false, - true - ) + val cl = new MiniClusterResource( + new MiniClusterResourceConfiguration(new Configuration(), 1, parallelism) ) - TestStreamEnvironment.setAsContext(cluster.get, parallelism) + cl.before() + + cluster = Some(cl) } @After def afterAll(): Unit = { - TestStreamEnvironment.unsetAsContext() - cluster.foreach { - TestBaseUtils.stopCluster(_, TestBaseUtils.DEFAULT_TIMEOUT) - } + cluster.foreach { c => c.after() } } } http://git-wip-us.apache.org/repos/asf/flink/blob/3e056b34/flink-tests/src/test/java/org/apache/flink/test/checkpointing/AbstractEventTimeWindowCheckpointingITCase.java ---------------------------------------------------------------------- diff --git a/flink-tests/src/test/java/org/apache/flink/test/checkpointing/AbstractEventTimeWindowCheckpointingITCase.java b/flink-tests/src/test/java/org/apache/flink/test/checkpointing/AbstractEventTimeWindowCheckpointingITCase.java index 557c097..f37ba0d 100644 --- a/flink-tests/src/test/java/org/apache/flink/test/checkpointing/AbstractEventTimeWindowCheckpointingITCase.java +++ b/flink-tests/src/test/java/org/apache/flink/test/checkpointing/AbstractEventTimeWindowCheckpointingITCase.java @@ -26,35 +26,32 @@ import org.apache.flink.api.java.tuple.Tuple; import org.apache.flink.api.java.tuple.Tuple2; import org.apache.flink.api.java.tuple.Tuple4; import org.apache.flink.configuration.AkkaOptions; -import org.apache.flink.configuration.ConfigConstants; import org.apache.flink.configuration.Configuration; import org.apache.flink.configuration.HighAvailabilityOptions; import org.apache.flink.configuration.TaskManagerOptions; import org.apache.flink.contrib.streaming.state.RocksDBStateBackend; import org.apache.flink.core.fs.Path; -import org.apache.flink.runtime.highavailability.HighAvailabilityServices; -import org.apache.flink.runtime.highavailability.HighAvailabilityServicesUtils; -import org.apache.flink.runtime.minicluster.LocalFlinkMiniCluster; import org.apache.flink.runtime.state.AbstractStateBackend; import org.apache.flink.runtime.state.CheckpointListener; import org.apache.flink.runtime.state.filesystem.FsStateBackend; import org.apache.flink.runtime.state.memory.MemoryStateBackend; import org.apache.flink.streaming.api.TimeCharacteristic; import org.apache.flink.streaming.api.checkpoint.ListCheckpointed; +import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment; import org.apache.flink.streaming.api.functions.sink.RichSinkFunction; import org.apache.flink.streaming.api.functions.source.RichSourceFunction; import org.apache.flink.streaming.api.functions.windowing.RichWindowFunction; import org.apache.flink.streaming.api.watermark.Watermark; import org.apache.flink.streaming.api.windowing.time.Time; import org.apache.flink.streaming.api.windowing.windows.TimeWindow; -import org.apache.flink.streaming.util.TestStreamEnvironment; +import org.apache.flink.test.util.MiniClusterResource; import org.apache.flink.test.util.SuccessException; import org.apache.flink.util.Collector; import org.apache.flink.util.TestLogger; import org.apache.curator.test.TestingServer; import org.junit.After; -import org.junit.Before; +import org.junit.ClassRule; import org.junit.Rule; import org.junit.Test; import org.junit.rules.TemporaryFolder; @@ -65,9 +62,6 @@ import java.io.IOException; import java.util.Collections; import java.util.HashMap; import java.util.List; -import java.util.concurrent.Executor; -import java.util.concurrent.Executors; -import java.util.concurrent.ScheduledExecutorService; import static java.util.concurrent.TimeUnit.MILLISECONDS; import static org.apache.flink.test.checkpointing.AbstractEventTimeWindowCheckpointingITCase.StateBackendEnum.ROCKSDB_INCREMENTAL_ZK; @@ -91,31 +85,42 @@ public abstract class AbstractEventTimeWindowCheckpointingITCase extends TestLog private static final int MAX_MEM_STATE_SIZE = 20 * 1024 * 1024; private static final int PARALLELISM = 4; - private static LocalFlinkMiniCluster cluster; + private TestingServer zkServer; - private static TestStreamEnvironment env; - - private static TestingServer zkServer; - - @Rule - public TemporaryFolder tempFolder = new TemporaryFolder(); + @ClassRule + public static TemporaryFolder tempFolder = new TemporaryFolder(); @Rule public TestName name = new TestName(); - private StateBackendEnum stateBackendEnum; - protected AbstractStateBackend stateBackend; + private AbstractStateBackend stateBackend; - AbstractEventTimeWindowCheckpointingITCase(StateBackendEnum stateBackendEnum) { - this.stateBackendEnum = stateBackendEnum; - } + @Rule + public final MiniClusterResource miniClusterResource = getMiniClusterResource(); enum StateBackendEnum { MEM, FILE, ROCKSDB_FULLY_ASYNC, ROCKSDB_INCREMENTAL, ROCKSDB_INCREMENTAL_ZK, MEM_ASYNC, FILE_ASYNC } - @Before - public void startTestCluster() throws Exception { + protected abstract StateBackendEnum getStateBackend(); + + protected final MiniClusterResource getMiniClusterResource() { + return new MiniClusterResource( + new MiniClusterResource.MiniClusterResourceConfiguration( + getConfigurationSafe(), + 2, + PARALLELISM / 2)); + } + + private Configuration getConfigurationSafe() { + try { + return getConfiguration(); + } catch (Exception e) { + throw new AssertionError("Could not initialize test.", e); + } + } + + private Configuration getConfiguration() throws Exception { // print a message when starting a test method to avoid Travis' <tt>"Maven produced no // output for xxx seconds."</tt> messages @@ -123,6 +128,7 @@ public abstract class AbstractEventTimeWindowCheckpointingITCase extends TestLog "Starting " + getClass().getCanonicalName() + "#" + name.getMethodName() + "."); // Testing HA Scenario / ZKCompletedCheckpointStore with incremental checkpoints + StateBackendEnum stateBackendEnum = getStateBackend(); if (ROCKSDB_INCREMENTAL_ZK.equals(stateBackendEnum)) { zkServer = new TestingServer(); zkServer.start(); @@ -130,23 +136,6 @@ public abstract class AbstractEventTimeWindowCheckpointingITCase extends TestLog Configuration config = createClusterConfig(); - // purposefully delay in the executor to tease out races - final ScheduledExecutorService executor = Executors.newScheduledThreadPool(10); - HighAvailabilityServices haServices = HighAvailabilityServicesUtils.createAvailableOrEmbeddedServices( - config, - new Executor() { - @Override - public void execute(Runnable command) { - executor.schedule(command, 500, MILLISECONDS); - } - }); - - cluster = new LocalFlinkMiniCluster(config, haServices, false); - cluster.start(); - - env = new TestStreamEnvironment(cluster, PARALLELISM); - env.getConfig().setUseSnapshotCompression(true); - switch (stateBackendEnum) { case MEM: this.stateBackend = new MemoryStateBackend(MAX_MEM_STATE_SIZE, false); @@ -190,6 +179,7 @@ public abstract class AbstractEventTimeWindowCheckpointingITCase extends TestLog default: throw new IllegalStateException("No backend selected."); } + return config; } protected Configuration createClusterConfig() throws IOException { @@ -198,8 +188,6 @@ public abstract class AbstractEventTimeWindowCheckpointingITCase extends TestLog final File haDir = temporaryFolder.newFolder(); Configuration config = new Configuration(); - config.setInteger(ConfigConstants.LOCAL_NUMBER_TASK_MANAGER, 2); - config.setInteger(ConfigConstants.TASK_MANAGER_NUM_TASK_SLOTS, PARALLELISM / 2); config.setLong(TaskManagerOptions.MANAGED_MEMORY_SIZE, 48L); // the default network buffers size (10% of heap max =~ 150MB) seems to much for this test case config.setLong(TaskManagerOptions.NETWORK_BUFFERS_MEMORY_MAX, 80L << 20); // 80 MB @@ -215,11 +203,6 @@ public abstract class AbstractEventTimeWindowCheckpointingITCase extends TestLog @After public void stopTestCluster() throws IOException { - if (cluster != null) { - cluster.stop(); - cluster = null; - } - if (zkServer != null) { zkServer.stop(); zkServer = null; @@ -241,12 +224,14 @@ public abstract class AbstractEventTimeWindowCheckpointingITCase extends TestLog FailingSource.reset(); try { + StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment(); env.setParallelism(PARALLELISM); env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime); env.enableCheckpointing(100); env.setRestartStrategy(RestartStrategies.fixedDelayRestart(3, 0)); env.getConfig().disableSysoutLogging(); env.setStateBackend(this.stateBackend); + env.getConfig().setUseSnapshotCompression(true); env .addSource(new FailingSource(numKeys, numElementsPerKey, numElementsPerKey / 3)) @@ -310,6 +295,7 @@ public abstract class AbstractEventTimeWindowCheckpointingITCase extends TestLog FailingSource.reset(); try { + StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment(); env.setParallelism(PARALLELISM); env.setMaxParallelism(maxParallelism); env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime); @@ -317,6 +303,7 @@ public abstract class AbstractEventTimeWindowCheckpointingITCase extends TestLog env.setRestartStrategy(RestartStrategies.fixedDelayRestart(3, 0)); env.getConfig().disableSysoutLogging(); env.setStateBackend(this.stateBackend); + env.getConfig().setUseSnapshotCompression(true); env .addSource(new FailingSource(numKeys, numElementsPerKey, numElementsPerKey / 3)) @@ -376,6 +363,7 @@ public abstract class AbstractEventTimeWindowCheckpointingITCase extends TestLog FailingSource.reset(); try { + StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment(); env.setMaxParallelism(2 * PARALLELISM); env.setParallelism(PARALLELISM); env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime); @@ -438,12 +426,14 @@ public abstract class AbstractEventTimeWindowCheckpointingITCase extends TestLog FailingSource.reset(); try { + StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment(); env.setParallelism(PARALLELISM); env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime); env.enableCheckpointing(100); env.setRestartStrategy(RestartStrategies.fixedDelayRestart(3, 0)); env.getConfig().disableSysoutLogging(); env.setStateBackend(this.stateBackend); + env.getConfig().setUseSnapshotCompression(true); env .addSource(new FailingSource(numKeys, numElementsPerKey, numElementsPerKey / 3)) @@ -507,12 +497,14 @@ public abstract class AbstractEventTimeWindowCheckpointingITCase extends TestLog FailingSource.reset(); try { + StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment(); env.setParallelism(PARALLELISM); env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime); env.enableCheckpointing(100); env.setRestartStrategy(RestartStrategies.fixedDelayRestart(3, 0)); env.getConfig().disableSysoutLogging(); env.setStateBackend(this.stateBackend); + env.getConfig().setUseSnapshotCompression(true); env .addSource(new FailingSource(numKeys, numElementsPerKey, numElementsPerKey / 3)) http://git-wip-us.apache.org/repos/asf/flink/blob/3e056b34/flink-tests/src/test/java/org/apache/flink/test/checkpointing/AbstractLocalRecoveryITCase.java ---------------------------------------------------------------------- diff --git a/flink-tests/src/test/java/org/apache/flink/test/checkpointing/AbstractLocalRecoveryITCase.java b/flink-tests/src/test/java/org/apache/flink/test/checkpointing/AbstractLocalRecoveryITCase.java new file mode 100644 index 0000000..a02e902 --- /dev/null +++ b/flink-tests/src/test/java/org/apache/flink/test/checkpointing/AbstractLocalRecoveryITCase.java @@ -0,0 +1,100 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.test.checkpointing; + +import org.apache.flink.configuration.CheckpointingOptions; +import org.apache.flink.configuration.Configuration; +import org.apache.flink.util.TestLogger; + +import org.junit.Rule; +import org.junit.Test; +import org.junit.rules.TestName; + +import java.io.IOException; + +import static org.apache.flink.runtime.state.LocalRecoveryConfig.LocalRecoveryMode; +import static org.apache.flink.test.checkpointing.AbstractEventTimeWindowCheckpointingITCase.StateBackendEnum; + +/** + * This test delegates to instances of {@link AbstractEventTimeWindowCheckpointingITCase} that have been reconfigured + * to use local recovery. + * + * <p>TODO: This class must be refactored to properly extend {@link AbstractEventTimeWindowCheckpointingITCase}. + */ +public abstract class AbstractLocalRecoveryITCase extends TestLogger { + + private final StateBackendEnum backendEnum; + private final LocalRecoveryMode recoveryMode; + + @Rule + public TestName testName = new TestName(); + + AbstractLocalRecoveryITCase(StateBackendEnum backendEnum, LocalRecoveryMode recoveryMode) { + this.backendEnum = backendEnum; + this.recoveryMode = recoveryMode; + } + + @Test + public final void executeTest() throws Exception { + AbstractEventTimeWindowCheckpointingITCase.tempFolder.create(); + AbstractEventTimeWindowCheckpointingITCase windowChkITCase = + new AbstractEventTimeWindowCheckpointingITCase() { + @Override + protected StateBackendEnum getStateBackend() { + return backendEnum; + } + + @Override + protected Configuration createClusterConfig() throws IOException { + Configuration config = super.createClusterConfig(); + + config.setString( + CheckpointingOptions.LOCAL_RECOVERY, + recoveryMode.toString()); + + return config; + } + }; + + executeTest(windowChkITCase); + } + + private void executeTest(AbstractEventTimeWindowCheckpointingITCase delegate) throws Exception { + delegate.name = testName; + try { + delegate.miniClusterResource.before(); + try { + delegate.testTumblingTimeWindow(); + delegate.miniClusterResource.after(); + } catch (Exception e) { + delegate.miniClusterResource.after(); + } + + delegate.miniClusterResource.before(); + try { + delegate.testSlidingTimeWindow(); + delegate.miniClusterResource.after(); + } catch (Exception e) { + delegate.miniClusterResource.after(); + } + } finally { + delegate.tempFolder.delete(); + } + } +} http://git-wip-us.apache.org/repos/asf/flink/blob/3e056b34/flink-tests/src/test/java/org/apache/flink/test/checkpointing/AsyncFileBackendEventTimeWindowCheckpointingITCase.java ---------------------------------------------------------------------- diff --git a/flink-tests/src/test/java/org/apache/flink/test/checkpointing/AsyncFileBackendEventTimeWindowCheckpointingITCase.java b/flink-tests/src/test/java/org/apache/flink/test/checkpointing/AsyncFileBackendEventTimeWindowCheckpointingITCase.java index f0db4d5..c4b06d4 100644 --- a/flink-tests/src/test/java/org/apache/flink/test/checkpointing/AsyncFileBackendEventTimeWindowCheckpointingITCase.java +++ b/flink-tests/src/test/java/org/apache/flink/test/checkpointing/AsyncFileBackendEventTimeWindowCheckpointingITCase.java @@ -23,7 +23,8 @@ package org.apache.flink.test.checkpointing; */ public class AsyncFileBackendEventTimeWindowCheckpointingITCase extends AbstractEventTimeWindowCheckpointingITCase { - public AsyncFileBackendEventTimeWindowCheckpointingITCase() { - super(StateBackendEnum.FILE_ASYNC); + @Override + protected StateBackendEnum getStateBackend() { + return StateBackendEnum.FILE_ASYNC; } } http://git-wip-us.apache.org/repos/asf/flink/blob/3e056b34/flink-tests/src/test/java/org/apache/flink/test/checkpointing/AsyncMemBackendEventTimeWindowCheckpointingITCase.java ---------------------------------------------------------------------- diff --git a/flink-tests/src/test/java/org/apache/flink/test/checkpointing/AsyncMemBackendEventTimeWindowCheckpointingITCase.java b/flink-tests/src/test/java/org/apache/flink/test/checkpointing/AsyncMemBackendEventTimeWindowCheckpointingITCase.java index 70ec757..2cc5b01 100644 --- a/flink-tests/src/test/java/org/apache/flink/test/checkpointing/AsyncMemBackendEventTimeWindowCheckpointingITCase.java +++ b/flink-tests/src/test/java/org/apache/flink/test/checkpointing/AsyncMemBackendEventTimeWindowCheckpointingITCase.java @@ -22,8 +22,8 @@ package org.apache.flink.test.checkpointing; * Integration tests for asynchronous memory backend. */ public class AsyncMemBackendEventTimeWindowCheckpointingITCase extends AbstractEventTimeWindowCheckpointingITCase { - - public AsyncMemBackendEventTimeWindowCheckpointingITCase() { - super(StateBackendEnum.MEM_ASYNC); + @Override + protected StateBackendEnum getStateBackend() { + return StateBackendEnum.MEM_ASYNC; } } http://git-wip-us.apache.org/repos/asf/flink/blob/3e056b34/flink-tests/src/test/java/org/apache/flink/test/checkpointing/FileBackendEventTimeWindowCheckpointingITCase.java ---------------------------------------------------------------------- diff --git a/flink-tests/src/test/java/org/apache/flink/test/checkpointing/FileBackendEventTimeWindowCheckpointingITCase.java b/flink-tests/src/test/java/org/apache/flink/test/checkpointing/FileBackendEventTimeWindowCheckpointingITCase.java index 030c1a3..eab6153 100644 --- a/flink-tests/src/test/java/org/apache/flink/test/checkpointing/FileBackendEventTimeWindowCheckpointingITCase.java +++ b/flink-tests/src/test/java/org/apache/flink/test/checkpointing/FileBackendEventTimeWindowCheckpointingITCase.java @@ -22,8 +22,8 @@ package org.apache.flink.test.checkpointing; * Integration tests for file backend. */ public class FileBackendEventTimeWindowCheckpointingITCase extends AbstractEventTimeWindowCheckpointingITCase { - - public FileBackendEventTimeWindowCheckpointingITCase() { - super(StateBackendEnum.FILE); + @Override + protected StateBackendEnum getStateBackend() { + return StateBackendEnum.FILE; } } http://git-wip-us.apache.org/repos/asf/flink/blob/3e056b34/flink-tests/src/test/java/org/apache/flink/test/checkpointing/HAIncrementalRocksDbBackendEventTimeWindowCheckpointingITCase.java ---------------------------------------------------------------------- diff --git a/flink-tests/src/test/java/org/apache/flink/test/checkpointing/HAIncrementalRocksDbBackendEventTimeWindowCheckpointingITCase.java b/flink-tests/src/test/java/org/apache/flink/test/checkpointing/HAIncrementalRocksDbBackendEventTimeWindowCheckpointingITCase.java index 394815f..ed43ad6 100644 --- a/flink-tests/src/test/java/org/apache/flink/test/checkpointing/HAIncrementalRocksDbBackendEventTimeWindowCheckpointingITCase.java +++ b/flink-tests/src/test/java/org/apache/flink/test/checkpointing/HAIncrementalRocksDbBackendEventTimeWindowCheckpointingITCase.java @@ -23,8 +23,9 @@ package org.apache.flink.test.checkpointing; */ public class HAIncrementalRocksDbBackendEventTimeWindowCheckpointingITCase extends AbstractEventTimeWindowCheckpointingITCase { - public HAIncrementalRocksDbBackendEventTimeWindowCheckpointingITCase() { - super(StateBackendEnum.ROCKSDB_INCREMENTAL_ZK); + @Override + protected StateBackendEnum getStateBackend() { + return StateBackendEnum.ROCKSDB_INCREMENTAL_ZK; } @Override http://git-wip-us.apache.org/repos/asf/flink/blob/3e056b34/flink-tests/src/test/java/org/apache/flink/test/checkpointing/IncrementalRocksDbBackendEventTimeWindowCheckpointingITCase.java ---------------------------------------------------------------------- diff --git a/flink-tests/src/test/java/org/apache/flink/test/checkpointing/IncrementalRocksDbBackendEventTimeWindowCheckpointingITCase.java b/flink-tests/src/test/java/org/apache/flink/test/checkpointing/IncrementalRocksDbBackendEventTimeWindowCheckpointingITCase.java index dfb66cc..1276a00 100644 --- a/flink-tests/src/test/java/org/apache/flink/test/checkpointing/IncrementalRocksDbBackendEventTimeWindowCheckpointingITCase.java +++ b/flink-tests/src/test/java/org/apache/flink/test/checkpointing/IncrementalRocksDbBackendEventTimeWindowCheckpointingITCase.java @@ -23,8 +23,9 @@ package org.apache.flink.test.checkpointing; */ public class IncrementalRocksDbBackendEventTimeWindowCheckpointingITCase extends AbstractEventTimeWindowCheckpointingITCase { - public IncrementalRocksDbBackendEventTimeWindowCheckpointingITCase() { - super(StateBackendEnum.ROCKSDB_INCREMENTAL); + @Override + protected StateBackendEnum getStateBackend() { + return StateBackendEnum.ROCKSDB_INCREMENTAL; } @Override http://git-wip-us.apache.org/repos/asf/flink/blob/3e056b34/flink-tests/src/test/java/org/apache/flink/test/checkpointing/LocalRecoveryHeapITCase.java ---------------------------------------------------------------------- diff --git a/flink-tests/src/test/java/org/apache/flink/test/checkpointing/LocalRecoveryHeapITCase.java b/flink-tests/src/test/java/org/apache/flink/test/checkpointing/LocalRecoveryHeapITCase.java new file mode 100644 index 0000000..2c0c294 --- /dev/null +++ b/flink-tests/src/test/java/org/apache/flink/test/checkpointing/LocalRecoveryHeapITCase.java @@ -0,0 +1,33 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.test.checkpointing; + +import static org.apache.flink.runtime.state.LocalRecoveryConfig.LocalRecoveryMode.ENABLE_FILE_BASED; +import static org.apache.flink.test.checkpointing.AbstractEventTimeWindowCheckpointingITCase.StateBackendEnum.FILE_ASYNC; + +/** + * Tests file-based local recovery with the HeapBackend. + */ +public class LocalRecoveryHeapITCase extends AbstractLocalRecoveryITCase { + public LocalRecoveryHeapITCase() { + super( + FILE_ASYNC, + ENABLE_FILE_BASED); + } +} http://git-wip-us.apache.org/repos/asf/flink/blob/3e056b34/flink-tests/src/test/java/org/apache/flink/test/checkpointing/LocalRecoveryITCase.java ---------------------------------------------------------------------- diff --git a/flink-tests/src/test/java/org/apache/flink/test/checkpointing/LocalRecoveryITCase.java b/flink-tests/src/test/java/org/apache/flink/test/checkpointing/LocalRecoveryITCase.java deleted file mode 100644 index 51b3b84..0000000 --- a/flink-tests/src/test/java/org/apache/flink/test/checkpointing/LocalRecoveryITCase.java +++ /dev/null @@ -1,120 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.flink.test.checkpointing; - -import org.apache.flink.configuration.CheckpointingOptions; -import org.apache.flink.configuration.Configuration; -import org.apache.flink.util.TestLogger; - -import org.junit.Rule; -import org.junit.Test; -import org.junit.rules.TestName; - -import java.io.IOException; - -import static org.apache.flink.runtime.state.LocalRecoveryConfig.LocalRecoveryMode; -import static org.apache.flink.runtime.state.LocalRecoveryConfig.LocalRecoveryMode.ENABLE_FILE_BASED; -import static org.apache.flink.test.checkpointing.AbstractEventTimeWindowCheckpointingITCase.StateBackendEnum; -import static org.apache.flink.test.checkpointing.AbstractEventTimeWindowCheckpointingITCase.StateBackendEnum.FILE_ASYNC; -import static org.apache.flink.test.checkpointing.AbstractEventTimeWindowCheckpointingITCase.StateBackendEnum.ROCKSDB_FULLY_ASYNC; -import static org.apache.flink.test.checkpointing.AbstractEventTimeWindowCheckpointingITCase.StateBackendEnum.ROCKSDB_INCREMENTAL_ZK; - -/** - * This test delegates to instances of {@link AbstractEventTimeWindowCheckpointingITCase} that have been reconfigured - * to use local recovery. - */ -public class LocalRecoveryITCase extends TestLogger { - - @Rule - public TestName testName = new TestName(); - - @Test - public void testLocalRecoveryHeapBackendFileBased() throws Exception { - executeTest( - FILE_ASYNC, - ENABLE_FILE_BASED); - } - - @Test - public void testLocalRecoveryRocksIncrementalFileBased() throws Exception { - executeTest( - ROCKSDB_INCREMENTAL_ZK, - ENABLE_FILE_BASED); - } - - @Test - public void testLocalRecoveryRocksFullFileBased() throws Exception { - executeTest( - ROCKSDB_FULLY_ASYNC, - ENABLE_FILE_BASED); - } - - private void executeTest( - StateBackendEnum backendEnum, - LocalRecoveryMode recoveryMode) throws Exception { - - AbstractEventTimeWindowCheckpointingITCase windowChkITCase = - new AbstractEventTimeWindowCheckpointingITCaseWithLocalRecovery( - backendEnum, - recoveryMode); - - executeTest(windowChkITCase); - } - - private void executeTest(AbstractEventTimeWindowCheckpointingITCase delegate) throws Exception { - delegate.name = testName; - delegate.tempFolder.create(); - try { - delegate.startTestCluster(); - delegate.testTumblingTimeWindow(); - delegate.stopTestCluster(); - - delegate.startTestCluster(); - delegate.testSlidingTimeWindow(); - delegate.stopTestCluster(); - } finally { - delegate.tempFolder.delete(); - } - } - - private static class AbstractEventTimeWindowCheckpointingITCaseWithLocalRecovery - extends AbstractEventTimeWindowCheckpointingITCase { - - private final LocalRecoveryMode recoveryMode; - - AbstractEventTimeWindowCheckpointingITCaseWithLocalRecovery( - StateBackendEnum stateBackendEnum, - LocalRecoveryMode recoveryMode) { - - super(stateBackendEnum); - this.recoveryMode = recoveryMode; - } - - @Override - protected Configuration createClusterConfig() throws IOException { - Configuration config = super.createClusterConfig(); - - config.setString( - CheckpointingOptions.LOCAL_RECOVERY, - recoveryMode.toString()); - - return config; - } - } -} http://git-wip-us.apache.org/repos/asf/flink/blob/3e056b34/flink-tests/src/test/java/org/apache/flink/test/checkpointing/LocalRecoveryRocksDBFullITCase.java ---------------------------------------------------------------------- diff --git a/flink-tests/src/test/java/org/apache/flink/test/checkpointing/LocalRecoveryRocksDBFullITCase.java b/flink-tests/src/test/java/org/apache/flink/test/checkpointing/LocalRecoveryRocksDBFullITCase.java new file mode 100644 index 0000000..16bbbfc --- /dev/null +++ b/flink-tests/src/test/java/org/apache/flink/test/checkpointing/LocalRecoveryRocksDBFullITCase.java @@ -0,0 +1,33 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.test.checkpointing; + +import static org.apache.flink.runtime.state.LocalRecoveryConfig.LocalRecoveryMode.ENABLE_FILE_BASED; +import static org.apache.flink.test.checkpointing.AbstractEventTimeWindowCheckpointingITCase.StateBackendEnum.ROCKSDB_FULLY_ASYNC; + +/** + * Tests file-based local recovery with the RocksDB state-backend. + */ +public class LocalRecoveryRocksDBFullITCase extends AbstractLocalRecoveryITCase { + public LocalRecoveryRocksDBFullITCase() { + super( + ROCKSDB_FULLY_ASYNC, + ENABLE_FILE_BASED); + } +} http://git-wip-us.apache.org/repos/asf/flink/blob/3e056b34/flink-tests/src/test/java/org/apache/flink/test/checkpointing/LocalRecoveryRocksDBIncrementalITCase.java ---------------------------------------------------------------------- diff --git a/flink-tests/src/test/java/org/apache/flink/test/checkpointing/LocalRecoveryRocksDBIncrementalITCase.java b/flink-tests/src/test/java/org/apache/flink/test/checkpointing/LocalRecoveryRocksDBIncrementalITCase.java new file mode 100644 index 0000000..fa8e139 --- /dev/null +++ b/flink-tests/src/test/java/org/apache/flink/test/checkpointing/LocalRecoveryRocksDBIncrementalITCase.java @@ -0,0 +1,33 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.test.checkpointing; + +import static org.apache.flink.runtime.state.LocalRecoveryConfig.LocalRecoveryMode.ENABLE_FILE_BASED; +import static org.apache.flink.test.checkpointing.AbstractEventTimeWindowCheckpointingITCase.StateBackendEnum.ROCKSDB_INCREMENTAL_ZK; + +/** + * Tests file-based local recovery with the RocksDB state-backend and incremental checkpointing enabled. + */ +public class LocalRecoveryRocksDBIncrementalITCase extends AbstractLocalRecoveryITCase { + public LocalRecoveryRocksDBIncrementalITCase() { + super( + ROCKSDB_INCREMENTAL_ZK, + ENABLE_FILE_BASED); + } +} http://git-wip-us.apache.org/repos/asf/flink/blob/3e056b34/flink-tests/src/test/java/org/apache/flink/test/checkpointing/MemBackendEventTimeWindowCheckpointingITCase.java ---------------------------------------------------------------------- diff --git a/flink-tests/src/test/java/org/apache/flink/test/checkpointing/MemBackendEventTimeWindowCheckpointingITCase.java b/flink-tests/src/test/java/org/apache/flink/test/checkpointing/MemBackendEventTimeWindowCheckpointingITCase.java index 54a29ed..e153b4b 100644 --- a/flink-tests/src/test/java/org/apache/flink/test/checkpointing/MemBackendEventTimeWindowCheckpointingITCase.java +++ b/flink-tests/src/test/java/org/apache/flink/test/checkpointing/MemBackendEventTimeWindowCheckpointingITCase.java @@ -23,7 +23,8 @@ package org.apache.flink.test.checkpointing; */ public class MemBackendEventTimeWindowCheckpointingITCase extends AbstractEventTimeWindowCheckpointingITCase { - public MemBackendEventTimeWindowCheckpointingITCase() { - super(StateBackendEnum.MEM); + @Override + protected StateBackendEnum getStateBackend() { + return StateBackendEnum.MEM; } } http://git-wip-us.apache.org/repos/asf/flink/blob/3e056b34/flink-tests/src/test/java/org/apache/flink/test/checkpointing/RocksDbBackendEventTimeWindowCheckpointingITCase.java ---------------------------------------------------------------------- diff --git a/flink-tests/src/test/java/org/apache/flink/test/checkpointing/RocksDbBackendEventTimeWindowCheckpointingITCase.java b/flink-tests/src/test/java/org/apache/flink/test/checkpointing/RocksDbBackendEventTimeWindowCheckpointingITCase.java index 3873aff..e6d5b9e 100644 --- a/flink-tests/src/test/java/org/apache/flink/test/checkpointing/RocksDbBackendEventTimeWindowCheckpointingITCase.java +++ b/flink-tests/src/test/java/org/apache/flink/test/checkpointing/RocksDbBackendEventTimeWindowCheckpointingITCase.java @@ -23,8 +23,9 @@ package org.apache.flink.test.checkpointing; */ public class RocksDbBackendEventTimeWindowCheckpointingITCase extends AbstractEventTimeWindowCheckpointingITCase { - public RocksDbBackendEventTimeWindowCheckpointingITCase() { - super(StateBackendEnum.ROCKSDB_FULLY_ASYNC); + @Override + protected StateBackendEnum getStateBackend() { + return StateBackendEnum.ROCKSDB_FULLY_ASYNC; } @Override http://git-wip-us.apache.org/repos/asf/flink/blob/3e056b34/flink-tests/src/test/java/org/apache/flink/test/operators/CustomDistributionITCase.java ---------------------------------------------------------------------- diff --git a/flink-tests/src/test/java/org/apache/flink/test/operators/CustomDistributionITCase.java b/flink-tests/src/test/java/org/apache/flink/test/operators/CustomDistributionITCase.java index 2452475..74b8cf7 100644 --- a/flink-tests/src/test/java/org/apache/flink/test/operators/CustomDistributionITCase.java +++ b/flink-tests/src/test/java/org/apache/flink/test/operators/CustomDistributionITCase.java @@ -27,19 +27,15 @@ import org.apache.flink.api.java.ExecutionEnvironment; import org.apache.flink.api.java.io.DiscardingOutputFormat; import org.apache.flink.api.java.tuple.Tuple3; import org.apache.flink.api.java.utils.DataSetUtils; +import org.apache.flink.configuration.Configuration; import org.apache.flink.core.memory.DataInputView; import org.apache.flink.core.memory.DataOutputView; -import org.apache.flink.runtime.minicluster.LocalFlinkMiniCluster; import org.apache.flink.test.operators.util.CollectionDataSets; -import org.apache.flink.test.util.TestBaseUtils; -import org.apache.flink.test.util.TestEnvironment; +import org.apache.flink.test.util.MiniClusterResource; import org.apache.flink.util.Collector; import org.apache.flink.util.TestLogger; -import org.junit.After; -import org.junit.AfterClass; -import org.junit.Before; -import org.junit.BeforeClass; +import org.junit.ClassRule; import org.junit.Test; import java.io.IOException; @@ -52,32 +48,12 @@ import static org.junit.Assert.fail; @SuppressWarnings("serial") public class CustomDistributionITCase extends TestLogger { - // ------------------------------------------------------------------------ - // The mini cluster that is shared across tests - // ------------------------------------------------------------------------ - - private static LocalFlinkMiniCluster cluster; - - @BeforeClass - public static void setup() throws Exception { - cluster = TestBaseUtils.startCluster(1, 8, false, false, true); - } - - @AfterClass - public static void teardown() throws Exception { - TestBaseUtils.stopCluster(cluster, TestBaseUtils.DEFAULT_TIMEOUT); - } - - @Before - public void prepare() { - TestEnvironment clusterEnv = new TestEnvironment(cluster, 1, false); - clusterEnv.setAsContext(); - } - - @After - public void cleanup() { - TestEnvironment.unsetAsContext(); - } + @ClassRule + public static final MiniClusterResource MINI_CLUSTER_RESOURCE = new MiniClusterResource( + new MiniClusterResource.MiniClusterResourceConfiguration( + new Configuration(), + 1, + 8)); // ------------------------------------------------------------------------ http://git-wip-us.apache.org/repos/asf/flink/blob/3e056b34/flink-tests/src/test/java/org/apache/flink/test/runtime/IPv6HostnamesITCase.java ---------------------------------------------------------------------- diff --git a/flink-tests/src/test/java/org/apache/flink/test/runtime/IPv6HostnamesITCase.java b/flink-tests/src/test/java/org/apache/flink/test/runtime/IPv6HostnamesITCase.java index f24d21e..4a7b345 100644 --- a/flink-tests/src/test/java/org/apache/flink/test/runtime/IPv6HostnamesITCase.java +++ b/flink-tests/src/test/java/org/apache/flink/test/runtime/IPv6HostnamesITCase.java @@ -27,14 +27,16 @@ import org.apache.flink.configuration.Configuration; import org.apache.flink.configuration.JobManagerOptions; import org.apache.flink.configuration.TaskManagerOptions; import org.apache.flink.runtime.akka.AkkaUtils; -import org.apache.flink.runtime.minicluster.LocalFlinkMiniCluster; import org.apache.flink.test.testdata.WordCountData; +import org.apache.flink.test.util.MiniClusterResource; import org.apache.flink.test.util.TestBaseUtils; import org.apache.flink.util.Collector; import org.apache.flink.util.NetUtils; import org.apache.flink.util.TestLogger; import akka.actor.ActorSystem; +import org.junit.AssumptionViolatedException; +import org.junit.Rule; import org.junit.Test; import java.io.IOException; @@ -56,31 +58,33 @@ import static org.junit.Assert.fail; @SuppressWarnings("serial") public class IPv6HostnamesITCase extends TestLogger { - @Test - public void testClusterWithIPv6host() { + @Rule + public final MiniClusterResource miniClusterResource = new MiniClusterResource( + new MiniClusterResource.MiniClusterResourceConfiguration( + getConfiguration(), + 2, + 2)); + private Configuration getConfiguration() { final Inet6Address ipv6address = getLocalIPv6Address(); if (ipv6address == null) { - System.err.println("--- Cannot find a non-loopback local IPv6 address that Akka/Netty can bind to; skipping IPv6HostnamesITCase"); - return; + throw new AssumptionViolatedException("--- Cannot find a non-loopback local IPv6 address that Akka/Netty can bind to; skipping IPv6HostnamesITCase"); } + final String addressString = ipv6address.getHostAddress(); + log.info("Test will use IPv6 address " + addressString + " for connection tests"); + + Configuration config = new Configuration(); + config.setString(JobManagerOptions.ADDRESS, addressString); + config.setString(ConfigConstants.TASK_MANAGER_HOSTNAME_KEY, addressString); + config.setLong(TaskManagerOptions.MANAGED_MEMORY_SIZE, 16L); + return config; + } - LocalFlinkMiniCluster flink = null; + @Test + public void testClusterWithIPv6host() { try { - final String addressString = ipv6address.getHostAddress(); - log.info("Test will use IPv6 address " + addressString + " for connection tests"); - Configuration conf = new Configuration(); - conf.setString(JobManagerOptions.ADDRESS, addressString); - conf.setString(ConfigConstants.TASK_MANAGER_HOSTNAME_KEY, addressString); - conf.setInteger(ConfigConstants.LOCAL_NUMBER_TASK_MANAGER, 2); - conf.setInteger(ConfigConstants.TASK_MANAGER_NUM_TASK_SLOTS, 2); - conf.setLong(TaskManagerOptions.MANAGED_MEMORY_SIZE, 16L); - - flink = new LocalFlinkMiniCluster(conf, false); - flink.start(); - - ExecutionEnvironment env = ExecutionEnvironment.createRemoteEnvironment(addressString, flink.getLeaderRPCPort()); + ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment(); env.setParallelism(4); env.getConfig().disableSysoutLogging(); @@ -108,11 +112,6 @@ public class IPv6HostnamesITCase extends TestLogger { e.printStackTrace(); fail(e.getMessage()); } - finally { - if (flink != null) { - flink.stop(); - } - } } private Inet6Address getLocalIPv6Address() {
