tillrohrmann commented on a change in pull request #11284: [FLINK-15911][runtime] Make Flink work with NAT.
URL: https://github.com/apache/flink/pull/11284#discussion_r395185040
 
 

 ##########
 File path: flink-runtime/src/main/java/org/apache/flink/runtime/clusterframework/BootstrapTools.java
 ##########
 @@ -181,104 +169,115 @@ public static ActorSystem startActorSystem(
 
                // if we come here, we have exhausted the port range
                throw new BindException("Could not start actor system on any 
port in port range "
-                       + portRangeDefinition);
+                       + externalPortRange);
        }
 
        /**
-        * Starts an Actor System at a specific port.
-        *
+        * Starts a remote Actor System at given address and specific port.
         * @param configuration The Flink configuration.
-        * @param listeningAddress The address to listen at.
-        * @param listeningPort The port to listen at.
+        * @param actorSystemName Name of the started {@link ActorSystem}
+        * @param externalAddress The external address to access the 
ActorSystem.
+        * @param externalPort The external port to access the ActorSystem.
+        * @param bindAddress The local address to bind to.
+        * @param bindPort The local port to bind to.
         * @param logger the logger to output log information.
+        * @param actorSystemExecutorConfiguration configuration for the 
ActorSystem's underlying executor
+        * @param customConfig Custom Akka config to be combined with the 
config derived from Flink configuration.
         * @return The ActorSystem which has been started.
         * @throws Exception
         */
-       public static ActorSystem startActorSystem(
+       private static ActorSystem startRemoteActorSystem(
                Configuration configuration,
-               String listeningAddress,
-               int listeningPort,
-               Logger logger) throws Exception {
-               return startActorSystem(
-                       configuration,
-                       listeningAddress,
-                       listeningPort,
-                       logger,
-                       
ForkJoinExecutorConfiguration.fromConfiguration(configuration));
-       }
+               String actorSystemName,
+               String externalAddress,
+               int externalPort,
+               String bindAddress,
+               int bindPort,
+               Logger logger,
+               ActorSystemExecutorConfiguration 
actorSystemExecutorConfiguration,
+               Config customConfig) throws Exception {
 
-       /**
-        * Starts an Actor System at a specific port.
-        * @param configuration The Flink configuration.
-        * @param listeningAddress The address to listen at.
-        * @param listeningPort The port to listen at.
-        * @param logger the logger to output log information.
-        * @param actorSystemExecutorConfiguration configuration for the 
ActorSystem's underlying executor
-        * @return The ActorSystem which has been started.
-        * @throws Exception
-        */
-       public static ActorSystem startActorSystem(
-                               Configuration configuration,
-                               String listeningAddress,
-                               int listeningPort,
-                               Logger logger,
-                               ActorSystemExecutorConfiguration 
actorSystemExecutorConfiguration) throws Exception {
-               return startActorSystem(
-                       configuration,
-                       AkkaUtils.getFlinkActorSystemName(),
-                       listeningAddress,
-                       listeningPort,
-                       logger,
-                       actorSystemExecutorConfiguration);
+               String externalHostPortUrl = 
NetUtils.unresolvedHostAndPortToNormalizedString(externalAddress, externalPort);
+               String bindHostPortUrl = 
NetUtils.unresolvedHostAndPortToNormalizedString(bindAddress, bindPort);
+               logger.info("Trying to start actor system, external address {}, 
bind address {}.", externalHostPortUrl, bindHostPortUrl);
+
+               try {
+                       Config akkaConfig = AkkaUtils.getAkkaConfig(
+                               configuration,
+                               new Some<>(new Tuple2<>(externalAddress, 
externalPort)),
+                               new Some<>(new Tuple2<>(bindAddress, bindPort)),
+                               
actorSystemExecutorConfiguration.getAkkaConfig());
+
+                       if (customConfig != null) {
+                               akkaConfig = 
customConfig.withFallback(akkaConfig);
+                       }
+
+                       return startActorSystem(akkaConfig, actorSystemName, 
logger);
+               }
+               catch (Throwable t) {
+                       if (t instanceof ChannelException) {
+                               Throwable cause = t.getCause();
+                               if (cause != null && t.getCause() instanceof 
BindException) {
+                                       throw new IOException("Unable to create 
ActorSystem at address " + externalHostPortUrl +
+                                               " : " + cause.getMessage(), t);
+                               }
+                       }
+                       throw new Exception("Could not create actor system", t);
+               }
        }
 
        /**
-        * Starts an Actor System at a specific port.
+        * Starts a local Actor System.
         * @param configuration The Flink configuration.
-        * @param actorSystemName Name of the started {@link ActorSystem}
-        * @param listeningAddress The address to listen at.
-        * @param listeningPort The port to listen at.
-        * @param logger the logger to output log information.
-        * @param actorSystemExecutorConfiguration configuration for the 
ActorSystem's underlying executor
+        * @param actorSystemName Name of the started ActorSystem.
+        * @param logger The logger to output log information.
+        * @param actorSystemExecutorConfiguration Configuration for the 
ActorSystem's underlying executor.
+        * @param customConfig Custom Akka config to be combined with the 
config derived from Flink configuration.
         * @return The ActorSystem which has been started.
         * @throws Exception
         */
-       public static ActorSystem startActorSystem(
+       public static ActorSystem startLocalActorSystem(
                Configuration configuration,
                String actorSystemName,
-               String listeningAddress,
-               int listeningPort,
                Logger logger,
-               ActorSystemExecutorConfiguration 
actorSystemExecutorConfiguration) throws Exception {
+               ActorSystemExecutorConfiguration 
actorSystemExecutorConfiguration,
+               Config customConfig) throws Exception {
 
-               String hostPortUrl = 
NetUtils.unresolvedHostAndPortToNormalizedString(listeningAddress, 
listeningPort);
-               logger.info("Trying to start actor system at {}", hostPortUrl);
+               logger.info("Trying to start local actor");
 
 Review comment:
   ```suggestion
                logger.info("Trying to start local actor system");
   ```

----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
[email protected]


With regards,
Apache Git Services

Reply via email to