[
https://issues.apache.org/jira/browse/FLINK-3074?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15044945#comment-15044945
]
ASF GitHub Bot commented on FLINK-3074:
---------------------------------------
Github user tillrohrmann commented on a diff in the pull request:
https://github.com/apache/flink/pull/1416#discussion_r46821027
--- Diff: flink-yarn/src/main/scala/org/apache/flink/yarn/ApplicationMasterBase.scala ---
@@ -120,16 +122,73 @@ abstract class ApplicationMasterBase {
config.setInteger(ConfigConstants.JOB_MANAGER_WEB_PORT_KEY, 0);
}
- val (actorSystem, jmActor, archiveActor, webMonitor) =
- JobManager.startActorSystemAndJobManagerActors(
- config,
- JobManagerMode.CLUSTER,
- streamingMode,
- ownHostname,
- 0,
- getJobManagerClass,
- getArchivistClass
- )
+ // we try to start the JobManager actor system using the port definition
+ // from the config.
+ // first, we check if the port is available by opening a socket
+ // if the actor system fails to start on the port, we try further
+ val amPortRange: String = config.getString(ConfigConstants.YARN_APPLICATION_MASTER_PORT,
+ ConfigConstants.DEFAULT_YARN_APPLICATION_MASTER_PORT)
+ val portsSet = NetUtils.getPortRangeFromString(amPortRange)
+
+ // Try to start the actor system with the given set of ports:
+ var startActorResultOption: Option[(ActorSystem, ActorRef, ActorRef, Option[WebMonitor])] =
+ None
+
+ while (startActorResultOption.isEmpty && portsSet.size() > 0) {
--- End diff --
We could also do something like
```
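// imports needed by this snippet, in case the file does not have them yet
import java.net.{BindException, ServerSocket}
import scala.annotation.tailrec
import scala.util.{Failure, Try}

def startActorSystem(
    portsSet: Set[Integer]): (ActorSystem, ActorRef, ActorRef, Option[WebMonitor]) = {
  val availableSocket = NetUtils.createSocketFromPorts(
    portsSet,
    new NetUtils.SocketFactory {
      override def createSocket(port: Int): ServerSocket = new ServerSocket(port)
    },
    log)

  // get port as integer and close socket
  val tryPort = if (availableSocket == null) {
    throw new RuntimeException("Unable to allocate port for ApplicationMaster in " +
      s"specified port range: $amPortRange")
  } else {
    val port = availableSocket.getLocalPort
    availableSocket.close()
    port // return for if
  }

  // the port is free again at this point, so binding can still fail with a
  // BindException if something else grabs it first; retry covers that case
  JobManager.startActorSystemAndJobManagerActors(
    config,
    JobManagerMode.CLUSTER,
    streamingMode,
    ownHostname,
    tryPort,
    getJobManagerClass,
    getArchivistClass
  )
}

// re-run fn as long as it fails with a BindException;
// any other failure or a success is returned unchanged
@tailrec
def retry[T](fn: => T): Try[T] = {
  Try { fn } match {
    case Failure(_: BindException) => retry(fn)
    case f => f
  }
}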
```
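To illustrate how the two pieces would fit together, here is a minimal, self-contained sketch of the retry behaviour. `RetryOnBindException`, `startOn`, and the port numbers are hypothetical stand-ins and not Flink code: `startOn` plays the role of `startActorSystem` and simply pretends that every port except 50102 is already bound.

```scala
import java.net.BindException
import scala.annotation.tailrec
import scala.util.{Failure, Try}

object RetryOnBindException {

  // Same shape as the retry helper above: re-evaluate fn as long as it
  // fails with a BindException; everything else is returned unchanged.
  @tailrec
  def retry[T](fn: => T): Try[T] = {
    Try { fn } match {
      case Failure(_: BindException) => retry(fn)
      case other => other
    }
  }

  // Hypothetical stand-in for startActorSystem(portsSet): takes the next
  // candidate port and "binds" to it. Only 50102 is assumed to be free.
  def startOn(ports: Iterator[Int]): Int = {
    if (!ports.hasNext) {
      // not a BindException, so retry stops here
      throw new RuntimeException("Unable to allocate port in specified port range")
    }
    val port = ports.next()
    if (port != 50102) {
      throw new BindException(s"Port $port is already in use")
    }
    port
  }

  def main(args: Array[String]): Unit = {
    val candidatePorts = Iterator(50100, 50101, 50102)
    // fn is passed by name, so each retry takes the next port
    println(retry(startOn(candidatePorts))) // prints "Success(50102)"
  }
}
```

In the ApplicationMaster the call would presumably look like `retry(startActorSystem(portsSet))`, with the returned `Try` either unwrapped or turned into an error. Note that the loop only terminates because running out of ports raises a `RuntimeException` rather than a `BindException`.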
> Make ApplicationMaster/JobManager akka port configurable
> --------------------------------------------------------
>
> Key: FLINK-3074
> URL: https://issues.apache.org/jira/browse/FLINK-3074
> Project: Flink
> Issue Type: Improvement
> Components: YARN Client
> Reporter: Robert Metzger
> Assignee: Robert Metzger
> Fix For: 1.0.0
>
>
> Similar to the BlobServer, the YARN ApplicationMaster should allow starting
> it on a specified list or range of ports.
> In cases where only certain ports are allowed by a firewall, users can
> specify a range of ports where they want the AM to allocate its RPC port.