[ https://issues.apache.org/jira/browse/FLINK-2935?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15105239#comment-15105239 ]
ASF GitHub Bot commented on FLINK-2935:
---------------------------------------
Github user tillrohrmann commented on a diff in the pull request:
https://github.com/apache/flink/pull/1500#discussion_r49994742
--- Diff: flink-staging/flink-scala-shell/src/main/scala/org/apache/flink/api/scala/FlinkShell.scala ---
@@ -61,99 +73,217 @@ object FlinkShell {
)
cmd("remote") action { (_, c) =>
- c.copy(flinkShellExecutionMode = ExecutionMode.REMOTE)
+ c.copy(executionMode = ExecutionMode.REMOTE)
} text("starts Flink scala shell connecting to a remote cluster\n")
children(
arg[String]("<host>") action { (h, c) =>
- c.copy(host = h) }
+ c.copy(host = Some(h)) }
text("remote host name as string"),
arg[Int]("<port>") action { (p, c) =>
- c.copy(port = p) }
+ c.copy(port = Some(p)) }
text("remote port as integer\n"),
opt[(String)]("addclasspath") abbr("a") valueName("<path/to/jar>")
action {
case (x, c) =>
val xArray = x.split(":")
c.copy(externalJars = Option(xArray))
- } text("specifies additional jars to be used in Flink")
+ } text ("specifies additional jars to be used in Flink")
)
- help("help") abbr("h") text("prints this usage text\n")
+
+ cmd("yarn") action {
+ (_, c) => c.copy(executionMode = ExecutionMode.YARN, yarnConfig = None)
+ } text ("starts Flink scala shell connecting to a yarn cluster\n")
children(
+ opt[Int]("container") abbr ("n") valueName ("arg") action {
+ (x, c) =>
+ c.copy(yarnConfig = Some(ensureYarnConfig(c).copy(containers = Some(x))))
+ } text ("Number of YARN container to allocate (= Number of Task Managers)"),
+ opt[Int]("jobManagerMemory") abbr ("jm") valueName ("arg") action {
+ (x, c) =>
+ c.copy(yarnConfig = Some(ensureYarnConfig(c).copy(jobManagerMemory = Some(x))))
+ } text ("Memory for JobManager Container [in MB]"),
+ opt[String]("name") abbr ("nm") action {
+ (x, c) => c.copy(yarnConfig = Some(ensureYarnConfig(c).copy(name = Some(x))))
+ } text ("Set a custom name for the application on YARN"),
+ opt[String]("queue") abbr ("qu") valueName ("<arg>") action {
+ (x, c) => c.copy(yarnConfig = Some(ensureYarnConfig(c).copy(queue = Some(x))))
+ } text ("Specify YARN queue"),
+ opt[Int]("slots") abbr ("s") valueName ("<arg>") action {
+ (x, c) => c.copy(yarnConfig = Some(ensureYarnConfig(c).copy(slots = Some(x))))
+ } text ("Number of slots per TaskManager"),
+ opt[Int]("taskManagerMemory") abbr ("tm") valueName ("<arg>")
action {
+ (x, c) =>
+ c.copy(yarnConfig = Some(ensureYarnConfig(c).copy(taskManagerMemory = Some(x))))
+ } text ("Memory per TaskManager Container [in MB]"),
+ opt[(String)] ("addclasspath") abbr("a")
valueName("<path/to/jar>") action {
+ case (x, c) =>
+ val xArray = x.split(":")
+ c.copy(externalJars = Option(xArray))
+ } text("specifies additional jars to be used in Flink\n")
+ )
+
+ help("help") abbr ("h") text ("prints this usage text\n")
}
// parse arguments
- parser.parse (args, Config()) match {
- case Some(config) =>
- startShell(config.host,
- config.port,
- config.flinkShellExecutionMode,
- config.externalJars)
-
- case _ => System.out.println("Could not parse program arguments")
+ parser.parse(args, Config()) match {
+ case Some(config) => startShell(config)
+ case _ => println("Could not parse program arguments")
}
}
+ def fetchConnectionInfo(
+ config: Config
+ ): (String, Int, Option[Either[FlinkMiniCluster, AbstractFlinkYarnCluster]]) = {
+ config.executionMode match {
+ case ExecutionMode.LOCAL => // Local mode
+ val config = new Configuration()
+ config.setInteger(ConfigConstants.JOB_MANAGER_IPC_PORT_KEY, 0)
- def startShell(
- userHost: String,
- userPort: Int,
- executionMode: ExecutionMode.Value,
- externalJars: Option[Array[String]] = None): Unit ={
-
- System.out.println("Starting Flink Shell:")
-
- // either port or userhost not specified by user, create new minicluster
- val (host: String, port: Int, cluster: Option[LocalFlinkMiniCluster]) =
- executionMode match {
- case ExecutionMode.LOCAL =>
- val config = new Configuration()
- config.setInteger(ConfigConstants.JOB_MANAGER_IPC_PORT_KEY, 0)
- val miniCluster = new LocalFlinkMiniCluster(config, false)
- miniCluster.start()
- val port = miniCluster.getLeaderRPCPort
- System.out.println(s"\nStarting local Flink cluster (host:
localhost, port: $port).\n")
- ("localhost", port, Some(miniCluster))
-
- case ExecutionMode.REMOTE =>
- if (userHost == "none" || userPort == -1) {
- System.out.println("Error: <host> or <port> not specified!")
- return
- } else {
- System.out.println(
- s"\nConnecting to Flink cluster (host: $userHost, port:
$userPort).\n")
- (userHost, userPort, None)
- }
-
- case ExecutionMode.UNDEFINED =>
- System.out.println("Error: please specify execution mode:")
- System.out.println("[local | remote <host> <port>]")
- return
- }
+ val miniCluster = new LocalFlinkMiniCluster(config, false)
+ miniCluster.start()
- var repl: Option[FlinkILoop] = None
+ println("\nStarting local Flink cluster (host: localhost, " +
+ s"port: ${miniCluster.getLeaderRPCPort}).\n")
+ ("localhost", miniCluster.getLeaderRPCPort,
Some(Left(miniCluster)))
- try {
- // custom shell
- repl = Some(
- bufferedReader match {
+ case ExecutionMode.REMOTE => // Remote mode
+ if (config.host.isEmpty || config.port.isEmpty) {
+ throw new IllegalArgumentException("<host> or <port> is not
specified!")
+ }
+ (config.host.get, config.port.get, None)
- case Some(br) =>
- val out = new StringWriter()
- new FlinkILoop(host, port, externalJars, bufferedReader, new JPrintWriter(out))
+ case ExecutionMode.YARN => // YARN mode
+ config.yarnConfig match {
+ case Some(yarnConfig) => // if there is information for new cluster
+ deployNewYarnCluster(yarnConfig)
+ case None => // there is no information for new cluster. Then we use yarn properties.
+ fetchDeployedYarnClusterInfo()
+ }
- case None =>
- new FlinkILoop(host, port, externalJars)
- })
+ case ExecutionMode.UNDEFINED => // Wrong input
+ throw new IllegalArgumentException("please specify execution
mode:\n" +
+ "[local | remote <host> <port> | yarn]")
+ }
+ }
- val settings = new Settings()
+ def startShell(config: Config): Unit = {
+ println("Starting Flink Shell:")
- settings.usejavacp.value = true
- settings.Yreplsync.value = true
+ val (repl, cluster) = try {
+ val (host, port, cluster) = fetchConnectionInfo(config)
+ println(s"\nConnecting to Flink cluster (host: $host, port:
$port).\n")
+ val repl: Option[FlinkILoop] = bufferedReader match {
+ case Some(reader) =>
+ val out = new StringWriter()
+ Some(new FlinkILoop(host, port, config.externalJars, reader, new JPrintWriter(out)))
+ case None =>
+ Some(new FlinkILoop(host, port, config.externalJars))
+ }
+
+ (repl, cluster)
+ } catch {
+ case e: IllegalArgumentException =>
+ println(s"Error: ${e.getMessage}")
+ sys.exit()
+ }
- // start scala interpreter shell
+ val settings = new Settings()
+ settings.usejavacp.value = true
+ settings.Yreplsync.value = true
+
+ try {
repl.foreach(_.process(settings))
} finally {
repl.foreach(_.closeInterpreter())
- cluster.foreach(_.stop())
+ cluster match {
+ case Some(Left(miniCluster)) => miniCluster.stop()
+ case Some(Right(yarnCluster)) => yarnCluster.shutdown(false)
+ case _ =>
+ }
+ }
+
+ println(" good bye ..")
+ }
+
+ def deployNewYarnCluster(yarnConfig: YarnConfig) = {
+ val yarnClient = FlinkYarnSessionCli.getFlinkYarnClient
+
+ // use flink-dist.jar for scala shell
+ val jarPath = new Path("file://" +
+
s"${yarnClient.getClass.getProtectionDomain.getCodeSource.getLocation.getPath}")
+ yarnClient.setLocalJarPath(jarPath)
+
+ // load configuration
+ val confDirPath = CliFrontend.getConfigurationDirectoryFromEnv
+ val flinkConfiguration = GlobalConfiguration.getConfiguration
+ val confFile = new File(confDirPath + File.separator + "flink-conf.yaml")
+ val confPath = new Path(confFile.getAbsolutePath)
+ GlobalConfiguration.loadConfiguration(confDirPath)
+ yarnClient.setFlinkConfigurationObject(flinkConfiguration)
+ yarnClient.setConfigurationDirectory(confDirPath)
+ yarnClient.setConfigurationFilePath(confPath)
+
+ // number of task managers is required.
+ yarnConfig.containers match {
+ case Some(containers) => yarnClient.setTaskManagerCount(containers)
+ case None =>
+ throw new IllegalArgumentException("Number of taskmanagers must be
specified.")
--- End diff ---
Why is the number of YARN containers an optional command line parameter if we actually require it? Shouldn't we fail much earlier, e.g. when the user does not provide the number of containers?
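One way to fail earlier would be to validate the parsed Config at parse time. A minimal sketch, assuming scopt 3.x's checkConfig/failure/success hooks; the trimmed-down Config and YarnConfig case classes below are stand-ins for the PR's classes, and only a new-cluster deployment (yarnConfig.isDefined) actually needs the container count:

import scopt.OptionParser

object ExecutionMode extends Enumeration {
  val UNDEFINED, LOCAL, REMOTE, YARN = Value
}

// hypothetical, trimmed-down versions of the PR's config classes
case class YarnConfig(containers: Option[Int] = None)
case class Config(
  executionMode: ExecutionMode.Value = ExecutionMode.UNDEFINED,
  yarnConfig: Option[YarnConfig] = None)

val parser = new OptionParser[Config]("start-scala-shell.sh") {
  // ... cmd("local"), cmd("remote"), cmd("yarn") as in the diff above ...

  // Reject bad combinations while parsing, so the user gets the usage text
  // immediately instead of an IllegalArgumentException from deployNewYarnCluster.
  checkConfig {
    case c if c.executionMode == ExecutionMode.YARN &&
              c.yarnConfig.exists(_.containers.isEmpty) =>
      failure("number of YARN containers (-n) must be specified when deploying a new cluster")
    case _ => success
  }
}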
> Allow scala shell to read yarn properties
> -----------------------------------------
>
> Key: FLINK-2935
> URL: https://issues.apache.org/jira/browse/FLINK-2935
> Project: Flink
> Issue Type: Improvement
> Components: Scala Shell
> Affects Versions: 0.9.1
> Reporter: Johannes
> Assignee: Chiwan Park
> Priority: Minor
> Labels: easyfix
>
> Currently, the deployment of Flink via YARN and the Scala shell are not linked.
> When deploying a YARN session, the class
> bq. org.apache.flink.client.CliFrontend
> creates a
> bq. .yarn-properties-$username
> file with the connection properties.
> There should be a way to have the Scala shell automatically read this file as well, if desired.
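For reference, a rough sketch of what reading those yarn properties from the Scala shell could look like. The file location under java.io.tmpdir and the "jobManager" key holding a host:port pair are assumptions about what CliFrontend writes, not details confirmed by this issue:

import java.io.{File, FileInputStream}
import java.util.Properties

// Best-effort lookup of a previously deployed YARN session.
def readYarnProperties(): Option[(String, Int)] = {
  val propsFile = new File(
    System.getProperty("java.io.tmpdir"),
    s".yarn-properties-${System.getProperty("user.name")}")
  if (!propsFile.exists()) {
    None
  } else {
    val props = new Properties()
    val in = new FileInputStream(propsFile)
    try props.load(in) finally in.close()
    // Assumed key: "jobManager" -> "host:port".
    Option(props.getProperty("jobManager")).map { address =>
      val Array(host, port) = address.split(":")
      (host, port.toInt)
    }
  }
}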
--
This message was sent by Atlassian JIRA
(v6.3.4#6332)