XComp commented on a change in pull request #16741: URL: https://github.com/apache/flink/pull/16741#discussion_r685251702
########## File path: flink-rpc/flink-rpc-akka/src/main/java/org/apache/flink/runtime/rpc/akka/AkkaUtils.java ########## @@ -0,0 +1,599 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.runtime.rpc.akka; + +import org.apache.flink.annotation.VisibleForTesting; +import org.apache.flink.configuration.AkkaOptions; +import org.apache.flink.configuration.Configuration; +import org.apache.flink.configuration.SecurityOptions; +import org.apache.flink.runtime.concurrent.akka.AkkaFutureUtils; +import org.apache.flink.runtime.rpc.RpcSystem; +import org.apache.flink.util.NetUtils; +import org.apache.flink.util.TimeUtils; +import org.apache.flink.util.function.FunctionUtils; + +import akka.actor.ActorRef; +import akka.actor.ActorSystem; +import akka.actor.Address; +import akka.actor.AddressFromURIString; +import com.typesafe.config.Config; +import com.typesafe.config.ConfigFactory; +import org.jboss.netty.logging.InternalLoggerFactory; +import org.jboss.netty.logging.Slf4JLoggerFactory; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import javax.annotation.Nullable; + +import java.io.PrintWriter; +import java.io.StringWriter; +import java.net.InetSocketAddress; +import 
java.net.MalformedURLException; +import java.time.Duration; +import java.util.Arrays; +import java.util.concurrent.CompletableFuture; +import java.util.stream.Collectors; + +/** + * This class contains utility functions for akka. It contains methods to start an actor system with + * a given akka configuration. Furthermore, the akka configuration used for starting the different + * actor systems resides in this class. + */ +class AkkaUtils { + private static final Logger LOG = LoggerFactory.getLogger(AkkaUtils.class); + + private static final String FLINK_ACTOR_SYSTEM_NAME = "flink"; + + public static String getFlinkActorSystemName() { + return FLINK_ACTOR_SYSTEM_NAME; + } + + /** + * Gets the basic Akka config which is shared by remote and local actor systems. + * + * @param configuration instance which contains the user specified values for the configuration + * @return Flink's basic Akka config + */ + private static Config getBasicAkkaConfig(Configuration configuration) { + final int akkaThroughput = configuration.getInteger(AkkaOptions.DISPATCHER_THROUGHPUT); + final String jvmExitOnFatalError = + booleanToString(configuration.getBoolean(AkkaOptions.JVM_EXIT_ON_FATAL_ERROR)); + final String logLifecycleEvents = + booleanToString(configuration.getBoolean(AkkaOptions.LOG_LIFECYCLE_EVENTS)); + final String supervisorStrategy = EscalatingSupervisorStrategy.class.getCanonicalName(); + + return new AkkaConfigBuilder() + .add("akka {") + .add(" daemonic = off") + .add(" loggers = [\"akka.event.slf4j.Slf4jLogger\"]") + .add(" logging-filter = \"akka.event.slf4j.Slf4jLoggingFilter\"") + .add(" log-config-on-start = off") + .add(" logger-startup-timeout = 30s") + .add(" loglevel = " + getLogLevel()) + .add(" stdout-loglevel = OFF") + .add(" log-dead-letters = " + logLifecycleEvents) + .add(" log-dead-letters-during-shutdown = " + logLifecycleEvents) + .add(" jvm-exit-on-fatal-error = " + jvmExitOnFatalError) + .add(" serialize-messages = off") + .add(" actor {") + .add(" 
guardian-supervisor-strategy = " + supervisorStrategy) + .add(" warn-about-java-serializer-usage = off") + .add(" allow-java-serialization = on") + .add(" default-dispatcher {") + .add(" throughput = " + akkaThroughput) + .add(" }") + .add(" supervisor-dispatcher {") + .add(" type = Dispatcher") + .add(" executor = \"thread-pool-executor\"") + .add(" thread-pool-executor {") + .add(" core-pool-size-min = 1") + .add(" core-pool-size-max = 1") + .add(" }") + .add(" }") + .add(" }") + .add("}") + .build(); + } + + private static String getLogLevel() { + if (LOG.isTraceEnabled()) { + return "TRACE"; + } + if (LOG.isDebugEnabled()) { + return "DEBUG"; + } + if (LOG.isInfoEnabled()) { + return "INFO"; + } + if (LOG.isWarnEnabled()) { + return "WARN"; + } + if (LOG.isErrorEnabled()) { + return "ERROR"; + } + return "OFF"; + } + + public static Config getThreadPoolExecutorConfig( + RpcSystem.FixedThreadPoolExecutorConfiguration configuration) { + final int threadPriority = configuration.getThreadPriority(); + final int minNumThreads = configuration.getMinNumThreads(); + final int maxNumThreads = configuration.getMaxNumThreads(); + + return new AkkaConfigBuilder() + .add("akka {") + .add(" actor {") + .add(" default-dispatcher {") + .add(" type = org.apache.flink.runtime.rpc.akka.PriorityThreadsDispatcher") + .add(" executor = thread-pool-executor") + .add(" thread-priority = " + threadPriority) + .add(" thread-pool-executor {") + .add(" core-pool-size-min = " + minNumThreads) + .add(" core-pool-size-max = " + maxNumThreads) + .add(" }") + .add(" }") + .add(" }") + .add("}") + .build(); + } + + public static Config getForkJoinExecutorConfig( + RpcSystem.ForkJoinExecutorConfiguration configuration) { + final double parallelismFactor = configuration.getParallelismFactor(); + final int minNumThreads = configuration.getMinParallelism(); + final int maxNumThreads = configuration.getMaxParallelism(); + + return new AkkaConfigBuilder() + .add("akka {") + .add(" actor {") + .add(" 
default-dispatcher {") + .add(" executor = fork-join-executor") + .add(" fork-join-executor {") + .add(" parallelism-factor = " + parallelismFactor) + .add(" parallelism-min = " + minNumThreads) + .add(" parallelism-max = " + maxNumThreads) + .add(" }") + .add(" }") + .add(" }") + .add("}") + .build(); + } + + /** + * Creates a Akka config for a remote actor system listening on port on the network interface + * identified by bindAddress. + * + * @param configuration instance containing the user provided configuration values + * @param bindAddress of the network interface to bind on + * @param port to bind to or if 0 then Akka picks a free port automatically + * @param externalHostname The host name to expect for Akka messages + * @param externalPort The port to expect for Akka messages + * @return Flink's Akka configuration for remote actor systems + */ + private static Config getRemoteAkkaConfig( + Configuration configuration, + String bindAddress, + int port, + String externalHostname, + int externalPort) { + final AkkaConfigBuilder builder = new AkkaConfigBuilder(); + + addBaseRemoteAkkaConfig(builder, configuration, port, externalPort); + addHostnameRemoteAkkaConfig(builder, bindAddress, externalHostname); + addSslRemoteAkkaConfig(builder, configuration); + + return builder.build(); + } + + private static void addBaseRemoteAkkaConfig( + AkkaConfigBuilder akkaConfigBuilder, + Configuration configuration, + int port, + int externalPort) { + final Duration akkaAskTimeout = configuration.get(AkkaOptions.ASK_TIMEOUT_DURATION); + + final String startupTimeout = + TimeUtils.getStringInMillis( + TimeUtils.parseDuration( + configuration.getString( + AkkaOptions.STARTUP_TIMEOUT, + TimeUtils.getStringInMillis( + akkaAskTimeout.multipliedBy(10L))))); + + final String akkaTCPTimeout = + TimeUtils.getStringInMillis( + TimeUtils.parseDuration(configuration.getString(AkkaOptions.TCP_TIMEOUT))); + + final String akkaFramesize = configuration.getString(AkkaOptions.FRAMESIZE); + 
+ final int clientSocketWorkerPoolPoolSizeMin = + configuration.get(AkkaOptions.CLIENT_SOCKET_WORKER_POOL_SIZE_MIN); + final int clientSocketWorkerPoolPoolSizeMax = + configuration.get(AkkaOptions.CLIENT_SOCKET_WORKER_POOL_SIZE_MAX); + final double clientSocketWorkerPoolPoolSizeFactor = + configuration.get(AkkaOptions.CLIENT_SOCKET_WORKER_POOL_SIZE_FACTOR); + final int serverSocketWorkerPoolPoolSizeMin = + configuration.get(AkkaOptions.SERVER_SOCKET_WORKER_POOL_SIZE_MIN); + final int serverSocketWorkerPoolPoolSizeMax = + configuration.get(AkkaOptions.SERVER_SOCKET_WORKER_POOL_SIZE_MAX); + final double serverSocketWorkerPoolPoolSizeFactor = + configuration.get(AkkaOptions.SERVER_SOCKET_WORKER_POOL_SIZE_FACTOR); + + final String logLifecycleEvents = + booleanToString(configuration.getBoolean(AkkaOptions.LOG_LIFECYCLE_EVENTS)); + + final long retryGateClosedFor = configuration.getLong(AkkaOptions.RETRY_GATE_CLOSED_FOR); + + akkaConfigBuilder + .add("akka {") + .add(" actor {") + .add(" provider = \"akka.remote.RemoteActorRefProvider\"") + .add(" }") + .add(" remote.artery.enabled = false") + .add(" remote.startup-timeout = " + startupTimeout) + .add(" remote.classic {") + .add(" # disable the transport failure detector by setting very high values") + .add(" transport-failure-detector{") + .add(" acceptable-heartbeat-pause = 6000 s") + .add(" heartbeat-interval = 1000 s") + .add(" threshold = 300") Review comment: ```suggestion .add(" threshold = 300") ``` ########## File path: flink-rpc/flink-rpc-akka/src/main/java/org/apache/flink/runtime/rpc/akka/AkkaUtils.java ########## @@ -0,0 +1,599 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. 
+ * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.runtime.rpc.akka; + +import org.apache.flink.annotation.VisibleForTesting; +import org.apache.flink.configuration.AkkaOptions; +import org.apache.flink.configuration.Configuration; +import org.apache.flink.configuration.SecurityOptions; +import org.apache.flink.runtime.concurrent.akka.AkkaFutureUtils; +import org.apache.flink.runtime.rpc.RpcSystem; +import org.apache.flink.util.NetUtils; +import org.apache.flink.util.TimeUtils; +import org.apache.flink.util.function.FunctionUtils; + +import akka.actor.ActorRef; +import akka.actor.ActorSystem; +import akka.actor.Address; +import akka.actor.AddressFromURIString; +import com.typesafe.config.Config; +import com.typesafe.config.ConfigFactory; +import org.jboss.netty.logging.InternalLoggerFactory; +import org.jboss.netty.logging.Slf4JLoggerFactory; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import javax.annotation.Nullable; + +import java.io.PrintWriter; +import java.io.StringWriter; +import java.net.InetSocketAddress; +import java.net.MalformedURLException; +import java.time.Duration; +import java.util.Arrays; +import java.util.concurrent.CompletableFuture; +import java.util.stream.Collectors; + +/** + * This class contains utility functions for akka. It contains methods to start an actor system with + * a given akka configuration. 
Furthermore, the akka configuration used for starting the different + * actor systems resides in this class. + */ +class AkkaUtils { + private static final Logger LOG = LoggerFactory.getLogger(AkkaUtils.class); + + private static final String FLINK_ACTOR_SYSTEM_NAME = "flink"; + + public static String getFlinkActorSystemName() { + return FLINK_ACTOR_SYSTEM_NAME; + } + + /** + * Gets the basic Akka config which is shared by remote and local actor systems. + * + * @param configuration instance which contains the user specified values for the configuration + * @return Flink's basic Akka config + */ + private static Config getBasicAkkaConfig(Configuration configuration) { + final int akkaThroughput = configuration.getInteger(AkkaOptions.DISPATCHER_THROUGHPUT); + final String jvmExitOnFatalError = + booleanToString(configuration.getBoolean(AkkaOptions.JVM_EXIT_ON_FATAL_ERROR)); + final String logLifecycleEvents = + booleanToString(configuration.getBoolean(AkkaOptions.LOG_LIFECYCLE_EVENTS)); + final String supervisorStrategy = EscalatingSupervisorStrategy.class.getCanonicalName(); + + return new AkkaConfigBuilder() + .add("akka {") + .add(" daemonic = off") + .add(" loggers = [\"akka.event.slf4j.Slf4jLogger\"]") + .add(" logging-filter = \"akka.event.slf4j.Slf4jLoggingFilter\"") + .add(" log-config-on-start = off") + .add(" logger-startup-timeout = 30s") + .add(" loglevel = " + getLogLevel()) + .add(" stdout-loglevel = OFF") + .add(" log-dead-letters = " + logLifecycleEvents) + .add(" log-dead-letters-during-shutdown = " + logLifecycleEvents) + .add(" jvm-exit-on-fatal-error = " + jvmExitOnFatalError) + .add(" serialize-messages = off") + .add(" actor {") + .add(" guardian-supervisor-strategy = " + supervisorStrategy) + .add(" warn-about-java-serializer-usage = off") + .add(" allow-java-serialization = on") Review comment: This line is not represented in the scala code... 
########## File path: flink-rpc/flink-rpc-akka/src/main/java/org/apache/flink/runtime/rpc/akka/AkkaUtils.java ########## @@ -0,0 +1,599 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.runtime.rpc.akka; + +import org.apache.flink.annotation.VisibleForTesting; +import org.apache.flink.configuration.AkkaOptions; +import org.apache.flink.configuration.Configuration; +import org.apache.flink.configuration.SecurityOptions; +import org.apache.flink.runtime.concurrent.akka.AkkaFutureUtils; +import org.apache.flink.runtime.rpc.RpcSystem; +import org.apache.flink.util.NetUtils; +import org.apache.flink.util.TimeUtils; +import org.apache.flink.util.function.FunctionUtils; + +import akka.actor.ActorRef; +import akka.actor.ActorSystem; +import akka.actor.Address; +import akka.actor.AddressFromURIString; +import com.typesafe.config.Config; +import com.typesafe.config.ConfigFactory; +import org.jboss.netty.logging.InternalLoggerFactory; +import org.jboss.netty.logging.Slf4JLoggerFactory; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import javax.annotation.Nullable; + +import java.io.PrintWriter; +import java.io.StringWriter; +import java.net.InetSocketAddress; +import 
java.net.MalformedURLException; +import java.time.Duration; +import java.util.Arrays; +import java.util.concurrent.CompletableFuture; +import java.util.stream.Collectors; + +/** + * This class contains utility functions for akka. It contains methods to start an actor system with + * a given akka configuration. Furthermore, the akka configuration used for starting the different + * actor systems resides in this class. + */ +class AkkaUtils { + private static final Logger LOG = LoggerFactory.getLogger(AkkaUtils.class); + + private static final String FLINK_ACTOR_SYSTEM_NAME = "flink"; + + public static String getFlinkActorSystemName() { + return FLINK_ACTOR_SYSTEM_NAME; + } + + /** + * Gets the basic Akka config which is shared by remote and local actor systems. + * + * @param configuration instance which contains the user specified values for the configuration + * @return Flink's basic Akka config + */ + private static Config getBasicAkkaConfig(Configuration configuration) { + final int akkaThroughput = configuration.getInteger(AkkaOptions.DISPATCHER_THROUGHPUT); + final String jvmExitOnFatalError = + booleanToString(configuration.getBoolean(AkkaOptions.JVM_EXIT_ON_FATAL_ERROR)); + final String logLifecycleEvents = + booleanToString(configuration.getBoolean(AkkaOptions.LOG_LIFECYCLE_EVENTS)); + final String supervisorStrategy = EscalatingSupervisorStrategy.class.getCanonicalName(); + + return new AkkaConfigBuilder() + .add("akka {") + .add(" daemonic = off") + .add(" loggers = [\"akka.event.slf4j.Slf4jLogger\"]") + .add(" logging-filter = \"akka.event.slf4j.Slf4jLoggingFilter\"") + .add(" log-config-on-start = off") + .add(" logger-startup-timeout = 30s") + .add(" loglevel = " + getLogLevel()) + .add(" stdout-loglevel = OFF") + .add(" log-dead-letters = " + logLifecycleEvents) + .add(" log-dead-letters-during-shutdown = " + logLifecycleEvents) + .add(" jvm-exit-on-fatal-error = " + jvmExitOnFatalError) + .add(" serialize-messages = off") + .add(" actor {") + .add(" 
guardian-supervisor-strategy = " + supervisorStrategy) + .add(" warn-about-java-serializer-usage = off") + .add(" allow-java-serialization = on") + .add(" default-dispatcher {") + .add(" throughput = " + akkaThroughput) + .add(" }") + .add(" supervisor-dispatcher {") + .add(" type = Dispatcher") + .add(" executor = \"thread-pool-executor\"") + .add(" thread-pool-executor {") + .add(" core-pool-size-min = 1") + .add(" core-pool-size-max = 1") + .add(" }") + .add(" }") + .add(" }") + .add("}") + .build(); + } + + private static String getLogLevel() { + if (LOG.isTraceEnabled()) { + return "TRACE"; + } + if (LOG.isDebugEnabled()) { + return "DEBUG"; + } + if (LOG.isInfoEnabled()) { + return "INFO"; + } + if (LOG.isWarnEnabled()) { + return "WARN"; + } + if (LOG.isErrorEnabled()) { + return "ERROR"; + } + return "OFF"; + } + + public static Config getThreadPoolExecutorConfig( + RpcSystem.FixedThreadPoolExecutorConfiguration configuration) { + final int threadPriority = configuration.getThreadPriority(); + final int minNumThreads = configuration.getMinNumThreads(); + final int maxNumThreads = configuration.getMaxNumThreads(); + + return new AkkaConfigBuilder() + .add("akka {") + .add(" actor {") + .add(" default-dispatcher {") + .add(" type = org.apache.flink.runtime.rpc.akka.PriorityThreadsDispatcher") + .add(" executor = thread-pool-executor") + .add(" thread-priority = " + threadPriority) + .add(" thread-pool-executor {") + .add(" core-pool-size-min = " + minNumThreads) + .add(" core-pool-size-max = " + maxNumThreads) + .add(" }") + .add(" }") + .add(" }") + .add("}") + .build(); + } + + public static Config getForkJoinExecutorConfig( + RpcSystem.ForkJoinExecutorConfiguration configuration) { + final double parallelismFactor = configuration.getParallelismFactor(); + final int minNumThreads = configuration.getMinParallelism(); + final int maxNumThreads = configuration.getMaxParallelism(); + + return new AkkaConfigBuilder() + .add("akka {") + .add(" actor {") + .add(" 
default-dispatcher {") + .add(" executor = fork-join-executor") + .add(" fork-join-executor {") + .add(" parallelism-factor = " + parallelismFactor) + .add(" parallelism-min = " + minNumThreads) + .add(" parallelism-max = " + maxNumThreads) + .add(" }") + .add(" }") + .add(" }") + .add("}") + .build(); + } + + /** + * Creates a Akka config for a remote actor system listening on port on the network interface + * identified by bindAddress. + * + * @param configuration instance containing the user provided configuration values + * @param bindAddress of the network interface to bind on + * @param port to bind to or if 0 then Akka picks a free port automatically + * @param externalHostname The host name to expect for Akka messages + * @param externalPort The port to expect for Akka messages + * @return Flink's Akka configuration for remote actor systems + */ + private static Config getRemoteAkkaConfig( + Configuration configuration, + String bindAddress, + int port, + String externalHostname, + int externalPort) { + final AkkaConfigBuilder builder = new AkkaConfigBuilder(); + + addBaseRemoteAkkaConfig(builder, configuration, port, externalPort); + addHostnameRemoteAkkaConfig(builder, bindAddress, externalHostname); + addSslRemoteAkkaConfig(builder, configuration); + + return builder.build(); + } + + private static void addBaseRemoteAkkaConfig( + AkkaConfigBuilder akkaConfigBuilder, + Configuration configuration, + int port, + int externalPort) { + final Duration akkaAskTimeout = configuration.get(AkkaOptions.ASK_TIMEOUT_DURATION); + + final String startupTimeout = + TimeUtils.getStringInMillis( + TimeUtils.parseDuration( + configuration.getString( + AkkaOptions.STARTUP_TIMEOUT, + TimeUtils.getStringInMillis( + akkaAskTimeout.multipliedBy(10L))))); + + final String akkaTCPTimeout = + TimeUtils.getStringInMillis( + TimeUtils.parseDuration(configuration.getString(AkkaOptions.TCP_TIMEOUT))); + + final String akkaFramesize = configuration.getString(AkkaOptions.FRAMESIZE); + 
+ final int clientSocketWorkerPoolPoolSizeMin = + configuration.get(AkkaOptions.CLIENT_SOCKET_WORKER_POOL_SIZE_MIN); + final int clientSocketWorkerPoolPoolSizeMax = + configuration.get(AkkaOptions.CLIENT_SOCKET_WORKER_POOL_SIZE_MAX); + final double clientSocketWorkerPoolPoolSizeFactor = + configuration.get(AkkaOptions.CLIENT_SOCKET_WORKER_POOL_SIZE_FACTOR); + final int serverSocketWorkerPoolPoolSizeMin = + configuration.get(AkkaOptions.SERVER_SOCKET_WORKER_POOL_SIZE_MIN); + final int serverSocketWorkerPoolPoolSizeMax = + configuration.get(AkkaOptions.SERVER_SOCKET_WORKER_POOL_SIZE_MAX); + final double serverSocketWorkerPoolPoolSizeFactor = + configuration.get(AkkaOptions.SERVER_SOCKET_WORKER_POOL_SIZE_FACTOR); + + final String logLifecycleEvents = + booleanToString(configuration.getBoolean(AkkaOptions.LOG_LIFECYCLE_EVENTS)); + + final long retryGateClosedFor = configuration.getLong(AkkaOptions.RETRY_GATE_CLOSED_FOR); + + akkaConfigBuilder + .add("akka {") + .add(" actor {") + .add(" provider = \"akka.remote.RemoteActorRefProvider\"") + .add(" }") + .add(" remote.artery.enabled = false") Review comment: That parameter is not used in the original code... ########## File path: flink-rpc/flink-rpc-akka/src/main/java/org/apache/flink/runtime/rpc/akka/AkkaUtils.java ########## @@ -0,0 +1,599 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. 
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.runtime.rpc.akka; + +import org.apache.flink.annotation.VisibleForTesting; +import org.apache.flink.configuration.AkkaOptions; +import org.apache.flink.configuration.Configuration; +import org.apache.flink.configuration.SecurityOptions; +import org.apache.flink.runtime.concurrent.akka.AkkaFutureUtils; +import org.apache.flink.runtime.rpc.RpcSystem; +import org.apache.flink.util.NetUtils; +import org.apache.flink.util.TimeUtils; +import org.apache.flink.util.function.FunctionUtils; + +import akka.actor.ActorRef; +import akka.actor.ActorSystem; +import akka.actor.Address; +import akka.actor.AddressFromURIString; +import com.typesafe.config.Config; +import com.typesafe.config.ConfigFactory; +import org.jboss.netty.logging.InternalLoggerFactory; +import org.jboss.netty.logging.Slf4JLoggerFactory; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import javax.annotation.Nullable; + +import java.io.PrintWriter; +import java.io.StringWriter; +import java.net.InetSocketAddress; +import java.net.MalformedURLException; +import java.time.Duration; +import java.util.Arrays; +import java.util.concurrent.CompletableFuture; +import java.util.stream.Collectors; + +/** + * This class contains utility functions for akka. It contains methods to start an actor system with + * a given akka configuration. Furthermore, the akka configuration used for starting the different + * actor systems resides in this class. 
+ */ +class AkkaUtils { + private static final Logger LOG = LoggerFactory.getLogger(AkkaUtils.class); + + private static final String FLINK_ACTOR_SYSTEM_NAME = "flink"; + + public static String getFlinkActorSystemName() { + return FLINK_ACTOR_SYSTEM_NAME; + } + + /** + * Gets the basic Akka config which is shared by remote and local actor systems. + * + * @param configuration instance which contains the user specified values for the configuration + * @return Flink's basic Akka config + */ + private static Config getBasicAkkaConfig(Configuration configuration) { + final int akkaThroughput = configuration.getInteger(AkkaOptions.DISPATCHER_THROUGHPUT); + final String jvmExitOnFatalError = + booleanToString(configuration.getBoolean(AkkaOptions.JVM_EXIT_ON_FATAL_ERROR)); + final String logLifecycleEvents = + booleanToString(configuration.getBoolean(AkkaOptions.LOG_LIFECYCLE_EVENTS)); + final String supervisorStrategy = EscalatingSupervisorStrategy.class.getCanonicalName(); + + return new AkkaConfigBuilder() + .add("akka {") + .add(" daemonic = off") + .add(" loggers = [\"akka.event.slf4j.Slf4jLogger\"]") + .add(" logging-filter = \"akka.event.slf4j.Slf4jLoggingFilter\"") + .add(" log-config-on-start = off") + .add(" logger-startup-timeout = 30s") + .add(" loglevel = " + getLogLevel()) + .add(" stdout-loglevel = OFF") + .add(" log-dead-letters = " + logLifecycleEvents) + .add(" log-dead-letters-during-shutdown = " + logLifecycleEvents) + .add(" jvm-exit-on-fatal-error = " + jvmExitOnFatalError) + .add(" serialize-messages = off") + .add(" actor {") + .add(" guardian-supervisor-strategy = " + supervisorStrategy) + .add(" warn-about-java-serializer-usage = off") + .add(" allow-java-serialization = on") + .add(" default-dispatcher {") + .add(" throughput = " + akkaThroughput) + .add(" }") + .add(" supervisor-dispatcher {") + .add(" type = Dispatcher") + .add(" executor = \"thread-pool-executor\"") + .add(" thread-pool-executor {") + .add(" core-pool-size-min = 1") + .add(" 
core-pool-size-max = 1") + .add(" }") + .add(" }") + .add(" }") + .add("}") + .build(); + } + + private static String getLogLevel() { + if (LOG.isTraceEnabled()) { + return "TRACE"; + } + if (LOG.isDebugEnabled()) { + return "DEBUG"; + } + if (LOG.isInfoEnabled()) { + return "INFO"; + } + if (LOG.isWarnEnabled()) { + return "WARN"; + } + if (LOG.isErrorEnabled()) { + return "ERROR"; + } + return "OFF"; + } + + public static Config getThreadPoolExecutorConfig( + RpcSystem.FixedThreadPoolExecutorConfiguration configuration) { + final int threadPriority = configuration.getThreadPriority(); + final int minNumThreads = configuration.getMinNumThreads(); + final int maxNumThreads = configuration.getMaxNumThreads(); + + return new AkkaConfigBuilder() + .add("akka {") + .add(" actor {") + .add(" default-dispatcher {") + .add(" type = org.apache.flink.runtime.rpc.akka.PriorityThreadsDispatcher") + .add(" executor = thread-pool-executor") + .add(" thread-priority = " + threadPriority) + .add(" thread-pool-executor {") + .add(" core-pool-size-min = " + minNumThreads) + .add(" core-pool-size-max = " + maxNumThreads) + .add(" }") + .add(" }") + .add(" }") + .add("}") + .build(); + } + + public static Config getForkJoinExecutorConfig( + RpcSystem.ForkJoinExecutorConfiguration configuration) { + final double parallelismFactor = configuration.getParallelismFactor(); + final int minNumThreads = configuration.getMinParallelism(); + final int maxNumThreads = configuration.getMaxParallelism(); + + return new AkkaConfigBuilder() + .add("akka {") + .add(" actor {") + .add(" default-dispatcher {") + .add(" executor = fork-join-executor") + .add(" fork-join-executor {") + .add(" parallelism-factor = " + parallelismFactor) + .add(" parallelism-min = " + minNumThreads) + .add(" parallelism-max = " + maxNumThreads) + .add(" }") + .add(" }") + .add(" }") + .add("}") + .build(); + } + + /** + * Creates a Akka config for a remote actor system listening on port on the network interface + * 
identified by bindAddress. + * + * @param configuration instance containing the user provided configuration values + * @param bindAddress of the network interface to bind on + * @param port to bind to or if 0 then Akka picks a free port automatically + * @param externalHostname The host name to expect for Akka messages + * @param externalPort The port to expect for Akka messages + * @return Flink's Akka configuration for remote actor systems + */ + private static Config getRemoteAkkaConfig( + Configuration configuration, + String bindAddress, + int port, + String externalHostname, + int externalPort) { + final AkkaConfigBuilder builder = new AkkaConfigBuilder(); + + addBaseRemoteAkkaConfig(builder, configuration, port, externalPort); + addHostnameRemoteAkkaConfig(builder, bindAddress, externalHostname); + addSslRemoteAkkaConfig(builder, configuration); + + return builder.build(); + } + + private static void addBaseRemoteAkkaConfig( + AkkaConfigBuilder akkaConfigBuilder, + Configuration configuration, + int port, + int externalPort) { + final Duration akkaAskTimeout = configuration.get(AkkaOptions.ASK_TIMEOUT_DURATION); + + final String startupTimeout = + TimeUtils.getStringInMillis( + TimeUtils.parseDuration( + configuration.getString( + AkkaOptions.STARTUP_TIMEOUT, + TimeUtils.getStringInMillis( + akkaAskTimeout.multipliedBy(10L))))); + + final String akkaTCPTimeout = + TimeUtils.getStringInMillis( + TimeUtils.parseDuration(configuration.getString(AkkaOptions.TCP_TIMEOUT))); + + final String akkaFramesize = configuration.getString(AkkaOptions.FRAMESIZE); + + final int clientSocketWorkerPoolPoolSizeMin = + configuration.get(AkkaOptions.CLIENT_SOCKET_WORKER_POOL_SIZE_MIN); + final int clientSocketWorkerPoolPoolSizeMax = + configuration.get(AkkaOptions.CLIENT_SOCKET_WORKER_POOL_SIZE_MAX); + final double clientSocketWorkerPoolPoolSizeFactor = + configuration.get(AkkaOptions.CLIENT_SOCKET_WORKER_POOL_SIZE_FACTOR); + final int serverSocketWorkerPoolPoolSizeMin = + 
configuration.get(AkkaOptions.SERVER_SOCKET_WORKER_POOL_SIZE_MIN); + final int serverSocketWorkerPoolPoolSizeMax = + configuration.get(AkkaOptions.SERVER_SOCKET_WORKER_POOL_SIZE_MAX); + final double serverSocketWorkerPoolPoolSizeFactor = + configuration.get(AkkaOptions.SERVER_SOCKET_WORKER_POOL_SIZE_FACTOR); + + final String logLifecycleEvents = + booleanToString(configuration.getBoolean(AkkaOptions.LOG_LIFECYCLE_EVENTS)); + + final long retryGateClosedFor = configuration.getLong(AkkaOptions.RETRY_GATE_CLOSED_FOR); + + akkaConfigBuilder + .add("akka {") + .add(" actor {") + .add(" provider = \"akka.remote.RemoteActorRefProvider\"") + .add(" }") + .add(" remote.artery.enabled = false") + .add(" remote.startup-timeout = " + startupTimeout) + .add(" remote.classic {") Review comment: does `.classic` do anything? It's not used in the original code. ########## File path: flink-rpc/flink-rpc-akka/src/main/java/org/apache/flink/runtime/rpc/akka/AkkaUtils.java ########## @@ -0,0 +1,599 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.flink.runtime.rpc.akka; + +import org.apache.flink.annotation.VisibleForTesting; +import org.apache.flink.configuration.AkkaOptions; +import org.apache.flink.configuration.Configuration; +import org.apache.flink.configuration.SecurityOptions; +import org.apache.flink.runtime.concurrent.akka.AkkaFutureUtils; +import org.apache.flink.runtime.rpc.RpcSystem; +import org.apache.flink.util.NetUtils; +import org.apache.flink.util.TimeUtils; +import org.apache.flink.util.function.FunctionUtils; + +import akka.actor.ActorRef; +import akka.actor.ActorSystem; +import akka.actor.Address; +import akka.actor.AddressFromURIString; +import com.typesafe.config.Config; +import com.typesafe.config.ConfigFactory; +import org.jboss.netty.logging.InternalLoggerFactory; +import org.jboss.netty.logging.Slf4JLoggerFactory; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import javax.annotation.Nullable; + +import java.io.PrintWriter; +import java.io.StringWriter; +import java.net.InetSocketAddress; +import java.net.MalformedURLException; +import java.time.Duration; +import java.util.Arrays; +import java.util.concurrent.CompletableFuture; +import java.util.stream.Collectors; + +/** + * This class contains utility functions for akka. It contains methods to start an actor system with + * a given akka configuration. Furthermore, the akka configuration used for starting the different + * actor systems resides in this class. + */ +class AkkaUtils { + private static final Logger LOG = LoggerFactory.getLogger(AkkaUtils.class); + + private static final String FLINK_ACTOR_SYSTEM_NAME = "flink"; + + public static String getFlinkActorSystemName() { + return FLINK_ACTOR_SYSTEM_NAME; + } + + /** + * Gets the basic Akka config which is shared by remote and local actor systems. 
+ * + * @param configuration instance which contains the user specified values for the configuration + * @return Flink's basic Akka config + */ + private static Config getBasicAkkaConfig(Configuration configuration) { + final int akkaThroughput = configuration.getInteger(AkkaOptions.DISPATCHER_THROUGHPUT); + final String jvmExitOnFatalError = + booleanToString(configuration.getBoolean(AkkaOptions.JVM_EXIT_ON_FATAL_ERROR)); + final String logLifecycleEvents = + booleanToString(configuration.getBoolean(AkkaOptions.LOG_LIFECYCLE_EVENTS)); + final String supervisorStrategy = EscalatingSupervisorStrategy.class.getCanonicalName(); + + return new AkkaConfigBuilder() + .add("akka {") + .add(" daemonic = off") + .add(" loggers = [\"akka.event.slf4j.Slf4jLogger\"]") + .add(" logging-filter = \"akka.event.slf4j.Slf4jLoggingFilter\"") + .add(" log-config-on-start = off") + .add(" logger-startup-timeout = 30s") + .add(" loglevel = " + getLogLevel()) + .add(" stdout-loglevel = OFF") + .add(" log-dead-letters = " + logLifecycleEvents) + .add(" log-dead-letters-during-shutdown = " + logLifecycleEvents) + .add(" jvm-exit-on-fatal-error = " + jvmExitOnFatalError) + .add(" serialize-messages = off") + .add(" actor {") + .add(" guardian-supervisor-strategy = " + supervisorStrategy) + .add(" warn-about-java-serializer-usage = off") + .add(" allow-java-serialization = on") + .add(" default-dispatcher {") + .add(" throughput = " + akkaThroughput) + .add(" }") + .add(" supervisor-dispatcher {") + .add(" type = Dispatcher") + .add(" executor = \"thread-pool-executor\"") + .add(" thread-pool-executor {") + .add(" core-pool-size-min = 1") + .add(" core-pool-size-max = 1") + .add(" }") + .add(" }") + .add(" }") + .add("}") + .build(); + } + + private static String getLogLevel() { + if (LOG.isTraceEnabled()) { + return "TRACE"; + } + if (LOG.isDebugEnabled()) { + return "DEBUG"; + } + if (LOG.isInfoEnabled()) { + return "INFO"; + } + if (LOG.isWarnEnabled()) { + return "WARN"; + } + if 
(LOG.isErrorEnabled()) { + return "ERROR"; + } + return "OFF"; + } + + public static Config getThreadPoolExecutorConfig( + RpcSystem.FixedThreadPoolExecutorConfiguration configuration) { + final int threadPriority = configuration.getThreadPriority(); + final int minNumThreads = configuration.getMinNumThreads(); + final int maxNumThreads = configuration.getMaxNumThreads(); + + return new AkkaConfigBuilder() + .add("akka {") + .add(" actor {") + .add(" default-dispatcher {") + .add(" type = org.apache.flink.runtime.rpc.akka.PriorityThreadsDispatcher") + .add(" executor = thread-pool-executor") + .add(" thread-priority = " + threadPriority) + .add(" thread-pool-executor {") + .add(" core-pool-size-min = " + minNumThreads) + .add(" core-pool-size-max = " + maxNumThreads) + .add(" }") + .add(" }") + .add(" }") + .add("}") + .build(); + } + + public static Config getForkJoinExecutorConfig( + RpcSystem.ForkJoinExecutorConfiguration configuration) { + final double parallelismFactor = configuration.getParallelismFactor(); + final int minNumThreads = configuration.getMinParallelism(); + final int maxNumThreads = configuration.getMaxParallelism(); + + return new AkkaConfigBuilder() + .add("akka {") + .add(" actor {") + .add(" default-dispatcher {") + .add(" executor = fork-join-executor") + .add(" fork-join-executor {") + .add(" parallelism-factor = " + parallelismFactor) + .add(" parallelism-min = " + minNumThreads) + .add(" parallelism-max = " + maxNumThreads) + .add(" }") + .add(" }") + .add(" }") + .add("}") + .build(); + } + + /** + * Creates a Akka config for a remote actor system listening on port on the network interface + * identified by bindAddress. 
+ * + * @param configuration instance containing the user provided configuration values + * @param bindAddress of the network interface to bind on + * @param port to bind to or if 0 then Akka picks a free port automatically + * @param externalHostname The host name to expect for Akka messages + * @param externalPort The port to expect for Akka messages + * @return Flink's Akka configuration for remote actor systems + */ + private static Config getRemoteAkkaConfig( + Configuration configuration, + String bindAddress, + int port, + String externalHostname, + int externalPort) { + final AkkaConfigBuilder builder = new AkkaConfigBuilder(); + + addBaseRemoteAkkaConfig(builder, configuration, port, externalPort); + addHostnameRemoteAkkaConfig(builder, bindAddress, externalHostname); + addSslRemoteAkkaConfig(builder, configuration); + + return builder.build(); + } + + private static void addBaseRemoteAkkaConfig( + AkkaConfigBuilder akkaConfigBuilder, + Configuration configuration, + int port, + int externalPort) { + final Duration akkaAskTimeout = configuration.get(AkkaOptions.ASK_TIMEOUT_DURATION); + + final String startupTimeout = + TimeUtils.getStringInMillis( + TimeUtils.parseDuration( + configuration.getString( + AkkaOptions.STARTUP_TIMEOUT, + TimeUtils.getStringInMillis( + akkaAskTimeout.multipliedBy(10L))))); + + final String akkaTCPTimeout = + TimeUtils.getStringInMillis( + TimeUtils.parseDuration(configuration.getString(AkkaOptions.TCP_TIMEOUT))); + + final String akkaFramesize = configuration.getString(AkkaOptions.FRAMESIZE); + + final int clientSocketWorkerPoolPoolSizeMin = + configuration.get(AkkaOptions.CLIENT_SOCKET_WORKER_POOL_SIZE_MIN); + final int clientSocketWorkerPoolPoolSizeMax = + configuration.get(AkkaOptions.CLIENT_SOCKET_WORKER_POOL_SIZE_MAX); + final double clientSocketWorkerPoolPoolSizeFactor = + configuration.get(AkkaOptions.CLIENT_SOCKET_WORKER_POOL_SIZE_FACTOR); + final int serverSocketWorkerPoolPoolSizeMin = + 
configuration.get(AkkaOptions.SERVER_SOCKET_WORKER_POOL_SIZE_MIN); + final int serverSocketWorkerPoolPoolSizeMax = + configuration.get(AkkaOptions.SERVER_SOCKET_WORKER_POOL_SIZE_MAX); + final double serverSocketWorkerPoolPoolSizeFactor = + configuration.get(AkkaOptions.SERVER_SOCKET_WORKER_POOL_SIZE_FACTOR); + + final String logLifecycleEvents = + booleanToString(configuration.getBoolean(AkkaOptions.LOG_LIFECYCLE_EVENTS)); + + final long retryGateClosedFor = configuration.getLong(AkkaOptions.RETRY_GATE_CLOSED_FOR); + + akkaConfigBuilder + .add("akka {") + .add(" actor {") + .add(" provider = \"akka.remote.RemoteActorRefProvider\"") + .add(" }") + .add(" remote.artery.enabled = false") + .add(" remote.startup-timeout = " + startupTimeout) + .add(" remote.classic {") + .add(" # disable the transport failure detector by setting very high values") + .add(" transport-failure-detector{") + .add(" acceptable-heartbeat-pause = 6000 s") + .add(" heartbeat-interval = 1000 s") + .add(" threshold = 300") + .add(" }") + .add(" enabled-transports = [\"akka.remote.classic.netty.tcp\"]") + .add(" netty {") + .add(" tcp {") + .add(" transport-class = \"akka.remote.transport.netty.NettyTransport\"") + .add(" port = " + externalPort) + .add(" bind-port = " + port) + .add(" connection-timeout = " + akkaTCPTimeout) + .add(" maximum-frame-size = " + akkaFramesize) + .add(" tcp-nodelay = on") + .add(" client-socket-worker-pool {") + .add(" pool-size-min = " + clientSocketWorkerPoolPoolSizeMin) + .add(" pool-size-max = " + clientSocketWorkerPoolPoolSizeMax) + .add(" pool-size-factor = " + clientSocketWorkerPoolPoolSizeFactor) + .add(" }") + .add(" server-socket-worker-pool {") + .add(" pool-size-min = " + serverSocketWorkerPoolPoolSizeMin) + .add(" pool-size-max = " + serverSocketWorkerPoolPoolSizeMax) + .add(" pool-size-factor = " + serverSocketWorkerPoolPoolSizeFactor) + .add(" }") + .add(" }") + .add(" }") + .add(" log-remote-lifecycle-events = " + logLifecycleEvents) + .add(" 
retry-gate-closed-for = " + retryGateClosedFor + " ms") + .add(" }") + .add("}"); + } + + private static void addHostnameRemoteAkkaConfig( + AkkaConfigBuilder akkaConfigBuilder, String bindAddress, String externalHostname) { + final String normalizedExternalHostname = + NetUtils.unresolvedHostToNormalizedString(externalHostname); + final String effectiveHostname = + normalizedExternalHostname != null && !normalizedExternalHostname.isEmpty() + ? normalizedExternalHostname + // if bindAddress is null or empty, then leave bindAddress unspecified. Akka + // will pick InetAddress.getLocalHost.getHostAddress + : ""; + + akkaConfigBuilder + .add("akka {") + .add(" remote.classic {") Review comment: I guess that needs to match the `remote.classic` definition above (also not present in original code). ########## File path: flink-rpc/flink-rpc-akka/src/main/java/org/apache/flink/runtime/rpc/akka/AkkaUtils.java ########## @@ -0,0 +1,599 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License.
+ */ + +package org.apache.flink.runtime.rpc.akka; + +import org.apache.flink.annotation.VisibleForTesting; +import org.apache.flink.configuration.AkkaOptions; +import org.apache.flink.configuration.Configuration; +import org.apache.flink.configuration.SecurityOptions; +import org.apache.flink.runtime.concurrent.akka.AkkaFutureUtils; +import org.apache.flink.runtime.rpc.RpcSystem; +import org.apache.flink.util.NetUtils; +import org.apache.flink.util.TimeUtils; +import org.apache.flink.util.function.FunctionUtils; + +import akka.actor.ActorRef; +import akka.actor.ActorSystem; +import akka.actor.Address; +import akka.actor.AddressFromURIString; +import com.typesafe.config.Config; +import com.typesafe.config.ConfigFactory; +import org.jboss.netty.logging.InternalLoggerFactory; +import org.jboss.netty.logging.Slf4JLoggerFactory; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import javax.annotation.Nullable; + +import java.io.PrintWriter; +import java.io.StringWriter; +import java.net.InetSocketAddress; +import java.net.MalformedURLException; +import java.time.Duration; +import java.util.Arrays; +import java.util.concurrent.CompletableFuture; +import java.util.stream.Collectors; + +/** + * This class contains utility functions for akka. It contains methods to start an actor system with + * a given akka configuration. Furthermore, the akka configuration used for starting the different + * actor systems resides in this class. + */ +class AkkaUtils { + private static final Logger LOG = LoggerFactory.getLogger(AkkaUtils.class); + + private static final String FLINK_ACTOR_SYSTEM_NAME = "flink"; + + public static String getFlinkActorSystemName() { + return FLINK_ACTOR_SYSTEM_NAME; + } + + /** + * Gets the basic Akka config which is shared by remote and local actor systems. 
+ * + * @param configuration instance which contains the user specified values for the configuration + * @return Flink's basic Akka config + */ + private static Config getBasicAkkaConfig(Configuration configuration) { + final int akkaThroughput = configuration.getInteger(AkkaOptions.DISPATCHER_THROUGHPUT); + final String jvmExitOnFatalError = + booleanToString(configuration.getBoolean(AkkaOptions.JVM_EXIT_ON_FATAL_ERROR)); + final String logLifecycleEvents = + booleanToString(configuration.getBoolean(AkkaOptions.LOG_LIFECYCLE_EVENTS)); + final String supervisorStrategy = EscalatingSupervisorStrategy.class.getCanonicalName(); + + return new AkkaConfigBuilder() + .add("akka {") + .add(" daemonic = off") + .add(" loggers = [\"akka.event.slf4j.Slf4jLogger\"]") + .add(" logging-filter = \"akka.event.slf4j.Slf4jLoggingFilter\"") + .add(" log-config-on-start = off") + .add(" logger-startup-timeout = 30s") + .add(" loglevel = " + getLogLevel()) + .add(" stdout-loglevel = OFF") + .add(" log-dead-letters = " + logLifecycleEvents) + .add(" log-dead-letters-during-shutdown = " + logLifecycleEvents) + .add(" jvm-exit-on-fatal-error = " + jvmExitOnFatalError) + .add(" serialize-messages = off") + .add(" actor {") + .add(" guardian-supervisor-strategy = " + supervisorStrategy) + .add(" warn-about-java-serializer-usage = off") + .add(" allow-java-serialization = on") + .add(" default-dispatcher {") + .add(" throughput = " + akkaThroughput) + .add(" }") + .add(" supervisor-dispatcher {") + .add(" type = Dispatcher") + .add(" executor = \"thread-pool-executor\"") + .add(" thread-pool-executor {") + .add(" core-pool-size-min = 1") + .add(" core-pool-size-max = 1") + .add(" }") + .add(" }") + .add(" }") + .add("}") + .build(); + } + + private static String getLogLevel() { + if (LOG.isTraceEnabled()) { + return "TRACE"; + } + if (LOG.isDebugEnabled()) { + return "DEBUG"; + } + if (LOG.isInfoEnabled()) { + return "INFO"; + } + if (LOG.isWarnEnabled()) { + return "WARN"; + } + if 
(LOG.isErrorEnabled()) { + return "ERROR"; + } + return "OFF"; + } + + public static Config getThreadPoolExecutorConfig( + RpcSystem.FixedThreadPoolExecutorConfiguration configuration) { + final int threadPriority = configuration.getThreadPriority(); + final int minNumThreads = configuration.getMinNumThreads(); + final int maxNumThreads = configuration.getMaxNumThreads(); + + return new AkkaConfigBuilder() + .add("akka {") + .add(" actor {") + .add(" default-dispatcher {") + .add(" type = org.apache.flink.runtime.rpc.akka.PriorityThreadsDispatcher") + .add(" executor = thread-pool-executor") + .add(" thread-priority = " + threadPriority) + .add(" thread-pool-executor {") + .add(" core-pool-size-min = " + minNumThreads) + .add(" core-pool-size-max = " + maxNumThreads) + .add(" }") + .add(" }") + .add(" }") + .add("}") + .build(); + } + + public static Config getForkJoinExecutorConfig( + RpcSystem.ForkJoinExecutorConfiguration configuration) { + final double parallelismFactor = configuration.getParallelismFactor(); + final int minNumThreads = configuration.getMinParallelism(); + final int maxNumThreads = configuration.getMaxParallelism(); + + return new AkkaConfigBuilder() + .add("akka {") + .add(" actor {") + .add(" default-dispatcher {") + .add(" executor = fork-join-executor") + .add(" fork-join-executor {") + .add(" parallelism-factor = " + parallelismFactor) + .add(" parallelism-min = " + minNumThreads) + .add(" parallelism-max = " + maxNumThreads) + .add(" }") + .add(" }") + .add(" }") + .add("}") + .build(); + } + + /** + * Creates a Akka config for a remote actor system listening on port on the network interface + * identified by bindAddress. 
+ * + * @param configuration instance containing the user provided configuration values + * @param bindAddress of the network interface to bind on + * @param port to bind to or if 0 then Akka picks a free port automatically + * @param externalHostname The host name to expect for Akka messages + * @param externalPort The port to expect for Akka messages + * @return Flink's Akka configuration for remote actor systems + */ + private static Config getRemoteAkkaConfig( + Configuration configuration, + String bindAddress, + int port, + String externalHostname, + int externalPort) { + final AkkaConfigBuilder builder = new AkkaConfigBuilder(); + + addBaseRemoteAkkaConfig(builder, configuration, port, externalPort); + addHostnameRemoteAkkaConfig(builder, bindAddress, externalHostname); + addSslRemoteAkkaConfig(builder, configuration); + + return builder.build(); + } + + private static void addBaseRemoteAkkaConfig( + AkkaConfigBuilder akkaConfigBuilder, + Configuration configuration, + int port, + int externalPort) { + final Duration akkaAskTimeout = configuration.get(AkkaOptions.ASK_TIMEOUT_DURATION); + + final String startupTimeout = + TimeUtils.getStringInMillis( + TimeUtils.parseDuration( + configuration.getString( + AkkaOptions.STARTUP_TIMEOUT, + TimeUtils.getStringInMillis( + akkaAskTimeout.multipliedBy(10L))))); + + final String akkaTCPTimeout = + TimeUtils.getStringInMillis( + TimeUtils.parseDuration(configuration.getString(AkkaOptions.TCP_TIMEOUT))); + + final String akkaFramesize = configuration.getString(AkkaOptions.FRAMESIZE); + + final int clientSocketWorkerPoolPoolSizeMin = + configuration.get(AkkaOptions.CLIENT_SOCKET_WORKER_POOL_SIZE_MIN); + final int clientSocketWorkerPoolPoolSizeMax = + configuration.get(AkkaOptions.CLIENT_SOCKET_WORKER_POOL_SIZE_MAX); + final double clientSocketWorkerPoolPoolSizeFactor = + configuration.get(AkkaOptions.CLIENT_SOCKET_WORKER_POOL_SIZE_FACTOR); + final int serverSocketWorkerPoolPoolSizeMin = + 
configuration.get(AkkaOptions.SERVER_SOCKET_WORKER_POOL_SIZE_MIN); + final int serverSocketWorkerPoolPoolSizeMax = + configuration.get(AkkaOptions.SERVER_SOCKET_WORKER_POOL_SIZE_MAX); + final double serverSocketWorkerPoolPoolSizeFactor = + configuration.get(AkkaOptions.SERVER_SOCKET_WORKER_POOL_SIZE_FACTOR); + + final String logLifecycleEvents = + booleanToString(configuration.getBoolean(AkkaOptions.LOG_LIFECYCLE_EVENTS)); + + final long retryGateClosedFor = configuration.getLong(AkkaOptions.RETRY_GATE_CLOSED_FOR); + + akkaConfigBuilder + .add("akka {") + .add(" actor {") + .add(" provider = \"akka.remote.RemoteActorRefProvider\"") + .add(" }") + .add(" remote.artery.enabled = false") + .add(" remote.startup-timeout = " + startupTimeout) + .add(" remote.classic {") + .add(" # disable the transport failure detector by setting very high values") + .add(" transport-failure-detector{") + .add(" acceptable-heartbeat-pause = 6000 s") + .add(" heartbeat-interval = 1000 s") + .add(" threshold = 300") + .add(" }") + .add(" enabled-transports = [\"akka.remote.classic.netty.tcp\"]") + .add(" netty {") + .add(" tcp {") + .add(" transport-class = \"akka.remote.transport.netty.NettyTransport\"") + .add(" port = " + externalPort) + .add(" bind-port = " + port) + .add(" connection-timeout = " + akkaTCPTimeout) + .add(" maximum-frame-size = " + akkaFramesize) + .add(" tcp-nodelay = on") + .add(" client-socket-worker-pool {") + .add(" pool-size-min = " + clientSocketWorkerPoolPoolSizeMin) + .add(" pool-size-max = " + clientSocketWorkerPoolPoolSizeMax) + .add(" pool-size-factor = " + clientSocketWorkerPoolPoolSizeFactor) + .add(" }") + .add(" server-socket-worker-pool {") + .add(" pool-size-min = " + serverSocketWorkerPoolPoolSizeMin) + .add(" pool-size-max = " + serverSocketWorkerPoolPoolSizeMax) + .add(" pool-size-factor = " + serverSocketWorkerPoolPoolSizeFactor) + .add(" }") + .add(" }") + .add(" }") + .add(" log-remote-lifecycle-events = " + logLifecycleEvents) + .add(" 
retry-gate-closed-for = " + retryGateClosedFor + " ms") + .add(" }") + .add("}"); + } + + private static void addHostnameRemoteAkkaConfig( + AkkaConfigBuilder akkaConfigBuilder, String bindAddress, String externalHostname) { + final String normalizedExternalHostname = + NetUtils.unresolvedHostToNormalizedString(externalHostname); + final String effectiveHostname = + normalizedExternalHostname != null && !normalizedExternalHostname.isEmpty() + ? normalizedExternalHostname + // if bindAddress is null or empty, then leave bindAddress unspecified. Akka + // will pick InetAddress.getLocalHost.getHostAddress + : ""; + + akkaConfigBuilder + .add("akka {") + .add(" remote.classic {") + .add(" netty {") + .add(" tcp {") + .add(" hostname = \"" + effectiveHostname + "\"") + .add(" bind-hostname = \"" + bindAddress + "\"") + .add(" }") + .add(" }") + .add(" }") + .add("}"); + } + + private static void addSslRemoteAkkaConfig( + AkkaConfigBuilder akkaConfigBuilder, Configuration configuration) { + + final boolean akkaEnableSSLConfig = + configuration.getBoolean(AkkaOptions.SSL_ENABLED) + && SecurityOptions.isInternalSSLEnabled(configuration); + + final String akkaEnableSSL = booleanToString(akkaEnableSSLConfig); + + final String akkaSSLKeyStore = + configuration.getString( + SecurityOptions.SSL_INTERNAL_KEYSTORE, + configuration.getString(SecurityOptions.SSL_KEYSTORE)); + + final String akkaSSLKeyStorePassword = + configuration.getString( + SecurityOptions.SSL_INTERNAL_KEYSTORE_PASSWORD, + configuration.getString(SecurityOptions.SSL_KEYSTORE_PASSWORD)); + + final String akkaSSLKeyPassword = + configuration.getString( + SecurityOptions.SSL_INTERNAL_KEY_PASSWORD, + configuration.getString(SecurityOptions.SSL_KEY_PASSWORD)); + + final String akkaSSLTrustStore = + configuration.getString( + SecurityOptions.SSL_INTERNAL_TRUSTSTORE, + configuration.getString(SecurityOptions.SSL_TRUSTSTORE)); + + final String akkaSSLTrustStorePassword = + configuration.getString( + 
SecurityOptions.SSL_INTERNAL_TRUSTSTORE_PASSWORD, + configuration.getString(SecurityOptions.SSL_TRUSTSTORE_PASSWORD)); + + final String akkaSSLCertFingerprintString = + configuration.getString(SecurityOptions.SSL_INTERNAL_CERT_FINGERPRINT); + + final String akkaSSLCertFingerprints = + akkaSSLCertFingerprintString != null + ? Arrays.stream(akkaSSLCertFingerprintString.split(",")) + .collect(Collectors.joining("\",\"", "[\"", "\"]")) + : "[]"; + + final String akkaSSLProtocol = configuration.getString(SecurityOptions.SSL_PROTOCOL); + + final String akkaSSLAlgorithmsString = + configuration.getString(SecurityOptions.SSL_ALGORITHMS); + final String akkaSSLAlgorithms = + Arrays.stream(akkaSSLAlgorithmsString.split(",")) + .collect(Collectors.joining(",", "[", "]")); + + final String sslEngineProviderName = CustomSSLEngineProvider.class.getCanonicalName(); + + akkaConfigBuilder + .add("akka {") + .add(" remote.classic {") + .add(" enabled-transports = [\"akka.remote.classic.netty.ssl\"]") + .add(" netty {") + .add(" ssl = ${akka.remote.classic.netty.tcp}") + .add(" ssl {") + .add(" enable-ssl = " + akkaEnableSSL) + .add(" ssl-engine-provider = " + sslEngineProviderName) + .add(" security {") + .add(" key-store = \"" + akkaSSLKeyStore + "\"") + .add(" key-store-password = \"" + akkaSSLKeyStorePassword + "\"") + .add(" key-password = \"" + akkaSSLKeyPassword + "\"") + .add(" trust-store = \"" + akkaSSLTrustStore + "\"") + .add(" trust-store-password = \"" + akkaSSLTrustStorePassword + "\"") + .add(" protocol = " + akkaSSLProtocol + "") + .add(" enabled-algorithms = " + akkaSSLAlgorithms + "") + .add(" random-number-generator = \"\"") + .add(" require-mutual-authentication = on") + .add(" cert-fingerprints = " + akkaSSLCertFingerprints + "") + .add(" }") + .add(" }") + .add(" }") + .add(" }") + .add("}"); + } + + /** + * Creates a local actor system without remoting. 
+ * + * @param configuration instance containing the user provided configuration values + * @return The created actor system + */ + public static ActorSystem createLocalActorSystem(Configuration configuration) { + final Config akkaConfig = getAkkaConfig(configuration, null); + return createActorSystem(akkaConfig); + } + + /** + * Creates an actor system with the given akka config. + * + * @param akkaConfig configuration for the actor system + * @return created actor system + */ + private static ActorSystem createActorSystem(Config akkaConfig) { + return createActorSystem(getFlinkActorSystemName(), akkaConfig); + } + + /** + * Creates an actor system with the given akka config. + * + * @param akkaConfig configuration for the actor system + * @return created actor system + */ + public static ActorSystem createActorSystem(String actorSystemName, Config akkaConfig) { + // Initialize slf4j as logger of Akka's Netty instead of java.util.logging (FLINK-1650) + InternalLoggerFactory.setDefaultFactory(new Slf4JLoggerFactory()); + return RobustActorSystem.create(actorSystemName, akkaConfig); + } + + /** + * Creates an actor system with the default config and listening on a random port of the + * localhost. + * + * @return default actor system listening on a random port of the localhost + */ + @VisibleForTesting + public static ActorSystem createDefaultActorSystem() { + return createActorSystem(getDefaultAkkaConfig()); + } + + /** + * Creates the default akka configuration which listens on a random port on the local machine. + * All configuration values are set to default values. + * + * @return Flink's Akka default config + */ + private static Config getDefaultAkkaConfig() { + return getAkkaConfig(new Configuration(), new HostAndPort("", 0)); + } + + /** + * Creates an akka config with the provided configuration values. If the listening address is + * specified, then the actor system will listen on the respective address. 
+ * + * @param configuration instance containing the user provided configuration values + * @param externalAddress optional tuple of bindAddress and port to be reachable at. If null is + * given, then an Akka config for local actor system will be returned + * @return Akka config + */ + public static Config getAkkaConfig( + Configuration configuration, @Nullable HostAndPort externalAddress) { + return getAkkaConfig( + configuration, + externalAddress, + null, + AkkaUtils.getForkJoinExecutorConfig( + AkkaBootstrapTools.getForkJoinExecutorConfiguration(configuration))); + } + + /** + * Creates an akka config with the provided configuration values. If the listening address is + * specified, then the actor system will listen on the respective address. + * + * @param configuration instance containing the user provided configuration values + * @param externalAddress optional tuple of external address and port to be reachable at. If + * null is given, then an Akka config for local actor system will be returned + * @param bindAddress optional tuple of bind address and port to be used locally. If null is + * given, wildcard IP address and the external port wil be used. Takes effect only if + * externalAddress is not null. 
+ * @param executorConfig config defining the used executor by the default dispatcher + * @return Akka config + */ + public static Config getAkkaConfig( + Configuration configuration, + @Nullable HostAndPort externalAddress, + @Nullable HostAndPort bindAddress, + Config executorConfig) { + + final Config defaultConfig = + AkkaUtils.getBasicAkkaConfig(configuration).withFallback(executorConfig); + + if (externalAddress != null) { + if (bindAddress != null) { + final Config remoteConfig = + AkkaUtils.getRemoteAkkaConfig( + configuration, + bindAddress.getHost(), + bindAddress.getPort(), + externalAddress.getHost(), + externalAddress.getPort()); + + return remoteConfig.withFallback(defaultConfig); + } else { + final Config remoteConfig = + AkkaUtils.getRemoteAkkaConfig( + configuration, + NetUtils.getWildcardIPAddress(), + externalAddress.getPort(), + externalAddress.getHost(), + externalAddress.getPort()); + + return remoteConfig.withFallback(defaultConfig); + } + } + + return defaultConfig; + } + + /** + * Returns the address of the given {@link ActorSystem}. The {@link Address} object contains the + * port and the host under which the actor system is reachable. + * + * @param system {@link ActorSystem} for which the {@link Address} shall be retrieved + * @return {@link Address} of the given {@link ActorSystem} + */ + public static Address getAddress(ActorSystem system) { + return new RemoteAddressExtension().apply(system).getAddress(); + } + + /** + * Returns the given {@link ActorRef}'s path string representation with host and port of the + * {@link ActorSystem} in which the actor is running. 
+ * + * @param system {@link ActorSystem} in which the given {@link ActorRef} is running + * @param actor {@link ActorRef} of the actor for which the URL has to be generated + * @return String containing the {@link ActorSystem} independent URL of the actor + */ + public static String getAkkaURL(ActorSystem system, ActorRef actor) { + final Address address = getAddress(system); + return actor.path().toStringWithAddress(address); + } + + /** + * Extracts the {@link Address} from the given akka URL. + * + * @param akkaURL to extract the {@link Address} from + * @throws MalformedURLException if the {@link Address} could not be parsed from the given akka + * URL + * @return Extracted {@link Address} from the given akka URL + */ + @SuppressWarnings("RedundantThrows") // hidden checked exception coming from Akka + public static Address getAddressFromAkkaURL(String akkaURL) throws MalformedURLException { + return AddressFromURIString.apply(akkaURL); + } + + /** + * Extracts the hostname and the port of the remote actor system from the given Akka URL. The + * result is an {@link InetSocketAddress} instance containing the extracted hostname and port. + * If the Akka URL does not contain the hostname and port information, e.g. a local Akka URL is + * provided, then an {@link Exception} is thrown. + * + * @param akkaURL The URL to extract the host and port from. + * @throws java.lang.Exception Thrown, if the given string does not represent a proper url + * @return The InetSocketAddress with the extracted host and port. + */ + public static InetSocketAddress getInetSocketAddressFromAkkaURL(String akkaURL) + throws Exception { + // AkkaURLs have the form schema://systemName@host:port/.... 
if it's a remote Akka URL + try { + final Address address = getAddressFromAkkaURL(akkaURL); + + if (address.host().isDefined() && address.port().isDefined()) { + return new InetSocketAddress(address.host().get(), (int) address.port().get()); + } else { + throw new MalformedURLException(); + } + } catch (MalformedURLException e) { + throw new Exception("Could not retrieve InetSocketAddress from Akka URL " + akkaURL); + } + } + + /** + * Terminates the given {@link ActorSystem} and returns its termination future. + * + * @param actorSystem to terminate + * @return Termination future + */ + public static CompletableFuture<Void> terminateActorSystem(ActorSystem actorSystem) { + return AkkaFutureUtils.toJava(actorSystem.terminate()).thenAccept(FunctionUtils.ignoreFn()); + } + + private static String booleanToString(boolean flag) { Review comment: ```suggestion private static String booleanToOnOrOff(boolean flag) { ``` The method name could be misleading. With the initial method name, I would have expected to get `"true"` or `"false"` as a return value. ########## File path: flink-rpc/flink-rpc-akka/src/main/java/org/apache/flink/runtime/rpc/akka/AkkaUtils.java ########## @@ -0,0 +1,599 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 
+ * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.runtime.rpc.akka; + +import org.apache.flink.annotation.VisibleForTesting; +import org.apache.flink.configuration.AkkaOptions; +import org.apache.flink.configuration.Configuration; +import org.apache.flink.configuration.SecurityOptions; +import org.apache.flink.runtime.concurrent.akka.AkkaFutureUtils; +import org.apache.flink.runtime.rpc.RpcSystem; +import org.apache.flink.util.NetUtils; +import org.apache.flink.util.TimeUtils; +import org.apache.flink.util.function.FunctionUtils; + +import akka.actor.ActorRef; +import akka.actor.ActorSystem; +import akka.actor.Address; +import akka.actor.AddressFromURIString; +import com.typesafe.config.Config; +import com.typesafe.config.ConfigFactory; +import org.jboss.netty.logging.InternalLoggerFactory; +import org.jboss.netty.logging.Slf4JLoggerFactory; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import javax.annotation.Nullable; + +import java.io.PrintWriter; +import java.io.StringWriter; +import java.net.InetSocketAddress; +import java.net.MalformedURLException; +import java.time.Duration; +import java.util.Arrays; +import java.util.concurrent.CompletableFuture; +import java.util.stream.Collectors; + +/** + * This class contains utility functions for akka. It contains methods to start an actor system with + * a given akka configuration. Furthermore, the akka configuration used for starting the different + * actor systems resides in this class. + */ +class AkkaUtils { + private static final Logger LOG = LoggerFactory.getLogger(AkkaUtils.class); + + private static final String FLINK_ACTOR_SYSTEM_NAME = "flink"; + + public static String getFlinkActorSystemName() { + return FLINK_ACTOR_SYSTEM_NAME; + } + + /** + * Gets the basic Akka config which is shared by remote and local actor systems. 
+ * + * @param configuration instance which contains the user specified values for the configuration + * @return Flink's basic Akka config + */ + private static Config getBasicAkkaConfig(Configuration configuration) { + final int akkaThroughput = configuration.getInteger(AkkaOptions.DISPATCHER_THROUGHPUT); + final String jvmExitOnFatalError = + booleanToString(configuration.getBoolean(AkkaOptions.JVM_EXIT_ON_FATAL_ERROR)); + final String logLifecycleEvents = + booleanToString(configuration.getBoolean(AkkaOptions.LOG_LIFECYCLE_EVENTS)); + final String supervisorStrategy = EscalatingSupervisorStrategy.class.getCanonicalName(); + + return new AkkaConfigBuilder() + .add("akka {") + .add(" daemonic = off") + .add(" loggers = [\"akka.event.slf4j.Slf4jLogger\"]") + .add(" logging-filter = \"akka.event.slf4j.Slf4jLoggingFilter\"") + .add(" log-config-on-start = off") + .add(" logger-startup-timeout = 30s") + .add(" loglevel = " + getLogLevel()) + .add(" stdout-loglevel = OFF") + .add(" log-dead-letters = " + logLifecycleEvents) + .add(" log-dead-letters-during-shutdown = " + logLifecycleEvents) + .add(" jvm-exit-on-fatal-error = " + jvmExitOnFatalError) + .add(" serialize-messages = off") + .add(" actor {") + .add(" guardian-supervisor-strategy = " + supervisorStrategy) + .add(" warn-about-java-serializer-usage = off") + .add(" allow-java-serialization = on") + .add(" default-dispatcher {") + .add(" throughput = " + akkaThroughput) + .add(" }") + .add(" supervisor-dispatcher {") + .add(" type = Dispatcher") + .add(" executor = \"thread-pool-executor\"") + .add(" thread-pool-executor {") + .add(" core-pool-size-min = 1") + .add(" core-pool-size-max = 1") + .add(" }") + .add(" }") + .add(" }") + .add("}") + .build(); + } + + private static String getLogLevel() { + if (LOG.isTraceEnabled()) { + return "TRACE"; + } + if (LOG.isDebugEnabled()) { + return "DEBUG"; + } + if (LOG.isInfoEnabled()) { + return "INFO"; + } + if (LOG.isWarnEnabled()) { + return "WARN"; + } + if 
(LOG.isErrorEnabled()) { + return "ERROR"; + } + return "OFF"; + } + + public static Config getThreadPoolExecutorConfig( + RpcSystem.FixedThreadPoolExecutorConfiguration configuration) { + final int threadPriority = configuration.getThreadPriority(); + final int minNumThreads = configuration.getMinNumThreads(); + final int maxNumThreads = configuration.getMaxNumThreads(); + + return new AkkaConfigBuilder() + .add("akka {") + .add(" actor {") + .add(" default-dispatcher {") + .add(" type = org.apache.flink.runtime.rpc.akka.PriorityThreadsDispatcher") + .add(" executor = thread-pool-executor") + .add(" thread-priority = " + threadPriority) + .add(" thread-pool-executor {") + .add(" core-pool-size-min = " + minNumThreads) + .add(" core-pool-size-max = " + maxNumThreads) + .add(" }") + .add(" }") + .add(" }") + .add("}") + .build(); + } + + public static Config getForkJoinExecutorConfig( + RpcSystem.ForkJoinExecutorConfiguration configuration) { + final double parallelismFactor = configuration.getParallelismFactor(); + final int minNumThreads = configuration.getMinParallelism(); + final int maxNumThreads = configuration.getMaxParallelism(); + + return new AkkaConfigBuilder() + .add("akka {") + .add(" actor {") + .add(" default-dispatcher {") + .add(" executor = fork-join-executor") + .add(" fork-join-executor {") + .add(" parallelism-factor = " + parallelismFactor) + .add(" parallelism-min = " + minNumThreads) + .add(" parallelism-max = " + maxNumThreads) + .add(" }") + .add(" }") + .add(" }") + .add("}") + .build(); + } + + /** + * Creates a Akka config for a remote actor system listening on port on the network interface + * identified by bindAddress. 
+ * + * @param configuration instance containing the user provided configuration values + * @param bindAddress of the network interface to bind on + * @param port to bind to or if 0 then Akka picks a free port automatically + * @param externalHostname The host name to expect for Akka messages + * @param externalPort The port to expect for Akka messages + * @return Flink's Akka configuration for remote actor systems + */ + private static Config getRemoteAkkaConfig( + Configuration configuration, + String bindAddress, + int port, + String externalHostname, + int externalPort) { + final AkkaConfigBuilder builder = new AkkaConfigBuilder(); + + addBaseRemoteAkkaConfig(builder, configuration, port, externalPort); + addHostnameRemoteAkkaConfig(builder, bindAddress, externalHostname); + addSslRemoteAkkaConfig(builder, configuration); + + return builder.build(); + } + + private static void addBaseRemoteAkkaConfig( + AkkaConfigBuilder akkaConfigBuilder, + Configuration configuration, + int port, + int externalPort) { + final Duration akkaAskTimeout = configuration.get(AkkaOptions.ASK_TIMEOUT_DURATION); + + final String startupTimeout = + TimeUtils.getStringInMillis( + TimeUtils.parseDuration( + configuration.getString( + AkkaOptions.STARTUP_TIMEOUT, + TimeUtils.getStringInMillis( + akkaAskTimeout.multipliedBy(10L))))); + + final String akkaTCPTimeout = + TimeUtils.getStringInMillis( + TimeUtils.parseDuration(configuration.getString(AkkaOptions.TCP_TIMEOUT))); + + final String akkaFramesize = configuration.getString(AkkaOptions.FRAMESIZE); + + final int clientSocketWorkerPoolPoolSizeMin = + configuration.get(AkkaOptions.CLIENT_SOCKET_WORKER_POOL_SIZE_MIN); + final int clientSocketWorkerPoolPoolSizeMax = + configuration.get(AkkaOptions.CLIENT_SOCKET_WORKER_POOL_SIZE_MAX); + final double clientSocketWorkerPoolPoolSizeFactor = + configuration.get(AkkaOptions.CLIENT_SOCKET_WORKER_POOL_SIZE_FACTOR); + final int serverSocketWorkerPoolPoolSizeMin = + 
configuration.get(AkkaOptions.SERVER_SOCKET_WORKER_POOL_SIZE_MIN); + final int serverSocketWorkerPoolPoolSizeMax = + configuration.get(AkkaOptions.SERVER_SOCKET_WORKER_POOL_SIZE_MAX); + final double serverSocketWorkerPoolPoolSizeFactor = + configuration.get(AkkaOptions.SERVER_SOCKET_WORKER_POOL_SIZE_FACTOR); + + final String logLifecycleEvents = + booleanToString(configuration.getBoolean(AkkaOptions.LOG_LIFECYCLE_EVENTS)); + + final long retryGateClosedFor = configuration.getLong(AkkaOptions.RETRY_GATE_CLOSED_FOR); + + akkaConfigBuilder + .add("akka {") + .add(" actor {") + .add(" provider = \"akka.remote.RemoteActorRefProvider\"") + .add(" }") + .add(" remote.artery.enabled = false") + .add(" remote.startup-timeout = " + startupTimeout) + .add(" remote.classic {") + .add(" # disable the transport failure detector by setting very high values") + .add(" transport-failure-detector{") + .add(" acceptable-heartbeat-pause = 6000 s") + .add(" heartbeat-interval = 1000 s") + .add(" threshold = 300") + .add(" }") + .add(" enabled-transports = [\"akka.remote.classic.netty.tcp\"]") + .add(" netty {") + .add(" tcp {") + .add(" transport-class = \"akka.remote.transport.netty.NettyTransport\"") + .add(" port = " + externalPort) + .add(" bind-port = " + port) + .add(" connection-timeout = " + akkaTCPTimeout) + .add(" maximum-frame-size = " + akkaFramesize) + .add(" tcp-nodelay = on") + .add(" client-socket-worker-pool {") + .add(" pool-size-min = " + clientSocketWorkerPoolPoolSizeMin) + .add(" pool-size-max = " + clientSocketWorkerPoolPoolSizeMax) + .add(" pool-size-factor = " + clientSocketWorkerPoolPoolSizeFactor) + .add(" }") + .add(" server-socket-worker-pool {") + .add(" pool-size-min = " + serverSocketWorkerPoolPoolSizeMin) + .add(" pool-size-max = " + serverSocketWorkerPoolPoolSizeMax) + .add(" pool-size-factor = " + serverSocketWorkerPoolPoolSizeFactor) + .add(" }") + .add(" }") + .add(" }") + .add(" log-remote-lifecycle-events = " + logLifecycleEvents) + .add(" 
retry-gate-closed-for = " + retryGateClosedFor + " ms") + .add(" }") + .add("}"); + } + + private static void addHostnameRemoteAkkaConfig( + AkkaConfigBuilder akkaConfigBuilder, String bindAddress, String externalHostname) { + final String normalizedExternalHostname = + NetUtils.unresolvedHostToNormalizedString(externalHostname); + final String effectiveHostname = + normalizedExternalHostname != null && !normalizedExternalHostname.isEmpty() + ? normalizedExternalHostname + // if bindAddress is null or empty, then leave bindAddress unspecified. Akka + // will pick InetAddress.getLocalHost.getHostAddress + : ""; + + akkaConfigBuilder + .add("akka {") + .add(" remote.classic {") + .add(" netty {") + .add(" tcp {") + .add(" hostname = \"" + effectiveHostname + "\"") + .add(" bind-hostname = \"" + bindAddress + "\"") + .add(" }") + .add(" }") + .add(" }") + .add("}"); + } + + private static void addSslRemoteAkkaConfig( + AkkaConfigBuilder akkaConfigBuilder, Configuration configuration) { + + final boolean akkaEnableSSLConfig = + configuration.getBoolean(AkkaOptions.SSL_ENABLED) + && SecurityOptions.isInternalSSLEnabled(configuration); + + final String akkaEnableSSL = booleanToString(akkaEnableSSLConfig); + + final String akkaSSLKeyStore = + configuration.getString( + SecurityOptions.SSL_INTERNAL_KEYSTORE, + configuration.getString(SecurityOptions.SSL_KEYSTORE)); + + final String akkaSSLKeyStorePassword = + configuration.getString( + SecurityOptions.SSL_INTERNAL_KEYSTORE_PASSWORD, + configuration.getString(SecurityOptions.SSL_KEYSTORE_PASSWORD)); + + final String akkaSSLKeyPassword = + configuration.getString( + SecurityOptions.SSL_INTERNAL_KEY_PASSWORD, + configuration.getString(SecurityOptions.SSL_KEY_PASSWORD)); + + final String akkaSSLTrustStore = + configuration.getString( + SecurityOptions.SSL_INTERNAL_TRUSTSTORE, + configuration.getString(SecurityOptions.SSL_TRUSTSTORE)); + + final String akkaSSLTrustStorePassword = + configuration.getString( + 
SecurityOptions.SSL_INTERNAL_TRUSTSTORE_PASSWORD, + configuration.getString(SecurityOptions.SSL_TRUSTSTORE_PASSWORD)); + + final String akkaSSLCertFingerprintString = + configuration.getString(SecurityOptions.SSL_INTERNAL_CERT_FINGERPRINT); + + final String akkaSSLCertFingerprints = + akkaSSLCertFingerprintString != null + ? Arrays.stream(akkaSSLCertFingerprintString.split(",")) + .collect(Collectors.joining("\",\"", "[\"", "\"]")) + : "[]"; + + final String akkaSSLProtocol = configuration.getString(SecurityOptions.SSL_PROTOCOL); + + final String akkaSSLAlgorithmsString = + configuration.getString(SecurityOptions.SSL_ALGORITHMS); + final String akkaSSLAlgorithms = + Arrays.stream(akkaSSLAlgorithmsString.split(",")) + .collect(Collectors.joining(",", "[", "]")); + + final String sslEngineProviderName = CustomSSLEngineProvider.class.getCanonicalName(); + + akkaConfigBuilder + .add("akka {") + .add(" remote.classic {") + .add(" enabled-transports = [\"akka.remote.classic.netty.ssl\"]") + .add(" netty {") + .add(" ssl = ${akka.remote.classic.netty.tcp}") + .add(" ssl {") + .add(" enable-ssl = " + akkaEnableSSL) + .add(" ssl-engine-provider = " + sslEngineProviderName) + .add(" security {") + .add(" key-store = \"" + akkaSSLKeyStore + "\"") + .add(" key-store-password = \"" + akkaSSLKeyStorePassword + "\"") + .add(" key-password = \"" + akkaSSLKeyPassword + "\"") + .add(" trust-store = \"" + akkaSSLTrustStore + "\"") + .add(" trust-store-password = \"" + akkaSSLTrustStorePassword + "\"") + .add(" protocol = " + akkaSSLProtocol + "") + .add(" enabled-algorithms = " + akkaSSLAlgorithms + "") + .add(" random-number-generator = \"\"") + .add(" require-mutual-authentication = on") + .add(" cert-fingerprints = " + akkaSSLCertFingerprints + "") + .add(" }") + .add(" }") + .add(" }") + .add(" }") + .add("}"); + } + + /** + * Creates a local actor system without remoting. 
+ * + * @param configuration instance containing the user provided configuration values + * @return The created actor system + */ + public static ActorSystem createLocalActorSystem(Configuration configuration) { + final Config akkaConfig = getAkkaConfig(configuration, null); + return createActorSystem(akkaConfig); + } + + /** + * Creates an actor system with the given akka config. + * + * @param akkaConfig configuration for the actor system + * @return created actor system + */ + private static ActorSystem createActorSystem(Config akkaConfig) { + return createActorSystem(getFlinkActorSystemName(), akkaConfig); + } + + /** + * Creates an actor system with the given akka config. + * + * @param akkaConfig configuration for the actor system + * @return created actor system + */ Review comment: JavaDoc copy&paste? At least, `@param actorSystemName` is missing in the latter one. ########## File path: flink-rpc/flink-rpc-akka/src/main/java/org/apache/flink/runtime/rpc/akka/AkkaUtils.java ########## @@ -0,0 +1,599 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.flink.runtime.rpc.akka; + +import org.apache.flink.annotation.VisibleForTesting; +import org.apache.flink.configuration.AkkaOptions; +import org.apache.flink.configuration.Configuration; +import org.apache.flink.configuration.SecurityOptions; +import org.apache.flink.runtime.concurrent.akka.AkkaFutureUtils; +import org.apache.flink.runtime.rpc.RpcSystem; +import org.apache.flink.util.NetUtils; +import org.apache.flink.util.TimeUtils; +import org.apache.flink.util.function.FunctionUtils; + +import akka.actor.ActorRef; +import akka.actor.ActorSystem; +import akka.actor.Address; +import akka.actor.AddressFromURIString; +import com.typesafe.config.Config; +import com.typesafe.config.ConfigFactory; +import org.jboss.netty.logging.InternalLoggerFactory; +import org.jboss.netty.logging.Slf4JLoggerFactory; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import javax.annotation.Nullable; + +import java.io.PrintWriter; +import java.io.StringWriter; +import java.net.InetSocketAddress; +import java.net.MalformedURLException; +import java.time.Duration; +import java.util.Arrays; +import java.util.concurrent.CompletableFuture; +import java.util.stream.Collectors; + +/** + * This class contains utility functions for akka. It contains methods to start an actor system with + * a given akka configuration. Furthermore, the akka configuration used for starting the different + * actor systems resides in this class. + */ +class AkkaUtils { + private static final Logger LOG = LoggerFactory.getLogger(AkkaUtils.class); + + private static final String FLINK_ACTOR_SYSTEM_NAME = "flink"; + + public static String getFlinkActorSystemName() { + return FLINK_ACTOR_SYSTEM_NAME; + } + + /** + * Gets the basic Akka config which is shared by remote and local actor systems. 
+ * + * @param configuration instance which contains the user specified values for the configuration + * @return Flink's basic Akka config + */ + private static Config getBasicAkkaConfig(Configuration configuration) { + final int akkaThroughput = configuration.getInteger(AkkaOptions.DISPATCHER_THROUGHPUT); + final String jvmExitOnFatalError = + booleanToString(configuration.getBoolean(AkkaOptions.JVM_EXIT_ON_FATAL_ERROR)); + final String logLifecycleEvents = + booleanToString(configuration.getBoolean(AkkaOptions.LOG_LIFECYCLE_EVENTS)); + final String supervisorStrategy = EscalatingSupervisorStrategy.class.getCanonicalName(); + + return new AkkaConfigBuilder() + .add("akka {") + .add(" daemonic = off") + .add(" loggers = [\"akka.event.slf4j.Slf4jLogger\"]") + .add(" logging-filter = \"akka.event.slf4j.Slf4jLoggingFilter\"") + .add(" log-config-on-start = off") + .add(" logger-startup-timeout = 30s") + .add(" loglevel = " + getLogLevel()) + .add(" stdout-loglevel = OFF") + .add(" log-dead-letters = " + logLifecycleEvents) + .add(" log-dead-letters-during-shutdown = " + logLifecycleEvents) + .add(" jvm-exit-on-fatal-error = " + jvmExitOnFatalError) + .add(" serialize-messages = off") + .add(" actor {") + .add(" guardian-supervisor-strategy = " + supervisorStrategy) + .add(" warn-about-java-serializer-usage = off") + .add(" allow-java-serialization = on") + .add(" default-dispatcher {") + .add(" throughput = " + akkaThroughput) + .add(" }") + .add(" supervisor-dispatcher {") + .add(" type = Dispatcher") + .add(" executor = \"thread-pool-executor\"") + .add(" thread-pool-executor {") + .add(" core-pool-size-min = 1") + .add(" core-pool-size-max = 1") + .add(" }") + .add(" }") + .add(" }") + .add("}") + .build(); + } + + private static String getLogLevel() { + if (LOG.isTraceEnabled()) { + return "TRACE"; + } + if (LOG.isDebugEnabled()) { + return "DEBUG"; + } + if (LOG.isInfoEnabled()) { + return "INFO"; + } + if (LOG.isWarnEnabled()) { + return "WARN"; + } + if 
(LOG.isErrorEnabled()) { + return "ERROR"; + } + return "OFF"; + } + + public static Config getThreadPoolExecutorConfig( + RpcSystem.FixedThreadPoolExecutorConfiguration configuration) { + final int threadPriority = configuration.getThreadPriority(); + final int minNumThreads = configuration.getMinNumThreads(); + final int maxNumThreads = configuration.getMaxNumThreads(); + + return new AkkaConfigBuilder() + .add("akka {") + .add(" actor {") + .add(" default-dispatcher {") + .add(" type = org.apache.flink.runtime.rpc.akka.PriorityThreadsDispatcher") + .add(" executor = thread-pool-executor") + .add(" thread-priority = " + threadPriority) + .add(" thread-pool-executor {") + .add(" core-pool-size-min = " + minNumThreads) + .add(" core-pool-size-max = " + maxNumThreads) + .add(" }") + .add(" }") + .add(" }") + .add("}") + .build(); + } + + public static Config getForkJoinExecutorConfig( + RpcSystem.ForkJoinExecutorConfiguration configuration) { + final double parallelismFactor = configuration.getParallelismFactor(); + final int minNumThreads = configuration.getMinParallelism(); + final int maxNumThreads = configuration.getMaxParallelism(); + + return new AkkaConfigBuilder() + .add("akka {") + .add(" actor {") + .add(" default-dispatcher {") + .add(" executor = fork-join-executor") + .add(" fork-join-executor {") + .add(" parallelism-factor = " + parallelismFactor) + .add(" parallelism-min = " + minNumThreads) + .add(" parallelism-max = " + maxNumThreads) + .add(" }") + .add(" }") + .add(" }") + .add("}") + .build(); + } + + /** + * Creates a Akka config for a remote actor system listening on port on the network interface + * identified by bindAddress. 
+ * + * @param configuration instance containing the user provided configuration values + * @param bindAddress of the network interface to bind on + * @param port to bind to or if 0 then Akka picks a free port automatically + * @param externalHostname The host name to expect for Akka messages + * @param externalPort The port to expect for Akka messages + * @return Flink's Akka configuration for remote actor systems + */ + private static Config getRemoteAkkaConfig( + Configuration configuration, + String bindAddress, + int port, + String externalHostname, + int externalPort) { + final AkkaConfigBuilder builder = new AkkaConfigBuilder(); + + addBaseRemoteAkkaConfig(builder, configuration, port, externalPort); + addHostnameRemoteAkkaConfig(builder, bindAddress, externalHostname); + addSslRemoteAkkaConfig(builder, configuration); + + return builder.build(); + } + + private static void addBaseRemoteAkkaConfig( + AkkaConfigBuilder akkaConfigBuilder, + Configuration configuration, + int port, + int externalPort) { + final Duration akkaAskTimeout = configuration.get(AkkaOptions.ASK_TIMEOUT_DURATION); + + final String startupTimeout = + TimeUtils.getStringInMillis( + TimeUtils.parseDuration( + configuration.getString( + AkkaOptions.STARTUP_TIMEOUT, + TimeUtils.getStringInMillis( + akkaAskTimeout.multipliedBy(10L))))); + + final String akkaTCPTimeout = + TimeUtils.getStringInMillis( + TimeUtils.parseDuration(configuration.getString(AkkaOptions.TCP_TIMEOUT))); + + final String akkaFramesize = configuration.getString(AkkaOptions.FRAMESIZE); + + final int clientSocketWorkerPoolPoolSizeMin = + configuration.get(AkkaOptions.CLIENT_SOCKET_WORKER_POOL_SIZE_MIN); + final int clientSocketWorkerPoolPoolSizeMax = + configuration.get(AkkaOptions.CLIENT_SOCKET_WORKER_POOL_SIZE_MAX); + final double clientSocketWorkerPoolPoolSizeFactor = + configuration.get(AkkaOptions.CLIENT_SOCKET_WORKER_POOL_SIZE_FACTOR); + final int serverSocketWorkerPoolPoolSizeMin = + 
configuration.get(AkkaOptions.SERVER_SOCKET_WORKER_POOL_SIZE_MIN); + final int serverSocketWorkerPoolPoolSizeMax = + configuration.get(AkkaOptions.SERVER_SOCKET_WORKER_POOL_SIZE_MAX); + final double serverSocketWorkerPoolPoolSizeFactor = + configuration.get(AkkaOptions.SERVER_SOCKET_WORKER_POOL_SIZE_FACTOR); + + final String logLifecycleEvents = + booleanToString(configuration.getBoolean(AkkaOptions.LOG_LIFECYCLE_EVENTS)); + + final long retryGateClosedFor = configuration.getLong(AkkaOptions.RETRY_GATE_CLOSED_FOR); + + akkaConfigBuilder + .add("akka {") + .add(" actor {") + .add(" provider = \"akka.remote.RemoteActorRefProvider\"") + .add(" }") + .add(" remote.artery.enabled = false") + .add(" remote.startup-timeout = " + startupTimeout) + .add(" remote.classic {") + .add(" # disable the transport failure detector by setting very high values") + .add(" transport-failure-detector{") + .add(" acceptable-heartbeat-pause = 6000 s") + .add(" heartbeat-interval = 1000 s") + .add(" threshold = 300") + .add(" }") + .add(" enabled-transports = [\"akka.remote.classic.netty.tcp\"]") + .add(" netty {") + .add(" tcp {") + .add(" transport-class = \"akka.remote.transport.netty.NettyTransport\"") + .add(" port = " + externalPort) + .add(" bind-port = " + port) + .add(" connection-timeout = " + akkaTCPTimeout) + .add(" maximum-frame-size = " + akkaFramesize) + .add(" tcp-nodelay = on") + .add(" client-socket-worker-pool {") + .add(" pool-size-min = " + clientSocketWorkerPoolPoolSizeMin) + .add(" pool-size-max = " + clientSocketWorkerPoolPoolSizeMax) + .add(" pool-size-factor = " + clientSocketWorkerPoolPoolSizeFactor) + .add(" }") + .add(" server-socket-worker-pool {") + .add(" pool-size-min = " + serverSocketWorkerPoolPoolSizeMin) + .add(" pool-size-max = " + serverSocketWorkerPoolPoolSizeMax) + .add(" pool-size-factor = " + serverSocketWorkerPoolPoolSizeFactor) + .add(" }") + .add(" }") + .add(" }") + .add(" log-remote-lifecycle-events = " + logLifecycleEvents) + .add(" 
retry-gate-closed-for = " + retryGateClosedFor + " ms") + .add(" }") + .add("}"); + } + + private static void addHostnameRemoteAkkaConfig( + AkkaConfigBuilder akkaConfigBuilder, String bindAddress, String externalHostname) { + final String normalizedExternalHostname = + NetUtils.unresolvedHostToNormalizedString(externalHostname); + final String effectiveHostname = + normalizedExternalHostname != null && !normalizedExternalHostname.isEmpty() + ? normalizedExternalHostname + // if bindAddress is null or empty, then leave bindAddress unspecified. Akka + // will pick InetAddress.getLocalHost.getHostAddress + : ""; + + akkaConfigBuilder + .add("akka {") + .add(" remote.classic {") + .add(" netty {") + .add(" tcp {") + .add(" hostname = \"" + effectiveHostname + "\"") + .add(" bind-hostname = \"" + bindAddress + "\"") + .add(" }") + .add(" }") + .add(" }") + .add("}"); + } + + private static void addSslRemoteAkkaConfig( + AkkaConfigBuilder akkaConfigBuilder, Configuration configuration) { + + final boolean akkaEnableSSLConfig = + configuration.getBoolean(AkkaOptions.SSL_ENABLED) + && SecurityOptions.isInternalSSLEnabled(configuration); + + final String akkaEnableSSL = booleanToString(akkaEnableSSLConfig); + + final String akkaSSLKeyStore = + configuration.getString( + SecurityOptions.SSL_INTERNAL_KEYSTORE, + configuration.getString(SecurityOptions.SSL_KEYSTORE)); + + final String akkaSSLKeyStorePassword = + configuration.getString( + SecurityOptions.SSL_INTERNAL_KEYSTORE_PASSWORD, + configuration.getString(SecurityOptions.SSL_KEYSTORE_PASSWORD)); + + final String akkaSSLKeyPassword = + configuration.getString( + SecurityOptions.SSL_INTERNAL_KEY_PASSWORD, + configuration.getString(SecurityOptions.SSL_KEY_PASSWORD)); + + final String akkaSSLTrustStore = + configuration.getString( + SecurityOptions.SSL_INTERNAL_TRUSTSTORE, + configuration.getString(SecurityOptions.SSL_TRUSTSTORE)); + + final String akkaSSLTrustStorePassword = + configuration.getString( + 
SecurityOptions.SSL_INTERNAL_TRUSTSTORE_PASSWORD, + configuration.getString(SecurityOptions.SSL_TRUSTSTORE_PASSWORD)); + + final String akkaSSLCertFingerprintString = + configuration.getString(SecurityOptions.SSL_INTERNAL_CERT_FINGERPRINT); + + final String akkaSSLCertFingerprints = + akkaSSLCertFingerprintString != null + ? Arrays.stream(akkaSSLCertFingerprintString.split(",")) + .collect(Collectors.joining("\",\"", "[\"", "\"]")) + : "[]"; + + final String akkaSSLProtocol = configuration.getString(SecurityOptions.SSL_PROTOCOL); + + final String akkaSSLAlgorithmsString = + configuration.getString(SecurityOptions.SSL_ALGORITHMS); + final String akkaSSLAlgorithms = + Arrays.stream(akkaSSLAlgorithmsString.split(",")) + .collect(Collectors.joining(",", "[", "]")); + + final String sslEngineProviderName = CustomSSLEngineProvider.class.getCanonicalName(); + + akkaConfigBuilder + .add("akka {") + .add(" remote.classic {") Review comment: Just for the sake of completeness: `.classic` is not used in the original code. ########## File path: flink-rpc/flink-rpc-akka/src/test/java/org/apache/flink/runtime/rpc/akka/AkkaUtilsTest.java ########## @@ -0,0 +1,270 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.flink.runtime.rpc.akka; + +import org.apache.flink.configuration.AkkaOptions; +import org.apache.flink.configuration.Configuration; +import org.apache.flink.configuration.SecurityOptions; +import org.apache.flink.runtime.rpc.AddressResolution; +import org.apache.flink.runtime.rpc.RpcSystem; +import org.apache.flink.util.NetUtils; +import org.apache.flink.util.TestLogger; + +import com.typesafe.config.Config; +import org.junit.Test; + +import java.net.InetAddress; +import java.net.InetSocketAddress; +import java.net.UnknownHostException; +import java.time.Duration; + +import static org.hamcrest.CoreMatchers.equalTo; +import static org.hamcrest.CoreMatchers.hasItem; +import static org.hamcrest.CoreMatchers.is; +import static org.hamcrest.MatcherAssert.assertThat; +import static org.hamcrest.collection.IsEmptyCollection.empty; + +/** Tests for the {@link AkkaUtils}. */ +public class AkkaUtilsTest extends TestLogger { + + @Test + public void getHostFromAkkaURLForRemoteAkkaURL() throws Exception { + final String host = "127.0.0.1"; + final int port = 1234; + + final InetSocketAddress address = new InetSocketAddress(host, port); + + final String remoteAkkaUrl = + AkkaRpcServiceUtils.getRpcUrl( + host, + port, + "actor", + AddressResolution.NO_ADDRESS_RESOLUTION, + AkkaRpcServiceUtils.AkkaProtocol.TCP); + + final InetSocketAddress result = AkkaUtils.getInetSocketAddressFromAkkaURL(remoteAkkaUrl); + + assertThat(result, equalTo(address)); + } + + @Test(expected = Exception.class) + public void getHostFromAkkaURLThrowsExceptionIfAddressCannotBeRetrieved() throws Exception { + final String localAkkaURL = "akka://flink/user/actor"; + + AkkaUtils.getInetSocketAddressFromAkkaURL(localAkkaURL); + } + + @Test + public void getHostFromAkkaURLReturnsHostAfterAtSign() throws Exception { + final String url = "akka.tcp://flink@localhost:1234/user/jobmanager"; + final InetSocketAddress expected = new InetSocketAddress("localhost", 1234); + + final 
InetSocketAddress result = AkkaUtils.getInetSocketAddressFromAkkaURL(url); + + assertThat(result, equalTo(expected)); + } + + @Test + public void getHostFromAkkaURLHandlesAkkaTcpProtocol() throws Exception { + final String url = "akka.tcp://flink@localhost:1234/user/jobmanager"; + final InetSocketAddress expected = new InetSocketAddress("localhost", 1234); + + final InetSocketAddress result = AkkaUtils.getInetSocketAddressFromAkkaURL(url); + + assertThat(result, equalTo(expected)); + } + + @Test + public void getHostFromAkkaURLHandlesAkkaSslTcpProtocol() throws Exception { + final String url = "akka.ssl.tcp://flink@localhost:1234/user/jobmanager"; + final InetSocketAddress expected = new InetSocketAddress("localhost", 1234); + + final InetSocketAddress result = AkkaUtils.getInetSocketAddressFromAkkaURL(url); + + assertThat(result, equalTo(expected)); + } + + @Test + public void getHostFromAkkaURLHandlesIPv4Addresses() throws Exception { + final String ipv4Address = "192.168.0.1"; + final int port = 1234; + final InetSocketAddress address = new InetSocketAddress(ipv4Address, port); + + final String url = "akka://flink@" + ipv4Address + ":" + port + "/user/jobmanager"; + final InetSocketAddress result = AkkaUtils.getInetSocketAddressFromAkkaURL(url); + + assertThat(result, equalTo(address)); + } + + @Test + public void getHostFromAkkaURLHandlesIPv6Addresses() throws Exception { + final String ipv6Address = "2001:db8:10:11:12:ff00:42:8329"; + final int port = 1234; + final InetSocketAddress address = new InetSocketAddress(ipv6Address, port); + + final String url = "akka://flink@[" + ipv6Address + "]:" + port + "/user/jobmanager"; + final InetSocketAddress result = AkkaUtils.getInetSocketAddressFromAkkaURL(url); + + assertThat(result, equalTo(address)); + } + + @Test + public void getHostFromAkkaURLHandlesIPv6AddressesTcp() throws Exception { + final String ipv6Address = "2001:db8:10:11:12:ff00:42:8329"; + final int port = 1234; + final InetSocketAddress address = new 
InetSocketAddress(ipv6Address, port); + + final String url = "akka.tcp://flink@[" + ipv6Address + "]:" + port + "/user/jobmanager"; + final InetSocketAddress result = AkkaUtils.getInetSocketAddressFromAkkaURL(url); + + assertThat(result, equalTo(address)); + } + + @Test + public void getHostFromAkkaURLHandlesIPv6AddressesSsl() throws Exception { + final String ipv6Address = "2001:db8:10:11:12:ff00:42:8329"; + final int port = 1234; + final InetSocketAddress address = new InetSocketAddress(ipv6Address, port); + + final String url = + "akka.ssl.tcp://flink@[" + ipv6Address + "]:" + port + "/user/jobmanager"; + final InetSocketAddress result = AkkaUtils.getInetSocketAddressFromAkkaURL(url); + + assertThat(result, equalTo(address)); + } + + @Test + public void getAkkaConfigNormalizesHostName() { + final Configuration configuration = new Configuration(); + final String hostname = "AbC123foOBaR"; + final int port = 1234; + + final Config akkaConfig = + AkkaUtils.getAkkaConfig(configuration, new HostAndPort(hostname, port)); + + assertThat( + akkaConfig.getString("akka.remote.classic.netty.tcp.hostname"), + equalTo(NetUtils.unresolvedHostToNormalizedString(hostname))); + } + + @Test + public void getAkkaConfigDefaultsToLocalHost() throws UnknownHostException { + final Config akkaConfig = + AkkaUtils.getAkkaConfig(new Configuration(), new HostAndPort("", 0)); + + final String hostname = akkaConfig.getString("akka.remote.classic.netty.tcp.hostname"); + + assertThat(InetAddress.getByName(hostname).isLoopbackAddress(), is(true)); + } + + @Test + public void getAkkaConfigDefaultsToForkJoinExecutor() { + final Config akkaConfig = AkkaUtils.getAkkaConfig(new Configuration(), null); + + assertThat( + akkaConfig.getString("akka.actor.default-dispatcher.executor"), + is("fork-join-executor")); + } + + @Test + public void getAkkaConfigSetsExecutorWithThreadPriority() { + final int threadPriority = 3; + final int minThreads = 1; + final int maxThreads = 3; + + final Config akkaConfig 
= + AkkaUtils.getAkkaConfig( + new Configuration(), + new HostAndPort("localhost", 1234), + null, + AkkaUtils.getThreadPoolExecutorConfig( + new RpcSystem.FixedThreadPoolExecutorConfiguration( + minThreads, maxThreads, threadPriority))); + + assertThat( + akkaConfig.getString("akka.actor.default-dispatcher.executor"), + is("thread-pool-executor")); + assertThat( + akkaConfig.getInt("akka.actor.default-dispatcher.thread-priority"), + is(threadPriority)); + assertThat( + akkaConfig.getInt( + "akka.actor.default-dispatcher.thread-pool-executor.core-pool-size-min"), + is(minThreads)); + assertThat( + akkaConfig.getInt( + "akka.actor.default-dispatcher.thread-pool-executor.core-pool-size-max"), + is(maxThreads)); + } + + @Test + public void getAkkaConfigHandlesIPv6Address() { + final String ipv6AddressString = "2001:db8:10:11:12:ff00:42:8329"; + final Config akkaConfig = + AkkaUtils.getAkkaConfig( + new Configuration(), new HostAndPort(ipv6AddressString, 1234)); + + assertThat( + akkaConfig.getString("akka.remote.classic.netty.tcp.hostname"), + is(NetUtils.unresolvedHostToNormalizedString(ipv6AddressString))); + } + + @Test + public void getAkkaConfigDefaultsStartupTimeoutTo10TimesOfAskTimeout() { + final Configuration configuration = new Configuration(); + configuration.set(AkkaOptions.ASK_TIMEOUT_DURATION, Duration.ofMillis(100)); + + final Config akkaConfig = + AkkaUtils.getAkkaConfig(configuration, new HostAndPort("localhost", 31337)); + + assertThat(akkaConfig.getString("akka.remote.startup-timeout"), is("1000ms")); Review comment: why don't we have to add the `.classic` (i.e. `akka.remote.classic.startup-timeout`) here? Looks like the test is correct as is. Just wondered about it because of the inconsistency... 🤔 -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. 
To unsubscribe, e-mail: issues-unsubscribe@flink.apache.org For queries about this service, please contact Infrastructure at: users@infra.apache.org
