[
https://issues.apache.org/jira/browse/FLINK-3074?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15048440#comment-15048440
]
ASF GitHub Bot commented on FLINK-3074:
---------------------------------------
Github user tillrohrmann commented on a diff in the pull request:
https://github.com/apache/flink/pull/1416#discussion_r47073661
--- Diff: flink-yarn/src/main/scala/org/apache/flink/yarn/ApplicationMasterBase.scala ---
@@ -120,18 +125,76 @@ abstract class ApplicationMasterBase {
config.setInteger(ConfigConstants.JOB_MANAGER_WEB_PORT_KEY, 0)
}
- val (actorSystem, jmActor, archiveActor, webMonitor) =
+ // we try to start the JobManager actor system using the port definition
+ // from the config.
+ // first, we check if the port is available by opening a socket
+ // if the actor system fails to start on the port, we try further
+ val amPortRange: String = config.getString(ConfigConstants.YARN_APPLICATION_MASTER_PORT,
+ ConfigConstants.DEFAULT_YARN_APPLICATION_MASTER_PORT)
+ val portsIterator = NetUtils.getPortRangeFromString(amPortRange)
+
+ // method to start the actor system.
+ def startActorSystem(portsIterator: java.util.Iterator[Integer]): // return type -> next line
+ (ActorSystem, ActorRef, ActorRef, Option[WebMonitor]) = {
+ val availableSocket = NetUtils.createSocketFromPorts(portsIterator,
+ new NetUtils.SocketFactory {
+ override def createSocket(port: Int): ServerSocket = new ServerSocket(port)
+ })
+
+ // get port as integer and close socket
+ val tryPort = if (availableSocket == null) {
+ throw new BindException(s"Unable to allocate port for ApplicationMaster in " +
+ s"specified port range: $amPortRange ")
+ } else {
+ val port = availableSocket.getLocalPort
+ availableSocket.close()
+ port // return for if
+ }
+
JobManager.startActorSystemAndJobManagerActors(
config,
JobManagerMode.CLUSTER,
streamingMode,
ownHostname,
- 0,
+ tryPort,
getJobManagerClass,
getArchivistClass
)
+ }
+
+ @tailrec
+ def retry[T](fn: => T, stopCond: => Boolean): Try[T] = {
--- End diff --
Very nice and scalaesque solution :+1:
> Make ApplicationMaster/JobManager akka port configurable
> --------------------------------------------------------
>
> Key: FLINK-3074
> URL: https://issues.apache.org/jira/browse/FLINK-3074
> Project: Flink
> Issue Type: Improvement
> Components: YARN Client
> Reporter: Robert Metzger
> Assignee: Robert Metzger
> Fix For: 1.0.0
>
>
> Similar to the BlobServer, the YARN ApplicationMaster should allow starting
> it on a specified list or range of ports.
> In cases where only certain ports are allowed by a firewall, users can
> specify a range of ports where they want the AM to allocate its RPC port.
--
This message was sent by Atlassian JIRA
(v6.3.4#6332)