http://git-wip-us.apache.org/repos/asf/flink/blob/70978f56/flink-test-utils/src/test/scala/org/apache/flink/test/util/ForkableFlinkMiniCluster.scala ---------------------------------------------------------------------- diff --git a/flink-test-utils/src/test/scala/org/apache/flink/test/util/ForkableFlinkMiniCluster.scala b/flink-test-utils/src/test/scala/org/apache/flink/test/util/ForkableFlinkMiniCluster.scala new file mode 100644 index 0000000..8a348a1 --- /dev/null +++ b/flink-test-utils/src/test/scala/org/apache/flink/test/util/ForkableFlinkMiniCluster.scala @@ -0,0 +1,305 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.flink.test.util + +import java.util.concurrent.TimeoutException + +import akka.actor.{ActorRef, ActorSystem} +import akka.pattern.Patterns._ +import akka.pattern.ask +import org.apache.curator.test.TestingCluster +import org.apache.flink.configuration.{ConfigConstants, Configuration} +import org.apache.flink.runtime.akka.AkkaUtils +import org.apache.flink.runtime.clusterframework.FlinkResourceManager +import org.apache.flink.runtime.clusterframework.types.ResourceID +import org.apache.flink.runtime.jobmanager.{JobManager, RecoveryMode} +import org.apache.flink.runtime.minicluster.LocalFlinkMiniCluster +import org.apache.flink.runtime.taskmanager.TaskManager +import org.apache.flink.runtime.testingUtils.TestingTaskManagerMessages.NotifyWhenRegisteredAtJobManager +import org.apache.flink.runtime.testingUtils.{TestingJobManager, TestingMemoryArchivist, TestingTaskManager, TestingUtils} + +import org.apache.flink.runtime.testutils.TestingResourceManager + +import scala.concurrent.{Await, Future} + +/** + * A forkable mini cluster is a special case of the mini cluster, used for parallel test execution + * on build servers. If multiple tests run in parallel, the cluster picks up the fork number and + * uses it to avoid port conflicts. + * + * @param userConfiguration Configuration object with the user provided configuration values + * @param singleActorSystem true, if all actors (JobManager and TaskManager) shall be run in the + * same [[ActorSystem]], otherwise false. 
+ */
+class ForkableFlinkMiniCluster(
+    userConfiguration: Configuration,
+    singleActorSystem: Boolean)
+  extends LocalFlinkMiniCluster(userConfiguration, singleActorSystem) {
+
+  /** Creates a cluster whose JobManager and TaskManager actors share one [[ActorSystem]]. */
+  def this(userConfiguration: Configuration) = this(userConfiguration, true)
+
+  // --------------------------------------------------------------------------
+
+  /** ZooKeeper testing cluster started by `start()` (if any); closed again by `stop()`. */
+  var zookeeperCluster: Option[TestingCluster] = None
+
+  /**
+   * Derives the cluster configuration from the user configuration. If the `forkNumber`
+   * system property holds an integer other than -1, the JobManager RPC, TaskManager RPC,
+   * TaskManager data and ResourceManager RPC ports are taken from a fork-specific block of
+   * 400 ports starting at `1024 + forkNumber * 400` (offsets 0, 100, 200 and 300), so that
+   * tests running in parallel forks do not collide on ports.
+   */
+  override def generateConfiguration(userConfiguration: Configuration): Configuration = {
+    val forkNumberString = System.getProperty("forkNumber")
+
+    // Integer.parseInt also throws NumberFormatException for a missing (null) property.
+    val forkNumber = try {
+      Integer.parseInt(forkNumberString)
+    } catch {
+      case _: NumberFormatException => -1
+    }
+
+    val config = userConfiguration.clone()
+
+    if (forkNumber != -1) {
+      val basePort = 1024 + forkNumber * 400
+
+      config.setInteger(ConfigConstants.JOB_MANAGER_IPC_PORT_KEY, basePort)
+      config.setInteger(ConfigConstants.TASK_MANAGER_IPC_PORT_KEY, basePort + 100)
+      config.setInteger(ConfigConstants.TASK_MANAGER_DATA_PORT_KEY, basePort + 200)
+      config.setInteger(ConfigConstants.RESOURCE_MANAGER_IPC_PORT_KEY, basePort + 300)
+    }
+
+    super.generateConfiguration(config)
+  }
+
+  /**
+   * Starts a [[TestingJobManager]] together with a [[TestingMemoryArchivist]] in the given
+   * actor system. A positive configured JobManager IPC port is shifted by `index` so that
+   * several JobManagers of this cluster get distinct ports.
+   */
+  override def startJobManager(index: Int, actorSystem: ActorSystem): ActorRef = {
+    val config = configuration.clone()
+
+    val jobManagerName = getJobManagerName(index)
+    val archiveName = getArchiveName(index)
+
+    val jobManagerPort = config.getInteger(
+      ConfigConstants.JOB_MANAGER_IPC_PORT_KEY,
+      ConfigConstants.DEFAULT_JOB_MANAGER_IPC_PORT)
+
+    if (jobManagerPort > 0) {
+      config.setInteger(ConfigConstants.JOB_MANAGER_IPC_PORT_KEY, jobManagerPort + index)
+    }
+
+    val (jobManager, _) = JobManager.startJobManagerActors(
+      config,
+      actorSystem,
+      Some(jobManagerName),
+      Some(archiveName),
+      classOf[TestingJobManager],
+      classOf[TestingMemoryArchivist])
+
+    jobManager
+  }
+
+  /**
+   * Starts a [[TestingResourceManager]] in the given actor system. A positive configured
+   * ResourceManager IPC port is shifted by `index`.
+   */
+  override def startResourceManager(index: Int, system: ActorSystem): ActorRef = {
+    val config = configuration.clone()
+
+    val resourceManagerName = getResourceManagerName(index)
+
+    val resourceManagerPort = config.getInteger(
+      ConfigConstants.RESOURCE_MANAGER_IPC_PORT_KEY,
+      ConfigConstants.DEFAULT_RESOURCE_MANAGER_IPC_PORT)
+
+    if (resourceManagerPort > 0) {
+      config.setInteger(ConfigConstants.RESOURCE_MANAGER_IPC_PORT_KEY, resourceManagerPort + index)
+    }
+
+    val resourceManager = FlinkResourceManager.startResourceManagerActors(
+      config,
+      system,
+      createLeaderRetrievalService(),
+      classOf[TestingResourceManager],
+      resourceManagerName)
+
+    resourceManager
+  }
+
+  /**
+   * Starts a [[TestingTaskManager]] in the given actor system. Positive configured RPC and
+   * data ports are shifted by `index`; `localExecution` is passed as true only when the
+   * cluster has exactly one TaskManager.
+   */
+  override def startTaskManager(index: Int, system: ActorSystem): ActorRef = {
+    val config = configuration.clone()
+
+    val rpcPort = config.getInteger(
+      ConfigConstants.TASK_MANAGER_IPC_PORT_KEY,
+      ConfigConstants.DEFAULT_TASK_MANAGER_IPC_PORT)
+
+    val dataPort = config.getInteger(
+      ConfigConstants.TASK_MANAGER_DATA_PORT_KEY,
+      ConfigConstants.DEFAULT_TASK_MANAGER_DATA_PORT)
+
+    if (rpcPort > 0) {
+      config.setInteger(ConfigConstants.TASK_MANAGER_IPC_PORT_KEY, rpcPort + index)
+    }
+    if (dataPort > 0) {
+      config.setInteger(ConfigConstants.TASK_MANAGER_DATA_PORT_KEY, dataPort + index)
+    }
+
+    val localExecution = numTaskManagers == 1
+
+    TaskManager.startTaskManagerComponentsAndActor(
+      config,
+      ResourceID.generate(),
+      system,
+      hostname,
+      Some(TaskManager.TASK_MANAGER_NAME + index),
+      Some(createLeaderRetrievalService()),
+      localExecution,
+      classOf[TestingTaskManager])
+  }
+
+  /**
+   * Stops the current leading JobManager and starts a new one at the same index. Unless all
+   * actors share one actor system, the leader's actor system is shut down and replaced too.
+   * Afterwards the previous JobManager leader retrieval service is stopped and a fresh one
+   * is started with this cluster as its listener.
+   *
+   * @throws Exception if the JobManagers have not been started
+   */
+  def restartLeadingJobManager(): Unit = {
+    this.synchronized {
+      (jobManagerActorSystems, jobManagerActors) match {
+        case (Some(jmActorSystems), Some(jmActors)) =>
+          val leader = getLeaderGateway(AkkaUtils.getTimeout(configuration))
+          val index = getLeaderIndex(AkkaUtils.getTimeout(configuration))
+
+          clearLeader()
+
+          val stopped = gracefulStop(leader.actor(), TestingUtils.TESTING_DURATION)
+          Await.result(stopped, TestingUtils.TESTING_DURATION)
+
+          if (!singleActorSystem) {
+            jmActorSystems(index).shutdown()
+            jmActorSystems(index).awaitTermination()
+          }
+
+          val newJobManagerActorSystem = if (!singleActorSystem) {
+            startJobManagerActorSystem(index)
+          } else {
+            jmActorSystems.head
+          }
+
+          val newJobManagerActor = startJobManager(index, newJobManagerActorSystem)
+
+          jobManagerActors = Some(jmActors.patch(index, Seq(newJobManagerActor), 1))
+          jobManagerActorSystems = Some(jmActorSystems.patch(
+            index,
+            Seq(newJobManagerActorSystem),
+            1))
+
+          // Stop the previous retrieval service before the reference to it is overwritten;
+          // otherwise nothing is left that could ever stop it.
+          jobManagerLeaderRetrievalService.foreach(_.stop())
+
+          val lrs = createLeaderRetrievalService()
+
+          jobManagerLeaderRetrievalService = Some(lrs)
+          lrs.start(this)
+
+        case _ => throw new Exception("The JobManager of the ForkableFlinkMiniCluster have not " +
+          "been started properly.")
+      }
+    }
+  }
+
+  /**
+   * Stops the TaskManager at `index` and starts a replacement at the same index. Unless all
+   * actors share one actor system, that TaskManager's actor system is shut down and replaced
+   * as well.
+   *
+   * @param index position of the TaskManager in `taskManagerActors`
+   * @throws Exception if the TaskManagers have not been started
+   */
+  def restartTaskManager(index: Int): Unit = {
+    (taskManagerActorSystems, taskManagerActors) match {
+      case (Some(tmActorSystems), Some(tmActors)) =>
+        val stopped = gracefulStop(tmActors(index), TestingUtils.TESTING_DURATION)
+        Await.result(stopped, TestingUtils.TESTING_DURATION)
+
+        if (!singleActorSystem) {
+          tmActorSystems(index).shutdown()
+          tmActorSystems(index).awaitTermination()
+        }
+
+        val taskManagerActorSystem = if (!singleActorSystem) {
+          startTaskManagerActorSystem(index)
+        } else {
+          tmActorSystems.head
+        }
+
+        val taskManagerActor = startTaskManager(index, taskManagerActorSystem)
+
+        taskManagerActors = Some(tmActors.patch(index, Seq(taskManagerActor), 1))
+        taskManagerActorSystems = Some(tmActorSystems.patch(index, Seq(taskManagerActorSystem), 1))
+
+      case _ => throw new Exception("The TaskManager of the ForkableFlinkMiniCluster have not " +
+        "been started properly.")
+    }
+  }
+
+  /**
+   * Starts the cluster. If ZooKeeper recovery mode is configured but no ZooKeeper quorum is
+   * set, a one-instance ZooKeeper [[TestingCluster]] is started first and its connect string
+   * is written into the configuration.
+   */
+  override def start(): Unit = {
+    val zookeeperURL = configuration.getString(ConfigConstants.ZOOKEEPER_QUORUM_KEY, "")
+
+    zookeeperCluster = if (recoveryMode == RecoveryMode.ZOOKEEPER && zookeeperURL.isEmpty) {
+      LOG.info("Starting ZooKeeper cluster.")
+
+      val testingCluster = new TestingCluster(1)
+
+      configuration.setString(ConfigConstants.ZOOKEEPER_QUORUM_KEY, testingCluster.getConnectString)
+
+      testingCluster.start()
+
+      Some(testingCluster)
+    } else {
+      None
+    }
+
+    super.start()
+  }
+
+  /**
+   * Stops the cluster and then closes the ZooKeeper [[TestingCluster]] started by `start()`,
+   * if there is one. The ZooKeeper cluster is closed even if stopping the rest fails, and is
+   * forgotten afterwards so that a repeated `stop()` does not close it twice.
+   */
+  override def stop(): Unit = {
+    try {
+      super.stop()
+    } finally {
+      // Bind the element explicitly: with `foreach { LOG.info(...); _.close() }` the log call
+      // runs once while building the argument (even for None); only `_.close()` is the lambda.
+      zookeeperCluster.foreach { zkCluster =>
+        LOG.info("Stopping ZooKeeper cluster.")
+        zkCluster.close()
+      }
+      zookeeperCluster = None
+    }
+  }
+
+  /**
+   * Sends `NotifyWhenRegisteredAtJobManager(jobManager)` to every started TaskManager and
+   * waits up to `timeout` for all of them to answer.
+   *
+   * @throws Exception if the answers do not arrive within `timeout`; the underlying
+   *                   timeout exception is attached as cause
+   */
+  def waitForTaskManagersToBeRegisteredAtJobManager(jobManager: ActorRef): Unit = {
+    val futures = taskManagerActors.map {
+      _.map {
+        tm => (tm ? NotifyWhenRegisteredAtJobManager(jobManager))(timeout)
+      }
+    }.getOrElse(Seq())
+
+    try {
+      // Await.result (not Await.ready): the asks use the same timeout, so they may fail
+      // before the await deadline; Await.ready would then return as if all had registered.
+      Await.result(Future.sequence(futures), timeout)
+    } catch {
+      case t @ (_: TimeoutException | _: akka.pattern.AskTimeoutException) =>
+        throw new Exception("Timeout while waiting for TaskManagers to register at " +
+          s"${jobManager.path}", t)
+    }
+  }
+}
+
+object ForkableFlinkMiniCluster {
+
+  import org.apache.flink.runtime.testingUtils.TestingUtils.DEFAULT_AKKA_ASK_TIMEOUT
+
+  /**
+   * Creates and starts a [[ForkableFlinkMiniCluster]] whose actors share one actor system.
+   *
+   * @param numSlots value for `ConfigConstants.TASK_MANAGER_NUM_TASK_SLOTS`
+   * @param numTaskManagers value for `ConfigConstants.LOCAL_NUMBER_TASK_MANAGER`
+   * @param timeout value for `ConfigConstants.AKKA_ASK_TIMEOUT`; defaults to
+   *                `TestingUtils.DEFAULT_AKKA_ASK_TIMEOUT`
+   * @return the already started cluster
+   */
+  def startCluster(
+      numSlots: Int,
+      numTaskManagers: Int,
+      timeout: String = DEFAULT_AKKA_ASK_TIMEOUT)
+    : ForkableFlinkMiniCluster = {
+
+    val config = new Configuration()
+    config.setInteger(ConfigConstants.TASK_MANAGER_NUM_TASK_SLOTS, numSlots)
+    config.setInteger(ConfigConstants.LOCAL_NUMBER_TASK_MANAGER, numTaskManagers)
+    config.setString(ConfigConstants.AKKA_ASK_TIMEOUT, timeout)
+
+    val cluster = new ForkableFlinkMiniCluster(config)
+
+    cluster.start()
+
+    cluster
+  }
+}
http://git-wip-us.apache.org/repos/asf/flink/blob/70978f56/flink-tests/pom.xml ---------------------------------------------------------------------- diff --git a/flink-tests/pom.xml b/flink-tests/pom.xml index ee7d0a3..0459039 100644 --- a/flink-tests/pom.xml +++ b/flink-tests/pom.xml @@ -105,6 +105,7 @@ under the License. <groupId>org.apache.flink</groupId> <artifactId>flink-test-utils_2.10</artifactId> <version>${project.version}</version> + <type>test-jar</type> <scope>test</scope> </dependency> @@ -539,6 +540,26 @@ under the License. <ignore/> </action> </pluginExecution> + <pluginExecution> + <pluginExecutionFilter> + <groupId> + net.alchim31.maven + </groupId> + <artifactId> + scala-maven-plugin + </artifactId> + <versionRange> + [3.1.4,) + </versionRange> + <goals> + <goal>compile</goal> + <goal>testCompile</goal> + </goals> + </pluginExecutionFilter> + <action> + <ignore></ignore> + </action> + </pluginExecution> </pluginExecutions> </lifecycleMappingMetadata> </configuration> http://git-wip-us.apache.org/repos/asf/flink/blob/70978f56/flink-tests/src/test/java/org/apache/flink/test/checkpointing/EventTimeAllWindowCheckpointingITCase.java ---------------------------------------------------------------------- diff --git a/flink-tests/src/test/java/org/apache/flink/test/checkpointing/EventTimeAllWindowCheckpointingITCase.java b/flink-tests/src/test/java/org/apache/flink/test/checkpointing/EventTimeAllWindowCheckpointingITCase.java index 4431106..6faee45 100644 --- a/flink-tests/src/test/java/org/apache/flink/test/checkpointing/EventTimeAllWindowCheckpointingITCase.java +++ b/flink-tests/src/test/java/org/apache/flink/test/checkpointing/EventTimeAllWindowCheckpointingITCase.java @@ -69,7 +69,8 @@ public class EventTimeAllWindowCheckpointingITCase extends TestLogger { config.setInteger(ConfigConstants.LOCAL_NUMBER_TASK_MANAGER, 2); config.setInteger(ConfigConstants.TASK_MANAGER_NUM_TASK_SLOTS, PARALLELISM / 2); 
config.setInteger(ConfigConstants.TASK_MANAGER_MEMORY_SIZE_KEY, 48); - + config.setString(ConfigConstants.AKKA_LOOKUP_TIMEOUT, "60 s"); + config.setString(ConfigConstants.AKKA_ASK_TIMEOUT, "60 s"); cluster = new ForkableFlinkMiniCluster(config, false); cluster.start(); } http://git-wip-us.apache.org/repos/asf/flink/blob/70978f56/flink-yarn-tests/pom.xml ---------------------------------------------------------------------- diff --git a/flink-yarn-tests/pom.xml b/flink-yarn-tests/pom.xml index 1d0951d..255eeee 100644 --- a/flink-yarn-tests/pom.xml +++ b/flink-yarn-tests/pom.xml @@ -38,17 +38,12 @@ under the License. <packaging>jar</packaging> <dependencies> - - <dependency> - <groupId>org.apache.flink</groupId> - <artifactId>flink-runtime_2.10</artifactId> - <version>${project.version}</version> - </dependency> - <dependency> <groupId>org.apache.flink</groupId> - <artifactId>flink-clients_2.10</artifactId> + <artifactId>flink-test-utils_2.10</artifactId> <version>${project.version}</version> + <type>test-jar</type> <scope>test</scope> </dependency> <!-- Needed for the streaming wordcount example --> @@ -66,12 +61,6 @@ under the License. <dependency> <groupId>org.apache.flink</groupId> - <artifactId>flink-test-utils_2.10</artifactId> - <version>${project.version}</version> - </dependency> - - <dependency> - <groupId>org.apache.flink</groupId> <artifactId>${shading-artifact.name}</artifactId> <version>${project.version}</version> </dependency> @@ -87,20 +76,7 @@ under the License.
</exclusion> </exclusions> </dependency> - - <dependency> - <groupId>junit</groupId> - <artifactId>junit</artifactId> - <version>4.11</version> - <type>jar</type> - </dependency> - - <dependency> - <groupId>com.google.guava</groupId> - <artifactId>guava</artifactId> - <version>${guava.version}</version> - </dependency> - + <dependency> <groupId>com.typesafe.akka</groupId> <artifactId>akka-testkit_${scala.binary.version}</artifactId> http://git-wip-us.apache.org/repos/asf/flink/blob/70978f56/flink-yarn-tests/src/main/java/org/apache/flink/yarn/FlinkYarnSessionCliTest.java ---------------------------------------------------------------------- diff --git a/flink-yarn-tests/src/main/java/org/apache/flink/yarn/FlinkYarnSessionCliTest.java b/flink-yarn-tests/src/main/java/org/apache/flink/yarn/FlinkYarnSessionCliTest.java deleted file mode 100644 index 30116af..0000000 --- a/flink-yarn-tests/src/main/java/org/apache/flink/yarn/FlinkYarnSessionCliTest.java +++ /dev/null @@ -1,76 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. 
- */ - -package org.apache.flink.yarn; - -import org.apache.commons.cli.CommandLine; -import org.apache.commons.cli.CommandLineParser; -import org.apache.commons.cli.Options; -import org.apache.commons.cli.PosixParser; - -import org.apache.flink.client.CliFrontend; -import org.apache.flink.client.FlinkYarnSessionCli; -import org.apache.flink.runtime.yarn.AbstractFlinkYarnClient; -import org.apache.flink.test.util.TestBaseUtils; - -import org.junit.Assert; -import org.junit.Rule; -import org.junit.Test; -import org.junit.rules.TemporaryFolder; - -import java.io.File; -import java.io.IOException; -import java.util.HashMap; -import java.util.Map; - -public class FlinkYarnSessionCliTest { - - @Rule - public TemporaryFolder tmp = new TemporaryFolder(); - - @Test - public void testDynamicProperties() throws IOException { - - Map<String, String> map = new HashMap<String, String>(System.getenv()); - File tmpFolder = tmp.newFolder(); - File fakeConf = new File(tmpFolder, "flink-conf.yaml"); - fakeConf.createNewFile(); - map.put("FLINK_CONF_DIR", tmpFolder.getAbsolutePath()); - TestBaseUtils.setEnv(map); - Options options = new Options(); - FlinkYarnSessionCli cli = new FlinkYarnSessionCli("", ""); - cli.getYARNSessionCLIOptions(options); - - CommandLineParser parser = new PosixParser(); - CommandLine cmd = null; - try { - cmd = parser.parse(options, new String[]{"run", "-j", "fake.jar", "-n", "15", "-D", "akka.ask.timeout=5 min"}); - } catch(Exception e) { - e.printStackTrace(); - Assert.fail("Parsing failed with " + e.getMessage()); - } - - AbstractFlinkYarnClient flinkYarnClient = cli.createFlinkYarnClient(cmd); - - Assert.assertNotNull(flinkYarnClient); - - Map<String, String> dynProperties = CliFrontend.getDynamicProperties(flinkYarnClient.getDynamicPropertiesEncoded()); - Assert.assertEquals(1, dynProperties.size()); - Assert.assertEquals("5 min", dynProperties.get("akka.ask.timeout")); - } -} 
http://git-wip-us.apache.org/repos/asf/flink/blob/70978f56/flink-yarn-tests/src/main/java/org/apache/flink/yarn/TestingApplicationMaster.java ---------------------------------------------------------------------- diff --git a/flink-yarn-tests/src/main/java/org/apache/flink/yarn/TestingApplicationMaster.java b/flink-yarn-tests/src/main/java/org/apache/flink/yarn/TestingApplicationMaster.java deleted file mode 100644 index b0757f5..0000000 --- a/flink-yarn-tests/src/main/java/org/apache/flink/yarn/TestingApplicationMaster.java +++ /dev/null @@ -1,64 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.flink.yarn; - -import org.apache.flink.runtime.jobmanager.JobManager; -import org.apache.flink.runtime.jobmanager.MemoryArchivist; -import org.apache.flink.runtime.taskmanager.TaskManager; -import org.apache.flink.runtime.testingUtils.TestingMemoryArchivist; -import org.apache.flink.runtime.testutils.TestingResourceManager; -import org.apache.flink.runtime.util.EnvironmentInformation; -import org.apache.flink.runtime.util.SignalHandler; - -/** - * Yarn application master which starts the {@link TestingYarnJobManager}, - * {@link TestingResourceManager}, and the {@link TestingMemoryArchivist}. 
- */ -public class TestingApplicationMaster extends YarnApplicationMasterRunner { - - @Override - public Class<? extends JobManager> getJobManagerClass() { - return TestingYarnJobManager.class; - } - - @Override - public Class<? extends MemoryArchivist> getArchivistClass() { - return TestingMemoryArchivist.class; - } - - @Override - protected Class<? extends TaskManager> getTaskManagerClass() { - return TestingYarnTaskManager.class; - } - - @Override - public Class<? extends YarnFlinkResourceManager> getResourceManagerClass() { - return TestingYarnFlinkResourceManager.class; - } - - public static void main(String[] args) { - EnvironmentInformation.logEnvironmentInfo(LOG, "YARN ApplicationMaster / JobManager", args); - SignalHandler.register(LOG); - - // run and exit with the proper return code - int returnCode = new TestingApplicationMaster().run(args); - System.exit(returnCode); - } - -} http://git-wip-us.apache.org/repos/asf/flink/blob/70978f56/flink-yarn-tests/src/main/java/org/apache/flink/yarn/TestingFlinkYarnClient.java ---------------------------------------------------------------------- diff --git a/flink-yarn-tests/src/main/java/org/apache/flink/yarn/TestingFlinkYarnClient.java b/flink-yarn-tests/src/main/java/org/apache/flink/yarn/TestingFlinkYarnClient.java deleted file mode 100644 index 1efc336..0000000 --- a/flink-yarn-tests/src/main/java/org/apache/flink/yarn/TestingFlinkYarnClient.java +++ /dev/null @@ -1,71 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. 
You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.flink.yarn; - -import com.google.common.base.Preconditions; - -import java.io.File; -import java.io.FilenameFilter; -import java.util.ArrayList; -import java.util.List; - -/** - * Yarn client which starts a {@link TestingApplicationMaster}. Additionally the client adds the - * flink-yarn-tests-XXX-tests.jar and the flink-runtime-XXX-tests.jar to the set of files which - * are shipped to the yarn cluster. This is necessary to load the testing classes. - */ -public class TestingFlinkYarnClient extends FlinkYarnClientBase { - - public TestingFlinkYarnClient() { - List<File> filesToShip = new ArrayList<>(); - - File testingJar = YarnTestBase.findFile("..", new TestJarFinder("flink-yarn-tests")); - Preconditions.checkNotNull(testingJar, "Could not find the flink-yarn-tests tests jar. " + - "Make sure to package the flink-yarn-tests module."); - - File testingRuntimeJar = YarnTestBase.findFile("..", new TestJarFinder("flink-runtime")); - Preconditions.checkNotNull(testingRuntimeJar, "Could not find the flink-runtime tests " + - "jar. 
Make sure to package the flink-runtime module."); - - filesToShip.add(testingJar); - filesToShip.add(testingRuntimeJar); - - setShipFiles(filesToShip); - } - - @Override - protected Class<?> getApplicationMasterClass() { - return TestingApplicationMaster.class; - } - - public static class TestJarFinder implements FilenameFilter { - - private final String jarName; - - public TestJarFinder(final String jarName) { - this.jarName = jarName; - } - - @Override - public boolean accept(File dir, String name) { - return name.startsWith(jarName) && name.endsWith("-tests.jar") && - dir.getAbsolutePath().contains(dir.separator + jarName + dir.separator); - } - } -} http://git-wip-us.apache.org/repos/asf/flink/blob/70978f56/flink-yarn-tests/src/main/java/org/apache/flink/yarn/TestingYarnFlinkResourceManager.java ---------------------------------------------------------------------- diff --git a/flink-yarn-tests/src/main/java/org/apache/flink/yarn/TestingYarnFlinkResourceManager.java b/flink-yarn-tests/src/main/java/org/apache/flink/yarn/TestingYarnFlinkResourceManager.java deleted file mode 100644 index 5a61b8f..0000000 --- a/flink-yarn-tests/src/main/java/org/apache/flink/yarn/TestingYarnFlinkResourceManager.java +++ /dev/null @@ -1,56 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 
- * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.flink.yarn; - -import org.apache.flink.configuration.Configuration; -import org.apache.flink.runtime.clusterframework.ContaineredTaskManagerParameters; -import org.apache.flink.runtime.leaderretrieval.LeaderRetrievalService; -import org.apache.hadoop.yarn.api.records.ContainerLaunchContext; -import org.apache.hadoop.yarn.conf.YarnConfiguration; - -/** - * Flink's testing resource manager for Yarn. - */ -public class TestingYarnFlinkResourceManager extends YarnFlinkResourceManager { - - public TestingYarnFlinkResourceManager( - Configuration flinkConfig, - YarnConfiguration yarnConfig, - LeaderRetrievalService leaderRetrievalService, - String applicationMasterHostName, - String webInterfaceURL, - ContaineredTaskManagerParameters taskManagerParameters, - ContainerLaunchContext taskManagerLaunchContext, - int yarnHeartbeatIntervalMillis, - int maxFailedContainers, - int numInitialTaskManagers) { - - super( - flinkConfig, - yarnConfig, - leaderRetrievalService, - applicationMasterHostName, - webInterfaceURL, - taskManagerParameters, - taskManagerLaunchContext, - yarnHeartbeatIntervalMillis, - maxFailedContainers, - numInitialTaskManagers); - } -} http://git-wip-us.apache.org/repos/asf/flink/blob/70978f56/flink-yarn-tests/src/main/java/org/apache/flink/yarn/TestingYarnTaskManagerRunner.java ---------------------------------------------------------------------- diff --git a/flink-yarn-tests/src/main/java/org/apache/flink/yarn/TestingYarnTaskManagerRunner.java b/flink-yarn-tests/src/main/java/org/apache/flink/yarn/TestingYarnTaskManagerRunner.java deleted file mode 100644 index 8586a77..0000000 --- a/flink-yarn-tests/src/main/java/org/apache/flink/yarn/TestingYarnTaskManagerRunner.java +++ /dev/null @@ -1,30 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. 
See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.flink.yarn; - -import java.io.IOException; - -/** - * Yarn TaskManager runner which starts a {@link TestingYarnTaskManager}. - */ -public class TestingYarnTaskManagerRunner { - public static void main(String[] args) throws IOException { - YarnTaskManagerRunner.runYarnTaskManager(args, TestingYarnTaskManager.class); - } -} http://git-wip-us.apache.org/repos/asf/flink/blob/70978f56/flink-yarn-tests/src/main/java/org/apache/flink/yarn/UtilsTest.java ---------------------------------------------------------------------- diff --git a/flink-yarn-tests/src/main/java/org/apache/flink/yarn/UtilsTest.java b/flink-yarn-tests/src/main/java/org/apache/flink/yarn/UtilsTest.java deleted file mode 100644 index 784bf24..0000000 --- a/flink-yarn-tests/src/main/java/org/apache/flink/yarn/UtilsTest.java +++ /dev/null @@ -1,184 +0,0 @@ -/** - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. 
You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package org.apache.flink.yarn; - -import org.apache.flink.configuration.ConfigConstants; -import org.apache.flink.configuration.Configuration; -import org.apache.log4j.AppenderSkeleton; -import org.apache.log4j.Level; -import org.apache.log4j.spi.LoggingEvent; -import org.junit.Assert; -import org.junit.Test; -import org.slf4j.Logger; -import org.slf4j.LoggerFactory; - -import java.io.File; -import java.util.ArrayList; -import java.util.Arrays; -import java.util.List; -import java.util.Map; - -public class UtilsTest { - private static final Logger LOG = LoggerFactory.getLogger(UtilsTest.class); - - @Test - public void testUberjarLocator() { - File dir = YarnTestBase.findFile("..", new YarnTestBase.RootDirFilenameFilter()); - Assert.assertNotNull(dir); - Assert.assertTrue(dir.getName().endsWith(".jar")); - dir = dir.getParentFile().getParentFile(); // from uberjar to lib to root - Assert.assertTrue(dir.exists()); - Assert.assertTrue(dir.isDirectory()); - List<String> files = Arrays.asList(dir.list()); - Assert.assertTrue(files.contains("lib")); - Assert.assertTrue(files.contains("bin")); - Assert.assertTrue(files.contains("conf")); - } - - /** - * Remove 15% of the heap, at least 384MB. 
- * - */ - @Test - public void testHeapCutoff() { - Configuration conf = new Configuration(); - conf.setDouble(ConfigConstants.CONTAINERED_HEAP_CUTOFF_RATIO, 0.15); - conf.setInteger(ConfigConstants.CONTAINERED_HEAP_CUTOFF_MIN, 384); - - Assert.assertEquals(616, Utils.calculateHeapSize(1000, conf) ); - Assert.assertEquals(8500, Utils.calculateHeapSize(10000, conf) ); - - // test different configuration - Assert.assertEquals(3400, Utils.calculateHeapSize(4000, conf)); - - conf.setString(ConfigConstants.CONTAINERED_HEAP_CUTOFF_MIN, "1000"); - conf.setString(ConfigConstants.CONTAINERED_HEAP_CUTOFF_RATIO, "0.1"); - Assert.assertEquals(3000, Utils.calculateHeapSize(4000, conf)); - - conf.setString(ConfigConstants.CONTAINERED_HEAP_CUTOFF_RATIO, "0.5"); - Assert.assertEquals(2000, Utils.calculateHeapSize(4000, conf)); - - conf.setString(ConfigConstants.CONTAINERED_HEAP_CUTOFF_RATIO, "1"); - Assert.assertEquals(0, Utils.calculateHeapSize(4000, conf)); - - // test also deprecated keys - conf = new Configuration(); - conf.setDouble(ConfigConstants.YARN_HEAP_CUTOFF_RATIO, 0.15); - conf.setInteger(ConfigConstants.YARN_HEAP_CUTOFF_MIN, 384); - - Assert.assertEquals(616, Utils.calculateHeapSize(1000, conf) ); - Assert.assertEquals(8500, Utils.calculateHeapSize(10000, conf) ); - } - - @Test(expected = IllegalArgumentException.class) - public void illegalArgument() { - Configuration conf = new Configuration(); - conf.setString(ConfigConstants.CONTAINERED_HEAP_CUTOFF_RATIO, "1.1"); - Assert.assertEquals(0, Utils.calculateHeapSize(4000, conf)); - } - - @Test(expected = IllegalArgumentException.class) - public void illegalArgumentNegative() { - Configuration conf = new Configuration(); - conf.setString(ConfigConstants.CONTAINERED_HEAP_CUTOFF_RATIO, "-0.01"); - Assert.assertEquals(0, Utils.calculateHeapSize(4000, conf)); - } - - @Test(expected = IllegalArgumentException.class) - public void tooMuchCutoff() { - Configuration conf = new Configuration(); - 
conf.setString(ConfigConstants.CONTAINERED_HEAP_CUTOFF_RATIO, "6000"); - Assert.assertEquals(0, Utils.calculateHeapSize(4000, conf)); - } - - @Test - public void testGetEnvironmentVariables() { - Configuration testConf = new Configuration(); - testConf.setString("yarn.application-master.env.LD_LIBRARY_PATH", "/usr/lib/native"); - - Map<String, String> res = Utils.getEnvironmentVariables("yarn.application-master.env.", testConf); - - Assert.assertEquals(1, res.size()); - Map.Entry<String, String> entry = res.entrySet().iterator().next(); - Assert.assertEquals("LD_LIBRARY_PATH", entry.getKey()); - Assert.assertEquals("/usr/lib/native", entry.getValue()); - } - - @Test - public void testGetEnvironmentVariablesErroneous() { - Configuration testConf = new Configuration(); - testConf.setString("yarn.application-master.env.", "/usr/lib/native"); - - Map<String, String> res = Utils.getEnvironmentVariables("yarn.application-master.env.", testConf); - - Assert.assertEquals(0, res.size()); - } - - // - // --------------- Tools to test if a certain string has been logged with Log4j. 
------------- - // See : http://stackoverflow.com/questions/3717402/how-to-test-w-junit-that-warning-was-logged-w-log4j - // - private static TestAppender testAppender; - public static void addTestAppender(Class target, Level level) { - testAppender = new TestAppender(); - testAppender.setThreshold(level); - org.apache.log4j.Logger lg = org.apache.log4j.Logger.getLogger(target); - lg.setLevel(level); - lg.addAppender(testAppender); - //org.apache.log4j.Logger.getRootLogger().addAppender(testAppender); - } - - public static void checkForLogString(String expected) { - LoggingEvent found = getEventContainingString(expected); - if(found != null) { - LOG.info("Found expected string '"+expected+"' in log message "+found); - return; - } - Assert.fail("Unable to find expected string '" + expected + "' in log messages"); - } - - public static LoggingEvent getEventContainingString(String expected) { - if(testAppender == null) { - throw new NullPointerException("Initialize test appender first"); - } - LoggingEvent found = null; - // make sure that different threads are not logging while the logs are checked - synchronized (testAppender.events) { - for (LoggingEvent event : testAppender.events) { - if (event.getMessage().toString().contains(expected)) { - found = event; - break; - } - } - } - return found; - } - - public static class TestAppender extends AppenderSkeleton { - public final List<LoggingEvent> events = new ArrayList<>(); - public void close() {} - public boolean requiresLayout() {return false;} - @Override - protected void append(LoggingEvent event) { - synchronized (events){ - events.add(event); - } - } - } -} - http://git-wip-us.apache.org/repos/asf/flink/blob/70978f56/flink-yarn-tests/src/main/java/org/apache/flink/yarn/YARNHighAvailabilityITCase.java ---------------------------------------------------------------------- diff --git a/flink-yarn-tests/src/main/java/org/apache/flink/yarn/YARNHighAvailabilityITCase.java 
b/flink-yarn-tests/src/main/java/org/apache/flink/yarn/YARNHighAvailabilityITCase.java deleted file mode 100644 index a93abf0..0000000 --- a/flink-yarn-tests/src/main/java/org/apache/flink/yarn/YARNHighAvailabilityITCase.java +++ /dev/null @@ -1,176 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. 
- */ - -package org.apache.flink.yarn; - -import akka.actor.ActorSystem; -import akka.actor.PoisonPill; -import akka.testkit.JavaTestKit; -import org.apache.curator.test.TestingServer; -import org.apache.flink.configuration.ConfigConstants; -import org.apache.flink.configuration.Configuration; -import org.apache.flink.configuration.GlobalConfiguration; -import org.apache.flink.runtime.akka.AkkaUtils; -import org.apache.flink.runtime.instance.ActorGateway; -import org.apache.flink.runtime.instance.AkkaActorGateway; -import org.apache.flink.runtime.leaderretrieval.LeaderRetrievalService; -import org.apache.flink.runtime.messages.Messages; -import org.apache.flink.runtime.state.filesystem.FsStateBackendFactory; -import org.apache.flink.runtime.testingUtils.TestingJobManagerMessages; -import org.apache.flink.runtime.util.LeaderRetrievalUtils; -import org.apache.flink.runtime.yarn.AbstractFlinkYarnCluster; -import org.apache.hadoop.fs.Path; -import org.apache.hadoop.yarn.conf.YarnConfiguration; -import org.junit.AfterClass; -import org.junit.Assert; -import org.junit.BeforeClass; -import org.junit.Rule; -import org.junit.Test; -import org.junit.rules.TemporaryFolder; -import scala.concurrent.duration.FiniteDuration; - -import java.io.File; -import java.io.IOException; -import java.util.Arrays; -import java.util.concurrent.TimeUnit; - -public class YARNHighAvailabilityITCase extends YarnTestBase { - - private static TestingServer zkServer; - - private static ActorSystem actorSystem; - - private static final int numberApplicationAttempts = 10; - - @Rule - public TemporaryFolder tmp = new TemporaryFolder(); - - @BeforeClass - public static void setup() { - actorSystem = AkkaUtils.createDefaultActorSystem(); - - try { - zkServer = new TestingServer(); - zkServer.start(); - } catch (Exception e) { - e.printStackTrace(); - Assert.fail("Could not start ZooKeeper testing cluster."); - } - - yarnConfiguration.set(YarnTestBase.TEST_CLUSTER_NAME_KEY, "flink-yarn-tests-ha"); - 
yarnConfiguration.set(YarnConfiguration.RM_AM_MAX_ATTEMPTS, "" + numberApplicationAttempts); - - startYARNWithConfig(yarnConfiguration); - } - - @AfterClass - public static void teardown() throws IOException { - if(zkServer != null) { - zkServer.stop(); - } - - JavaTestKit.shutdownActorSystem(actorSystem); - actorSystem = null; - } - - /** - * Tests that the application master can be killed multiple times and that the surviving - * TaskManager succesfully reconnects to the newly started JobManager. - * @throws Exception - */ - @Test - public void testMultipleAMKill() throws Exception { - final int numberKillingAttempts = numberApplicationAttempts - 1; - - TestingFlinkYarnClient flinkYarnClient = new TestingFlinkYarnClient(); - - Assert.assertNotNull("unable to get yarn client", flinkYarnClient); - flinkYarnClient.setTaskManagerCount(1); - flinkYarnClient.setJobManagerMemory(768); - flinkYarnClient.setTaskManagerMemory(1024); - flinkYarnClient.setLocalJarPath(new Path(flinkUberjar.getAbsolutePath())); - flinkYarnClient.setShipFiles(Arrays.asList(flinkLibFolder.listFiles())); - - String confDirPath = System.getenv("FLINK_CONF_DIR"); - flinkYarnClient.setConfigurationDirectory(confDirPath); - - String fsStateHandlePath = tmp.getRoot().getPath(); - - flinkYarnClient.setFlinkConfiguration(GlobalConfiguration.getConfiguration()); - flinkYarnClient.setDynamicPropertiesEncoded("recovery.mode=zookeeper@@recovery.zookeeper.quorum=" + - zkServer.getConnectString() + "@@yarn.application-attempts=" + numberApplicationAttempts + - "@@" + ConfigConstants.STATE_BACKEND + "=FILESYSTEM" + - "@@" + FsStateBackendFactory.CHECKPOINT_DIRECTORY_URI_CONF_KEY + "=" + fsStateHandlePath + "/checkpoints" + - "@@" + ConfigConstants.ZOOKEEPER_RECOVERY_PATH + "=" + fsStateHandlePath + "/recovery"); - flinkYarnClient.setConfigurationFilePath(new Path(confDirPath + File.separator + "flink-conf.yaml")); - - AbstractFlinkYarnCluster yarnCluster = null; - - final FiniteDuration timeout = new 
FiniteDuration(2, TimeUnit.MINUTES); - - try { - yarnCluster = flinkYarnClient.deploy(); - yarnCluster.connectToCluster(); - final Configuration config = yarnCluster.getFlinkConfiguration(); - - new JavaTestKit(actorSystem) {{ - for (int attempt = 0; attempt < numberKillingAttempts; attempt++) { - new Within(timeout) { - @Override - protected void run() { - try { - LeaderRetrievalService lrs = LeaderRetrievalUtils.createLeaderRetrievalService(config); - ActorGateway gateway = LeaderRetrievalUtils.retrieveLeaderGateway(lrs, actorSystem, timeout); - ActorGateway selfGateway = new AkkaActorGateway(getRef(), gateway.leaderSessionID()); - - gateway.tell(new TestingJobManagerMessages.NotifyWhenAtLeastNumTaskManagerAreRegistered(1), selfGateway); - - expectMsgEquals(Messages.getAcknowledge()); - - gateway.tell(PoisonPill.getInstance()); - } catch (Exception e) { - throw new AssertionError("Could not complete test.", e); - } - } - }; - } - - new Within(timeout) { - @Override - protected void run() { - try { - LeaderRetrievalService lrs = LeaderRetrievalUtils.createLeaderRetrievalService(config); - ActorGateway gateway2 = LeaderRetrievalUtils.retrieveLeaderGateway(lrs, actorSystem, timeout); - ActorGateway selfGateway = new AkkaActorGateway(getRef(), gateway2.leaderSessionID()); - gateway2.tell(new TestingJobManagerMessages.NotifyWhenAtLeastNumTaskManagerAreRegistered(1), selfGateway); - - expectMsgEquals(Messages.getAcknowledge()); - } catch (Exception e) { - throw new AssertionError("Could not complete test.", e); - } - } - }; - - }}; - } finally { - if (yarnCluster != null) { - yarnCluster.shutdown(false); - } - } - } -} http://git-wip-us.apache.org/repos/asf/flink/blob/70978f56/flink-yarn-tests/src/main/java/org/apache/flink/yarn/YARNSessionCapacitySchedulerITCase.java ---------------------------------------------------------------------- diff --git a/flink-yarn-tests/src/main/java/org/apache/flink/yarn/YARNSessionCapacitySchedulerITCase.java 
b/flink-yarn-tests/src/main/java/org/apache/flink/yarn/YARNSessionCapacitySchedulerITCase.java deleted file mode 100644 index 38e17a5..0000000 --- a/flink-yarn-tests/src/main/java/org/apache/flink/yarn/YARNSessionCapacitySchedulerITCase.java +++ /dev/null @@ -1,539 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. 
- */ -package org.apache.flink.yarn; - -import com.fasterxml.jackson.databind.JsonNode; -import com.fasterxml.jackson.databind.ObjectMapper; -import com.fasterxml.jackson.databind.node.ArrayNode; -import com.google.common.base.Joiner; -import org.apache.commons.io.FileUtils; -import org.apache.flink.configuration.ConfigConstants; -import org.apache.flink.runtime.client.JobClient; -import org.apache.flink.runtime.webmonitor.WebMonitorUtils; -import org.apache.flink.test.testdata.WordCountData; -import org.apache.flink.test.util.TestBaseUtils; -import org.apache.hadoop.security.UserGroupInformation; -import org.apache.hadoop.yarn.api.protocolrecords.StopContainersRequest; -import org.apache.hadoop.yarn.api.records.ApplicationReport; -import org.apache.hadoop.yarn.api.records.ContainerId; -import org.apache.hadoop.yarn.api.records.NodeReport; -import org.apache.hadoop.yarn.api.records.YarnApplicationState; -import org.apache.hadoop.yarn.api.records.NodeState; -import org.apache.hadoop.yarn.api.records.ApplicationId; -import org.apache.hadoop.yarn.client.api.YarnClient; -import org.apache.hadoop.yarn.conf.YarnConfiguration; -import org.apache.hadoop.yarn.security.NMTokenIdentifier; -import org.apache.hadoop.yarn.server.nodemanager.NodeManager; -import org.apache.hadoop.yarn.server.nodemanager.containermanager.container.Container; -import org.apache.hadoop.yarn.server.resourcemanager.scheduler.ResourceScheduler; -import org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity.CapacityScheduler; -import org.apache.log4j.Level; -import org.junit.After; -import org.junit.Assert; -import org.junit.BeforeClass; -import org.junit.Test; -import org.slf4j.Logger; -import org.slf4j.LoggerFactory; - -import java.io.File; -import java.io.FilenameFilter; -import java.io.IOException; -import java.util.EnumSet; -import java.util.LinkedList; -import java.util.List; -import java.util.Map; -import java.util.Collections; -import java.util.Comparator; -import java.util.Arrays; 
-import java.util.concurrent.ConcurrentMap; -import java.util.regex.Matcher; -import java.util.regex.Pattern; - -import static org.apache.flink.yarn.UtilsTest.addTestAppender; -import static org.apache.flink.yarn.UtilsTest.checkForLogString; - - -/** - * This test starts a MiniYARNCluster with a CapacityScheduler. - * Is has, by default a queue called "default". The configuration here adds another queue: "qa-team". - */ -public class YARNSessionCapacitySchedulerITCase extends YarnTestBase { - private static final Logger LOG = LoggerFactory.getLogger(YARNSessionCapacitySchedulerITCase.class); - - @BeforeClass - public static void setup() { - yarnConfiguration.setClass(YarnConfiguration.RM_SCHEDULER, CapacityScheduler.class, ResourceScheduler.class); - yarnConfiguration.set("yarn.scheduler.capacity.root.queues", "default,qa-team"); - yarnConfiguration.setInt("yarn.scheduler.capacity.root.default.capacity", 40); - yarnConfiguration.setInt("yarn.scheduler.capacity.root.qa-team.capacity", 60); - yarnConfiguration.set(YarnTestBase.TEST_CLUSTER_NAME_KEY, "flink-yarn-tests-capacityscheduler"); - startYARNWithConfig(yarnConfiguration); - } - - /** - * Test regular operation, including command line parameter parsing. - */ - @Test - public void testClientStartup() { - LOG.info("Starting testClientStartup()"); - runWithArgs(new String[]{"-j", flinkUberjar.getAbsolutePath(), "-t", flinkLibFolder.getAbsolutePath(), - "-n", "1", - "-jm", "768", - "-tm", "1024", "-qu", "qa-team"}, - "Number of connected TaskManagers changed to 1. Slots available: 1", null, RunTypes.YARN_SESSION, 0); - LOG.info("Finished testClientStartup()"); - } - - /** - * Test per-job yarn cluster - * - * This also tests the prefixed CliFrontend options for the YARN case - * We also test if the requested parallelism of 2 is passed through. - * The parallelism is requested at the YARN client (-ys). 
- */ - @Test - public void perJobYarnCluster() { - LOG.info("Starting perJobYarnCluster()"); - addTestAppender(JobClient.class, Level.INFO); - File exampleJarLocation = YarnTestBase.findFile("..", new ContainsName(new String[] {"-WordCount.jar"} , "streaming")); // exclude streaming wordcount here. - Assert.assertNotNull("Could not find wordcount jar", exampleJarLocation); - runWithArgs(new String[]{"run", "-m", "yarn-cluster", - "-yj", flinkUberjar.getAbsolutePath(), "-yt", flinkLibFolder.getAbsolutePath(), - "-yn", "1", - "-ys", "2", //test that the job is executed with a DOP of 2 - "-yjm", "768", - "-ytm", "1024", exampleJarLocation.getAbsolutePath()}, - /* test succeeded after this string */ - "Job execution complete", - /* prohibited strings: (we want to see (2/2)) */ - new String[]{"System.out)(1/1) switched to FINISHED "}, - RunTypes.CLI_FRONTEND, 0, true); - LOG.info("Finished perJobYarnCluster()"); - } - - - /** - * Test TaskManager failure and also if the vcores are set correctly (see issue FLINK-2213). - */ - @Test(timeout=100000) // timeout after 100 seconds - public void testTaskManagerFailure() { - LOG.info("Starting testTaskManagerFailure()"); - Runner runner = startWithArgs(new String[]{"-j", flinkUberjar.getAbsolutePath(), "-t", flinkLibFolder.getAbsolutePath(), - "-n", "1", - "-jm", "768", - "-tm", "1024", - "-s", "3", // set the slots 3 to check if the vCores are set properly! - "-nm", "customName", - "-Dfancy-configuration-value=veryFancy", - "-Dyarn.maximum-failed-containers=3", - "-D" + ConfigConstants.YARN_VCORES + "=2"}, - "Number of connected TaskManagers changed to 1. 
Slots available: 3", - RunTypes.YARN_SESSION); - - Assert.assertEquals(2, getRunningContainers()); - - // ------------------------ Test if JobManager web interface is accessible ------- - - YarnClient yc = null; - try { - yc = YarnClient.createYarnClient(); - yc.init(yarnConfiguration); - yc.start(); - - List<ApplicationReport> apps = yc.getApplications(EnumSet.of(YarnApplicationState.RUNNING)); - Assert.assertEquals(1, apps.size()); // Only one running - ApplicationReport app = apps.get(0); - Assert.assertEquals("customName", app.getName()); - String url = app.getTrackingUrl(); - if(!url.endsWith("/")) { - url += "/"; - } - if(!url.startsWith("http://")) { - url = "http://" + url; - } - LOG.info("Got application URL from YARN {}", url); - - String response = TestBaseUtils.getFromHTTP(url + "taskmanagers/"); - - JsonNode parsedTMs = new ObjectMapper().readTree(response); - ArrayNode taskManagers = (ArrayNode) parsedTMs.get("taskmanagers"); - Assert.assertNotNull(taskManagers); - Assert.assertEquals(1, taskManagers.size()); - Assert.assertEquals(3, taskManagers.get(0).get("slotsNumber").asInt()); - - // get the configuration from webinterface & check if the dynamic properties from YARN show up there. 
- String jsonConfig = TestBaseUtils.getFromHTTP(url + "jobmanager/config"); - Map<String, String> parsedConfig = WebMonitorUtils.fromKeyValueJsonArray(jsonConfig); - - Assert.assertEquals("veryFancy", parsedConfig.get("fancy-configuration-value")); - Assert.assertEquals("3", parsedConfig.get("yarn.maximum-failed-containers")); - Assert.assertEquals("2", parsedConfig.get(ConfigConstants.YARN_VCORES)); - - // -------------- FLINK-1902: check if jobmanager hostname/port are shown in web interface - // first, get the hostname/port - String oC = outContent.toString(); - Pattern p = Pattern.compile("Flink JobManager is now running on ([a-zA-Z0-9.-]+):([0-9]+)"); - Matcher matches = p.matcher(oC); - String hostname = null; - String port = null; - while(matches.find()) { - hostname = matches.group(1).toLowerCase(); - port = matches.group(2); - } - LOG.info("Extracted hostname:port: {} {}", hostname, port); - - Assert.assertEquals("unable to find hostname in " + jsonConfig, hostname, - parsedConfig.get(ConfigConstants.JOB_MANAGER_IPC_ADDRESS_KEY)); - Assert.assertEquals("unable to find port in " + jsonConfig, port, - parsedConfig.get(ConfigConstants.JOB_MANAGER_IPC_PORT_KEY)); - - // test logfile access - String logs = TestBaseUtils.getFromHTTP(url + "jobmanager/log"); - Assert.assertTrue(logs.contains("Starting YARN ApplicationMaster")); - Assert.assertTrue(logs.contains("Starting JobManager")); - Assert.assertTrue(logs.contains("Starting JobManager Web Frontend")); - } catch(Throwable e) { - LOG.warn("Error while running test",e); - Assert.fail(e.getMessage()); - } - - // ------------------------ Kill container with TaskManager and check if vcores are set correctly ------- - - // find container id of taskManager: - ContainerId taskManagerContainer = null; - NodeManager nodeManager = null; - UserGroupInformation remoteUgi = null; - NMTokenIdentifier nmIdent = null; - try { - remoteUgi = UserGroupInformation.getCurrentUser(); - } catch (IOException e) { - LOG.warn("Unable 
to get curr user", e); - Assert.fail(); - } - for(int nmId = 0; nmId < NUM_NODEMANAGERS; nmId++) { - NodeManager nm = yarnCluster.getNodeManager(nmId); - ConcurrentMap<ContainerId, Container> containers = nm.getNMContext().getContainers(); - for(Map.Entry<ContainerId, Container> entry : containers.entrySet()) { - String command = Joiner.on(" ").join(entry.getValue().getLaunchContext().getCommands()); - if(command.contains(YarnTaskManager.class.getSimpleName())) { - taskManagerContainer = entry.getKey(); - nodeManager = nm; - nmIdent = new NMTokenIdentifier(taskManagerContainer.getApplicationAttemptId(), null, "",0); - // allow myself to do stuff with the container - // remoteUgi.addCredentials(entry.getValue().getCredentials()); - remoteUgi.addTokenIdentifier(nmIdent); - } - } - sleep(500); - } - - Assert.assertNotNull("Unable to find container with TaskManager", taskManagerContainer); - Assert.assertNotNull("Illegal state", nodeManager); - - try { - List<NodeReport> nodeReports = yc.getNodeReports(NodeState.RUNNING); - - // we asked for one node with 2 vcores so we expect 2 vcores - int userVcores = 0; - for (NodeReport rep: nodeReports) { - userVcores += rep.getUsed().getVirtualCores(); - } - Assert.assertEquals(2, userVcores); - } catch (Exception e) { - Assert.fail("Test failed: " + e.getMessage()); - } - - yc.stop(); - - List<ContainerId> toStop = new LinkedList<ContainerId>(); - toStop.add(taskManagerContainer); - StopContainersRequest scr = StopContainersRequest.newInstance(toStop); - - try { - nodeManager.getNMContext().getContainerManager().stopContainers(scr); - } catch (Throwable e) { - LOG.warn("Error stopping container", e); - Assert.fail("Error stopping container: "+e.getMessage()); - } - - // stateful termination check: - // wait until we saw a container being killed and AFTERWARDS a new one launched - boolean ok = false; - do { - LOG.debug("Waiting for correct order of events. 
Output: {}", errContent.toString()); - - String o = errContent.toString(); - int killedOff = o.indexOf("Container killed by the ApplicationMaster"); - if (killedOff != -1) { - o = o.substring(killedOff); - ok = o.indexOf("Launching TaskManager") > 0; - } - sleep(1000); - } while(!ok); - - - // send "stop" command to command line interface - runner.sendStop(); - // wait for the thread to stop - try { - runner.join(1000); - } catch (InterruptedException e) { - LOG.warn("Interrupted while stopping runner", e); - } - LOG.warn("stopped"); - - // ----------- Send output to logger - System.setOut(originalStdout); - System.setErr(originalStderr); - String oC = outContent.toString(); - String eC = errContent.toString(); - LOG.info("Sending stdout content through logger: \n\n{}\n\n", oC); - LOG.info("Sending stderr content through logger: \n\n{}\n\n", eC); - - // ------ Check if everything happened correctly - Assert.assertTrue("Expect to see failed container", - eC.contains("New messages from the YARN cluster")); - - Assert.assertTrue("Expect to see failed container", - eC.contains("Container killed by the ApplicationMaster")); - - Assert.assertTrue("Expect to see new container started", - eC.contains("Launching TaskManager") && eC.contains("on host")); - - // cleanup auth for the subsequent tests. - remoteUgi.getTokenIdentifiers().remove(nmIdent); - - LOG.info("Finished testTaskManagerFailure()"); - } - - /** - * Test deployment to non-existing queue. (user-reported error) - * Deployment to the queue is possible because there are no queues, so we don't check. 
- */ - @Test - public void testNonexistingQueue() { - LOG.info("Starting testNonexistingQueue()"); - addTestAppender(FlinkYarnClient.class, Level.WARN); - runWithArgs(new String[]{"-j", flinkUberjar.getAbsolutePath(), - "-t", flinkLibFolder.getAbsolutePath(), - "-n", "1", - "-jm", "768", - "-tm", "1024", - "-qu", "doesntExist"}, "to unknown queue: doesntExist", null, RunTypes.YARN_SESSION, 1); - checkForLogString("The specified queue 'doesntExist' does not exist. Available queues: default, qa-team"); - LOG.info("Finished testNonexistingQueue()"); - } - - /** - * Test per-job yarn cluster with the parallelism set at the CliFrontend instead of the YARN client. - */ - @Test - public void perJobYarnClusterWithParallelism() { - LOG.info("Starting perJobYarnClusterWithParallelism()"); - // write log messages to stdout as well, so that the runWithArgs() method - // is catching the log output - addTestAppender(JobClient.class, Level.INFO); - File exampleJarLocation = YarnTestBase.findFile("..", new ContainsName(new String[] {"-WordCount.jar"}, "streaming")); // exclude streaming wordcount here. - Assert.assertNotNull("Could not find wordcount jar", exampleJarLocation); - runWithArgs(new String[]{"run", - "-p", "2", //test that the job is executed with a DOP of 2 - "-m", "yarn-cluster", - "-yj", flinkUberjar.getAbsolutePath(), - "-yt", flinkLibFolder.getAbsolutePath(), - "-yn", "1", - "-yjm", "768", - "-ytm", "1024", exampleJarLocation.getAbsolutePath()}, - /* test succeeded after this string */ - "Job execution complete", - /* prohibited strings: (we want to see (2/2)) */ - new String[]{"System.out)(1/1) switched to FINISHED "}, - RunTypes.CLI_FRONTEND, 0, true); - LOG.info("Finished perJobYarnClusterWithParallelism()"); - } - - /** - * Test a fire-and-forget job submission to a YARN cluster. 
- */ - @Test(timeout=60000) - public void testDetachedPerJobYarnCluster() { - LOG.info("Starting testDetachedPerJobYarnCluster()"); - - File exampleJarLocation = YarnTestBase.findFile( - ".." + File.separator + "flink-examples" + File.separator + "flink-examples-batch", - new ContainsName(new String[] {"-WordCount.jar"})); - - Assert.assertNotNull("Could not find batch wordcount jar", exampleJarLocation); - - testDetachedPerJobYarnClusterInternal(exampleJarLocation.getAbsolutePath()); - - LOG.info("Finished testDetachedPerJobYarnCluster()"); - } - - /** - * Test a fire-and-forget job submission to a YARN cluster. - */ - @Test(timeout=60000) - public void testDetachedPerJobYarnClusterWithStreamingJob() { - LOG.info("Starting testDetachedPerJobYarnClusterWithStreamingJob()"); - - File exampleJarLocation = YarnTestBase.findFile( - ".." + File.separator + "flink-examples" + File.separator + "flink-examples-streaming", - new ContainsName(new String[] {"-WordCount.jar"})); - Assert.assertNotNull("Could not find streaming wordcount jar", exampleJarLocation); - - testDetachedPerJobYarnClusterInternal(exampleJarLocation.getAbsolutePath()); - - LOG.info("Finished testDetachedPerJobYarnClusterWithStreamingJob()"); - } - - private void testDetachedPerJobYarnClusterInternal(String job) { - YarnClient yc = YarnClient.createYarnClient(); - yc.init(yarnConfiguration); - yc.start(); - - // get temporary folder for writing output of wordcount example - File tmpOutFolder = null; - try{ - tmpOutFolder = tmp.newFolder(); - } - catch(IOException e) { - throw new RuntimeException(e); - } - - // get temporary file for reading input data for wordcount example - File tmpInFile; - try{ - tmpInFile = tmp.newFile(); - FileUtils.writeStringToFile(tmpInFile, WordCountData.TEXT); - } - catch(IOException e) { - throw new RuntimeException(e); - } - - Runner runner = startWithArgs(new String[]{"run", "-m", "yarn-cluster", "-yj", flinkUberjar.getAbsolutePath(), - "-yt", 
flinkLibFolder.getAbsolutePath(), - "-yn", "1", - "-yjm", "768", - "-yD", "yarn.heap-cutoff-ratio=0.5", // test if the cutoff is passed correctly - "-ytm", "1024", - "-ys", "2", // test requesting slots from YARN. - "--yarndetached", job, "--input", tmpInFile.getAbsoluteFile().toString(), "--output", tmpOutFolder.getAbsoluteFile().toString()}, - "Job has been submitted with JobID", - RunTypes.CLI_FRONTEND); - - // it should usually be 2, but on slow machines, the number varies - Assert.assertTrue("There should be at most 2 containers running", getRunningContainers() <= 2); - // give the runner some time to detach - for (int attempt = 0; runner.isAlive() && attempt < 5; attempt++) { - try { - Thread.sleep(500); - } catch (InterruptedException e) { - } - } - Assert.assertFalse("The runner should detach.", runner.isAlive()); - LOG.info("CLI Frontend has returned, so the job is running"); - - // find out the application id and wait until it has finished. - try { - List<ApplicationReport> apps = yc.getApplications(EnumSet.of(YarnApplicationState.RUNNING)); - - ApplicationId tmpAppId; - if (apps.size() == 1) { - // Better method to find the right appId. 
But sometimes the app is shutting down very fast - // Only one running - tmpAppId = apps.get(0).getApplicationId(); - - LOG.info("waiting for the job with appId {} to finish", tmpAppId); - // wait until the app has finished - while(yc.getApplications(EnumSet.of(YarnApplicationState.RUNNING)).size() > 0) { - sleep(500); - } - } else { - // get appId by finding the latest finished appid - apps = yc.getApplications(); - Collections.sort(apps, new Comparator<ApplicationReport>() { - @Override - public int compare(ApplicationReport o1, ApplicationReport o2) { - return o1.getApplicationId().compareTo(o2.getApplicationId())*-1; - } - }); - tmpAppId = apps.get(0).getApplicationId(); - LOG.info("Selected {} as the last appId from {}", tmpAppId, Arrays.toString(apps.toArray())); - } - final ApplicationId id = tmpAppId; - - // now it has finished. - // check the output files. - File[] listOfOutputFiles = tmpOutFolder.listFiles(); - - - Assert.assertNotNull("Taskmanager output not found", listOfOutputFiles); - LOG.info("The job has finished. TaskManager output files found in {}", tmpOutFolder ); - - // read all output files in output folder to one output string - String content = ""; - for(File f:listOfOutputFiles) - { - if(f.isFile()) - { - content += FileUtils.readFileToString(f) + "\n"; - } - } - //String content = FileUtils.readFileToString(taskmanagerOut); - // check for some of the wordcount outputs. 
- Assert.assertTrue("Expected string 'da 5' or '(all,2)' not found in string '"+content+"'", content.contains("da 5") || content.contains("(da,5)") || content.contains("(all,2)")); - Assert.assertTrue("Expected string 'der 29' or '(mind,1)' not found in string'"+content+"'",content.contains("der 29") || content.contains("(der,29)") || content.contains("(mind,1)")); - - // check if the heap size for the TaskManager was set correctly - File jobmanagerLog = YarnTestBase.findFile("..", new FilenameFilter() { - @Override - public boolean accept(File dir, String name) { - return name.contains("jobmanager.log") && dir.getAbsolutePath().contains(id.toString()); - } - }); - Assert.assertNotNull("Unable to locate JobManager log", jobmanagerLog); - content = FileUtils.readFileToString(jobmanagerLog); - // TM was started with 1024 but we cut off 50% (NOT THE DEFAULT VALUE) - String expected = "Starting TaskManagers with command: $JAVA_HOME/bin/java -Xms424m -Xmx424m"; - Assert.assertTrue("Expected string '" + expected + "' not found in JobManager log: '"+jobmanagerLog+"'", - content.contains(expected)); - expected = " (2/2) (attempt #0) to "; - Assert.assertTrue("Expected string '" + expected + "' not found in JobManager log." + - "This string checks that the job has been started with a parallelism of 2. Log contents: '"+jobmanagerLog+"'", - content.contains(expected)); - - // make sure the detached app is really finished. 
- LOG.info("Checking again that app has finished"); - ApplicationReport rep; - do { - sleep(500); - rep = yc.getApplicationReport(id); - LOG.info("Got report {}", rep); - } while(rep.getYarnApplicationState() == YarnApplicationState.RUNNING); - - } catch(Throwable t) { - LOG.warn("Error while detached yarn session was running", t); - Assert.fail(t.getMessage()); - } - } - - @After - public void checkForProhibitedLogContents() { - ensureNoProhibitedStringInLogFiles(PROHIBITED_STRINGS, WHITELISTED_STRINGS); - } -} http://git-wip-us.apache.org/repos/asf/flink/blob/70978f56/flink-yarn-tests/src/main/java/org/apache/flink/yarn/YARNSessionFIFOITCase.java ---------------------------------------------------------------------- diff --git a/flink-yarn-tests/src/main/java/org/apache/flink/yarn/YARNSessionFIFOITCase.java b/flink-yarn-tests/src/main/java/org/apache/flink/yarn/YARNSessionFIFOITCase.java deleted file mode 100644 index cb402a3..0000000 --- a/flink-yarn-tests/src/main/java/org/apache/flink/yarn/YARNSessionFIFOITCase.java +++ /dev/null @@ -1,269 +0,0 @@ -/** - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. 
- */ - -package org.apache.flink.yarn; - -import org.apache.flink.client.FlinkYarnSessionCli; -import org.apache.flink.configuration.GlobalConfiguration; -import org.apache.flink.runtime.clusterframework.messages.GetClusterStatusResponse; -import org.apache.flink.runtime.yarn.AbstractFlinkYarnClient; -import org.apache.flink.runtime.yarn.AbstractFlinkYarnCluster; - -import org.apache.hadoop.fs.Path; -import org.apache.hadoop.yarn.api.records.ApplicationId; -import org.apache.hadoop.yarn.api.records.ApplicationReport; -import org.apache.hadoop.yarn.api.records.YarnApplicationState; -import org.apache.hadoop.yarn.client.api.YarnClient; -import org.apache.hadoop.yarn.conf.YarnConfiguration; -import org.apache.hadoop.yarn.server.resourcemanager.scheduler.ResourceScheduler; -import org.apache.hadoop.yarn.server.resourcemanager.scheduler.fifo.FifoScheduler; - -import org.apache.log4j.Level; - -import org.junit.After; -import org.junit.Assert; -import org.junit.BeforeClass; -import org.junit.Ignore; -import org.junit.Test; - -import org.slf4j.Logger; -import org.slf4j.LoggerFactory; - -import java.io.File; -import java.util.Arrays; -import java.util.EnumSet; -import java.util.List; - -import static org.apache.flink.yarn.UtilsTest.addTestAppender; -import static org.apache.flink.yarn.UtilsTest.checkForLogString; - - -/** - * This test starts a MiniYARNCluster with a FIFO scheduler. - * There are no queues for that scheduler. - */ -public class YARNSessionFIFOITCase extends YarnTestBase { - private static final Logger LOG = LoggerFactory.getLogger(YARNSessionFIFOITCase.class); - - /* - Override init with FIFO scheduler. 
- */ - @BeforeClass - public static void setup() { - yarnConfiguration.setClass(YarnConfiguration.RM_SCHEDULER, FifoScheduler.class, ResourceScheduler.class); - yarnConfiguration.setInt(YarnConfiguration.NM_PMEM_MB, 768); - yarnConfiguration.setInt(YarnConfiguration.RM_SCHEDULER_MINIMUM_ALLOCATION_MB, 512); - yarnConfiguration.set(YarnTestBase.TEST_CLUSTER_NAME_KEY, "flink-yarn-tests-fifo"); - startYARNWithConfig(yarnConfiguration); - } - - @After - public void checkForProhibitedLogContents() { - ensureNoProhibitedStringInLogFiles(PROHIBITED_STRINGS, WHITELISTED_STRINGS); - } - - /** - * Test regular operation, including command line parameter parsing. - */ - @Test(timeout=60000) // timeout after a minute. - public void testDetachedMode() { - LOG.info("Starting testDetachedMode()"); - addTestAppender(FlinkYarnSessionCli.class, Level.INFO); - Runner runner = startWithArgs(new String[]{"-j", flinkUberjar.getAbsolutePath(), - "-t", flinkLibFolder.getAbsolutePath(), - "-n", "1", - "-jm", "768", - "-tm", "1024", - "--name", "MyCustomName", // test setting a custom name - "--detached"}, - "Flink JobManager is now running on", RunTypes.YARN_SESSION); - - checkForLogString("The Flink YARN client has been started in detached mode"); - - Assert.assertFalse("The runner should detach.", runner.isAlive()); - - LOG.info("Waiting until two containers are running"); - // wait until two containers are running - while(getRunningContainers() < 2) { - sleep(500); - } - LOG.info("Two containers are running. Killing the application"); - - // kill application "externally". 
- try { - YarnClient yc = YarnClient.createYarnClient(); - yc.init(yarnConfiguration); - yc.start(); - List<ApplicationReport> apps = yc.getApplications(EnumSet.of(YarnApplicationState.RUNNING)); - Assert.assertEquals(1, apps.size()); // Only one running - ApplicationReport app = apps.get(0); - - Assert.assertEquals("MyCustomName", app.getName()); - ApplicationId id = app.getApplicationId(); - yc.killApplication(id); - - while(yc.getApplications(EnumSet.of(YarnApplicationState.KILLED)).size() == 0) { - sleep(500); - } - } catch(Throwable t) { - LOG.warn("Killing failed", t); - Assert.fail(); - } - - LOG.info("Finished testDetachedMode()"); - } - - /** - * Test querying the YARN cluster. - * - * This test validates through 666*2 cores in the "cluster". - */ - @Test - public void testQueryCluster() { - LOG.info("Starting testQueryCluster()"); - runWithArgs(new String[] {"-q"}, "Summary: totalMemory 8192 totalCores 1332",null, RunTypes.YARN_SESSION, 0); // we have 666*2 cores. - LOG.info("Finished testQueryCluster()"); - } - - /** - * Test deployment to non-existing queue. (user-reported error) - * Deployment to the queue is possible because there are no queues, so we don't check. - */ - @Test - public void testNonexistingQueue() { - LOG.info("Starting testNonexistingQueue()"); - runWithArgs(new String[]{"-j", flinkUberjar.getAbsolutePath(), - "-t", flinkLibFolder.getAbsolutePath(), - "-n", "1", - "-jm", "768", - "-tm", "1024", - "-qu", "doesntExist"}, "Number of connected TaskManagers changed to 1. Slots available: 1", null, RunTypes.YARN_SESSION, 0); - LOG.info("Finished testNonexistingQueue()"); - } - - /** - * The test cluster has the following resources: - * - 2 Nodes with 4096 MB each. 
- * - RM_SCHEDULER_MINIMUM_ALLOCATION_MB is 512 - * - * We allocate: - * 1 JobManager with 256 MB (will be automatically upgraded to 512 due to min alloc mb) - * 5 TaskManagers with 1585 MB - * - * user sees a total request of: 8181 MB (fits) - * system sees a total request of: 8437 (doesn't fit due to min alloc mb) - */ - @Ignore("The test is too resource consuming (8.5 GB of memory)") - @Test - public void testResourceComputation() { - addTestAppender(FlinkYarnClient.class, Level.WARN); - LOG.info("Starting testResourceComputation()"); - runWithArgs(new String[]{"-j", flinkUberjar.getAbsolutePath(), "-t", flinkLibFolder.getAbsolutePath(), - "-n", "5", - "-jm", "256", - "-tm", "1585"}, "Number of connected TaskManagers changed to", null, RunTypes.YARN_SESSION, 0); - LOG.info("Finished testResourceComputation()"); - checkForLogString("This YARN session requires 8437MB of memory in the cluster. There are currently only 8192MB available."); - } - - /** - * The test cluster has the following resources: - * - 2 Nodes with 4096 MB each. - * - RM_SCHEDULER_MINIMUM_ALLOCATION_MB is 512 - * - * We allocate: - * 1 JobManager with 256 MB (will be automatically upgraded to 512 due to min alloc mb) - * 2 TaskManagers with 3840 MB - * - * the user sees a total request of: 7936 MB (fits) - * the system sees a request of: 8192 MB (fits) - * HOWEVER: one machine is going to need 3840 + 512 = 4352 MB, which doesn't fit. - * - * --> check if the system properly rejects allocating this session. 
- */ - @Ignore("The test is too resource consuming (8 GB of memory)") - @Test - public void testfullAlloc() { - addTestAppender(FlinkYarnClient.class, Level.WARN); - LOG.info("Starting testfullAlloc()"); - runWithArgs(new String[]{"-j", flinkUberjar.getAbsolutePath(), "-t", flinkLibFolder.getAbsolutePath(), - "-n", "2", - "-jm", "256", - "-tm", "3840"}, "Number of connected TaskManagers changed to", null, RunTypes.YARN_SESSION, 0); - LOG.info("Finished testfullAlloc()"); - checkForLogString("There is not enough memory available in the YARN cluster. The TaskManager(s) require 3840MB each. NodeManagers available: [4096, 4096]\n" + - "After allocating the JobManager (512MB) and (1/2) TaskManagers, the following NodeManagers are available: [3584, 256]"); - } - - /** - * Test the YARN Java API - */ - @Test - public void testJavaAPI() { - final int WAIT_TIME = 15; - LOG.info("Starting testJavaAPI()"); - - AbstractFlinkYarnClient flinkYarnClient = FlinkYarnSessionCli.getFlinkYarnClient(); - Assert.assertNotNull("unable to get yarn client", flinkYarnClient); - flinkYarnClient.setTaskManagerCount(1); - flinkYarnClient.setJobManagerMemory(768); - flinkYarnClient.setTaskManagerMemory(1024); - flinkYarnClient.setLocalJarPath(new Path(flinkUberjar.getAbsolutePath())); - flinkYarnClient.setShipFiles(Arrays.asList(flinkLibFolder.listFiles())); - String confDirPath = System.getenv("FLINK_CONF_DIR"); - flinkYarnClient.setConfigurationDirectory(confDirPath); - flinkYarnClient.setFlinkConfiguration(GlobalConfiguration.getConfiguration()); - flinkYarnClient.setConfigurationFilePath(new Path(confDirPath + File.separator + "flink-conf.yaml")); - - // deploy - AbstractFlinkYarnCluster yarnCluster = null; - try { - yarnCluster = flinkYarnClient.deploy(); - yarnCluster.connectToCluster(); - } catch (Exception e) { - LOG.warn("Failing test", e); - Assert.fail("Error while deploying YARN cluster: "+e.getMessage()); - } - GetClusterStatusResponse expectedStatus = new 
GetClusterStatusResponse(1, 1); - for(int second = 0; second < WAIT_TIME * 2; second++) { // run "forever" - try { - Thread.sleep(1000); - } catch (InterruptedException e) { - LOG.warn("Interrupted", e); - } - GetClusterStatusResponse status = yarnCluster.getClusterStatus(); - if(status != null && status.equals(expectedStatus)) { - LOG.info("Cluster reached status " + status); - break; // all good, cluster started - } - if(second > WAIT_TIME) { - // we waited for 15 seconds. cluster didn't come up correctly - Assert.fail("The custer didn't start after " + WAIT_TIME + " seconds"); - } - } - - // use the cluster - Assert.assertNotNull(yarnCluster.getJobManagerAddress()); - Assert.assertNotNull(yarnCluster.getWebInterfaceURL()); - - LOG.info("Shutting down cluster. All tests passed"); - // shutdown cluster - yarnCluster.shutdown(false); - LOG.info("Finished testJavaAPI()"); - } -}
