asfgit closed pull request #6539: [FLINK-10123] Use ExecutorThreadFactory instead of DefaultThreadFactory in RestServer/Client
URL: https://github.com/apache/flink/pull/6539
 
 
   

This PR was merged from a forked repository.
Because GitHub hides the original diff once a PR is merged, the diff is
reproduced below for the sake of provenance:

Because this is a foreign pull request (from a fork), the diff is supplied
below; GitHub would not otherwise display it:

diff --git a/docs/_includes/generated/akka_configuration.html 
b/docs/_includes/generated/akka_configuration.html
index 352c656488e..f5a2a5d77f9 100644
--- a/docs/_includes/generated/akka_configuration.html
+++ b/docs/_includes/generated/akka_configuration.html
@@ -12,11 +12,41 @@
             <td style="word-wrap: break-word;">"10 s"</td>
             <td>Timeout used for all futures and blocking Akka calls. If Flink 
fails due to timeouts then you should try to increase this value. Timeouts can 
be caused by slow machines or a congested network. The timeout value requires a 
time-unit specifier (ms/s/min/h/d).</td>
         </tr>
+        <tr>
+            <td><h5>akka.client-socket-worker-pool.pool-size-factor</h5></td>
+            <td style="word-wrap: break-word;">1.0</td>
+            <td>The pool size factor is used to determine thread pool size 
using the following formula: ceil(available processors * factor). Resulting 
size is then bounded by the pool-size-min and pool-size-max values.</td>
+        </tr>
+        <tr>
+            <td><h5>akka.client-socket-worker-pool.pool-size-max</h5></td>
+            <td style="word-wrap: break-word;">2</td>
+            <td>Max number of threads to cap factor-based number to.</td>
+        </tr>
+        <tr>
+            <td><h5>akka.client-socket-worker-pool.pool-size-min</h5></td>
+            <td style="word-wrap: break-word;">1</td>
+            <td>Min number of threads to cap factor-based number to.</td>
+        </tr>
         <tr>
             <td><h5>akka.client.timeout</h5></td>
             <td style="word-wrap: break-word;">"60 s"</td>
             <td>Timeout for all blocking calls on the client side.</td>
         </tr>
+        <tr>
+            <td><h5>akka.fork-join-executor.parallelism-factor</h5></td>
+            <td style="word-wrap: break-word;">2.0</td>
+            <td>The parallelism factor is used to determine thread pool size 
using the following formula: ceil(available processors * factor). Resulting 
size is then bounded by the parallelism-min and parallelism-max values.</td>
+        </tr>
+        <tr>
+            <td><h5>akka.fork-join-executor.parallelism-max</h5></td>
+            <td style="word-wrap: break-word;">64</td>
+            <td>Max number of threads to cap factor-based parallelism number 
to.</td>
+        </tr>
+        <tr>
+            <td><h5>akka.fork-join-executor.parallelism-min</h5></td>
+            <td style="word-wrap: break-word;">8</td>
+            <td>Min number of threads to cap factor-based parallelism number 
to.</td>
+        </tr>
         <tr>
             <td><h5>akka.framesize</h5></td>
             <td style="word-wrap: break-word;">"10485760b"</td>
@@ -42,6 +72,21 @@
             <td style="word-wrap: break-word;">50</td>
             <td>Milliseconds a gate should be closed for after a remote 
connection was disconnected.</td>
         </tr>
+        <tr>
+            <td><h5>akka.server-socket-worker-pool.pool-size-factor</h5></td>
+            <td style="word-wrap: break-word;">1.0</td>
+            <td>The pool size factor is used to determine thread pool size 
using the following formula: ceil(available processors * factor). Resulting 
size is then bounded by the pool-size-min and pool-size-max values.</td>
+        </tr>
+        <tr>
+            <td><h5>akka.server-socket-worker-pool.pool-size-max</h5></td>
+            <td style="word-wrap: break-word;">2</td>
+            <td>Max number of threads to cap factor-based number to.</td>
+        </tr>
+        <tr>
+            <td><h5>akka.server-socket-worker-pool.pool-size-min</h5></td>
+            <td style="word-wrap: break-word;">1</td>
+            <td>Min number of threads to cap factor-based number to.</td>
+        </tr>
         <tr>
             <td><h5>akka.ssl.enabled</h5></td>
             <td style="word-wrap: break-word;">true</td>
