Repository: flink
Updated Branches:
  refs/heads/master 0d3ff88b3 -> 3e3a90d89


[FLINK-5010] [akka] Introduce default configuration values for Akka's deathwatch

Set the akka deathwatch interval to 10s, the akka deathwatch pause to 60s and
the tcp connection timeout to 20s per default.

This closes #2831.


Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/3e3a90d8
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/3e3a90d8
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/3e3a90d8

Branch: refs/heads/master
Commit: 3e3a90d89cd62b07d3790ccea7bb1a30db9cc9d2
Parents: 0d3ff88
Author: Till Rohrmann <[email protected]>
Authored: Fri Nov 18 15:24:51 2016 +0100
Committer: Till Rohrmann <[email protected]>
Committed: Wed Nov 23 14:09:06 2016 +0100

----------------------------------------------------------------------
 docs/setup/config.md                            |  6 +--
 .../apache/flink/configuration/AkkaOptions.java | 51 ++++++++++++++++++++
 .../apache/flink/runtime/akka/AkkaUtils.scala   | 14 ++----
 3 files changed, 58 insertions(+), 13 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/flink/blob/3e3a90d8/docs/setup/config.md
----------------------------------------------------------------------
diff --git a/docs/setup/config.md b/docs/setup/config.md
index cc1f6bc..51ef41c 100644
--- a/docs/setup/config.md
+++ b/docs/setup/config.md
@@ -255,9 +255,9 @@ The following parameters configure Flink's JobManager and TaskManagers.
 
 - `akka.framesize`: Maximum size of messages which are sent between the JobManager and the TaskManagers. If Flink fails because messages exceed this limit, then you should increase it. The message size requires a size-unit specifier (DEFAULT: **10485760b**).
 
-- `akka.watch.heartbeat.interval`: Heartbeat interval for Akka's DeathWatch mechanism to detect dead TaskManagers. If TaskManagers are wrongly marked dead because of lost or delayed heartbeat messages, then you should increase this value. A thorough description of Akka's DeathWatch can be found [here](http://doc.akka.io/docs/akka/snapshot/scala/remoting.html#failure-detector) (DEFAULT: **akka.ask.timeout/10**).
+- `akka.watch.heartbeat.interval`: Heartbeat interval for Akka's DeathWatch mechanism to detect dead TaskManagers. If TaskManagers are wrongly marked dead because of lost or delayed heartbeat messages, then you should increase this value. A thorough description of Akka's DeathWatch can be found [here](http://doc.akka.io/docs/akka/snapshot/scala/remoting.html#failure-detector) (DEFAULT: **10 s**).
 
-- `akka.watch.heartbeat.pause`: Acceptable heartbeat pause for Akka's DeathWatch mechanism. A low value does not allow a irregular heartbeat. A thorough description of Akka's DeathWatch can be found [here](http://doc.akka.io/docs/akka/snapshot/scala/remoting.html#failure-detector) (DEFAULT: **akka.ask.timeout**).
+- `akka.watch.heartbeat.pause`: Acceptable heartbeat pause for Akka's DeathWatch mechanism. A low value does not allow a irregular heartbeat. A thorough description of Akka's DeathWatch can be found [here](http://doc.akka.io/docs/akka/snapshot/scala/remoting.html#failure-detector) (DEFAULT: **60 s**).
 
 - `akka.watch.threshold`: Threshold for the DeathWatch failure detector. A low value is prone to false positives whereas a high value increases the time to detect a dead TaskManager. A thorough description of Akka's DeathWatch can be found [here](http://doc.akka.io/docs/akka/snapshot/scala/remoting.html#failure-detector) (DEFAULT: **12**).
 
@@ -267,7 +267,7 @@ The following parameters configure Flink's JobManager and TaskManagers.
 
 - `akka.transport.threshold`: Threshold for the transport failure detector. Since Flink uses TCP, the detector is not necessary and, thus, the threshold is set to a high value (DEFAULT: **300**).
 
-- `akka.tcp.timeout`: Timeout for all outbound connections. If you should experience problems with connecting to a TaskManager due to a slow network, you should increase this value (DEFAULT: **akka.ask.timeout**).
+- `akka.tcp.timeout`: Timeout for all outbound connections. If you should experience problems with connecting to a TaskManager due to a slow network, you should increase this value (DEFAULT: **20 s**).
 
 - `akka.throughput`: Number of messages that are processed in a batch before returning the thread to the pool. Low values denote a fair scheduling whereas high values can increase the performance at the cost of unfairness (DEFAULT: **15**).
 

http://git-wip-us.apache.org/repos/asf/flink/blob/3e3a90d8/flink-core/src/main/java/org/apache/flink/configuration/AkkaOptions.java
----------------------------------------------------------------------
diff --git a/flink-core/src/main/java/org/apache/flink/configuration/AkkaOptions.java b/flink-core/src/main/java/org/apache/flink/configuration/AkkaOptions.java
new file mode 100644
index 0000000..7e4c2b7
--- /dev/null
+++ b/flink-core/src/main/java/org/apache/flink/configuration/AkkaOptions.java
@@ -0,0 +1,51 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.configuration;
+
+import org.apache.flink.annotation.PublicEvolving;
+
+/**
+ * Akka configuration options.
+ *
+ * TODO: Migrate other akka config options to this file
+ */
+@PublicEvolving
+public class AkkaOptions {
+
+       /**
+        * The Akka tcp connection timeout.
+        */
+       public static final ConfigOption<String> AKKA_TCP_TIMEOUT = ConfigOptions
+               .key("akka.tcp.timeout")
+               .defaultValue("20 s");
+
+       /**
+        * The Akka death watch heartbeat interval.
+        */
+       public static final ConfigOption<String> AKKA_WATCH_HEARTBEAT_INTERVAL = ConfigOptions
+               .key("akka.watch.heartbeat.interval")
+               .defaultValue("10 s");
+
+       /**
+        * The maximum acceptable Akka death watch heartbeat pause.
+        */
+       public static final ConfigOption<String> AKKA_WATCH_HEARTBEAT_PAUSE = ConfigOptions
+               .key("akka.watch.heartbeat.pause")
+               .defaultValue("60 s");
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/3e3a90d8/flink-runtime/src/main/scala/org/apache/flink/runtime/akka/AkkaUtils.scala
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/scala/org/apache/flink/runtime/akka/AkkaUtils.scala b/flink-runtime/src/main/scala/org/apache/flink/runtime/akka/AkkaUtils.scala
index 27100fd..6a23c39 100644
--- a/flink-runtime/src/main/scala/org/apache/flink/runtime/akka/AkkaUtils.scala
+++ b/flink-runtime/src/main/scala/org/apache/flink/runtime/akka/AkkaUtils.scala
@@ -26,7 +26,7 @@ import akka.actor._
 import akka.pattern.{ask => akkaAsk}
 import com.typesafe.config.{Config, ConfigFactory, ConfigParseOptions, ConfigValueFactory}
 import org.apache.flink.api.common.time.Time
-import org.apache.flink.configuration.{ConfigConstants, Configuration}
+import org.apache.flink.configuration.{AkkaOptions, ConfigConstants, Configuration}
 import org.apache.flink.runtime.net.SSLUtils
 import org.apache.flink.util.NetUtils
 import org.jboss.netty.logging.{InternalLoggerFactory, Slf4JLoggerFactory}
@@ -242,21 +242,15 @@ object AkkaUtils {
       ConfigConstants.AKKA_TRANSPORT_THRESHOLD,
       ConfigConstants.DEFAULT_AKKA_TRANSPORT_THRESHOLD)
 
-    val watchHeartbeatInterval = configuration.getString(
-      ConfigConstants.AKKA_WATCH_HEARTBEAT_INTERVAL,
-      (akkaAskTimeout).toString)
+    val watchHeartbeatInterval = configuration.getString(AkkaOptions.AKKA_WATCH_HEARTBEAT_INTERVAL);
 
-    val watchHeartbeatPause = configuration.getString(
-      ConfigConstants.AKKA_WATCH_HEARTBEAT_PAUSE,
-      (akkaAskTimeout * 10).toString)
+    val watchHeartbeatPause = configuration.getString(AkkaOptions.AKKA_WATCH_HEARTBEAT_PAUSE);
 
     val watchThreshold = configuration.getDouble(
       ConfigConstants.AKKA_WATCH_THRESHOLD,
       ConfigConstants.DEFAULT_AKKA_WATCH_THRESHOLD)
 
-    val akkaTCPTimeout = configuration.getString(
-      ConfigConstants.AKKA_TCP_TIMEOUT,
-      (akkaAskTimeout * 10).toString)
+    val akkaTCPTimeout = configuration.getString(AkkaOptions.AKKA_TCP_TIMEOUT);
 
     val akkaFramesize = configuration.getString(
       ConfigConstants.AKKA_FRAMESIZE,

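Not part of the commit: a minimal, self-contained sketch of the lookup behaviour the diff above moves to, where each key carries a fixed default instead of one derived from akka.ask.timeout. The class AkkaDefaultsSketch, its nested Option type, and the getString helper are hypothetical stand-ins written for illustration; they are not Flink's ConfigOption or Configuration classes. Only the three keys and their default strings are taken from the diff.

```java
import java.util.HashMap;
import java.util.Map;

final class AkkaDefaultsSketch {

    /** Hypothetical stand-in for a config option: a key paired with its default value. */
    static final class Option {
        final String key;
        final String defaultValue;

        Option(String key, String defaultValue) {
            this.key = key;
            this.defaultValue = defaultValue;
        }
    }

    // Keys and default strings copied from the AkkaOptions diff above.
    static final Option TCP_TIMEOUT =
        new Option("akka.tcp.timeout", "20 s");
    static final Option WATCH_HEARTBEAT_INTERVAL =
        new Option("akka.watch.heartbeat.interval", "10 s");
    static final Option WATCH_HEARTBEAT_PAUSE =
        new Option("akka.watch.heartbeat.pause", "60 s");

    /** Returns the user-supplied value if the key is set, otherwise the option's default. */
    static String getString(Map<String, String> conf, Option option) {
        return conf.getOrDefault(option.key, option.defaultValue);
    }

    public static void main(String[] args) {
        // Simulates a user configuration that overrides only the heartbeat pause.
        Map<String, String> conf = new HashMap<>();
        conf.put("akka.watch.heartbeat.pause", "120 s");

        System.out.println(getString(conf, WATCH_HEARTBEAT_INTERVAL)); // prints "10 s"
        System.out.println(getString(conf, WATCH_HEARTBEAT_PAUSE));    // prints "120 s"
        System.out.println(getString(conf, TCP_TIMEOUT));              // prints "20 s"
    }
}
```

In this sketch, changing akka.ask.timeout would have no effect on the three values, which is the point of the fixed defaults.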