tillrohrmann commented on a change in pull request #11284:
[FLINK-15911][runtime] Make Flink work with NAT.
URL: https://github.com/apache/flink/pull/11284#discussion_r395185040
##########
File path:
flink-runtime/src/main/java/org/apache/flink/runtime/clusterframework/BootstrapTools.java
##########
@@ -181,104 +169,115 @@ public static ActorSystem startActorSystem(
// if we come here, we have exhausted the port range
throw new BindException("Could not start actor system on any
port in port range "
- + portRangeDefinition);
+ + externalPortRange);
}
/**
- * Starts an Actor System at a specific port.
- *
+ * Starts a remote Actor System at given address and specific port.
* @param configuration The Flink configuration.
- * @param listeningAddress The address to listen at.
- * @param listeningPort The port to listen at.
+ * @param actorSystemName Name of the started {@link ActorSystem}
+ * @param externalAddress The external address to access the
ActorSystem.
+ * @param externalPort The external port to access the ActorSystem.
+ * @param bindAddress The local address to bind to.
+ * @param bindPort The local port to bind to.
* @param logger the logger to output log information.
+ * @param actorSystemExecutorConfiguration configuration for the
ActorSystem's underlying executor
+ * @param customConfig Custom Akka config to be combined with the
config derived from Flink configuration.
* @return The ActorSystem which has been started.
* @throws Exception
*/
- public static ActorSystem startActorSystem(
+ private static ActorSystem startRemoteActorSystem(
Configuration configuration,
- String listeningAddress,
- int listeningPort,
- Logger logger) throws Exception {
- return startActorSystem(
- configuration,
- listeningAddress,
- listeningPort,
- logger,
-
ForkJoinExecutorConfiguration.fromConfiguration(configuration));
- }
+ String actorSystemName,
+ String externalAddress,
+ int externalPort,
+ String bindAddress,
+ int bindPort,
+ Logger logger,
+ ActorSystemExecutorConfiguration
actorSystemExecutorConfiguration,
+ Config customConfig) throws Exception {
- /**
- * Starts an Actor System at a specific port.
- * @param configuration The Flink configuration.
- * @param listeningAddress The address to listen at.
- * @param listeningPort The port to listen at.
- * @param logger the logger to output log information.
- * @param actorSystemExecutorConfiguration configuration for the
ActorSystem's underlying executor
- * @return The ActorSystem which has been started.
- * @throws Exception
- */
- public static ActorSystem startActorSystem(
- Configuration configuration,
- String listeningAddress,
- int listeningPort,
- Logger logger,
- ActorSystemExecutorConfiguration
actorSystemExecutorConfiguration) throws Exception {
- return startActorSystem(
- configuration,
- AkkaUtils.getFlinkActorSystemName(),
- listeningAddress,
- listeningPort,
- logger,
- actorSystemExecutorConfiguration);
+ String externalHostPortUrl =
NetUtils.unresolvedHostAndPortToNormalizedString(externalAddress, externalPort);
+ String bindHostPortUrl =
NetUtils.unresolvedHostAndPortToNormalizedString(bindAddress, bindPort);
+ logger.info("Trying to start actor system, external address {},
bind address {}.", externalHostPortUrl, bindHostPortUrl);
+
+ try {
+ Config akkaConfig = AkkaUtils.getAkkaConfig(
+ configuration,
+ new Some<>(new Tuple2<>(externalAddress,
externalPort)),
+ new Some<>(new Tuple2<>(bindAddress, bindPort)),
+
actorSystemExecutorConfiguration.getAkkaConfig());
+
+ if (customConfig != null) {
+ akkaConfig =
customConfig.withFallback(akkaConfig);
+ }
+
+ return startActorSystem(akkaConfig, actorSystemName,
logger);
+ }
+ catch (Throwable t) {
+ if (t instanceof ChannelException) {
+ Throwable cause = t.getCause();
+ if (cause != null && t.getCause() instanceof
BindException) {
+ throw new IOException("Unable to create
ActorSystem at address " + externalHostPortUrl +
+ " : " + cause.getMessage(), t);
+ }
+ }
+ throw new Exception("Could not create actor system", t);
+ }
}
/**
- * Starts an Actor System at a specific port.
+ * Starts a local Actor System.
* @param configuration The Flink configuration.
- * @param actorSystemName Name of the started {@link ActorSystem}
- * @param listeningAddress The address to listen at.
- * @param listeningPort The port to listen at.
- * @param logger the logger to output log information.
- * @param actorSystemExecutorConfiguration configuration for the
ActorSystem's underlying executor
+ * @param actorSystemName Name of the started ActorSystem.
+ * @param logger The logger to output log information.
+ * @param actorSystemExecutorConfiguration Configuration for the
ActorSystem's underlying executor.
+ * @param customConfig Custom Akka config to be combined with the
config derived from Flink configuration.
* @return The ActorSystem which has been started.
* @throws Exception
*/
- public static ActorSystem startActorSystem(
+ public static ActorSystem startLocalActorSystem(
Configuration configuration,
String actorSystemName,
- String listeningAddress,
- int listeningPort,
Logger logger,
- ActorSystemExecutorConfiguration
actorSystemExecutorConfiguration) throws Exception {
+ ActorSystemExecutorConfiguration
actorSystemExecutorConfiguration,
+ Config customConfig) throws Exception {
- String hostPortUrl =
NetUtils.unresolvedHostAndPortToNormalizedString(listeningAddress,
listeningPort);
- logger.info("Trying to start actor system at {}", hostPortUrl);
+ logger.info("Trying to start local actor");
Review comment:
```suggestion
logger.info("Trying to start local actor system");
```
----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
For queries about this service, please contact Infrastructure at:
[email protected]
With regards,
Apache Git Services