GitHub user tillrohrmann commented on a diff in the pull request:
https://github.com/apache/flink/pull/1500#discussion_r49994979
--- Diff: flink-staging/flink-scala-shell/src/main/scala/org/apache/flink/api/scala/FlinkShell.scala ---
@@ -61,99 +73,217 @@ object FlinkShell {
)
cmd("remote") action { (_, c) =>
- c.copy(flinkShellExecutionMode = ExecutionMode.REMOTE)
+ c.copy(executionMode = ExecutionMode.REMOTE)
} text("starts Flink scala shell connecting to a remote cluster\n")
children(
arg[String]("<host>") action { (h, c) =>
- c.copy(host = h) }
+ c.copy(host = Some(h)) }
text("remote host name as string"),
arg[Int]("<port>") action { (p, c) =>
- c.copy(port = p) }
+ c.copy(port = Some(p)) }
text("remote port as integer\n"),
opt[(String)]("addclasspath") abbr("a") valueName("<path/to/jar>")
action {
case (x, c) =>
val xArray = x.split(":")
c.copy(externalJars = Option(xArray))
- } text("specifies additional jars to be used in Flink")
+ } text ("specifies additional jars to be used in Flink")
)
- help("help") abbr("h") text("prints this usage text\n")
+
+ cmd("yarn") action {
+ (_, c) => c.copy(executionMode = ExecutionMode.YARN, yarnConfig = None)
+ } text ("starts Flink scala shell connecting to a yarn cluster\n")
children(
+ opt[Int]("container") abbr ("n") valueName ("arg") action {
+ (x, c) =>
+ c.copy(yarnConfig = Some(ensureYarnConfig(c).copy(containers = Some(x))))
+ } text ("Number of YARN container to allocate (= Number of Task
Managers)"),
+ opt[Int]("jobManagerMemory") abbr ("jm") valueName ("arg") action {
+ (x, c) =>
+ c.copy(yarnConfig = Some(ensureYarnConfig(c).copy(jobManagerMemory = Some(x))))
+ } text ("Memory for JobManager Container [in MB]"),
+ opt[String]("name") abbr ("nm") action {
+ (x, c) => c.copy(yarnConfig = Some(ensureYarnConfig(c).copy(name = Some(x))))
+ } text ("Set a custom name for the application on YARN"),
+ opt[String]("queue") abbr ("qu") valueName ("<arg>") action {
+ (x, c) => c.copy(yarnConfig = Some(ensureYarnConfig(c).copy(queue = Some(x))))
+ } text ("Specify YARN queue"),
+ opt[Int]("slots") abbr ("s") valueName ("<arg>") action {
+ (x, c) => c.copy(yarnConfig = Some(ensureYarnConfig(c).copy(slots = Some(x))))
+ } text ("Number of slots per TaskManager"),
+ opt[Int]("taskManagerMemory") abbr ("tm") valueName ("<arg>")
action {
+ (x, c) =>
+ c.copy(yarnConfig = Some(ensureYarnConfig(c).copy(taskManagerMemory = Some(x))))
+ } text ("Memory per TaskManager Container [in MB]"),
+ opt[(String)] ("addclasspath") abbr("a")
valueName("<path/to/jar>") action {
+ case (x, c) =>
+ val xArray = x.split(":")
+ c.copy(externalJars = Option(xArray))
+ } text("specifies additional jars to be used in Flink\n")
+ )
+
+ help("help") abbr ("h") text ("prints this usage text\n")
}
// parse arguments
- parser.parse (args, Config()) match {
- case Some(config) =>
- startShell(config.host,
- config.port,
- config.flinkShellExecutionMode,
- config.externalJars)
-
- case _ => System.out.println("Could not parse program arguments")
+ parser.parse(args, Config()) match {
+ case Some(config) => startShell(config)
+ case _ => println("Could not parse program arguments")
}
}
+ def fetchConnectionInfo(
+ config: Config
+ ): (String, Int, Option[Either[FlinkMiniCluster, AbstractFlinkYarnCluster]]) = {
+ config.executionMode match {
+ case ExecutionMode.LOCAL => // Local mode
+ val config = new Configuration()
+ config.setInteger(ConfigConstants.JOB_MANAGER_IPC_PORT_KEY, 0)
- def startShell(
- userHost: String,
- userPort: Int,
- executionMode: ExecutionMode.Value,
- externalJars: Option[Array[String]] = None): Unit ={
-
- System.out.println("Starting Flink Shell:")
-
- // either port or userhost not specified by user, create new minicluster
- val (host: String, port: Int, cluster: Option[LocalFlinkMiniCluster]) =
- executionMode match {
- case ExecutionMode.LOCAL =>
- val config = new Configuration()
- config.setInteger(ConfigConstants.JOB_MANAGER_IPC_PORT_KEY, 0)
- val miniCluster = new LocalFlinkMiniCluster(config, false)
- miniCluster.start()
- val port = miniCluster.getLeaderRPCPort
- System.out.println(s"\nStarting local Flink cluster (host:
localhost, port: $port).\n")
- ("localhost", port, Some(miniCluster))
-
- case ExecutionMode.REMOTE =>
- if (userHost == "none" || userPort == -1) {
- System.out.println("Error: <host> or <port> not specified!")
- return
- } else {
- System.out.println(
- s"\nConnecting to Flink cluster (host: $userHost, port:
$userPort).\n")
- (userHost, userPort, None)
- }
-
- case ExecutionMode.UNDEFINED =>
- System.out.println("Error: please specify execution mode:")
- System.out.println("[local | remote <host> <port>]")
- return
- }
+ val miniCluster = new LocalFlinkMiniCluster(config, false)
+ miniCluster.start()
- var repl: Option[FlinkILoop] = None
+ println("\nStarting local Flink cluster (host: localhost, " +
+ s"port: ${miniCluster.getLeaderRPCPort}).\n")
+ ("localhost", miniCluster.getLeaderRPCPort,
Some(Left(miniCluster)))
- try {
- // custom shell
- repl = Some(
- bufferedReader match {
+ case ExecutionMode.REMOTE => // Remote mode
+ if (config.host.isEmpty || config.port.isEmpty) {
+ throw new IllegalArgumentException("<host> or <port> is not
specified!")
+ }
+ (config.host.get, config.port.get, None)
- case Some(br) =>
- val out = new StringWriter()
- new FlinkILoop(host, port, externalJars, bufferedReader, new JPrintWriter(out))
+ case ExecutionMode.YARN => // YARN mode
+ config.yarnConfig match {
+ case Some(yarnConfig) => // if there is information for new cluster
+ deployNewYarnCluster(yarnConfig)
+ case None => // there is no information for new cluster. Then we use yarn properties.
+ fetchDeployedYarnClusterInfo()
+ }
- case None =>
- new FlinkILoop(host, port, externalJars)
- })
+ case ExecutionMode.UNDEFINED => // Wrong input
+ throw new IllegalArgumentException("please specify execution
mode:\n" +
+ "[local | remote <host> <port> | yarn]")
+ }
+ }
- val settings = new Settings()
+ def startShell(config: Config): Unit = {
+ println("Starting Flink Shell:")
- settings.usejavacp.value = true
- settings.Yreplsync.value = true
+ val (repl, cluster) = try {
+ val (host, port, cluster) = fetchConnectionInfo(config)
+ println(s"\nConnecting to Flink cluster (host: $host, port:
$port).\n")
+ val repl: Option[FlinkILoop] = bufferedReader match {
+ case Some(reader) =>
+ val out = new StringWriter()
+ Some(new FlinkILoop(host, port, config.externalJars, reader, new JPrintWriter(out)))
+ case None =>
+ Some(new FlinkILoop(host, port, config.externalJars))
+ }
+
+ (repl, cluster)
+ } catch {
+ case e: IllegalArgumentException =>
+ println(s"Error: ${e.getMessage}")
+ sys.exit()
+ }
- // start scala interpreter shell
+ val settings = new Settings()
+ settings.usejavacp.value = true
+ settings.Yreplsync.value = true
+
+ try {
repl.foreach(_.process(settings))
} finally {
repl.foreach(_.closeInterpreter())
- cluster.foreach(_.stop())
+ cluster match {
+ case Some(Left(miniCluster)) => miniCluster.stop()
+ case Some(Right(yarnCluster)) => yarnCluster.shutdown(false)
+ case _ =>
+ }
+ }
+
+ println(" good bye ..")
+ }
+
+ def deployNewYarnCluster(yarnConfig: YarnConfig) = {
+ val yarnClient = FlinkYarnSessionCli.getFlinkYarnClient
+
+ // use flink-dist.jar for scala shell
+ val jarPath = new Path("file://" +
+
s"${yarnClient.getClass.getProtectionDomain.getCodeSource.getLocation.getPath}")
+ yarnClient.setLocalJarPath(jarPath)
+
+ // load configuration
+ val confDirPath = CliFrontend.getConfigurationDirectoryFromEnv
+ val flinkConfiguration = GlobalConfiguration.getConfiguration
+ val confFile = new File(confDirPath + File.separator + "flink-conf.yaml")
+ val confPath = new Path(confFile.getAbsolutePath)
+ GlobalConfiguration.loadConfiguration(confDirPath)
+ yarnClient.setFlinkConfigurationObject(flinkConfiguration)
+ yarnClient.setConfigurationDirectory(confDirPath)
+ yarnClient.setConfigurationFilePath(confPath)
+
+ // number of task managers is required.
+ yarnConfig.containers match {
+ case Some(containers) => yarnClient.setTaskManagerCount(containers)
+ case None =>
+ throw new IllegalArgumentException("Number of taskmanagers must be
specified.")
+ }
+
+ // set configuration from user input
+ yarnConfig.jobManagerMemory.foreach(yarnClient.setJobManagerMemory)
+ yarnConfig.name.foreach(yarnClient.setName)
+ yarnConfig.queue.foreach(yarnClient.setQueue)
+ yarnConfig.slots.foreach(yarnClient.setTaskManagerSlots)
+ yarnConfig.taskManagerMemory.foreach(yarnClient.setTaskManagerMemory)
+
+ // deploy
+ val cluster = yarnClient.deploy()
+ val address = cluster.getJobManagerAddress.getAddress.getHostAddress
+ val port = cluster.getJobManagerAddress.getPort
+ cluster.connectToCluster()
+
+ (address, port, Some(Right(cluster)))
+ }
+
+ def fetchDeployedYarnClusterInfo() = {
+ // load configuration
+ val globalConfig = GlobalConfiguration.getConfiguration
+ val defaultPropertiesLocation = System.getProperty("java.io.tmpdir")
+ val currentUser = System.getProperty("user.name")
+ val propertiesLocation = globalConfig.getString(
+ ConfigConstants.YARN_PROPERTIES_FILE_LOCATION, defaultPropertiesLocation)
+ val propertiesName = CliFrontend.YARN_PROPERTIES_FILE + currentUser
+ val propertiesFile = new File(propertiesLocation, propertiesName)
--- End diff ---
Maybe we could unify the `propertiesName` generation in one place. The same logic is also implemented in `ExecutionEnvironment`; if it ever changes there, one has to remember to change it here as well, and this will most definitely be forgotten.
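
For illustration, a minimal sketch of what such a shared helper could look like (the object name `YarnPropertiesFile` and its `resolve` method are made up here; only `CliFrontend.YARN_PROPERTIES_FILE` is an existing constant):

```scala
import java.io.File

import org.apache.flink.client.CliFrontend

// Illustrative sketch only: one helper that both FlinkShell and
// ExecutionEnvironment could call, instead of each concatenating
// CliFrontend.YARN_PROPERTIES_FILE + currentUser on its own.
object YarnPropertiesFile {

  /** Builds the per-user YARN properties file name in a single place. */
  def nameFor(user: String = System.getProperty("user.name")): String =
    CliFrontend.YARN_PROPERTIES_FILE + user

  /** Resolves the properties file under the configured location. */
  def resolve(location: String, user: String = System.getProperty("user.name")): File =
    new File(location, nameFor(user))
}
```

The shell code above would then shrink to `val propertiesFile = YarnPropertiesFile.resolve(propertiesLocation, currentUser)`, and `ExecutionEnvironment` would reuse the same helper.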