diff --git 
a/flink-core/src/main/java/org/apache/flink/configuration/AkkaOptions.java 
b/flink-core/src/main/java/org/apache/flink/configuration/AkkaOptions.java
index 43c7876fbd8..02234b9d259 100644
--- a/flink-core/src/main/java/org/apache/flink/configuration/AkkaOptions.java
+++ b/flink-core/src/main/java/org/apache/flink/configuration/AkkaOptions.java
@@ -194,4 +194,81 @@
                .key("akka.retry-gate-closed-for")
                .defaultValue(50L)
                .withDescription("Milliseconds a gate should be closed for 
after a remote connection was disconnected.");
+
+       // ==================================================
+       // Configurations for fork-join-executor.
+       // ==================================================
+
+       public static final ConfigOption<Double> 
FORK_JOIN_EXECUTOR_PARALLELISM_FACTOR = ConfigOptions
+               .key("akka.fork-join-executor.parallelism-factor")
+               .defaultValue(2.0)
+               .withDescription(Description.builder()
+                       .text("The parallelism factor is used to determine 
thread pool size using the" +
+                               " following formula: ceil(available processors 
* factor). Resulting size" +
+                               " is then bounded by the parallelism-min and 
parallelism-max values."
+                       ).build());
+
+       public static final ConfigOption<Integer> 
FORK_JOIN_EXECUTOR_PARALLELISM_MIN = ConfigOptions
+               .key("akka.fork-join-executor.parallelism-min")
+               .defaultValue(8)
+               .withDescription(Description.builder()
+                       .text("Min number of threads to cap factor-based 
parallelism number to.").build());
+
+       public static final ConfigOption<Integer> 
FORK_JOIN_EXECUTOR_PARALLELISM_MAX = ConfigOptions
+               .key("akka.fork-join-executor.parallelism-max")
+               .defaultValue(64)
+               .withDescription(Description.builder()
+                       .text("Max number of threads to cap factor-based 
parallelism number to.").build());
+
+       // ==================================================
+       // Configurations for client-socket-work-pool.
+       // ==================================================
+
+       public static final ConfigOption<Integer> 
CLIENT_SOCKET_WORKER_POOL_SIZE_MIN = ConfigOptions
+               .key("akka.client-socket-worker-pool.pool-size-min")
+               .defaultValue(1)
+               .withDescription(Description.builder()
+                       .text("Min number of threads to cap factor-based number 
to.").build());
+
+       public static final ConfigOption<Integer> 
CLIENT_SOCKET_WORKER_POOL_SIZE_MAX = ConfigOptions
+               .key("akka.client-socket-worker-pool.pool-size-max")
+               .defaultValue(2)
+               .withDescription(Description.builder()
+                       .text("Max number of threads to cap factor-based number 
to.").build());
+
+       public static final ConfigOption<Double> 
CLIENT_SOCKET_WORKER_POOL_SIZE_FACTOR = ConfigOptions
+               .key("akka.client-socket-worker-pool.pool-size-factor")
+               .defaultValue(1.0)
+               .withDescription(Description.builder()
+                       .text("The pool size factor is used to determine thread 
pool size" +
+                               " using the following formula: ceil(available 
processors * factor)." +
+                               " Resulting size is then bounded by the 
pool-size-min and" +
+                               " pool-size-max values."
+                       ).build());
+
+       // ==================================================
+       // Configurations for server-socket-work-pool.
+       // ==================================================
+
+       public static final ConfigOption<Integer> 
SERVER_SOCKET_WORKER_POOL_SIZE_MIN = ConfigOptions
+               .key("akka.server-socket-worker-pool.pool-size-min")
+               .defaultValue(1)
+               .withDescription(Description.builder()
+                       .text("Min number of threads to cap factor-based number 
to.").build());
+
+       public static final ConfigOption<Integer> 
SERVER_SOCKET_WORKER_POOL_SIZE_MAX = ConfigOptions
+               .key("akka.server-socket-worker-pool.pool-size-max")
+               .defaultValue(2)
+               .withDescription(Description.builder()
+                       .text("Max number of threads to cap factor-based number 
to.").build());
+
+       public static final ConfigOption<Double> 
SERVER_SOCKET_WORKER_POOL_SIZE_FACTOR = ConfigOptions
+               .key("akka.server-socket-worker-pool.pool-size-factor")
+               .defaultValue(1.0)
+               .withDescription(Description.builder()
+                       .text("The pool size factor is used to determine thread 
pool size" +
+                               " using the following formula: ceil(available 
processors * factor)." +
+                               " Resulting size is then bounded by the 
pool-size-min and" +
+                               " pool-size-max values."
+                       ).build());
 }
diff --git 
a/flink-runtime/src/main/java/org/apache/flink/runtime/clusterframework/BootstrapTools.java
 
b/flink-runtime/src/main/java/org/apache/flink/runtime/clusterframework/BootstrapTools.java
index 4e43480f1ec..56e45762263 100644
--- 
a/flink-runtime/src/main/java/org/apache/flink/runtime/clusterframework/BootstrapTools.java
+++ 
b/flink-runtime/src/main/java/org/apache/flink/runtime/clusterframework/BootstrapTools.java
@@ -104,14 +104,7 @@ public static ActorSystem startActorSystem(
                while (portsIterator.hasNext()) {
                        // first, we check if the port is available by opening 
a socket
                        // if the actor system fails to start on the port, we 
try further
-                       ServerSocket availableSocket = 
NetUtils.createSocketFromPorts(
-                               portsIterator,
-                               new NetUtils.SocketFactory() {
-                                       @Override
-                                       public ServerSocket createSocket(int 
port) throws IOException {
-                                               return new ServerSocket(port);
-                                       }
-                               });
+                       ServerSocket availableSocket = 
NetUtils.createSocketFromPorts(portsIterator, ServerSocket::new);
 
                        int port;
                        if (availableSocket == null) {
diff --git 
a/flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/JobMaster.java 
b/flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/JobMaster.java
index c47f4fd19ff..01cb2b6b099 100644
--- 
a/flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/JobMaster.java
+++ 
b/flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/JobMaster.java
@@ -1655,4 +1655,9 @@ public void reportPayload(ResourceID resourceID, Void 
payload) {
        RestartStrategy getRestartStrategy() {
                return restartStrategy;
        }
+
+       @VisibleForTesting
+       ExecutionGraph getExecutionGraph() {
+               return executionGraph;
+       }
 }
diff --git 
a/flink-runtime/src/main/java/org/apache/flink/runtime/rest/RestClient.java 
b/flink-runtime/src/main/java/org/apache/flink/runtime/rest/RestClient.java
index 052b9b121ed..2e9de4c168d 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/rest/RestClient.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/rest/RestClient.java
@@ -31,6 +31,7 @@
 import org.apache.flink.runtime.rest.util.RestClientException;
 import org.apache.flink.runtime.rest.util.RestConstants;
 import org.apache.flink.runtime.rest.util.RestMapperUtils;
+import org.apache.flink.runtime.util.ExecutorThreadFactory;
 import org.apache.flink.util.FlinkException;
 import org.apache.flink.util.Preconditions;
 
@@ -69,7 +70,6 @@
 import 
org.apache.flink.shaded.netty4.io.netty.handler.codec.http.multipart.MemoryAttribute;
 import org.apache.flink.shaded.netty4.io.netty.handler.ssl.SslHandler;
 import 
org.apache.flink.shaded.netty4.io.netty.handler.stream.ChunkedWriteHandler;
-import 
org.apache.flink.shaded.netty4.io.netty.util.concurrent.DefaultThreadFactory;
 
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
@@ -121,7 +121,7 @@ protected void initChannel(SocketChannel socketChannel) {
                                        .addLast(new ClientHandler());
                        }
                };
-               NioEventLoopGroup group = new NioEventLoopGroup(1, new 
DefaultThreadFactory("flink-rest-client-netty"));
+               NioEventLoopGroup group = new NioEventLoopGroup(1, new 
ExecutorThreadFactory("flink-rest-client-netty"));
 
                bootstrap = new Bootstrap();
                bootstrap
diff --git 
a/flink-runtime/src/main/java/org/apache/flink/runtime/rest/RestServerEndpoint.java
 
b/flink-runtime/src/main/java/org/apache/flink/runtime/rest/RestServerEndpoint.java
index b1169b785a5..e836e357b5d 100644
--- 
a/flink-runtime/src/main/java/org/apache/flink/runtime/rest/RestServerEndpoint.java
+++ 
b/flink-runtime/src/main/java/org/apache/flink/runtime/rest/RestServerEndpoint.java
@@ -27,6 +27,7 @@
 import org.apache.flink.runtime.rest.handler.RestHandlerSpecification;
 import org.apache.flink.runtime.rest.handler.router.Router;
 import org.apache.flink.runtime.rest.handler.router.RouterHandler;
+import org.apache.flink.runtime.util.ExecutorThreadFactory;
 import org.apache.flink.util.AutoCloseableAsync;
 import org.apache.flink.util.Preconditions;
 
@@ -43,7 +44,6 @@
 import 
org.apache.flink.shaded.netty4.io.netty.handler.codec.http.HttpServerCodec;
 import org.apache.flink.shaded.netty4.io.netty.handler.ssl.SslHandler;
 import 
org.apache.flink.shaded.netty4.io.netty.handler.stream.ChunkedWriteHandler;
-import 
org.apache.flink.shaded.netty4.io.netty.util.concurrent.DefaultThreadFactory;
 
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
@@ -169,8 +169,8 @@ protected void initChannel(SocketChannel ch) {
                                }
                        };
 
-                       NioEventLoopGroup bossGroup = new NioEventLoopGroup(1, 
new DefaultThreadFactory("flink-rest-server-netty-boss"));
-                       NioEventLoopGroup workerGroup = new 
NioEventLoopGroup(0, new 
DefaultThreadFactory("flink-rest-server-netty-worker"));
+                       NioEventLoopGroup bossGroup = new NioEventLoopGroup(1, 
new ExecutorThreadFactory("flink-rest-server-netty-boss"));
+                       NioEventLoopGroup workerGroup = new 
NioEventLoopGroup(0, new 
ExecutorThreadFactory("flink-rest-server-netty-worker"));
 
                        bootstrap = new ServerBootstrap();
                        bootstrap
diff --git 
a/flink-runtime/src/main/java/org/apache/flink/runtime/rpc/akka/AkkaRpcServiceUtils.java
 
b/flink-runtime/src/main/java/org/apache/flink/runtime/rpc/akka/AkkaRpcServiceUtils.java
index 982a53668c5..3a626986361 100644
--- 
a/flink-runtime/src/main/java/org/apache/flink/runtime/rpc/akka/AkkaRpcServiceUtils.java
+++ 
b/flink-runtime/src/main/java/org/apache/flink/runtime/rpc/akka/AkkaRpcServiceUtils.java
@@ -52,7 +52,7 @@
        private static final Logger LOG = 
LoggerFactory.getLogger(AkkaRpcServiceUtils.class);
 
        private static final String AKKA_TCP = "akka.tcp";
-       private static final String AkKA_SSL_TCP = "akka.ssl.tcp";
+       private static final String AKKA_SSL_TCP = "akka.ssl.tcp";
 
        private static final AtomicLong nextNameOffset = new AtomicLong(0L);
 
@@ -162,7 +162,7 @@ public static String getRpcUrl(
                checkNotNull(endpointName, "endpointName is null");
                checkArgument(port > 0 && port <= 65535, "port must be in [1, 
65535]");
 
-               final String protocolPrefix = akkaProtocol == 
AkkaProtocol.SSL_TCP ? AkKA_SSL_TCP : AKKA_TCP;
+               final String protocolPrefix = akkaProtocol == 
AkkaProtocol.SSL_TCP ? AKKA_SSL_TCP : AKKA_TCP;
 
                if (addressResolution == 
AddressResolution.TRY_ADDRESS_RESOLUTION) {
                        // Fail fast if the hostname cannot be resolved
diff --git a/flink-runtime/src/main/scala/akka/actor/RobustActorSystem.scala 
b/flink-runtime/src/main/scala/akka/actor/RobustActorSystem.scala
new file mode 100644
index 00000000000..14ab51b8735
--- /dev/null
+++ b/flink-runtime/src/main/scala/akka/actor/RobustActorSystem.scala
@@ -0,0 +1,98 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package akka.actor
+
+import java.lang.Thread.UncaughtExceptionHandler
+
+import akka.actor.ActorSystem.findClassLoader
+import akka.actor.setup.ActorSystemSetup
+import com.typesafe.config.{Config, ConfigFactory}
+import org.apache.flink.runtime.util.FatalExitExceptionHandler
+
+import scala.concurrent.ExecutionContext
+
+/**
+  * [[ActorSystemImpl]] which has a configurable 
[[java.lang.Thread.UncaughtExceptionHandler]].
+  */
+class RobustActorSystem(
+    name: String,
+    applicationConfig: Config,
+    classLoader: ClassLoader,
+    defaultExecutionContext: Option[ExecutionContext],
+    guardianProps: Option[Props],
+    setup: ActorSystemSetup,
+    val optionalUncaughtExceptionHandler: Option[UncaughtExceptionHandler])
+    extends ActorSystemImpl(
+      name,
+      applicationConfig,
+      classLoader,
+      defaultExecutionContext,
+      guardianProps,
+      setup) {
+
+  override protected def uncaughtExceptionHandler: 
Thread.UncaughtExceptionHandler =
+    optionalUncaughtExceptionHandler.getOrElse(super.uncaughtExceptionHandler)
+}
+
+object RobustActorSystem {
+  def create(name: String, applicationConfig: Config): RobustActorSystem = {
+    apply(name, ActorSystemSetup.create(BootstrapSetup(None, 
Option(applicationConfig), None)))
+  }
+
+  def create(
+      name: String,
+      applicationConfig: Config,
+      uncaughtExceptionHandler: UncaughtExceptionHandler): RobustActorSystem = 
{
+    apply(
+      name,
+      ActorSystemSetup.create(BootstrapSetup(None, Option(applicationConfig), 
None)),
+      uncaughtExceptionHandler
+    )
+  }
+
+  def apply(name: String, setup: ActorSystemSetup): RobustActorSystem = {
+    internalApply(name, setup, Some(FatalExitExceptionHandler.INSTANCE))
+  }
+
+  def apply(
+      name: String,
+      setup: ActorSystemSetup,
+      uncaughtExceptionHandler: UncaughtExceptionHandler): RobustActorSystem = 
{
+    internalApply(name, setup, Some(uncaughtExceptionHandler))
+  }
+
+  def internalApply(
+      name: String,
+      setup: ActorSystemSetup,
+      uncaughtExceptionHandler: Option[UncaughtExceptionHandler]): 
RobustActorSystem = {
+    val bootstrapSettings = setup.get[BootstrapSetup]
+    val cl = 
bootstrapSettings.flatMap(_.classLoader).getOrElse(findClassLoader())
+    val appConfig = 
bootstrapSettings.flatMap(_.config).getOrElse(ConfigFactory.load(cl))
+    val defaultEC = bootstrapSettings.flatMap(_.defaultExecutionContext)
+
+    new RobustActorSystem(
+      name,
+      appConfig,
+      cl,
+      defaultEC,
+      None,
+      setup,
+      uncaughtExceptionHandler).start()
+  }
+}
diff --git 
a/flink-runtime/src/main/scala/org/apache/flink/runtime/akka/AkkaUtils.scala 
b/flink-runtime/src/main/scala/org/apache/flink/runtime/akka/AkkaUtils.scala
index b58bfe13ff4..9ce1865204f 100644
--- a/flink-runtime/src/main/scala/org/apache/flink/runtime/akka/AkkaUtils.scala
+++ b/flink-runtime/src/main/scala/org/apache/flink/runtime/akka/AkkaUtils.scala
@@ -31,7 +31,7 @@ import org.apache.flink.runtime.net.SSLUtils
 import org.apache.flink.util.NetUtils
 import org.jboss.netty.channel.ChannelException
 import org.jboss.netty.logging.{InternalLoggerFactory, Slf4JLoggerFactory}
-import org.slf4j.LoggerFactory
+import org.slf4j.{Logger, LoggerFactory}
 
 import scala.annotation.tailrec
 import scala.concurrent._
@@ -44,9 +44,9 @@ import scala.language.postfixOps
  * actor systems resides in this class.
  */
 object AkkaUtils {
-  val LOG = LoggerFactory.getLogger(AkkaUtils.getClass)
+  val LOG: Logger = LoggerFactory.getLogger(AkkaUtils.getClass)
 
-  val INF_TIMEOUT = 21474835 seconds
+  val INF_TIMEOUT: FiniteDuration = 21474835 seconds
 
   /**
    * Creates a local actor system without remoting.
@@ -103,7 +103,7 @@ object AkkaUtils {
   def createActorSystem(akkaConfig: Config): ActorSystem = {
     // Initialize slf4j as logger of Akka's Netty instead of java.util.logging 
(FLINK-1650)
     InternalLoggerFactory.setDefaultFactory(new Slf4JLoggerFactory)
-    ActorSystem.create("flink", akkaConfig)
+    RobustActorSystem.create("flink", akkaConfig)
   }
 
   /**
@@ -124,7 +124,9 @@ object AkkaUtils {
     * @param port to bind against
     * @return A remote Akka config
     */
-  def getAkkaConfig(configuration: Configuration, hostname: String, port: 
Int): Config = {
+  def getAkkaConfig(configuration: Configuration,
+                    hostname: String,
+                    port: Int): Config = {
     getAkkaConfig(configuration, Some((hostname, port)))
   }
 
@@ -203,6 +205,24 @@ object AkkaUtils {
     val supervisorStrategy = 
classOf[StoppingSupervisorWithoutLoggingActorKilledExceptionStrategy]
       .getCanonicalName
 
+    val forkJoinExecutorParallelismFactor =
+      
configuration.getDouble(AkkaOptions.FORK_JOIN_EXECUTOR_PARALLELISM_FACTOR)
+
+    val forkJoinExecutorParallelismMin =
+      configuration.getInteger(AkkaOptions.FORK_JOIN_EXECUTOR_PARALLELISM_MIN)
+
+    val forkJoinExecutorParallelismMax =
+      configuration.getInteger(AkkaOptions.FORK_JOIN_EXECUTOR_PARALLELISM_MAX)
+
+    val forkJoinExecutorConfig =
+      s"""
+         | fork-join-executor {
+         |   parallelism-factor = $forkJoinExecutorParallelismFactor
+         |   parallelism-min = $forkJoinExecutorParallelismMin
+         |   parallelism-max = $forkJoinExecutorParallelismMax
+         | }
+       """.stripMargin
+
     val config =
       s"""
         |akka {
@@ -230,9 +250,7 @@ object AkkaUtils {
         |   default-dispatcher {
         |     throughput = $akkaThroughput
         |
-        |     fork-join-executor {
-        |       parallelism-factor = 2.0
-        |     }
+        |   $forkJoinExecutorConfig
         |   }
         | }
         |}
@@ -263,7 +281,7 @@ object AkkaUtils {
   private def validateHeartbeat(pauseParamName: String,
                                 pauseValue: String,
                                 intervalParamName: String,
-                                intervalValue: String) = {
+                                intervalValue: String): Unit = {
     if (Duration.apply(pauseValue).lteq(Duration.apply(intervalValue))) {
       throw new IllegalConfigurationException(
         "%s [%s] must greater then %s [%s]",
@@ -367,6 +385,25 @@ object AkkaUtils {
     val akkaSSLAlgorithmsString = 
configuration.getString(SecurityOptions.SSL_ALGORITHMS)
     val akkaSSLAlgorithms = 
akkaSSLAlgorithmsString.split(",").toList.mkString("[", ",", "]")
 
+    val clientSocketWorkerPoolPoolSizeMin =
+      configuration.getInteger(AkkaOptions.CLIENT_SOCKET_WORKER_POOL_SIZE_MIN)
+
+    val clientSocketWorkerPoolPoolSizeMax =
+      configuration.getInteger(AkkaOptions.CLIENT_SOCKET_WORKER_POOL_SIZE_MAX)
+
+    val clientSocketWorkerPoolPoolSizeFactor =
+      
configuration.getDouble(AkkaOptions.CLIENT_SOCKET_WORKER_POOL_SIZE_FACTOR)
+
+    val serverSocketWorkerPoolPoolSizeMin =
+      configuration.getInteger(AkkaOptions.SERVER_SOCKET_WORKER_POOL_SIZE_MIN)
+
+    val serverSocketWorkerPoolPoolSizeMax =
+      configuration.getInteger(AkkaOptions.SERVER_SOCKET_WORKER_POOL_SIZE_MAX)
+
+    val serverSocketWorkerPoolPoolSizeFactor =
+      
configuration.getDouble(AkkaOptions.SERVER_SOCKET_WORKER_POOL_SIZE_FACTOR)
+
+
     val configString =
       s"""
          |akka {
@@ -397,6 +434,18 @@ object AkkaUtils {
          |        connection-timeout = $akkaTCPTimeout
          |        maximum-frame-size = $akkaFramesize
          |        tcp-nodelay = on
+         |
+         |        client-socket-worker-pool {
+         |          pool-size-min = $clientSocketWorkerPoolPoolSizeMin
+         |          pool-size-max = $clientSocketWorkerPoolPoolSizeMax
+         |          pool-size-factor = $clientSocketWorkerPoolPoolSizeFactor
+         |        }
+         |
+         |        server-socket-worker-pool {
+         |          pool-size-min = $serverSocketWorkerPoolPoolSizeMin
+         |          pool-size-max = $serverSocketWorkerPoolPoolSizeMax
+         |          pool-size-factor = $serverSocketWorkerPoolPoolSizeFactor
+         |        }
          |      }
          |    }
          |
@@ -790,7 +839,7 @@ object AkkaUtils {
           retryOnBindException(fn, stopCond)
         }
       case scala.util.Failure(x: Exception) => x.getCause match {
-        case c: ChannelException =>
+        case _: ChannelException =>
           if (stopCond) {
             scala.util.Failure(new RuntimeException(
               "Unable to do further retries starting the actor system"))
diff --git 
a/flink-runtime/src/main/scala/org/apache/flink/runtime/jobmanager/JobManager.scala
 
b/flink-runtime/src/main/scala/org/apache/flink/runtime/jobmanager/JobManager.scala
index afecae20bef..0988730689a 100644
--- 
a/flink-runtime/src/main/scala/org/apache/flink/runtime/jobmanager/JobManager.scala
+++ 
b/flink-runtime/src/main/scala/org/apache/flink/runtime/jobmanager/JobManager.scala
@@ -2143,7 +2143,6 @@ object JobManager {
       configuration: Configuration,
       externalHostname: String,
       port: Int): ActorSystem = {
-
     // Bring up the job manager actor system first, bind it to the given 
address.
     val jobManagerSystem = BootstrapTools.startActorSystem(
       configuration,
diff --git 
a/flink-runtime/src/test/java/org/apache/flink/runtime/akka/FlinkUntypedActorTest.java
 
b/flink-runtime/src/test/java/org/apache/flink/runtime/akka/FlinkUntypedActorTest.java
index 00a84755daa..2d39e0b972d 100644
--- 
a/flink-runtime/src/test/java/org/apache/flink/runtime/akka/FlinkUntypedActorTest.java
+++ 
b/flink-runtime/src/test/java/org/apache/flink/runtime/akka/FlinkUntypedActorTest.java
@@ -21,11 +21,13 @@
 import org.apache.flink.runtime.messages.JobManagerMessages;
 import org.apache.flink.runtime.messages.RequiresLeaderSessionID;
 import org.apache.flink.runtime.testingUtils.TestingUtils;
+import org.apache.flink.util.TestLogger;
 
 import akka.actor.ActorRef;
 import akka.actor.ActorSystem;
 import akka.actor.Kill;
 import akka.actor.Props;
+import akka.actor.RobustActorSystem;
 import akka.testkit.JavaTestKit;
 import akka.testkit.TestActorRef;
 import org.junit.AfterClass;
@@ -40,13 +42,13 @@
 /**
  * Tests for {@link FlinkUntypedActor}.
  */
-public class FlinkUntypedActorTest {
+public class FlinkUntypedActorTest extends TestLogger {
 
        private static ActorSystem actorSystem;
 
        @BeforeClass
        public static void setup() {
-               actorSystem = ActorSystem.create("TestingActorSystem", 
TestingUtils.testConfig());
+               actorSystem = RobustActorSystem.create("TestingActorSystem", 
TestingUtils.testConfig());
        }
 
        @AfterClass
diff --git 
a/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/ExecutionGraphTestUtils.java
 
b/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/ExecutionGraphTestUtils.java
index 1d36fa5859a..0d603fc17b2 100644
--- 
a/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/ExecutionGraphTestUtils.java
+++ 
b/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/ExecutionGraphTestUtils.java
@@ -97,13 +97,12 @@
        // 
------------------------------------------------------------------------
 
        /**
-        * Waits until the job has reached a certain state.
+        * Waits until the Job has reached a certain state.
         *
         * <p>This method is based on polling and might miss very fast state 
transitions!
         */
        public static void waitUntilJobStatus(ExecutionGraph eg, JobStatus 
status, long maxWaitMillis)
                        throws TimeoutException {
-
                checkNotNull(eg);
                checkNotNull(status);
                checkArgument(maxWaitMillis >= 0);
@@ -118,7 +117,9 @@ public static void waitUntilJobStatus(ExecutionGraph eg, 
JobStatus status, long
                }
 
                if (System.nanoTime() >= deadline) {
-                       throw new TimeoutException("The job did not reach 
status " + status + " in time. Current status is " + eg.getState() + '.');
+                       throw new TimeoutException(
+                               String.format("The job did not reach status %s 
in time. Current status is %s.",
+                                       status, eg.getState()));
                }
        }
 
@@ -129,7 +130,6 @@ public static void waitUntilJobStatus(ExecutionGraph eg, 
JobStatus status, long
         */
        public static void waitUntilExecutionState(Execution execution, 
ExecutionState state, long maxWaitMillis)
                        throws TimeoutException {
-
                checkNotNull(execution);
                checkNotNull(state);
                checkArgument(maxWaitMillis >= 0);
@@ -144,7 +144,47 @@ public static void waitUntilExecutionState(Execution 
execution, ExecutionState s
                }
 
                if (System.nanoTime() >= deadline) {
-                       throw new TimeoutException();
+                       throw new TimeoutException(
+                               String.format("The execution did not reach 
state %s in time. Current state is %s.",
+                                       state, execution.getState()));
+               }
+       }
+
+       /**
+        * Waits until the ExecutionVertex has reached a certain state.
+        *
+        * <p>This method is based on polling and might miss very fast state 
transitions!
+        */
+       public static void waitUntilExecutionVertexState(ExecutionVertex 
executionVertex, ExecutionState state, long maxWaitMillis)
+               throws TimeoutException {
+               checkNotNull(executionVertex);
+               checkNotNull(state);
+               checkArgument(maxWaitMillis >= 0);
+
+               // this is a poor implementation - we may want to improve it 
eventually
+               final long deadline = maxWaitMillis == 0 ? Long.MAX_VALUE : 
System.nanoTime() + (maxWaitMillis * 1_000_000);
+
+               while (true) {
+                       Execution execution = 
executionVertex.getCurrentExecutionAttempt();
+
+                       if (execution == null || (execution.getState() != state 
&& System.nanoTime() < deadline)) {
+                               try {
+                                       Thread.sleep(2);
+                               } catch (InterruptedException ignored) { }
+                       } else {
+                               break;
+                       }
+
+                       if (System.nanoTime() >= deadline) {
+                               if (execution != null) {
+                                       throw new TimeoutException(
+                                               String.format("The execution 
vertex did not reach state %s in time. Current state is %s.",
+                                                       state, 
execution.getState()));
+                               } else {
+                                       throw new TimeoutException(
+                                               "Cannot get current execution 
attempt of " + executionVertex + '.');
+                               }
+                       }
                }
        }
 
@@ -201,7 +241,6 @@ public static void waitForAllExecutionsPredicate(
 
        public static void waitUntilFailoverRegionState(FailoverRegion region, 
JobStatus status, long maxWaitMillis)
                        throws TimeoutException {
-
                checkNotNull(region);
                checkNotNull(status);
                checkArgument(maxWaitMillis >= 0);
diff --git 
a/flink-runtime/src/test/java/org/apache/flink/runtime/instance/InstanceManagerTest.java
 
b/flink-runtime/src/test/java/org/apache/flink/runtime/instance/InstanceManagerTest.java
index 5d7a520ae10..880be52df86 100644
--- 
a/flink-runtime/src/test/java/org/apache/flink/runtime/instance/InstanceManagerTest.java
+++ 
b/flink-runtime/src/test/java/org/apache/flink/runtime/instance/InstanceManagerTest.java
@@ -19,6 +19,7 @@
 package org.apache.flink.runtime.instance;
 
 import akka.actor.ActorSystem;
+import akka.actor.RobustActorSystem;
 import akka.testkit.JavaTestKit;
 
 import org.apache.flink.runtime.clusterframework.types.ResourceID;
@@ -26,6 +27,7 @@
 import org.apache.flink.runtime.taskmanager.TaskManagerLocation;
 import org.apache.flink.runtime.testingUtils.TestingUtils;
 import org.apache.flink.runtime.testutils.CommonTestUtils;
+import org.apache.flink.util.TestLogger;
 
 import org.junit.AfterClass;
 import org.junit.Assert;
@@ -47,7 +49,7 @@
 /**
  * Tests for {@link org.apache.flink.runtime.instance.InstanceManager}.
  */
-public class InstanceManagerTest{
+public class InstanceManagerTest extends TestLogger {
 
        static ActorSystem system;
 
@@ -55,7 +57,7 @@
 
        @BeforeClass
        public static void setup(){
-               system = ActorSystem.create("TestingActorSystem", 
TestingUtils.testConfig());
+               system = RobustActorSystem.create("TestingActorSystem", 
TestingUtils.testConfig());
        }
 
        @AfterClass
diff --git 
a/flink-runtime/src/test/java/org/apache/flink/runtime/jobmaster/JobMasterTest.java
 
b/flink-runtime/src/test/java/org/apache/flink/runtime/jobmaster/JobMasterTest.java
index 891ff82c413..66ca769165a 100644
--- 
a/flink-runtime/src/test/java/org/apache/flink/runtime/jobmaster/JobMasterTest.java
+++ 
b/flink-runtime/src/test/java/org/apache/flink/runtime/jobmaster/JobMasterTest.java
@@ -20,12 +20,17 @@
 
 import org.apache.flink.api.common.ExecutionConfig;
 import org.apache.flink.api.common.JobID;
+import org.apache.flink.api.common.io.DefaultInputSplitAssigner;
 import org.apache.flink.api.common.restartstrategy.RestartStrategies;
 import org.apache.flink.api.common.time.Time;
 import org.apache.flink.api.java.tuple.Tuple3;
 import org.apache.flink.configuration.BlobServerOptions;
+import org.apache.flink.configuration.ConfigConstants;
 import org.apache.flink.configuration.Configuration;
 import org.apache.flink.configuration.JobManagerOptions;
+import org.apache.flink.core.io.InputSplit;
+import org.apache.flink.core.io.InputSplitAssigner;
+import org.apache.flink.core.io.InputSplitSource;
 import org.apache.flink.core.testutils.OneShotLatch;
 import org.apache.flink.runtime.blob.BlobServer;
 import org.apache.flink.runtime.blob.VoidBlobStore;
@@ -46,6 +51,9 @@
 import org.apache.flink.runtime.execution.ExecutionState;
 import org.apache.flink.runtime.executiongraph.ArchivedExecutionGraph;
 import org.apache.flink.runtime.executiongraph.ExecutionAttemptID;
+import org.apache.flink.runtime.executiongraph.ExecutionGraph;
+import org.apache.flink.runtime.executiongraph.ExecutionGraphTestUtils;
+import org.apache.flink.runtime.executiongraph.ExecutionVertex;
 import 
org.apache.flink.runtime.executiongraph.restart.FixedDelayRestartStrategy;
 import org.apache.flink.runtime.executiongraph.restart.RestartStrategy;
 import org.apache.flink.runtime.executiongraph.restart.RestartStrategyFactory;
@@ -60,6 +68,7 @@
 import org.apache.flink.runtime.jobgraph.JobGraph;
 import org.apache.flink.runtime.jobgraph.JobVertex;
 import org.apache.flink.runtime.jobgraph.SavepointRestoreSettings;
+import org.apache.flink.runtime.jobgraph.tasks.AbstractInvokable;
 import 
org.apache.flink.runtime.jobgraph.tasks.CheckpointCoordinatorConfiguration;
 import org.apache.flink.runtime.jobgraph.tasks.JobCheckpointingSettings;
 import org.apache.flink.runtime.jobmanager.OnCompletionActions;
@@ -84,8 +93,11 @@
 import org.apache.flink.runtime.testtasks.NoOpInvokable;
 import org.apache.flink.runtime.util.TestingFatalErrorHandler;
 import org.apache.flink.util.FlinkException;
+import org.apache.flink.util.InstantiationUtil;
 import org.apache.flink.util.TestLogger;
+import org.apache.flink.util.function.SupplierWithException;
 
+import org.hamcrest.Matcher;
 import org.hamcrest.Matchers;
 import org.junit.After;
 import org.junit.AfterClass;
@@ -100,8 +112,12 @@
 import java.io.File;
 import java.io.FileOutputStream;
 import java.io.IOException;
+import java.util.ArrayList;
+import java.util.Arrays;
 import java.util.Collection;
 import java.util.Collections;
+import java.util.List;
+import java.util.Objects;
 import java.util.concurrent.ArrayBlockingQueue;
 import java.util.concurrent.BlockingQueue;
 import java.util.concurrent.CompletableFuture;
@@ -110,9 +126,11 @@
 
 import static org.hamcrest.MatcherAssert.assertThat;
 import static org.hamcrest.Matchers.contains;
+import static org.hamcrest.Matchers.containsInAnyOrder;
 import static org.hamcrest.Matchers.equalTo;
 import static org.hamcrest.Matchers.hasSize;
 import static org.hamcrest.Matchers.is;
+import static org.hamcrest.Matchers.nullValue;
 import static org.junit.Assert.assertNotNull;
 import static org.junit.Assert.assertTrue;
 
@@ -121,6 +139,8 @@
  */
 public class JobMasterTest extends TestLogger {
 
+       static final TestingInputSplit[] EMPTY_TESTING_INPUT_SPLITS = new 
TestingInputSplit[0];
+
        @ClassRule
        public static TemporaryFolder temporaryFolder = new TemporaryFolder();
 
@@ -678,6 +698,152 @@ public void 
testResourceManagerConnectionAfterRegainingLeadership() throws Excep
                }
        }
 
+       /**
+        * Tests that {@link JobMasterGateway#requestNextInputSplit} hands out every split of the
+        * single source vertex and that, after a global failure followed by a restart, the same
+        * set of splits is handed out again.
+        */
+       @Test
+       public void testRequestNextInputSplit() throws Exception {
+               final List<TestingInputSplit> expectedInputSplits = Arrays.asList(
+                       new TestingInputSplit(1),
+                       new TestingInputSplit(42),
+                       new TestingInputSplit(1337));
+
+               // build one node JobGraph
+               InputSplitSource<TestingInputSplit> inputSplitSource = new TestingInputSplitSource(expectedInputSplits);
+
+               JobVertex source = new JobVertex("vertex1");
+               source.setParallelism(1);
+               source.setInputSplitSource(inputSplitSource);
+               source.setInvokableClass(AbstractInvokable.class);
+
+               final JobGraph testJobGraph = new JobGraph(source);
+               testJobGraph.setAllowQueuedScheduling(true);
+
+               // fixed-delay restart strategy with one attempt and no delay, so that the global
+               // failure triggered below is followed by re-scheduling of the vertex
+               configuration.setLong(ConfigConstants.RESTART_STRATEGY_FIXED_DELAY_ATTEMPTS, 1);
+               configuration.setString(ConfigConstants.RESTART_STRATEGY_FIXED_DELAY_DELAY, "0 s");
+
+               final JobManagerSharedServices jobManagerSharedServices =
+                       new TestingJobManagerSharedServicesBuilder()
+                               .setRestartStrategyFactory(RestartStrategyFactory.createRestartStrategyFactory(configuration))
+                               .build();
+
+               final JobMaster jobMaster = createJobMaster(
+                       configuration,
+                       testJobGraph,
+                       haServices,
+                       jobManagerSharedServices);
+
+               CompletableFuture<Acknowledge> startFuture = jobMaster.start(jobMasterId, testingTimeout);
+
+               try {
+                       // wait for the start to complete
+                       startFuture.get(testingTimeout.toMilliseconds(), TimeUnit.MILLISECONDS);
+
+                       final JobMasterGateway jobMasterGateway = jobMaster.getSelfGateway(JobMasterGateway.class);
+
+                       ExecutionGraph eg = jobMaster.getExecutionGraph();
+                       ExecutionVertex ev = eg.getAllExecutionVertices().iterator().next();
+
+                       // the current execution attempt is looked up on every call, so after the restart
+                       // the splits are requested on behalf of the new attempt
+                       final SupplierWithException<SerializedInputSplit, Exception> inputSplitSupplier = () -> jobMasterGateway.requestNextInputSplit(
+                               source.getID(),
+                               ev.getCurrentExecutionAttempt().getAttemptId()).get();
+
+                       // getInputSplits also asserts that no further split is handed out afterwards
+                       List<InputSplit> actualInputSplits = getInputSplits(
+                               expectedInputSplits.size(),
+                               inputSplitSupplier);
+
+                       final Matcher<Iterable<? extends InputSplit>> expectedInputSplitsMatcher = containsInAnyOrder(expectedInputSplits.toArray(EMPTY_TESTING_INPUT_SPLITS));
+                       assertThat(actualInputSplits, expectedInputSplitsMatcher);
+
+                       final long maxWaitMillis = 2000L;
+                       ExecutionGraphTestUtils.waitUntilExecutionVertexState(ev, ExecutionState.SCHEDULED, maxWaitMillis);
+
+                       // NOTE(review): failGlobal is called from the test thread rather than from the
+                       // JobMaster's main thread; presumably tolerable in this test, but confirm it
+                       // cannot race with the JobMaster's own handling of the ExecutionGraph.
+                       eg.failGlobal(new Exception("Testing exception"));
+
+                       // intended to wait for re-scheduling after the restart
+                       // NOTE(review): this would also pass immediately if the old attempt were still
+                       // SCHEDULED — confirm that failGlobal changes its state synchronously.
+                       ExecutionGraphTestUtils.waitUntilExecutionVertexState(ev, ExecutionState.SCHEDULED, maxWaitMillis);
+
+                       actualInputSplits = getInputSplits(
+                               expectedInputSplits.size(),
+                               inputSplitSupplier);
+
+                       assertThat(actualInputSplits, expectedInputSplitsMatcher);
+               } finally {
+                       RpcUtils.terminateRpcEndpoint(jobMaster, testingTimeout);
+               }
+       }
+
+       /**
+        * Pulls {@code numberInputSplits} splits from {@code nextInputSplit}, asserting that each one
+        * is non-empty, and then asserts that the supplier is exhausted: the following response must
+        * either be empty or deserialize to {@code null}.
+        *
+        * @param numberInputSplits number of splits that are expected to be available
+        * @param nextInputSplit supplier requesting the next serialized split
+        * @return the deserialized splits in the order in which they were received
+        */
+       @Nonnull
+       private static List<InputSplit> getInputSplits(int numberInputSplits, SupplierWithException<SerializedInputSplit, Exception> nextInputSplit) throws Exception {
+               final ClassLoader classLoader = ClassLoader.getSystemClassLoader();
+               final List<InputSplit> receivedSplits = new ArrayList<>(numberInputSplits);
+
+               while (receivedSplits.size() < numberInputSplits) {
+                       final SerializedInputSplit nextSplit = nextInputSplit.get();
+                       assertThat(nextSplit.isEmpty(), is(false));
+
+                       final InputSplit split = InstantiationUtil.deserializeObject(nextSplit.getInputSplitData(), classLoader);
+                       receivedSplits.add(split);
+               }
+
+               // the request following the last expected split must not yield another split
+               final SerializedInputSplit trailingSplit = nextInputSplit.get();
+               if (trailingSplit.isEmpty()) {
+                       return receivedSplits;
+               }
+
+               final InputSplit shouldBeNull = InstantiationUtil.deserializeObject(trailingSplit.getInputSplitData(), classLoader);
+               assertThat(shouldBeNull, is(nullValue()));
+               return receivedSplits;
+       }
+
+       /**
+        * {@link InputSplitSource} that always creates the same, fixed list of splits and returns a
+        * {@link DefaultInputSplitAssigner} over the splits it is given.
+        */
+       private static final class TestingInputSplitSource implements InputSplitSource<TestingInputSplit> {
+               private static final long serialVersionUID = -2344684048759139086L;
+
+               private final List<TestingInputSplit> inputSplits;
+
+               private TestingInputSplitSource(List<TestingInputSplit> splits) {
+                       inputSplits = splits;
+               }
+
+               @Override
+               public TestingInputSplit[] createInputSplits(int minNumSplits) {
+                       // minNumSplits is ignored: the test dictates exactly which splits exist
+                       final TestingInputSplit[] splits = inputSplits.toArray(EMPTY_TESTING_INPUT_SPLITS);
+                       return splits;
+               }
+
+               @Override
+               public InputSplitAssigner getInputSplitAssigner(TestingInputSplit[] splits) {
+                       final InputSplitAssigner assigner = new DefaultInputSplitAssigner(splits);
+                       return assigner;
+               }
+       }
+
+       /**
+        * Minimal {@link InputSplit} for tests whose identity is defined solely by its split number.
+        */
+       private static final class TestingInputSplit implements InputSplit {
+
+               private static final long serialVersionUID = -5404803705463116083L;
+
+               private final int splitNumber;
+
+               TestingInputSplit(int number) {
+                       this.splitNumber = number;
+               }
+
+               @Override
+               public int getSplitNumber() {
+                       return splitNumber;
+               }
+
+               @Override
+               public boolean equals(Object o) {
+                       if (this == o) {
+                               return true;
+                       }
+
+                       if (o == null || getClass() != o.getClass()) {
+                               return false;
+                       }
+
+                       TestingInputSplit that = (TestingInputSplit) o;
+                       return splitNumber == that.splitNumber;
+               }
+
+               @Override
+               public int hashCode() {
+                       return Objects.hash(splitNumber);
+               }
+
+               /**
+                * Renders the split number so that Hamcrest mismatch descriptions (e.g. of
+                * {@code containsInAnyOrder}) show which split was missing or unexpected.
+                */
+               @Override
+               public String toString() {
+                       return "TestingInputSplit{splitNumber=" + splitNumber + '}';
+               }
+       }
+
        /**
         * Tests the {@link 
JobMaster#requestPartitionState(IntermediateDataSetID, ResultPartitionID)}
         * call for a finished result partition.
@@ -708,9 +874,9 @@ public void testRequestPartitionState() throws Exception {
                        final CompletableFuture<TaskDeploymentDescriptor> 
tddFuture = new CompletableFuture<>();
                        final TestingTaskExecutorGateway 
testingTaskExecutorGateway = new TestingTaskExecutorGatewayBuilder()
                                
.setSubmitTaskConsumer((taskDeploymentDescriptor, jobMasterId) -> {
-                    tddFuture.complete(taskDeploymentDescriptor);
-                    return 
CompletableFuture.completedFuture(Acknowledge.get());
-                })
+                                         
tddFuture.complete(taskDeploymentDescriptor);
+                                         return 
CompletableFuture.completedFuture(Acknowledge.get());
+                                 })
                                .createTestingTaskExecutorGateway();
                        
rpcService.registerGateway(testingTaskExecutorGateway.getAddress(), 
testingTaskExecutorGateway);
 
diff --git 
a/flink-runtime/src/test/java/org/apache/flink/runtime/leaderelection/JobManagerLeaderElectionTest.java
 
b/flink-runtime/src/test/java/org/apache/flink/runtime/leaderelection/JobManagerLeaderElectionTest.java
index 703cd0bf085..3b502db0d7d 100644
--- 
a/flink-runtime/src/test/java/org/apache/flink/runtime/leaderelection/JobManagerLeaderElectionTest.java
+++ 
b/flink-runtime/src/test/java/org/apache/flink/runtime/leaderelection/JobManagerLeaderElectionTest.java
@@ -33,8 +33,6 @@
 import org.apache.flink.runtime.jobmanager.StandaloneSubmittedJobGraphStore;
 import org.apache.flink.runtime.jobmanager.SubmittedJobGraphStore;
 import org.apache.flink.runtime.jobmanager.scheduler.Scheduler;
-import org.apache.flink.runtime.metrics.NoOpMetricRegistry;
-import org.apache.flink.runtime.metrics.groups.JobManagerMetricGroup;
 import org.apache.flink.runtime.metrics.groups.UnregisteredMetricGroups;
 import org.apache.flink.runtime.testingUtils.TestingJobManager;
 import org.apache.flink.runtime.testingUtils.TestingJobManagerMessages;
@@ -47,6 +45,7 @@
 import akka.actor.ActorSystem;
 import akka.actor.PoisonPill;
 import akka.actor.Props;
+import akka.actor.RobustActorSystem;
 import akka.pattern.Patterns;
 import akka.testkit.JavaTestKit;
 import akka.util.Timeout;
@@ -78,7 +77,7 @@
 
        @BeforeClass
        public static void setup() throws Exception {
-               actorSystem = ActorSystem.create("TestingActorSystem");
+               actorSystem = RobustActorSystem.create("TestingActorSystem", 
TestingUtils.getDefaultTestingActorSystemConfig());
                testingServer = new TestingServer();
        }
 
diff --git 
a/flink-runtime/src/test/scala/akka/actor/RobustActorSystemTest.scala 
b/flink-runtime/src/test/scala/akka/actor/RobustActorSystemTest.scala
new file mode 100644
index 00000000000..b8765375a8d
--- /dev/null
+++ b/flink-runtime/src/test/scala/akka/actor/RobustActorSystemTest.scala
@@ -0,0 +1,98 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package akka.actor
+
+import java.lang.Thread.UncaughtExceptionHandler
+
+import org.apache.flink.runtime.akka.AkkaUtils
+import org.junit.{After, Before, Test}
+import org.scalatest.Matchers
+import org.scalatest.junit.JUnitSuite
+
+import scala.concurrent.duration.Duration
+import scala.concurrent.{Await, Future, Promise}
+import scala.util.Success
+
+/**
+ * Tests that a [[RobustActorSystem]] forwards uncaught exceptions, thrown either from a future
+ * running on its dispatcher or from an actor, to the configured [[UncaughtExceptionHandler]].
+ */
+class RobustActorSystemTest extends JUnitSuite with Matchers {
+
+  var robustActorSystem: RobustActorSystem = null
+  var testingUncaughtExceptionHandler: TestingUncaughtExceptionHandler = null
+
+  @Before
+  def setup(): Unit = {
+    testingUncaughtExceptionHandler = new TestingUncaughtExceptionHandler
+    robustActorSystem = RobustActorSystem.create(
+      "testSystem",
+      AkkaUtils.testDispatcherConfig,
+      testingUncaughtExceptionHandler)
+  }
+
+  @After
+  def teardown(): Unit = {
+    if (robustActorSystem != null) {
+      // wait for the termination so that the system's threads do not outlive this test
+      Await.ready(robustActorSystem.terminate(), Duration.Inf)
+      robustActorSystem = null
+    }
+    testingUncaughtExceptionHandler = null
+  }
+
+  @Test
+  def testUncaughtExceptionHandler(): Unit = {
+    // UnknownError is a VirtualMachineError and therefore fatal: Future.apply does not capture
+    // it, so it escapes to the dispatcher and must reach the uncaught exception handler
+    val error = new UnknownError("Foobar")
+
+    Future {
+      throw error
+    }(robustActorSystem.dispatcher)
+
+    val caughtException = Await.result(
+      testingUncaughtExceptionHandler.exceptionPromise.future,
+      Duration.Inf)
+
+    caughtException should equal (error)
+  }
+
+  @Test
+  def testUncaughtExceptionHandlerFromActor(): Unit = {
+    val error = new UnknownError()
+    val actor = robustActorSystem.actorOf(Props.create(classOf[UncaughtExceptionActor], error))
+
+    actor ! Failure
+
+    val caughtException = Await.result(
+      testingUncaughtExceptionHandler.exceptionPromise.future,
+      Duration.Inf)
+
+    caughtException should equal (error)
+  }
+}
+
+/** Uncaught exception handler that records the first throwable it is handed. */
+class TestingUncaughtExceptionHandler extends UncaughtExceptionHandler {
+  val exceptionPromise: Promise[Throwable] = Promise()
+
+  override def uncaughtException(t: Thread, e: Throwable): Unit = {
+    // trySuccess instead of complete: a second uncaught exception must not make the handler
+    // itself throw an IllegalStateException for the already completed promise
+    exceptionPromise.trySuccess(e)
+  }
+}
+
+/** Actor that throws the throwable it was created with whenever it receives [[Failure]]. */
+class UncaughtExceptionActor(failure: Throwable) extends Actor {
+  override def receive: Receive = {
+    case Failure => throw failure
+  }
+}
+
+/**
+ * Message that makes [[UncaughtExceptionActor]] throw the throwable it was created with.
+ *
+ * NOTE(review): shares its simple name with scala.util.Failure; this file imports only
+ * scala.util.Success, so there is no clash here, but an added scala.util._ import would shadow it.
+ */
+case object Failure
diff --git 
a/flink-tests/src/test/java/org/apache/flink/test/recovery/JobManagerHACheckpointRecoveryITCase.java
 
b/flink-tests/src/test/java/org/apache/flink/test/recovery/JobManagerHACheckpointRecoveryITCase.java
index a22a8a8c930..afd50881754 100644
--- 
a/flink-tests/src/test/java/org/apache/flink/test/recovery/JobManagerHACheckpointRecoveryITCase.java
+++ 
b/flink-tests/src/test/java/org/apache/flink/test/recovery/JobManagerHACheckpointRecoveryITCase.java
@@ -71,6 +71,7 @@
 import akka.actor.ActorRef;
 import akka.actor.ActorSystem;
 import akka.actor.PoisonPill;
+import akka.actor.RobustActorSystem;
 import org.apache.commons.io.FileUtils;
 import org.apache.curator.test.TestingServer;
 import org.junit.AfterClass;
@@ -351,7 +352,7 @@ private void 
testCheckpointedStreamingProgram(AbstractStateBackend stateBackend)
                final int sequenceEnd = 5000;
                final long expectedSum = Parallelism * sequenceEnd * 
(sequenceEnd + 1) / 2;
 
-               final ActorSystem system = ActorSystem.create("Test", 
AkkaUtils.getDefaultAkkaConfig());
+               final ActorSystem system = RobustActorSystem.create("Test", 
AkkaUtils.getDefaultAkkaConfig());
                final TestingServer testingServer = new TestingServer();
                final TemporaryFolder temporaryFolder = new TemporaryFolder();
                temporaryFolder.create();
diff --git 
a/flink-tests/src/test/java/org/apache/flink/test/runtime/minicluster/LocalFlinkMiniClusterITCase.java
 
b/flink-tests/src/test/java/org/apache/flink/test/runtime/minicluster/LocalFlinkMiniClusterITCase.java
index 00c6865300c..66a811a6379 100644
--- 
a/flink-tests/src/test/java/org/apache/flink/test/runtime/minicluster/LocalFlinkMiniClusterITCase.java
+++ 
b/flink-tests/src/test/java/org/apache/flink/test/runtime/minicluster/LocalFlinkMiniClusterITCase.java
@@ -31,6 +31,7 @@
 import org.apache.flink.util.TestLogger;
 
 import akka.actor.ActorSystem;
+import akka.actor.RobustActorSystem;
 import akka.testkit.JavaTestKit;
 import org.junit.Test;
 
@@ -67,7 +68,7 @@
        @Test
        public void testLocalFlinkMiniClusterWithMultipleTaskManagers() throws 
InterruptedException, TimeoutException {
 
-               final ActorSystem system = ActorSystem.create("Testkit", 
AkkaUtils.getDefaultAkkaConfig());
+               final ActorSystem system = RobustActorSystem.create("Testkit", 
AkkaUtils.getDefaultAkkaConfig());
                LocalFlinkMiniCluster miniCluster = null;
 
                final int numTMs = 3;


 

----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
[email protected]


With regards,
Apache Git Services

Reply via email to