[
https://issues.apache.org/jira/browse/FLINK-2935?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15105240#comment-15105240
]
ASF GitHub Bot commented on FLINK-2935:
---------------------------------------
Github user tillrohrmann commented on a diff in the pull request:
https://github.com/apache/flink/pull/1500#discussion_r49994979
--- Diff:
flink-staging/flink-scala-shell/src/main/scala/org/apache/flink/api/scala/FlinkShell.scala
---
@@ -61,99 +73,217 @@ object FlinkShell {
)
cmd("remote") action { (_, c) =>
- c.copy(flinkShellExecutionMode = ExecutionMode.REMOTE)
+ c.copy(executionMode = ExecutionMode.REMOTE)
} text("starts Flink scala shell connecting to a remote cluster\n")
children(
arg[String]("<host>") action { (h, c) =>
- c.copy(host = h) }
+ c.copy(host = Some(h)) }
text("remote host name as string"),
arg[Int]("<port>") action { (p, c) =>
- c.copy(port = p) }
+ c.copy(port = Some(p)) }
text("remote port as integer\n"),
opt[(String)]("addclasspath") abbr("a") valueName("<path/to/jar>")
action {
case (x, c) =>
val xArray = x.split(":")
c.copy(externalJars = Option(xArray))
- } text("specifies additional jars to be used in Flink")
+ } text ("specifies additional jars to be used in Flink")
)
- help("help") abbr("h") text("prints this usage text\n")
+
+ cmd("yarn") action {
+ (_, c) => c.copy(executionMode = ExecutionMode.YARN, yarnConfig =
None)
+ } text ("starts Flink scala shell connecting to a yarn cluster\n")
children(
+ opt[Int]("container") abbr ("n") valueName ("arg") action {
+ (x, c) =>
+ c.copy(yarnConfig = Some(ensureYarnConfig(c).copy(containers =
Some(x))))
+ } text ("Number of YARN container to allocate (= Number of Task
Managers)"),
+ opt[Int]("jobManagerMemory") abbr ("jm") valueName ("arg") action {
+ (x, c) =>
+ c.copy(yarnConfig =
Some(ensureYarnConfig(c).copy(jobManagerMemory = Some(x))))
+ } text ("Memory for JobManager Container [in MB]"),
+ opt[String]("name") abbr ("nm") action {
+ (x, c) => c.copy(yarnConfig = Some(ensureYarnConfig(c).copy(name
= Some(x))))
+ } text ("Set a custom name for the application on YARN"),
+ opt[String]("queue") abbr ("qu") valueName ("<arg>") action {
+ (x, c) => c.copy(yarnConfig =
Some(ensureYarnConfig(c).copy(queue = Some(x))))
+ } text ("Specify YARN queue"),
+ opt[Int]("slots") abbr ("s") valueName ("<arg>") action {
+ (x, c) => c.copy(yarnConfig =
Some(ensureYarnConfig(c).copy(slots = Some(x))))
+ } text ("Number of slots per TaskManager"),
+ opt[Int]("taskManagerMemory") abbr ("tm") valueName ("<arg>")
action {
+ (x, c) =>
+ c.copy(yarnConfig =
Some(ensureYarnConfig(c).copy(taskManagerMemory = Some(x))))
+ } text ("Memory per TaskManager Container [in MB]"),
+ opt[(String)] ("addclasspath") abbr("a")
valueName("<path/to/jar>") action {
+ case (x, c) =>
+ val xArray = x.split(":")
+ c.copy(externalJars = Option(xArray))
+ } text("specifies additional jars to be used in Flink\n")
+ )
+
+ help("help") abbr ("h") text ("prints this usage text\n")
}
// parse arguments
- parser.parse (args, Config()) match {
- case Some(config) =>
- startShell(config.host,
- config.port,
- config.flinkShellExecutionMode,
- config.externalJars)
-
- case _ => System.out.println("Could not parse program arguments")
+ parser.parse(args, Config()) match {
+ case Some(config) => startShell(config)
+ case _ => println("Could not parse program arguments")
}
}
+ def fetchConnectionInfo(
+ config: Config
+ ): (String, Int, Option[Either[FlinkMiniCluster,
AbstractFlinkYarnCluster]]) = {
+ config.executionMode match {
+ case ExecutionMode.LOCAL => // Local mode
+ val config = new Configuration()
+ config.setInteger(ConfigConstants.JOB_MANAGER_IPC_PORT_KEY, 0)
- def startShell(
- userHost: String,
- userPort: Int,
- executionMode: ExecutionMode.Value,
- externalJars: Option[Array[String]] = None): Unit ={
-
- System.out.println("Starting Flink Shell:")
-
- // either port or userhost not specified by user, create new
minicluster
- val (host: String, port: Int, cluster: Option[LocalFlinkMiniCluster]) =
- executionMode match {
- case ExecutionMode.LOCAL =>
- val config = new Configuration()
- config.setInteger(ConfigConstants.JOB_MANAGER_IPC_PORT_KEY, 0)
- val miniCluster = new LocalFlinkMiniCluster(config, false)
- miniCluster.start()
- val port = miniCluster.getLeaderRPCPort
- System.out.println(s"\nStarting local Flink cluster (host:
localhost, port: $port).\n")
- ("localhost", port, Some(miniCluster))
-
- case ExecutionMode.REMOTE =>
- if (userHost == "none" || userPort == -1) {
- System.out.println("Error: <host> or <port> not specified!")
- return
- } else {
- System.out.println(
- s"\nConnecting to Flink cluster (host: $userHost, port:
$userPort).\n")
- (userHost, userPort, None)
- }
-
- case ExecutionMode.UNDEFINED =>
- System.out.println("Error: please specify execution mode:")
- System.out.println("[local | remote <host> <port>]")
- return
- }
+ val miniCluster = new LocalFlinkMiniCluster(config, false)
+ miniCluster.start()
- var repl: Option[FlinkILoop] = None
+ println("\nStarting local Flink cluster (host: localhost, " +
+ s"port: ${miniCluster.getLeaderRPCPort}).\n")
+ ("localhost", miniCluster.getLeaderRPCPort,
Some(Left(miniCluster)))
- try {
- // custom shell
- repl = Some(
- bufferedReader match {
+ case ExecutionMode.REMOTE => // Remote mode
+ if (config.host.isEmpty || config.port.isEmpty) {
+ throw new IllegalArgumentException("<host> or <port> is not
specified!")
+ }
+ (config.host.get, config.port.get, None)
- case Some(br) =>
- val out = new StringWriter()
- new FlinkILoop(host, port, externalJars, bufferedReader, new
JPrintWriter(out))
+ case ExecutionMode.YARN => // YARN mode
+ config.yarnConfig match {
+ case Some(yarnConfig) => // if there is information for new
cluster
+ deployNewYarnCluster(yarnConfig)
+ case None => // there is no information for new cluster. Then we
use yarn properties.
+ fetchDeployedYarnClusterInfo()
+ }
- case None =>
- new FlinkILoop(host, port, externalJars)
- })
+ case ExecutionMode.UNDEFINED => // Wrong input
+ throw new IllegalArgumentException("please specify execution
mode:\n" +
+ "[local | remote <host> <port> | yarn]")
+ }
+ }
- val settings = new Settings()
+ def startShell(config: Config): Unit = {
+ println("Starting Flink Shell:")
- settings.usejavacp.value = true
- settings.Yreplsync.value = true
+ val (repl, cluster) = try {
+ val (host, port, cluster) = fetchConnectionInfo(config)
+ println(s"\nConnecting to Flink cluster (host: $host, port:
$port).\n")
+ val repl: Option[FlinkILoop] = bufferedReader match {
+ case Some(reader) =>
+ val out = new StringWriter()
+ Some(new FlinkILoop(host, port, config.externalJars, reader, new
JPrintWriter(out)))
+ case None =>
+ Some(new FlinkILoop(host, port, config.externalJars))
+ }
+
+ (repl, cluster)
+ } catch {
+ case e: IllegalArgumentException =>
+ println(s"Error: ${e.getMessage}")
+ sys.exit()
+ }
- // start scala interpreter shell
+ val settings = new Settings()
+ settings.usejavacp.value = true
+ settings.Yreplsync.value = true
+
+ try {
repl.foreach(_.process(settings))
} finally {
repl.foreach(_.closeInterpreter())
- cluster.foreach(_.stop())
+ cluster match {
+ case Some(Left(miniCluster)) => miniCluster.stop()
+ case Some(Right(yarnCluster)) => yarnCluster.shutdown(false)
+ case _ =>
+ }
+ }
+
+ println(" good bye ..")
+ }
+
+ def deployNewYarnCluster(yarnConfig: YarnConfig) = {
+ val yarnClient = FlinkYarnSessionCli.getFlinkYarnClient
+
+ // use flink-dist.jar for scala shell
+ val jarPath = new Path("file://" +
+
s"${yarnClient.getClass.getProtectionDomain.getCodeSource.getLocation.getPath}")
+ yarnClient.setLocalJarPath(jarPath)
+
+ // load configuration
+ val confDirPath = CliFrontend.getConfigurationDirectoryFromEnv
+ val flinkConfiguration = GlobalConfiguration.getConfiguration
+ val confFile = new File(confDirPath + File.separator +
"flink-conf.yaml")
+ val confPath = new Path(confFile.getAbsolutePath)
+ GlobalConfiguration.loadConfiguration(confDirPath)
+ yarnClient.setFlinkConfigurationObject(flinkConfiguration)
+ yarnClient.setConfigurationDirectory(confDirPath)
+ yarnClient.setConfigurationFilePath(confPath)
+
+ // number of task managers is required.
+ yarnConfig.containers match {
+ case Some(containers) => yarnClient.setTaskManagerCount(containers)
+ case None =>
+ throw new IllegalArgumentException("Number of taskmanagers must be
specified.")
+ }
+
+ // set configuration from user input
+ yarnConfig.jobManagerMemory.foreach(yarnClient.setJobManagerMemory)
+ yarnConfig.name.foreach(yarnClient.setName)
+ yarnConfig.queue.foreach(yarnClient.setQueue)
+ yarnConfig.slots.foreach(yarnClient.setTaskManagerSlots)
+ yarnConfig.taskManagerMemory.foreach(yarnClient.setTaskManagerMemory)
+
+ // deploy
+ val cluster = yarnClient.deploy()
+ val address = cluster.getJobManagerAddress.getAddress.getHostAddress
+ val port = cluster.getJobManagerAddress.getPort
+ cluster.connectToCluster()
+
+ (address, port, Some(Right(cluster)))
+ }
+
+ def fetchDeployedYarnClusterInfo() = {
+ // load configuration
+ val globalConfig = GlobalConfiguration.getConfiguration
+ val defaultPropertiesLocation = System.getProperty("java.io.tmpdir")
+ val currentUser = System.getProperty("user.name")
+ val propertiesLocation = globalConfig.getString(
+ ConfigConstants.YARN_PROPERTIES_FILE_LOCATION,
defaultPropertiesLocation)
+ val propertiesName = CliFrontend.YARN_PROPERTIES_FILE + currentUser
+ val propertiesFile = new File(propertiesLocation, propertiesName)
--- End diff --
Maybe we could unify the `propertiesName` generation in a single place. It is
also implemented in `ExecutionEnvironment`. If it changes in one place, one has
to remember to change it here as well, and that will almost certainly be
forgotten.
> Allow scala shell to read yarn properties
> -----------------------------------------
>
> Key: FLINK-2935
> URL: https://issues.apache.org/jira/browse/FLINK-2935
> Project: Flink
> Issue Type: Improvement
> Components: Scala Shell
> Affects Versions: 0.9.1
> Reporter: Johannes
> Assignee: Chiwan Park
> Priority: Minor
> Labels: easyfix
>
> Currently the deployment of flink via yarn and the scala shell are not linked.
> When deploying a yarn session the file
> bq. org.apache.flink.client.CliFrontend
> creates a
> bq. .yarn-properties-$username
> file with the connection properties.
> There should be a way to have the scala shell optionally read this file
> automatically as well.
--
This message was sent by Atlassian JIRA
(v6.3.4#6332)