[ 
https://issues.apache.org/jira/browse/FLINK-9859?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16580786#comment-16580786
 ] 

ASF GitHub Bot commented on FLINK-9859:
---------------------------------------

asfgit closed pull request #6339: [FLINK-9859] [runtime] More Akka config
URL: https://github.com/apache/flink/pull/6339
 
 
   

This is a PR merged from a forked repository.
As GitHub hides the original diff on merge, it is displayed below for
the sake of provenance:

As this is a foreign pull request (from a fork), the diff is supplied
below (as it won't show otherwise due to GitHub magic):

diff --git a/docs/_includes/generated/akka_configuration.html 
b/docs/_includes/generated/akka_configuration.html
index 352c656488e..e97a127e745 100644
--- a/docs/_includes/generated/akka_configuration.html
+++ b/docs/_includes/generated/akka_configuration.html
@@ -12,11 +12,41 @@
             <td style="word-wrap: break-word;">"10 s"</td>
             <td>Timeout used for all futures and blocking Akka calls. If Flink 
fails due to timeouts then you should try to increase this value. Timeouts can 
be caused by slow machines or a congested network. The timeout value requires a 
time-unit specifier (ms/s/min/h/d).</td>
         </tr>
+        <tr>
+            <td><h5>akka.client-socket-worker-pool.pool-size-factor</h5></td>
+            <td style="word-wrap: break-word;">1.0</td>
+            <td>The pool size factor is used to determine thread pool size 
using the following formula: ceil(available processors * factor). Resulting 
size is then bounded by the pool-size-min and pool-size-max values.</td>
+        </tr>
+        <tr>
+            <td><h5>akka.client-socket-worker-pool.pool-size-max</h5></td>
+            <td style="word-wrap: break-word;">2</td>
+            <td>Max number of threads to cap factor-based number to.</td>
+        </tr>
+        <tr>
+            <td><h5>akka.client-socket-worker-pool.pool-size-min</h5></td>
+            <td style="word-wrap: break-word;">2</td>
+            <td>Min number of threads to cap factor-based number to.</td>
+        </tr>
         <tr>
             <td><h5>akka.client.timeout</h5></td>
             <td style="word-wrap: break-word;">"60 s"</td>
             <td>Timeout for all blocking calls on the client side.</td>
         </tr>
+        <tr>
+            <td><h5>akka.fork-join-executor.parallelism-factor</h5></td>
+            <td style="word-wrap: break-word;">2.0</td>
+            <td>The parallelism factor is used to determine thread pool size 
using the following formula: ceil(available processors * factor). Resulting 
size is then bounded by the parallelism-min and parallelism-max values.</td>
+        </tr>
+        <tr>
+            <td><h5>akka.fork-join-executor.parallelism-max</h5></td>
+            <td style="word-wrap: break-word;">64</td>
+            <td>Max number of threads to cap factor-based parallelism number 
to.</td>
+        </tr>
+        <tr>
+            <td><h5>akka.fork-join-executor.parallelism-min</h5></td>
+            <td style="word-wrap: break-word;">8</td>
+            <td>Min number of threads to cap factor-based parallelism number 
to.</td>
+        </tr>
         <tr>
             <td><h5>akka.framesize</h5></td>
             <td style="word-wrap: break-word;">"10485760b"</td>
@@ -42,6 +72,21 @@
             <td style="word-wrap: break-word;">50</td>
             <td>Milliseconds a gate should be closed for after a remote 
connection was disconnected.</td>
         </tr>
+        <tr>
+            <td><h5>akka.server-socket-worker-pool.pool-size-factor</h5></td>
+            <td style="word-wrap: break-word;">1.0</td>
+            <td>The pool size factor is used to determine thread pool size 
using the following formula: ceil(available processors * factor). Resulting 
size is then bounded by the pool-size-min and pool-size-max values.</td>
+        </tr>
+        <tr>
+            <td><h5>akka.server-socket-worker-pool.pool-size-max</h5></td>
+            <td style="word-wrap: break-word;">2</td>
+            <td>Max number of threads to cap factor-based number to.</td>
+        </tr>
+        <tr>
+            <td><h5>akka.server-socket-worker-pool.pool-size-min</h5></td>
+            <td style="word-wrap: break-word;">1</td>
+            <td>Min number of threads to cap factor-based number to.</td>
+        </tr>
         <tr>
             <td><h5>akka.ssl.enabled</h5></td>
             <td style="word-wrap: break-word;">true</td>
diff --git 
a/flink-core/src/main/java/org/apache/flink/configuration/AkkaOptions.java 
b/flink-core/src/main/java/org/apache/flink/configuration/AkkaOptions.java
index 43c7876fbd8..af8d75ac35f 100644
--- a/flink-core/src/main/java/org/apache/flink/configuration/AkkaOptions.java
+++ b/flink-core/src/main/java/org/apache/flink/configuration/AkkaOptions.java
@@ -194,4 +194,81 @@
                .key("akka.retry-gate-closed-for")
                .defaultValue(50L)
                .withDescription("Milliseconds a gate should be closed for 
after a remote connection was disconnected.");
+
+       // ==================================================
+       // Configurations for fork-join-executor.
+       // ==================================================
+
+       public static final ConfigOption<Double> 
FORK_JOIN_EXECUTOR_PARALLELISM_FACTOR = ConfigOptions
+               .key("akka.fork-join-executor.parallelism-factor")
+               .defaultValue(2.0)
+               .withDescription(Description.builder()
+                       .text("The parallelism factor is used to determine 
thread pool size using the" +
+                               " following formula: ceil(available processors 
* factor). Resulting size" +
+                               " is then bounded by the parallelism-min and 
parallelism-max values."
+                       ).build());
+
+       public static final ConfigOption<Integer> 
FORK_JOIN_EXECUTOR_PARALLELISM_MIN = ConfigOptions
+               .key("akka.fork-join-executor.parallelism-min")
+               .defaultValue(8)
+               .withDescription(Description.builder()
+                       .text("Min number of threads to cap factor-based 
parallelism number to.").build());
+
+       public static final ConfigOption<Integer> 
FORK_JOIN_EXECUTOR_PARALLELISM_MAX = ConfigOptions
+               .key("akka.fork-join-executor.parallelism-max")
+               .defaultValue(64)
+               .withDescription(Description.builder()
+                       .text("Max number of threads to cap factor-based 
parallelism number to.").build());
+
+       // ==================================================
+       // Configurations for client-socket-work-pool.
+       // ==================================================
+
+       public static final ConfigOption<Integer> 
CLIENT_SOCKET_WORKER_POOL_SIZE_MIN = ConfigOptions
+               .key("akka.client-socket-worker-pool.pool-size-min")
+               .defaultValue(2)
+               .withDescription(Description.builder()
+                       .text("Min number of threads to cap factor-based number 
to.").build());
+
+       public static final ConfigOption<Integer> 
CLIENT_SOCKET_WORKER_POOL_SIZE_MAX = ConfigOptions
+               .key("akka.client-socket-worker-pool.pool-size-max")
+               .defaultValue(2)
+               .withDescription(Description.builder()
+                       .text("Max number of threads to cap factor-based number 
to.").build());
+
+       public static final ConfigOption<Double> 
CLIENT_SOCKET_WORKER_POOL_SIZE_FACTOR = ConfigOptions
+               .key("akka.client-socket-worker-pool.pool-size-factor")
+               .defaultValue(1.0)
+               .withDescription(Description.builder()
+                       .text("The pool size factor is used to determine thread 
pool size" +
+                               " using the following formula: ceil(available 
processors * factor)." +
+                               " Resulting size is then bounded by the 
pool-size-min and" +
+                               " pool-size-max values."
+                       ).build());
+
+       // ==================================================
+       // Configurations for server-socket-work-pool.
+       // ==================================================
+
+       public static final ConfigOption<Integer> 
SERVER_SOCKET_WORKER_POOL_SIZE_MIN = ConfigOptions
+               .key("akka.server-socket-worker-pool.pool-size-min")
+               .defaultValue(1)
+               .withDescription(Description.builder()
+                       .text("Min number of threads to cap factor-based number 
to.").build());
+
+       public static final ConfigOption<Integer> 
SERVER_SOCKET_WORKER_POOL_SIZE_MAX = ConfigOptions
+               .key("akka.server-socket-worker-pool.pool-size-max")
+               .defaultValue(2)
+               .withDescription(Description.builder()
+                       .text("Max number of threads to cap factor-based number 
to.").build());
+
+       public static final ConfigOption<Double> 
SERVER_SOCKET_WORKER_POOL_SIZE_FACTOR = ConfigOptions
+               .key("akka.server-socket-worker-pool.pool-size-factor")
+               .defaultValue(1.0)
+               .withDescription(Description.builder()
+                       .text("The pool size factor is used to determine thread 
pool size" +
+                               " using the following formula: ceil(available 
processors * factor)." +
+                               " Resulting size is then bounded by the 
pool-size-min and" +
+                               " pool-size-max values."
+                       ).build());
 }
diff --git 
a/flink-runtime/src/main/java/org/apache/flink/runtime/clusterframework/BootstrapTools.java
 
b/flink-runtime/src/main/java/org/apache/flink/runtime/clusterframework/BootstrapTools.java
index 4e43480f1ec..56e45762263 100644
--- 
a/flink-runtime/src/main/java/org/apache/flink/runtime/clusterframework/BootstrapTools.java
+++ 
b/flink-runtime/src/main/java/org/apache/flink/runtime/clusterframework/BootstrapTools.java
@@ -104,14 +104,7 @@ public static ActorSystem startActorSystem(
                while (portsIterator.hasNext()) {
                        // first, we check if the port is available by opening 
a socket
                        // if the actor system fails to start on the port, we 
try further
-                       ServerSocket availableSocket = 
NetUtils.createSocketFromPorts(
-                               portsIterator,
-                               new NetUtils.SocketFactory() {
-                                       @Override
-                                       public ServerSocket createSocket(int 
port) throws IOException {
-                                               return new ServerSocket(port);
-                                       }
-                               });
+                       ServerSocket availableSocket = 
NetUtils.createSocketFromPorts(portsIterator, ServerSocket::new);
 
                        int port;
                        if (availableSocket == null) {
diff --git 
a/flink-runtime/src/main/java/org/apache/flink/runtime/rpc/akka/AkkaRpcServiceUtils.java
 
b/flink-runtime/src/main/java/org/apache/flink/runtime/rpc/akka/AkkaRpcServiceUtils.java
index 982a53668c5..3a626986361 100644
--- 
a/flink-runtime/src/main/java/org/apache/flink/runtime/rpc/akka/AkkaRpcServiceUtils.java
+++ 
b/flink-runtime/src/main/java/org/apache/flink/runtime/rpc/akka/AkkaRpcServiceUtils.java
@@ -52,7 +52,7 @@
        private static final Logger LOG = 
LoggerFactory.getLogger(AkkaRpcServiceUtils.class);
 
        private static final String AKKA_TCP = "akka.tcp";
-       private static final String AkKA_SSL_TCP = "akka.ssl.tcp";
+       private static final String AKKA_SSL_TCP = "akka.ssl.tcp";
 
        private static final AtomicLong nextNameOffset = new AtomicLong(0L);
 
@@ -162,7 +162,7 @@ public static String getRpcUrl(
                checkNotNull(endpointName, "endpointName is null");
                checkArgument(port > 0 && port <= 65535, "port must be in [1, 
65535]");
 
-               final String protocolPrefix = akkaProtocol == 
AkkaProtocol.SSL_TCP ? AkKA_SSL_TCP : AKKA_TCP;
+               final String protocolPrefix = akkaProtocol == 
AkkaProtocol.SSL_TCP ? AKKA_SSL_TCP : AKKA_TCP;
 
                if (addressResolution == 
AddressResolution.TRY_ADDRESS_RESOLUTION) {
                        // Fail fast if the hostname cannot be resolved
diff --git 
a/flink-runtime/src/main/scala/org/apache/flink/runtime/akka/AkkaUtils.scala 
b/flink-runtime/src/main/scala/org/apache/flink/runtime/akka/AkkaUtils.scala
index b58bfe13ff4..050b4de9e1e 100644
--- a/flink-runtime/src/main/scala/org/apache/flink/runtime/akka/AkkaUtils.scala
+++ b/flink-runtime/src/main/scala/org/apache/flink/runtime/akka/AkkaUtils.scala
@@ -31,7 +31,7 @@ import org.apache.flink.runtime.net.SSLUtils
 import org.apache.flink.util.NetUtils
 import org.jboss.netty.channel.ChannelException
 import org.jboss.netty.logging.{InternalLoggerFactory, Slf4JLoggerFactory}
-import org.slf4j.LoggerFactory
+import org.slf4j.{Logger, LoggerFactory}
 
 import scala.annotation.tailrec
 import scala.concurrent._
@@ -44,9 +44,9 @@ import scala.language.postfixOps
  * actor systems resides in this class.
  */
 object AkkaUtils {
-  val LOG = LoggerFactory.getLogger(AkkaUtils.getClass)
+  val LOG: Logger = LoggerFactory.getLogger(AkkaUtils.getClass)
 
-  val INF_TIMEOUT = 21474835 seconds
+  val INF_TIMEOUT: FiniteDuration = 21474835 seconds
 
   /**
    * Creates a local actor system without remoting.
@@ -124,7 +124,9 @@ object AkkaUtils {
     * @param port to bind against
     * @return A remote Akka config
     */
-  def getAkkaConfig(configuration: Configuration, hostname: String, port: 
Int): Config = {
+  def getAkkaConfig(configuration: Configuration,
+                    hostname: String,
+                    port: Int): Config = {
     getAkkaConfig(configuration, Some((hostname, port)))
   }
 
@@ -203,6 +205,24 @@ object AkkaUtils {
     val supervisorStrategy = 
classOf[StoppingSupervisorWithoutLoggingActorKilledExceptionStrategy]
       .getCanonicalName
 
+    val forkJoinExecutorParallelismFactor =
+      
configuration.getDouble(AkkaOptions.FORK_JOIN_EXECUTOR_PARALLELISM_FACTOR)
+
+    val forkJoinExecutorParallelismMin =
+      configuration.getInteger(AkkaOptions.FORK_JOIN_EXECUTOR_PARALLELISM_MIN)
+
+    val forkJoinExecutorParallelismMax =
+      configuration.getInteger(AkkaOptions.FORK_JOIN_EXECUTOR_PARALLELISM_MAX)
+
+    val forkJoinExecutorConfig =
+      s"""
+         | fork-join-executor {
+         |   parallelism-factor = $forkJoinExecutorParallelismFactor
+         |   parallelism-min = $forkJoinExecutorParallelismMin
+         |   parallelism-max = $forkJoinExecutorParallelismMax
+         | }
+       """.stripMargin
+
     val config =
       s"""
         |akka {
@@ -230,9 +250,7 @@ object AkkaUtils {
         |   default-dispatcher {
         |     throughput = $akkaThroughput
         |
-        |     fork-join-executor {
-        |       parallelism-factor = 2.0
-        |     }
+        |   $forkJoinExecutorConfig
         |   }
         | }
         |}
@@ -263,7 +281,7 @@ object AkkaUtils {
   private def validateHeartbeat(pauseParamName: String,
                                 pauseValue: String,
                                 intervalParamName: String,
-                                intervalValue: String) = {
+                                intervalValue: String): Unit = {
     if (Duration.apply(pauseValue).lteq(Duration.apply(intervalValue))) {
       throw new IllegalConfigurationException(
         "%s [%s] must greater then %s [%s]",
@@ -367,6 +385,25 @@ object AkkaUtils {
     val akkaSSLAlgorithmsString = 
configuration.getString(SecurityOptions.SSL_ALGORITHMS)
     val akkaSSLAlgorithms = 
akkaSSLAlgorithmsString.split(",").toList.mkString("[", ",", "]")
 
+    val clientSocketWorkerPoolPoolSizeMin =
+      configuration.getInteger(AkkaOptions.CLIENT_SOCKET_WORKER_POOL_SIZE_MIN)
+
+    val clientSocketWorkerPoolPoolSizeMax =
+      configuration.getInteger(AkkaOptions.CLIENT_SOCKET_WORKER_POOL_SIZE_MAX)
+
+    val clientSocketWorkerPoolPoolSizeFactor =
+      
configuration.getDouble(AkkaOptions.CLIENT_SOCKET_WORKER_POOL_SIZE_FACTOR)
+
+    val serverSocketWorkerPoolPoolSizeMin =
+      configuration.getInteger(AkkaOptions.SERVER_SOCKET_WORKER_POOL_SIZE_MIN)
+
+    val serverSocketWorkerPoolPoolSizeMax =
+      configuration.getInteger(AkkaOptions.SERVER_SOCKET_WORKER_POOL_SIZE_MAX)
+
+    val serverSocketWorkerPoolPoolSizeFactor =
+      
configuration.getDouble(AkkaOptions.SERVER_SOCKET_WORKER_POOL_SIZE_FACTOR)
+
+
     val configString =
       s"""
          |akka {
@@ -397,6 +434,18 @@ object AkkaUtils {
          |        connection-timeout = $akkaTCPTimeout
          |        maximum-frame-size = $akkaFramesize
          |        tcp-nodelay = on
+         |
+         |        client-socket-worker-pool {
+         |          pool-size-min = $clientSocketWorkerPoolPoolSizeMin
+         |          pool-size-max = $clientSocketWorkerPoolPoolSizeMax
+         |          pool-size-factor = $clientSocketWorkerPoolPoolSizeFactor
+         |        }
+         |
+         |        server-socket-worker-pool {
+         |          pool-size-min = $serverSocketWorkerPoolPoolSizeMin
+         |          pool-size-max = $serverSocketWorkerPoolPoolSizeMax
+         |          pool-size-factor = $serverSocketWorkerPoolPoolSizeFactor
+         |        }
          |      }
          |    }
          |
@@ -790,7 +839,7 @@ object AkkaUtils {
           retryOnBindException(fn, stopCond)
         }
       case scala.util.Failure(x: Exception) => x.getCause match {
-        case c: ChannelException =>
+        case _: ChannelException =>
           if (stopCond) {
             scala.util.Failure(new RuntimeException(
               "Unable to do further retries starting the actor system"))
diff --git 
a/flink-runtime/src/main/scala/org/apache/flink/runtime/jobmanager/JobManager.scala
 
b/flink-runtime/src/main/scala/org/apache/flink/runtime/jobmanager/JobManager.scala
index 0855991eccf..a5c4c17f1aa 100644
--- 
a/flink-runtime/src/main/scala/org/apache/flink/runtime/jobmanager/JobManager.scala
+++ 
b/flink-runtime/src/main/scala/org/apache/flink/runtime/jobmanager/JobManager.scala
@@ -2139,7 +2139,6 @@ object JobManager {
       configuration: Configuration,
       externalHostname: String,
       port: Int): ActorSystem = {
-
     // Bring up the job manager actor system first, bind it to the given 
address.
     val jobManagerSystem = BootstrapTools.startActorSystem(
       configuration,


 

----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


> More Akka config
> ----------------
>
>                 Key: FLINK-9859
>                 URL: https://issues.apache.org/jira/browse/FLINK-9859
>             Project: Flink
>          Issue Type: Improvement
>          Components: Local Runtime
>    Affects Versions: 1.5.1
>            Reporter: 陈梓立
>            Assignee: 陈梓立
>            Priority: Major
>              Labels: pull-request-available
>             Fix For: 1.5.3
>
>
> Add more akka config options.



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)

Reply via email to