http://git-wip-us.apache.org/repos/asf/flink/blob/70978f56/flink-yarn-tests/src/main/java/org/apache/flink/yarn/YarnTestBase.java ---------------------------------------------------------------------- diff --git a/flink-yarn-tests/src/main/java/org/apache/flink/yarn/YarnTestBase.java b/flink-yarn-tests/src/main/java/org/apache/flink/yarn/YarnTestBase.java deleted file mode 100644 index fc1e5bc..0000000 --- a/flink-yarn-tests/src/main/java/org/apache/flink/yarn/YarnTestBase.java +++ /dev/null @@ -1,619 +0,0 @@ -/** - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. 
- */ - -package org.apache.flink.yarn; - -import org.apache.commons.io.FileUtils; -import org.apache.flink.client.CliFrontend; -import org.apache.flink.client.FlinkYarnSessionCli; -import org.apache.flink.test.util.TestBaseUtils; -import org.apache.flink.util.TestLogger; -import org.apache.hadoop.conf.Configuration; -import org.apache.hadoop.service.Service; -import org.apache.hadoop.yarn.api.records.ApplicationReport; -import org.apache.hadoop.yarn.api.records.ContainerId; -import org.apache.hadoop.yarn.api.records.YarnApplicationState; -import org.apache.hadoop.yarn.client.api.YarnClient; -import org.apache.hadoop.yarn.conf.YarnConfiguration; -import org.apache.hadoop.yarn.exceptions.YarnException; -import org.apache.hadoop.yarn.server.MiniYARNCluster; -import org.apache.hadoop.yarn.server.nodemanager.NodeManager; -import org.apache.hadoop.yarn.server.nodemanager.containermanager.container.Container; -import org.apache.log4j.spi.LoggingEvent; -import org.junit.After; -import org.junit.AfterClass; -import org.junit.Assert; -import org.junit.Before; -import org.junit.BeforeClass; -import org.junit.ClassRule; -import org.junit.rules.TemporaryFolder; -import org.slf4j.Logger; -import org.slf4j.LoggerFactory; -import org.slf4j.Marker; -import org.slf4j.MarkerFactory; - -import java.io.ByteArrayOutputStream; -import java.io.File; -import java.io.FileNotFoundException; -import java.io.FileWriter; -import java.io.FilenameFilter; -import java.io.IOException; -import java.io.PrintStream; -import java.util.Arrays; -import java.util.HashMap; -import java.util.List; -import java.util.Map; -import java.util.Scanner; -import java.util.concurrent.ConcurrentMap; - - -/** - * This base class allows to use the MiniYARNCluster. - * The cluster is re-used for all tests. - * - * This class is located in a different package which is build after flink-dist. This way, - * we can use the YARN uberjar of flink to start a Flink YARN session. - * - * The test is not thread-safe. 
Parallel execution of tests is not possible! - */ -public abstract class YarnTestBase extends TestLogger { - private static final Logger LOG = LoggerFactory.getLogger(YarnTestBase.class); - - protected final static PrintStream originalStdout = System.out; - protected final static PrintStream originalStderr = System.err; - - protected static String TEST_CLUSTER_NAME_KEY = "flink-yarn-minicluster-name"; - - protected final static int NUM_NODEMANAGERS = 2; - - /** The tests are scanning for these strings in the final output. */ - protected final static String[] PROHIBITED_STRINGS = { - "Exception", // we don't want any exceptions to happen - "Started SelectChannelConnector@0.0.0.0:8081" // Jetty should start on a random port in YARN mode. - }; - - /** These strings are white-listed, overriding teh prohibited strings */ - protected final static String[] WHITELISTED_STRINGS = { - "akka.remote.RemoteTransportExceptionNoStackTrace", - // workaround for annoying InterruptedException logging: - // https://issues.apache.org/jira/browse/YARN-1022 - "java.lang.InterruptedException" - }; - - // Temp directory which is deleted after the unit test. - @ClassRule - public static TemporaryFolder tmp = new TemporaryFolder(); - - protected static MiniYARNCluster yarnCluster = null; - - /** - * Uberjar (fat jar) file of Flink - */ - protected static File flinkUberjar; - - protected static final Configuration yarnConfiguration; - - /** - * lib/ folder of the flink distribution. 
- */ - protected static File flinkLibFolder; - - static { - yarnConfiguration = new YarnConfiguration(); - yarnConfiguration.setInt(YarnConfiguration.RM_SCHEDULER_MINIMUM_ALLOCATION_MB, 512); - yarnConfiguration.setInt(YarnConfiguration.RM_SCHEDULER_MAXIMUM_ALLOCATION_MB, 4096); // 4096 is the available memory anyways - yarnConfiguration.setBoolean(YarnConfiguration.YARN_MINICLUSTER_FIXED_PORTS, true); - yarnConfiguration.setBoolean(YarnConfiguration.RM_SCHEDULER_INCLUDE_PORT_IN_NODE_NAME, true); - yarnConfiguration.setInt(YarnConfiguration.RM_AM_MAX_ATTEMPTS, 2); - yarnConfiguration.setInt(YarnConfiguration.RM_MAX_COMPLETED_APPLICATIONS, 2); - yarnConfiguration.setInt(YarnConfiguration.RM_SCHEDULER_MAXIMUM_ALLOCATION_VCORES, 4); - yarnConfiguration.setInt(YarnConfiguration.DEBUG_NM_DELETE_DELAY_SEC, 3600); - yarnConfiguration.setBoolean(YarnConfiguration.LOG_AGGREGATION_ENABLED, false); - yarnConfiguration.setInt(YarnConfiguration.NM_VCORES, 666); // memory is overwritten in the MiniYARNCluster. - // so we have to change the number of cores for testing. - yarnConfiguration.setInt(YarnConfiguration.RM_AM_EXPIRY_INTERVAL_MS, 20000); // 20 seconds expiry (to ensure we properly heartbeat with YARN). 
- } - - - - /** - * Sleep a bit between the tests (we are re-using the YARN cluster for the tests) - */ - @After - public void sleep() { - try { - Thread.sleep(500); - } catch (InterruptedException e) { - Assert.fail("Should not happen"); - } - } - - private YarnClient yarnClient = null; - @Before - public void checkClusterEmpty() throws IOException, YarnException { - if(yarnClient == null) { - yarnClient = YarnClient.createYarnClient(); - yarnClient.init(yarnConfiguration); - yarnClient.start(); - } - - List<ApplicationReport> apps = yarnClient.getApplications(); - for(ApplicationReport app : apps) { - if(app.getYarnApplicationState() != YarnApplicationState.FINISHED - && app.getYarnApplicationState() != YarnApplicationState.KILLED - && app.getYarnApplicationState() != YarnApplicationState.FAILED) { - Assert.fail("There is at least one application on the cluster is not finished." + - "App "+app.getApplicationId()+" is in state "+app.getYarnApplicationState()); - } - } - } - - /** - * Locate a file or directory - */ - public static File findFile(String startAt, FilenameFilter fnf) { - File root = new File(startAt); - String[] files = root.list(); - if(files == null) { - return null; - } - for(String file : files) { - File f = new File(startAt + File.separator + file); - if(f.isDirectory()) { - File r = findFile(f.getAbsolutePath(), fnf); - if(r != null) { - return r; - } - } else if (fnf.accept(f.getParentFile(), f.getName())) { - return f; - } - } - return null; - } - - /** - * Filter to find root dir of the flink-yarn dist. - */ - public static class RootDirFilenameFilter implements FilenameFilter { - @Override - public boolean accept(File dir, String name) { - return name.startsWith("flink-dist") && name.endsWith(".jar") && dir.toString().contains("/lib"); - } - } - - public static class ContainsName implements FilenameFilter { - private String[] names; - private String excludeInPath = null; - - /** - * @param names which have to be included in the filename. 
- */ - public ContainsName(String[] names) { - this.names = names; - } - - public ContainsName(String[] names, String excludeInPath) { - this.names = names; - this.excludeInPath = excludeInPath; - } - - @Override - public boolean accept(File dir, String name) { - if(excludeInPath == null) { - for(String n: names) { - if(!name.contains(n)) { - return false; - } - } - return true; - } else { - for(String n: names) { - if(!name.contains(n)) { - return false; - } - } - return !dir.toString().contains(excludeInPath); - } - } - } - - public static File writeYarnSiteConfigXML(Configuration yarnConf) throws IOException { - tmp.create(); - File yarnSiteXML = new File(tmp.newFolder().getAbsolutePath() + "/yarn-site.xml"); - - FileWriter writer = new FileWriter(yarnSiteXML); - yarnConf.writeXml(writer); - writer.flush(); - writer.close(); - return yarnSiteXML; - } - - /** - * This method checks the written TaskManager and JobManager log files - * for exceptions. - * - * WARN: Please make sure the tool doesn't find old logfiles from previous test runs. - * So always run "mvn clean" before running the tests here. - * - */ - public static void ensureNoProhibitedStringInLogFiles(final String[] prohibited, final String[] whitelisted) { - File cwd = new File("target/" + yarnConfiguration.get(TEST_CLUSTER_NAME_KEY)); - Assert.assertTrue("Expecting directory " + cwd.getAbsolutePath() + " to exist", cwd.exists()); - Assert.assertTrue("Expecting directory " + cwd.getAbsolutePath() + " to be a directory", cwd.isDirectory()); - - File foundFile = findFile(cwd.getAbsolutePath(), new FilenameFilter() { - @Override - public boolean accept(File dir, String name) { - // scan each file for prohibited strings. 
- File f = new File(dir.getAbsolutePath()+ "/" + name); - try { - Scanner scanner = new Scanner(f); - while (scanner.hasNextLine()) { - final String lineFromFile = scanner.nextLine(); - for (String aProhibited : prohibited) { - if (lineFromFile.contains(aProhibited)) { - - boolean whitelistedFound = false; - for (String white : whitelisted) { - if (lineFromFile.contains(white)) { - whitelistedFound = true; - break; - } - } - - if (!whitelistedFound) { - // logging in FATAL to see the actual message in TRAVIS tests. - Marker fatal = MarkerFactory.getMarker("FATAL"); - LOG.error(fatal, "Prohibited String '{}' in line '{}'", aProhibited, lineFromFile); - return true; - } - } - } - - } - } catch (FileNotFoundException e) { - LOG.warn("Unable to locate file: "+e.getMessage()+" file: "+f.getAbsolutePath()); - } - - return false; - } - }); - if(foundFile != null) { - Scanner scanner = null; - try { - scanner = new Scanner(foundFile); - } catch (FileNotFoundException e) { - Assert.fail("Unable to locate file: "+e.getMessage()+" file: "+foundFile.getAbsolutePath()); - } - LOG.warn("Found a file with a prohibited string. Printing contents:"); - while (scanner.hasNextLine()) { - LOG.warn("LINE: "+scanner.nextLine()); - } - Assert.fail("Found a file "+foundFile+" with a prohibited string: "+Arrays.toString(prohibited)); - } - } - - public static void sleep(int time) { - try { - Thread.sleep(time); - } catch (InterruptedException e) { - LOG.warn("Interruped",e); - } - } - - public static int getRunningContainers() { - int count = 0; - for(int nmId = 0; nmId < NUM_NODEMANAGERS; nmId++) { - NodeManager nm = yarnCluster.getNodeManager(nmId); - ConcurrentMap<ContainerId, Container> containers = nm.getNMContext().getContainers(); - count += containers.size(); - } - return count; - } - - public static void startYARNWithConfig(Configuration conf) { - // set the home directory to a tmp directory. 
Flink on YARN is using the home dir to distribute the file - File homeDir = null; - try { - homeDir = tmp.newFolder(); - } catch (IOException e) { - e.printStackTrace(); - Assert.fail(e.getMessage()); - } - System.setProperty("user.home", homeDir.getAbsolutePath()); - String uberjarStartLoc = ".."; - LOG.info("Trying to locate uberjar in {}", new File(uberjarStartLoc)); - flinkUberjar = findFile(uberjarStartLoc, new RootDirFilenameFilter()); - Assert.assertNotNull("Flink uberjar not found", flinkUberjar); - String flinkDistRootDir = flinkUberjar.getParentFile().getParent(); - flinkLibFolder = flinkUberjar.getParentFile(); // the uberjar is located in lib/ - Assert.assertNotNull("Flink flinkLibFolder not found", flinkLibFolder); - Assert.assertTrue("lib folder not found", flinkLibFolder.exists()); - Assert.assertTrue("lib folder not found", flinkLibFolder.isDirectory()); - - if (!flinkUberjar.exists()) { - Assert.fail("Unable to locate yarn-uberjar.jar"); - } - - try { - LOG.info("Starting up MiniYARNCluster"); - if (yarnCluster == null) { - yarnCluster = new MiniYARNCluster(conf.get(YarnTestBase.TEST_CLUSTER_NAME_KEY), NUM_NODEMANAGERS, 1, 1); - - yarnCluster.init(conf); - yarnCluster.start(); - } - - Map<String, String> map = new HashMap<String, String>(System.getenv()); - - File flinkConfDirPath = findFile(flinkDistRootDir, new ContainsName(new String[]{"flink-conf.yaml"})); - Assert.assertNotNull(flinkConfDirPath); - - map.put("FLINK_CONF_DIR", flinkConfDirPath.getParent()); - - File yarnConfFile = writeYarnSiteConfigXML(conf); - map.put("YARN_CONF_DIR", yarnConfFile.getParentFile().getAbsolutePath()); - map.put("IN_TESTS", "yes we are in tests"); // see FlinkYarnClient() for more infos - TestBaseUtils.setEnv(map); - - Assert.assertTrue(yarnCluster.getServiceState() == Service.STATE.STARTED); - - // wait for the nodeManagers to connect - while(!yarnCluster.waitForNodeManagersToConnect(500)) { - LOG.info("Waiting for Nodemanagers to connect"); - } - } catch 
(Exception ex) { - ex.printStackTrace(); - LOG.error("setup failure", ex); - Assert.fail(); - } - } - - /** - * Default @BeforeClass impl. Overwrite this for passing a different configuration - */ - @BeforeClass - public static void setup() { - startYARNWithConfig(yarnConfiguration); - } - - // -------------------------- Runner -------------------------- // - - protected static ByteArrayOutputStream outContent; - protected static ByteArrayOutputStream errContent; - enum RunTypes { - YARN_SESSION, CLI_FRONTEND - } - - /** - * This method returns once the "startedAfterString" has been seen. - */ - protected Runner startWithArgs(String[] args, String startedAfterString, RunTypes type) { - LOG.info("Running with args {}", Arrays.toString(args)); - - outContent = new ByteArrayOutputStream(); - errContent = new ByteArrayOutputStream(); - System.setOut(new PrintStream(outContent)); - System.setErr(new PrintStream(errContent)); - - - final int START_TIMEOUT_SECONDS = 60; - - Runner runner = new Runner(args, type); - runner.setName("Frontend (CLI/YARN Client) runner thread (startWithArgs())."); - runner.start(); - - for(int second = 0; second < START_TIMEOUT_SECONDS; second++) { - sleep(1000); - // check output for correct TaskManager startup. - if(outContent.toString().contains(startedAfterString) - || errContent.toString().contains(startedAfterString) ) { - LOG.info("Found expected output in redirected streams"); - return runner; - } - // check if thread died - if(!runner.isAlive()) { - sendOutput(); - Assert.fail("Runner thread died before the test was finished. 
Return value = "+runner.getReturnValue()); - } - } - - sendOutput(); - Assert.fail("During the timeout period of " + START_TIMEOUT_SECONDS + " seconds the " + - "expected string did not show up"); - return null; - } - - protected void runWithArgs(String[] args, String terminateAfterString, String[] failOnStrings, RunTypes type, int returnCode) { - runWithArgs(args,terminateAfterString, failOnStrings, type, returnCode, false); - } - /** - * The test has been passed once the "terminateAfterString" has been seen. - * @param args Command line arguments for the runner - * @param terminateAfterString the runner is searching the stdout and stderr for this string. as soon as it appears, the test has passed - * @param failOnStrings The runner is searching stdout and stderr for the strings specified here. If one appears, the test has failed - * @param type Set the type of the runner - * @param returnCode Expected return code from the runner. - * @param checkLogForTerminateString If true, the runner checks also the log4j logger for the terminate string - */ - protected void runWithArgs(String[] args, String terminateAfterString, String[] failOnStrings, RunTypes type, int returnCode, boolean checkLogForTerminateString) { - LOG.info("Running with args {}", Arrays.toString(args)); - - outContent = new ByteArrayOutputStream(); - errContent = new ByteArrayOutputStream(); - System.setOut(new PrintStream(outContent)); - System.setErr(new PrintStream(errContent)); - - - // we wait for at most three minutes - final int START_TIMEOUT_SECONDS = 180; - final long deadline = System.currentTimeMillis() + (START_TIMEOUT_SECONDS * 1000); - - Runner runner = new Runner(args, type); - runner.start(); - - boolean expectedStringSeen = false; - boolean testPassedFromLog4j = false; - do { - sleep(1000); - String outContentString = outContent.toString(); - String errContentString = errContent.toString(); - if(failOnStrings != null) { - for (String failOnString : failOnStrings) { - if 
(outContentString.contains(failOnString) - || errContentString.contains(failOnString)) { - LOG.warn("Failing test. Output contained illegal string '" + failOnString + "'"); - sendOutput(); - // stopping runner. - runner.sendStop(); - Assert.fail("Output contained illegal string '" + failOnString + "'"); - } - } - } - // check output for the expected terminateAfterString. - if(checkLogForTerminateString) { - LoggingEvent matchedEvent = UtilsTest.getEventContainingString(terminateAfterString); - if(matchedEvent != null) { - testPassedFromLog4j = true; - LOG.info("Found expected output in logging event {}", matchedEvent); - } - - } - - if (outContentString.contains(terminateAfterString) || errContentString.contains(terminateAfterString) || testPassedFromLog4j ) { - expectedStringSeen = true; - LOG.info("Found expected output in redirected streams"); - // send "stop" command to command line interface - LOG.info("RunWithArgs: request runner to stop"); - runner.sendStop(); - // wait for the thread to stop - try { - runner.join(30000); - } - catch (InterruptedException e) { - LOG.warn("Interrupted while stopping runner", e); - } - LOG.warn("RunWithArgs runner stopped."); - } - else { - // check if thread died - if (!runner.isAlive()) { - if (runner.getReturnValue() != 0) { - Assert.fail("Runner thread died before the test was finished. Return value = " - + runner.getReturnValue()); - } else { - LOG.info("Runner stopped earlier than expected with return value = 0"); - } - // leave loop: the runner died, so we can not expect new strings to show up. 
- break; - } - } - } - while (!expectedStringSeen && System.currentTimeMillis() < deadline); - - sendOutput(); - Assert.assertTrue("During the timeout period of " + START_TIMEOUT_SECONDS + " seconds the " + - "expected string did not show up", expectedStringSeen); - - // check for 0 return code - Assert.assertEquals("Expected return value", returnCode, runner.getReturnValue()); - LOG.info("Test was successful"); - } - - protected static void sendOutput() { - System.setOut(originalStdout); - System.setErr(originalStderr); - - LOG.info("Sending stdout content through logger: \n\n{}\n\n", outContent.toString()); - LOG.info("Sending stderr content through logger: \n\n{}\n\n", errContent.toString()); - } - - public static class Runner extends Thread { - private final String[] args; - private int returnValue; - private RunTypes type; - private FlinkYarnSessionCli yCli; - - public Runner(String[] args, RunTypes type) { - this.args = args; - this.type = type; - } - - public int getReturnValue() { - return returnValue; - } - - @Override - public void run() { - switch(type) { - case YARN_SESSION: - yCli = new FlinkYarnSessionCli("", ""); - returnValue = yCli.run(args); - break; - case CLI_FRONTEND: - try { - CliFrontend cli = new CliFrontend(); - returnValue = cli.parseParameters(args); - } catch (Exception e) { - throw new RuntimeException(e); - } - break; - default: - throw new RuntimeException("Unknown type " + type); - } - - if(returnValue != 0) { - Assert.fail("The YARN session returned with non-null value="+returnValue); - } - } - - public void sendStop() { - if(yCli != null) { - yCli.stop(); - } - } - } - - // -------------------------- Tear down -------------------------- // - - @AfterClass - public static void copyOnTravis() { - // When we are on travis, we copy the tmp files of JUnit (containing the MiniYARNCluster log files) - // to <flinkRoot>/target/flink-yarn-tests-*. 
- // The files from there are picked up by the ./tools/travis_watchdog.sh script - // to upload them to Amazon S3. - if(isOnTravis()) { - File target = new File("../target" + yarnConfiguration.get(TEST_CLUSTER_NAME_KEY)); - if(!target.mkdirs()) { - LOG.warn("Error creating dirs to {}", target); - } - File src = tmp.getRoot(); - LOG.info("copying the final files from {} to {}", src.getAbsolutePath(), target.getAbsolutePath()); - try { - FileUtils.copyDirectoryToDirectory(src, target); - } catch (IOException e) { - LOG.warn("Error copying the final files from {} to {}: msg: {}", src.getAbsolutePath(), target.getAbsolutePath(), e.getMessage(), e); - } - } - } - - public static boolean isOnTravis() { - return System.getenv("TRAVIS") != null && System.getenv("TRAVIS").equals("true"); - } - -}
http://git-wip-us.apache.org/repos/asf/flink/blob/70978f56/flink-yarn-tests/src/main/resources/log4j-test.properties ---------------------------------------------------------------------- diff --git a/flink-yarn-tests/src/main/resources/log4j-test.properties b/flink-yarn-tests/src/main/resources/log4j-test.properties deleted file mode 100644 index ebe0d37..0000000 --- a/flink-yarn-tests/src/main/resources/log4j-test.properties +++ /dev/null @@ -1,35 +0,0 @@ -################################################################################ -# Licensed to the Apache Software Foundation (ASF) under one -# or more contributor license agreements. See the NOTICE file -# distributed with this work for additional information -# regarding copyright ownership. The ASF licenses this file -# to you under the Apache License, Version 2.0 (the -# "License"); you may not use this file except in compliance -# with the License. You may obtain a copy of the License at -# -# http://www.apache.org/licenses/LICENSE-2.0 -# -# Unless required by applicable law or agreed to in writing, software -# distributed under the License is distributed on an "AS IS" BASIS, -# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. -# See the License for the specific language governing permissions and -# limitations under the License. 
-################################################################################ - -log4j.rootLogger=INFO, console - -# Log all infos in the given file -log4j.appender.console=org.apache.log4j.ConsoleAppender -log4j.appender.console.layout=org.apache.log4j.PatternLayout -log4j.appender.console.layout.ConversionPattern=%d{HH:mm:ss,SSS} %-5p %-60c %x - %m%n - -# suppress the irrelevant (wrong) warnings from the netty channel handler -log4j.logger.org.jboss.netty.channel.DefaultChannelPipeline=ERROR, console - -# log whats going on between the tests -log4j.logger.org.apache.flink.yarn.YARNSessionFIFOITCase=INFO -log4j.logger.org.apache.flink.yarn.YARNSessionCapacitySchedulerITCase=INFO -log4j.logger.org.apache.flink.yarn.YarnHighAvailability=INFO -log4j.logger.org.apache.hadoop.yarn.server.nodemanager.containermanager.launcher.ContainerLaunch=INFO -log4j.logger.org.apache.flink.runtime.leaderelection=INFO -log4j.logger.org.apache.flink.runtime.leaderretrieval=INFO http://git-wip-us.apache.org/repos/asf/flink/blob/70978f56/flink-yarn-tests/src/main/scala/org/apache/flink/yarn/TestingYarnJobManager.scala ---------------------------------------------------------------------- diff --git a/flink-yarn-tests/src/main/scala/org/apache/flink/yarn/TestingYarnJobManager.scala b/flink-yarn-tests/src/main/scala/org/apache/flink/yarn/TestingYarnJobManager.scala deleted file mode 100644 index 2f93785..0000000 --- a/flink-yarn-tests/src/main/scala/org/apache/flink/yarn/TestingYarnJobManager.scala +++ /dev/null @@ -1,81 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. 
You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.flink.yarn - -import java.util.concurrent.ExecutorService - -import akka.actor.ActorRef -import org.apache.flink.configuration.Configuration -import org.apache.flink.runtime.checkpoint.{SavepointStore, CheckpointRecoveryFactory} -import org.apache.flink.runtime.execution.librarycache.BlobLibraryCacheManager -import org.apache.flink.runtime.executiongraph.restart.RestartStrategyFactory -import org.apache.flink.runtime.instance.InstanceManager -import org.apache.flink.runtime.jobmanager.SubmittedJobGraphStore -import org.apache.flink.runtime.jobmanager.scheduler.Scheduler -import org.apache.flink.runtime.leaderelection.LeaderElectionService -import org.apache.flink.runtime.testingUtils.TestingJobManagerLike - -import scala.concurrent.duration.FiniteDuration - -/** [[YarnJobManager]] implementation which mixes in the [[TestingJobManagerLike]] mixin. - * - * This actor class is used for testing purposes on Yarn. Here we use an explicit class definition - * instead of an anonymous class with the respective mixin to obtain a more readable logger name. 
- * - * @param flinkConfiguration Configuration object for the actor - * @param executorService Execution context which is used to execute concurrent tasks in the - * [[org.apache.flink.runtime.executiongraph.ExecutionGraph]] - * @param instanceManager Instance manager to manage the registered - * [[org.apache.flink.runtime.taskmanager.TaskManager]] - * @param scheduler Scheduler to schedule Flink jobs - * @param libraryCacheManager Manager to manage uploaded jar files - * @param archive Archive for finished Flink jobs - * @param restartStrategyFactory Default restart strategy for job restarts - * @param timeout Timeout for futures - * @param leaderElectionService LeaderElectionService to participate in the leader election - */ -class TestingYarnJobManager( - flinkConfiguration: Configuration, - executorService: ExecutorService, - instanceManager: InstanceManager, - scheduler: Scheduler, - libraryCacheManager: BlobLibraryCacheManager, - archive: ActorRef, - restartStrategyFactory: RestartStrategyFactory, - timeout: FiniteDuration, - leaderElectionService: LeaderElectionService, - submittedJobGraphs : SubmittedJobGraphStore, - checkpointRecoveryFactory : CheckpointRecoveryFactory, - savepointStore: SavepointStore, - jobRecoveryTimeout: FiniteDuration) - extends YarnJobManager( - flinkConfiguration, - executorService, - instanceManager, - scheduler, - libraryCacheManager, - archive, - restartStrategyFactory, - timeout, - leaderElectionService, - submittedJobGraphs, - checkpointRecoveryFactory, - savepointStore, - jobRecoveryTimeout) - with TestingJobManagerLike {} http://git-wip-us.apache.org/repos/asf/flink/blob/70978f56/flink-yarn-tests/src/main/scala/org/apache/flink/yarn/TestingYarnTaskManager.scala ---------------------------------------------------------------------- diff --git a/flink-yarn-tests/src/main/scala/org/apache/flink/yarn/TestingYarnTaskManager.scala b/flink-yarn-tests/src/main/scala/org/apache/flink/yarn/TestingYarnTaskManager.scala deleted file 
mode 100644 index 73ab7eb..0000000 --- a/flink-yarn-tests/src/main/scala/org/apache/flink/yarn/TestingYarnTaskManager.scala +++ /dev/null @@ -1,77 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.flink.yarn - -import org.apache.flink.runtime.clusterframework.types.ResourceID -import org.apache.flink.runtime.instance.InstanceConnectionInfo -import org.apache.flink.runtime.io.disk.iomanager.IOManager -import org.apache.flink.runtime.io.network.NetworkEnvironment -import org.apache.flink.runtime.leaderretrieval.LeaderRetrievalService -import org.apache.flink.runtime.memory.MemoryManager -import org.apache.flink.runtime.taskmanager.TaskManagerConfiguration -import org.apache.flink.runtime.testingUtils.TestingTaskManagerLike - -/** [[YarnTaskManager]] implementation which mixes in the [[TestingTaskManagerLike]] mixin. - * - * This actor class is used for testing purposes on Yarn. Here we use an explicit class definition - * instead of an anonymous class with the respective mixin to obtain a more readable logger name. 
- * - * @param config Configuration object for the actor - * @param resourceID The Yarn container id - * @param connectionInfo Connection information of this actor - * @param memoryManager MemoryManager which is responsibel for Flink's managed memory allocation - * @param ioManager IOManager responsible for I/O - * @param network NetworkEnvironment for this actor - * @param numberOfSlots Number of slots for this TaskManager - * @param leaderRetrievalService [[LeaderRetrievalService]] to retrieve the current leading - * JobManager - */ -class TestingYarnTaskManager( - config: TaskManagerConfiguration, - resourceID: ResourceID, - connectionInfo: InstanceConnectionInfo, - memoryManager: MemoryManager, - ioManager: IOManager, - network: NetworkEnvironment, - numberOfSlots: Int, - leaderRetrievalService: LeaderRetrievalService) - extends YarnTaskManager( - config, - resourceID, - connectionInfo, - memoryManager, - ioManager, - network, - numberOfSlots, - leaderRetrievalService) - with TestingTaskManagerLike { - - object YarnTaskManager { - - /** Entry point (main method) to run the TaskManager on YARN. - * @param args The command line arguments. - */ - def main(args: Array[String]): Unit = { - YarnTaskManagerRunner.runYarnTaskManager(args, classOf[TestingYarnTaskManager]) - } - - } -} - - http://git-wip-us.apache.org/repos/asf/flink/blob/70978f56/flink-yarn-tests/src/test/java/org/apache/flink/yarn/FlinkYarnSessionCliTest.java ---------------------------------------------------------------------- diff --git a/flink-yarn-tests/src/test/java/org/apache/flink/yarn/FlinkYarnSessionCliTest.java b/flink-yarn-tests/src/test/java/org/apache/flink/yarn/FlinkYarnSessionCliTest.java new file mode 100644 index 0000000..30116af --- /dev/null +++ b/flink-yarn-tests/src/test/java/org/apache/flink/yarn/FlinkYarnSessionCliTest.java @@ -0,0 +1,76 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. 
See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.yarn; + +import org.apache.commons.cli.CommandLine; +import org.apache.commons.cli.CommandLineParser; +import org.apache.commons.cli.Options; +import org.apache.commons.cli.PosixParser; + +import org.apache.flink.client.CliFrontend; +import org.apache.flink.client.FlinkYarnSessionCli; +import org.apache.flink.runtime.yarn.AbstractFlinkYarnClient; +import org.apache.flink.test.util.TestBaseUtils; + +import org.junit.Assert; +import org.junit.Rule; +import org.junit.Test; +import org.junit.rules.TemporaryFolder; + +import java.io.File; +import java.io.IOException; +import java.util.HashMap; +import java.util.Map; + +public class FlinkYarnSessionCliTest { + + @Rule + public TemporaryFolder tmp = new TemporaryFolder(); + + @Test + public void testDynamicProperties() throws IOException { + + Map<String, String> map = new HashMap<String, String>(System.getenv()); + File tmpFolder = tmp.newFolder(); + File fakeConf = new File(tmpFolder, "flink-conf.yaml"); + fakeConf.createNewFile(); + map.put("FLINK_CONF_DIR", tmpFolder.getAbsolutePath()); + TestBaseUtils.setEnv(map); + Options options = new Options(); + FlinkYarnSessionCli cli = new FlinkYarnSessionCli("", ""); + cli.getYARNSessionCLIOptions(options); + + CommandLineParser parser = new PosixParser(); 
+ CommandLine cmd = null; + try { + cmd = parser.parse(options, new String[]{"run", "-j", "fake.jar", "-n", "15", "-D", "akka.ask.timeout=5 min"}); + } catch(Exception e) { + e.printStackTrace(); + Assert.fail("Parsing failed with " + e.getMessage()); + } + + AbstractFlinkYarnClient flinkYarnClient = cli.createFlinkYarnClient(cmd); + + Assert.assertNotNull(flinkYarnClient); + + Map<String, String> dynProperties = CliFrontend.getDynamicProperties(flinkYarnClient.getDynamicPropertiesEncoded()); + Assert.assertEquals(1, dynProperties.size()); + Assert.assertEquals("5 min", dynProperties.get("akka.ask.timeout")); + } +} http://git-wip-us.apache.org/repos/asf/flink/blob/70978f56/flink-yarn-tests/src/test/java/org/apache/flink/yarn/TestingApplicationMaster.java ---------------------------------------------------------------------- diff --git a/flink-yarn-tests/src/test/java/org/apache/flink/yarn/TestingApplicationMaster.java b/flink-yarn-tests/src/test/java/org/apache/flink/yarn/TestingApplicationMaster.java new file mode 100644 index 0000000..b0757f5 --- /dev/null +++ b/flink-yarn-tests/src/test/java/org/apache/flink/yarn/TestingApplicationMaster.java @@ -0,0 +1,64 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.flink.yarn; + +import org.apache.flink.runtime.jobmanager.JobManager; +import org.apache.flink.runtime.jobmanager.MemoryArchivist; +import org.apache.flink.runtime.taskmanager.TaskManager; +import org.apache.flink.runtime.testingUtils.TestingMemoryArchivist; +import org.apache.flink.runtime.testutils.TestingResourceManager; +import org.apache.flink.runtime.util.EnvironmentInformation; +import org.apache.flink.runtime.util.SignalHandler; + +/** + * Yarn application master which starts the {@link TestingYarnJobManager}, + * {@link TestingResourceManager}, and the {@link TestingMemoryArchivist}. + */ +public class TestingApplicationMaster extends YarnApplicationMasterRunner { + + @Override + public Class<? extends JobManager> getJobManagerClass() { + return TestingYarnJobManager.class; + } + + @Override + public Class<? extends MemoryArchivist> getArchivistClass() { + return TestingMemoryArchivist.class; + } + + @Override + protected Class<? extends TaskManager> getTaskManagerClass() { + return TestingYarnTaskManager.class; + } + + @Override + public Class<? 
extends YarnFlinkResourceManager> getResourceManagerClass() { + return TestingYarnFlinkResourceManager.class; + } + + public static void main(String[] args) { + EnvironmentInformation.logEnvironmentInfo(LOG, "YARN ApplicationMaster / JobManager", args); + SignalHandler.register(LOG); + + // run and exit with the proper return code + int returnCode = new TestingApplicationMaster().run(args); + System.exit(returnCode); + } + +} http://git-wip-us.apache.org/repos/asf/flink/blob/70978f56/flink-yarn-tests/src/test/java/org/apache/flink/yarn/TestingFlinkYarnClient.java ---------------------------------------------------------------------- diff --git a/flink-yarn-tests/src/test/java/org/apache/flink/yarn/TestingFlinkYarnClient.java b/flink-yarn-tests/src/test/java/org/apache/flink/yarn/TestingFlinkYarnClient.java new file mode 100644 index 0000000..1efc336 --- /dev/null +++ b/flink-yarn-tests/src/test/java/org/apache/flink/yarn/TestingFlinkYarnClient.java @@ -0,0 +1,71 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.flink.yarn; + +import com.google.common.base.Preconditions; + +import java.io.File; +import java.io.FilenameFilter; +import java.util.ArrayList; +import java.util.List; + +/** + * Yarn client which starts a {@link TestingApplicationMaster}. Additionally the client adds the + * flink-yarn-tests-XXX-tests.jar and the flink-runtime-XXX-tests.jar to the set of files which + * are shipped to the yarn cluster. This is necessary to load the testing classes. + */ +public class TestingFlinkYarnClient extends FlinkYarnClientBase { + + public TestingFlinkYarnClient() { + List<File> filesToShip = new ArrayList<>(); + + File testingJar = YarnTestBase.findFile("..", new TestJarFinder("flink-yarn-tests")); + Preconditions.checkNotNull(testingJar, "Could not find the flink-yarn-tests tests jar. " + + "Make sure to package the flink-yarn-tests module."); + + File testingRuntimeJar = YarnTestBase.findFile("..", new TestJarFinder("flink-runtime")); + Preconditions.checkNotNull(testingRuntimeJar, "Could not find the flink-runtime tests " + + "jar. 
Make sure to package the flink-runtime module."); + + filesToShip.add(testingJar); + filesToShip.add(testingRuntimeJar); + + setShipFiles(filesToShip); + } + + @Override + protected Class<?> getApplicationMasterClass() { + return TestingApplicationMaster.class; + } + + public static class TestJarFinder implements FilenameFilter { + + private final String jarName; + + public TestJarFinder(final String jarName) { + this.jarName = jarName; + } + + @Override + public boolean accept(File dir, String name) { + return name.startsWith(jarName) && name.endsWith("-tests.jar") && + dir.getAbsolutePath().contains(dir.separator + jarName + dir.separator); + } + } +} http://git-wip-us.apache.org/repos/asf/flink/blob/70978f56/flink-yarn-tests/src/test/java/org/apache/flink/yarn/TestingYarnFlinkResourceManager.java ---------------------------------------------------------------------- diff --git a/flink-yarn-tests/src/test/java/org/apache/flink/yarn/TestingYarnFlinkResourceManager.java b/flink-yarn-tests/src/test/java/org/apache/flink/yarn/TestingYarnFlinkResourceManager.java new file mode 100644 index 0000000..5a61b8f --- /dev/null +++ b/flink-yarn-tests/src/test/java/org/apache/flink/yarn/TestingYarnFlinkResourceManager.java @@ -0,0 +1,56 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 
+ * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.yarn; + +import org.apache.flink.configuration.Configuration; +import org.apache.flink.runtime.clusterframework.ContaineredTaskManagerParameters; +import org.apache.flink.runtime.leaderretrieval.LeaderRetrievalService; +import org.apache.hadoop.yarn.api.records.ContainerLaunchContext; +import org.apache.hadoop.yarn.conf.YarnConfiguration; + +/** + * Flink's testing resource manager for Yarn. + */ +public class TestingYarnFlinkResourceManager extends YarnFlinkResourceManager { + + public TestingYarnFlinkResourceManager( + Configuration flinkConfig, + YarnConfiguration yarnConfig, + LeaderRetrievalService leaderRetrievalService, + String applicationMasterHostName, + String webInterfaceURL, + ContaineredTaskManagerParameters taskManagerParameters, + ContainerLaunchContext taskManagerLaunchContext, + int yarnHeartbeatIntervalMillis, + int maxFailedContainers, + int numInitialTaskManagers) { + + super( + flinkConfig, + yarnConfig, + leaderRetrievalService, + applicationMasterHostName, + webInterfaceURL, + taskManagerParameters, + taskManagerLaunchContext, + yarnHeartbeatIntervalMillis, + maxFailedContainers, + numInitialTaskManagers); + } +} http://git-wip-us.apache.org/repos/asf/flink/blob/70978f56/flink-yarn-tests/src/test/java/org/apache/flink/yarn/TestingYarnTaskManagerRunner.java ---------------------------------------------------------------------- diff --git a/flink-yarn-tests/src/test/java/org/apache/flink/yarn/TestingYarnTaskManagerRunner.java b/flink-yarn-tests/src/test/java/org/apache/flink/yarn/TestingYarnTaskManagerRunner.java new file mode 100644 index 0000000..8586a77 --- /dev/null +++ b/flink-yarn-tests/src/test/java/org/apache/flink/yarn/TestingYarnTaskManagerRunner.java @@ -0,0 +1,30 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. 
See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.yarn; + +import java.io.IOException; + +/** + * Yarn TaskManager runner which starts a {@link TestingYarnTaskManager}. + */ +public class TestingYarnTaskManagerRunner { + public static void main(String[] args) throws IOException { + YarnTaskManagerRunner.runYarnTaskManager(args, TestingYarnTaskManager.class); + } +} http://git-wip-us.apache.org/repos/asf/flink/blob/70978f56/flink-yarn-tests/src/test/java/org/apache/flink/yarn/UtilsTest.java ---------------------------------------------------------------------- diff --git a/flink-yarn-tests/src/test/java/org/apache/flink/yarn/UtilsTest.java b/flink-yarn-tests/src/test/java/org/apache/flink/yarn/UtilsTest.java new file mode 100644 index 0000000..784bf24 --- /dev/null +++ b/flink-yarn-tests/src/test/java/org/apache/flink/yarn/UtilsTest.java @@ -0,0 +1,184 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. 
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.flink.yarn; + +import org.apache.flink.configuration.ConfigConstants; +import org.apache.flink.configuration.Configuration; +import org.apache.log4j.AppenderSkeleton; +import org.apache.log4j.Level; +import org.apache.log4j.spi.LoggingEvent; +import org.junit.Assert; +import org.junit.Test; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import java.io.File; +import java.util.ArrayList; +import java.util.Arrays; +import java.util.List; +import java.util.Map; + +public class UtilsTest { + private static final Logger LOG = LoggerFactory.getLogger(UtilsTest.class); + + @Test + public void testUberjarLocator() { + File dir = YarnTestBase.findFile("..", new YarnTestBase.RootDirFilenameFilter()); + Assert.assertNotNull(dir); + Assert.assertTrue(dir.getName().endsWith(".jar")); + dir = dir.getParentFile().getParentFile(); // from uberjar to lib to root + Assert.assertTrue(dir.exists()); + Assert.assertTrue(dir.isDirectory()); + List<String> files = Arrays.asList(dir.list()); + Assert.assertTrue(files.contains("lib")); + Assert.assertTrue(files.contains("bin")); + Assert.assertTrue(files.contains("conf")); + } + + /** + * Remove 15% of the heap, at least 384MB. 
+ * + */ + @Test + public void testHeapCutoff() { + Configuration conf = new Configuration(); + conf.setDouble(ConfigConstants.CONTAINERED_HEAP_CUTOFF_RATIO, 0.15); + conf.setInteger(ConfigConstants.CONTAINERED_HEAP_CUTOFF_MIN, 384); + + Assert.assertEquals(616, Utils.calculateHeapSize(1000, conf) ); + Assert.assertEquals(8500, Utils.calculateHeapSize(10000, conf) ); + + // test different configuration + Assert.assertEquals(3400, Utils.calculateHeapSize(4000, conf)); + + conf.setString(ConfigConstants.CONTAINERED_HEAP_CUTOFF_MIN, "1000"); + conf.setString(ConfigConstants.CONTAINERED_HEAP_CUTOFF_RATIO, "0.1"); + Assert.assertEquals(3000, Utils.calculateHeapSize(4000, conf)); + + conf.setString(ConfigConstants.CONTAINERED_HEAP_CUTOFF_RATIO, "0.5"); + Assert.assertEquals(2000, Utils.calculateHeapSize(4000, conf)); + + conf.setString(ConfigConstants.CONTAINERED_HEAP_CUTOFF_RATIO, "1"); + Assert.assertEquals(0, Utils.calculateHeapSize(4000, conf)); + + // test also deprecated keys + conf = new Configuration(); + conf.setDouble(ConfigConstants.YARN_HEAP_CUTOFF_RATIO, 0.15); + conf.setInteger(ConfigConstants.YARN_HEAP_CUTOFF_MIN, 384); + + Assert.assertEquals(616, Utils.calculateHeapSize(1000, conf) ); + Assert.assertEquals(8500, Utils.calculateHeapSize(10000, conf) ); + } + + @Test(expected = IllegalArgumentException.class) + public void illegalArgument() { + Configuration conf = new Configuration(); + conf.setString(ConfigConstants.CONTAINERED_HEAP_CUTOFF_RATIO, "1.1"); + Assert.assertEquals(0, Utils.calculateHeapSize(4000, conf)); + } + + @Test(expected = IllegalArgumentException.class) + public void illegalArgumentNegative() { + Configuration conf = new Configuration(); + conf.setString(ConfigConstants.CONTAINERED_HEAP_CUTOFF_RATIO, "-0.01"); + Assert.assertEquals(0, Utils.calculateHeapSize(4000, conf)); + } + + @Test(expected = IllegalArgumentException.class) + public void tooMuchCutoff() { + Configuration conf = new Configuration(); + 
conf.setString(ConfigConstants.CONTAINERED_HEAP_CUTOFF_RATIO, "6000"); + Assert.assertEquals(0, Utils.calculateHeapSize(4000, conf)); + } + + @Test + public void testGetEnvironmentVariables() { + Configuration testConf = new Configuration(); + testConf.setString("yarn.application-master.env.LD_LIBRARY_PATH", "/usr/lib/native"); + + Map<String, String> res = Utils.getEnvironmentVariables("yarn.application-master.env.", testConf); + + Assert.assertEquals(1, res.size()); + Map.Entry<String, String> entry = res.entrySet().iterator().next(); + Assert.assertEquals("LD_LIBRARY_PATH", entry.getKey()); + Assert.assertEquals("/usr/lib/native", entry.getValue()); + } + + @Test + public void testGetEnvironmentVariablesErroneous() { + Configuration testConf = new Configuration(); + testConf.setString("yarn.application-master.env.", "/usr/lib/native"); + + Map<String, String> res = Utils.getEnvironmentVariables("yarn.application-master.env.", testConf); + + Assert.assertEquals(0, res.size()); + } + + // + // --------------- Tools to test if a certain string has been logged with Log4j. 
------------- + // See : http://stackoverflow.com/questions/3717402/how-to-test-w-junit-that-warning-was-logged-w-log4j + // + private static TestAppender testAppender; + public static void addTestAppender(Class target, Level level) { + testAppender = new TestAppender(); + testAppender.setThreshold(level); + org.apache.log4j.Logger lg = org.apache.log4j.Logger.getLogger(target); + lg.setLevel(level); + lg.addAppender(testAppender); + //org.apache.log4j.Logger.getRootLogger().addAppender(testAppender); + } + + public static void checkForLogString(String expected) { + LoggingEvent found = getEventContainingString(expected); + if(found != null) { + LOG.info("Found expected string '"+expected+"' in log message "+found); + return; + } + Assert.fail("Unable to find expected string '" + expected + "' in log messages"); + } + + public static LoggingEvent getEventContainingString(String expected) { + if(testAppender == null) { + throw new NullPointerException("Initialize test appender first"); + } + LoggingEvent found = null; + // make sure that different threads are not logging while the logs are checked + synchronized (testAppender.events) { + for (LoggingEvent event : testAppender.events) { + if (event.getMessage().toString().contains(expected)) { + found = event; + break; + } + } + } + return found; + } + + public static class TestAppender extends AppenderSkeleton { + public final List<LoggingEvent> events = new ArrayList<>(); + public void close() {} + public boolean requiresLayout() {return false;} + @Override + protected void append(LoggingEvent event) { + synchronized (events){ + events.add(event); + } + } + } +} + http://git-wip-us.apache.org/repos/asf/flink/blob/70978f56/flink-yarn-tests/src/test/java/org/apache/flink/yarn/YARNHighAvailabilityITCase.java ---------------------------------------------------------------------- diff --git a/flink-yarn-tests/src/test/java/org/apache/flink/yarn/YARNHighAvailabilityITCase.java 
b/flink-yarn-tests/src/test/java/org/apache/flink/yarn/YARNHighAvailabilityITCase.java new file mode 100644 index 0000000..a93abf0 --- /dev/null +++ b/flink-yarn-tests/src/test/java/org/apache/flink/yarn/YARNHighAvailabilityITCase.java @@ -0,0 +1,176 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.flink.yarn; + +import akka.actor.ActorSystem; +import akka.actor.PoisonPill; +import akka.testkit.JavaTestKit; +import org.apache.curator.test.TestingServer; +import org.apache.flink.configuration.ConfigConstants; +import org.apache.flink.configuration.Configuration; +import org.apache.flink.configuration.GlobalConfiguration; +import org.apache.flink.runtime.akka.AkkaUtils; +import org.apache.flink.runtime.instance.ActorGateway; +import org.apache.flink.runtime.instance.AkkaActorGateway; +import org.apache.flink.runtime.leaderretrieval.LeaderRetrievalService; +import org.apache.flink.runtime.messages.Messages; +import org.apache.flink.runtime.state.filesystem.FsStateBackendFactory; +import org.apache.flink.runtime.testingUtils.TestingJobManagerMessages; +import org.apache.flink.runtime.util.LeaderRetrievalUtils; +import org.apache.flink.runtime.yarn.AbstractFlinkYarnCluster; +import org.apache.hadoop.fs.Path; +import org.apache.hadoop.yarn.conf.YarnConfiguration; +import org.junit.AfterClass; +import org.junit.Assert; +import org.junit.BeforeClass; +import org.junit.Rule; +import org.junit.Test; +import org.junit.rules.TemporaryFolder; +import scala.concurrent.duration.FiniteDuration; + +import java.io.File; +import java.io.IOException; +import java.util.Arrays; +import java.util.concurrent.TimeUnit; + +public class YARNHighAvailabilityITCase extends YarnTestBase { + + private static TestingServer zkServer; + + private static ActorSystem actorSystem; + + private static final int numberApplicationAttempts = 10; + + @Rule + public TemporaryFolder tmp = new TemporaryFolder(); + + @BeforeClass + public static void setup() { + actorSystem = AkkaUtils.createDefaultActorSystem(); + + try { + zkServer = new TestingServer(); + zkServer.start(); + } catch (Exception e) { + e.printStackTrace(); + Assert.fail("Could not start ZooKeeper testing cluster."); + } + + yarnConfiguration.set(YarnTestBase.TEST_CLUSTER_NAME_KEY, "flink-yarn-tests-ha"); + 
yarnConfiguration.set(YarnConfiguration.RM_AM_MAX_ATTEMPTS, "" + numberApplicationAttempts); + + startYARNWithConfig(yarnConfiguration); + } + + @AfterClass + public static void teardown() throws IOException { + if(zkServer != null) { + zkServer.stop(); + } + + JavaTestKit.shutdownActorSystem(actorSystem); + actorSystem = null; + } + + /** + * Tests that the application master can be killed multiple times and that the surviving + * TaskManager successfully reconnects to the newly started JobManager. + * @throws Exception + */ + @Test + public void testMultipleAMKill() throws Exception { + final int numberKillingAttempts = numberApplicationAttempts - 1; + + TestingFlinkYarnClient flinkYarnClient = new TestingFlinkYarnClient(); + + Assert.assertNotNull("unable to get yarn client", flinkYarnClient); + flinkYarnClient.setTaskManagerCount(1); + flinkYarnClient.setJobManagerMemory(768); + flinkYarnClient.setTaskManagerMemory(1024); + flinkYarnClient.setLocalJarPath(new Path(flinkUberjar.getAbsolutePath())); + flinkYarnClient.setShipFiles(Arrays.asList(flinkLibFolder.listFiles())); + + String confDirPath = System.getenv("FLINK_CONF_DIR"); + flinkYarnClient.setConfigurationDirectory(confDirPath); + + String fsStateHandlePath = tmp.getRoot().getPath(); + + flinkYarnClient.setFlinkConfiguration(GlobalConfiguration.getConfiguration()); + flinkYarnClient.setDynamicPropertiesEncoded("recovery.mode=zookeeper@@recovery.zookeeper.quorum=" + + zkServer.getConnectString() + "@@yarn.application-attempts=" + numberApplicationAttempts + + "@@" + ConfigConstants.STATE_BACKEND + "=FILESYSTEM" + + "@@" + FsStateBackendFactory.CHECKPOINT_DIRECTORY_URI_CONF_KEY + "=" + fsStateHandlePath + "/checkpoints" + + "@@" + ConfigConstants.ZOOKEEPER_RECOVERY_PATH + "=" + fsStateHandlePath + "/recovery"); + flinkYarnClient.setConfigurationFilePath(new Path(confDirPath + File.separator + "flink-conf.yaml")); + + AbstractFlinkYarnCluster yarnCluster = null; + + final FiniteDuration timeout = new
FiniteDuration(2, TimeUnit.MINUTES); + + try { + yarnCluster = flinkYarnClient.deploy(); + yarnCluster.connectToCluster(); + final Configuration config = yarnCluster.getFlinkConfiguration(); + + new JavaTestKit(actorSystem) {{ + for (int attempt = 0; attempt < numberKillingAttempts; attempt++) { + new Within(timeout) { + @Override + protected void run() { + try { + LeaderRetrievalService lrs = LeaderRetrievalUtils.createLeaderRetrievalService(config); + ActorGateway gateway = LeaderRetrievalUtils.retrieveLeaderGateway(lrs, actorSystem, timeout); + ActorGateway selfGateway = new AkkaActorGateway(getRef(), gateway.leaderSessionID()); + + gateway.tell(new TestingJobManagerMessages.NotifyWhenAtLeastNumTaskManagerAreRegistered(1), selfGateway); + + expectMsgEquals(Messages.getAcknowledge()); + + gateway.tell(PoisonPill.getInstance()); + } catch (Exception e) { + throw new AssertionError("Could not complete test.", e); + } + } + }; + } + + new Within(timeout) { + @Override + protected void run() { + try { + LeaderRetrievalService lrs = LeaderRetrievalUtils.createLeaderRetrievalService(config); + ActorGateway gateway2 = LeaderRetrievalUtils.retrieveLeaderGateway(lrs, actorSystem, timeout); + ActorGateway selfGateway = new AkkaActorGateway(getRef(), gateway2.leaderSessionID()); + gateway2.tell(new TestingJobManagerMessages.NotifyWhenAtLeastNumTaskManagerAreRegistered(1), selfGateway); + + expectMsgEquals(Messages.getAcknowledge()); + } catch (Exception e) { + throw new AssertionError("Could not complete test.", e); + } + } + }; + + }}; + } finally { + if (yarnCluster != null) { + yarnCluster.shutdown(false); + } + } + } +} http://git-wip-us.apache.org/repos/asf/flink/blob/70978f56/flink-yarn-tests/src/test/java/org/apache/flink/yarn/YARNSessionCapacitySchedulerITCase.java ---------------------------------------------------------------------- diff --git a/flink-yarn-tests/src/test/java/org/apache/flink/yarn/YARNSessionCapacitySchedulerITCase.java 
b/flink-yarn-tests/src/test/java/org/apache/flink/yarn/YARNSessionCapacitySchedulerITCase.java new file mode 100644 index 0000000..38e17a5 --- /dev/null +++ b/flink-yarn-tests/src/test/java/org/apache/flink/yarn/YARNSessionCapacitySchedulerITCase.java @@ -0,0 +1,539 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +package org.apache.flink.yarn; + +import com.fasterxml.jackson.databind.JsonNode; +import com.fasterxml.jackson.databind.ObjectMapper; +import com.fasterxml.jackson.databind.node.ArrayNode; +import com.google.common.base.Joiner; +import org.apache.commons.io.FileUtils; +import org.apache.flink.configuration.ConfigConstants; +import org.apache.flink.runtime.client.JobClient; +import org.apache.flink.runtime.webmonitor.WebMonitorUtils; +import org.apache.flink.test.testdata.WordCountData; +import org.apache.flink.test.util.TestBaseUtils; +import org.apache.hadoop.security.UserGroupInformation; +import org.apache.hadoop.yarn.api.protocolrecords.StopContainersRequest; +import org.apache.hadoop.yarn.api.records.ApplicationReport; +import org.apache.hadoop.yarn.api.records.ContainerId; +import org.apache.hadoop.yarn.api.records.NodeReport; +import org.apache.hadoop.yarn.api.records.YarnApplicationState; +import org.apache.hadoop.yarn.api.records.NodeState; +import org.apache.hadoop.yarn.api.records.ApplicationId; +import org.apache.hadoop.yarn.client.api.YarnClient; +import org.apache.hadoop.yarn.conf.YarnConfiguration; +import org.apache.hadoop.yarn.security.NMTokenIdentifier; +import org.apache.hadoop.yarn.server.nodemanager.NodeManager; +import org.apache.hadoop.yarn.server.nodemanager.containermanager.container.Container; +import org.apache.hadoop.yarn.server.resourcemanager.scheduler.ResourceScheduler; +import org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity.CapacityScheduler; +import org.apache.log4j.Level; +import org.junit.After; +import org.junit.Assert; +import org.junit.BeforeClass; +import org.junit.Test; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import java.io.File; +import java.io.FilenameFilter; +import java.io.IOException; +import java.util.EnumSet; +import java.util.LinkedList; +import java.util.List; +import java.util.Map; +import java.util.Collections; +import java.util.Comparator; +import java.util.Arrays; 
+import java.util.concurrent.ConcurrentMap; +import java.util.regex.Matcher; +import java.util.regex.Pattern; + +import static org.apache.flink.yarn.UtilsTest.addTestAppender; +import static org.apache.flink.yarn.UtilsTest.checkForLogString; + + +/** + * This test starts a MiniYARNCluster with a CapacityScheduler. + * It has, by default, a queue called "default". The configuration here adds another queue: "qa-team". + */ +public class YARNSessionCapacitySchedulerITCase extends YarnTestBase { + private static final Logger LOG = LoggerFactory.getLogger(YARNSessionCapacitySchedulerITCase.class); + + @BeforeClass + public static void setup() { + yarnConfiguration.setClass(YarnConfiguration.RM_SCHEDULER, CapacityScheduler.class, ResourceScheduler.class); + yarnConfiguration.set("yarn.scheduler.capacity.root.queues", "default,qa-team"); + yarnConfiguration.setInt("yarn.scheduler.capacity.root.default.capacity", 40); + yarnConfiguration.setInt("yarn.scheduler.capacity.root.qa-team.capacity", 60); + yarnConfiguration.set(YarnTestBase.TEST_CLUSTER_NAME_KEY, "flink-yarn-tests-capacityscheduler"); + startYARNWithConfig(yarnConfiguration); + } + + /** + * Test regular operation, including command line parameter parsing. + */ + @Test + public void testClientStartup() { + LOG.info("Starting testClientStartup()"); + runWithArgs(new String[]{"-j", flinkUberjar.getAbsolutePath(), "-t", flinkLibFolder.getAbsolutePath(), + "-n", "1", + "-jm", "768", + "-tm", "1024", "-qu", "qa-team"}, + "Number of connected TaskManagers changed to 1. Slots available: 1", null, RunTypes.YARN_SESSION, 0); + LOG.info("Finished testClientStartup()"); + } + + /** + * Test per-job yarn cluster + * + * This also tests the prefixed CliFrontend options for the YARN case + * We also test if the requested parallelism of 2 is passed through. + * The parallelism is requested at the YARN client (-ys).
+ */ + @Test + public void perJobYarnCluster() { + LOG.info("Starting perJobYarnCluster()"); + addTestAppender(JobClient.class, Level.INFO); + File exampleJarLocation = YarnTestBase.findFile("..", new ContainsName(new String[] {"-WordCount.jar"} , "streaming")); // exclude streaming wordcount here. + Assert.assertNotNull("Could not find wordcount jar", exampleJarLocation); + runWithArgs(new String[]{"run", "-m", "yarn-cluster", + "-yj", flinkUberjar.getAbsolutePath(), "-yt", flinkLibFolder.getAbsolutePath(), + "-yn", "1", + "-ys", "2", //test that the job is executed with a DOP of 2 + "-yjm", "768", + "-ytm", "1024", exampleJarLocation.getAbsolutePath()}, + /* test succeeded after this string */ + "Job execution complete", + /* prohibited strings: (we want to see (2/2)) */ + new String[]{"System.out)(1/1) switched to FINISHED "}, + RunTypes.CLI_FRONTEND, 0, true); + LOG.info("Finished perJobYarnCluster()"); + } + + + /** + * Test TaskManager failure and also if the vcores are set correctly (see issue FLINK-2213). + */ + @Test(timeout=100000) // timeout after 100 seconds + public void testTaskManagerFailure() { + LOG.info("Starting testTaskManagerFailure()"); + Runner runner = startWithArgs(new String[]{"-j", flinkUberjar.getAbsolutePath(), "-t", flinkLibFolder.getAbsolutePath(), + "-n", "1", + "-jm", "768", + "-tm", "1024", + "-s", "3", // set the slots 3 to check if the vCores are set properly! + "-nm", "customName", + "-Dfancy-configuration-value=veryFancy", + "-Dyarn.maximum-failed-containers=3", + "-D" + ConfigConstants.YARN_VCORES + "=2"}, + "Number of connected TaskManagers changed to 1. 
Slots available: 3", + RunTypes.YARN_SESSION); + + Assert.assertEquals(2, getRunningContainers()); + + // ------------------------ Test if JobManager web interface is accessible ------- + + YarnClient yc = null; + try { + yc = YarnClient.createYarnClient(); + yc.init(yarnConfiguration); + yc.start(); + + List<ApplicationReport> apps = yc.getApplications(EnumSet.of(YarnApplicationState.RUNNING)); + Assert.assertEquals(1, apps.size()); // Only one running + ApplicationReport app = apps.get(0); + Assert.assertEquals("customName", app.getName()); + String url = app.getTrackingUrl(); + if(!url.endsWith("/")) { + url += "/"; + } + if(!url.startsWith("http://")) { + url = "http://" + url; + } + LOG.info("Got application URL from YARN {}", url); + + String response = TestBaseUtils.getFromHTTP(url + "taskmanagers/"); + + JsonNode parsedTMs = new ObjectMapper().readTree(response); + ArrayNode taskManagers = (ArrayNode) parsedTMs.get("taskmanagers"); + Assert.assertNotNull(taskManagers); + Assert.assertEquals(1, taskManagers.size()); + Assert.assertEquals(3, taskManagers.get(0).get("slotsNumber").asInt()); + + // get the configuration from webinterface & check if the dynamic properties from YARN show up there. 
+ String jsonConfig = TestBaseUtils.getFromHTTP(url + "jobmanager/config"); + Map<String, String> parsedConfig = WebMonitorUtils.fromKeyValueJsonArray(jsonConfig); + + Assert.assertEquals("veryFancy", parsedConfig.get("fancy-configuration-value")); + Assert.assertEquals("3", parsedConfig.get("yarn.maximum-failed-containers")); + Assert.assertEquals("2", parsedConfig.get(ConfigConstants.YARN_VCORES)); + + // -------------- FLINK-1902: check if jobmanager hostname/port are shown in web interface + // first, get the hostname/port + String oC = outContent.toString(); + Pattern p = Pattern.compile("Flink JobManager is now running on ([a-zA-Z0-9.-]+):([0-9]+)"); + Matcher matches = p.matcher(oC); + String hostname = null; + String port = null; + while(matches.find()) { + hostname = matches.group(1).toLowerCase(); + port = matches.group(2); + } + LOG.info("Extracted hostname:port: {} {}", hostname, port); + + Assert.assertEquals("unable to find hostname in " + jsonConfig, hostname, + parsedConfig.get(ConfigConstants.JOB_MANAGER_IPC_ADDRESS_KEY)); + Assert.assertEquals("unable to find port in " + jsonConfig, port, + parsedConfig.get(ConfigConstants.JOB_MANAGER_IPC_PORT_KEY)); + + // test logfile access + String logs = TestBaseUtils.getFromHTTP(url + "jobmanager/log"); + Assert.assertTrue(logs.contains("Starting YARN ApplicationMaster")); + Assert.assertTrue(logs.contains("Starting JobManager")); + Assert.assertTrue(logs.contains("Starting JobManager Web Frontend")); + } catch(Throwable e) { + LOG.warn("Error while running test",e); + Assert.fail(e.getMessage()); + } + + // ------------------------ Kill container with TaskManager and check if vcores are set correctly ------- + + // find container id of taskManager: + ContainerId taskManagerContainer = null; + NodeManager nodeManager = null; + UserGroupInformation remoteUgi = null; + NMTokenIdentifier nmIdent = null; + try { + remoteUgi = UserGroupInformation.getCurrentUser(); + } catch (IOException e) { + LOG.warn("Unable 
to get curr user", e); + Assert.fail(); + } + for(int nmId = 0; nmId < NUM_NODEMANAGERS; nmId++) { + NodeManager nm = yarnCluster.getNodeManager(nmId); + ConcurrentMap<ContainerId, Container> containers = nm.getNMContext().getContainers(); + for(Map.Entry<ContainerId, Container> entry : containers.entrySet()) { + String command = Joiner.on(" ").join(entry.getValue().getLaunchContext().getCommands()); + if(command.contains(YarnTaskManager.class.getSimpleName())) { + taskManagerContainer = entry.getKey(); + nodeManager = nm; + nmIdent = new NMTokenIdentifier(taskManagerContainer.getApplicationAttemptId(), null, "",0); + // allow myself to do stuff with the container + // remoteUgi.addCredentials(entry.getValue().getCredentials()); + remoteUgi.addTokenIdentifier(nmIdent); + } + } + sleep(500); + } + + Assert.assertNotNull("Unable to find container with TaskManager", taskManagerContainer); + Assert.assertNotNull("Illegal state", nodeManager); + + try { + List<NodeReport> nodeReports = yc.getNodeReports(NodeState.RUNNING); + + // we asked for one node with 2 vcores so we expect 2 vcores + int userVcores = 0; + for (NodeReport rep: nodeReports) { + userVcores += rep.getUsed().getVirtualCores(); + } + Assert.assertEquals(2, userVcores); + } catch (Exception e) { + Assert.fail("Test failed: " + e.getMessage()); + } + + yc.stop(); + + List<ContainerId> toStop = new LinkedList<ContainerId>(); + toStop.add(taskManagerContainer); + StopContainersRequest scr = StopContainersRequest.newInstance(toStop); + + try { + nodeManager.getNMContext().getContainerManager().stopContainers(scr); + } catch (Throwable e) { + LOG.warn("Error stopping container", e); + Assert.fail("Error stopping container: "+e.getMessage()); + } + + // stateful termination check: + // wait until we saw a container being killed and AFTERWARDS a new one launched + boolean ok = false; + do { + LOG.debug("Waiting for correct order of events. 
Output: {}", errContent.toString()); + + String o = errContent.toString(); + int killedOff = o.indexOf("Container killed by the ApplicationMaster"); + if (killedOff != -1) { + o = o.substring(killedOff); + ok = o.indexOf("Launching TaskManager") > 0; + } + sleep(1000); + } while(!ok); + + + // send "stop" command to command line interface + runner.sendStop(); + // wait for the thread to stop + try { + runner.join(1000); + } catch (InterruptedException e) { + LOG.warn("Interrupted while stopping runner", e); + } + LOG.warn("stopped"); + + // ----------- Send output to logger + System.setOut(originalStdout); + System.setErr(originalStderr); + String oC = outContent.toString(); + String eC = errContent.toString(); + LOG.info("Sending stdout content through logger: \n\n{}\n\n", oC); + LOG.info("Sending stderr content through logger: \n\n{}\n\n", eC); + + // ------ Check if everything happened correctly + Assert.assertTrue("Expect to see failed container", + eC.contains("New messages from the YARN cluster")); + + Assert.assertTrue("Expect to see failed container", + eC.contains("Container killed by the ApplicationMaster")); + + Assert.assertTrue("Expect to see new container started", + eC.contains("Launching TaskManager") && eC.contains("on host")); + + // cleanup auth for the subsequent tests. + remoteUgi.getTokenIdentifiers().remove(nmIdent); + + LOG.info("Finished testTaskManagerFailure()"); + } + + /** + * Test deployment to non-existing queue. (user-reported error) + * Deployment to the queue is possible because there are no queues, so we don't check. 
+ */ + @Test + public void testNonexistingQueue() { + LOG.info("Starting testNonexistingQueue()"); + addTestAppender(FlinkYarnClient.class, Level.WARN); + runWithArgs(new String[]{"-j", flinkUberjar.getAbsolutePath(), + "-t", flinkLibFolder.getAbsolutePath(), + "-n", "1", + "-jm", "768", + "-tm", "1024", + "-qu", "doesntExist"}, "to unknown queue: doesntExist", null, RunTypes.YARN_SESSION, 1); + checkForLogString("The specified queue 'doesntExist' does not exist. Available queues: default, qa-team"); + LOG.info("Finished testNonexistingQueue()"); + } + + /** + * Test per-job yarn cluster with the parallelism set at the CliFrontend instead of the YARN client. + */ + @Test + public void perJobYarnClusterWithParallelism() { + LOG.info("Starting perJobYarnClusterWithParallelism()"); + // write log messages to stdout as well, so that the runWithArgs() method + // is catching the log output + addTestAppender(JobClient.class, Level.INFO); + File exampleJarLocation = YarnTestBase.findFile("..", new ContainsName(new String[] {"-WordCount.jar"}, "streaming")); // exclude streaming wordcount here. + Assert.assertNotNull("Could not find wordcount jar", exampleJarLocation); + runWithArgs(new String[]{"run", + "-p", "2", //test that the job is executed with a DOP of 2 + "-m", "yarn-cluster", + "-yj", flinkUberjar.getAbsolutePath(), + "-yt", flinkLibFolder.getAbsolutePath(), + "-yn", "1", + "-yjm", "768", + "-ytm", "1024", exampleJarLocation.getAbsolutePath()}, + /* test succeeded after this string */ + "Job execution complete", + /* prohibited strings: (we want to see (2/2)) */ + new String[]{"System.out)(1/1) switched to FINISHED "}, + RunTypes.CLI_FRONTEND, 0, true); + LOG.info("Finished perJobYarnClusterWithParallelism()"); + } + + /** + * Test a fire-and-forget job submission to a YARN cluster. 
+ */ + @Test(timeout=60000) + public void testDetachedPerJobYarnCluster() { + LOG.info("Starting testDetachedPerJobYarnCluster()"); + + File exampleJarLocation = YarnTestBase.findFile( + ".." + File.separator + "flink-examples" + File.separator + "flink-examples-batch", + new ContainsName(new String[] {"-WordCount.jar"})); + + Assert.assertNotNull("Could not find batch wordcount jar", exampleJarLocation); + + testDetachedPerJobYarnClusterInternal(exampleJarLocation.getAbsolutePath()); + + LOG.info("Finished testDetachedPerJobYarnCluster()"); + } + + /** + * Test a fire-and-forget job submission to a YARN cluster. + */ + @Test(timeout=60000) + public void testDetachedPerJobYarnClusterWithStreamingJob() { + LOG.info("Starting testDetachedPerJobYarnClusterWithStreamingJob()"); + + File exampleJarLocation = YarnTestBase.findFile( + ".." + File.separator + "flink-examples" + File.separator + "flink-examples-streaming", + new ContainsName(new String[] {"-WordCount.jar"})); + Assert.assertNotNull("Could not find streaming wordcount jar", exampleJarLocation); + + testDetachedPerJobYarnClusterInternal(exampleJarLocation.getAbsolutePath()); + + LOG.info("Finished testDetachedPerJobYarnClusterWithStreamingJob()"); + } + + private void testDetachedPerJobYarnClusterInternal(String job) { + YarnClient yc = YarnClient.createYarnClient(); + yc.init(yarnConfiguration); + yc.start(); + + // get temporary folder for writing output of wordcount example + File tmpOutFolder = null; + try{ + tmpOutFolder = tmp.newFolder(); + } + catch(IOException e) { + throw new RuntimeException(e); + } + + // get temporary file for reading input data for wordcount example + File tmpInFile; + try{ + tmpInFile = tmp.newFile(); + FileUtils.writeStringToFile(tmpInFile, WordCountData.TEXT); + } + catch(IOException e) { + throw new RuntimeException(e); + } + + Runner runner = startWithArgs(new String[]{"run", "-m", "yarn-cluster", "-yj", flinkUberjar.getAbsolutePath(), + "-yt", 
flinkLibFolder.getAbsolutePath(), + "-yn", "1", + "-yjm", "768", + "-yD", "yarn.heap-cutoff-ratio=0.5", // test if the cutoff is passed correctly + "-ytm", "1024", + "-ys", "2", // test requesting slots from YARN. + "--yarndetached", job, "--input", tmpInFile.getAbsoluteFile().toString(), "--output", tmpOutFolder.getAbsoluteFile().toString()}, + "Job has been submitted with JobID", + RunTypes.CLI_FRONTEND); + + // it should usually be 2, but on slow machines, the number varies + Assert.assertTrue("There should be at most 2 containers running", getRunningContainers() <= 2); + // give the runner some time to detach + for (int attempt = 0; runner.isAlive() && attempt < 5; attempt++) { + try { + Thread.sleep(500); + } catch (InterruptedException e) { + } + } + Assert.assertFalse("The runner should detach.", runner.isAlive()); + LOG.info("CLI Frontend has returned, so the job is running"); + + // find out the application id and wait until it has finished. + try { + List<ApplicationReport> apps = yc.getApplications(EnumSet.of(YarnApplicationState.RUNNING)); + + ApplicationId tmpAppId; + if (apps.size() == 1) { + // Better method to find the right appId. 
But sometimes the app is shutting down very fast + // Only one running + tmpAppId = apps.get(0).getApplicationId(); + + LOG.info("waiting for the job with appId {} to finish", tmpAppId); + // wait until the app has finished + while(yc.getApplications(EnumSet.of(YarnApplicationState.RUNNING)).size() > 0) { + sleep(500); + } + } else { + // get appId by finding the latest finished appid + apps = yc.getApplications(); + Collections.sort(apps, new Comparator<ApplicationReport>() { + @Override + public int compare(ApplicationReport o1, ApplicationReport o2) { + return o1.getApplicationId().compareTo(o2.getApplicationId())*-1; + } + }); + tmpAppId = apps.get(0).getApplicationId(); + LOG.info("Selected {} as the last appId from {}", tmpAppId, Arrays.toString(apps.toArray())); + } + final ApplicationId id = tmpAppId; + + // now it has finished. + // check the output files. + File[] listOfOutputFiles = tmpOutFolder.listFiles(); + + + Assert.assertNotNull("Taskmanager output not found", listOfOutputFiles); + LOG.info("The job has finished. TaskManager output files found in {}", tmpOutFolder ); + + // read all output files in output folder to one output string + String content = ""; + for(File f:listOfOutputFiles) + { + if(f.isFile()) + { + content += FileUtils.readFileToString(f) + "\n"; + } + } + //String content = FileUtils.readFileToString(taskmanagerOut); + // check for some of the wordcount outputs. 
+ Assert.assertTrue("Expected string 'da 5' or '(all,2)' not found in string '"+content+"'", content.contains("da 5") || content.contains("(da,5)") || content.contains("(all,2)")); + Assert.assertTrue("Expected string 'der 29' or '(mind,1)' not found in string'"+content+"'",content.contains("der 29") || content.contains("(der,29)") || content.contains("(mind,1)")); + + // check if the heap size for the TaskManager was set correctly + File jobmanagerLog = YarnTestBase.findFile("..", new FilenameFilter() { + @Override + public boolean accept(File dir, String name) { + return name.contains("jobmanager.log") && dir.getAbsolutePath().contains(id.toString()); + } + }); + Assert.assertNotNull("Unable to locate JobManager log", jobmanagerLog); + content = FileUtils.readFileToString(jobmanagerLog); + // TM was started with 1024 but we cut off 50% (NOT THE DEFAULT VALUE) + String expected = "Starting TaskManagers with command: $JAVA_HOME/bin/java -Xms424m -Xmx424m"; + Assert.assertTrue("Expected string '" + expected + "' not found in JobManager log: '"+jobmanagerLog+"'", + content.contains(expected)); + expected = " (2/2) (attempt #0) to "; + Assert.assertTrue("Expected string '" + expected + "' not found in JobManager log." + + "This string checks that the job has been started with a parallelism of 2. Log contents: '"+jobmanagerLog+"'", + content.contains(expected)); + + // make sure the detached app is really finished. + LOG.info("Checking again that app has finished"); + ApplicationReport rep; + do { + sleep(500); + rep = yc.getApplicationReport(id); + LOG.info("Got report {}", rep); + } while(rep.getYarnApplicationState() == YarnApplicationState.RUNNING); + + } catch(Throwable t) { + LOG.warn("Error while detached yarn session was running", t); + Assert.fail(t.getMessage()); + } + } + + @After + public void checkForProhibitedLogContents() { + ensureNoProhibitedStringInLogFiles(PROHIBITED_STRINGS, WHITELISTED_STRINGS); + } +}
