asfgit closed pull request #6539: [FLINK-10123] Use ExecutorThreadFactory
instead of DefaultThreadFactory in RestServer/Client
URL: https://github.com/apache/flink/pull/6539
This PR was merged from a forked repository.
Because GitHub hides the original diff of a foreign pull request (one opened
from a fork) once it has been merged, the diff is reproduced below for the
sake of provenance:
diff --git a/docs/_includes/generated/akka_configuration.html
b/docs/_includes/generated/akka_configuration.html
index 352c656488e..f5a2a5d77f9 100644
--- a/docs/_includes/generated/akka_configuration.html
+++ b/docs/_includes/generated/akka_configuration.html
@@ -12,11 +12,41 @@
<td style="word-wrap: break-word;">"10 s"</td>
<td>Timeout used for all futures and blocking Akka calls. If Flink
fails due to timeouts then you should try to increase this value. Timeouts can
be caused by slow machines or a congested network. The timeout value requires a
time-unit specifier (ms/s/min/h/d).</td>
</tr>
+ <tr>
+ <td><h5>akka.client-socket-worker-pool.pool-size-factor</h5></td>
+ <td style="word-wrap: break-word;">1.0</td>
+ <td>The pool size factor is used to determine thread pool size
using the following formula: ceil(available processors * factor). Resulting
size is then bounded by the pool-size-min and pool-size-max values.</td>
+ </tr>
+ <tr>
+ <td><h5>akka.client-socket-worker-pool.pool-size-max</h5></td>
+ <td style="word-wrap: break-word;">2</td>
+ <td>Max number of threads to cap factor-based number to.</td>
+ </tr>
+ <tr>
+ <td><h5>akka.client-socket-worker-pool.pool-size-min</h5></td>
+ <td style="word-wrap: break-word;">1</td>
+ <td>Min number of threads to cap factor-based number to.</td>
+ </tr>
<tr>
<td><h5>akka.client.timeout</h5></td>
<td style="word-wrap: break-word;">"60 s"</td>
<td>Timeout for all blocking calls on the client side.</td>
</tr>
+ <tr>
+ <td><h5>akka.fork-join-executor.parallelism-factor</h5></td>
+ <td style="word-wrap: break-word;">2.0</td>
+ <td>The parallelism factor is used to determine thread pool size
using the following formula: ceil(available processors * factor). Resulting
size is then bounded by the parallelism-min and parallelism-max values.</td>
+ </tr>
+ <tr>
+ <td><h5>akka.fork-join-executor.parallelism-max</h5></td>
+ <td style="word-wrap: break-word;">64</td>
+ <td>Max number of threads to cap factor-based parallelism number
to.</td>
+ </tr>
+ <tr>
+ <td><h5>akka.fork-join-executor.parallelism-min</h5></td>
+ <td style="word-wrap: break-word;">8</td>
+ <td>Min number of threads to cap factor-based parallelism number
to.</td>
+ </tr>
<tr>
<td><h5>akka.framesize</h5></td>
<td style="word-wrap: break-word;">"10485760b"</td>
@@ -42,6 +72,21 @@
<td style="word-wrap: break-word;">50</td>
<td>Milliseconds a gate should be closed for after a remote
connection was disconnected.</td>
</tr>
+ <tr>
+ <td><h5>akka.server-socket-worker-pool.pool-size-factor</h5></td>
+ <td style="word-wrap: break-word;">1.0</td>
+ <td>The pool size factor is used to determine thread pool size
using the following formula: ceil(available processors * factor). Resulting
size is then bounded by the pool-size-min and pool-size-max values.</td>
+ </tr>
+ <tr>
+ <td><h5>akka.server-socket-worker-pool.pool-size-max</h5></td>
+ <td style="word-wrap: break-word;">2</td>
+ <td>Max number of threads to cap factor-based number to.</td>
+ </tr>
+ <tr>
+ <td><h5>akka.server-socket-worker-pool.pool-size-min</h5></td>
+ <td style="word-wrap: break-word;">1</td>
+ <td>Min number of threads to cap factor-based number to.</td>
+ </tr>
<tr>
<td><h5>akka.ssl.enabled</h5></td>
<td style="word-wrap: break-word;">true</td>
diff --git
a/flink-core/src/main/java/org/apache/flink/configuration/AkkaOptions.java
b/flink-core/src/main/java/org/apache/flink/configuration/AkkaOptions.java
index 43c7876fbd8..02234b9d259 100644
--- a/flink-core/src/main/java/org/apache/flink/configuration/AkkaOptions.java
+++ b/flink-core/src/main/java/org/apache/flink/configuration/AkkaOptions.java
@@ -194,4 +194,81 @@
.key("akka.retry-gate-closed-for")
.defaultValue(50L)
.withDescription("Milliseconds a gate should be closed for
after a remote connection was disconnected.");
+
+ // ==================================================
+ // Configurations for fork-join-executor.
+ // ==================================================
+
+ public static final ConfigOption<Double>
FORK_JOIN_EXECUTOR_PARALLELISM_FACTOR = ConfigOptions
+ .key("akka.fork-join-executor.parallelism-factor")
+ .defaultValue(2.0)
+ .withDescription(Description.builder()
+ .text("The parallelism factor is used to determine
thread pool size using the" +
+ " following formula: ceil(available processors
* factor). Resulting size" +
+ " is then bounded by the parallelism-min and
parallelism-max values."
+ ).build());
+
+ public static final ConfigOption<Integer>
FORK_JOIN_EXECUTOR_PARALLELISM_MIN = ConfigOptions
+ .key("akka.fork-join-executor.parallelism-min")
+ .defaultValue(8)
+ .withDescription(Description.builder()
+ .text("Min number of threads to cap factor-based
parallelism number to.").build());
+
+ public static final ConfigOption<Integer>
FORK_JOIN_EXECUTOR_PARALLELISM_MAX = ConfigOptions
+ .key("akka.fork-join-executor.parallelism-max")
+ .defaultValue(64)
+ .withDescription(Description.builder()
+ .text("Max number of threads to cap factor-based
parallelism number to.").build());
+
+ // ==================================================
+ // Configurations for client-socket-worker-pool.
+ // ==================================================
+
+ public static final ConfigOption<Integer>
CLIENT_SOCKET_WORKER_POOL_SIZE_MIN = ConfigOptions
+ .key("akka.client-socket-worker-pool.pool-size-min")
+ .defaultValue(1)
+ .withDescription(Description.builder()
+ .text("Min number of threads to cap factor-based number
to.").build());
+
+ public static final ConfigOption<Integer>
CLIENT_SOCKET_WORKER_POOL_SIZE_MAX = ConfigOptions
+ .key("akka.client-socket-worker-pool.pool-size-max")
+ .defaultValue(2)
+ .withDescription(Description.builder()
+ .text("Max number of threads to cap factor-based number
to.").build());
+
+ public static final ConfigOption<Double>
CLIENT_SOCKET_WORKER_POOL_SIZE_FACTOR = ConfigOptions
+ .key("akka.client-socket-worker-pool.pool-size-factor")
+ .defaultValue(1.0)
+ .withDescription(Description.builder()
+ .text("The pool size factor is used to determine thread
pool size" +
+ " using the following formula: ceil(available
processors * factor)." +
+ " Resulting size is then bounded by the
pool-size-min and" +
+ " pool-size-max values."
+ ).build());
+
+ // ==================================================
+ // Configurations for server-socket-worker-pool.
+ // ==================================================
+
+ public static final ConfigOption<Integer>
SERVER_SOCKET_WORKER_POOL_SIZE_MIN = ConfigOptions
+ .key("akka.server-socket-worker-pool.pool-size-min")
+ .defaultValue(1)
+ .withDescription(Description.builder()
+ .text("Min number of threads to cap factor-based number
to.").build());
+
+ public static final ConfigOption<Integer>
SERVER_SOCKET_WORKER_POOL_SIZE_MAX = ConfigOptions
+ .key("akka.server-socket-worker-pool.pool-size-max")
+ .defaultValue(2)
+ .withDescription(Description.builder()
+ .text("Max number of threads to cap factor-based number
to.").build());
+
+ public static final ConfigOption<Double>
SERVER_SOCKET_WORKER_POOL_SIZE_FACTOR = ConfigOptions
+ .key("akka.server-socket-worker-pool.pool-size-factor")
+ .defaultValue(1.0)
+ .withDescription(Description.builder()
+ .text("The pool size factor is used to determine thread
pool size" +
+ " using the following formula: ceil(available
processors * factor)." +
+ " Resulting size is then bounded by the
pool-size-min and" +
+ " pool-size-max values."
+ ).build());
}
diff --git
a/flink-runtime/src/main/java/org/apache/flink/runtime/clusterframework/BootstrapTools.java
b/flink-runtime/src/main/java/org/apache/flink/runtime/clusterframework/BootstrapTools.java
index 4e43480f1ec..56e45762263 100644
---
a/flink-runtime/src/main/java/org/apache/flink/runtime/clusterframework/BootstrapTools.java
+++
b/flink-runtime/src/main/java/org/apache/flink/runtime/clusterframework/BootstrapTools.java
@@ -104,14 +104,7 @@ public static ActorSystem startActorSystem(
while (portsIterator.hasNext()) {
// first, we check if the port is available by opening
a socket
// if the actor system fails to start on the port, we
try further
- ServerSocket availableSocket =
NetUtils.createSocketFromPorts(
- portsIterator,
- new NetUtils.SocketFactory() {
- @Override
- public ServerSocket createSocket(int
port) throws IOException {
- return new ServerSocket(port);
- }
- });
+ ServerSocket availableSocket =
NetUtils.createSocketFromPorts(portsIterator, ServerSocket::new);
int port;
if (availableSocket == null) {
diff --git
a/flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/JobMaster.java
b/flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/JobMaster.java
index c47f4fd19ff..01cb2b6b099 100644
---
a/flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/JobMaster.java
+++
b/flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/JobMaster.java
@@ -1655,4 +1655,9 @@ public void reportPayload(ResourceID resourceID, Void
payload) {
RestartStrategy getRestartStrategy() {
return restartStrategy;
}
+
+ @VisibleForTesting
+ ExecutionGraph getExecutionGraph() { // test-only accessor: returns the executionGraph field as-is
+ return executionGraph;
+ }
}
diff --git
a/flink-runtime/src/main/java/org/apache/flink/runtime/rest/RestClient.java
b/flink-runtime/src/main/java/org/apache/flink/runtime/rest/RestClient.java
index 052b9b121ed..2e9de4c168d 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/rest/RestClient.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/rest/RestClient.java
@@ -31,6 +31,7 @@
import org.apache.flink.runtime.rest.util.RestClientException;
import org.apache.flink.runtime.rest.util.RestConstants;
import org.apache.flink.runtime.rest.util.RestMapperUtils;
+import org.apache.flink.runtime.util.ExecutorThreadFactory;
import org.apache.flink.util.FlinkException;
import org.apache.flink.util.Preconditions;
@@ -69,7 +70,6 @@
import
org.apache.flink.shaded.netty4.io.netty.handler.codec.http.multipart.MemoryAttribute;
import org.apache.flink.shaded.netty4.io.netty.handler.ssl.SslHandler;
import
org.apache.flink.shaded.netty4.io.netty.handler.stream.ChunkedWriteHandler;
-import
org.apache.flink.shaded.netty4.io.netty.util.concurrent.DefaultThreadFactory;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -121,7 +121,7 @@ protected void initChannel(SocketChannel socketChannel) {
.addLast(new ClientHandler());
}
};
- NioEventLoopGroup group = new NioEventLoopGroup(1, new
DefaultThreadFactory("flink-rest-client-netty"));
+ NioEventLoopGroup group = new NioEventLoopGroup(1, new
ExecutorThreadFactory("flink-rest-client-netty"));
bootstrap = new Bootstrap();
bootstrap
diff --git
a/flink-runtime/src/main/java/org/apache/flink/runtime/rest/RestServerEndpoint.java
b/flink-runtime/src/main/java/org/apache/flink/runtime/rest/RestServerEndpoint.java
index b1169b785a5..e836e357b5d 100644
---
a/flink-runtime/src/main/java/org/apache/flink/runtime/rest/RestServerEndpoint.java
+++
b/flink-runtime/src/main/java/org/apache/flink/runtime/rest/RestServerEndpoint.java
@@ -27,6 +27,7 @@
import org.apache.flink.runtime.rest.handler.RestHandlerSpecification;
import org.apache.flink.runtime.rest.handler.router.Router;
import org.apache.flink.runtime.rest.handler.router.RouterHandler;
+import org.apache.flink.runtime.util.ExecutorThreadFactory;
import org.apache.flink.util.AutoCloseableAsync;
import org.apache.flink.util.Preconditions;
@@ -43,7 +44,6 @@
import
org.apache.flink.shaded.netty4.io.netty.handler.codec.http.HttpServerCodec;
import org.apache.flink.shaded.netty4.io.netty.handler.ssl.SslHandler;
import
org.apache.flink.shaded.netty4.io.netty.handler.stream.ChunkedWriteHandler;
-import
org.apache.flink.shaded.netty4.io.netty.util.concurrent.DefaultThreadFactory;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -169,8 +169,8 @@ protected void initChannel(SocketChannel ch) {
}
};
- NioEventLoopGroup bossGroup = new NioEventLoopGroup(1,
new DefaultThreadFactory("flink-rest-server-netty-boss"));
- NioEventLoopGroup workerGroup = new
NioEventLoopGroup(0, new
DefaultThreadFactory("flink-rest-server-netty-worker"));
+ NioEventLoopGroup bossGroup = new NioEventLoopGroup(1,
new ExecutorThreadFactory("flink-rest-server-netty-boss"));
+ NioEventLoopGroup workerGroup = new
NioEventLoopGroup(0, new
ExecutorThreadFactory("flink-rest-server-netty-worker"));
bootstrap = new ServerBootstrap();
bootstrap
diff --git
a/flink-runtime/src/main/java/org/apache/flink/runtime/rpc/akka/AkkaRpcServiceUtils.java
b/flink-runtime/src/main/java/org/apache/flink/runtime/rpc/akka/AkkaRpcServiceUtils.java
index 982a53668c5..3a626986361 100644
---
a/flink-runtime/src/main/java/org/apache/flink/runtime/rpc/akka/AkkaRpcServiceUtils.java
+++
b/flink-runtime/src/main/java/org/apache/flink/runtime/rpc/akka/AkkaRpcServiceUtils.java
@@ -52,7 +52,7 @@
private static final Logger LOG =
LoggerFactory.getLogger(AkkaRpcServiceUtils.class);
private static final String AKKA_TCP = "akka.tcp";
- private static final String AkKA_SSL_TCP = "akka.ssl.tcp";
+ private static final String AKKA_SSL_TCP = "akka.ssl.tcp";
private static final AtomicLong nextNameOffset = new AtomicLong(0L);
@@ -162,7 +162,7 @@ public static String getRpcUrl(
checkNotNull(endpointName, "endpointName is null");
checkArgument(port > 0 && port <= 65535, "port must be in [1,
65535]");
- final String protocolPrefix = akkaProtocol ==
AkkaProtocol.SSL_TCP ? AkKA_SSL_TCP : AKKA_TCP;
+ final String protocolPrefix = akkaProtocol ==
AkkaProtocol.SSL_TCP ? AKKA_SSL_TCP : AKKA_TCP;
if (addressResolution ==
AddressResolution.TRY_ADDRESS_RESOLUTION) {
// Fail fast if the hostname cannot be resolved
diff --git a/flink-runtime/src/main/scala/akka/actor/RobustActorSystem.scala
b/flink-runtime/src/main/scala/akka/actor/RobustActorSystem.scala
new file mode 100644
index 00000000000..14ab51b8735
--- /dev/null
+++ b/flink-runtime/src/main/scala/akka/actor/RobustActorSystem.scala
@@ -0,0 +1,98 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package akka.actor
+
+import java.lang.Thread.UncaughtExceptionHandler
+
+import akka.actor.ActorSystem.findClassLoader
+import akka.actor.setup.ActorSystemSetup
+import com.typesafe.config.{Config, ConfigFactory}
+import org.apache.flink.runtime.util.FatalExitExceptionHandler
+
+import scala.concurrent.ExecutionContext
+
+/**
+ * [[ActorSystemImpl]] which has a configurable
[[java.lang.Thread.UncaughtExceptionHandler]].
+ *
+ * When `optionalUncaughtExceptionHandler` is defined, that handler is what
+ * `uncaughtExceptionHandler` returns; when it is `None`, the handler provided by
+ * [[ActorSystemImpl]] is returned instead. All other constructor arguments are
+ * passed through unchanged to [[ActorSystemImpl]].
+ */
+class RobustActorSystem(
+ name: String,
+ applicationConfig: Config,
+ classLoader: ClassLoader,
+ defaultExecutionContext: Option[ExecutionContext],
+ guardianProps: Option[Props],
+ setup: ActorSystemSetup,
+ val optionalUncaughtExceptionHandler: Option[UncaughtExceptionHandler])
+ extends ActorSystemImpl(
+ name,
+ applicationConfig,
+ classLoader,
+ defaultExecutionContext,
+ guardianProps,
+ setup) {
+
+ override protected def uncaughtExceptionHandler:
Thread.UncaughtExceptionHandler =
+ optionalUncaughtExceptionHandler.getOrElse(super.uncaughtExceptionHandler)
+}
+
+object RobustActorSystem { // factories that build and start a RobustActorSystem
+ def create(name: String, applicationConfig: Config): RobustActorSystem = {
+ apply(name, ActorSystemSetup.create(BootstrapSetup(None,
Option(applicationConfig), None)))
+ }
+
+ def create(
+ name: String,
+ applicationConfig: Config,
+ uncaughtExceptionHandler: UncaughtExceptionHandler): RobustActorSystem =
{
+ apply(
+ name,
+ ActorSystemSetup.create(BootstrapSetup(None, Option(applicationConfig),
None)),
+ uncaughtExceptionHandler
+ )
+ }
+
+ def apply(name: String, setup: ActorSystemSetup): RobustActorSystem = {
+ internalApply(name, setup, Some(FatalExitExceptionHandler.INSTANCE)) // no handler supplied: use FatalExitExceptionHandler.INSTANCE
+ }
+
+ def apply(
+ name: String,
+ setup: ActorSystemSetup,
+ uncaughtExceptionHandler: UncaughtExceptionHandler): RobustActorSystem =
{
+ internalApply(name, setup, Some(uncaughtExceptionHandler))
+ }
+
+ def internalApply(
+ name: String,
+ setup: ActorSystemSetup,
+ uncaughtExceptionHandler: Option[UncaughtExceptionHandler]):
RobustActorSystem = {
+ val bootstrapSettings = setup.get[BootstrapSetup]
+ val cl =
bootstrapSettings.flatMap(_.classLoader).getOrElse(findClassLoader())
+ val appConfig =
bootstrapSettings.flatMap(_.config).getOrElse(ConfigFactory.load(cl))
+ val defaultEC = bootstrapSettings.flatMap(_.defaultExecutionContext)
+
+ new RobustActorSystem(
+ name,
+ appConfig,
+ cl,
+ defaultEC,
+ None, // guardianProps: none supplied
+ setup,
+ uncaughtExceptionHandler).start() // start() is invoked on the new system and its result is returned
+ }
+}
diff --git
a/flink-runtime/src/main/scala/org/apache/flink/runtime/akka/AkkaUtils.scala
b/flink-runtime/src/main/scala/org/apache/flink/runtime/akka/AkkaUtils.scala
index b58bfe13ff4..9ce1865204f 100644
--- a/flink-runtime/src/main/scala/org/apache/flink/runtime/akka/AkkaUtils.scala
+++ b/flink-runtime/src/main/scala/org/apache/flink/runtime/akka/AkkaUtils.scala
@@ -31,7 +31,7 @@ import org.apache.flink.runtime.net.SSLUtils
import org.apache.flink.util.NetUtils
import org.jboss.netty.channel.ChannelException
import org.jboss.netty.logging.{InternalLoggerFactory, Slf4JLoggerFactory}
-import org.slf4j.LoggerFactory
+import org.slf4j.{Logger, LoggerFactory}
import scala.annotation.tailrec
import scala.concurrent._
@@ -44,9 +44,9 @@ import scala.language.postfixOps
* actor systems resides in this class.
*/
object AkkaUtils {
- val LOG = LoggerFactory.getLogger(AkkaUtils.getClass)
+ val LOG: Logger = LoggerFactory.getLogger(AkkaUtils.getClass)
- val INF_TIMEOUT = 21474835 seconds
+ val INF_TIMEOUT: FiniteDuration = 21474835 seconds
/**
* Creates a local actor system without remoting.
@@ -103,7 +103,7 @@ object AkkaUtils {
def createActorSystem(akkaConfig: Config): ActorSystem = {
// Initialize slf4j as logger of Akka's Netty instead of java.util.logging
(FLINK-1650)
InternalLoggerFactory.setDefaultFactory(new Slf4JLoggerFactory)
- ActorSystem.create("flink", akkaConfig)
+ RobustActorSystem.create("flink", akkaConfig)
}
/**
@@ -124,7 +124,9 @@ object AkkaUtils {
* @param port to bind against
* @return A remote Akka config
*/
- def getAkkaConfig(configuration: Configuration, hostname: String, port:
Int): Config = {
+ def getAkkaConfig(configuration: Configuration,
+ hostname: String,
+ port: Int): Config = {
getAkkaConfig(configuration, Some((hostname, port)))
}
@@ -203,6 +205,24 @@ object AkkaUtils {
val supervisorStrategy =
classOf[StoppingSupervisorWithoutLoggingActorKilledExceptionStrategy]
.getCanonicalName
+ val forkJoinExecutorParallelismFactor =
+
configuration.getDouble(AkkaOptions.FORK_JOIN_EXECUTOR_PARALLELISM_FACTOR)
+
+ val forkJoinExecutorParallelismMin =
+ configuration.getInteger(AkkaOptions.FORK_JOIN_EXECUTOR_PARALLELISM_MIN)
+
+ val forkJoinExecutorParallelismMax =
+ configuration.getInteger(AkkaOptions.FORK_JOIN_EXECUTOR_PARALLELISM_MAX)
+
+ val forkJoinExecutorConfig =
+ s"""
+ | fork-join-executor {
+ | parallelism-factor = $forkJoinExecutorParallelismFactor
+ | parallelism-min = $forkJoinExecutorParallelismMin
+ | parallelism-max = $forkJoinExecutorParallelismMax
+ | }
+ """.stripMargin
+
val config =
s"""
|akka {
@@ -230,9 +250,7 @@ object AkkaUtils {
| default-dispatcher {
| throughput = $akkaThroughput
|
- | fork-join-executor {
- | parallelism-factor = 2.0
- | }
+ | $forkJoinExecutorConfig
| }
| }
|}
@@ -263,7 +281,7 @@ object AkkaUtils {
private def validateHeartbeat(pauseParamName: String,
pauseValue: String,
intervalParamName: String,
- intervalValue: String) = {
+ intervalValue: String): Unit = {
if (Duration.apply(pauseValue).lteq(Duration.apply(intervalValue))) {
throw new IllegalConfigurationException(
"%s [%s] must greater then %s [%s]",
@@ -367,6 +385,25 @@ object AkkaUtils {
val akkaSSLAlgorithmsString =
configuration.getString(SecurityOptions.SSL_ALGORITHMS)
val akkaSSLAlgorithms =
akkaSSLAlgorithmsString.split(",").toList.mkString("[", ",", "]")
+ val clientSocketWorkerPoolPoolSizeMin =
+ configuration.getInteger(AkkaOptions.CLIENT_SOCKET_WORKER_POOL_SIZE_MIN)
+
+ val clientSocketWorkerPoolPoolSizeMax =
+ configuration.getInteger(AkkaOptions.CLIENT_SOCKET_WORKER_POOL_SIZE_MAX)
+
+ val clientSocketWorkerPoolPoolSizeFactor =
+
configuration.getDouble(AkkaOptions.CLIENT_SOCKET_WORKER_POOL_SIZE_FACTOR)
+
+ val serverSocketWorkerPoolPoolSizeMin =
+ configuration.getInteger(AkkaOptions.SERVER_SOCKET_WORKER_POOL_SIZE_MIN)
+
+ val serverSocketWorkerPoolPoolSizeMax =
+ configuration.getInteger(AkkaOptions.SERVER_SOCKET_WORKER_POOL_SIZE_MAX)
+
+ val serverSocketWorkerPoolPoolSizeFactor =
+
configuration.getDouble(AkkaOptions.SERVER_SOCKET_WORKER_POOL_SIZE_FACTOR)
+
+
val configString =
s"""
|akka {
@@ -397,6 +434,18 @@ object AkkaUtils {
| connection-timeout = $akkaTCPTimeout
| maximum-frame-size = $akkaFramesize
| tcp-nodelay = on
+ |
+ | client-socket-worker-pool {
+ | pool-size-min = $clientSocketWorkerPoolPoolSizeMin
+ | pool-size-max = $clientSocketWorkerPoolPoolSizeMax
+ | pool-size-factor = $clientSocketWorkerPoolPoolSizeFactor
+ | }
+ |
+ | server-socket-worker-pool {
+ | pool-size-min = $serverSocketWorkerPoolPoolSizeMin
+ | pool-size-max = $serverSocketWorkerPoolPoolSizeMax
+ | pool-size-factor = $serverSocketWorkerPoolPoolSizeFactor
+ | }
| }
| }
|
@@ -790,7 +839,7 @@ object AkkaUtils {
retryOnBindException(fn, stopCond)
}
case scala.util.Failure(x: Exception) => x.getCause match {
- case c: ChannelException =>
+ case _: ChannelException =>
if (stopCond) {
scala.util.Failure(new RuntimeException(
"Unable to do further retries starting the actor system"))
diff --git
a/flink-runtime/src/main/scala/org/apache/flink/runtime/jobmanager/JobManager.scala
b/flink-runtime/src/main/scala/org/apache/flink/runtime/jobmanager/JobManager.scala
index afecae20bef..0988730689a 100644
---
a/flink-runtime/src/main/scala/org/apache/flink/runtime/jobmanager/JobManager.scala
+++
b/flink-runtime/src/main/scala/org/apache/flink/runtime/jobmanager/JobManager.scala
@@ -2143,7 +2143,6 @@ object JobManager {
configuration: Configuration,
externalHostname: String,
port: Int): ActorSystem = {
-
// Bring up the job manager actor system first, bind it to the given
address.
val jobManagerSystem = BootstrapTools.startActorSystem(
configuration,
diff --git
a/flink-runtime/src/test/java/org/apache/flink/runtime/akka/FlinkUntypedActorTest.java
b/flink-runtime/src/test/java/org/apache/flink/runtime/akka/FlinkUntypedActorTest.java
index 00a84755daa..2d39e0b972d 100644
---
a/flink-runtime/src/test/java/org/apache/flink/runtime/akka/FlinkUntypedActorTest.java
+++
b/flink-runtime/src/test/java/org/apache/flink/runtime/akka/FlinkUntypedActorTest.java
@@ -21,11 +21,13 @@
import org.apache.flink.runtime.messages.JobManagerMessages;
import org.apache.flink.runtime.messages.RequiresLeaderSessionID;
import org.apache.flink.runtime.testingUtils.TestingUtils;
+import org.apache.flink.util.TestLogger;
import akka.actor.ActorRef;
import akka.actor.ActorSystem;
import akka.actor.Kill;
import akka.actor.Props;
+import akka.actor.RobustActorSystem;
import akka.testkit.JavaTestKit;
import akka.testkit.TestActorRef;
import org.junit.AfterClass;
@@ -40,13 +42,13 @@
/**
* Tests for {@link FlinkUntypedActor}.
*/
-public class FlinkUntypedActorTest {
+public class FlinkUntypedActorTest extends TestLogger {
private static ActorSystem actorSystem;
@BeforeClass
public static void setup() {
- actorSystem = ActorSystem.create("TestingActorSystem",
TestingUtils.testConfig());
+ actorSystem = RobustActorSystem.create("TestingActorSystem",
TestingUtils.testConfig());
}
@AfterClass
diff --git
a/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/ExecutionGraphTestUtils.java
b/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/ExecutionGraphTestUtils.java
index 1d36fa5859a..0d603fc17b2 100644
---
a/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/ExecutionGraphTestUtils.java
+++
b/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/ExecutionGraphTestUtils.java
@@ -97,13 +97,12 @@
//
------------------------------------------------------------------------
/**
- * Waits until the job has reached a certain state.
+ * Waits until the Job has reached a certain state.
*
* <p>This method is based on polling and might miss very fast state
transitions!
*/
public static void waitUntilJobStatus(ExecutionGraph eg, JobStatus
status, long maxWaitMillis)
throws TimeoutException {
-
checkNotNull(eg);
checkNotNull(status);
checkArgument(maxWaitMillis >= 0);
@@ -118,7 +117,9 @@ public static void waitUntilJobStatus(ExecutionGraph eg,
JobStatus status, long
}
if (System.nanoTime() >= deadline) {
- throw new TimeoutException("The job did not reach
status " + status + " in time. Current status is " + eg.getState() + '.');
+ throw new TimeoutException(
+ String.format("The job did not reach status %s
in time. Current status is %s.",
+ status, eg.getState()));
}
}
@@ -129,7 +130,6 @@ public static void waitUntilJobStatus(ExecutionGraph eg,
JobStatus status, long
*/
public static void waitUntilExecutionState(Execution execution,
ExecutionState state, long maxWaitMillis)
throws TimeoutException {
-
checkNotNull(execution);
checkNotNull(state);
checkArgument(maxWaitMillis >= 0);
@@ -144,7 +144,47 @@ public static void waitUntilExecutionState(Execution
execution, ExecutionState s
}
if (System.nanoTime() >= deadline) {
- throw new TimeoutException();
+ throw new TimeoutException(
+ String.format("The execution did not reach
state %s in time. Current state is %s.",
+ state, execution.getState()));
+ }
+ }
+
+ /**
+  * Waits until the current execution attempt of the ExecutionVertex has reached a certain state.
+  *
+  * <p>This method is based on polling and might miss very fast state transitions!
+  *
+  * @param executionVertex vertex whose current execution attempt is polled
+  * @param state execution state to wait for
+  * @param maxWaitMillis maximum time to wait in milliseconds; 0 sets the deadline to Long.MAX_VALUE
+  * @throws TimeoutException if the deadline passes while the current execution attempt is
+  *         missing or is in a different state
+  */
+ public static void waitUntilExecutionVertexState(ExecutionVertex executionVertex, ExecutionState state, long maxWaitMillis)
+         throws TimeoutException {
+     checkNotNull(executionVertex);
+     checkNotNull(state);
+     checkArgument(maxWaitMillis >= 0);
+
+     // this is a poor implementation - we may want to improve it eventually
+     final long deadline = maxWaitMillis == 0 ? Long.MAX_VALUE : System.nanoTime() + (maxWaitMillis * 1_000_000);
+
+     while (true) {
+         final Execution execution = executionVertex.getCurrentExecutionAttempt();
+
+         // reaching the requested state is the only way to leave this method without an exception
+         if (execution != null && execution.getState() == state) {
+             return;
+         }
+
+         // The deadline is checked on every unsuccessful poll. Previously an expired deadline
+         // combined with a wrong state fell into the "break" branch and returned silently.
+         if (System.nanoTime() >= deadline) {
+             if (execution != null) {
+                 throw new TimeoutException(
+                     String.format("The execution vertex did not reach state %s in time. Current state is %s.",
+                         state, execution.getState()));
+             } else {
+                 throw new TimeoutException(
+                     "Cannot get current execution attempt of " + executionVertex + '.');
+             }
+         }
+
+         try {
+             Thread.sleep(2);
+         } catch (InterruptedException ignored) {
+             // deliberately ignored: keep polling until the state is reached or the deadline expires
+         }
}
}
@@ -201,7 +241,6 @@ public static void waitForAllExecutionsPredicate(
public static void waitUntilFailoverRegionState(FailoverRegion region,
JobStatus status, long maxWaitMillis)
throws TimeoutException {
-
checkNotNull(region);
checkNotNull(status);
checkArgument(maxWaitMillis >= 0);
diff --git
a/flink-runtime/src/test/java/org/apache/flink/runtime/instance/InstanceManagerTest.java
b/flink-runtime/src/test/java/org/apache/flink/runtime/instance/InstanceManagerTest.java
index 5d7a520ae10..880be52df86 100644
---
a/flink-runtime/src/test/java/org/apache/flink/runtime/instance/InstanceManagerTest.java
+++
b/flink-runtime/src/test/java/org/apache/flink/runtime/instance/InstanceManagerTest.java
@@ -19,6 +19,7 @@
package org.apache.flink.runtime.instance;
import akka.actor.ActorSystem;
+import akka.actor.RobustActorSystem;
import akka.testkit.JavaTestKit;
import org.apache.flink.runtime.clusterframework.types.ResourceID;
@@ -26,6 +27,7 @@
import org.apache.flink.runtime.taskmanager.TaskManagerLocation;
import org.apache.flink.runtime.testingUtils.TestingUtils;
import org.apache.flink.runtime.testutils.CommonTestUtils;
+import org.apache.flink.util.TestLogger;
import org.junit.AfterClass;
import org.junit.Assert;
@@ -47,7 +49,7 @@
/**
* Tests for {@link org.apache.flink.runtime.instance.InstanceManager}.
*/
-public class InstanceManagerTest{
+public class InstanceManagerTest extends TestLogger {
static ActorSystem system;
@@ -55,7 +57,7 @@
@BeforeClass
public static void setup(){
- system = ActorSystem.create("TestingActorSystem",
TestingUtils.testConfig());
+ system = RobustActorSystem.create("TestingActorSystem",
TestingUtils.testConfig());
}
@AfterClass
diff --git
a/flink-runtime/src/test/java/org/apache/flink/runtime/jobmaster/JobMasterTest.java
b/flink-runtime/src/test/java/org/apache/flink/runtime/jobmaster/JobMasterTest.java
index 891ff82c413..66ca769165a 100644
---
a/flink-runtime/src/test/java/org/apache/flink/runtime/jobmaster/JobMasterTest.java
+++
b/flink-runtime/src/test/java/org/apache/flink/runtime/jobmaster/JobMasterTest.java
@@ -20,12 +20,17 @@
import org.apache.flink.api.common.ExecutionConfig;
import org.apache.flink.api.common.JobID;
+import org.apache.flink.api.common.io.DefaultInputSplitAssigner;
import org.apache.flink.api.common.restartstrategy.RestartStrategies;
import org.apache.flink.api.common.time.Time;
import org.apache.flink.api.java.tuple.Tuple3;
import org.apache.flink.configuration.BlobServerOptions;
+import org.apache.flink.configuration.ConfigConstants;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.configuration.JobManagerOptions;
+import org.apache.flink.core.io.InputSplit;
+import org.apache.flink.core.io.InputSplitAssigner;
+import org.apache.flink.core.io.InputSplitSource;
import org.apache.flink.core.testutils.OneShotLatch;
import org.apache.flink.runtime.blob.BlobServer;
import org.apache.flink.runtime.blob.VoidBlobStore;
@@ -46,6 +51,9 @@
import org.apache.flink.runtime.execution.ExecutionState;
import org.apache.flink.runtime.executiongraph.ArchivedExecutionGraph;
import org.apache.flink.runtime.executiongraph.ExecutionAttemptID;
+import org.apache.flink.runtime.executiongraph.ExecutionGraph;
+import org.apache.flink.runtime.executiongraph.ExecutionGraphTestUtils;
+import org.apache.flink.runtime.executiongraph.ExecutionVertex;
import
org.apache.flink.runtime.executiongraph.restart.FixedDelayRestartStrategy;
import org.apache.flink.runtime.executiongraph.restart.RestartStrategy;
import org.apache.flink.runtime.executiongraph.restart.RestartStrategyFactory;
@@ -60,6 +68,7 @@
import org.apache.flink.runtime.jobgraph.JobGraph;
import org.apache.flink.runtime.jobgraph.JobVertex;
import org.apache.flink.runtime.jobgraph.SavepointRestoreSettings;
+import org.apache.flink.runtime.jobgraph.tasks.AbstractInvokable;
import
org.apache.flink.runtime.jobgraph.tasks.CheckpointCoordinatorConfiguration;
import org.apache.flink.runtime.jobgraph.tasks.JobCheckpointingSettings;
import org.apache.flink.runtime.jobmanager.OnCompletionActions;
@@ -84,8 +93,11 @@
import org.apache.flink.runtime.testtasks.NoOpInvokable;
import org.apache.flink.runtime.util.TestingFatalErrorHandler;
import org.apache.flink.util.FlinkException;
+import org.apache.flink.util.InstantiationUtil;
import org.apache.flink.util.TestLogger;
+import org.apache.flink.util.function.SupplierWithException;
+import org.hamcrest.Matcher;
import org.hamcrest.Matchers;
import org.junit.After;
import org.junit.AfterClass;
@@ -100,8 +112,12 @@
import java.io.File;
import java.io.FileOutputStream;
import java.io.IOException;
+import java.util.ArrayList;
+import java.util.Arrays;
import java.util.Collection;
import java.util.Collections;
+import java.util.List;
+import java.util.Objects;
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.CompletableFuture;
@@ -110,9 +126,11 @@
import static org.hamcrest.MatcherAssert.assertThat;
import static org.hamcrest.Matchers.contains;
+import static org.hamcrest.Matchers.containsInAnyOrder;
import static org.hamcrest.Matchers.equalTo;
import static org.hamcrest.Matchers.hasSize;
import static org.hamcrest.Matchers.is;
+import static org.hamcrest.Matchers.nullValue;
import static org.junit.Assert.assertNotNull;
import static org.junit.Assert.assertTrue;
@@ -121,6 +139,8 @@
*/
public class JobMasterTest extends TestLogger {
+ static final TestingInputSplit[] EMPTY_TESTING_INPUT_SPLITS = new
TestingInputSplit[0];
+
@ClassRule
public static TemporaryFolder temporaryFolder = new TemporaryFolder();
@@ -678,6 +698,152 @@ public void
testResourceManagerConnectionAfterRegainingLeadership() throws Excep
}
}
+ @Test
+ public void testRequestNextInputSplit() throws Exception {
+ final List<TestingInputSplit> expectedInputSplits =
Arrays.asList(
+ new TestingInputSplit(1),
+ new TestingInputSplit(42),
+ new TestingInputSplit(1337));
+
+ // build one node JobGraph
+ InputSplitSource<TestingInputSplit> inputSplitSource = new
TestingInputSplitSource(expectedInputSplits);
+
+ JobVertex source = new JobVertex("vertex1");
+ source.setParallelism(1);
+ source.setInputSplitSource(inputSplitSource);
+ source.setInvokableClass(AbstractInvokable.class);
+
+ final JobGraph testJobGraph = new JobGraph(source);
+ testJobGraph.setAllowQueuedScheduling(true);
+
+
configuration.setLong(ConfigConstants.RESTART_STRATEGY_FIXED_DELAY_ATTEMPTS, 1);
+
configuration.setString(ConfigConstants.RESTART_STRATEGY_FIXED_DELAY_DELAY, "0
s");
+
+ final JobManagerSharedServices jobManagerSharedServices =
+ new TestingJobManagerSharedServicesBuilder()
+
.setRestartStrategyFactory(RestartStrategyFactory.createRestartStrategyFactory(configuration))
+ .build();
+
+ final JobMaster jobMaster = createJobMaster(
+ configuration,
+ testJobGraph,
+ haServices,
+ jobManagerSharedServices);
+
+ CompletableFuture<Acknowledge> startFuture =
jobMaster.start(jobMasterId, testingTimeout);
+
+ try {
+ // wait for the start to complete
+ startFuture.get(testingTimeout.toMilliseconds(),
TimeUnit.MILLISECONDS);
+
+ final JobMasterGateway jobMasterGateway =
jobMaster.getSelfGateway(JobMasterGateway.class);
+
+ ExecutionGraph eg = jobMaster.getExecutionGraph();
+ ExecutionVertex ev =
eg.getAllExecutionVertices().iterator().next();
+
+ final SupplierWithException<SerializedInputSplit,
Exception> inputSplitSupplier = () -> jobMasterGateway.requestNextInputSplit(
+ source.getID(),
+
ev.getCurrentExecutionAttempt().getAttemptId()).get();
+
+ List<InputSplit> actualInputSplits = getInputSplits(
+ expectedInputSplits.size(),
+ inputSplitSupplier);
+
+ final Matcher<Iterable<? extends InputSplit>>
expectedInputSplitsMatcher =
containsInAnyOrder(expectedInputSplits.toArray(EMPTY_TESTING_INPUT_SPLITS));
+ assertThat(actualInputSplits,
expectedInputSplitsMatcher);
+
+ final long maxWaitMillis = 2000L;
+
ExecutionGraphTestUtils.waitUntilExecutionVertexState(ev,
ExecutionState.SCHEDULED, maxWaitMillis);
+
+ eg.failGlobal(new Exception("Testing exception"));
+
+
ExecutionGraphTestUtils.waitUntilExecutionVertexState(ev,
ExecutionState.SCHEDULED, maxWaitMillis);
+
+ actualInputSplits = getInputSplits(
+ expectedInputSplits.size(),
+ inputSplitSupplier);
+
+ assertThat(actualInputSplits,
expectedInputSplitsMatcher);
+ } finally {
+ RpcUtils.terminateRpcEndpoint(jobMaster,
testingTimeout);
+ }
+ }
+
+ @Nonnull
+ private static List<InputSplit> getInputSplits(int numberInputSplits,
SupplierWithException<SerializedInputSplit, Exception> nextInputSplit) throws
Exception {
+ final List<InputSplit> actualInputSplits = new
ArrayList<>(numberInputSplits);
+
+ for (int i = 0; i < numberInputSplits; i++) {
+ final SerializedInputSplit serializedInputSplit =
nextInputSplit.get();
+
+ assertThat(serializedInputSplit.isEmpty(), is(false));
+
+
actualInputSplits.add(InstantiationUtil.deserializeObject(serializedInputSplit.getInputSplitData(),
ClassLoader.getSystemClassLoader()));
+ }
+
+ final SerializedInputSplit serializedInputSplit =
nextInputSplit.get();
+
+ if (!serializedInputSplit.isEmpty()) {
+ InputSplit emptyInputSplit =
InstantiationUtil.deserializeObject(serializedInputSplit.getInputSplitData(),
ClassLoader.getSystemClassLoader());
+
+ assertThat(emptyInputSplit, is(nullValue()));
+ }
+ return actualInputSplits;
+ }
+
+ private static final class TestingInputSplitSource implements
InputSplitSource<TestingInputSplit> {
+ private static final long serialVersionUID =
-2344684048759139086L;
+
+ private final List<TestingInputSplit> inputSplits;
+
+ private TestingInputSplitSource(List<TestingInputSplit>
inputSplits) {
+ this.inputSplits = inputSplits;
+ }
+
+ @Override
+ public TestingInputSplit[] createInputSplits(int minNumSplits) {
+ return inputSplits.toArray(EMPTY_TESTING_INPUT_SPLITS);
+ }
+
+ @Override
+ public InputSplitAssigner
getInputSplitAssigner(TestingInputSplit[] inputSplits) {
+ return new DefaultInputSplitAssigner(inputSplits);
+ }
+ }
+
+ private static final class TestingInputSplit implements InputSplit {
+
+ private static final long serialVersionUID =
-5404803705463116083L;
+ private final int splitNumber;
+
+ TestingInputSplit(int number) {
+ this.splitNumber = number;
+ }
+
+ public int getSplitNumber() {
+ return splitNumber;
+ }
+
+ @Override
+ public boolean equals(Object o) {
+ if (this == o) {
+ return true;
+ }
+
+ if (o == null || getClass() != o.getClass()) {
+ return false;
+ }
+
+ TestingInputSplit that = (TestingInputSplit) o;
+ return splitNumber == that.splitNumber;
+ }
+
+ @Override
+ public int hashCode() {
+ return Objects.hash(splitNumber);
+ }
+ }
+
/**
* Tests the {@link
JobMaster#requestPartitionState(IntermediateDataSetID, ResultPartitionID)}
* call for a finished result partition.
@@ -708,9 +874,9 @@ public void testRequestPartitionState() throws Exception {
final CompletableFuture<TaskDeploymentDescriptor>
tddFuture = new CompletableFuture<>();
final TestingTaskExecutorGateway
testingTaskExecutorGateway = new TestingTaskExecutorGatewayBuilder()
.setSubmitTaskConsumer((taskDeploymentDescriptor, jobMasterId) -> {
- tddFuture.complete(taskDeploymentDescriptor);
- return
CompletableFuture.completedFuture(Acknowledge.get());
- })
+
tddFuture.complete(taskDeploymentDescriptor);
+ return
CompletableFuture.completedFuture(Acknowledge.get());
+ })
.createTestingTaskExecutorGateway();
rpcService.registerGateway(testingTaskExecutorGateway.getAddress(),
testingTaskExecutorGateway);
diff --git
a/flink-runtime/src/test/java/org/apache/flink/runtime/leaderelection/JobManagerLeaderElectionTest.java
b/flink-runtime/src/test/java/org/apache/flink/runtime/leaderelection/JobManagerLeaderElectionTest.java
index 703cd0bf085..3b502db0d7d 100644
---
a/flink-runtime/src/test/java/org/apache/flink/runtime/leaderelection/JobManagerLeaderElectionTest.java
+++
b/flink-runtime/src/test/java/org/apache/flink/runtime/leaderelection/JobManagerLeaderElectionTest.java
@@ -33,8 +33,6 @@
import org.apache.flink.runtime.jobmanager.StandaloneSubmittedJobGraphStore;
import org.apache.flink.runtime.jobmanager.SubmittedJobGraphStore;
import org.apache.flink.runtime.jobmanager.scheduler.Scheduler;
-import org.apache.flink.runtime.metrics.NoOpMetricRegistry;
-import org.apache.flink.runtime.metrics.groups.JobManagerMetricGroup;
import org.apache.flink.runtime.metrics.groups.UnregisteredMetricGroups;
import org.apache.flink.runtime.testingUtils.TestingJobManager;
import org.apache.flink.runtime.testingUtils.TestingJobManagerMessages;
@@ -47,6 +45,7 @@
import akka.actor.ActorSystem;
import akka.actor.PoisonPill;
import akka.actor.Props;
+import akka.actor.RobustActorSystem;
import akka.pattern.Patterns;
import akka.testkit.JavaTestKit;
import akka.util.Timeout;
@@ -78,7 +77,7 @@
@BeforeClass
public static void setup() throws Exception {
- actorSystem = ActorSystem.create("TestingActorSystem");
+ actorSystem = RobustActorSystem.create("TestingActorSystem",
TestingUtils.getDefaultTestingActorSystemConfig());
testingServer = new TestingServer();
}
diff --git
a/flink-runtime/src/test/scala/akka/actor/RobustActorSystemTest.scala
b/flink-runtime/src/test/scala/akka/actor/RobustActorSystemTest.scala
new file mode 100644
index 00000000000..b8765375a8d
--- /dev/null
+++ b/flink-runtime/src/test/scala/akka/actor/RobustActorSystemTest.scala
@@ -0,0 +1,98 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package akka.actor
+
+import java.lang.Thread.UncaughtExceptionHandler
+
+import org.apache.flink.runtime.akka.AkkaUtils
+import org.junit.{After, Before, Test}
+import org.scalatest.Matchers
+import org.scalatest.junit.JUnitSuite
+
+import scala.concurrent.duration.Duration
+import scala.concurrent.{Await, Future, Promise}
+import scala.util.Success
+
+class RobustActorSystemTest extends JUnitSuite with Matchers {
+
+ var robustActorSystem: RobustActorSystem = null
+ var testingUncaughtExceptionHandler: TestingUncaughtExceptionHandler = null
+
+ @Before
+ def setup(): Unit = {
+ testingUncaughtExceptionHandler = new TestingUncaughtExceptionHandler
+ robustActorSystem = RobustActorSystem.create(
+ "testSystem",
+ AkkaUtils.testDispatcherConfig,
+ testingUncaughtExceptionHandler)
+ }
+
+ @After
+ def teardown(): Unit = {
+ robustActorSystem.terminate()
+ testingUncaughtExceptionHandler = null;
+ }
+
+ @Test
+ def testUncaughtExceptionHandler(): Unit = {
+ val error = new UnknownError("Foobar")
+
+ Future {
+ throw error
+ }(robustActorSystem.dispatcher)
+
+ val caughtException = Await.result(
+ testingUncaughtExceptionHandler.exceptionPromise.future,
+ Duration.Inf)
+
+ caughtException should equal (error)
+ }
+
+ @Test
+ def testUncaughtExceptionHandlerFromActor(): Unit = {
+ val error = new UnknownError()
+ val actor =
robustActorSystem.actorOf(Props.create(classOf[UncaughtExceptionActor], error))
+
+ actor ! Failure
+
+ val caughtException = Await.result(
+ testingUncaughtExceptionHandler.exceptionPromise.future,
+ Duration.Inf)
+
+ caughtException should equal (error)
+ }
+}
+
+class TestingUncaughtExceptionHandler extends UncaughtExceptionHandler {
+ val exceptionPromise: Promise[Throwable] = Promise()
+
+ override def uncaughtException(t: Thread, e: Throwable): Unit = {
+ exceptionPromise.complete(Success(e))
+ }
+}
+
+class UncaughtExceptionActor(failure: Throwable) extends Actor {
+ override def receive: Receive = {
+ case Failure => {
+ throw failure
+ };
+ }
+}
+
+case object Failure
diff --git
a/flink-tests/src/test/java/org/apache/flink/test/recovery/JobManagerHACheckpointRecoveryITCase.java
b/flink-tests/src/test/java/org/apache/flink/test/recovery/JobManagerHACheckpointRecoveryITCase.java
index a22a8a8c930..afd50881754 100644
---
a/flink-tests/src/test/java/org/apache/flink/test/recovery/JobManagerHACheckpointRecoveryITCase.java
+++
b/flink-tests/src/test/java/org/apache/flink/test/recovery/JobManagerHACheckpointRecoveryITCase.java
@@ -71,6 +71,7 @@
import akka.actor.ActorRef;
import akka.actor.ActorSystem;
import akka.actor.PoisonPill;
+import akka.actor.RobustActorSystem;
import org.apache.commons.io.FileUtils;
import org.apache.curator.test.TestingServer;
import org.junit.AfterClass;
@@ -351,7 +352,7 @@ private void
testCheckpointedStreamingProgram(AbstractStateBackend stateBackend)
final int sequenceEnd = 5000;
final long expectedSum = Parallelism * sequenceEnd *
(sequenceEnd + 1) / 2;
- final ActorSystem system = ActorSystem.create("Test",
AkkaUtils.getDefaultAkkaConfig());
+ final ActorSystem system = RobustActorSystem.create("Test",
AkkaUtils.getDefaultAkkaConfig());
final TestingServer testingServer = new TestingServer();
final TemporaryFolder temporaryFolder = new TemporaryFolder();
temporaryFolder.create();
diff --git
a/flink-tests/src/test/java/org/apache/flink/test/runtime/minicluster/LocalFlinkMiniClusterITCase.java
b/flink-tests/src/test/java/org/apache/flink/test/runtime/minicluster/LocalFlinkMiniClusterITCase.java
index 00c6865300c..66a811a6379 100644
---
a/flink-tests/src/test/java/org/apache/flink/test/runtime/minicluster/LocalFlinkMiniClusterITCase.java
+++
b/flink-tests/src/test/java/org/apache/flink/test/runtime/minicluster/LocalFlinkMiniClusterITCase.java
@@ -31,6 +31,7 @@
import org.apache.flink.util.TestLogger;
import akka.actor.ActorSystem;
+import akka.actor.RobustActorSystem;
import akka.testkit.JavaTestKit;
import org.junit.Test;
@@ -67,7 +68,7 @@
@Test
public void testLocalFlinkMiniClusterWithMultipleTaskManagers() throws
InterruptedException, TimeoutException {
- final ActorSystem system = ActorSystem.create("Testkit",
AkkaUtils.getDefaultAkkaConfig());
+ final ActorSystem system = RobustActorSystem.create("Testkit",
AkkaUtils.getDefaultAkkaConfig());
LocalFlinkMiniCluster miniCluster = null;
final int numTMs = 3;
----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
For queries about this service, please contact Infrastructure at:
users@infra.apache.org
With regards,
Apache Git Services