[
https://issues.apache.org/jira/browse/FLINK-2935?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15105236#comment-15105236
]
ASF GitHub Bot commented on FLINK-2935:
---------------------------------------
Github user tillrohrmann commented on a diff in the pull request:
https://github.com/apache/flink/pull/1500#discussion_r49994415
--- Diff:
flink-staging/flink-scala-shell/src/main/scala/org/apache/flink/api/scala/FlinkShell.scala
---
@@ -61,99 +73,217 @@ object FlinkShell {
)
cmd("remote") action { (_, c) =>
- c.copy(flinkShellExecutionMode = ExecutionMode.REMOTE)
+ c.copy(executionMode = ExecutionMode.REMOTE)
} text("starts Flink scala shell connecting to a remote cluster\n")
children(
arg[String]("<host>") action { (h, c) =>
- c.copy(host = h) }
+ c.copy(host = Some(h)) }
text("remote host name as string"),
arg[Int]("<port>") action { (p, c) =>
- c.copy(port = p) }
+ c.copy(port = Some(p)) }
text("remote port as integer\n"),
opt[(String)]("addclasspath") abbr("a") valueName("<path/to/jar>")
action {
case (x, c) =>
val xArray = x.split(":")
c.copy(externalJars = Option(xArray))
- } text("specifies additional jars to be used in Flink")
+ } text ("specifies additional jars to be used in Flink")
)
- help("help") abbr("h") text("prints this usage text\n")
+
+ cmd("yarn") action {
+ (_, c) => c.copy(executionMode = ExecutionMode.YARN, yarnConfig =
None)
+ } text ("starts Flink scala shell connecting to a yarn cluster\n")
children(
+ opt[Int]("container") abbr ("n") valueName ("arg") action {
+ (x, c) =>
+ c.copy(yarnConfig = Some(ensureYarnConfig(c).copy(containers =
Some(x))))
+ } text ("Number of YARN container to allocate (= Number of Task
Managers)"),
+ opt[Int]("jobManagerMemory") abbr ("jm") valueName ("arg") action {
+ (x, c) =>
+ c.copy(yarnConfig =
Some(ensureYarnConfig(c).copy(jobManagerMemory = Some(x))))
+ } text ("Memory for JobManager Container [in MB]"),
+ opt[String]("name") abbr ("nm") action {
+ (x, c) => c.copy(yarnConfig = Some(ensureYarnConfig(c).copy(name
= Some(x))))
+ } text ("Set a custom name for the application on YARN"),
+ opt[String]("queue") abbr ("qu") valueName ("<arg>") action {
+ (x, c) => c.copy(yarnConfig =
Some(ensureYarnConfig(c).copy(queue = Some(x))))
+ } text ("Specify YARN queue"),
+ opt[Int]("slots") abbr ("s") valueName ("<arg>") action {
+ (x, c) => c.copy(yarnConfig =
Some(ensureYarnConfig(c).copy(slots = Some(x))))
+ } text ("Number of slots per TaskManager"),
+ opt[Int]("taskManagerMemory") abbr ("tm") valueName ("<arg>")
action {
+ (x, c) =>
+ c.copy(yarnConfig =
Some(ensureYarnConfig(c).copy(taskManagerMemory = Some(x))))
+ } text ("Memory per TaskManager Container [in MB]"),
+ opt[(String)] ("addclasspath") abbr("a")
valueName("<path/to/jar>") action {
+ case (x, c) =>
+ val xArray = x.split(":")
+ c.copy(externalJars = Option(xArray))
+ } text("specifies additional jars to be used in Flink\n")
+ )
+
+ help("help") abbr ("h") text ("prints this usage text\n")
}
// parse arguments
- parser.parse (args, Config()) match {
- case Some(config) =>
- startShell(config.host,
- config.port,
- config.flinkShellExecutionMode,
- config.externalJars)
-
- case _ => System.out.println("Could not parse program arguments")
+ parser.parse(args, Config()) match {
+ case Some(config) => startShell(config)
+ case _ => println("Could not parse program arguments")
}
}
+ def fetchConnectionInfo(
+ config: Config
+ ): (String, Int, Option[Either[FlinkMiniCluster,
AbstractFlinkYarnCluster]]) = {
+ config.executionMode match {
+ case ExecutionMode.LOCAL => // Local mode
+ val config = new Configuration()
+ config.setInteger(ConfigConstants.JOB_MANAGER_IPC_PORT_KEY, 0)
- def startShell(
- userHost: String,
- userPort: Int,
- executionMode: ExecutionMode.Value,
- externalJars: Option[Array[String]] = None): Unit ={
-
- System.out.println("Starting Flink Shell:")
-
- // either port or userhost not specified by user, create new
minicluster
- val (host: String, port: Int, cluster: Option[LocalFlinkMiniCluster]) =
- executionMode match {
- case ExecutionMode.LOCAL =>
- val config = new Configuration()
- config.setInteger(ConfigConstants.JOB_MANAGER_IPC_PORT_KEY, 0)
- val miniCluster = new LocalFlinkMiniCluster(config, false)
- miniCluster.start()
- val port = miniCluster.getLeaderRPCPort
- System.out.println(s"\nStarting local Flink cluster (host:
localhost, port: $port).\n")
- ("localhost", port, Some(miniCluster))
-
- case ExecutionMode.REMOTE =>
- if (userHost == "none" || userPort == -1) {
- System.out.println("Error: <host> or <port> not specified!")
- return
- } else {
- System.out.println(
- s"\nConnecting to Flink cluster (host: $userHost, port:
$userPort).\n")
- (userHost, userPort, None)
- }
-
- case ExecutionMode.UNDEFINED =>
- System.out.println("Error: please specify execution mode:")
- System.out.println("[local | remote <host> <port>]")
- return
- }
+ val miniCluster = new LocalFlinkMiniCluster(config, false)
+ miniCluster.start()
- var repl: Option[FlinkILoop] = None
+ println("\nStarting local Flink cluster (host: localhost, " +
+ s"port: ${miniCluster.getLeaderRPCPort}).\n")
+ ("localhost", miniCluster.getLeaderRPCPort,
Some(Left(miniCluster)))
- try {
- // custom shell
- repl = Some(
- bufferedReader match {
+ case ExecutionMode.REMOTE => // Remote mode
+ if (config.host.isEmpty || config.port.isEmpty) {
+ throw new IllegalArgumentException("<host> or <port> is not
specified!")
+ }
+ (config.host.get, config.port.get, None)
- case Some(br) =>
- val out = new StringWriter()
- new FlinkILoop(host, port, externalJars, bufferedReader, new
JPrintWriter(out))
+ case ExecutionMode.YARN => // YARN mode
+ config.yarnConfig match {
+ case Some(yarnConfig) => // if there is information for new
cluster
+ deployNewYarnCluster(yarnConfig)
+ case None => // there is no information for new cluster. Then we
use yarn properties.
+ fetchDeployedYarnClusterInfo()
+ }
- case None =>
- new FlinkILoop(host, port, externalJars)
- })
+ case ExecutionMode.UNDEFINED => // Wrong input
+ throw new IllegalArgumentException("please specify execution
mode:\n" +
+ "[local | remote <host> <port> | yarn]")
+ }
+ }
- val settings = new Settings()
+ def startShell(config: Config): Unit = {
+ println("Starting Flink Shell:")
- settings.usejavacp.value = true
- settings.Yreplsync.value = true
+ val (repl, cluster) = try {
+ val (host, port, cluster) = fetchConnectionInfo(config)
+ println(s"\nConnecting to Flink cluster (host: $host, port:
$port).\n")
+ val repl: Option[FlinkILoop] = bufferedReader match {
--- End diff --
Why is this an `Option`? All cases return a `FlinkILoop`.
> Allow scala shell to read yarn properties
> -----------------------------------------
>
> Key: FLINK-2935
> URL: https://issues.apache.org/jira/browse/FLINK-2935
> Project: Flink
> Issue Type: Improvement
> Components: Scala Shell
> Affects Versions: 0.9.1
> Reporter: Johannes
> Assignee: Chiwan Park
> Priority: Minor
> Labels: easyfix
>
> Currently the deployment of flink via yarn and the scala shell are not linked.
> When deploying a yarn session the file
> bq. org.apache.flink.client.CliFrontend
> creates a
> bq. .yarn-properties-$username
> file with the connection properties.
> There should be a way to have the scala shell automatically read this file if
> wanted as well.
--
This message was sent by Atlassian JIRA
(v6.3.4#6332)