[FLINK-6495][config] Revert heartbeat pause back to 60s

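This fixes an important bug introduced by FLINK-6495, which made the default of
`akka.watch.heartbeat.pause` follow the `akka.ask.timeout` default. The heartbeat
pause MUST be significantly larger than the heartbeat interval.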

[hotfix][runtime] Fix exception message for restart delay

Previously, the exception message named `akka.watch.heartbeat.pause`, although the
default restart delay is taken from `akka.watch.heartbeat.interval`.

[hotfix][core] Add convenient IllegalConfigurationException constructor

[FLINK-6495][config] Validate heartbeat configuration at runtime

[hotfix][docs] Fix and improve akka config options documentation

This closes #4774.


Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/ae50c30a
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/ae50c30a
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/ae50c30a

Branch: refs/heads/master
Commit: ae50c30ac3a7ce62df62120eda85b273a6aea7f7
Parents: 9a2ba6e
Author: Piotr Nowojski
Authored: Mon Oct 2 19:33:46 2017 +0200
Committer: Till Rohrmann
Committed: Tue Oct 31 00:04:59 2017 +0100

----------------------------------------------------------------------
 docs/ops/config.md                              |  4 +--
 .../apache/flink/configuration/AkkaOptions.java |  2 +-
 .../IllegalConfigurationException.java          | 11 ++++++++
 .../restart/FixedDelayRestartStrategy.java      |  2 +-
 .../apache/flink/runtime/akka/AkkaUtils.scala   | 28 +++++++++++++++++++-
 .../flink/runtime/akka/AkkaUtilsTest.scala      | 22 ++++++++++++++-
 6 files changed, 63 insertions(+), 6 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/flink/blob/ae50c30a/docs/ops/config.md
----------------------------------------------------------------------
diff --git a/docs/ops/config.md b/docs/ops/config.md
index 7720e3a..b72bc10 100644
--- a/docs/ops/config.md
+++ b/docs/ops/config.md
@@ -328,9 +328,9 @@ The following parameters configure Flink's JobManager and TaskManagers.
 
 - `akka.framesize`: Maximum size of messages which are sent between the JobManager and the TaskManagers. If Flink fails because messages exceed this limit, then you should increase it. The message size requires a size-unit specifier (DEFAULT: **10485760b**).
 
-- `akka.watch.heartbeat.interval`: Heartbeat interval for Akka's DeathWatch mechanism to detect dead TaskManagers. If TaskManagers are wrongly marked dead because of lost or delayed heartbeat messages, then you should increase this value. A thorough description of Akka's DeathWatch can be found [here](http://doc.akka.io/docs/akka/snapshot/scala/remoting.html#failure-detector) (DEFAULT: **10 s**).
+- `akka.watch.heartbeat.interval`: Heartbeat interval for Akka's DeathWatch mechanism to detect dead TaskManagers. If TaskManagers are wrongly marked dead because of lost or delayed heartbeat messages, then you should decrease this value or increase `akka.watch.heartbeat.pause`. A thorough description of Akka's DeathWatch can be found [here](http://doc.akka.io/docs/akka/snapshot/scala/remoting.html#failure-detector) (DEFAULT: **10 s**).
 
-- `akka.watch.heartbeat.pause`: Acceptable heartbeat pause for Akka's DeathWatch mechanism. A low value does not allow a irregular heartbeat. A thorough description of Akka's DeathWatch can be found [here](http://doc.akka.io/docs/akka/snapshot/scala/remoting.html#failure-detector) (DEFAULT: **60 s**).
+- `akka.watch.heartbeat.pause`: Acceptable heartbeat pause for Akka's DeathWatch mechanism. A low value does not allow an irregular heartbeat. If TaskManagers are wrongly marked dead because of lost or delayed heartbeat messages, then you should increase this value or decrease `akka.watch.heartbeat.interval`. A higher value increases the time to detect a dead TaskManager. A thorough description of Akka's DeathWatch can be found [here](http://doc.akka.io/docs/akka/snapshot/scala/remoting.html#failure-detector) (DEFAULT: **60 s**).
 
 - `akka.watch.threshold`: Threshold for the DeathWatch failure detector. A low value is prone to false positives whereas a high value increases the time to detect a dead TaskManager. A thorough description of Akka's DeathWatch can be found [here](http://doc.akka.io/docs/akka/snapshot/scala/remoting.html#failure-detector) (DEFAULT: **12**).
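The relationship between `akka.watch.heartbeat.interval` and `akka.watch.heartbeat.pause` described above can be illustrated with a deliberately simplified model. The Java sketch below is hypothetical and is not Akka's DeathWatch, which uses a phi-accrual failure detector tuned by `akka.watch.threshold`. It simply treats the acceptable pause as a hard deadline on the gap between two consecutive heartbeats. Even so, it shows why a pause equal to the interval is fragile.

```java
import java.util.List;

/**
 * Hypothetical, simplified failure detector: a peer is suspected dead when the
 * gap between two consecutive heartbeat arrivals exceeds the acceptable pause.
 * Akka's real detector is phi-accrual based; this only illustrates why the
 * pause must be well above the heartbeat interval.
 */
final class DeadlineDetector {

    private DeadlineDetector() {}

    /**
     * Returns true if any gap between consecutive arrival times (in ms) is
     * longer than pauseMs, i.e. the peer would be marked dead.
     */
    static boolean marksDead(List<Long> arrivalTimesMs, long pauseMs) {
        for (int i = 1; i < arrivalTimesMs.size(); i++) {
            long gap = arrivalTimesMs.get(i) - arrivalTimesMs.get(i - 1);
            if (gap > pauseMs) {
                return true;
            }
        }
        return false;
    }
}
```

With heartbeats every 10 s and a single heartbeat delayed by 500 ms (arrivals at 0, 10000, 20500 and 30000 ms), `marksDead(arrivals, 10_000)` returns true because the 10.5 s gap exceeds a pause equal to the interval, while `marksDead(arrivals, 60_000)` returns false. The 60 s default leaves room for several lost or late heartbeats before a TaskManager is declared dead.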
 

http://git-wip-us.apache.org/repos/asf/flink/blob/ae50c30a/flink-core/src/main/java/org/apache/flink/configuration/AkkaOptions.java
----------------------------------------------------------------------
diff --git a/flink-core/src/main/java/org/apache/flink/configuration/AkkaOptions.java b/flink-core/src/main/java/org/apache/flink/configuration/AkkaOptions.java
index 9bfc237..1938923 100644
--- a/flink-core/src/main/java/org/apache/flink/configuration/AkkaOptions.java
+++ b/flink-core/src/main/java/org/apache/flink/configuration/AkkaOptions.java
@@ -47,7 +47,7 @@ public class AkkaOptions {
         */
        public static final ConfigOption<String> WATCH_HEARTBEAT_PAUSE = ConfigOptions
                .key("akka.watch.heartbeat.pause")
-               .defaultValue(ASK_TIMEOUT.defaultValue());
+               .defaultValue("60 s");
 
        /**
         * The Akka tcp connection timeout.

http://git-wip-us.apache.org/repos/asf/flink/blob/ae50c30a/flink-core/src/main/java/org/apache/flink/configuration/IllegalConfigurationException.java
----------------------------------------------------------------------
diff --git a/flink-core/src/main/java/org/apache/flink/configuration/IllegalConfigurationException.java b/flink-core/src/main/java/org/apache/flink/configuration/IllegalConfigurationException.java
index fcc77e0..fb1e5a8 100644
--- a/flink-core/src/main/java/org/apache/flink/configuration/IllegalConfigurationException.java
+++ b/flink-core/src/main/java/org/apache/flink/configuration/IllegalConfigurationException.java
@@ -42,6 +42,17 @@ public class IllegalConfigurationException extends RuntimeException {
 
        /**
         * Constructs an new IllegalConfigurationException with the given error message
+        * format and arguments.
+        *
+        * @param format The error message format for the exception.
+        * @param arguments The arguments for the format.
+        */
+       public IllegalConfigurationException(String format, Object... arguments) {
+               super(String.format(format, arguments));
+       }
+
+       /**
+        * Constructs an new IllegalConfigurationException with the given error message
         * and a given cause.
         *
         * @param message The error message for the exception.
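The new format-and-arguments constructor sits next to the existing constructor that takes a message and a cause. The Java sketch below uses a hypothetical stand-in class, `ConfigException`, to show how calls resolve. It has the same two signatures as in this hunk and also assumes a single-argument `(String message)` constructor. Java only considers variable-arity constructors when no fixed-arity one applies, so passing a `Throwable` as the second argument still selects the cause constructor, and a lone message string is never run through `String.format`.

```java
/**
 * Hypothetical stand-in for IllegalConfigurationException, used only to show
 * how the three constructor overloads are selected by the Java compiler.
 */
class ConfigException extends RuntimeException {

    // Assumed plain-message constructor: the message is used verbatim.
    ConfigException(String message) {
        super(message);
    }

    // Mirrors the constructor added in this commit: formats the message.
    ConfigException(String format, Object... arguments) {
        super(String.format(format, arguments));
    }

    // Fixed-arity overload: chosen over the varargs one when a Throwable is passed.
    ConfigException(String message, Throwable cause) {
        super(message, cause);
    }
}
```

For example, `new ConfigException("100% literal")` keeps the message as-is. If that text were routed through the format constructor instead, `String.format` would throw on the stray `%`.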

http://git-wip-us.apache.org/repos/asf/flink/blob/ae50c30a/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/restart/FixedDelayRestartStrategy.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/restart/FixedDelayRestartStrategy.java b/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/restart/FixedDelayRestartStrategy.java
index a66db3b..ca9626a 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/restart/FixedDelayRestartStrategy.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/restart/FixedDelayRestartStrategy.java
@@ -96,7 +96,7 @@ public class FixedDelayRestartStrategy implements RestartStrategy {
                } catch (NumberFormatException nfe) {
                        if (delayString.equals(timeoutString)) {
                                throw new Exception("Invalid config value for " +
-                                               AkkaOptions.WATCH_HEARTBEAT_PAUSE.key() + ": " + timeoutString +
+                                               AkkaOptions.WATCH_HEARTBEAT_INTERVAL.key() + ": " + timeoutString +
                                                ". Value must be a valid duration (such as '10 s' or '1 min')");
                        } else {
                                throw new Exception("Invalid config value for " +
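Judging from this fix, the restart delay falls back to the `akka.watch.heartbeat.interval` value, and the code decides which key to blame by comparing the two strings. The Java sketch below implements the same idea with a hypothetical `FallbackDurationOption` helper and a plain `Map` as configuration. Instead of comparing strings, it tracks which key actually supplied the value. Its parser accepts only `ms`, `s` and `min` units, a much smaller grammar than the Scala `Duration.apply` parsing used in `AkkaUtils`.

```java
import java.time.Duration;
import java.util.Map;
import java.util.regex.Matcher;
import java.util.regex.Pattern;

/**
 * Hypothetical sketch: resolve a duration option that falls back to another
 * option, and report whichever key actually supplied an unparsable value.
 */
final class FallbackDurationOption {

    // Deliberately tiny grammar: "<digits> ms|s|min" with optional whitespace.
    private static final Pattern DURATION =
            Pattern.compile("\\s*(\\d+)\\s*(ms|s|min)\\s*");

    private FallbackDurationOption() {}

    static Duration parse(String value) {
        Matcher m = DURATION.matcher(value);
        if (!m.matches()) {
            throw new NumberFormatException("Not a duration: " + value);
        }
        long amount = Long.parseLong(m.group(1));
        switch (m.group(2)) {
            case "ms":
                return Duration.ofMillis(amount);
            case "s":
                return Duration.ofSeconds(amount);
            default: // "min"
                return Duration.ofMinutes(amount);
        }
    }

    /**
     * Reads key, falling back to fallbackKey when key is absent. On a parse
     * error the message names the key the value was actually read from.
     */
    static Duration resolve(Map<String, String> config, String key, String fallbackKey) {
        String sourceKey = config.containsKey(key) ? key : fallbackKey;
        String value = config.get(sourceKey);
        if (value == null) {
            throw new IllegalArgumentException("Missing config value for " + sourceKey);
        }
        try {
            return parse(value);
        } catch (NumberFormatException e) {
            throw new IllegalArgumentException("Invalid config value for " + sourceKey + ": " + value
                    + ". Value must be a valid duration (such as '10 s' or '1 min')", e);
        }
    }
}
```

With only `akka.watch.heartbeat.interval` set to an unparsable value such as `ten seconds`, `resolve(config, "restart-strategy.fixed-delay.delay", "akka.watch.heartbeat.interval")` fails with a message naming the interval key (the delay key name is assumed here). Checking `containsKey` also avoids an edge case of the string comparison: an explicitly set delay that happens to equal the interval string would otherwise be reported under the interval key.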

http://git-wip-us.apache.org/repos/asf/flink/blob/ae50c30a/flink-runtime/src/main/scala/org/apache/flink/runtime/akka/AkkaUtils.scala
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/scala/org/apache/flink/runtime/akka/AkkaUtils.scala b/flink-runtime/src/main/scala/org/apache/flink/runtime/akka/AkkaUtils.scala
index 565b5ea..ab559a1 100644
--- a/flink-runtime/src/main/scala/org/apache/flink/runtime/akka/AkkaUtils.scala
+++ b/flink-runtime/src/main/scala/org/apache/flink/runtime/akka/AkkaUtils.scala
@@ -26,7 +26,7 @@ import akka.actor._
 import akka.pattern.{ask => akkaAsk}
 import com.typesafe.config.{Config, ConfigFactory}
 import org.apache.flink.api.common.time.Time
-import org.apache.flink.configuration.{AkkaOptions, Configuration, SecurityOptions}
+import org.apache.flink.configuration.{AkkaOptions, Configuration, IllegalConfigurationException, SecurityOptions}
 import org.apache.flink.runtime.net.SSLUtils
 import org.apache.flink.util.NetUtils
 import org.jboss.netty.channel.ChannelException
@@ -257,6 +257,20 @@ object AkkaUtils {
     ConfigFactory.parseString(config)
   }
 
+  private def validateHeartbeat(pauseParamName: String,
+                                pauseValue: String,
+                                intervalParamName: String,
+                                intervalValue: String) = {
+    if (Duration.apply(pauseValue).lteq(Duration.apply(intervalValue))) {
+      throw new IllegalConfigurationException(
+        "%s [%s] must be greater than %s [%s]",
+        pauseParamName,
+        pauseValue,
+        intervalParamName,
+        intervalValue)
+    }
+  }
+
   /**
   * Creates a Akka config for a remote actor system listening on port on the network interface
    * identified by bindAddress.
@@ -289,6 +303,12 @@ object AkkaUtils {
     val transportHeartbeatPause = configuration.getString(
       AkkaOptions.TRANSPORT_HEARTBEAT_PAUSE)
 
+    validateHeartbeat(
+      AkkaOptions.TRANSPORT_HEARTBEAT_PAUSE.key(),
+      transportHeartbeatPause,
+      AkkaOptions.TRANSPORT_HEARTBEAT_INTERVAL.key(),
+      transportHeartbeatInterval)
+
     val transportThreshold = configuration.getDouble(AkkaOptions.TRANSPORT_THRESHOLD)
 
     val watchHeartbeatInterval = configuration.getString(
@@ -296,6 +316,12 @@ object AkkaUtils {
 
     val watchHeartbeatPause = configuration.getString(AkkaOptions.WATCH_HEARTBEAT_PAUSE)
 
+    validateHeartbeat(
+      AkkaOptions.WATCH_HEARTBEAT_PAUSE.key(),
+      watchHeartbeatPause,
+      AkkaOptions.WATCH_HEARTBEAT_INTERVAL.key(),
+      watchHeartbeatInterval)
+
     val watchThreshold = configuration.getInteger(AkkaOptions.WATCH_THRESHOLD)
 
     val akkaTCPTimeout = configuration.getString(AkkaOptions.TCP_TIMEOUT)
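Both `validateHeartbeat` calls reject a pause that is less than or equal to its interval. The Java sketch below is a hypothetical port of the check. It takes already-parsed `java.time.Duration` values rather than raw strings and throws `IllegalArgumentException` in place of `IllegalConfigurationException`, so it runs without Flink on the classpath.

```java
import java.time.Duration;

/**
 * Hypothetical Java port of AkkaUtils.validateHeartbeat: rejects a heartbeat
 * pause that is less than or equal to the heartbeat interval.
 */
final class HeartbeatValidation {

    private HeartbeatValidation() {}

    static void validateHeartbeat(String pauseParamName, Duration pause,
                                  String intervalParamName, Duration interval) {
        // Same condition as the Scala `lteq` check: pause <= interval is an error.
        if (pause.compareTo(interval) <= 0) {
            // Stand-in for IllegalConfigurationException from flink-core.
            throw new IllegalArgumentException(String.format(
                    "%s [%s] must be greater than %s [%s]",
                    pauseParamName, pause, intervalParamName, interval));
        }
    }
}
```

The check enforces only a strictly larger pause: a 10.001 s pause passes against a 10 s interval, even though the commit message asks for a significantly larger one. Because `Duration.toString()` uses ISO-8601, the sketch's message reads `akka.watch.heartbeat.pause [PT10S] must be greater than akka.watch.heartbeat.interval [PT10S]` rather than echoing the configured strings.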

http://git-wip-us.apache.org/repos/asf/flink/blob/ae50c30a/flink-runtime/src/test/scala/org/apache/flink/runtime/akka/AkkaUtilsTest.scala
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/scala/org/apache/flink/runtime/akka/AkkaUtilsTest.scala b/flink-runtime/src/test/scala/org/apache/flink/runtime/akka/AkkaUtilsTest.scala
index de8c26c..26257df 100644
--- a/flink-runtime/src/test/scala/org/apache/flink/runtime/akka/AkkaUtilsTest.scala
+++ b/flink-runtime/src/test/scala/org/apache/flink/runtime/akka/AkkaUtilsTest.scala
@@ -20,7 +20,7 @@ package org.apache.flink.runtime.akka
 
 import java.net.InetSocketAddress
 
-import org.apache.flink.configuration.Configuration
+import org.apache.flink.configuration.{AkkaOptions, Configuration, IllegalConfigurationException}
 import org.apache.flink.runtime.highavailability.HighAvailabilityServicesUtils.AddressResolution
 import org.apache.flink.runtime.rpc.akka.AkkaRpcServiceUtils
 import org.apache.flink.runtime.rpc.akka.AkkaRpcServiceUtils.AkkaProtocol
@@ -35,6 +35,26 @@ class AkkaUtilsTest
   with Matchers
   with BeforeAndAfterAll {
 
+  test("getAkkaConfig should validate watch heartbeats") {
+    val configuration = new Configuration()
+    configuration.setString(
+      AkkaOptions.WATCH_HEARTBEAT_PAUSE.key(),
+      AkkaOptions.WATCH_HEARTBEAT_INTERVAL.defaultValue())
+    intercept[IllegalConfigurationException] {
+      AkkaUtils.getAkkaConfig(configuration, Some(("localhost", 31337)))
+    }
+  }
+
+  test("getAkkaConfig should validate transport heartbeats") {
+    val configuration = new Configuration()
+    configuration.setString(
+      AkkaOptions.TRANSPORT_HEARTBEAT_PAUSE.key(),
+      AkkaOptions.TRANSPORT_HEARTBEAT_INTERVAL.defaultValue())
+    intercept[IllegalConfigurationException] {
+      AkkaUtils.getAkkaConfig(configuration, Some(("localhost", 31337)))
+    }
+  }
+
   test("getHostFromAkkaURL should return the correct host from a remote Akka URL") {
     val host = "127.0.0.1"
     val port = 1234
