[ 
https://issues.apache.org/jira/browse/FLINK-3074?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15058140#comment-15058140
 ] 

ASF GitHub Bot commented on FLINK-3074:
---------------------------------------

Github user uce commented on a diff in the pull request:

    https://github.com/apache/flink/pull/1416#discussion_r47642522
  
    --- Diff: 
flink-yarn/src/main/scala/org/apache/flink/yarn/ApplicationMasterBase.scala ---
    @@ -120,18 +125,79 @@ abstract class ApplicationMasterBase {
             config.setInteger(ConfigConstants.JOB_MANAGER_WEB_PORT_KEY, 0)
           }
     
    -      val (actorSystem, jmActor, archiveActor, webMonitor) =
    +      // we try to start the JobManager actor system using the port 
definition
    +      // from the config.
    +      // first, we check if the port is available by opening a socket
    +      // if the actor system fails to start on the port, we try further
    +      val amPortRange: String = 
config.getString(ConfigConstants.YARN_APPLICATION_MASTER_PORT,
    +        ConfigConstants.DEFAULT_YARN_APPLICATION_MASTER_PORT)
    +      val portsIterator = NetUtils.getPortRangeFromString(amPortRange)
    +
    +      // method to start the actor system.
    +      def startActorSystem(
    +          portsIterator: java.util.Iterator[Integer])
    +        : (ActorSystem, ActorRef, ActorRef, Option[WebMonitor]) = {
    +        val availableSocket = NetUtils.createSocketFromPorts(
    +          portsIterator,
    +          new NetUtils.SocketFactory {
    +            override def createSocket(port: Int): ServerSocket = new 
ServerSocket(port)
    +          })
    +
    +        // get port as integer and close socket
    +       val tryPort = if (availableSocket == null) {
    +          throw new BindException(s"Unable to allocate port for 
ApplicationMaster in " +
    +            s"specified port range: $amPortRange ")
    +        } else {
    +          val port = availableSocket.getLocalPort
    +          availableSocket.close()
    +          port // return for if
    +        }
    +
             JobManager.startActorSystemAndJobManagerActors(
               config,
               JobManagerMode.CLUSTER,
               streamingMode,
               ownHostname,
    -          0,
    +          tryPort,
               getJobManagerClass,
               getArchivistClass
             )
    +      }
    +
    +      @tailrec
    +      def retry[T](fn: => T, stopCond: => Boolean): Try[T] = {
    +        Try {
    +          fn
    +        } match {
    +          case Failure(x: BindException) =>
    +            if (stopCond) {
    +              Failure(new RuntimeException("Unable to do further retries 
starting the actor " +
    +                "system"))
    +            } else {
    +              retry(fn, stopCond)
    +            }
    +          case Failure(x: Exception) => x.getCause match {
    +            case c: ChannelException =>
    +              if (stopCond) {
    +                Failure(new RuntimeException("Unable to do further retries 
starting the actor " +
    +                  "system"))
    +              } else {
    +                retry(fn, stopCond)
    +              }
    +            case _ => Failure(x)
    +          }
    +          case f => f
    +        }
    +      }
    +
    +      // try starting the actor system
    +      val result = retry(startActorSystem(portsIterator), 
{portsIterator.hasNext})
    --- End diff --
    
    The stop condition is inverted. `retry` gives up when `stopCond` is true, 
and you pass `portsIterator.hasNext`, so you stop while there are still ports 
left to check. You should stop only when no ports remain, i.e. 
`!portsIterator.hasNext`.


> Make ApplicationMaster/JobManager akka port configurable
> --------------------------------------------------------
>
>                 Key: FLINK-3074
>                 URL: https://issues.apache.org/jira/browse/FLINK-3074
>             Project: Flink
>          Issue Type: Improvement
>          Components: YARN Client
>            Reporter: Robert Metzger
>            Assignee: Robert Metzger
>             Fix For: 1.0.0
>
>
> Similar to the BlobServer, the YARN ApplicationMaster should allow starting 
> on a specified list or range of ports.
> In cases where only certain ports are allowed by a firewall, users can 
> specify a range of ports from which the AM should allocate its RPC port.



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)

Reply via email to