http://git-wip-us.apache.org/repos/asf/flink/blob/02b852e3/flink-streaming-connectors/flink-connector-kinesis/src/test/java/org/apache/flink/streaming/connectors/kinesis/manualtests/ManualExactlyOnceWithStreamReshardingTest.java ---------------------------------------------------------------------- diff --git a/flink-streaming-connectors/flink-connector-kinesis/src/test/java/org/apache/flink/streaming/connectors/kinesis/manualtests/ManualExactlyOnceWithStreamReshardingTest.java b/flink-streaming-connectors/flink-connector-kinesis/src/test/java/org/apache/flink/streaming/connectors/kinesis/manualtests/ManualExactlyOnceWithStreamReshardingTest.java index 934a795..6abea2a 100644 --- a/flink-streaming-connectors/flink-connector-kinesis/src/test/java/org/apache/flink/streaming/connectors/kinesis/manualtests/ManualExactlyOnceWithStreamReshardingTest.java +++ b/flink-streaming-connectors/flink-connector-kinesis/src/test/java/org/apache/flink/streaming/connectors/kinesis/manualtests/ManualExactlyOnceWithStreamReshardingTest.java @@ -27,11 +27,11 @@ import org.apache.commons.lang3.RandomStringUtils; import org.apache.flink.api.java.utils.ParameterTool; import org.apache.flink.configuration.ConfigConstants; import org.apache.flink.configuration.Configuration; +import org.apache.flink.runtime.minicluster.LocalFlinkMiniCluster; import org.apache.flink.streaming.connectors.kinesis.config.ConsumerConfigConstants; import org.apache.flink.streaming.connectors.kinesis.testutils.ExactlyOnceValidatingConsumerThread; import org.apache.flink.streaming.connectors.kinesis.testutils.KinesisShardIdGenerator; import org.apache.flink.streaming.connectors.kinesis.util.AWSUtil; -import org.apache.flink.test.util.ForkableFlinkMiniCluster; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -92,7 +92,7 @@ public class ManualExactlyOnceWithStreamReshardingTest { flinkConfig.setInteger(ConfigConstants.TASK_MANAGER_MEMORY_SIZE_KEY, 16); flinkConfig.setString(ConfigConstants.RESTART_STRATEGY_FIXED_DELAY_DELAY, "0 s"); - ForkableFlinkMiniCluster flink = new ForkableFlinkMiniCluster(flinkConfig, false); + LocalFlinkMiniCluster flink = new LocalFlinkMiniCluster(flinkConfig, false); flink.start(); final int flinkPort = flink.getLeaderRPCPort();
http://git-wip-us.apache.org/repos/asf/flink/blob/02b852e3/flink-streaming-scala/src/test/scala/org/apache/flink/streaming/api/scala/ScalaStreamingMultipleProgramsTestBase.scala ---------------------------------------------------------------------- diff --git a/flink-streaming-scala/src/test/scala/org/apache/flink/streaming/api/scala/ScalaStreamingMultipleProgramsTestBase.scala b/flink-streaming-scala/src/test/scala/org/apache/flink/streaming/api/scala/ScalaStreamingMultipleProgramsTestBase.scala index ee415d1..29b3a3e 100644 --- a/flink-streaming-scala/src/test/scala/org/apache/flink/streaming/api/scala/ScalaStreamingMultipleProgramsTestBase.scala +++ b/flink-streaming-scala/src/test/scala/org/apache/flink/streaming/api/scala/ScalaStreamingMultipleProgramsTestBase.scala @@ -18,8 +18,9 @@ package org.apache.flink.streaming.api.scala +import org.apache.flink.runtime.minicluster.LocalFlinkMiniCluster import org.apache.flink.streaming.util.TestStreamEnvironment -import org.apache.flink.test.util.{ForkableFlinkMiniCluster, TestBaseUtils} +import org.apache.flink.test.util.TestBaseUtils import org.scalatest.BeforeAndAfterAll import org.scalatest.junit.JUnitSuiteLike @@ -29,7 +30,7 @@ trait ScalaStreamingMultipleProgramsTestBase with BeforeAndAfterAll { val parallelism = 4 - var cluster: Option[ForkableFlinkMiniCluster] = None + var cluster: Option[LocalFlinkMiniCluster] = None override protected def beforeAll(): Unit = { val cluster = Some( http://git-wip-us.apache.org/repos/asf/flink/blob/02b852e3/flink-test-utils-parent/flink-test-utils/pom.xml ---------------------------------------------------------------------- diff --git a/flink-test-utils-parent/flink-test-utils/pom.xml b/flink-test-utils-parent/flink-test-utils/pom.xml index 2ab52b5..18ecfde 100644 --- a/flink-test-utils-parent/flink-test-utils/pom.xml +++ b/flink-test-utils-parent/flink-test-utils/pom.xml @@ -79,153 +79,4 @@ under the License. </dependency> </dependencies> - - <build> - <plugins> - <!-- Scala Compiler --> - <plugin> - <groupId>net.alchim31.maven</groupId> - <artifactId>scala-maven-plugin</artifactId> - <version>3.1.4</version> - <executions> - <!-- Run scala compiler in the process-resources phase, so that dependencies - on scala classes can be resolved later in the (Java) compile phase --> - <execution> - <id>scala-compile-first</id> - <phase>process-resources</phase> - <goals> - <goal>compile</goal> - </goals> - </execution> - - <!-- Run scala compiler in the process-test-resources phase, so that - dependencies on scala classes can be resolved later in the (Java) test-compile - phase --> - <execution> - <id>scala-test-compile</id> - <phase>process-test-resources</phase> - <goals> - <goal>testCompile</goal> - </goals> - </execution> - </executions> - <configuration> - <jvmArgs> - <jvmArg>-Xms128m</jvmArg> - <jvmArg>-Xmx512m</jvmArg> - </jvmArgs> - </configuration> - </plugin> - - <!-- Eclipse Integration --> - <plugin> - <groupId>org.apache.maven.plugins</groupId> - <artifactId>maven-eclipse-plugin</artifactId> - <version>2.8</version> - <configuration> - <downloadSources>true</downloadSources> - <projectnatures> - <projectnature>org.scala-ide.sdt.core.scalanature</projectnature> - <projectnature>org.eclipse.jdt.core.javanature</projectnature> - </projectnatures> - <buildcommands> - <buildcommand>org.scala-ide.sdt.core.scalabuilder</buildcommand> - </buildcommands> - <classpathContainers> - <classpathContainer>org.scala-ide.sdt.launching.SCALA_CONTAINER</classpathContainer> - <classpathContainer>org.eclipse.jdt.launching.JRE_CONTAINER</classpathContainer> - </classpathContainers> - <excludes> - <exclude>org.scala-lang:scala-library</exclude> - <exclude>org.scala-lang:scala-compiler</exclude> - </excludes> - <sourceIncludes> - <sourceInclude>**/*.scala</sourceInclude> - <sourceInclude>**/*.java</sourceInclude> - </sourceIncludes> - </configuration> - </plugin> - - <!-- Adding scala source directories to build path --> - <plugin> - <groupId>org.codehaus.mojo</groupId> - <artifactId>build-helper-maven-plugin</artifactId> - <version>1.7</version> - <executions> - <!-- Add src/main/scala to eclipse build path --> - <execution> - <id>add-source</id> - <phase>generate-sources</phase> - <goals> - <goal>add-source</goal> - </goals> - <configuration> - <sources> - <source>src/main/scala</source> - </sources> - </configuration> - </execution> - <!-- Add src/test/scala to eclipse build path --> - <execution> - <id>add-test-source</id> - <phase>generate-test-sources</phase> - <goals> - <goal>add-test-source</goal> - </goals> - <configuration> - <sources> - <source>src/test/scala</source> - </sources> - </configuration> - </execution> - </executions> - </plugin> - - <!-- Scala Code Style, most of the configuration done via plugin management --> - <plugin> - <groupId>org.scalastyle</groupId> - <artifactId>scalastyle-maven-plugin</artifactId> - <configuration> - <configLocation>${project.basedir}/../../tools/maven/scalastyle-config.xml</configLocation> - </configuration> - </plugin> - - </plugins> - <pluginManagement> - <plugins> - <!--This plugin's configuration is used to store Eclipse m2e settings only. It has no influence on the Maven build itself.--> - <plugin> - <groupId>org.eclipse.m2e</groupId> - <artifactId>lifecycle-mapping</artifactId> - <version>1.0.0</version> - <configuration> - <lifecycleMappingMetadata> - <pluginExecutions> - <pluginExecution> - <pluginExecutionFilter> - <groupId> - net.alchim31.maven - </groupId> - <artifactId> - scala-maven-plugin - </artifactId> - <versionRange> - [3.1.4,) - </versionRange> - <goals> - <goal>compile</goal> - <goal>testCompile</goal> - </goals> - </pluginExecutionFilter> - <action> - <ignore/> - </action> - </pluginExecution> - </pluginExecutions> - </lifecycleMappingMetadata> - </configuration> - </plugin> - </plugins> - </pluginManagement> - </build> </project> http://git-wip-us.apache.org/repos/asf/flink/blob/02b852e3/flink-test-utils-parent/flink-test-utils/src/main/java/org/apache/flink/streaming/util/StreamingMultipleProgramsTestBase.java ---------------------------------------------------------------------- diff --git a/flink-test-utils-parent/flink-test-utils/src/main/java/org/apache/flink/streaming/util/StreamingMultipleProgramsTestBase.java b/flink-test-utils-parent/flink-test-utils/src/main/java/org/apache/flink/streaming/util/StreamingMultipleProgramsTestBase.java index c5fbaf0..a478908 100644 --- a/flink-test-utils-parent/flink-test-utils/src/main/java/org/apache/flink/streaming/util/StreamingMultipleProgramsTestBase.java +++ b/flink-test-utils-parent/flink-test-utils/src/main/java/org/apache/flink/streaming/util/StreamingMultipleProgramsTestBase.java @@ -19,8 +19,8 @@ package org.apache.flink.streaming.util; import org.apache.flink.configuration.Configuration; +import org.apache.flink.runtime.minicluster.LocalFlinkMiniCluster; import org.apache.flink.test.util.AbstractTestBase; -import org.apache.flink.test.util.ForkableFlinkMiniCluster; import org.apache.flink.test.util.TestBaseUtils; import org.junit.AfterClass; @@ -61,7 +61,7 @@ public class StreamingMultipleProgramsTestBase extends AbstractTestBase { protected static final int DEFAULT_PARALLELISM = 4; - protected static ForkableFlinkMiniCluster cluster; + protected static LocalFlinkMiniCluster cluster; public StreamingMultipleProgramsTestBase() { super(new Configuration()); http://git-wip-us.apache.org/repos/asf/flink/blob/02b852e3/flink-test-utils-parent/flink-test-utils/src/main/java/org/apache/flink/streaming/util/TestStreamEnvironment.java ---------------------------------------------------------------------- diff --git a/flink-test-utils-parent/flink-test-utils/src/main/java/org/apache/flink/streaming/util/TestStreamEnvironment.java b/flink-test-utils-parent/flink-test-utils/src/main/java/org/apache/flink/streaming/util/TestStreamEnvironment.java index c700102..64c68dc 100644 --- a/flink-test-utils-parent/flink-test-utils/src/main/java/org/apache/flink/streaming/util/TestStreamEnvironment.java +++ b/flink-test-utils-parent/flink-test-utils/src/main/java/org/apache/flink/streaming/util/TestStreamEnvironment.java @@ -20,10 +20,10 @@ package org.apache.flink.streaming.util; import org.apache.flink.api.common.JobExecutionResult; import org.apache.flink.runtime.jobgraph.JobGraph; +import org.apache.flink.runtime.minicluster.LocalFlinkMiniCluster; import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment; import org.apache.flink.streaming.api.environment.StreamExecutionEnvironmentFactory; import org.apache.flink.streaming.api.graph.StreamGraph; -import org.apache.flink.test.util.ForkableFlinkMiniCluster; import org.apache.flink.util.Preconditions; /** @@ -32,10 +32,10 @@ import org.apache.flink.util.Preconditions; public class TestStreamEnvironment extends StreamExecutionEnvironment { /** The mini cluster in which this environment executes its jobs */ - private ForkableFlinkMiniCluster executor; + private LocalFlinkMiniCluster executor; - public TestStreamEnvironment(ForkableFlinkMiniCluster executor, int parallelism) { + public TestStreamEnvironment(LocalFlinkMiniCluster executor, int parallelism) { this.executor = Preconditions.checkNotNull(executor); setParallelism(parallelism); } @@ -57,7 +57,7 @@ public class TestStreamEnvironment extends StreamExecutionEnvironment { * @param cluster The test cluster to run the test program on. * @param parallelism The default parallelism for the test programs. */ - public static void setAsContext(final ForkableFlinkMiniCluster cluster, final int parallelism) { + public static void setAsContext(final LocalFlinkMiniCluster cluster, final int parallelism) { StreamExecutionEnvironmentFactory factory = new StreamExecutionEnvironmentFactory() { @Override http://git-wip-us.apache.org/repos/asf/flink/blob/02b852e3/flink-test-utils-parent/flink-test-utils/src/main/java/org/apache/flink/test/util/AbstractTestBase.java ---------------------------------------------------------------------- diff --git a/flink-test-utils-parent/flink-test-utils/src/main/java/org/apache/flink/test/util/AbstractTestBase.java b/flink-test-utils-parent/flink-test-utils/src/main/java/org/apache/flink/test/util/AbstractTestBase.java index c2da691..316fd21 100644 --- a/flink-test-utils-parent/flink-test-utils/src/main/java/org/apache/flink/test/util/AbstractTestBase.java +++ b/flink-test-utils-parent/flink-test-utils/src/main/java/org/apache/flink/test/util/AbstractTestBase.java @@ -21,6 +21,7 @@ package org.apache.flink.test.util; import com.google.common.base.Charsets; import com.google.common.io.Files; import org.apache.flink.configuration.Configuration; +import org.apache.flink.runtime.minicluster.LocalFlinkMiniCluster; import scala.concurrent.duration.FiniteDuration; import java.io.File; @@ -48,7 +49,7 @@ public abstract class AbstractTestBase extends TestBaseUtils { protected int numTaskManagers = 1; /** The mini cluster that runs the test programs */ - protected ForkableFlinkMiniCluster executor; + protected LocalFlinkMiniCluster executor; public AbstractTestBase(Configuration config) { http://git-wip-us.apache.org/repos/asf/flink/blob/02b852e3/flink-test-utils-parent/flink-test-utils/src/main/java/org/apache/flink/test/util/MultipleProgramsTestBase.java ---------------------------------------------------------------------- diff --git a/flink-test-utils-parent/flink-test-utils/src/main/java/org/apache/flink/test/util/MultipleProgramsTestBase.java b/flink-test-utils-parent/flink-test-utils/src/main/java/org/apache/flink/test/util/MultipleProgramsTestBase.java index d7f09bd..4e83245 100644 --- a/flink-test-utils-parent/flink-test-utils/src/main/java/org/apache/flink/test/util/MultipleProgramsTestBase.java +++ b/flink-test-utils-parent/flink-test-utils/src/main/java/org/apache/flink/test/util/MultipleProgramsTestBase.java @@ -18,6 +18,7 @@ package org.apache.flink.test.util; +import org.apache.flink.runtime.minicluster.LocalFlinkMiniCluster; import org.junit.AfterClass; import org.junit.BeforeClass; import org.junit.runners.Parameterized; @@ -72,7 +73,7 @@ public class MultipleProgramsTestBase extends TestBaseUtils { protected static boolean startWebServer = false; - protected static ForkableFlinkMiniCluster cluster = null; + protected static LocalFlinkMiniCluster cluster = null; // ------------------------------------------------------------------------ http://git-wip-us.apache.org/repos/asf/flink/blob/02b852e3/flink-test-utils-parent/flink-test-utils/src/main/java/org/apache/flink/test/util/TestBaseUtils.java ---------------------------------------------------------------------- diff --git a/flink-test-utils-parent/flink-test-utils/src/main/java/org/apache/flink/test/util/TestBaseUtils.java b/flink-test-utils-parent/flink-test-utils/src/main/java/org/apache/flink/test/util/TestBaseUtils.java index 4014b80..b774f97 100644 --- a/flink-test-utils-parent/flink-test-utils/src/main/java/org/apache/flink/test/util/TestBaseUtils.java +++ b/flink-test-utils-parent/flink-test-utils/src/main/java/org/apache/flink/test/util/TestBaseUtils.java @@ -32,7 +32,8 @@ import org.apache.commons.io.IOUtils; import org.apache.flink.api.java.tuple.Tuple; import org.apache.flink.configuration.ConfigConstants; import org.apache.flink.configuration.Configuration; -import org.apache.flink.runtime.testingUtils.TestingTaskManagerMessages; +import org.apache.flink.runtime.messages.TaskManagerMessages; +import org.apache.flink.runtime.minicluster.LocalFlinkMiniCluster; import org.apache.flink.util.TestLogger; import org.apache.hadoop.fs.FileSystem; @@ -104,7 +105,7 @@ public class TestBaseUtils extends TestLogger { } - public static ForkableFlinkMiniCluster startCluster( + public static LocalFlinkMiniCluster startCluster( int numTaskManagers, int taskManagerNumSlots, boolean startWebserver, @@ -126,7 +127,7 @@ public class TestBaseUtils extends TestLogger { return startCluster(config, singleActorSystem); } - public static ForkableFlinkMiniCluster startCluster( + public static LocalFlinkMiniCluster startCluster( Configuration config, boolean singleActorSystem) throws Exception { @@ -147,7 +148,7 @@ public class TestBaseUtils extends TestLogger { config.setString(ConfigConstants.TASK_MANAGER_LOG_PATH_KEY, logFile.toString()); - ForkableFlinkMiniCluster cluster = new ForkableFlinkMiniCluster(config, singleActorSystem); + LocalFlinkMiniCluster cluster = new LocalFlinkMiniCluster(config, singleActorSystem); cluster.start(); @@ -155,7 +156,7 @@ public class TestBaseUtils extends TestLogger { } - public static void stopCluster(ForkableFlinkMiniCluster executor, FiniteDuration timeout) throws Exception { + public static void stopCluster(LocalFlinkMiniCluster executor, FiniteDuration timeout) throws Exception { if (logDir != null) { FileUtils.deleteDirectory(logDir); } @@ -169,11 +170,15 @@ public class TestBaseUtils extends TestLogger { List<Future<Object>> numActiveConnectionsResponseFutures = new ArrayList<>(); for (ActorRef tm : tms) { - bcVariableManagerResponseFutures.add(Patterns.ask(tm, TestingTaskManagerMessages - .RequestBroadcastVariablesWithReferences$.MODULE$, new Timeout(timeout))); - - numActiveConnectionsResponseFutures.add(Patterns.ask(tm, TestingTaskManagerMessages - .RequestNumActiveConnections$.MODULE$, new Timeout(timeout))); + bcVariableManagerResponseFutures.add(Patterns.ask( + tm, + TaskManagerMessages.getRequestBroadcastVariablesWithReferences(), + new Timeout(timeout))); + + numActiveConnectionsResponseFutures.add(Patterns.ask( + tm, + TaskManagerMessages.getRequestNumActiveConnections(), + new Timeout(timeout))); } Future<Iterable<Object>> bcVariableManagerFutureResponses = Futures.sequence( @@ -182,8 +187,7 @@ public class TestBaseUtils extends TestLogger { Iterable<Object> responses = Await.result(bcVariableManagerFutureResponses, timeout); for (Object response : responses) { - numUnreleasedBCVars += ((TestingTaskManagerMessages - .ResponseBroadcastVariablesWithReferences) response).number(); + numUnreleasedBCVars += ((TaskManagerMessages.ResponseBroadcastVariablesWithReferences) response).number(); } Future<Iterable<Object>> numActiveConnectionsFutureResponses = Futures.sequence( @@ -192,8 +196,7 @@ public class TestBaseUtils extends TestLogger { responses = Await.result(numActiveConnectionsFutureResponses, timeout); for (Object response : responses) { - numActiveConnections += ((TestingTaskManagerMessages - .ResponseNumActiveConnections) response).number(); + numActiveConnections += ((TaskManagerMessages.ResponseNumActiveConnections) response).number(); } } http://git-wip-us.apache.org/repos/asf/flink/blob/02b852e3/flink-test-utils-parent/flink-test-utils/src/main/java/org/apache/flink/test/util/TestEnvironment.java ---------------------------------------------------------------------- diff --git a/flink-test-utils-parent/flink-test-utils/src/main/java/org/apache/flink/test/util/TestEnvironment.java b/flink-test-utils-parent/flink-test-utils/src/main/java/org/apache/flink/test/util/TestEnvironment.java index 7cb88be..aea8152 100644 --- a/flink-test-utils-parent/flink-test-utils/src/main/java/org/apache/flink/test/util/TestEnvironment.java +++ b/flink-test-utils-parent/flink-test-utils/src/main/java/org/apache/flink/test/util/TestEnvironment.java @@ -29,10 +29,11 @@ import org.apache.flink.optimizer.plan.OptimizedPlan; import org.apache.flink.optimizer.plandump.PlanJSONDumpGenerator; import org.apache.flink.optimizer.plantranslate.JobGraphGenerator; import org.apache.flink.runtime.jobgraph.JobGraph; +import org.apache.flink.runtime.minicluster.LocalFlinkMiniCluster; public class TestEnvironment extends ExecutionEnvironment { - private final ForkableFlinkMiniCluster executor; + private final LocalFlinkMiniCluster executor; private TestEnvironment lastEnv = null; @@ -46,7 +47,7 @@ public class TestEnvironment extends ExecutionEnvironment { } } - public TestEnvironment(ForkableFlinkMiniCluster executor, int parallelism) { + public TestEnvironment(LocalFlinkMiniCluster executor, int parallelism) { this.executor = executor; setParallelism(parallelism); @@ -54,7 +55,7 @@ public class TestEnvironment extends ExecutionEnvironment { getConfig().setCodeAnalysisMode(CodeAnalysisMode.DISABLE); } - public TestEnvironment(ForkableFlinkMiniCluster executor, int parallelism, boolean isObjectReuseEnabled) { + public TestEnvironment(LocalFlinkMiniCluster executor, int parallelism, boolean isObjectReuseEnabled) { this(executor, parallelism); if (isObjectReuseEnabled) { http://git-wip-us.apache.org/repos/asf/flink/blob/02b852e3/flink-test-utils-parent/flink-test-utils/src/main/scala/org/apache/flink/test/util/ForkableFlinkMiniCluster.scala ---------------------------------------------------------------------- diff --git a/flink-test-utils-parent/flink-test-utils/src/main/scala/org/apache/flink/test/util/ForkableFlinkMiniCluster.scala b/flink-test-utils-parent/flink-test-utils/src/main/scala/org/apache/flink/test/util/ForkableFlinkMiniCluster.scala deleted file mode 100644 index fa3135a..0000000 --- a/flink-test-utils-parent/flink-test-utils/src/main/scala/org/apache/flink/test/util/ForkableFlinkMiniCluster.scala +++ /dev/null @@ -1,335 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.flink.test.util - -import java.util.concurrent.TimeoutException - -import akka.actor.{ActorRef, ActorSystem} -import akka.pattern.Patterns._ -import akka.pattern.ask - -import org.apache.curator.test.TestingCluster -import org.apache.flink.configuration.{ConfigConstants, Configuration} -import org.apache.flink.runtime.akka.AkkaUtils -import org.apache.flink.runtime.clusterframework.FlinkResourceManager -import org.apache.flink.runtime.clusterframework.types.ResourceID -import org.apache.flink.runtime.jobmanager.{JobManager, HighAvailabilityMode} -import org.apache.flink.runtime.minicluster.LocalFlinkMiniCluster -import org.apache.flink.runtime.taskmanager.TaskManager -import org.apache.flink.runtime.testingUtils.TestingTaskManagerMessages.NotifyWhenRegisteredAtJobManager -import org.apache.flink.runtime.testingUtils.{TestingJobManager, TestingMemoryArchivist, TestingTaskManager} -import org.apache.flink.runtime.testutils.TestingResourceManager - -import scala.concurrent.{Await, Future} -import scala.concurrent.duration._ - -/** - * A forkable mini cluster is a special case of the mini cluster, used for parallel test execution - * on build servers. If multiple tests run in parallel, the cluster picks up the fork number and - * uses it to avoid port conflicts. - * - * @param userConfiguration Configuration object with the user provided configuration values - * @param singleActorSystem true, if all actors (JobManager and TaskManager) shall be run in the - * same [[ActorSystem]], otherwise false. - */ -class ForkableFlinkMiniCluster( - userConfiguration: Configuration, - singleActorSystem: Boolean) - extends LocalFlinkMiniCluster(userConfiguration, singleActorSystem) { - - def this(userConfiguration: Configuration) = this(userConfiguration, true) - - // -------------------------------------------------------------------------- - - var zookeeperCluster: Option[TestingCluster] = None - - override def generateConfiguration(userConfiguration: Configuration): Configuration = { - val forkNumberString = System.getProperty("forkNumber") - - val forkNumber = try { - Integer.parseInt(forkNumberString) - } - catch { - case e: NumberFormatException => -1 - } - - val config = userConfiguration.clone() - - if (forkNumber != -1) { - val jobManagerRPC = 1024 + forkNumber*400 - val taskManagerRPC = 1024 + forkNumber*400 + 100 - val taskManagerData = 1024 + forkNumber*400 + 200 - val resourceManagerRPC = 1024 + forkNumber*400 + 300 - - config.setInteger(ConfigConstants.JOB_MANAGER_IPC_PORT_KEY, jobManagerRPC) - config.setInteger(ConfigConstants.TASK_MANAGER_IPC_PORT_KEY, taskManagerRPC) - config.setInteger(ConfigConstants.TASK_MANAGER_DATA_PORT_KEY, taskManagerData) - config.setInteger(ConfigConstants.RESOURCE_MANAGER_IPC_PORT_KEY, resourceManagerRPC) - } - - super.generateConfiguration(config) - } - - override def startJobManager(index: Int, actorSystem: ActorSystem): ActorRef = { - val config = configuration.clone() - - val jobManagerName = getJobManagerName(index) - val archiveName = getArchiveName(index) - - val jobManagerPort = config.getInteger( - ConfigConstants.JOB_MANAGER_IPC_PORT_KEY, - ConfigConstants.DEFAULT_JOB_MANAGER_IPC_PORT) - - if (jobManagerPort > 0) { - config.setInteger(ConfigConstants.JOB_MANAGER_IPC_PORT_KEY, jobManagerPort + index) - } - - val (jobManager, _) = JobManager.startJobManagerActors( - config, - actorSystem, - Some(jobManagerName), - Some(archiveName), - classOf[TestingJobManager], - classOf[TestingMemoryArchivist]) - - jobManager - } - - override def startResourceManager(index: Int, system: ActorSystem): ActorRef = { - val config = configuration.clone() - - val resourceManagerName = getResourceManagerName(index) - - val resourceManagerPort = config.getInteger( - ConfigConstants.RESOURCE_MANAGER_IPC_PORT_KEY, - ConfigConstants.DEFAULT_RESOURCE_MANAGER_IPC_PORT) - - if (resourceManagerPort > 0) { - config.setInteger(ConfigConstants.RESOURCE_MANAGER_IPC_PORT_KEY, resourceManagerPort + index) - } - - val resourceManager = FlinkResourceManager.startResourceManagerActors( - config, - system, - createLeaderRetrievalService(), - classOf[TestingResourceManager], - resourceManagerName) - - resourceManager - } - - override def startTaskManager(index: Int, system: ActorSystem): ActorRef = { - val config = configuration.clone() - - val rpcPort = config.getInteger( - ConfigConstants.TASK_MANAGER_IPC_PORT_KEY, - ConfigConstants.DEFAULT_TASK_MANAGER_IPC_PORT) - - val dataPort = config.getInteger( - ConfigConstants.TASK_MANAGER_DATA_PORT_KEY, - ConfigConstants.DEFAULT_TASK_MANAGER_DATA_PORT) - - if (rpcPort > 0) { - config.setInteger(ConfigConstants.TASK_MANAGER_IPC_PORT_KEY, rpcPort + index) - } - if (dataPort > 0) { - config.setInteger(ConfigConstants.TASK_MANAGER_DATA_PORT_KEY, dataPort + index) - } - - val localExecution = numTaskManagers == 1 - - TaskManager.startTaskManagerComponentsAndActor( - config, - ResourceID.generate(), - system, - hostname, - Some(TaskManager.TASK_MANAGER_NAME + index), - Some(createLeaderRetrievalService()), - localExecution, - classOf[TestingTaskManager]) - } - - def addTaskManager(): Unit = { - if (useSingleActorSystem) { - (jobManagerActorSystems, taskManagerActors) match { - case (Some(jmSystems), Some(tmActors)) => - val index = numTaskManagers - taskManagerActors = Some(tmActors :+ startTaskManager(index, jmSystems(0))) - numTaskManagers += 1 - case _ => throw new IllegalStateException("Cluster has not been started properly.") - } - } else { - (taskManagerActorSystems, taskManagerActors) match { - case (Some(tmSystems), Some(tmActors)) => - val index = numTaskManagers - val newTmSystem = startTaskManagerActorSystem(index) - val newTmActor = startTaskManager(index, newTmSystem) - - taskManagerActorSystems = Some(tmSystems :+ newTmSystem) - taskManagerActors = Some(tmActors :+ newTmActor) - - numTaskManagers += 1 - case _ => throw new IllegalStateException("Cluster has not been started properly.") - } - } - } - - def restartLeadingJobManager(): Unit = { - this.synchronized { - (jobManagerActorSystems, jobManagerActors) match { - case (Some(jmActorSystems), Some(jmActors)) => - val leader = getLeaderGateway(AkkaUtils.getTimeout(configuration)) - val index = getLeaderIndex(AkkaUtils.getTimeout(configuration)) - - clearLeader() - - val stopped = gracefulStop(leader.actor(), ForkableFlinkMiniCluster.MAX_RESTART_DURATION) - Await.result(stopped, ForkableFlinkMiniCluster.MAX_RESTART_DURATION) - - if(!singleActorSystem) { - jmActorSystems(index).shutdown() - jmActorSystems(index).awaitTermination() - } - - val newJobManagerActorSystem = if(!singleActorSystem) { - startJobManagerActorSystem(index) - } else { - jmActorSystems.head - } - - val newJobManagerActor = startJobManager(index, newJobManagerActorSystem) - - jobManagerActors = Some(jmActors.patch(index, Seq(newJobManagerActor), 1)) - jobManagerActorSystems = Some(jmActorSystems.patch( - index, - Seq(newJobManagerActorSystem), - 1)) - - val lrs = createLeaderRetrievalService() - - jobManagerLeaderRetrievalService = Some(lrs) - lrs.start(this) - - case _ => throw new Exception("The JobManager of the ForkableFlinkMiniCluster have not " + - "been started properly.") - } - } - } - - - def restartTaskManager(index: Int): Unit = { - (taskManagerActorSystems, taskManagerActors) match { - case (Some(tmActorSystems), Some(tmActors)) => - val stopped = gracefulStop(tmActors(index), ForkableFlinkMiniCluster.MAX_RESTART_DURATION) - Await.result(stopped, ForkableFlinkMiniCluster.MAX_RESTART_DURATION) - - if(!singleActorSystem) { - tmActorSystems(index).shutdown() - tmActorSystems(index).awaitTermination() - } - - val taskManagerActorSystem = if(!singleActorSystem) { - startTaskManagerActorSystem(index) - } else { - tmActorSystems.head - } - - val taskManagerActor = startTaskManager(index, taskManagerActorSystem) - - taskManagerActors = Some(tmActors.patch(index, Seq(taskManagerActor), 1)) - taskManagerActorSystems = Some(tmActorSystems.patch(index, Seq(taskManagerActorSystem), 1)) - - case _ => throw new Exception("The TaskManager of the ForkableFlinkMiniCluster have not " + - "been started properly.") - } - } - - override def start(): Unit = { - val zookeeperURL = configuration.getString(ConfigConstants.HA_ZOOKEEPER_QUORUM_KEY, "") - - zookeeperCluster = if (haMode == HighAvailabilityMode.ZOOKEEPER && - zookeeperURL.equals("")) { - LOG.info("Starting ZooKeeper cluster.") - - val testingCluster = new TestingCluster(1) - - configuration.setString(ConfigConstants.HA_ZOOKEEPER_QUORUM_KEY, - testingCluster.getConnectString) - - testingCluster.start() - - Some(testingCluster) - } else { - None - } - - super.start() - } - - override def stop(): Unit = { - super.stop() - - zookeeperCluster.foreach{ - LOG.info("Stopping ZooKeeper cluster.") - _.close() - } - } - - def waitForTaskManagersToBeRegisteredAtJobManager(jobManager: ActorRef): Unit = { - val futures = taskManagerActors.map { - _.map { - tm => (tm ? NotifyWhenRegisteredAtJobManager(jobManager))(timeout) - } - }.getOrElse(Seq()) - - try { - Await.ready(Future.sequence(futures), timeout) - } catch { - case t: TimeoutException => - throw new Exception("Timeout while waiting for TaskManagers to register at " + - s"${jobManager.path}") - } - - } -} - -object ForkableFlinkMiniCluster { - - val MAX_RESTART_DURATION = 2 minute - - val DEFAULT_MINICLUSTER_AKKA_ASK_TIMEOUT = "200 s" - - def startCluster( - numSlots: Int, - numTaskManagers: Int, - timeout: String = DEFAULT_MINICLUSTER_AKKA_ASK_TIMEOUT) - : ForkableFlinkMiniCluster = { - - val config = new Configuration() - config.setInteger(ConfigConstants.TASK_MANAGER_NUM_TASK_SLOTS, numSlots) - config.setInteger(ConfigConstants.LOCAL_NUMBER_TASK_MANAGER, numTaskManagers) - config.setString(ConfigConstants.AKKA_ASK_TIMEOUT, timeout) - - val cluster = new ForkableFlinkMiniCluster(config) - - cluster.start() - - cluster - } -} http://git-wip-us.apache.org/repos/asf/flink/blob/02b852e3/flink-tests/src/test/java/org/apache/flink/test/accumulators/AccumulatorErrorITCase.java ---------------------------------------------------------------------- diff --git a/flink-tests/src/test/java/org/apache/flink/test/accumulators/AccumulatorErrorITCase.java b/flink-tests/src/test/java/org/apache/flink/test/accumulators/AccumulatorErrorITCase.java index cac8451..cc70fee 100644 --- a/flink-tests/src/test/java/org/apache/flink/test/accumulators/AccumulatorErrorITCase.java +++ b/flink-tests/src/test/java/org/apache/flink/test/accumulators/AccumulatorErrorITCase.java @@ -28,7 +28,7 @@ import org.apache.flink.client.program.ProgramInvocationException; import org.apache.flink.configuration.ConfigConstants; import org.apache.flink.configuration.Configuration; import org.apache.flink.runtime.client.JobExecutionException; -import org.apache.flink.test.util.ForkableFlinkMiniCluster; +import org.apache.flink.runtime.minicluster.LocalFlinkMiniCluster; import org.junit.AfterClass; import org.junit.Assert; import org.junit.BeforeClass; @@ -44,7 +44,7 @@ import static org.junit.Assert.fail; */ public class AccumulatorErrorITCase { - private static ForkableFlinkMiniCluster cluster; + private static LocalFlinkMiniCluster cluster; @BeforeClass public static void startCluster() { @@ -53,7 +53,7 @@ public class AccumulatorErrorITCase { config.setInteger(ConfigConstants.LOCAL_NUMBER_TASK_MANAGER, 2); config.setInteger(ConfigConstants.TASK_MANAGER_NUM_TASK_SLOTS, 3); config.setInteger(ConfigConstants.TASK_MANAGER_MEMORY_SIZE_KEY, 12); - cluster = new ForkableFlinkMiniCluster(config, false); + cluster = new LocalFlinkMiniCluster(config, false); cluster.start(); } http://git-wip-us.apache.org/repos/asf/flink/blob/02b852e3/flink-tests/src/test/java/org/apache/flink/test/accumulators/AccumulatorLiveITCase.java ---------------------------------------------------------------------- diff --git a/flink-tests/src/test/java/org/apache/flink/test/accumulators/AccumulatorLiveITCase.java b/flink-tests/src/test/java/org/apache/flink/test/accumulators/AccumulatorLiveITCase.java index 49e18e0..624bfff 100644 --- a/flink-tests/src/test/java/org/apache/flink/test/accumulators/AccumulatorLiveITCase.java +++ b/flink-tests/src/test/java/org/apache/flink/test/accumulators/AccumulatorLiveITCase.java @@ -234,7 +234,6 @@ public class AccumulatorLiveITCase { fail("Wrong accumulator results when map task begins execution."); } - int expectedAccVal = 0; /* for mapper task */ http://git-wip-us.apache.org/repos/asf/flink/blob/02b852e3/flink-tests/src/test/java/org/apache/flink/test/cancelling/CancelingTestBase.java ---------------------------------------------------------------------- diff --git a/flink-tests/src/test/java/org/apache/flink/test/cancelling/CancelingTestBase.java b/flink-tests/src/test/java/org/apache/flink/test/cancelling/CancelingTestBase.java index 9671fce..8a08f15 100644 --- a/flink-tests/src/test/java/org/apache/flink/test/cancelling/CancelingTestBase.java +++ b/flink-tests/src/test/java/org/apache/flink/test/cancelling/CancelingTestBase.java @@ -22,8 +22,8 @@ package org.apache.flink.test.cancelling; import java.util.concurrent.TimeUnit; import org.apache.flink.configuration.ConfigConstants; import org.apache.flink.configuration.Configuration; +import org.apache.flink.runtime.minicluster.LocalFlinkMiniCluster; import org.apache.flink.runtime.testingUtils.TestingUtils; -import org.apache.flink.test.util.ForkableFlinkMiniCluster; import org.apache.flink.util.TestLogger; import org.junit.Assert; @@ -39,7 +39,6 @@ import org.apache.flink.runtime.jobgraph.JobStatus; import static org.apache.flink.runtime.taskmanager.TaskCancelTest.awaitRunning; import static org.apache.flink.runtime.taskmanager.TaskCancelTest.cancelJob; import org.apache.flink.runtime.testutils.JobManagerActorTestUtils; -import org.apache.flink.util.StringUtils; import org.apache.hadoop.fs.FileSystem; import org.junit.After; @@ -65,7 +64,7 @@ public abstract class CancelingTestBase extends TestLogger { // -------------------------------------------------------------------------------------------- - protected ForkableFlinkMiniCluster executor; + protected LocalFlinkMiniCluster executor; protected int taskManagerNumSlots = DEFAULT_TASK_MANAGER_NUM_SLOTS; @@ -88,7 +87,7 @@ public abstract class CancelingTestBase extends TestLogger { config.setInteger(ConfigConstants.TASK_MANAGER_MEMORY_SEGMENT_SIZE_KEY, 4096); config.setInteger(ConfigConstants.TASK_MANAGER_NETWORK_NUM_BUFFERS_KEY, 2048); - this.executor = new ForkableFlinkMiniCluster(config, false); + this.executor = new LocalFlinkMiniCluster(config, false); this.executor.start(); } http://git-wip-us.apache.org/repos/asf/flink/blob/02b852e3/flink-tests/src/test/java/org/apache/flink/test/checkpointing/EventTimeAllWindowCheckpointingITCase.java ---------------------------------------------------------------------- diff --git a/flink-tests/src/test/java/org/apache/flink/test/checkpointing/EventTimeAllWindowCheckpointingITCase.java b/flink-tests/src/test/java/org/apache/flink/test/checkpointing/EventTimeAllWindowCheckpointingITCase.java index 163fb42..94ff66f 100644 --- a/flink-tests/src/test/java/org/apache/flink/test/checkpointing/EventTimeAllWindowCheckpointingITCase.java +++ b/flink-tests/src/test/java/org/apache/flink/test/checkpointing/EventTimeAllWindowCheckpointingITCase.java @@ -25,6 +25,7 @@ import org.apache.flink.api.java.tuple.Tuple2; import org.apache.flink.api.java.tuple.Tuple4; import org.apache.flink.configuration.ConfigConstants; import org.apache.flink.configuration.Configuration; +import org.apache.flink.runtime.minicluster.LocalFlinkMiniCluster; import org.apache.flink.streaming.api.TimeCharacteristic; import org.apache.flink.runtime.state.CheckpointListener; import org.apache.flink.streaming.api.checkpoint.Checkpointed; @@ -35,7 +36,6 @@ import org.apache.flink.streaming.api.functions.windowing.RichAllWindowFunction; import org.apache.flink.streaming.api.watermark.Watermark; import org.apache.flink.streaming.api.windowing.time.Time; import org.apache.flink.streaming.api.windowing.windows.TimeWindow; -import org.apache.flink.test.util.ForkableFlinkMiniCluster; import org.apache.flink.test.util.SuccessException; import org.apache.flink.util.Collector; import org.apache.flink.util.TestLogger; @@ -60,7 +60,7 @@ public class EventTimeAllWindowCheckpointingITCase extends TestLogger { private static final int PARALLELISM = 4; - private static ForkableFlinkMiniCluster cluster; + private static LocalFlinkMiniCluster cluster; @BeforeClass @@ -71,7 +71,7 @@ public class EventTimeAllWindowCheckpointingITCase extends TestLogger { config.setInteger(ConfigConstants.TASK_MANAGER_MEMORY_SIZE_KEY, 48); config.setString(ConfigConstants.DEFAULT_AKKA_LOOKUP_TIMEOUT, "60 s"); config.setString(ConfigConstants.DEFAULT_AKKA_ASK_TIMEOUT, "60 s"); - cluster = new ForkableFlinkMiniCluster(config, false); + cluster = new LocalFlinkMiniCluster(config, false); cluster.start(); } http://git-wip-us.apache.org/repos/asf/flink/blob/02b852e3/flink-tests/src/test/java/org/apache/flink/test/checkpointing/EventTimeWindowCheckpointingITCase.java ---------------------------------------------------------------------- diff --git a/flink-tests/src/test/java/org/apache/flink/test/checkpointing/EventTimeWindowCheckpointingITCase.java b/flink-tests/src/test/java/org/apache/flink/test/checkpointing/EventTimeWindowCheckpointingITCase.java index fa5339d..0aee128 100644 --- a/flink-tests/src/test/java/org/apache/flink/test/checkpointing/EventTimeWindowCheckpointingITCase.java +++ b/flink-tests/src/test/java/org/apache/flink/test/checkpointing/EventTimeWindowCheckpointingITCase.java @@ -28,6 +28,7 @@ import org.apache.flink.api.java.tuple.Tuple4; import org.apache.flink.configuration.ConfigConstants; import org.apache.flink.configuration.Configuration; import org.apache.flink.contrib.streaming.state.RocksDBStateBackend; +import org.apache.flink.runtime.minicluster.LocalFlinkMiniCluster; import org.apache.flink.runtime.state.AbstractStateBackend; import org.apache.flink.runtime.state.filesystem.FsStateBackend; import org.apache.flink.runtime.state.memory.MemoryStateBackend; @@ -41,7 +42,6 @@ import org.apache.flink.streaming.api.functions.windowing.RichWindowFunction; import org.apache.flink.streaming.api.watermark.Watermark; import org.apache.flink.streaming.api.windowing.time.Time; import org.apache.flink.streaming.api.windowing.windows.TimeWindow; -import org.apache.flink.test.util.ForkableFlinkMiniCluster; import org.apache.flink.test.util.SuccessException; import org.apache.flink.util.Collector; import org.apache.flink.util.TestLogger; @@ -76,7 +76,7 @@ public class EventTimeWindowCheckpointingITCase extends TestLogger { private static final int MAX_MEM_STATE_SIZE = 10 * 1024 * 1024; private static final int PARALLELISM = 4; - private static ForkableFlinkMiniCluster cluster; + private static LocalFlinkMiniCluster cluster; @Rule public TemporaryFolder tempFolder = new TemporaryFolder(); @@ -95,7 +95,7 @@ public class EventTimeWindowCheckpointingITCase extends TestLogger { config.setInteger(ConfigConstants.TASK_MANAGER_NUM_TASK_SLOTS, PARALLELISM / 2); config.setInteger(ConfigConstants.TASK_MANAGER_MEMORY_SIZE_KEY, 48); - cluster = new ForkableFlinkMiniCluster(config, false); + cluster = new LocalFlinkMiniCluster(config, false); cluster.start(); } http://git-wip-us.apache.org/repos/asf/flink/blob/02b852e3/flink-tests/src/test/java/org/apache/flink/test/checkpointing/RescalingITCase.java ---------------------------------------------------------------------- diff --git a/flink-tests/src/test/java/org/apache/flink/test/checkpointing/RescalingITCase.java b/flink-tests/src/test/java/org/apache/flink/test/checkpointing/RescalingITCase.java index 8915bff..7f1d7f3 100644 --- a/flink-tests/src/test/java/org/apache/flink/test/checkpointing/RescalingITCase.java +++ b/flink-tests/src/test/java/org/apache/flink/test/checkpointing/RescalingITCase.java @@ -36,6 +36,7 @@ import org.apache.flink.runtime.jobgraph.JobGraph; import org.apache.flink.runtime.messages.JobManagerMessages; import org.apache.flink.runtime.state.KeyGroupRangeAssignment; import org.apache.flink.runtime.state.filesystem.FsStateBackendFactory; +import org.apache.flink.runtime.testingUtils.TestingCluster; import org.apache.flink.runtime.testingUtils.TestingJobManagerMessages; import org.apache.flink.streaming.api.checkpoint.Checkpointed; import org.apache.flink.streaming.api.datastream.DataStream; @@ -43,7 +44,6 @@ import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment; import org.apache.flink.streaming.api.functions.sink.DiscardingSink; import org.apache.flink.streaming.api.functions.sink.SinkFunction; import org.apache.flink.streaming.api.functions.source.RichParallelSourceFunction; -import org.apache.flink.test.util.ForkableFlinkMiniCluster; import org.apache.flink.util.Collector; import org.apache.flink.util.TestLogger; import org.junit.AfterClass; @@ -73,7 +73,7 @@ public class RescalingITCase extends TestLogger { private static int slotsPerTaskManager = 2; private static int numSlots = numTaskManagers * slotsPerTaskManager; - private static ForkableFlinkMiniCluster cluster; + private static TestingCluster cluster; @ClassRule public static TemporaryFolder temporaryFolder = new TemporaryFolder(); @@ -92,7 +92,7 @@ public class RescalingITCase extends TestLogger { config.setString(SavepointStoreFactory.SAVEPOINT_BACKEND_KEY, "filesystem"); config.setString(SavepointStoreFactory.SAVEPOINT_DIRECTORY_KEY, savepointDir.toURI().toString()); - cluster = new ForkableFlinkMiniCluster(config); + cluster = new TestingCluster(config); cluster.start(); } http://git-wip-us.apache.org/repos/asf/flink/blob/02b852e3/flink-tests/src/test/java/org/apache/flink/test/checkpointing/SavepointITCase.java ---------------------------------------------------------------------- diff --git a/flink-tests/src/test/java/org/apache/flink/test/checkpointing/SavepointITCase.java b/flink-tests/src/test/java/org/apache/flink/test/checkpointing/SavepointITCase.java index 550ba75..7409fe7 100644 --- a/flink-tests/src/test/java/org/apache/flink/test/checkpointing/SavepointITCase.java +++ b/flink-tests/src/test/java/org/apache/flink/test/checkpointing/SavepointITCase.java @@ -43,7 +43,6 @@ import org.apache.flink.runtime.jobgraph.JobVertexID; import org.apache.flink.runtime.messages.JobManagerMessages.CancelJob; import org.apache.flink.runtime.messages.JobManagerMessages.DisposeSavepoint; import org.apache.flink.runtime.messages.JobManagerMessages.TriggerSavepoint; -import org.apache.flink.runtime.messages.JobManagerMessages.TriggerSavepointFailure; import org.apache.flink.runtime.messages.JobManagerMessages.TriggerSavepointSuccess; import org.apache.flink.runtime.state.ChainedStateHandle; import org.apache.flink.runtime.state.CheckpointListener; @@ -51,6 +50,7 @@ import org.apache.flink.runtime.state.StreamStateHandle; import org.apache.flink.runtime.state.filesystem.FileStateHandle; import org.apache.flink.runtime.state.filesystem.FsStateBackend; import org.apache.flink.runtime.state.filesystem.FsStateBackendFactory; +import org.apache.flink.runtime.testingUtils.TestingCluster; import org.apache.flink.runtime.testingUtils.TestingJobManagerMessages.NotifyWhenJobRemoved; import org.apache.flink.runtime.testingUtils.TestingJobManagerMessages.RequestSavepoint; import org.apache.flink.runtime.testingUtils.TestingJobManagerMessages.ResponseSavepoint; @@ -62,8 +62,6 @@ import org.apache.flink.streaming.api.datastream.DataStream; import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment; import org.apache.flink.streaming.api.functions.sink.DiscardingSink; import org.apache.flink.streaming.api.functions.source.SourceFunction; -import org.apache.flink.test.util.ForkableFlinkMiniCluster; -import org.apache.flink.testutils.junit.RetryOnFailure; import org.apache.flink.testutils.junit.RetryRule; import org.apache.flink.util.TestLogger; import org.junit.Rule; @@ -76,7 +74,6 @@ import scala.concurrent.duration.Deadline; import scala.concurrent.duration.FiniteDuration; import java.io.File; -import java.io.FileNotFoundException; import java.io.Serializable; import java.util.ArrayList; import java.util.Arrays; @@ -137,7 +134,7 @@ public class SavepointITCase extends TestLogger { LOG.info("Created temporary directory: " + tmpDir + "."); - ForkableFlinkMiniCluster flink = null; + TestingCluster flink = null; try { // Create a test actor system @@ -168,7 +165,7 @@ public class SavepointITCase extends TestLogger { LOG.info("Flink configuration: " + config + "."); // Start Flink - flink = new ForkableFlinkMiniCluster(config); + flink = new TestingCluster(config); LOG.info("Starting Flink cluster."); flink.start(); @@ -261,7 +258,7 @@ public class SavepointITCase extends TestLogger { LOG.info("JobManager: " + jobManager + "."); final Throwable[] error = new Throwable[1]; - final ForkableFlinkMiniCluster finalFlink = flink; + final TestingCluster finalFlink = flink; final Multimap<JobVertexID, TaskDeploymentDescriptor> tdds = HashMultimap.create(); new JavaTestKit(testActorSystem) {{ @@ -422,7 +419,7 @@ public class SavepointITCase extends TestLogger { LOG.info("Created temporary directory: " + tmpDir + "."); - ForkableFlinkMiniCluster flink = null; + TestingCluster flink = null; List<File> checkpointFiles = new ArrayList<>(); try { @@ -447,7 +444,7 @@ public class SavepointITCase extends TestLogger { LOG.info("Flink configuration: " + config + "."); // Start Flink - flink = new ForkableFlinkMiniCluster(config); + flink = new TestingCluster(config); LOG.info("Starting Flink cluster."); flink.start(); @@ -559,7 +556,7 @@ public class SavepointITCase extends TestLogger { // Test deadline final Deadline deadline = new FiniteDuration(5, TimeUnit.MINUTES).fromNow(); - ForkableFlinkMiniCluster flink = null; + TestingCluster flink = null; try { // Flink configuration @@ -570,7 +567,7 @@ public class SavepointITCase extends TestLogger { LOG.info("Flink configuration: " + config + "."); // Start Flink - flink = new ForkableFlinkMiniCluster(config); + flink = new TestingCluster(config); LOG.info("Starting Flink cluster."); flink.start(); http://git-wip-us.apache.org/repos/asf/flink/blob/02b852e3/flink-tests/src/test/java/org/apache/flink/test/checkpointing/StreamCheckpointNotifierITCase.java ---------------------------------------------------------------------- diff --git a/flink-tests/src/test/java/org/apache/flink/test/checkpointing/StreamCheckpointNotifierITCase.java b/flink-tests/src/test/java/org/apache/flink/test/checkpointing/StreamCheckpointNotifierITCase.java index cf15052..6bf511f 100644 --- a/flink-tests/src/test/java/org/apache/flink/test/checkpointing/StreamCheckpointNotifierITCase.java +++ b/flink-tests/src/test/java/org/apache/flink/test/checkpointing/StreamCheckpointNotifierITCase.java @@ -24,6 +24,7 @@ import org.apache.flink.api.common.functions.RichReduceFunction; import org.apache.flink.api.java.tuple.Tuple1; import org.apache.flink.configuration.ConfigConstants; import org.apache.flink.configuration.Configuration; +import org.apache.flink.runtime.minicluster.LocalFlinkMiniCluster; import org.apache.flink.runtime.state.CheckpointListener; import org.apache.flink.streaming.api.checkpoint.Checkpointed; import org.apache.flink.streaming.api.datastream.DataStream; @@ -34,7 +35,6 @@ import org.apache.flink.streaming.api.functions.source.ParallelSourceFunction; import org.apache.flink.streaming.api.functions.source.RichSourceFunction; import org.apache.flink.streaming.api.operators.OneInputStreamOperator; import org.apache.flink.streaming.api.operators.TwoInputStreamOperator; -import org.apache.flink.test.util.ForkableFlinkMiniCluster; import org.apache.flink.util.Collector; import org.apache.flink.util.TestLogger; @@ -80,7 +80,7 @@ public class StreamCheckpointNotifierITCase extends TestLogger { private static final int NUM_TASK_SLOTS = 3; private static final int PARALLELISM = NUM_TASK_MANAGERS * NUM_TASK_SLOTS; - private static ForkableFlinkMiniCluster cluster; + private static LocalFlinkMiniCluster cluster; @BeforeClass public static void startCluster() { @@ -91,7 +91,7 @@ public class StreamCheckpointNotifierITCase extends TestLogger { config.setString(ConfigConstants.RESTART_STRATEGY_FIXED_DELAY_DELAY, "0 ms"); config.setInteger(ConfigConstants.TASK_MANAGER_MEMORY_SIZE_KEY, 12); - cluster = new ForkableFlinkMiniCluster(config, false); + cluster = new LocalFlinkMiniCluster(config, false); cluster.start(); } catch (Exception e) { http://git-wip-us.apache.org/repos/asf/flink/blob/02b852e3/flink-tests/src/test/java/org/apache/flink/test/checkpointing/StreamFaultToleranceTestBase.java ---------------------------------------------------------------------- diff --git a/flink-tests/src/test/java/org/apache/flink/test/checkpointing/StreamFaultToleranceTestBase.java b/flink-tests/src/test/java/org/apache/flink/test/checkpointing/StreamFaultToleranceTestBase.java index 67c05e5..5f6cd4a 100644 --- a/flink-tests/src/test/java/org/apache/flink/test/checkpointing/StreamFaultToleranceTestBase.java +++ b/flink-tests/src/test/java/org/apache/flink/test/checkpointing/StreamFaultToleranceTestBase.java @@ -20,8 +20,8 @@ package org.apache.flink.test.checkpointing; import org.apache.flink.configuration.ConfigConstants; import org.apache.flink.configuration.Configuration; +import org.apache.flink.runtime.minicluster.LocalFlinkMiniCluster; import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment; -import org.apache.flink.test.util.ForkableFlinkMiniCluster; import org.apache.flink.test.util.TestUtils; import org.apache.flink.util.TestLogger; @@ -43,7 +43,7 @@ public abstract class StreamFaultToleranceTestBase extends TestLogger { protected static final int NUM_TASK_SLOTS = 4; protected static final int PARALLELISM = NUM_TASK_MANAGERS * NUM_TASK_SLOTS; - private static ForkableFlinkMiniCluster cluster; + private static LocalFlinkMiniCluster cluster; @BeforeClass public static void startCluster() { @@ -53,7 +53,7 @@ public abstract class StreamFaultToleranceTestBase extends TestLogger { config.setInteger(ConfigConstants.TASK_MANAGER_NUM_TASK_SLOTS, NUM_TASK_SLOTS); config.setInteger(ConfigConstants.TASK_MANAGER_MEMORY_SIZE_KEY, 12); - cluster = new ForkableFlinkMiniCluster(config, false); + cluster = new LocalFlinkMiniCluster(config, false); cluster.start(); } http://git-wip-us.apache.org/repos/asf/flink/blob/02b852e3/flink-tests/src/test/java/org/apache/flink/test/checkpointing/WindowCheckpointingITCase.java ---------------------------------------------------------------------- diff --git a/flink-tests/src/test/java/org/apache/flink/test/checkpointing/WindowCheckpointingITCase.java b/flink-tests/src/test/java/org/apache/flink/test/checkpointing/WindowCheckpointingITCase.java index 2e6ce78..e424a8d 100644 --- a/flink-tests/src/test/java/org/apache/flink/test/checkpointing/WindowCheckpointingITCase.java +++ b/flink-tests/src/test/java/org/apache/flink/test/checkpointing/WindowCheckpointingITCase.java @@ -25,6 +25,7 @@ import org.apache.flink.api.java.tuple.Tuple; import org.apache.flink.api.java.tuple.Tuple2; import org.apache.flink.configuration.ConfigConstants; import org.apache.flink.configuration.Configuration; +import org.apache.flink.runtime.minicluster.LocalFlinkMiniCluster; import org.apache.flink.streaming.api.TimeCharacteristic; import org.apache.flink.runtime.state.CheckpointListener; import org.apache.flink.streaming.api.checkpoint.Checkpointed; @@ -34,7 +35,6 @@ import org.apache.flink.streaming.api.functions.source.RichSourceFunction; import org.apache.flink.streaming.api.functions.windowing.RichWindowFunction; import org.apache.flink.streaming.api.windowing.time.Time; import org.apache.flink.streaming.api.windowing.windows.TimeWindow; -import org.apache.flink.test.util.ForkableFlinkMiniCluster; import org.apache.flink.test.util.SuccessException; import org.apache.flink.util.Collector; import org.apache.flink.util.TestLogger; @@ -71,7 +71,7 @@ public class WindowCheckpointingITCase extends TestLogger { private static final int PARALLELISM = 4; - private static ForkableFlinkMiniCluster cluster; + private static LocalFlinkMiniCluster cluster; @BeforeClass @@ -81,7 +81,7 @@ public class WindowCheckpointingITCase extends TestLogger { config.setInteger(ConfigConstants.TASK_MANAGER_NUM_TASK_SLOTS, PARALLELISM / 2); config.setInteger(ConfigConstants.TASK_MANAGER_MEMORY_SIZE_KEY, 48); - cluster = new ForkableFlinkMiniCluster(config, false); + cluster = new LocalFlinkMiniCluster(config, false); cluster.start(); } http://git-wip-us.apache.org/repos/asf/flink/blob/02b852e3/flink-tests/src/test/java/org/apache/flink/test/classloading/ClassLoaderITCase.java ---------------------------------------------------------------------- diff --git a/flink-tests/src/test/java/org/apache/flink/test/classloading/ClassLoaderITCase.java b/flink-tests/src/test/java/org/apache/flink/test/classloading/ClassLoaderITCase.java index 8b56d3d..7afafe4 100644 --- a/flink-tests/src/test/java/org/apache/flink/test/classloading/ClassLoaderITCase.java +++ b/flink-tests/src/test/java/org/apache/flink/test/classloading/ClassLoaderITCase.java @@ -18,7 +18,6 @@ package org.apache.flink.test.classloading; -import akka.pattern.AskTimeoutException; import org.apache.flink.api.common.JobID; import org.apache.flink.client.program.PackagedProgram; import org.apache.flink.client.program.ProgramInvocationException; @@ -37,9 +36,9 @@ import org.apache.flink.runtime.messages.JobManagerMessages.RunningJobsStatus; import org.apache.flink.runtime.messages.JobManagerMessages.TriggerSavepoint; import org.apache.flink.runtime.messages.JobManagerMessages.TriggerSavepointSuccess; import org.apache.flink.runtime.state.filesystem.FsStateBackendFactory; +import org.apache.flink.runtime.testingUtils.TestingCluster; import org.apache.flink.runtime.testingUtils.TestingJobManagerMessages.WaitForAllVerticesToBeRunning; import org.apache.flink.test.testdata.KMeansData; -import org.apache.flink.test.util.ForkableFlinkMiniCluster; import org.apache.flink.util.TestLogger; import org.junit.AfterClass; import org.junit.BeforeClass; @@ -82,7 +81,7 @@ public class ClassLoaderITCase extends TestLogger { public static final TemporaryFolder FOLDER = new TemporaryFolder(); - private static ForkableFlinkMiniCluster testCluster; + private static TestingCluster testCluster; private static int parallelism; @@ -105,7 +104,7 @@ public class ClassLoaderITCase extends TestLogger { config.setString(SavepointStoreFactory.SAVEPOINT_DIRECTORY_KEY, FOLDER.newFolder().getAbsoluteFile().toURI().toString()); - testCluster = new ForkableFlinkMiniCluster(config, false); + testCluster = new TestingCluster(config, false); testCluster.start(); } http://git-wip-us.apache.org/repos/asf/flink/blob/02b852e3/flink-tests/src/test/java/org/apache/flink/test/clients/examples/JobRetrievalITCase.java ---------------------------------------------------------------------- diff --git a/flink-tests/src/test/java/org/apache/flink/test/clients/examples/JobRetrievalITCase.java b/flink-tests/src/test/java/org/apache/flink/test/clients/examples/JobRetrievalITCase.java index c9059f1..a74ed34 100644 --- a/flink-tests/src/test/java/org/apache/flink/test/clients/examples/JobRetrievalITCase.java +++ b/flink-tests/src/test/java/org/apache/flink/test/clients/examples/JobRetrievalITCase.java @@ -29,8 +29,8 @@ import org.apache.flink.runtime.jobgraph.JobGraph; import org.apache.flink.runtime.jobgraph.JobVertex; import org.apache.flink.runtime.jobgraph.tasks.AbstractInvokable; import org.apache.flink.runtime.minicluster.FlinkMiniCluster; +import org.apache.flink.runtime.testingUtils.TestingCluster; import org.apache.flink.runtime.testingUtils.TestingJobManagerMessages; -import org.apache.flink.test.util.ForkableFlinkMiniCluster; import org.apache.flink.util.TestLogger; import org.junit.AfterClass; import org.junit.BeforeClass; @@ -42,7 +42,6 @@ import java.util.concurrent.Semaphore; import static org.junit.Assert.assertNotNull; import static org.junit.Assert.fail; - /** * Tests retrieval of a job from a running Flink cluster */ @@ -54,7 +53,7 @@ public class JobRetrievalITCase extends TestLogger { @BeforeClass public static void before() { - cluster = new ForkableFlinkMiniCluster(new Configuration(), false); + cluster = new TestingCluster(new Configuration(), false); cluster.start(); } http://git-wip-us.apache.org/repos/asf/flink/blob/02b852e3/flink-tests/src/test/java/org/apache/flink/test/failingPrograms/JobSubmissionFailsITCase.java ---------------------------------------------------------------------- diff --git a/flink-tests/src/test/java/org/apache/flink/test/failingPrograms/JobSubmissionFailsITCase.java b/flink-tests/src/test/java/org/apache/flink/test/failingPrograms/JobSubmissionFailsITCase.java index 28c2e58..178656d 100644 --- a/flink-tests/src/test/java/org/apache/flink/test/failingPrograms/JobSubmissionFailsITCase.java +++ b/flink-tests/src/test/java/org/apache/flink/test/failingPrograms/JobSubmissionFailsITCase.java @@ -26,8 +26,8 @@ import org.apache.flink.runtime.client.JobSubmissionException; import org.apache.flink.runtime.jobgraph.JobVertex; import org.apache.flink.runtime.jobgraph.JobGraph; import org.apache.flink.runtime.jobmanager.Tasks; +import org.apache.flink.runtime.minicluster.LocalFlinkMiniCluster; import org.apache.flink.runtime.testingUtils.TestingUtils; -import org.apache.flink.test.util.ForkableFlinkMiniCluster; import org.junit.AfterClass; import org.junit.BeforeClass; @@ -47,7 +47,7 @@ public class JobSubmissionFailsITCase { private static final int NUM_SLOTS = 20; - private static ForkableFlinkMiniCluster cluster; + private static LocalFlinkMiniCluster cluster; private static JobGraph workingJobGraph; @BeforeClass @@ -58,7 +58,7 @@ public class JobSubmissionFailsITCase { config.setInteger(ConfigConstants.LOCAL_NUMBER_TASK_MANAGER, 2); config.setInteger(ConfigConstants.TASK_MANAGER_NUM_TASK_SLOTS, NUM_SLOTS / 2); - cluster = new ForkableFlinkMiniCluster(config); + cluster = new LocalFlinkMiniCluster(config); cluster.start(); http://git-wip-us.apache.org/repos/asf/flink/blob/02b852e3/flink-tests/src/test/java/org/apache/flink/test/javaApiOperators/CustomDistributionITCase.java ---------------------------------------------------------------------- diff --git a/flink-tests/src/test/java/org/apache/flink/test/javaApiOperators/CustomDistributionITCase.java b/flink-tests/src/test/java/org/apache/flink/test/javaApiOperators/CustomDistributionITCase.java index ca2c156..133ebd0 100644 --- a/flink-tests/src/test/java/org/apache/flink/test/javaApiOperators/CustomDistributionITCase.java +++ b/flink-tests/src/test/java/org/apache/flink/test/javaApiOperators/CustomDistributionITCase.java @@ -29,8 +29,8 @@ import org.apache.flink.api.java.tuple.Tuple3; import org.apache.flink.api.java.utils.DataSetUtils; import org.apache.flink.core.memory.DataInputView; import org.apache.flink.core.memory.DataOutputView; +import org.apache.flink.runtime.minicluster.LocalFlinkMiniCluster; import org.apache.flink.test.javaApiOperators.util.CollectionDataSets; -import org.apache.flink.test.util.ForkableFlinkMiniCluster; import org.apache.flink.test.util.TestBaseUtils; import org.apache.flink.test.util.TestEnvironment; import org.apache.flink.util.Collector; @@ -52,7 +52,7 @@ public class CustomDistributionITCase extends TestLogger { // The mini cluster that is shared across tests // ------------------------------------------------------------------------ - private static ForkableFlinkMiniCluster cluster; + private static LocalFlinkMiniCluster cluster; @BeforeClass public static void setup() throws Exception { http://git-wip-us.apache.org/repos/asf/flink/blob/02b852e3/flink-tests/src/test/java/org/apache/flink/test/javaApiOperators/RemoteEnvironmentITCase.java ---------------------------------------------------------------------- diff --git a/flink-tests/src/test/java/org/apache/flink/test/javaApiOperators/RemoteEnvironmentITCase.java b/flink-tests/src/test/java/org/apache/flink/test/javaApiOperators/RemoteEnvironmentITCase.java index 34a7eed..e18e82a 100644 --- a/flink-tests/src/test/java/org/apache/flink/test/javaApiOperators/RemoteEnvironmentITCase.java +++ b/flink-tests/src/test/java/org/apache/flink/test/javaApiOperators/RemoteEnvironmentITCase.java @@ -23,11 +23,10 @@ import org.apache.flink.api.common.operators.util.TestNonRichInputFormat; import org.apache.flink.api.java.DataSet; import org.apache.flink.api.java.ExecutionEnvironment; import org.apache.flink.api.java.io.LocalCollectionOutputFormat; -import org.apache.flink.client.program.ProgramInvocationException; import org.apache.flink.configuration.ConfigConstants; import org.apache.flink.configuration.Configuration; import org.apache.flink.core.io.GenericInputSplit; -import org.apache.flink.test.util.ForkableFlinkMiniCluster; +import org.apache.flink.runtime.minicluster.LocalFlinkMiniCluster; import org.apache.flink.util.Collector; import org.junit.AfterClass; import org.junit.Assert; @@ -54,7 +53,7 @@ public class RemoteEnvironmentITCase { private static final String VALID_STARTUP_TIMEOUT = "100 s"; - private static ForkableFlinkMiniCluster cluster; + private static LocalFlinkMiniCluster cluster; @BeforeClass public static void setupCluster() { @@ -62,7 +61,7 @@ public class RemoteEnvironmentITCase { Configuration config = new Configuration(); config.setInteger(ConfigConstants.LOCAL_NUMBER_TASK_MANAGER, NUM_TM); config.setInteger(ConfigConstants.TASK_MANAGER_NUM_TASK_SLOTS, TM_SLOTS); - cluster = new ForkableFlinkMiniCluster(config, false); + cluster = new LocalFlinkMiniCluster(config, false); cluster.start(); } catch (Exception e) { http://git-wip-us.apache.org/repos/asf/flink/blob/02b852e3/flink-tests/src/test/java/org/apache/flink/test/misc/AutoParallelismITCase.java ---------------------------------------------------------------------- diff --git a/flink-tests/src/test/java/org/apache/flink/test/misc/AutoParallelismITCase.java b/flink-tests/src/test/java/org/apache/flink/test/misc/AutoParallelismITCase.java index 09b5e7e..a67e6ef 100644 --- a/flink-tests/src/test/java/org/apache/flink/test/misc/AutoParallelismITCase.java +++ b/flink-tests/src/test/java/org/apache/flink/test/misc/AutoParallelismITCase.java @@ -29,7 +29,7 @@ import org.apache.flink.api.java.io.LocalCollectionOutputFormat; import org.apache.flink.configuration.ConfigConstants; import org.apache.flink.configuration.Configuration; import org.apache.flink.core.io.GenericInputSplit; -import org.apache.flink.test.util.ForkableFlinkMiniCluster; +import org.apache.flink.runtime.minicluster.LocalFlinkMiniCluster; import org.apache.flink.util.Collector; import org.junit.AfterClass; import org.junit.BeforeClass; @@ -49,14 +49,14 @@ public class AutoParallelismITCase { private static final int SLOTS_PER_TM = 7; private static final int PARALLELISM = NUM_TM * SLOTS_PER_TM; - private static ForkableFlinkMiniCluster cluster; + private static LocalFlinkMiniCluster cluster; @BeforeClass public static void setupCluster() { Configuration config = new Configuration(); config.setInteger(ConfigConstants.LOCAL_NUMBER_TASK_MANAGER, NUM_TM); config.setInteger(ConfigConstants.TASK_MANAGER_NUM_TASK_SLOTS, SLOTS_PER_TM); - cluster = new ForkableFlinkMiniCluster(config, false); + cluster = new LocalFlinkMiniCluster(config, false); cluster.start(); } http://git-wip-us.apache.org/repos/asf/flink/blob/02b852e3/flink-tests/src/test/java/org/apache/flink/test/misc/CustomSerializationITCase.java ---------------------------------------------------------------------- diff --git a/flink-tests/src/test/java/org/apache/flink/test/misc/CustomSerializationITCase.java b/flink-tests/src/test/java/org/apache/flink/test/misc/CustomSerializationITCase.java index f30f61f..51f3534 100644 --- a/flink-tests/src/test/java/org/apache/flink/test/misc/CustomSerializationITCase.java +++ b/flink-tests/src/test/java/org/apache/flink/test/misc/CustomSerializationITCase.java @@ -26,7 +26,7 @@ import org.apache.flink.configuration.ConfigConstants; import org.apache.flink.configuration.Configuration; import org.apache.flink.core.memory.DataInputView; import org.apache.flink.core.memory.DataOutputView; -import org.apache.flink.test.util.ForkableFlinkMiniCluster; +import org.apache.flink.runtime.minicluster.LocalFlinkMiniCluster; import org.apache.flink.types.Value; import org.junit.AfterClass; @@ -43,7 +43,7 @@ public class CustomSerializationITCase { private static final int PARLLELISM = 5; - private static ForkableFlinkMiniCluster cluster; + private static LocalFlinkMiniCluster cluster; @BeforeClass public static void startCluster() { @@ -51,7 +51,7 @@ public class CustomSerializationITCase { Configuration config = new Configuration(); config.setInteger(ConfigConstants.TASK_MANAGER_NUM_TASK_SLOTS, PARLLELISM); config.setInteger(ConfigConstants.TASK_MANAGER_MEMORY_SIZE_KEY, 30); - cluster = new ForkableFlinkMiniCluster(config, false); + cluster = new LocalFlinkMiniCluster(config, false); cluster.start(); } catch (Exception e) { http://git-wip-us.apache.org/repos/asf/flink/blob/02b852e3/flink-tests/src/test/java/org/apache/flink/test/misc/MiscellaneousIssuesITCase.java ---------------------------------------------------------------------- diff --git a/flink-tests/src/test/java/org/apache/flink/test/misc/MiscellaneousIssuesITCase.java b/flink-tests/src/test/java/org/apache/flink/test/misc/MiscellaneousIssuesITCase.java index 42419fb..06b93ea 100644 --- a/flink-tests/src/test/java/org/apache/flink/test/misc/MiscellaneousIssuesITCase.java +++ b/flink-tests/src/test/java/org/apache/flink/test/misc/MiscellaneousIssuesITCase.java @@ -29,7 +29,7 @@ import org.apache.flink.client.program.ProgramInvocationException; import org.apache.flink.configuration.ConfigConstants; import org.apache.flink.configuration.Configuration; import org.apache.flink.core.fs.FileSystem; -import org.apache.flink.test.util.ForkableFlinkMiniCluster; +import org.apache.flink.runtime.minicluster.LocalFlinkMiniCluster; import org.apache.flink.util.Collector; @@ -52,7 +52,7 @@ import static org.junit.Assert.*; @SuppressWarnings("serial") public class MiscellaneousIssuesITCase { - private static ForkableFlinkMiniCluster cluster; + private static LocalFlinkMiniCluster cluster; @BeforeClass public static void startCluster() { @@ -61,7 +61,7 @@ public class MiscellaneousIssuesITCase { config.setInteger(ConfigConstants.LOCAL_NUMBER_TASK_MANAGER, 2); config.setInteger(ConfigConstants.TASK_MANAGER_NUM_TASK_SLOTS, 3); config.setInteger(ConfigConstants.TASK_MANAGER_MEMORY_SIZE_KEY, 12); - cluster = new ForkableFlinkMiniCluster(config, false); + cluster = new LocalFlinkMiniCluster(config, false); cluster.start(); } http://git-wip-us.apache.org/repos/asf/flink/blob/02b852e3/flink-tests/src/test/java/org/apache/flink/test/misc/SuccessAfterNetworkBuffersFailureITCase.java ---------------------------------------------------------------------- diff --git a/flink-tests/src/test/java/org/apache/flink/test/misc/SuccessAfterNetworkBuffersFailureITCase.java b/flink-tests/src/test/java/org/apache/flink/test/misc/SuccessAfterNetworkBuffersFailureITCase.java index 12b7a68..a43bab6 100644 --- a/flink-tests/src/test/java/org/apache/flink/test/misc/SuccessAfterNetworkBuffersFailureITCase.java +++ b/flink-tests/src/test/java/org/apache/flink/test/misc/SuccessAfterNetworkBuffersFailureITCase.java @@ -32,8 +32,8 @@ import org.apache.flink.examples.java.clustering.KMeans; import org.apache.flink.examples.java.clustering.util.KMeansData; import org.apache.flink.examples.java.graph.ConnectedComponents; import org.apache.flink.examples.java.graph.util.ConnectedComponentsData; -import org.apache.flink.test.util.ForkableFlinkMiniCluster; +import org.apache.flink.runtime.minicluster.LocalFlinkMiniCluster; import org.junit.Test; import static org.junit.Assert.*; @@ -43,7 +43,7 @@ public class SuccessAfterNetworkBuffersFailureITCase { @Test public void testSuccessfulProgramAfterFailure() { - ForkableFlinkMiniCluster cluster = null; + LocalFlinkMiniCluster cluster = null; try { Configuration config = new Configuration(); @@ -52,7 +52,7 @@ public class SuccessAfterNetworkBuffersFailureITCase { config.setInteger(ConfigConstants.TASK_MANAGER_NUM_TASK_SLOTS, 8); config.setInteger(ConfigConstants.TASK_MANAGER_NETWORK_NUM_BUFFERS_KEY, 840); - cluster = new ForkableFlinkMiniCluster(config, false); + cluster = new LocalFlinkMiniCluster(config, false); cluster.start(); http://git-wip-us.apache.org/repos/asf/flink/blob/02b852e3/flink-tests/src/test/java/org/apache/flink/test/query/QueryableStateITCase.java ---------------------------------------------------------------------- diff --git a/flink-tests/src/test/java/org/apache/flink/test/query/QueryableStateITCase.java b/flink-tests/src/test/java/org/apache/flink/test/query/QueryableStateITCase.java index 40732df..b99858a 100644 --- a/flink-tests/src/test/java/org/apache/flink/test/query/QueryableStateITCase.java +++ b/flink-tests/src/test/java/org/apache/flink/test/query/QueryableStateITCase.java @@ -55,6 +55,7 @@ import org.apache.flink.runtime.query.netty.message.KvStateRequestSerializer; import org.apache.flink.runtime.state.CheckpointListener; import org.apache.flink.runtime.state.VoidNamespace; import org.apache.flink.runtime.state.VoidNamespaceSerializer; +import org.apache.flink.runtime.testingUtils.TestingCluster; import org.apache.flink.runtime.testingUtils.TestingJobManagerMessages; import org.apache.flink.runtime.testingUtils.TestingTaskManagerMessages; import org.apache.flink.runtime.testingUtils.TestingTaskManagerMessages.ResponseRunningTasks; @@ -62,7 +63,6 @@ import org.apache.flink.streaming.api.datastream.DataStream; import org.apache.flink.streaming.api.datastream.QueryableStateStream; import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment; import org.apache.flink.streaming.api.functions.source.RichParallelSourceFunction; -import org.apache.flink.test.util.ForkableFlinkMiniCluster; import org.apache.flink.util.MathUtils; import org.apache.flink.util.Preconditions; import org.apache.flink.util.TestLogger; @@ -108,7 +108,7 @@ public class QueryableStateITCase extends TestLogger { * Shared between all the test. Make sure to have at least NUM_SLOTS * available after your test finishes, e.g. cancel the job you submitted. */ - private static ForkableFlinkMiniCluster cluster; + private static TestingCluster cluster; @BeforeClass public static void setup() { @@ -120,7 +120,7 @@ public class QueryableStateITCase extends TestLogger { config.setInteger(ConfigConstants.QUERYABLE_STATE_CLIENT_NETWORK_THREADS, 1); config.setInteger(ConfigConstants.QUERYABLE_STATE_SERVER_NETWORK_THREADS, 1); - cluster = new ForkableFlinkMiniCluster(config, false); + cluster = new TestingCluster(config, false); cluster.start(true); } catch (Exception e) { e.printStackTrace(); http://git-wip-us.apache.org/repos/asf/flink/blob/02b852e3/flink-tests/src/test/java/org/apache/flink/test/recovery/FastFailuresITCase.java ---------------------------------------------------------------------- diff --git a/flink-tests/src/test/java/org/apache/flink/test/recovery/FastFailuresITCase.java b/flink-tests/src/test/java/org/apache/flink/test/recovery/FastFailuresITCase.java index 8a45d62..8a43ee4 100644 --- a/flink-tests/src/test/java/org/apache/flink/test/recovery/FastFailuresITCase.java +++ b/flink-tests/src/test/java/org/apache/flink/test/recovery/FastFailuresITCase.java @@ -24,11 +24,11 @@ import org.apache.flink.api.common.restartstrategy.RestartStrategies; import org.apache.flink.api.java.tuple.Tuple2; import org.apache.flink.configuration.ConfigConstants; import org.apache.flink.configuration.Configuration; +import org.apache.flink.runtime.minicluster.LocalFlinkMiniCluster; import org.apache.flink.streaming.api.datastream.DataStream; import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment; import org.apache.flink.streaming.api.functions.sink.SinkFunction; import org.apache.flink.streaming.api.functions.source.RichSourceFunction; -import org.apache.flink.test.util.ForkableFlinkMiniCluster; import org.apache.flink.util.TestLogger; import org.junit.Test; @@ -49,7 +49,7 @@ public class FastFailuresITCase extends TestLogger { config.setInteger(ConfigConstants.LOCAL_NUMBER_TASK_MANAGER, 2); config.setInteger(ConfigConstants.TASK_MANAGER_NUM_TASK_SLOTS, 2); - ForkableFlinkMiniCluster cluster = new ForkableFlinkMiniCluster(config, false); + LocalFlinkMiniCluster cluster = new LocalFlinkMiniCluster(config, false); cluster.start(); StreamExecutionEnvironment env = StreamExecutionEnvironment.createRemoteEnvironment( http://git-wip-us.apache.org/repos/asf/flink/blob/02b852e3/flink-tests/src/test/java/org/apache/flink/test/recovery/SimpleRecoveryFailureRateStrategyITBase.java ---------------------------------------------------------------------- diff --git a/flink-tests/src/test/java/org/apache/flink/test/recovery/SimpleRecoveryFailureRateStrategyITBase.java b/flink-tests/src/test/java/org/apache/flink/test/recovery/SimpleRecoveryFailureRateStrategyITBase.java index 0c5d14b..a0d6b58 100644 --- a/flink-tests/src/test/java/org/apache/flink/test/recovery/SimpleRecoveryFailureRateStrategyITBase.java +++ b/flink-tests/src/test/java/org/apache/flink/test/recovery/SimpleRecoveryFailureRateStrategyITBase.java @@ -20,7 +20,7 @@ package org.apache.flink.test.recovery; import org.apache.flink.configuration.ConfigConstants; import org.apache.flink.configuration.Configuration; -import org.apache.flink.test.util.ForkableFlinkMiniCluster; +import org.apache.flink.runtime.minicluster.LocalFlinkMiniCluster; import org.junit.BeforeClass; public class SimpleRecoveryFailureRateStrategyITBase extends SimpleRecoveryITCaseBase { @@ -34,8 +34,8 @@ public class SimpleRecoveryFailureRateStrategyITBase extends SimpleRecoveryITCas config.setString(ConfigConstants.RESTART_STRATEGY_FAILURE_RATE_FAILURE_RATE_INTERVAL, "1 second"); config.setString(ConfigConstants.RESTART_STRATEGY_FAILURE_RATE_DELAY, "100 ms"); - cluster = new ForkableFlinkMiniCluster(config, false); + cluster = new LocalFlinkMiniCluster(config, false); cluster.start(); } -} \ No newline at end of file +} http://git-wip-us.apache.org/repos/asf/flink/blob/02b852e3/flink-tests/src/test/java/org/apache/flink/test/recovery/SimpleRecoveryFixedDelayRestartStrategyITBase.java ---------------------------------------------------------------------- diff --git a/flink-tests/src/test/java/org/apache/flink/test/recovery/SimpleRecoveryFixedDelayRestartStrategyITBase.java b/flink-tests/src/test/java/org/apache/flink/test/recovery/SimpleRecoveryFixedDelayRestartStrategyITBase.java index 6355a8f..f09efc5 100644 --- a/flink-tests/src/test/java/org/apache/flink/test/recovery/SimpleRecoveryFixedDelayRestartStrategyITBase.java +++ b/flink-tests/src/test/java/org/apache/flink/test/recovery/SimpleRecoveryFixedDelayRestartStrategyITBase.java @@ -20,7 +20,7 @@ package org.apache.flink.test.recovery; import org.apache.flink.configuration.ConfigConstants; import org.apache.flink.configuration.Configuration; -import org.apache.flink.test.util.ForkableFlinkMiniCluster; +import org.apache.flink.runtime.minicluster.LocalFlinkMiniCluster; import org.junit.BeforeClass; public class SimpleRecoveryFixedDelayRestartStrategyITBase extends SimpleRecoveryITCaseBase { @@ -33,8 +33,8 @@ public class SimpleRecoveryFixedDelayRestartStrategyITBase extends SimpleRecover config.setInteger(ConfigConstants.RESTART_STRATEGY_FIXED_DELAY_ATTEMPTS, 1); config.setString(ConfigConstants.RESTART_STRATEGY_FIXED_DELAY_DELAY, "100 ms"); - cluster = new ForkableFlinkMiniCluster(config, false); + cluster = new LocalFlinkMiniCluster(config, false); cluster.start(); } -} \ No newline at end of file +} http://git-wip-us.apache.org/repos/asf/flink/blob/02b852e3/flink-tests/src/test/java/org/apache/flink/test/recovery/SimpleRecoveryITCaseBase.java ---------------------------------------------------------------------- diff --git a/flink-tests/src/test/java/org/apache/flink/test/recovery/SimpleRecoveryITCaseBase.java b/flink-tests/src/test/java/org/apache/flink/test/recovery/SimpleRecoveryITCaseBase.java index 004340c..bf7c524 100644 --- a/flink-tests/src/test/java/org/apache/flink/test/recovery/SimpleRecoveryITCaseBase.java +++ b/flink-tests/src/test/java/org/apache/flink/test/recovery/SimpleRecoveryITCaseBase.java @@ -26,7 +26,7 @@ import org.apache.flink.api.java.ExecutionEnvironment; import org.apache.flink.api.java.io.LocalCollectionOutputFormat; import org.apache.flink.client.program.ProgramInvocationException; import org.apache.flink.runtime.client.JobExecutionException; -import org.apache.flink.test.util.ForkableFlinkMiniCluster; +import org.apache.flink.runtime.minicluster.LocalFlinkMiniCluster; import org.junit.AfterClass; import org.junit.Test; @@ -42,7 +42,7 @@ import static org.junit.Assert.*; @SuppressWarnings("serial") public abstract class SimpleRecoveryITCaseBase { - protected static ForkableFlinkMiniCluster cluster; + protected static LocalFlinkMiniCluster cluster; @AfterClass public static void teardownCluster() { http://git-wip-us.apache.org/repos/asf/flink/blob/02b852e3/flink-tests/src/test/java/org/apache/flink/test/recovery/TaskManagerFailureRecoveryITCase.java ---------------------------------------------------------------------- diff --git a/flink-tests/src/test/java/org/apache/flink/test/recovery/TaskManagerFailureRecoveryITCase.java b/flink-tests/src/test/java/org/apache/flink/test/recovery/TaskManagerFailureRecoveryITCase.java index 6c621ac..5d29905 100644 --- a/flink-tests/src/test/java/org/apache/flink/test/recovery/TaskManagerFailureRecoveryITCase.java +++ b/flink-tests/src/test/java/org/apache/flink/test/recovery/TaskManagerFailureRecoveryITCase.java @@ -30,7 +30,7 @@ import org.apache.flink.api.java.io.LocalCollectionOutputFormat; import org.apache.flink.configuration.ConfigConstants; import org.apache.flink.configuration.Configuration; import org.apache.flink.runtime.messages.TaskManagerMessages; -import org.apache.flink.test.util.ForkableFlinkMiniCluster; +import org.apache.flink.runtime.minicluster.LocalFlinkMiniCluster; import org.junit.Test; import scala.concurrent.Await; import scala.concurrent.Future; @@ -65,7 +65,7 @@ public class TaskManagerFailureRecoveryITCase { final int PARALLELISM = 4; - ForkableFlinkMiniCluster cluster = null; + LocalFlinkMiniCluster cluster = null; ActorSystem additionalSystem = null; try { @@ -78,7 +78,7 @@ public class TaskManagerFailureRecoveryITCase { config.setString(ConfigConstants.AKKA_WATCH_HEARTBEAT_PAUSE, "20 s"); config.setInteger(ConfigConstants.AKKA_WATCH_THRESHOLD, 20); - cluster = new ForkableFlinkMiniCluster(config, false); + cluster = new LocalFlinkMiniCluster(config, false); cluster.start(); http://git-wip-us.apache.org/repos/asf/flink/blob/02b852e3/flink-tests/src/test/java/org/apache/flink/test/runtime/IPv6HostnamesITCase.java ---------------------------------------------------------------------- diff --git a/flink-tests/src/test/java/org/apache/flink/test/runtime/IPv6HostnamesITCase.java b/flink-tests/src/test/java/org/apache/flink/test/runtime/IPv6HostnamesITCase.java index 7710f06..0b008eb 100644 --- a/flink-tests/src/test/java/org/apache/flink/test/runtime/IPv6HostnamesITCase.java +++ b/flink-tests/src/test/java/org/apache/flink/test/runtime/IPv6HostnamesITCase.java @@ -26,8 +26,8 @@ import org.apache.flink.api.java.tuple.Tuple2; import org.apache.flink.configuration.ConfigConstants; import org.apache.flink.configuration.Configuration; import org.apache.flink.runtime.akka.AkkaUtils; +import org.apache.flink.runtime.minicluster.LocalFlinkMiniCluster; import org.apache.flink.test.testdata.WordCountData; -import org.apache.flink.test.util.ForkableFlinkMiniCluster; import org.apache.flink.test.util.TestBaseUtils; import org.apache.flink.util.Collector; @@ -63,7 +63,7 @@ public class IPv6HostnamesITCase extends TestLogger { - ForkableFlinkMiniCluster flink = null; + LocalFlinkMiniCluster flink = null; try { final String addressString = ipv6address.getHostAddress(); log.info("Test will use IPv6 address " + addressString + " for connection tests"); @@ -75,7 +75,7 @@ public class IPv6HostnamesITCase extends TestLogger { conf.setInteger(ConfigConstants.TASK_MANAGER_NUM_TASK_SLOTS, 2); conf.setInteger(ConfigConstants.TASK_MANAGER_MEMORY_SIZE_KEY, 16); - flink = new ForkableFlinkMiniCluster(conf, false); + flink = new LocalFlinkMiniCluster(conf, false); flink.start(); ExecutionEnvironment env = ExecutionEnvironment.createRemoteEnvironment(addressString, flink.getLeaderRPCPort());
