Github user tillrohrmann commented on a diff in the pull request:

    https://github.com/apache/flink/pull/1500#discussion_r49995125
  
    --- Diff: 
flink-staging/flink-scala-shell/src/main/scala/org/apache/flink/api/scala/FlinkShell.scala
 ---
    @@ -61,99 +73,217 @@ object FlinkShell {
             )
     
           cmd("remote") action { (_, c) =>
    -        c.copy(flinkShellExecutionMode = ExecutionMode.REMOTE)
    +        c.copy(executionMode = ExecutionMode.REMOTE)
           } text("starts Flink scala shell connecting to a remote cluster\n") 
children(
             arg[String]("<host>") action { (h, c) =>
    -          c.copy(host = h) }
    +          c.copy(host = Some(h)) }
               text("remote host name as string"),
             arg[Int]("<port>") action { (p, c) =>
    -          c.copy(port = p) }
    +          c.copy(port = Some(p)) }
               text("remote port as integer\n"),
             opt[(String)]("addclasspath") abbr("a") valueName("<path/to/jar>") 
action {
               case (x, c) =>
                 val xArray = x.split(":")
                 c.copy(externalJars = Option(xArray))
    -          } text("specifies additional jars to be used in Flink")
    +        } text ("specifies additional jars to be used in Flink")
           )
    -      help("help") abbr("h") text("prints this usage text\n")
    +
    +      cmd("yarn") action {
    +        (_, c) => c.copy(executionMode = ExecutionMode.YARN, yarnConfig = 
None)
    +      } text ("starts Flink scala shell connecting to a yarn cluster\n") 
