[
https://issues.apache.org/jira/browse/FLINK-3074?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15058140#comment-15058140
]
ASF GitHub Bot commented on FLINK-3074:
---------------------------------------
Github user uce commented on a diff in the pull request:
https://github.com/apache/flink/pull/1416#discussion_r47642522
--- Diff:
flink-yarn/src/main/scala/org/apache/flink/yarn/ApplicationMasterBase.scala ---
@@ -120,18 +125,79 @@ abstract class ApplicationMasterBase {
config.setInteger(ConfigConstants.JOB_MANAGER_WEB_PORT_KEY, 0)
}
- val (actorSystem, jmActor, archiveActor, webMonitor) =
+ // we try to start the JobManager actor system using the port
definition
+ // from the config.
+ // first, we check if the port is available by opening a socket
+ // if the actor system fails to start on the port, we try further
+ val amPortRange: String =
config.getString(ConfigConstants.YARN_APPLICATION_MASTER_PORT,
+ ConfigConstants.DEFAULT_YARN_APPLICATION_MASTER_PORT)
+ val portsIterator = NetUtils.getPortRangeFromString(amPortRange)
+
+ // method to start the actor system.
+ def startActorSystem(
+ portsIterator: java.util.Iterator[Integer])
+ : (ActorSystem, ActorRef, ActorRef, Option[WebMonitor]) = {
+ val availableSocket = NetUtils.createSocketFromPorts(
+ portsIterator,
+ new NetUtils.SocketFactory {
+ override def createSocket(port: Int): ServerSocket = new
ServerSocket(port)
+ })
+
+ // get port as integer and close socket
+ val tryPort = if (availableSocket == null) {
+ throw new BindException(s"Unable to allocate port for
ApplicationMaster in " +
+ s"specified port range: $amPortRange ")
+ } else {
+ val port = availableSocket.getLocalPort
+ availableSocket.close()
+ port // return for if
+ }
+
JobManager.startActorSystemAndJobManagerActors(
config,
JobManagerMode.CLUSTER,
streamingMode,
ownHostname,
- 0,
+ tryPort,
getJobManagerClass,
getArchivistClass
)
+ }
+
+ @tailrec
+ def retry[T](fn: => T, stopCond: => Boolean): Try[T] = {
+ Try {
+ fn
+ } match {
+ case Failure(x: BindException) =>
+ if (stopCond) {
+ Failure(new RuntimeException("Unable to do further retries
starting the actor " +
+ "system"))
+ } else {
+ retry(fn, stopCond)
+ }
+ case Failure(x: Exception) => x.getCause match {
+ case c: ChannelException =>
+ if (stopCond) {
+ Failure(new RuntimeException("Unable to do further retries
starting the actor " +
+ "system"))
+ } else {
+ retry(fn, stopCond)
+ }
+ case _ => Failure(x)
+ }
+ case f => f
+ }
+ }
+
+ // try starting the actor system
+ val result = retry(startActorSystem(portsIterator),
{portsIterator.hasNext})
--- End diff --
The stop condition is inverted. With `{portsIterator.hasNext}` you stop retrying while there are still
ports left to check, but you should stop only when no ports remain, i.e. use `{!portsIterator.hasNext}`.
> Make ApplicationMaster/JobManager akka port configurable
> --------------------------------------------------------
>
> Key: FLINK-3074
> URL: https://issues.apache.org/jira/browse/FLINK-3074
> Project: Flink
> Issue Type: Improvement
> Components: YARN Client
> Reporter: Robert Metzger
> Assignee: Robert Metzger
> Fix For: 1.0.0
>
>
> Similar to the BlobServer, the YARN ApplicationMaster should allow starting
> it on a specified list or range of ports.
> In cases where a firewall allows only certain ports, users can
> specify a list or range of ports from which the AM allocates its RPC port.
This message was sent by Atlassian JIRA
(v6.3.4#6332)