http://git-wip-us.apache.org/repos/asf/flink/blob/2af65867/flink-addons/flink-yarn/src/main/scala/org/apache/flink/yarn/YarnTaskManager.scala ---------------------------------------------------------------------- diff --git a/flink-addons/flink-yarn/src/main/scala/org/apache/flink/yarn/YarnTaskManager.scala b/flink-addons/flink-yarn/src/main/scala/org/apache/flink/yarn/YarnTaskManager.scala deleted file mode 100644 index 1652705..0000000 --- a/flink-addons/flink-yarn/src/main/scala/org/apache/flink/yarn/YarnTaskManager.scala +++ /dev/null @@ -1,37 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.flink.yarn - -import org.apache.flink.runtime.ActorLogMessages -import org.apache.flink.runtime.taskmanager.TaskManager -import org.apache.flink.yarn.Messages.StopYarnSession - -trait YarnTaskManager extends ActorLogMessages { - that: TaskManager => - - abstract override def receiveWithLogMessages: Receive = { - receiveYarnMessages orElse super.receiveWithLogMessages - } - - def receiveYarnMessages: Receive = { - case StopYarnSession(status) => { - context.system.shutdown() - } - } -}
http://git-wip-us.apache.org/repos/asf/flink/blob/2af65867/flink-addons/flink-yarn/src/main/scala/org/apache/flink/yarn/YarnUtils.scala ---------------------------------------------------------------------- diff --git a/flink-addons/flink-yarn/src/main/scala/org/apache/flink/yarn/YarnUtils.scala b/flink-addons/flink-yarn/src/main/scala/org/apache/flink/yarn/YarnUtils.scala deleted file mode 100644 index 245651d..0000000 --- a/flink-addons/flink-yarn/src/main/scala/org/apache/flink/yarn/YarnUtils.scala +++ /dev/null @@ -1,80 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. 
- */ - -package org.apache.flink.yarn - -import akka.actor.{Props, ActorRef, ActorSystem} -import com.typesafe.config.ConfigFactory -import org.apache.flink.configuration.Configuration -import org.apache.flink.runtime.akka.AkkaUtils -import org.apache.flink.runtime.taskmanager.TaskManager - -object YarnUtils { - def createActorSystem(hostname: String, port: Int, configuration: Configuration): ActorSystem = { - val akkaConfig = ConfigFactory.parseString(AkkaUtils.getConfigString(hostname, port, - configuration) + getConfigString) - - AkkaUtils.createActorSystem(akkaConfig) - } - - def createActorSystem(): ActorSystem = { - val akkaConfig = ConfigFactory.parseString(AkkaUtils.getDefaultActorSystemConfigString + - getConfigString) - - AkkaUtils.createActorSystem(akkaConfig) - } - - def getConfigString: String = { - """ - |akka{ - | loglevel = "DEBUG" - | stdout-loglevel = "DEBUG" - | log-dead-letters-during-shutdown = off - | log-dead-letters = off - | - | actor { - | provider = "akka.remote.RemoteActorRefProvider" - | } - | - | remote{ - | log-remote-lifecycle-events = off - | - | netty{ - | tcp{ - | transport-class = "akka.remote.transport.netty.NettyTransport" - | tcp-nodelay = on - | maximum-frame-size = 1MB - | execution-pool-size = 4 - | } - | } - | } - |}""".stripMargin - } - - def startActorSystemAndTaskManager(args: Array[String]): (ActorSystem, ActorRef) = { - val (hostname, port, config) = TaskManager.parseArgs(args) - - val actorSystem = createActorSystem(hostname, port, config) - - val (connectionInfo, jobManagerURL, taskManagerConfig, networkConnectionConfiguration) = - TaskManager.parseConfiguration(hostname, config, false) - - (actorSystem, TaskManager.startActor(Props(new TaskManager(connectionInfo, jobManagerURL, - taskManagerConfig, networkConnectionConfiguration) with YarnTaskManager))(actorSystem)) - } -} http://git-wip-us.apache.org/repos/asf/flink/blob/2af65867/flink-addons/pom.xml 
---------------------------------------------------------------------- diff --git a/flink-addons/pom.xml b/flink-addons/pom.xml index a20a375..ea45cdb 100644 --- a/flink-addons/pom.xml +++ b/flink-addons/pom.xml @@ -59,19 +59,6 @@ under the License. <module>flink-tachyon</module> </modules> </profile> - - <profile> - <id>include-yarn</id> - <activation> - <property> - <!-- Please do not remove the 'hadoop2' comment. See ./tools/generate_specific_pom.sh --> - <!--hadoop2--><name>!hadoop.profile</name> - </property> - </activation> - <modules> - <module>flink-yarn</module> - </modules> - </profile> </profiles> </project> http://git-wip-us.apache.org/repos/asf/flink/blob/2af65867/flink-clients/pom.xml ---------------------------------------------------------------------- diff --git a/flink-clients/pom.xml b/flink-clients/pom.xml index 85506b2..1a96c9c 100644 --- a/flink-clients/pom.xml +++ b/flink-clients/pom.xml @@ -37,11 +37,6 @@ under the License. <dependencies> <dependency> - <groupId>commons-cli</groupId> - <artifactId>commons-cli</artifactId> - </dependency> - - <dependency> <groupId>org.apache.flink</groupId> <artifactId>flink-core</artifactId> <version>${project.version}</version> @@ -109,6 +104,11 @@ under the License. 
</dependency> <dependency> + <groupId>commons-cli</groupId> + <artifactId>commons-cli</artifactId> + </dependency> + + <dependency> <groupId>com.typesafe.akka</groupId> <artifactId>akka-testkit_2.10</artifactId> </dependency> http://git-wip-us.apache.org/repos/asf/flink/blob/2af65867/flink-clients/src/main/java/org/apache/flink/client/CliFrontend.java ---------------------------------------------------------------------- diff --git a/flink-clients/src/main/java/org/apache/flink/client/CliFrontend.java b/flink-clients/src/main/java/org/apache/flink/client/CliFrontend.java index 8092513..358783a 100644 --- a/flink-clients/src/main/java/org/apache/flink/client/CliFrontend.java +++ b/flink-clients/src/main/java/org/apache/flink/client/CliFrontend.java @@ -56,6 +56,7 @@ import org.apache.flink.configuration.ConfigConstants; import org.apache.flink.configuration.Configuration; import org.apache.flink.configuration.GlobalConfiguration; import org.apache.flink.runtime.akka.AkkaUtils; +import org.apache.flink.runtime.yarn.AbstractFlinkYarnClient; import org.apache.flink.runtime.executiongraph.ExecutionGraph; import org.apache.flink.runtime.jobgraph.JobID; import org.apache.flink.runtime.jobgraph.JobStatus; @@ -63,7 +64,12 @@ import org.apache.flink.runtime.jobmanager.JobManager; import org.apache.flink.runtime.messages.JobManagerMessages.CancelJob; import org.apache.flink.runtime.messages.JobManagerMessages.RequestRunningJobs$; import org.apache.flink.runtime.messages.JobManagerMessages.RunningJobs; +import org.apache.flink.runtime.yarn.AbstractFlinkYarnCluster; +import org.apache.flink.runtime.yarn.FlinkYarnClusterStatus; import org.apache.flink.util.StringUtils; +import org.apache.log4j.ConsoleAppender; +import org.apache.log4j.LogManager; +import org.apache.log4j.PatternLayout; import scala.concurrent.duration.FiniteDuration; /** @@ -71,34 +77,40 @@ import scala.concurrent.duration.FiniteDuration; */ public class CliFrontend { + // run job by deploying Flink into a YARN 
cluster, if this string is specified as the jobmanager address + public static final String YARN_DEPLOY_JOBMANAGER = "yarn-cluster"; + + // command line interface of the YARN session, with a special initialization here to prefix all options with y/yarn. + private static FlinkYarnSessionCli yarnSessionCLi = new FlinkYarnSessionCli("y", "yarn"); + //actions private static final String ACTION_RUN = "run"; private static final String ACTION_INFO = "info"; private static final String ACTION_LIST = "list"; private static final String ACTION_CANCEL = "cancel"; - + // general options private static final Option HELP_OPTION = new Option("h", "help", false, "Show the help for the CLI Frontend."); private static final Option VERBOSE_OPTION = new Option("v", "verbose", false, "Print more detailed error messages."); - + // program (jar file) specific options private static final Option JAR_OPTION = new Option("j", "jarfile", true, "Flink program JAR file."); private static final Option CLASS_OPTION = new Option("c", "class", true, "Class with the program entry point (\"main\" method or \"getPlan()\" method. Only needed if the JAR file does not specify the class in its manifest."); private static final Option PARALLELISM_OPTION = new Option("p", "parallelism", true, "The parallelism with which to run the program. Optional flag to override the default value specified in the configuration."); private static final Option ARGS_OPTION = new Option("a", "arguments", true, "Program arguments. Arguments can also be added without -a, simply as trailing parameters."); - - private static final Option ADDRESS_OPTION = new Option("m", "jobmanager", true, "Address of the JobManager (master) to which to connect. Use this flag to connect to a different JobManager than the one specified in the configuration."); - + + private static final Option ADDRESS_OPTION = new Option("m", "jobmanager", true, "Address of the JobManager (master) to which to connect. 
Specify '"+YARN_DEPLOY_JOBMANAGER+"' as the JobManager to deploy a YARN cluster for the job. Use this flag to connect to a different JobManager than the one specified in the configuration."); + // info specific options private static final Option PLAN_OPTION = new Option("e", "executionplan", false, "Show optimized execution plan of the program (JSON)"); - + // list specific options private static final Option RUNNING_OPTION = new Option("r", "running", false, "Show running programs and their JobIDs"); private static final Option SCHEDULED_OPTION = new Option("s", "scheduled", false, "Show scheduled prorgrams and their JobIDs"); - + // canceling private static final Option ID_OPTION = new Option("i", "jobid", true, "JobID of program to cancel"); - + static { initOptions(); } @@ -126,6 +138,8 @@ public class CliFrontend { public static final String YARN_PROPERTIES_DYNAMIC_PROPERTIES_STRING = "dynamicPropertiesString"; // this has to be a regex for String.split() public static final String YARN_DYNAMIC_PROPERTIES_SEPARATOR = "@@"; + private static final String DEFAULT_LOG4J_PATTERN_LAYOUT = "%d{HH:mm:ss,SSS} %-5p %-60c %x - %m%n"; + private CommandLineParser parser; @@ -139,6 +153,15 @@ public class CliFrontend { private Properties yarnProperties; + // this flag indicates if the given Job is executed using a YARN cluster, + // started for this purpose. 
+ private boolean runInYarnCluster = false; + + private AbstractFlinkYarnCluster yarnCluster = null; + + protected String configurationDirectory = null; + + /** * Initializes the class */ @@ -193,6 +216,9 @@ public class CliFrontend { options.addOption(CLASS_OPTION); options.addOption(PARALLELISM_OPTION); options.addOption(ARGS_OPTION); + + // also add the YARN options so that the parser can parse them + yarnSessionCLi.getYARNSessionCLIOptions(options); return options; } @@ -309,7 +335,7 @@ public class CliFrontend { return 1; } - Client client = getClient(line, program.getUserCodeClassLoader()); + Client client = getClient(line, program.getUserCodeClassLoader(), program.getMainClassName()); if (client == null) { printHelpForRun(); return 1; @@ -332,8 +358,25 @@ public class CliFrontend { return 1; } } - - return executeProgram(program, client, parallelism); + int programResult = executeProgram(program, client, parallelism); + // check if the program has been executed in a "job only" YARN cluster. 
+ if(runInYarnCluster) { + List<String> msgs = yarnCluster.getNewMessages(); + if(msgs != null && msgs.size() > 0) { + System.out.println("The following messages were created by the YARN cluster while running the Job:"); + for(String msg : msgs) { + System.out.println(msg); + } + } + if(yarnCluster.hasFailed()) { + System.out.println("YARN cluster is in failed state!"); + System.out.println("YARN Diagnostics: " + yarnCluster.getDiagnostics()); + } + System.out.println("Shutting down YARN cluster"); + yarnCluster.shutdown(); + } + + return programResult; } catch (Throwable t) { return handleError(t); @@ -443,7 +486,7 @@ public class CliFrontend { try { // check for json plan request if (plan) { - Client client = getClient(line, program.getUserCodeClassLoader()); + Client client = getClient(line, program.getUserCodeClassLoader(), program.getMainClassName()); String jsonPlan = client.getOptimizedPlanAsJson(program, parallelism); if (jsonPlan != null) { @@ -693,14 +736,13 @@ public class CliFrontend { } } - protected InetSocketAddress getJobManagerAddress(CommandLine line) throws IOException { + protected String getJobManagerAddressString(CommandLine line) throws IOException { Configuration configuration = getGlobalConfiguration(); // first, check if the address comes from the command line option if (line.hasOption(ADDRESS_OPTION.getOpt())) { try { - String address = line.getOptionValue(ADDRESS_OPTION.getOpt()); - return RemoteExecutor.getInetFromHostport(address); + return line.getOptionValue(ADDRESS_OPTION.getOpt()); } catch (Exception e) { System.out.println("Error: The JobManager address has an invalid format.
" + e.getMessage()); @@ -714,9 +756,9 @@ public class CliFrontend { String address = yarnProps.getProperty(YARN_PROPERTIES_JOBMANAGER_KEY); System.out.println("Found a yarn properties file (" + YARN_PROPERTIES_FILE + ") file, " + "using \""+address+"\" to connect to the JobManager"); - return RemoteExecutor.getInetFromHostport(address); + return address; } catch (Exception e) { - System.out.println("Found a yarn properties " + YARN_PROPERTIES_FILE + " file, but could not read the JobManager address from the file. " + System.out.println("Found a yarn properties " + YARN_PROPERTIES_FILE + " file, but could not read the JobManager address from the file. " + e.getMessage()); return null; } @@ -726,7 +768,7 @@ public class CliFrontend { // verify that there is a jobmanager address and port in the configuration if (jobManagerAddress == null) { - System.out.println("Error: Found no configuration in the config directory '" + + System.out.println("Error: Found no configuration in the config directory '" + getConfigurationDirectory() + "' that specifies the JobManager address."); return null; } @@ -741,29 +783,37 @@ public class CliFrontend { } if (jobManagerPort == -1) { - System.out.println("Error: Found no configuration in the config directory '" + + System.out.println("Error: Found no configuration in the config directory '" + getConfigurationDirectory() + "' that specifies the JobManager port."); return null; } - return new InetSocketAddress(jobManagerAddress, jobManagerPort); + return jobManagerAddress + ":" + jobManagerPort; } } } protected ActorRef getJobManager(CommandLine line) throws IOException { - InetSocketAddress jobManagerAddress = getJobManagerAddress(line); - if (jobManagerAddress == null) { + //TODO: Get ActorRef from YarnCluster if we are in YARN mode. 
+ String jobManagerAddressStr = getJobManagerAddressString(line); + if (jobManagerAddressStr == null) { return null; } - return JobManager.getJobManager(jobManagerAddress, + return JobManager.getJobManager(RemoteExecutor.getInetFromHostport(jobManagerAddressStr), ActorSystem.create("CliFrontendActorSystem", AkkaUtils .getDefaultActorSystemConfig()),getAkkaTimeout()); } - - protected String getConfigurationDirectory() { + + public String getConfigurationDirectory() { + if(configurationDirectory == null) { + configurationDirectory = getConfigurationDirectoryFromEnv(); + } + return configurationDirectory; + } + + public static String getConfigurationDirectoryFromEnv() { String location = null; if (System.getenv(ENV_CONFIG_DIRECTORY) != null) { location = System.getenv(ENV_CONFIG_DIRECTORY); @@ -860,8 +910,60 @@ public class CliFrontend { return yarnProperties; } - protected Client getClient(CommandLine line, ClassLoader classLoader) throws IOException { - return new Client(getJobManagerAddress(line), getGlobalConfiguration(), classLoader); + protected Client getClient(CommandLine line, ClassLoader classLoader, String programName) throws IOException { + String jmAddrString = getJobManagerAddressString(line); + InetSocketAddress jobManagerAddress = null; + if(jmAddrString == null) { // no JobManager address could be determined; getJobManagerAddressString already printed why + return null; + } + if(jmAddrString.equals(YARN_DEPLOY_JOBMANAGER)) { + System.out.println("YARN cluster mode detected. Switching Log4j output to console"); + LogManager.getRootLogger().addAppender(new ConsoleAppender(new PatternLayout(DEFAULT_LOG4J_PATTERN_LAYOUT))); + + this.runInYarnCluster = true; + // user wants to run Flink in YARN cluster. + AbstractFlinkYarnClient flinkYarnClient = yarnSessionCLi.createFlinkYarnClient(line); + if(flinkYarnClient == null) { + throw new RuntimeException("Unable to create Flink YARN Client.
Check previous log messages"); + } + try { + yarnCluster = flinkYarnClient.deploy("Flink Application: "+programName); + } catch(Exception e) { + throw new RuntimeException("Error deploying the YARN cluster", e); + } + jobManagerAddress = yarnCluster.getJobManagerAddress(); + System.out.println("YARN cluster started"); + System.out.println("JobManager web interface address "+yarnCluster.getWebInterfaceURL()); + System.out.println("Waiting until all TaskManagers have connected"); + while(true) { + if(yarnCluster.hasFailed()) { // a failed cluster will never reach the requested TaskManager count: stop waiting and release the YARN application + String diagnostics = yarnCluster.getDiagnostics(); + yarnCluster.shutdown(); + throw new RuntimeException("The YARN cluster failed while waiting for the TaskManagers to connect. YARN Diagnostics: " + diagnostics); + } + FlinkYarnClusterStatus status = yarnCluster.getClusterStatus(); + if(status != null) { + if (status.getNumberOfTaskManagers() < flinkYarnClient.getTaskManagerCount()) { + System.out.println("TaskManager status (" + status.getNumberOfTaskManagers()+"/"+flinkYarnClient.getTaskManagerCount()+")"); + } else { + System.out.println("Enough TaskManagers are connected"); + break; + } + } else { + System.out.println("No status updates from YARN cluster received so far. Waiting ..."); + } + try { + Thread.sleep(500); + } catch (InterruptedException e) { + Thread.currentThread().interrupt(); // restore the flag; staying in the loop would make every further sleep() fail immediately + yarnCluster.shutdown(); + throw new RuntimeException("Interrupted while waiting for the TaskManagers to connect", e); + } + } + } else { + jobManagerAddress = RemoteExecutor.getInetFromHostport(jmAddrString); + } + return new Client(jobManagerAddress, getGlobalConfiguration(), classLoader); } /** @@ -891,6 +993,10 @@ public class CliFrontend { System.out.println("\n Syntax: run [OPTIONS] <jar-file> <arguments>"); formatter.setSyntaxPrefix(" \"run\" action arguments:"); formatter.printHelp(" ", getRunOptionsWithoutDeprecatedOptions(new Options())); + formatter.setSyntaxPrefix(" additional arguments if -m "+YARN_DEPLOY_JOBMANAGER+" is set:"); + Options yarnOpts = new Options(); + yarnSessionCLi.getYARNSessionCLIOptions(yarnOpts); + formatter.printHelp(" ", yarnOpts); } private void printHelpForInfo() { @@ -990,14 +1096,15 @@ public class CliFrontend { } } - /** * Submits the job based on the arguments */ public static void main(String[] args) throws ParseException { +
CliFrontend cli = new CliFrontend(); int retCode = cli.parseParameters(args); System.exit(retCode); } + } http://git-wip-us.apache.org/repos/asf/flink/blob/2af65867/flink-clients/src/main/java/org/apache/flink/client/FlinkYarnSessionCli.java ---------------------------------------------------------------------- diff --git a/flink-clients/src/main/java/org/apache/flink/client/FlinkYarnSessionCli.java b/flink-clients/src/main/java/org/apache/flink/client/FlinkYarnSessionCli.java new file mode 100644 index 0000000..6546ef0 --- /dev/null +++ b/flink-clients/src/main/java/org/apache/flink/client/FlinkYarnSessionCli.java @@ -0,0 +1,429 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +package org.apache.flink.client; + +import org.apache.commons.cli.CommandLine; +import org.apache.commons.cli.CommandLineParser; +import org.apache.commons.cli.HelpFormatter; +import org.apache.commons.cli.Option; +import org.apache.commons.cli.Options; +import org.apache.commons.cli.PosixParser; +import org.apache.commons.lang3.StringUtils; +import org.apache.flink.configuration.GlobalConfiguration; +import org.apache.flink.runtime.yarn.AbstractFlinkYarnClient; +import org.apache.flink.runtime.yarn.AbstractFlinkYarnCluster; +import org.apache.flink.runtime.yarn.FlinkYarnClusterStatus; +import org.apache.flink.util.InstantiationUtil; +import org.apache.hadoop.fs.Path; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import java.io.BufferedReader; +import java.io.File; +import java.io.FileOutputStream; +import java.io.FilenameFilter; +import java.io.IOException; +import java.io.InputStreamReader; +import java.io.OutputStream; +import java.util.ArrayList; +import java.util.Arrays; +import java.util.List; +import java.util.Properties; + +/** + * Class handling the command line interface to the YARN session. + */ +public class FlinkYarnSessionCli { + private static final Logger LOG = LoggerFactory.getLogger(FlinkYarnSessionCli.class); + + //------------------------------------ Constants ------------------------- + + private static final String CONFIG_FILE_NAME = "flink-conf.yaml"; + public static final String CONFIG_FILE_LOGBACK_NAME = "logback.xml"; + public static final String CONFIG_FILE_LOG4J_NAME = "log4j.properties"; + + + private static final int CLIENT_POLLING_INTERVALL = 3; + + + //------------------------------------ Command Line argument options ------------------------- + // the prefix transformation is used by the CliFrontend static constructor. 
+ private final Option QUERY; + // --- or --- + private final Option QUEUE; + private final Option SHIP_PATH; + private final Option FLINK_JAR; + private final Option JM_MEMORY; + private final Option TM_MEMORY; + private final Option CONTAINER; + private final Option SLOTS; + + /** + * Dynamic properties allow the user to specify additional configuration values with -D, such as + * -Dfs.overwrite-files=true -Dtaskmanager.network.numberOfBuffers=16368 + */ + private final Option DYNAMIC_PROPERTIES; + + private AbstractFlinkYarnCluster yarnCluster = null; + + public FlinkYarnSessionCli(String shortPrefix, String longPrefix) { + QUERY = new Option(shortPrefix + "q", longPrefix + "query", false, "Display available YARN resources (memory, cores)"); + QUEUE = new Option(shortPrefix + "qu", longPrefix + "queue", true, "Specify YARN queue."); + SHIP_PATH = new Option(shortPrefix + "t", longPrefix + "ship", true, "Ship files in the specified directory (t for transfer)"); + FLINK_JAR = new Option(shortPrefix + "j", longPrefix + "jar", true, "Path to Flink jar file"); + JM_MEMORY = new Option(shortPrefix + "jm", longPrefix + "jobManagerMemory", true, "Memory for JobManager Container [in MB]"); + TM_MEMORY = new Option(shortPrefix + "tm", longPrefix + "taskManagerMemory", true, "Memory per TaskManager Container [in MB]"); + CONTAINER = new Option(shortPrefix + "n", longPrefix + "container", true, "Number of YARN container to allocate (=Number of Task Managers)"); + SLOTS = new Option(shortPrefix + "s", longPrefix + "slots", true, "Number of slots per TaskManager"); + DYNAMIC_PROPERTIES = new Option(shortPrefix + "D", true, "Dynamic properties"); + } + + public AbstractFlinkYarnClient createFlinkYarnClient(CommandLine cmd) { + + AbstractFlinkYarnClient flinkYarnClient = getFlinkYarnClient(); + if(flinkYarnClient == null) { + return null; + } + + if(!cmd.hasOption(CONTAINER.getOpt())) { // number of containers is required option! 
+ LOG.error("Missing required argument " + CONTAINER.getOpt()); + printUsage(); + return null; + } + flinkYarnClient.setTaskManagerCount(Integer.valueOf(cmd.getOptionValue(CONTAINER.getOpt()))); + + // Jar Path + Path localJarPath; + if(cmd.hasOption(FLINK_JAR.getOpt())) { + String userPath = cmd.getOptionValue(FLINK_JAR.getOpt()); + if(!userPath.startsWith("file://")) { + userPath = "file://" + userPath; + } + localJarPath = new Path(userPath); + } else { + LOG.info("No path for the flink jar passed. Using the location of "+flinkYarnClient.getClass()+" to locate the jar"); + localJarPath = new Path("file://"+flinkYarnClient.getClass().getProtectionDomain().getCodeSource().getLocation().getPath()); + if(!localJarPath.toString().contains("uberjar")) { + // we need to have a proper uberjar because otherwise we don't have the required classes available on the cluster. + // most likely the user did try to start yarn in a regular hadoop2 flink build (not a yarn package) (using ./bin/flink -m yarn-cluster) + LOG.error("The detected jar file '"+localJarPath+"' is not a uberjar."); + return null; + } + } + + flinkYarnClient.setLocalJarPath(localJarPath); + + // Conf Path + String confDirPath = CliFrontend.getConfigurationDirectoryFromEnv(); + GlobalConfiguration.loadConfiguration(confDirPath); + flinkYarnClient.setConfigurationDirectory(confDirPath); + File confFile = new File(confDirPath + File.separator + CONFIG_FILE_NAME); + if(!confFile.exists()) { + LOG.error("Unable to locate configuration file in "+confFile); + return null; + } + Path confPath = new Path(confFile.getAbsolutePath()); + + flinkYarnClient.setConfigurationFilePath(confPath); + + List<File> shipFiles = new ArrayList<File>(); + // path to directory to ship + if(cmd.hasOption(SHIP_PATH.getOpt())) { + String shipPath = cmd.getOptionValue(SHIP_PATH.getOpt()); + File shipDir = new File(shipPath); + if(shipDir.isDirectory()) { + shipFiles = new ArrayList<File>(Arrays.asList(shipDir.listFiles(new 
FilenameFilter() { + @Override + public boolean accept(File dir, String name) { + return !(name.equals(".") || name.equals("..")); + } + }))); + } else { + LOG.warn("Ship directory is not a directory. Ignoring it."); + } + } + + //check if there is a logback or log4j file + if(confDirPath.length() > 0) { + File logback = new File(confDirPath + File.separator + CONFIG_FILE_LOGBACK_NAME); + if(logback.exists()) { + shipFiles.add(logback); + flinkYarnClient.setFlinkLoggingConfigurationPath(new Path(logback.toURI())); + } + File log4j = new File(confDirPath + File.separator + CONFIG_FILE_LOG4J_NAME); + if(log4j.exists()) { + shipFiles.add(log4j); + if(flinkYarnClient.getFlinkLoggingConfigurationPath() != null) { + // this means there is already a logback configuration file --> fail + LOG.error("The configuration directory ('"+confDirPath+"') contains both LOG4J and Logback configuration files. " + + "Please delete or rename one of them."); + return null; + } // else + flinkYarnClient.setFlinkLoggingConfigurationPath(new Path(log4j.toURI())); + } + } + + flinkYarnClient.setShipFiles(shipFiles); + + // queue + if(cmd.hasOption(QUEUE.getOpt())) { + flinkYarnClient.setQueue(cmd.getOptionValue(QUEUE.getOpt())); + } + + // JobManager Memory + if(cmd.hasOption(JM_MEMORY.getOpt())) { + int jmMemory = Integer.valueOf(cmd.getOptionValue(JM_MEMORY.getOpt())); + flinkYarnClient.setJobManagerMemory(jmMemory); + } + + // Task Managers memory + if(cmd.hasOption(TM_MEMORY.getOpt())) { + int tmMemory = Integer.valueOf(cmd.getOptionValue(TM_MEMORY.getOpt())); + flinkYarnClient.setTaskManagerMemory(tmMemory); + } + + if(cmd.hasOption(SLOTS.getOpt())) { + int slots = Integer.valueOf(cmd.getOptionValue(SLOTS.getOpt())); + flinkYarnClient.setTaskManagerSlots(slots); + } + + String[] dynamicProperties = null; + if(cmd.hasOption(DYNAMIC_PROPERTIES.getOpt())) { + dynamicProperties = cmd.getOptionValues(DYNAMIC_PROPERTIES.getOpt()); + } + String dynamicPropertiesEncoded =
StringUtils.join(dynamicProperties, CliFrontend.YARN_DYNAMIC_PROPERTIES_SEPARATOR); + + flinkYarnClient.setDynamicPropertiesEncoded(dynamicPropertiesEncoded); + + return flinkYarnClient; + } + + + private void printUsage() { + System.out.println("Usage:"); + HelpFormatter formatter = new HelpFormatter(); + formatter.setWidth(200); + formatter.setLeftPadding(5); + formatter.setSyntaxPrefix(" Required"); + Options req = new Options(); + req.addOption(CONTAINER); + formatter.printHelp(" ", req); + + formatter.setSyntaxPrefix(" Optional"); + Options opt = new Options(); + opt.addOption(JM_MEMORY); + opt.addOption(TM_MEMORY); + opt.addOption(QUERY); + opt.addOption(QUEUE); + opt.addOption(SLOTS); + opt.addOption(DYNAMIC_PROPERTIES); + formatter.printHelp(" ", opt); + } + + public static AbstractFlinkYarnClient getFlinkYarnClient() { + AbstractFlinkYarnClient yarnClient = null; + try { + Class<AbstractFlinkYarnClient> yarnClientClass = (Class<AbstractFlinkYarnClient>) Class.forName("org.apache.flink.yarn.FlinkYarnClient"); + yarnClient = InstantiationUtil.instantiate(yarnClientClass, AbstractFlinkYarnClient.class); + } catch (ClassNotFoundException e) { + System.err.println("Unable to locate the Flink YARN Client. Please ensure that you are using a Flink build with Hadoop2/YARN support. Message: "+e.getMessage()); + e.printStackTrace(System.err); + return null; // make it obvious + } + return yarnClient; + } + + private static void writeYarnProperties(Properties properties, File propertiesFile) { + try { + OutputStream out = new FileOutputStream(propertiesFile); + properties.store(out, "Generated YARN properties file"); + out.close(); + } catch (IOException e) { + throw new RuntimeException("Error writing the properties file", e); + } + propertiesFile.setReadable(true, false); // readable for all. 
+ } + + public static void runInteractiveCli(AbstractFlinkYarnCluster yarnCluster) { + final String HELP = "Available commands:\n" + + "help - show these commands\n" + + "stop - stop the YARN session"; + int numTaskmanagers = 0; + try { + BufferedReader in = new BufferedReader(new InputStreamReader(System.in)); + while (true) { + // ------------------ check if there are updates by the cluster ----------- + + FlinkYarnClusterStatus status = yarnCluster.getClusterStatus(); + if(status != null && numTaskmanagers != status.getNumberOfTaskManagers()) { + System.err.println("Number of connected TaskManagers changed to "+status.getNumberOfTaskManagers()+". " + + "Slots available: "+status.getNumberOfSlots()); + numTaskmanagers = status.getNumberOfTaskManagers(); + } + + List<String> messages = yarnCluster.getNewMessages(); + if(messages != null && messages.size() > 0) { + System.err.println("New messages from the YARN cluster: "); + for(String msg : messages) { + System.err.println(msg); + } + } + + if(yarnCluster.hasFailed()) { + System.err.println("The YARN cluster has failed"); + } + + // wait until CLIENT_POLLING_INTERVALL is over or the user entered something. + long startTime = System.currentTimeMillis(); + while ((System.currentTimeMillis() - startTime) < CLIENT_POLLING_INTERVALL * 1000 + && !in.ready()) { + Thread.sleep(200); + } + //------------- handle interactive command by user. ---------------------- + + if (in.ready()) { + String command = in.readLine(); + if(command.equals("quit") || command.equals("stop")) { + break; // leave loop, cli will stop cluster. + } else if(command.equals("help")) { + System.err.println(HELP); + } else { + System.err.println("Unknown command '"+command+"'. 
Showing help: \n"+HELP); + } + } + if(yarnCluster.hasBeenStopped()) { + LOG.info("Stopping interactive command line interface, YARN cluster has been stopped."); + break; + } + } + } catch(Exception e) { + LOG.warn("Exception while running the interactive command line interface", e); + return; + } + } + + public static void main(String[] args) { + FlinkYarnSessionCli cli = new FlinkYarnSessionCli("", ""); // no prefix for the YARN session + System.exit(cli.run(args)); + } + + public void getYARNSessionCLIOptions(Options options) { + options.addOption(FLINK_JAR); + options.addOption(JM_MEMORY); + options.addOption(TM_MEMORY); + options.addOption(CONTAINER); + options.addOption(QUEUE); + options.addOption(QUERY); + options.addOption(SHIP_PATH); + options.addOption(SLOTS); + options.addOption(DYNAMIC_PROPERTIES); + } + + public int run(String[] args) { + + // + // Command Line Options + // + Options options = new Options(); + getYARNSessionCLIOptions(options); + + CommandLineParser parser = new PosixParser(); + CommandLine cmd = null; + try { + cmd = parser.parse(options, args); + } catch(Exception e) { + System.out.println(e.getMessage()); + printUsage(); + return 1; + } + + // Query cluster for metrics + if(cmd.hasOption(QUERY.getOpt())) { + AbstractFlinkYarnClient flinkYarnClient = getFlinkYarnClient(); + String description = null; + try { + description = flinkYarnClient.getClusterDescription(); + } catch (Exception e) { + System.err.println("Error while querying the YARN cluster for available resources: "+e.getMessage()); + e.printStackTrace(System.err); + return 1; + } + System.out.println(description); + return 0; + } else { + AbstractFlinkYarnClient flinkYarnClient = createFlinkYarnClient(cmd); + + if(flinkYarnClient == null) { + System.err.println("Error while starting the YARN Client. 
Please check log output!"); + return 1; + } + + + try { + yarnCluster = flinkYarnClient.deploy(null); + } catch (Exception e) { + System.err.println("Error while deploying YARN cluster: "+e.getMessage()); + e.printStackTrace(System.err); + return 1; + } + //------------------ Cluster deployed, handle connection details + String jobManagerAddress = yarnCluster.getJobManagerAddress().getHostName() + ":" +yarnCluster.getJobManagerAddress().getPort(); + System.err.println("Flink JobManager is now running on " + jobManagerAddress); + System.err.println("JobManager Web Interface: " + yarnCluster.getWebInterfaceURL()); + // file that we write into the conf/ dir containing the jobManager address and the dop. + String confDirPath = CliFrontend.getConfigurationDirectoryFromEnv(); + File yarnPropertiesFile = new File(confDirPath + File.separator + CliFrontend.YARN_PROPERTIES_FILE); + + Properties yarnProps = new Properties(); + yarnProps.setProperty(CliFrontend.YARN_PROPERTIES_JOBMANAGER_KEY, jobManagerAddress); + if(flinkYarnClient.getTaskManagerSlots() != -1) { + yarnProps.setProperty(CliFrontend.YARN_PROPERTIES_DOP, Integer.toString(flinkYarnClient.getTaskManagerSlots() * flinkYarnClient.getTaskManagerCount()) ); + } + // add dynamic properties + if(flinkYarnClient.getDynamicPropertiesEncoded() != null) { + yarnProps.setProperty(CliFrontend.YARN_PROPERTIES_DYNAMIC_PROPERTIES_STRING, flinkYarnClient.getDynamicPropertiesEncoded()); + } + writeYarnProperties(yarnProps, yarnPropertiesFile); + + //------------------ Cluster running, let user control it ------------ + + runInteractiveCli(yarnCluster); + + LOG.info("Command Line Interface requested session shutdown"); + yarnCluster.shutdown(); + + try { + yarnPropertiesFile.delete(); + } catch (Exception e) { + LOG.warn("Exception while deleting the JobManager address file", e); + } + } + return 0; + } + + /** + * Utility method for tests. 
+ */ + public void stop() { + if(yarnCluster != null) { + LOG.info("Command line interface is shutting down the yarnCluster"); + yarnCluster.shutdown(); + } + } +} + http://git-wip-us.apache.org/repos/asf/flink/blob/2af65867/flink-clients/src/main/java/org/apache/flink/client/program/Client.java ---------------------------------------------------------------------- diff --git a/flink-clients/src/main/java/org/apache/flink/client/program/Client.java b/flink-clients/src/main/java/org/apache/flink/client/program/Client.java index 00fba95..d8f1bf7 100644 --- a/flink-clients/src/main/java/org/apache/flink/client/program/Client.java +++ b/flink-clients/src/main/java/org/apache/flink/client/program/Client.java @@ -317,7 +317,6 @@ public class Client { } try { - if (wait) { return JobClient.submitJobAndWait(jobGraph, printStatusDuringExecution, client, timeout); } http://git-wip-us.apache.org/repos/asf/flink/blob/2af65867/flink-clients/src/test/java/org/apache/flink/client/CliFrontendInfoTest.java ---------------------------------------------------------------------- diff --git a/flink-clients/src/test/java/org/apache/flink/client/CliFrontendInfoTest.java b/flink-clients/src/test/java/org/apache/flink/client/CliFrontendInfoTest.java index b6d4542..1bc533f 100644 --- a/flink-clients/src/test/java/org/apache/flink/client/CliFrontendInfoTest.java +++ b/flink-clients/src/test/java/org/apache/flink/client/CliFrontendInfoTest.java @@ -113,7 +113,7 @@ public class CliFrontendInfoTest { } @Override - protected Client getClient(CommandLine line, ClassLoader loader) throws IOException { + protected Client getClient(CommandLine line, ClassLoader loader, String programName) throws IOException { try { return new TestClient(expectedDop); } http://git-wip-us.apache.org/repos/asf/flink/blob/2af65867/flink-clients/src/test/java/org/apache/flink/client/CliFrontendJobManagerConnectionTest.java ---------------------------------------------------------------------- diff --git 
a/flink-clients/src/test/java/org/apache/flink/client/CliFrontendJobManagerConnectionTest.java b/flink-clients/src/test/java/org/apache/flink/client/CliFrontendJobManagerConnectionTest.java index 6a59019..ef7dff6 100644 --- a/flink-clients/src/test/java/org/apache/flink/client/CliFrontendJobManagerConnectionTest.java +++ b/flink-clients/src/test/java/org/apache/flink/client/CliFrontendJobManagerConnectionTest.java @@ -29,7 +29,6 @@ import java.net.InetSocketAddress; import org.apache.commons.cli.CommandLine; import org.apache.commons.cli.Options; import org.apache.commons.cli.PosixParser; -import org.apache.flink.client.CliFrontend; import org.apache.flink.client.CliFrontendTestUtils.TestingCliFrontend; import org.junit.Before; import org.junit.BeforeClass; @@ -55,7 +54,7 @@ public class CliFrontendJobManagerConnectionTest { TestingCliFrontend frontend = new TestingCliFrontend(CliFrontendTestUtils.getInvalidConfigDir()); - assertTrue(frontend.getJobManagerAddress(line) == null); + assertTrue(frontend.getJobManagerAddressString(line) == null); } catch (Exception e) { System.err.println(e.getMessage()); @@ -72,7 +71,7 @@ public class CliFrontendJobManagerConnectionTest { TestingCliFrontend frontend = new TestingCliFrontend(CliFrontendTestUtils.getConfigDir()); - InetSocketAddress address = frontend.getJobManagerAddress(line); + InetSocketAddress address = RemoteExecutor.getInetFromHostport(frontend.getJobManagerAddressString(line)); assertNotNull(address); assertEquals(CliFrontendTestUtils.TEST_JOB_MANAGER_ADDRESS, address.getAddress().getHostAddress()); @@ -93,7 +92,7 @@ public class CliFrontendJobManagerConnectionTest { TestingCliFrontend frontend = new TestingCliFrontend(CliFrontendTestUtils.getConfigDirWithYarnFile()); - InetSocketAddress address = frontend.getJobManagerAddress(line); + InetSocketAddress address = RemoteExecutor.getInetFromHostport(frontend.getJobManagerAddressString(line)); assertNotNull(address); 
assertEquals(CliFrontendTestUtils.TEST_YARN_JOB_MANAGER_ADDRESS, address.getAddress().getHostAddress()); @@ -114,7 +113,7 @@ public class CliFrontendJobManagerConnectionTest { TestingCliFrontend frontend = new TestingCliFrontend(CliFrontendTestUtils.getConfigDirWithInvalidYarnFile()); - assertTrue(frontend.getJobManagerAddress(line) == null); + assertTrue(frontend.getJobManagerAddressString(line) == null); } catch (Exception e) { System.err.println(e.getMessage()); @@ -131,7 +130,7 @@ public class CliFrontendJobManagerConnectionTest { TestingCliFrontend frontend = new TestingCliFrontend(CliFrontendTestUtils.getConfigDir()); - InetSocketAddress address = frontend.getJobManagerAddress(line); + InetSocketAddress address = RemoteExecutor.getInetFromHostport(frontend.getJobManagerAddressString(line)); assertNotNull(address); assertEquals("10.221.130.22", address.getAddress().getHostAddress()); @@ -152,7 +151,7 @@ public class CliFrontendJobManagerConnectionTest { TestingCliFrontend frontend = new TestingCliFrontend(CliFrontendTestUtils.getConfigDirWithYarnFile()); - InetSocketAddress address = frontend.getJobManagerAddress(line); + InetSocketAddress address = RemoteExecutor.getInetFromHostport(frontend.getJobManagerAddressString(line)); assertNotNull(address); assertEquals("10.221.130.22", address.getAddress().getHostAddress()); http://git-wip-us.apache.org/repos/asf/flink/blob/2af65867/flink-clients/src/test/java/org/apache/flink/client/CliFrontendListCancelTest.java ---------------------------------------------------------------------- diff --git a/flink-clients/src/test/java/org/apache/flink/client/CliFrontendListCancelTest.java b/flink-clients/src/test/java/org/apache/flink/client/CliFrontendListCancelTest.java index b9af927..0cd7104 100644 --- a/flink-clients/src/test/java/org/apache/flink/client/CliFrontendListCancelTest.java +++ b/flink-clients/src/test/java/org/apache/flink/client/CliFrontendListCancelTest.java @@ -62,7 +62,7 @@ public class 
CliFrontendListCancelTest { // test unrecognized option { String[] parameters = {"-v", "-l"}; - CliFrontend testFrontend = new CliFrontend(); + CliFrontend testFrontend = new CliFrontendTestUtils.TestingCliFrontend(); int retCode = testFrontend.cancel(parameters); assertTrue(retCode == 2); } @@ -70,7 +70,7 @@ public class CliFrontendListCancelTest { // test missing job id { String[] parameters = {}; - CliFrontend testFrontend = new CliFrontend(); + CliFrontend testFrontend = new CliFrontendTestUtils.TestingCliFrontend(); int retCode = testFrontend.cancel(parameters); assertTrue(retCode != 0); } @@ -104,7 +104,7 @@ public class CliFrontendListCancelTest { // test unrecognized option { String[] parameters = {"-v", "-k"}; - CliFrontend testFrontend = new CliFrontend(); + CliFrontend testFrontend = new CliFrontendTestUtils.TestingCliFrontend(); int retCode = testFrontend.list(parameters); assertTrue(retCode == 2); } @@ -112,7 +112,7 @@ public class CliFrontendListCancelTest { // test missing flags { String[] parameters = {}; - CliFrontend testFrontend = new CliFrontend(); + CliFrontend testFrontend = new CliFrontendTestUtils.TestingCliFrontend(); int retCode = testFrontend.list(parameters); assertTrue(retCode != 0); } http://git-wip-us.apache.org/repos/asf/flink/blob/2af65867/flink-clients/src/test/java/org/apache/flink/client/CliFrontendTestUtils.java ---------------------------------------------------------------------- diff --git a/flink-clients/src/test/java/org/apache/flink/client/CliFrontendTestUtils.java b/flink-clients/src/test/java/org/apache/flink/client/CliFrontendTestUtils.java index 9d4c6ae..95f6cb8 100644 --- a/flink-clients/src/test/java/org/apache/flink/client/CliFrontendTestUtils.java +++ b/flink-clients/src/test/java/org/apache/flink/client/CliFrontendTestUtils.java @@ -113,19 +113,13 @@ public class CliFrontendTestUtils { public static class TestingCliFrontend extends CliFrontend { - public final String configDir; - + public TestingCliFrontend() { 
this(getConfigDir()); } public TestingCliFrontend(String configDir) { - this.configDir = configDir; - } - - @Override - protected String getConfigurationDirectory() { - return this.configDir; + this.configurationDirectory = configDir; } } http://git-wip-us.apache.org/repos/asf/flink/blob/2af65867/flink-core/src/main/java/org/apache/flink/configuration/ConfigConstants.java ---------------------------------------------------------------------- diff --git a/flink-core/src/main/java/org/apache/flink/configuration/ConfigConstants.java b/flink-core/src/main/java/org/apache/flink/configuration/ConfigConstants.java index d482e3c..969329e 100644 --- a/flink-core/src/main/java/org/apache/flink/configuration/ConfigConstants.java +++ b/flink-core/src/main/java/org/apache/flink/configuration/ConfigConstants.java @@ -193,6 +193,20 @@ public final class ConfigConstants { */ public static final String JOBCLIENT_POLLING_INTERVAL_KEY = "jobclient.polling.interval"; + // ------------------------ YARN Configuration ------------------------ + + /** + * Percentage of heap space to remove from containers started by YARN. + */ + public static final String YARN_HEAP_CUTOFF_RATIO = "yarn.heap-cutoff-ratio"; + + /** + * Upper bound for heap cutoff on YARN. + * The "yarn.heap-cutoff-ratio" is removing a certain ratio from the heap. + * This value is limiting this cutoff to a absolute value. 
+ */ + public static final String YARN_HEAP_LIMIT_CAP = "yarn.heap-limit-cap"; + // ------------------------ Hadoop Configuration ------------------------ /** http://git-wip-us.apache.org/repos/asf/flink/blob/2af65867/flink-core/src/main/java/org/apache/flink/core/fs/FileSystem.java ---------------------------------------------------------------------- diff --git a/flink-core/src/main/java/org/apache/flink/core/fs/FileSystem.java b/flink-core/src/main/java/org/apache/flink/core/fs/FileSystem.java index fb45466..c3c7ae8 100644 --- a/flink-core/src/main/java/org/apache/flink/core/fs/FileSystem.java +++ b/flink-core/src/main/java/org/apache/flink/core/fs/FileSystem.java @@ -242,7 +242,6 @@ public abstract class FileSystem { // by now we know that the HadoopFileSystem wrapper can wrap the file system. fs = instantiateHadoopFileSystemWrapper(wrapperClass); fs.initialize(uri); - System.out.println("Initializing new instance of wrapper for "+wrapperClass); CACHE.put(wrappedKey, fs); } else { http://git-wip-us.apache.org/repos/asf/flink/blob/2af65867/flink-dist/pom.xml ---------------------------------------------------------------------- diff --git a/flink-dist/pom.xml b/flink-dist/pom.xml index e753a05..91359c2 100644 --- a/flink-dist/pom.xml +++ b/flink-dist/pom.xml @@ -197,10 +197,12 @@ under the License. 
implementation="org.apache.maven.plugins.shade.resource.AppendingTransformer"> <resource>reference.conf</resource> </transformer> + <!-- The service transformer is needed to merge META-INF/services files --> + <transformer implementation="org.apache.maven.plugins.shade.resource.ServicesResourceTransformer"/> <transformer implementation="org.apache.maven.plugins.shade.resource.ManifestResourceTransformer"> <manifestEntries> - <Main-Class>org.apache.flink.yarn.Client</Main-Class> + <Main-Class>org.apache.flink.yarn.FlinkYarnClient</Main-Class> </manifestEntries> </transformer> </transformers> http://git-wip-us.apache.org/repos/asf/flink/blob/2af65867/flink-dist/src/main/flink-bin/bin/flink ---------------------------------------------------------------------- diff --git a/flink-dist/src/main/flink-bin/bin/flink b/flink-dist/src/main/flink-bin/bin/flink index e5dd3c6..12dd6b7 100755 --- a/flink-dist/src/main/flink-bin/bin/flink +++ b/flink-dist/src/main/flink-bin/bin/flink @@ -50,4 +50,5 @@ log_setting="-Dlog.file="$log" -Dlog4j.configuration=file:"$FLINK_CONF_DIR"/log4 export FLINK_CONF_DIR -$JAVA_RUN $JVM_ARGS $log_setting -classpath $CC_CLASSPATH org.apache.flink.client.CliFrontend $* +# Add HADOOP_CLASSPATH to allow the usage of Hadoop file systems +$JAVA_RUN $JVM_ARGS $log_setting -classpath $CC_CLASSPATH:$HADOOP_CLASSPATH org.apache.flink.client.CliFrontend $* http://git-wip-us.apache.org/repos/asf/flink/blob/2af65867/flink-dist/src/main/flink-bin/yarn-bin/yarn-session.sh ---------------------------------------------------------------------- diff --git a/flink-dist/src/main/flink-bin/yarn-bin/yarn-session.sh b/flink-dist/src/main/flink-bin/yarn-bin/yarn-session.sh index 21da505..bf0775f 100644 --- a/flink-dist/src/main/flink-bin/yarn-bin/yarn-session.sh +++ b/flink-dist/src/main/flink-bin/yarn-bin/yarn-session.sh @@ -52,5 +52,5 @@ log_setting="-Dlog.file="$log" -Dlog4j.configuration=file:"$FLINK_CONF_DIR"/log4 export FLINK_CONF_DIR -$JAVA_RUN $JVM_ARGS 
-classpath $CC_CLASSPATH:$HADOOP_CLASSPATH $log_setting org.apache.flink.yarn.Client -ship $bin/../ship/ -confDir $FLINK_CONF_DIR -j $FLINK_LIB_DIR/*yarn-uberjar.jar $* +$JAVA_RUN $JVM_ARGS -classpath $CC_CLASSPATH:$HADOOP_CLASSPATH $log_setting org.apache.flink.client.FlinkYarnSessionCli -ship $bin/../ship/ -j $FLINK_LIB_DIR/*yarn-uberjar.jar $* http://git-wip-us.apache.org/repos/asf/flink/blob/2af65867/flink-runtime/src/main/java/org/apache/flink/runtime/jobmanager/web/SetupInfoServlet.java ---------------------------------------------------------------------- diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/jobmanager/web/SetupInfoServlet.java b/flink-runtime/src/main/java/org/apache/flink/runtime/jobmanager/web/SetupInfoServlet.java index 9e0a55b..026758d 100644 --- a/flink-runtime/src/main/java/org/apache/flink/runtime/jobmanager/web/SetupInfoServlet.java +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/jobmanager/web/SetupInfoServlet.java @@ -34,7 +34,6 @@ import javax.servlet.http.HttpServletResponse; import akka.actor.ActorRef; import org.apache.flink.configuration.Configuration; -import org.apache.flink.configuration.GlobalConfiguration; import org.apache.flink.runtime.akka.AkkaUtils; import org.apache.flink.runtime.instance.Instance; @@ -60,13 +59,13 @@ public class SetupInfoServlet extends HttpServlet { private static final Logger LOG = LoggerFactory.getLogger(SetupInfoServlet.class); - final private Configuration globalC; + final private Configuration configuration; final private ActorRef jobmanager; final private FiniteDuration timeout; - public SetupInfoServlet(ActorRef jm, FiniteDuration timeout) { - globalC = GlobalConfiguration.getConfiguration(); + public SetupInfoServlet(Configuration conf, ActorRef jm, FiniteDuration timeout) { + configuration = conf; this.jobmanager = jm; this.timeout = timeout; } @@ -74,7 +73,6 @@ public class SetupInfoServlet extends HttpServlet { @Override protected void doGet(HttpServletRequest 
req, HttpServletResponse resp) throws ServletException, IOException { - resp.setStatus(HttpServletResponse.SC_OK); resp.setContentType("application/json"); @@ -86,15 +84,15 @@ public class SetupInfoServlet extends HttpServlet { } private void writeGlobalConfiguration(HttpServletResponse resp) throws IOException { - - Set<String> keys = globalC.keySet(); + Set<String> keys = configuration.keySet(); List<String> list = new ArrayList<String>(keys); Collections.sort(list); JSONObject obj = new JSONObject(); for (String k : list) { try { - obj.put(k, globalC.getString(k, "")); + + obj.put(k, configuration.getString(k, "")); } catch (JSONException e) { LOG.warn("Json object creation failed", e); } @@ -151,7 +149,7 @@ public class SetupInfoServlet extends HttpServlet { private static final Comparator<Instance> INSTANCE_SORTER = new Comparator<Instance>() { @Override public int compare(Instance o1, Instance o2) { - return o1.getInstanceConnectionInfo().compareTo(o2.getInstanceConnectionInfo()); + return o1.getInstanceConnectionInfo().compareTo(o2.getInstanceConnectionInfo()); } }; } http://git-wip-us.apache.org/repos/asf/flink/blob/2af65867/flink-runtime/src/main/java/org/apache/flink/runtime/jobmanager/web/WebInfoServer.java ---------------------------------------------------------------------- diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/jobmanager/web/WebInfoServer.java b/flink-runtime/src/main/java/org/apache/flink/runtime/jobmanager/web/WebInfoServer.java index 24dbaf7..2b92f9f 100644 --- a/flink-runtime/src/main/java/org/apache/flink/runtime/jobmanager/web/WebInfoServer.java +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/jobmanager/web/WebInfoServer.java @@ -30,7 +30,6 @@ import org.slf4j.Logger; import org.slf4j.LoggerFactory; import org.apache.flink.configuration.ConfigConstants; import org.apache.flink.configuration.Configuration; -import org.apache.flink.configuration.GlobalConfiguration; import 
org.eclipse.jetty.http.security.Constraint; import org.eclipse.jetty.security.ConstraintMapping; import org.eclipse.jetty.security.ConstraintSecurityHandler; @@ -88,7 +87,7 @@ public class WebInfoServer { // if no explicit configuration is given, use the global configuration if (config == null) { - config = GlobalConfiguration.getConfiguration(); + throw new IllegalArgumentException("No Configuration has been passed to the web server"); } this.port = config.getInteger(ConfigConstants.JOB_MANAGER_WEB_PORT_KEY, @@ -133,7 +132,7 @@ public class WebInfoServer { servletContext.addServlet(new ServletHolder(new JobmanagerInfoServlet(jobmanager, archive, timeout)), "/jobsInfo"); servletContext.addServlet(new ServletHolder(new LogfileInfoServlet(logDirFiles)), "/logInfo"); - servletContext.addServlet(new ServletHolder(new SetupInfoServlet(jobmanager, timeout)), + servletContext.addServlet(new ServletHolder(new SetupInfoServlet(config, jobmanager, timeout)), "/setupInfo"); servletContext.addServlet(new ServletHolder(new MenuServlet()), "/menu"); @@ -206,4 +205,8 @@ public class WebInfoServer { server.stop(); } + public Server getServer() { + return server; + } + } http://git-wip-us.apache.org/repos/asf/flink/blob/2af65867/flink-runtime/src/main/java/org/apache/flink/runtime/net/NetUtils.java ---------------------------------------------------------------------- diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/net/NetUtils.java b/flink-runtime/src/main/java/org/apache/flink/runtime/net/NetUtils.java index ec2633c..5a5f515 100644 --- a/flink-runtime/src/main/java/org/apache/flink/runtime/net/NetUtils.java +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/net/NetUtils.java @@ -76,7 +76,7 @@ public class NetUtils { case ADDRESS: if (hasCommonPrefix(jobManagerAddress.getAddress().getAddress(), i.getAddress())) { if (tryToConnect(i, jobManagerAddress, strategy.getTimeout())) { - LOG.info("Determined " + i + " as the TaskTracker's own IP address"); + 
LOG.info("Determined " + i + " as the machine's own IP address"); return i; } } @@ -86,7 +86,7 @@ public class NetUtils { case SLOW_CONNECT: boolean correct = tryToConnect(i, jobManagerAddress, strategy.getTimeout()); if (correct) { - LOG.info("Determined " + i + " as the TaskTracker's own IP address"); + LOG.info("Determined " + i + " as the machine's own IP address"); return i; } break; http://git-wip-us.apache.org/repos/asf/flink/blob/2af65867/flink-runtime/src/main/java/org/apache/flink/runtime/yarn/AbstractFlinkYarnClient.java ---------------------------------------------------------------------- diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/yarn/AbstractFlinkYarnClient.java b/flink-runtime/src/main/java/org/apache/flink/runtime/yarn/AbstractFlinkYarnClient.java new file mode 100644 index 0000000..7f2b14e --- /dev/null +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/yarn/AbstractFlinkYarnClient.java @@ -0,0 +1,50 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +package org.apache.flink.runtime.yarn; + +import org.apache.hadoop.fs.Path; +import java.io.File; +import java.util.List; + +public abstract class AbstractFlinkYarnClient { + + // ---- Setter for YARN Cluster properties ----- // + public abstract void setJobManagerMemory(int memoryMB); + public abstract void setTaskManagerMemory(int memoryMB); + public abstract void setTaskManagerSlots(int slots); + public abstract int getTaskManagerSlots(); + public abstract void setQueue(String queue); + public abstract void setLocalJarPath(Path localJarPath); + public abstract void setConfigurationFilePath(Path confPath); + public abstract void setFlinkLoggingConfigurationPath(Path logConfPath); + public abstract Path getFlinkLoggingConfigurationPath(); + public abstract void setTaskManagerCount(int tmCount); + public abstract int getTaskManagerCount(); + public abstract void setConfigurationDirectory(String confDirPath); + // List of files to transfer to the YARN containers. + public abstract void setShipFiles(List<File> shipFiles); + public abstract void setDynamicPropertiesEncoded(String dynamicPropertiesEncoded); + public abstract String getDynamicPropertiesEncoded(); + + // ---- Operations on the YARN cluster ----- // + public abstract String getClusterDescription() throws Exception; + + public abstract AbstractFlinkYarnCluster deploy(String clusterName) throws Exception; + + +} http://git-wip-us.apache.org/repos/asf/flink/blob/2af65867/flink-runtime/src/main/java/org/apache/flink/runtime/yarn/AbstractFlinkYarnCluster.java ---------------------------------------------------------------------- diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/yarn/AbstractFlinkYarnCluster.java b/flink-runtime/src/main/java/org/apache/flink/runtime/yarn/AbstractFlinkYarnCluster.java new file mode 100644 index 0000000..58eaf1d --- /dev/null +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/yarn/AbstractFlinkYarnCluster.java @@ -0,0 +1,44 @@ +/* + * Licensed to 
the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.runtime.yarn; + +import java.net.InetSocketAddress; +import java.util.List; + +public abstract class AbstractFlinkYarnCluster { + + public abstract InetSocketAddress getJobManagerAddress(); + + public abstract String getWebInterfaceURL(); + + public abstract void shutdown(); + + public abstract boolean hasBeenStopped(); + + public abstract FlinkYarnClusterStatus getClusterStatus(); + + public abstract boolean hasFailed(); + + /** + * @return Diagnostics if the Cluster is in "failed" state. 
+ */ + public abstract String getDiagnostics(); + + public abstract List<String> getNewMessages(); +} http://git-wip-us.apache.org/repos/asf/flink/blob/2af65867/flink-runtime/src/main/java/org/apache/flink/runtime/yarn/FlinkYarnClusterStatus.java ---------------------------------------------------------------------- diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/yarn/FlinkYarnClusterStatus.java b/flink-runtime/src/main/java/org/apache/flink/runtime/yarn/FlinkYarnClusterStatus.java new file mode 100644 index 0000000..2aaaaa0 --- /dev/null +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/yarn/FlinkYarnClusterStatus.java @@ -0,0 +1,86 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +package org.apache.flink.runtime.yarn; + +import java.io.Serializable; + + +public class FlinkYarnClusterStatus implements Serializable { + private int numberOfTaskManagers; + private int numberOfSlots; + + public FlinkYarnClusterStatus() { + } + + public FlinkYarnClusterStatus(int numberOfTaskManagers, int numberOfSlots) { + this.numberOfTaskManagers = numberOfTaskManagers; + this.numberOfSlots = numberOfSlots; + } + + public int getNumberOfTaskManagers() { + return numberOfTaskManagers; + } + + public void setNumberOfTaskManagers(int numberOfTaskManagers) { + this.numberOfTaskManagers = numberOfTaskManagers; + } + + public int getNumberOfSlots() { + return numberOfSlots; + } + + public void setNumberOfSlots(int numberOfSlots) { + this.numberOfSlots = numberOfSlots; + } + + @Override + public boolean equals(Object o) { + if (this == o) { + return true; + } + if (o == null || getClass() != o.getClass()) { + return false; + } + + FlinkYarnClusterStatus that = (FlinkYarnClusterStatus) o; + + if (numberOfSlots != that.numberOfSlots) { + return false; + } + if (numberOfTaskManagers != that.numberOfTaskManagers) { + return false; + } + + return true; + } + + @Override + public int hashCode() { + int result = numberOfTaskManagers; + result = 31 * result + numberOfSlots; + return result; + } + + @Override + public String toString() { + return "FlinkYarnClusterStatus{" + + "numberOfTaskManagers=" + numberOfTaskManagers + + ", numberOfSlots=" + numberOfSlots + + '}'; + } +} http://git-wip-us.apache.org/repos/asf/flink/blob/2af65867/flink-runtime/src/main/scala/org/apache/flink/runtime/akka/AkkaUtils.scala ---------------------------------------------------------------------- diff --git a/flink-runtime/src/main/scala/org/apache/flink/runtime/akka/AkkaUtils.scala b/flink-runtime/src/main/scala/org/apache/flink/runtime/akka/AkkaUtils.scala index cf678b0..1f2791c 100644 --- a/flink-runtime/src/main/scala/org/apache/flink/runtime/akka/AkkaUtils.scala +++ 
b/flink-runtime/src/main/scala/org/apache/flink/runtime/akka/AkkaUtils.scala @@ -53,6 +53,9 @@ object AkkaUtils { } def createActorSystem(akkaConfig: Config): ActorSystem = { + if(LOG.isDebugEnabled) { // guard: the s-interpolated message below is built eagerly, so only render the config when debug logging is on + LOG.debug(s"Using akka config to create actor system: $akkaConfig") + } ActorSystem.create("flink", akkaConfig) } http://git-wip-us.apache.org/repos/asf/flink/blob/2af65867/flink-runtime/src/main/scala/org/apache/flink/runtime/client/JobClient.scala ---------------------------------------------------------------------- diff --git a/flink-runtime/src/main/scala/org/apache/flink/runtime/client/JobClient.scala b/flink-runtime/src/main/scala/org/apache/flink/runtime/client/JobClient.scala index 6a4beed..195a0b6 100644 --- a/flink-runtime/src/main/scala/org/apache/flink/runtime/client/JobClient.scala +++ b/flink-runtime/src/main/scala/org/apache/flink/runtime/client/JobClient.scala @@ -76,7 +76,8 @@ class JobClientListener(client: ActorRef) extends Actor with ActorLogMessages wi client ! Failure(new JobExecutionException(msg, false)) self ! PoisonPill case msg => - println(msg.toString) + // we have to use System.out.println here to avoid erroneous behavior for output redirection + System.out.println(msg.toString) } } http://git-wip-us.apache.org/repos/asf/flink/blob/2af65867/flink-runtime/src/main/scala/org/apache/flink/runtime/jobmanager/JobManager.scala ---------------------------------------------------------------------- diff --git a/flink-runtime/src/main/scala/org/apache/flink/runtime/jobmanager/JobManager.scala b/flink-runtime/src/main/scala/org/apache/flink/runtime/jobmanager/JobManager.scala index cd1119d..37a41a5 100644 --- a/flink-runtime/src/main/scala/org/apache/flink/runtime/jobmanager/JobManager.scala +++ b/flink-runtime/src/main/scala/org/apache/flink/runtime/jobmanager/JobManager.scala @@ -437,6 +437,13 @@ class JobManager(val configuration: Configuration) } } + /** + * Handles any message that none of the receive cases matched by throwing a RuntimeException.
+ * This overrides Akka's default unhandled (which publishes an UnhandledMessage to the event stream), so an unknown message fails the actor and what happens next is up to its supervisor strategy. + */ + override def unhandled(message: Any): Unit = { + throw new RuntimeException("Received unknown message " + message) + } + private def removeJob(jobID: JobID): Unit = { currentJobs.remove(jobID) match { case Some((eg, _)) => archive ! ArchiveExecutionGraph(jobID, eg) http://git-wip-us.apache.org/repos/asf/flink/blob/2af65867/flink-yarn-tests/pom.xml ---------------------------------------------------------------------- diff --git a/flink-yarn-tests/pom.xml b/flink-yarn-tests/pom.xml new file mode 100644 index 0000000..0cccf3a --- /dev/null +++ b/flink-yarn-tests/pom.xml @@ -0,0 +1,121 @@ +<!-- +Licensed to the Apache Software Foundation (ASF) under one +or more contributor license agreements. See the NOTICE file +distributed with this work for additional information +regarding copyright ownership. The ASF licenses this file +to you under the Apache License, Version 2.0 (the +"License"); you may not use this file except in compliance +with the License. You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, +software distributed under the License is distributed on an +"AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +KIND, either express or implied. See the License for the +specific language governing permissions and limitations +under the License. 
+--> +<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" + xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/maven-v4_0_0.xsd"> + <modelVersion>4.0.0</modelVersion> + + <parent> + <groupId>org.apache.flink</groupId> + <artifactId>flink-parent</artifactId> + <version>0.9-SNAPSHOT</version> + <relativePath>..</relativePath> + </parent> + + <!-- + There is a separate "flink-yarn-tests" package that expects the "flink-dist" package + to be built before. + We need the YARN fat jar built by flink-dist for the tests.
+ --> + + <artifactId>flink-yarn-tests</artifactId> + <name>flink-yarn-tests</name> + <packaging>jar</packaging> + + <dependencies> + + <dependency> + <groupId>org.apache.flink</groupId> + <artifactId>flink-runtime</artifactId> + <version>${project.version}</version> + <exclusions> + <exclusion> + <artifactId>hadoop-core</artifactId> + <groupId>org.apache.hadoop</groupId> + </exclusion> + </exclusions> + </dependency> + + <dependency> + <groupId>org.apache.flink</groupId> + <artifactId>flink-clients</artifactId> + <version>${project.version}</version> + </dependency> + + <dependency> + <groupId>org.apache.flink</groupId> + <artifactId>flink-yarn</artifactId> + <version>${project.version}</version> + </dependency> + + <dependency> + <groupId>org.apache.hadoop</groupId> + <artifactId>hadoop-yarn-client</artifactId> + <scope>test</scope> + </dependency> + + + <dependency> + <groupId>org.apache.hadoop</groupId> + <artifactId>hadoop-yarn-common</artifactId> + <scope>test</scope> + </dependency> + + <dependency> + <groupId>org.apache.hadoop</groupId> + <artifactId>hadoop-yarn-server-tests</artifactId> + <scope>test</scope> + </dependency> + + <dependency> + <groupId>org.apache.hadoop</groupId> + <artifactId>hadoop-minicluster</artifactId> + <scope>test</scope> + </dependency> + + <dependency> + <groupId>org.apache.hadoop</groupId> + <artifactId>hadoop-mapreduce-client-core</artifactId> + <scope>test</scope> + </dependency> + </dependencies> + + <build> + <plugins> + <!-- Set the root directory for all tests to the project root. 
+ We need this to be able to locate the final build (in flink-dist) + --> + <plugin> + <groupId>org.apache.maven.plugins</groupId> + <artifactId>maven-failsafe-plugin</artifactId> + <configuration> + <!-- Enforce single threaded execution due to port conflicts with the mini yarn cluster --> + <forkCount>1</forkCount> + <workingDirectory>../</workingDirectory> + </configuration> + </plugin> + <plugin> + <groupId>org.apache.maven.plugins</groupId> + <artifactId>maven-surefire-plugin</artifactId> + <configuration> + <workingDirectory>../</workingDirectory> + </configuration> + </plugin> + </plugins> + </build> +</project> http://git-wip-us.apache.org/repos/asf/flink/blob/2af65867/flink-yarn-tests/src/test/java/org/apache/flink/yarn/UtilsTest.java ---------------------------------------------------------------------- diff --git a/flink-yarn-tests/src/test/java/org/apache/flink/yarn/UtilsTest.java b/flink-yarn-tests/src/test/java/org/apache/flink/yarn/UtilsTest.java new file mode 100644 index 0000000..9fd2541 --- /dev/null +++ b/flink-yarn-tests/src/test/java/org/apache/flink/yarn/UtilsTest.java @@ -0,0 +1,42 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +package org.apache.flink.yarn; + +import org.junit.Assert; +import org.junit.Test; + +import java.io.File; +import java.util.Arrays; +import java.util.List; + +public class UtilsTest { + /** Checks that the Flink uberjar can be found from the working directory and that the flink-dist directory two levels above it contains lib, bin and conf. */ + @Test + public void testUberjarLocator() { + File dir = YarnTestBase.findFile(".", new YarnTestBase.RootDirFilenameFilter()); + Assert.assertNotNull(dir); + dir = dir.getParentFile().getParentFile(); // from uberjar to lib to root + Assert.assertTrue(dir.exists()); + Assert.assertTrue(dir.isDirectory()); + Assert.assertTrue(dir.toString().contains("flink-dist")); + List<String> files = Arrays.asList(dir.list()); + Assert.assertTrue(files.contains("lib")); + Assert.assertTrue(files.contains("bin")); + Assert.assertTrue(files.contains("conf")); + } +} http://git-wip-us.apache.org/repos/asf/flink/blob/2af65867/flink-yarn-tests/src/test/java/org/apache/flink/yarn/YARNSessionCapacitySchedulerIT.java ---------------------------------------------------------------------- diff --git a/flink-yarn-tests/src/test/java/org/apache/flink/yarn/YARNSessionCapacitySchedulerIT.java b/flink-yarn-tests/src/test/java/org/apache/flink/yarn/YARNSessionCapacitySchedulerIT.java new file mode 100644 index 0000000..25e1aa2 --- /dev/null +++ b/flink-yarn-tests/src/test/java/org/apache/flink/yarn/YARNSessionCapacitySchedulerIT.java @@ -0,0 +1,72 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License.
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.flink.yarn; + +import org.apache.hadoop.yarn.conf.YarnConfiguration; +import org.apache.hadoop.yarn.server.resourcemanager.scheduler.ResourceScheduler; +import org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity.CapacityScheduler; +import org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity.CapacitySchedulerConfiguration; +import org.apache.hadoop.yarn.server.resourcemanager.scheduler.fifo.FifoScheduler; +import org.junit.BeforeClass; +import org.junit.Test; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + + +/** + * This test starts a MiniYARNCluster with a CapacityScheduler. + * It has, by default, a queue called "default". The configuration here adds another queue: "qa-team". + */ +public class YARNSessionCapacitySchedulerIT extends YarnTestBase { + private static final Logger LOG = LoggerFactory.getLogger(YARNSessionCapacitySchedulerIT.class); + + @BeforeClass + public static void setup() { + yarnConfiguration.setClass(YarnConfiguration.RM_SCHEDULER, CapacityScheduler.class, ResourceScheduler.class); + yarnConfiguration.set("yarn.scheduler.capacity.root.queues", "default,qa-team"); + yarnConfiguration.setInt("yarn.scheduler.capacity.root.default.capacity", 40); + yarnConfiguration.setInt("yarn.scheduler.capacity.root.qa-team.capacity", 60); + startYARNWithConfig(yarnConfiguration); + } + + /** + * Test regular operation, including command line parameter parsing.
+ */ + @Test + public void testClientStartup() { + runWithArgs(new String[] {"-j", flinkUberjar.getAbsolutePath(), + "-n", "1", + "-jm", "512", + "-tm", "1024", "-qu", "qa-team"}, + "Number of connected TaskManagers changed to 1. Slots available: 1", RunTypes.YARN_SESSION); + } + + + /** + * Test deployment to non-existing queue. (user-reported error) + * The CapacityScheduler only has the configured queues, so the deployment must fail with an error that lists the available ones. + */ + @Test + public void testNonexistingQueue() { + runWithArgs(new String[] {"-j", flinkUberjar.getAbsolutePath(), + "-n", "1", + "-jm", "512", + "-tm", "1024", + "-qu", "doesntExist"}, "Error while deploying YARN cluster: The specified queue 'doesntExist' does not exist. Available queues: default, qa-team, ", RunTypes.YARN_SESSION); + } +} http://git-wip-us.apache.org/repos/asf/flink/blob/2af65867/flink-yarn-tests/src/test/java/org/apache/flink/yarn/YARNSessionFIFOIT.java ---------------------------------------------------------------------- diff --git a/flink-yarn-tests/src/test/java/org/apache/flink/yarn/YARNSessionFIFOIT.java b/flink-yarn-tests/src/test/java/org/apache/flink/yarn/YARNSessionFIFOIT.java new file mode 100644 index 0000000..5f8ae87 --- /dev/null +++ b/flink-yarn-tests/src/test/java/org/apache/flink/yarn/YARNSessionFIFOIT.java @@ -0,0 +1,225 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License.
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.flink.yarn; + +import org.apache.flink.client.FlinkYarnSessionCli; +import org.apache.flink.runtime.yarn.AbstractFlinkYarnClient; +import org.apache.flink.runtime.yarn.AbstractFlinkYarnCluster; +import org.apache.flink.runtime.yarn.FlinkYarnClusterStatus; +import org.apache.hadoop.fs.Path; +import org.apache.hadoop.yarn.conf.YarnConfiguration; +import org.apache.hadoop.yarn.server.resourcemanager.scheduler.ResourceScheduler; +import org.apache.hadoop.yarn.server.resourcemanager.scheduler.fifo.FifoScheduler; +import org.junit.Assert; +import org.junit.BeforeClass; +import org.junit.Test; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import java.io.File; + + +/** + * This test starts a MiniYARNCluster with a FIFO scheudler. + * There are no queues for that scheduler. + */ +public class YARNSessionFIFOIT extends YarnTestBase { + private static final Logger LOG = LoggerFactory.getLogger(YARNSessionFIFOIT.class); + + /* + Override init with FIFO scheduler. + */ + @BeforeClass + public static void setup() { + yarnConfiguration.setClass(YarnConfiguration.RM_SCHEDULER, FifoScheduler.class, ResourceScheduler.class); + startYARNWithConfig(yarnConfiguration); + } + /** + * Test regular operation, including command line parameter parsing. + */ + @Test + public void testClientStartup() { + LOG.info("Starting testClientStartup()"); + runWithArgs(new String[] {"-j", flinkUberjar.getAbsolutePath(), + "-n", "1", + "-jm", "512", + "-tm", "1024"}, + "Number of connected TaskManagers changed to 1. 
Slots available: 1", RunTypes.YARN_SESSION); + LOG.info("Finished testClientStartup()"); + } + + /** + * Test querying the YARN cluster. + * + * This test validates through 666*2 cores in the "cluster". + */ + @Test + public void testQueryCluster() { + LOG.info("Starting testQueryCluster()"); + runWithArgs(new String[] {"-q"}, "Summary: totalMemory 8192 totalCores 1332", RunTypes.YARN_SESSION); // we have 666*2 cores. + LOG.info("Finished testQueryCluster()"); + } + + /** + * Test deployment to non-existing queue. (user-reported error) + * Deployment to the queue is possible because there are no queues, so we don't check. + */ + @Test + public void testNonexistingQueue() { + LOG.info("Starting testNonexistingQueue()"); + runWithArgs(new String[]{"-j", flinkUberjar.getAbsolutePath(), + "-n", "1", + "-jm", "512", + "-tm", "1024", + "-qu", "doesntExist"}, "Number of connected TaskManagers changed to 1. Slots available: 1", RunTypes.YARN_SESSION); + LOG.info("Finished testNonexistingQueue()"); + } + + /** + * Test requesting more resources than available. + */ + @Test + public void testMoreNodesThanAvailable() { + LOG.info("Starting testMoreNodesThanAvailable()"); + runWithArgs(new String[] {"-j", flinkUberjar.getAbsolutePath(), + "-n", "10", + "-jm", "512", + "-tm", "1024"}, "Error while deploying YARN cluster: This YARN session requires 10752MB of memory in the cluster. There are currently only 8192MB available.", RunTypes.YARN_SESSION); + LOG.info("Finished testMoreNodesThanAvailable()"); + } + + /** + * The test cluster has the following resources: + * - 2 Nodes with 4096 MB each. 
+ * - RM_SCHEDULER_MINIMUM_ALLOCATION_MB is 512 + * + * We allocate: + * 1 JobManager with 256 MB (will be automatically upgraded to 512 due to min alloc mb) + * 5 TaskManagers with 1585 MB + * + * user sees a total request of: 8181 MB (fits) + * system sees a total request of: 8437 (doesn't fit due to min alloc mb) + */ + @Test + public void testResourceComputation() { + LOG.info("Starting testResourceComputation()"); + runWithArgs(new String[] {"-j", flinkUberjar.getAbsolutePath(), + "-n", "5", + "-jm", "256", + "-tm", "1585"}, "Error while deploying YARN cluster: This YARN session requires 8437MB of memory in the cluster. There are currently only 8192MB available.", RunTypes.YARN_SESSION); + LOG.info("Finished testResourceComputation()"); + } + + /** + * The test cluster has the following resources: + * - 2 Nodes with 4096 MB each. + * - RM_SCHEDULER_MINIMUM_ALLOCATION_MB is 512 + * + * We allocate: + * 1 JobManager with 256 MB (will be automatically upgraded to 512 due to min alloc mb) + * 2 TaskManagers with 3840 MB + * + * the user sees a total request of: 7936 MB (fits) + * the system sees a request of: 8192 MB (fits) + * HOWEVER: one machine is going to need 3840 + 512 = 4352 MB, which doesn't fit. + * + * --> check if the system properly rejects allocating this session. + */ + @Test + public void testfullAlloc() { + LOG.info("Starting testfullAlloc()"); + runWithArgs(new String[] {"-j", flinkUberjar.getAbsolutePath(), + "-n", "2", + "-jm", "256", + "-tm", "3840"}, "Error while deploying YARN cluster: There is not enough memory available in the YARN cluster. The TaskManager(s) require 3840MB each. 
NodeManagers available: [4096, 4096]\n" + + "After allocating the JobManager (512MB) and (1/2) TaskManagers, the following NodeManagers are available: [3584, 256]", RunTypes.YARN_SESSION); + LOG.info("Finished testfullAlloc()"); + } + + /** + * Test per-job yarn cluster + * + * This also tests the prefixed CliFrontend options for the YARN case + */ + @Test + public void perJobYarnCluster() { + LOG.info("Starting perJobYarnCluster()"); + File exampleJarLocation = YarnTestBase.findFile(".", new ContainsName("-WordCount.jar", "streaming")); // exclude streaming wordcount here. + runWithArgs(new String[] {"run", "-m", "yarn-cluster", + "-yj", flinkUberjar.getAbsolutePath(), + "-yn", "1", + "-yjm", "512", + "-ytm", "1024", exampleJarLocation.getAbsolutePath()}, "Job execution switched to status FINISHED.", RunTypes.CLI_FRONTEND); + LOG.info("Finished perJobYarnCluster()"); + } + + /** + * Test the YARN Java API + */ + @Test + public void testJavaAPI() { + final int WAIT_TIME = 15; + LOG.info("Starting testJavaAPI()"); + + AbstractFlinkYarnClient flinkYarnClient = FlinkYarnSessionCli.getFlinkYarnClient(); + flinkYarnClient.setTaskManagerCount(1); + flinkYarnClient.setJobManagerMemory(512); + flinkYarnClient.setTaskManagerMemory(512); + flinkYarnClient.setLocalJarPath(new Path(flinkUberjar.getAbsolutePath())); + String confDirPath = System.getenv("FLINK_CONF_DIR"); + flinkYarnClient.setConfigurationDirectory(confDirPath); + flinkYarnClient.setConfigurationFilePath(new Path(confDirPath + File.separator + "flink-conf.yaml")); + + // deploy + AbstractFlinkYarnCluster yarnCluster = null; + try { + yarnCluster = flinkYarnClient.deploy(null); + } catch (Exception e) { + System.err.println("Error while deploying YARN cluster: "+e.getMessage()); + e.printStackTrace(System.err); + Assert.fail(); + } + FlinkYarnClusterStatus expectedStatus = new FlinkYarnClusterStatus(1, 1); + for(int second = 0; second < WAIT_TIME * 2; second++) { // run "forever" + try { + Thread.sleep(1000); + } 
catch (InterruptedException e) { + LOG.warn("Interrupted", e); + Thread.interrupted(); + } + FlinkYarnClusterStatus status = yarnCluster.getClusterStatus(); + if(status != null && status.equals(expectedStatus)) { + LOG.info("Cluster reached status " + status); + break; // all good, cluster started + } + if(second > WAIT_TIME) { + // we waited for 15 seconds. cluster didn't come up correctly + Assert.fail("The custer didn't start after " + WAIT_TIME + " seconds"); + } + } + + // use the cluster + Assert.assertNotNull(yarnCluster.getJobManagerAddress()); + Assert.assertNotNull(yarnCluster.getWebInterfaceURL()); + + LOG.info("Shutting down cluster. All tests passed"); + // shutdown cluster + yarnCluster.shutdown(); + LOG.info("Finished testJavaAPI()"); + } +}