children(
    +        opt[Int]("container") abbr ("n") valueName ("arg") action {
    +          (x, c) =>
    +            c.copy(yarnConfig = Some(ensureYarnConfig(c).copy(containers = 
Some(x))))
    +        } text ("Number of YARN container to allocate (= Number of Task 
Managers)"),
    +        opt[Int]("jobManagerMemory") abbr ("jm") valueName ("arg") action {
    +          (x, c) =>
    +            c.copy(yarnConfig = 
Some(ensureYarnConfig(c).copy(jobManagerMemory = Some(x))))
    +        } text ("Memory for JobManager Container [in MB]"),
    +        opt[String]("name") abbr ("nm") action {
    +          (x, c) => c.copy(yarnConfig = Some(ensureYarnConfig(c).copy(name 
= Some(x))))
    +        } text ("Set a custom name for the application on YARN"),
    +        opt[String]("queue") abbr ("qu") valueName ("<arg>") action {
    +          (x, c) => c.copy(yarnConfig = 
Some(ensureYarnConfig(c).copy(queue = Some(x))))
    +        } text ("Specify YARN queue"),
    +        opt[Int]("slots") abbr ("s") valueName ("<arg>") action {
    +          (x, c) => c.copy(yarnConfig = 
Some(ensureYarnConfig(c).copy(slots = Some(x))))
    +        } text ("Number of slots per TaskManager"),
    +        opt[Int]("taskManagerMemory") abbr ("tm") valueName ("<arg>") 
action {
    +          (x, c) =>
    +            c.copy(yarnConfig = 
Some(ensureYarnConfig(c).copy(taskManagerMemory = Some(x))))
    +        } text ("Memory per TaskManager Container [in MB]"),
    +        opt[(String)] ("addclasspath") abbr("a") 
valueName("<path/to/jar>") action {
    +          case (x, c) =>
    +            val xArray = x.split(":")
    +            c.copy(externalJars = Option(xArray))
    +        } text("specifies additional jars to be used in Flink\n")
    +      )
    +
    +      help("help") abbr ("h") text ("prints this usage text\n")
         }
     
         // parse arguments
    -    parser.parse (args, Config()) match {
    -      case Some(config) =>
    -        startShell(config.host,
    -          config.port,
    -          config.flinkShellExecutionMode,
    -          config.externalJars)
    -
    -      case _ => System.out.println("Could not parse program arguments")
    +    parser.parse(args, Config()) match {
    +      case Some(config) => startShell(config)
    +      case _ => println("Could not parse program arguments")
         }
       }
     
    +  def fetchConnectionInfo(
    +    config: Config
    +  ): (String, Int, Option[Either[FlinkMiniCluster, 
AbstractFlinkYarnCluster]]) = {
    +    config.executionMode match {
    +      case ExecutionMode.LOCAL => // Local mode
    +        val config = new Configuration()
    +        config.setInteger(ConfigConstants.JOB_MANAGER_IPC_PORT_KEY, 0)
     
    -  def startShell(
    -      userHost: String,
    -      userPort: Int,
    -      executionMode: ExecutionMode.Value,
    -      externalJars: Option[Array[String]] = None): Unit ={
    -    
    -    System.out.println("Starting Flink Shell:")
    -
    -    // either port or userhost not specified by user, create new 
minicluster
    -    val (host: String, port: Int, cluster: Option[LocalFlinkMiniCluster]) =
    -      executionMode match {
    -        case ExecutionMode.LOCAL =>
    -          val config = new Configuration()
    -          config.setInteger(ConfigConstants.JOB_MANAGER_IPC_PORT_KEY, 0)
    -          val miniCluster = new LocalFlinkMiniCluster(config, false)
    -          miniCluster.start()
    -          val port = miniCluster.getLeaderRPCPort
    -          System.out.println(s"\nStarting local Flink cluster (host: 
localhost, port: $port).\n")
    -          ("localhost", port, Some(miniCluster))
    -
    -        case ExecutionMode.REMOTE =>
    -          if (userHost == "none" || userPort == -1) {
    -            System.out.println("Error: <host> or <port> not specified!")
    -            return
    -          } else {
    -            System.out.println(
    -              s"\nConnecting to Flink cluster (host: $userHost, port: 
$userPort).\n")
    -            (userHost, userPort, None)
    -          }
    -
    -        case ExecutionMode.UNDEFINED =>
    -          System.out.println("Error: please specify execution mode:")
    -          System.out.println("[local | remote <host> <port>]")
    -          return
    -      }
    +        val miniCluster = new LocalFlinkMiniCluster(config, false)
    +        miniCluster.start()
     
    -    var repl: Option[FlinkILoop] = None
    +        println("\nStarting local Flink cluster (host: localhost, " +
    +          s"port: ${miniCluster.getLeaderRPCPort}).\n")
    +        ("localhost", miniCluster.getLeaderRPCPort, 
Some(Left(miniCluster)))
     
    -    try {
    -      // custom shell
    -      repl = Some(
    -        bufferedReader match {
    +      case ExecutionMode.REMOTE => // Remote mode
    +        if (config.host.isEmpty || config.port.isEmpty) {
    +          throw new IllegalArgumentException("<host> or <port> is not 
specified!")
    +        }
    +        (config.host.get, config.port.get, None)
     
    -          case Some(br) =>
    -            val out = new StringWriter()
    -            new FlinkILoop(host, port, externalJars, bufferedReader, new 
JPrintWriter(out))
    +      case ExecutionMode.YARN => // YARN mode
    +        config.yarnConfig match {
    +          case Some(yarnConfig) => // if there is information for new 
cluster
    +            deployNewYarnCluster(yarnConfig)
    +          case None => // there is no information for new cluster. Then we 
use yarn properties.
    +            fetchDeployedYarnClusterInfo()
    +        }
     
    -          case None =>
    -            new FlinkILoop(host, port, externalJars)
    -        })
    +      case ExecutionMode.UNDEFINED => // Wrong input
    +        throw new IllegalArgumentException("please specify execution 
mode:\n" +
    +          "[local | remote <host> <port> | yarn]")
    +    }
    +  }
     
    -      val settings = new Settings()
    +  def startShell(config: Config): Unit = {
    +    println("Starting Flink Shell:")
     
    -      settings.usejavacp.value = true
    -      settings.Yreplsync.value = true
    +    val (repl, cluster) = try {
    +      val (host, port, cluster) = fetchConnectionInfo(config)
    +      println(s"\nConnecting to Flink cluster (host: $host, port: 
$port).\n")
    +      val repl: Option[FlinkILoop] = bufferedReader match {
    +        case Some(reader) =>
    +          val out = new StringWriter()
    +          Some(new FlinkILoop(host, port, config.externalJars, reader, new 
JPrintWriter(out)))
    +        case None =>
    +          Some(new FlinkILoop(host, port, config.externalJars))
    +      }
    +
    +      (repl, cluster)
    +    } catch {
    +      case e: IllegalArgumentException =>
    +        println(s"Error: ${e.getMessage}")
    +        sys.exit()
    +    }
     
    -      // start scala interpreter shell
    +    val settings = new Settings()
    +    settings.usejavacp.value = true
    +    settings.Yreplsync.value = true
    +
    +    try {
           repl.foreach(_.process(settings))
         } finally {
           repl.foreach(_.closeInterpreter())
    -      cluster.foreach(_.stop())
    +      cluster match {
    +        case Some(Left(miniCluster)) => miniCluster.stop()
    +        case Some(Right(yarnCluster)) => yarnCluster.shutdown(false)
    +        case _ =>
    +      }
    +    }
    +
    +    println(" good bye ..")
    +  }
    +
    +  def deployNewYarnCluster(yarnConfig: YarnConfig) = {
    +    val yarnClient = FlinkYarnSessionCli.getFlinkYarnClient
    +
    +    // use flink-dist.jar for scala shell
    +    val jarPath = new Path("file://" +
    +      
s"${yarnClient.getClass.getProtectionDomain.getCodeSource.getLocation.getPath}")
    +    yarnClient.setLocalJarPath(jarPath)
    +
    +    // load configuration
    +    val confDirPath = CliFrontend.getConfigurationDirectoryFromEnv
    +    val flinkConfiguration = GlobalConfiguration.getConfiguration
    +    val confFile = new File(confDirPath + File.separator + 
"flink-conf.yaml")
    +    val confPath = new Path(confFile.getAbsolutePath)
    +    GlobalConfiguration.loadConfiguration(confDirPath)
    +    yarnClient.setFlinkConfigurationObject(flinkConfiguration)
    +    yarnClient.setConfigurationDirectory(confDirPath)
    +    yarnClient.setConfigurationFilePath(confPath)
    +
    +    // number of task managers is required.
    +    yarnConfig.containers match {
    +      case Some(containers) => yarnClient.setTaskManagerCount(containers)
    +      case None =>
    +        throw new IllegalArgumentException("Number of taskmanagers must be 
specified.")
    +    }
    +
    +    // set configuration from user input
    +    yarnConfig.jobManagerMemory.foreach(yarnClient.setJobManagerMemory)
    +    yarnConfig.name.foreach(yarnClient.setName)
    +    yarnConfig.queue.foreach(yarnClient.setQueue)
    +    yarnConfig.slots.foreach(yarnClient.setTaskManagerSlots)
    +    yarnConfig.taskManagerMemory.foreach(yarnClient.setTaskManagerMemory)
    +
    +    // deploy
    +    val cluster = yarnClient.deploy()
    +    val address = cluster.getJobManagerAddress.getAddress.getHostAddress
    +    val port = cluster.getJobManagerAddress.getPort
    +    cluster.connectToCluster()
    +
    +    (address, port, Some(Right(cluster)))
    +  }
    +
    +  def fetchDeployedYarnClusterInfo() = {
    +    // load configuration
    +    val globalConfig = GlobalConfiguration.getConfiguration
    +    val defaultPropertiesLocation = System.getProperty("java.io.tmpdir")
    +    val currentUser = System.getProperty("user.name")
    +    val propertiesLocation = globalConfig.getString(
    +      ConfigConstants.YARN_PROPERTIES_FILE_LOCATION, 
defaultPropertiesLocation)
    +    val propertiesName = CliFrontend.YARN_PROPERTIES_FILE + currentUser
    +    val propertiesFile = new File(propertiesLocation, propertiesName)
    +
    +    // read properties
    +    val properties = if (propertiesFile.exists()) {
    +      println("Found YARN properties file " + 
propertiesFile.getAbsolutePath)
    +      val properties = new Properties()
    +      val inputStream = new FileInputStream(propertiesFile)
    +
    +      try {
    +        properties.load(inputStream)
    +      } finally {
    +        inputStream.close()
    +      }
    +
    +      Some(properties)
    +    } else {
    +      None
         }
     
    -    System.out.println(" good bye ..")
    +    properties match {
    +      case Some(props) =>
    +        val addressInStr = 
props.getProperty(CliFrontend.YARN_PROPERTIES_JOBMANAGER_KEY)
    +        val address = ClientUtils.parseHostPortAddress(addressInStr)
    +
    +        (address.getHostString, address.getPort, None)
    +      case None =>
    +        throw new IllegalArgumentException("Scala Shell cannot fetch YARN 
properties.")
    --- End diff --
    
    Shouldn't we fail directly in the preceding `else` branch instead of 
returning `None` there? Then we could also move the body of the `Some` case 
into the `if` branch.


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and would like it, or if the feature is enabled but not working, please
contact infrastructure at infrastructure@apache.org or file a JIRA ticket
with INFRA.
---

Reply via email to