[ https://issues.apache.org/jira/browse/FLINK-2935?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15105243#comment-15105243 ]

ASF GitHub Bot commented on FLINK-2935:
---------------------------------------

Github user tillrohrmann commented on a diff in the pull request:

    https://github.com/apache/flink/pull/1500#discussion_r49995125
  
    --- Diff: flink-staging/flink-scala-shell/src/main/scala/org/apache/flink/api/scala/FlinkShell.scala ---
    @@ -61,99 +73,217 @@ object FlinkShell {
             )
     
           cmd("remote") action { (_, c) =>
    -        c.copy(flinkShellExecutionMode = ExecutionMode.REMOTE)
    +        c.copy(executionMode = ExecutionMode.REMOTE)
           } text("starts Flink scala shell connecting to a remote cluster\n") 
children(
             arg[String]("<host>") action { (h, c) =>
    -          c.copy(host = h) }
    +          c.copy(host = Some(h)) }
               text("remote host name as string"),
             arg[Int]("<port>") action { (p, c) =>
    -          c.copy(port = p) }
    +          c.copy(port = Some(p)) }
               text("remote port as integer\n"),
             opt[(String)]("addclasspath") abbr("a") valueName("<path/to/jar>") 
action {
               case (x, c) =>
                 val xArray = x.split(":")
                 c.copy(externalJars = Option(xArray))
    -          } text("specifies additional jars to be used in Flink")
    +        } text ("specifies additional jars to be used in Flink")
           )
    -      help("help") abbr("h") text("prints this usage text\n")
    +
    +      cmd("yarn") action {
    +        (_, c) => c.copy(executionMode = ExecutionMode.YARN, yarnConfig = None)
    +      } text ("starts Flink scala shell connecting to a yarn cluster\n") 
children(
    +        opt[Int]("container") abbr ("n") valueName ("arg") action {
    +          (x, c) =>
    +            c.copy(yarnConfig = Some(ensureYarnConfig(c).copy(containers = Some(x))))
    +        } text ("Number of YARN container to allocate (= Number of Task Managers)"),
    +        opt[Int]("jobManagerMemory") abbr ("jm") valueName ("arg") action {
    +          (x, c) =>
    +            c.copy(yarnConfig = Some(ensureYarnConfig(c).copy(jobManagerMemory = Some(x))))
    +        } text ("Memory for JobManager Container [in MB]"),
    +        opt[String]("name") abbr ("nm") action {
    +          (x, c) => c.copy(yarnConfig = Some(ensureYarnConfig(c).copy(name = Some(x))))
    +        } text ("Set a custom name for the application on YARN"),
    +        opt[String]("queue") abbr ("qu") valueName ("<arg>") action {
    +          (x, c) => c.copy(yarnConfig = Some(ensureYarnConfig(c).copy(queue = Some(x))))
    +        } text ("Specify YARN queue"),
    +        opt[Int]("slots") abbr ("s") valueName ("<arg>") action {
    +          (x, c) => c.copy(yarnConfig = Some(ensureYarnConfig(c).copy(slots = Some(x))))
    +        } text ("Number of slots per TaskManager"),
    +        opt[Int]("taskManagerMemory") abbr ("tm") valueName ("<arg>") 
action {
    +          (x, c) =>
    +            c.copy(yarnConfig = Some(ensureYarnConfig(c).copy(taskManagerMemory = Some(x))))
    +        } text ("Memory per TaskManager Container [in MB]"),
    +        opt[(String)] ("addclasspath") abbr("a") 
valueName("<path/to/jar>") action {
    +          case (x, c) =>
    +            val xArray = x.split(":")
    +            c.copy(externalJars = Option(xArray))
    +        } text("specifies additional jars to be used in Flink\n")
    +      )
    +
    +      help("help") abbr ("h") text ("prints this usage text\n")
         }
     
         // parse arguments
    -    parser.parse (args, Config()) match {
    -      case Some(config) =>
    -        startShell(config.host,
    -          config.port,
    -          config.flinkShellExecutionMode,
    -          config.externalJars)
    -
    -      case _ => System.out.println("Could not parse program arguments")
    +    parser.parse(args, Config()) match {
    +      case Some(config) => startShell(config)
    +      case _ => println("Could not parse program arguments")
         }
       }
     
    +  def fetchConnectionInfo(
    +    config: Config
    +  ): (String, Int, Option[Either[FlinkMiniCluster, AbstractFlinkYarnCluster]]) = {
    +    config.executionMode match {
    +      case ExecutionMode.LOCAL => // Local mode
    +        val config = new Configuration()
    +        config.setInteger(ConfigConstants.JOB_MANAGER_IPC_PORT_KEY, 0)
     
    -  def startShell(
    -      userHost: String,
    -      userPort: Int,
    -      executionMode: ExecutionMode.Value,
    -      externalJars: Option[Array[String]] = None): Unit ={
    -    
    -    System.out.println("Starting Flink Shell:")
    -
    -    // either port or userhost not specified by user, create new minicluster
    -    val (host: String, port: Int, cluster: Option[LocalFlinkMiniCluster]) =
    -      executionMode match {
    -        case ExecutionMode.LOCAL =>
    -          val config = new Configuration()
    -          config.setInteger(ConfigConstants.JOB_MANAGER_IPC_PORT_KEY, 0)
    -          val miniCluster = new LocalFlinkMiniCluster(config, false)
    -          miniCluster.start()
    -          val port = miniCluster.getLeaderRPCPort
    -          System.out.println(s"\nStarting local Flink cluster (host: 
localhost, port: $port).\n")
    -          ("localhost", port, Some(miniCluster))
    -
    -        case ExecutionMode.REMOTE =>
    -          if (userHost == "none" || userPort == -1) {
    -            System.out.println("Error: <host> or <port> not specified!")
    -            return
    -          } else {
    -            System.out.println(
    -              s"\nConnecting to Flink cluster (host: $userHost, port: 
$userPort).\n")
    -            (userHost, userPort, None)
    -          }
    -
    -        case ExecutionMode.UNDEFINED =>
    -          System.out.println("Error: please specify execution mode:")
    -          System.out.println("[local | remote <host> <port>]")
    -          return
    -      }
    +        val miniCluster = new LocalFlinkMiniCluster(config, false)
    +        miniCluster.start()
     
    -    var repl: Option[FlinkILoop] = None
    +        println("\nStarting local Flink cluster (host: localhost, " +
    +          s"port: ${miniCluster.getLeaderRPCPort}).\n")
    +        ("localhost", miniCluster.getLeaderRPCPort, 
Some(Left(miniCluster)))
     
    -    try {
    -      // custom shell
    -      repl = Some(
    -        bufferedReader match {
    +      case ExecutionMode.REMOTE => // Remote mode
    +        if (config.host.isEmpty || config.port.isEmpty) {
    +          throw new IllegalArgumentException("<host> or <port> is not 
specified!")
    +        }
    +        (config.host.get, config.port.get, None)
     
    -          case Some(br) =>
    -            val out = new StringWriter()
    -            new FlinkILoop(host, port, externalJars, bufferedReader, new JPrintWriter(out))
    +      case ExecutionMode.YARN => // YARN mode
    +        config.yarnConfig match {
    +          case Some(yarnConfig) => // if there is information for new cluster
    +            deployNewYarnCluster(yarnConfig)
    +          case None => // there is no information for new cluster. Then we use yarn properties.
    +            fetchDeployedYarnClusterInfo()
    +        }
     
    -          case None =>
    -            new FlinkILoop(host, port, externalJars)
    -        })
    +      case ExecutionMode.UNDEFINED => // Wrong input
    +        throw new IllegalArgumentException("please specify execution 
mode:\n" +
    +          "[local | remote <host> <port> | yarn]")
    +    }
    +  }
     
    -      val settings = new Settings()
    +  def startShell(config: Config): Unit = {
    +    println("Starting Flink Shell:")
     
    -      settings.usejavacp.value = true
    -      settings.Yreplsync.value = true
    +    val (repl, cluster) = try {
    +      val (host, port, cluster) = fetchConnectionInfo(config)
    +      println(s"\nConnecting to Flink cluster (host: $host, port: 
$port).\n")
    +      val repl: Option[FlinkILoop] = bufferedReader match {
    +        case Some(reader) =>
    +          val out = new StringWriter()
    +          Some(new FlinkILoop(host, port, config.externalJars, reader, new JPrintWriter(out)))
    +        case None =>
    +          Some(new FlinkILoop(host, port, config.externalJars))
    +      }
    +
    +      (repl, cluster)
    +    } catch {
    +      case e: IllegalArgumentException =>
    +        println(s"Error: ${e.getMessage}")
    +        sys.exit()
    +    }
     
    -      // start scala interpreter shell
    +    val settings = new Settings()
    +    settings.usejavacp.value = true
    +    settings.Yreplsync.value = true
    +
    +    try {
           repl.foreach(_.process(settings))
         } finally {
           repl.foreach(_.closeInterpreter())
    -      cluster.foreach(_.stop())
    +      cluster match {
    +        case Some(Left(miniCluster)) => miniCluster.stop()
    +        case Some(Right(yarnCluster)) => yarnCluster.shutdown(false)
    +        case _ =>
    +      }
    +    }
    +
    +    println(" good bye ..")
    +  }
    +
    +  def deployNewYarnCluster(yarnConfig: YarnConfig) = {
    +    val yarnClient = FlinkYarnSessionCli.getFlinkYarnClient
    +
    +    // use flink-dist.jar for scala shell
    +    val jarPath = new Path("file://" +
    +      s"${yarnClient.getClass.getProtectionDomain.getCodeSource.getLocation.getPath}")
    +    yarnClient.setLocalJarPath(jarPath)
    +
    +    // load configuration
    +    val confDirPath = CliFrontend.getConfigurationDirectoryFromEnv
    +    val flinkConfiguration = GlobalConfiguration.getConfiguration
    +    val confFile = new File(confDirPath + File.separator + "flink-conf.yaml")
    +    val confPath = new Path(confFile.getAbsolutePath)
    +    GlobalConfiguration.loadConfiguration(confDirPath)
    +    yarnClient.setFlinkConfigurationObject(flinkConfiguration)
    +    yarnClient.setConfigurationDirectory(confDirPath)
    +    yarnClient.setConfigurationFilePath(confPath)
    +
    +    // number of task managers is required.
    +    yarnConfig.containers match {
    +      case Some(containers) => yarnClient.setTaskManagerCount(containers)
    +      case None =>
    +        throw new IllegalArgumentException("Number of taskmanagers must be 
specified.")
    +    }
    +
    +    // set configuration from user input
    +    yarnConfig.jobManagerMemory.foreach(yarnClient.setJobManagerMemory)
    +    yarnConfig.name.foreach(yarnClient.setName)
    +    yarnConfig.queue.foreach(yarnClient.setQueue)
    +    yarnConfig.slots.foreach(yarnClient.setTaskManagerSlots)
    +    yarnConfig.taskManagerMemory.foreach(yarnClient.setTaskManagerMemory)
    +
    +    // deploy
    +    val cluster = yarnClient.deploy()
    +    val address = cluster.getJobManagerAddress.getAddress.getHostAddress
    +    val port = cluster.getJobManagerAddress.getPort
    +    cluster.connectToCluster()
    +
    +    (address, port, Some(Right(cluster)))
    +  }
    +
    +  def fetchDeployedYarnClusterInfo() = {
    +    // load configuration
    +    val globalConfig = GlobalConfiguration.getConfiguration
    +    val defaultPropertiesLocation = System.getProperty("java.io.tmpdir")
    +    val currentUser = System.getProperty("user.name")
    +    val propertiesLocation = globalConfig.getString(
    +      ConfigConstants.YARN_PROPERTIES_FILE_LOCATION, defaultPropertiesLocation)
    +    val propertiesName = CliFrontend.YARN_PROPERTIES_FILE + currentUser
    +    val propertiesFile = new File(propertiesLocation, propertiesName)
    +
    +    // read properties
    +    val properties = if (propertiesFile.exists()) {
    +      println("Found YARN properties file " + 
propertiesFile.getAbsolutePath)
    +      val properties = new Properties()
    +      val inputStream = new FileInputStream(propertiesFile)
    +
    +      try {
    +        properties.load(inputStream)
    +      } finally {
    +        inputStream.close()
    +      }
    +
    +      Some(properties)
    +    } else {
    +      None
         }
     
    -    System.out.println(" good bye ..")
    +    properties match {
    +      case Some(props) =>
    +        val addressInStr = props.getProperty(CliFrontend.YARN_PROPERTIES_JOBMANAGER_KEY)
    +        val address = ClientUtils.parseHostPortAddress(addressInStr)
    +
    +        (address.getHostString, address.getPort, None)
    +      case None =>
    +        throw new IllegalArgumentException("Scala Shell cannot fetch YARN 
properties.")
    --- End diff --
    
    Shouldn't we fail in the preceding `else` branch? Then we could also move the `Some` case into the `if` branch.
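    
    For illustration, that restructuring might look roughly like this (a sketch using only identifiers already present in the diff and the imports of FlinkShell.scala, not code from the PR):
    
    ```scala
    def fetchDeployedYarnClusterInfo() = {
      // locate the .yarn-properties-<user> file, as in the diff above
      val globalConfig = GlobalConfiguration.getConfiguration
      val propertiesLocation = globalConfig.getString(
        ConfigConstants.YARN_PROPERTIES_FILE_LOCATION, System.getProperty("java.io.tmpdir"))
      val propertiesName = CliFrontend.YARN_PROPERTIES_FILE + System.getProperty("user.name")
      val propertiesFile = new File(propertiesLocation, propertiesName)
    
      if (propertiesFile.exists()) {
        // handle the properties directly in the `if` branch,
        // so the intermediate Option and the second `match` disappear
        println("Found YARN properties file " + propertiesFile.getAbsolutePath)
        val properties = new Properties()
        val inputStream = new FileInputStream(propertiesFile)
        try {
          properties.load(inputStream)
        } finally {
          inputStream.close()
        }
    
        val addressInStr = properties.getProperty(CliFrontend.YARN_PROPERTIES_JOBMANAGER_KEY)
        val address = ClientUtils.parseHostPortAddress(addressInStr)
        (address.getHostString, address.getPort, None)
      } else {
        // fail fast when no properties file exists
        throw new IllegalArgumentException("Scala Shell cannot fetch YARN properties.")
      }
    }
    ```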


> Allow scala shell to read yarn properties
> -----------------------------------------
>
>                 Key: FLINK-2935
>                 URL: https://issues.apache.org/jira/browse/FLINK-2935
>             Project: Flink
>          Issue Type: Improvement
>          Components: Scala Shell
>    Affects Versions: 0.9.1
>            Reporter: Johannes
>            Assignee: Chiwan Park
>            Priority: Minor
>              Labels: easyfix
>
> Currently the deployment of flink via yarn and the scala shell are not linked.
> When deploying a yarn session, the class
> bq. org.apache.flink.client.CliFrontend
> creates a
> bq. .yarn-properties-$username
> file with the connection properties.
> There should be a way to have the scala shell automatically read this file
> as well, if desired.



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)
