[FLINK-6495][config] Revert heartbeat pause back to 60s

This fixes an important bug introduced by FLINK-6495. Heartbeat pause MUST
be significantly larger than heartbeat interval.

[hotfix][runtime] Fix exception message for restart delay

Previously, the default value did not match the exception message that was being thrown.

[hotfix][core] Add convenient IllegalConfigurationException constructor

[FLINK-6495][config] Validate heartbeats configuration at runtime

[hotfix][docs] Fix and improve akka config options documentation

This closes #4774.


Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/ae50c30a
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/ae50c30a
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/ae50c30a

Branch: refs/heads/master
Commit: ae50c30ac3a7ce62df62120eda85b273a6aea7f7
Parents: 9a2ba6e
Author: Piotr Nowojski
Authored: Mon Oct 2 19:33:46 2017 +0200
Committer: Till Rohrmann
Committed: Tue Oct 31 00:04:59 2017 +0100

----------------------------------------------------------------------
 docs/ops/config.md                              |  4 +--
 .../apache/flink/configuration/AkkaOptions.java |  2 +-
 .../IllegalConfigurationException.java          | 11 ++++++++
 .../restart/FixedDelayRestartStrategy.java      |  2 +-
 .../apache/flink/runtime/akka/AkkaUtils.scala   | 28 +++++++++++++++++++-
 .../flink/runtime/akka/AkkaUtilsTest.scala      | 22 ++++++++++++++-
 6 files changed, 63 insertions(+), 6 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/flink/blob/ae50c30a/docs/ops/config.md
----------------------------------------------------------------------
diff --git a/docs/ops/config.md b/docs/ops/config.md
index 7720e3a..b72bc10 100644
--- a/docs/ops/config.md
+++ b/docs/ops/config.md
@@ -328,9 +328,9 @@ The following parameters configure Flink's JobManager and TaskManagers.
 
 - `akka.framesize`: Maximum size of messages which are sent between the JobManager and the TaskManagers. If Flink fails because messages exceed this limit, then you should increase it. The message size requires a size-unit specifier (DEFAULT: **10485760b**).
 
-- `akka.watch.heartbeat.interval`: Heartbeat interval for Akka's DeathWatch mechanism to detect dead TaskManagers. If TaskManagers are wrongly marked dead because of lost or delayed heartbeat messages, then you should increase this value. A thorough description of Akka's DeathWatch can be found [here](http://doc.akka.io/docs/akka/snapshot/scala/remoting.html#failure-detector) (DEFAULT: **10 s**).
+- `akka.watch.heartbeat.interval`: Heartbeat interval for Akka's DeathWatch mechanism to detect dead TaskManagers. If TaskManagers are wrongly marked dead because of lost or delayed heartbeat messages, then you should decrease this value or increase `akka.watch.heartbeat.pause`. A thorough description of Akka's DeathWatch can be found [here](http://doc.akka.io/docs/akka/snapshot/scala/remoting.html#failure-detector) (DEFAULT: **10 s**).
 
-- `akka.watch.heartbeat.pause`: Acceptable heartbeat pause for Akka's DeathWatch mechanism. A low value does not allow a irregular heartbeat. A thorough description of Akka's DeathWatch can be found [here](http://doc.akka.io/docs/akka/snapshot/scala/remoting.html#failure-detector) (DEFAULT: **60 s**).
+- `akka.watch.heartbeat.pause`: Acceptable heartbeat pause for Akka's DeathWatch mechanism. A low value does not allow an irregular heartbeat. If TaskManagers are wrongly marked dead because of lost or delayed heartbeat messages, then you should increase this value or decrease `akka.watch.heartbeat.interval`. Higher value increases the time to detect a dead TaskManager. A thorough description of Akka's DeathWatch can be found [here](http://doc.akka.io/docs/akka/snapshot/scala/remoting.html#failure-detector) (DEFAULT: **60 s**).
 
 - `akka.watch.threshold`: Threshold for the DeathWatch failure detector. A low value is prone to false positives whereas a high value increases the time to detect a dead TaskManager. A thorough description of Akka's DeathWatch can be found [here](http://doc.akka.io/docs/akka/snapshot/scala/remoting.html#failure-detector) (DEFAULT: **12**).
 

http://git-wip-us.apache.org/repos/asf/flink/blob/ae50c30a/flink-core/src/main/java/org/apache/flink/configuration/AkkaOptions.java
----------------------------------------------------------------------
diff --git a/flink-core/src/main/java/org/apache/flink/configuration/AkkaOptions.java b/flink-core/src/main/java/org/apache/flink/configuration/AkkaOptions.java
index 9bfc237..1938923 100644
--- a/flink-core/src/main/java/org/apache/flink/configuration/AkkaOptions.java
+++ b/flink-core/src/main/java/org/apache/flink/configuration/AkkaOptions.java
@@ -47,7 +47,7 @@ public class AkkaOptions {
 	 */
 	public static final ConfigOption<String> WATCH_HEARTBEAT_PAUSE = ConfigOptions
 		.key("akka.watch.heartbeat.pause")
-		.defaultValue(ASK_TIMEOUT.defaultValue());
+		.defaultValue("60 s");
 
 	/**
 	 * The Akka tcp connection timeout.

http://git-wip-us.apache.org/repos/asf/flink/blob/ae50c30a/flink-core/src/main/java/org/apache/flink/configuration/IllegalConfigurationException.java
----------------------------------------------------------------------
diff --git a/flink-core/src/main/java/org/apache/flink/configuration/IllegalConfigurationException.java b/flink-core/src/main/java/org/apache/flink/configuration/IllegalConfigurationException.java
index fcc77e0..fb1e5a8 100644
--- a/flink-core/src/main/java/org/apache/flink/configuration/IllegalConfigurationException.java
+++ b/flink-core/src/main/java/org/apache/flink/configuration/IllegalConfigurationException.java
@@ -42,6 +42,17 @@ public class IllegalConfigurationException extends RuntimeException {
 
 	/**
 	 * Constructs an new IllegalConfigurationException with the given error message
+	 * format and arguments.
+	 *
+	 * @param format The error message format for the exception.
+	 * @param arguments The arguments for the format.
+	 */
+	public IllegalConfigurationException(String format, Object... arguments) {
+		super(String.format(format, arguments));
+	}
+
+	/**
+	 * Constructs an new IllegalConfigurationException with the given error message
 	 * and a given cause.
 	 *
 	 * @param message The error message for the exception.

http://git-wip-us.apache.org/repos/asf/flink/blob/ae50c30a/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/restart/FixedDelayRestartStrategy.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/restart/FixedDelayRestartStrategy.java b/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/restart/FixedDelayRestartStrategy.java
index a66db3b..ca9626a 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/restart/FixedDelayRestartStrategy.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/restart/FixedDelayRestartStrategy.java
@@ -96,7 +96,7 @@ public class FixedDelayRestartStrategy implements RestartStrategy {
 		} catch (NumberFormatException nfe) {
 			if (delayString.equals(timeoutString)) {
 				throw new Exception("Invalid config value for " +
-						AkkaOptions.WATCH_HEARTBEAT_PAUSE.key() + ": " + timeoutString +
+						AkkaOptions.WATCH_HEARTBEAT_INTERVAL.key() + ": " + timeoutString +
 						". Value must be a valid duration (such as '10 s' or '1 min')");
 			} else {
 				throw new Exception("Invalid config value for " +

http://git-wip-us.apache.org/repos/asf/flink/blob/ae50c30a/flink-runtime/src/main/scala/org/apache/flink/runtime/akka/AkkaUtils.scala
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/scala/org/apache/flink/runtime/akka/AkkaUtils.scala b/flink-runtime/src/main/scala/org/apache/flink/runtime/akka/AkkaUtils.scala
index 565b5ea..ab559a1 100644
--- a/flink-runtime/src/main/scala/org/apache/flink/runtime/akka/AkkaUtils.scala
+++ b/flink-runtime/src/main/scala/org/apache/flink/runtime/akka/AkkaUtils.scala
@@ -26,7 +26,7 @@ import akka.actor._
 import akka.pattern.{ask => akkaAsk}
 import com.typesafe.config.{Config, ConfigFactory}
 import org.apache.flink.api.common.time.Time
-import org.apache.flink.configuration.{AkkaOptions, Configuration, SecurityOptions}
+import org.apache.flink.configuration.{AkkaOptions, Configuration, IllegalConfigurationException, SecurityOptions}
 import org.apache.flink.runtime.net.SSLUtils
 import org.apache.flink.util.NetUtils
 import org.jboss.netty.channel.ChannelException
@@ -257,6 +257,20 @@ object AkkaUtils {
     ConfigFactory.parseString(config)
   }
 
+  private def validateHeartbeat(pauseParamName: String,
+                                pauseValue: String,
+                                intervalParamName: String,
+                                intervalValue: String) = {
+    if (Duration.apply(pauseValue).lteq(Duration.apply(intervalValue))) {
+      throw new IllegalConfigurationException(
+        "%s [%s] must greater then %s [%s]",
+        pauseParamName,
+        pauseValue,
+        intervalParamName,
+        intervalValue)
+    }
+  }
+
   /**
    * Creates a Akka config for a remote actor system listening on port on the network interface
    * identified by bindAddress.
@@ -289,6 +303,12 @@ object AkkaUtils {
     val transportHeartbeatPause = configuration.getString(
       AkkaOptions.TRANSPORT_HEARTBEAT_PAUSE)
 
+    validateHeartbeat(
+      AkkaOptions.TRANSPORT_HEARTBEAT_PAUSE.key(),
+      transportHeartbeatPause,
+      AkkaOptions.TRANSPORT_HEARTBEAT_INTERVAL.key(),
+      transportHeartbeatInterval)
+
     val transportThreshold = configuration.getDouble(AkkaOptions.TRANSPORT_THRESHOLD)
 
     val watchHeartbeatInterval = configuration.getString(
@@ -296,6 +316,12 @@ object AkkaUtils {
 
     val watchHeartbeatPause = configuration.getString(AkkaOptions.WATCH_HEARTBEAT_PAUSE)
 
+    validateHeartbeat(
+      AkkaOptions.WATCH_HEARTBEAT_PAUSE.key(),
+      watchHeartbeatPause,
+      AkkaOptions.WATCH_HEARTBEAT_INTERVAL.key(),
+      watchHeartbeatInterval)
+
     val watchThreshold = configuration.getInteger(AkkaOptions.WATCH_THRESHOLD)
 
     val akkaTCPTimeout = configuration.getString(AkkaOptions.TCP_TIMEOUT)

http://git-wip-us.apache.org/repos/asf/flink/blob/ae50c30a/flink-runtime/src/test/scala/org/apache/flink/runtime/akka/AkkaUtilsTest.scala
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/scala/org/apache/flink/runtime/akka/AkkaUtilsTest.scala b/flink-runtime/src/test/scala/org/apache/flink/runtime/akka/AkkaUtilsTest.scala
index de8c26c..26257df 100644
--- a/flink-runtime/src/test/scala/org/apache/flink/runtime/akka/AkkaUtilsTest.scala
+++ b/flink-runtime/src/test/scala/org/apache/flink/runtime/akka/AkkaUtilsTest.scala
@@ -20,7 +20,7 @@ package org.apache.flink.runtime.akka
 
 import java.net.InetSocketAddress
 
-import org.apache.flink.configuration.Configuration
+import org.apache.flink.configuration.{AkkaOptions, Configuration, IllegalConfigurationException}
 import org.apache.flink.runtime.highavailability.HighAvailabilityServicesUtils.AddressResolution
 import org.apache.flink.runtime.rpc.akka.AkkaRpcServiceUtils
 import org.apache.flink.runtime.rpc.akka.AkkaRpcServiceUtils.AkkaProtocol
@@ -35,6 +35,26 @@ class AkkaUtilsTest
   with Matchers
   with BeforeAndAfterAll {
 
+  test("getAkkaConfig should validate watch heartbeats") {
+    val configuration = new Configuration()
+    configuration.setString(
+      AkkaOptions.WATCH_HEARTBEAT_PAUSE.key(),
+      AkkaOptions.WATCH_HEARTBEAT_INTERVAL.defaultValue())
+    intercept[IllegalConfigurationException] {
+      AkkaUtils.getAkkaConfig(configuration, Some(("localhost", 31337)))
+    }
+  }
+
+  test("getAkkaConfig should validate transport heartbeats") {
+    val configuration = new Configuration()
+    configuration.setString(
+      AkkaOptions.TRANSPORT_HEARTBEAT_PAUSE.key(),
+      AkkaOptions.TRANSPORT_HEARTBEAT_INTERVAL.defaultValue())
+    intercept[IllegalConfigurationException] {
+      AkkaUtils.getAkkaConfig(configuration, Some(("localhost", 31337)))
+    }
+  }
+
   test("getHostFromAkkaURL should return the correct host from a remote Akka URL") {
     val host = "127.0.0.1"
     val port = 1234
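
For readers who want to try the pause-versus-interval rule outside Flink, the following is a minimal Java sketch of the same check that validateHeartbeat performs in AkkaUtils. It is not Flink code. The class HeartbeatValidation, the stand-in exception HeartbeatConfigException (which reuses the shape of the new (String format, Object... arguments) constructor), and the duration parser are all hypothetical. The parser assumes a "<number> <unit>" format with only ms, s and min units, whereas the Scala helper relies on Scala's Duration parser, which accepts more. Like the Scala helper, the sketch only rejects a pause that is not strictly greater than the interval. The commit message's advice that the pause be significantly larger (the defaults are 10 s and 60 s) is not enforced.

```java
import java.util.Locale;

// Hypothetical sketch, not Flink code: mirrors the pause-vs-interval rule that
// AkkaUtils.validateHeartbeat enforces, using a small assumed duration format.
final class HeartbeatValidation {

    // Hypothetical stand-in for Flink's IllegalConfigurationException, reusing the
    // shape of the (format, arguments...) constructor added in this commit.
    static final class HeartbeatConfigException extends RuntimeException {
        HeartbeatConfigException(String format, Object... arguments) {
            super(String.format(format, arguments));
        }
    }

    // Assumed format: "<number> <unit>" with unit "ms", "s" or "min" (e.g. "10 s").
    // A non-numeric amount makes Long.parseLong throw NumberFormatException.
    static long parseMillis(String value) {
        String[] parts = value.trim().split("\\s+");
        if (parts.length != 2) {
            throw new HeartbeatConfigException("Unsupported duration [%s]", value);
        }
        long amount = Long.parseLong(parts[0]);
        switch (parts[1].toLowerCase(Locale.ROOT)) {
            case "ms":
                return amount;
            case "s":
                return amount * 1_000L;
            case "min":
                return amount * 60_000L;
            default:
                throw new HeartbeatConfigException("Unsupported time unit in [%s]", value);
        }
    }

    // Rejects a pause that is less than or equal to the interval, like the Scala helper.
    // This enforces "strictly greater", not "significantly greater".
    static void validate(String pauseKey, String pauseValue,
                         String intervalKey, String intervalValue) {
        if (parseMillis(pauseValue) <= parseMillis(intervalValue)) {
            throw new HeartbeatConfigException(
                "%s [%s] must be greater than %s [%s]",
                pauseKey, pauseValue, intervalKey, intervalValue);
        }
    }

    public static void main(String[] args) {
        // The defaults after this commit (pause 60 s, interval 10 s) pass the check.
        validate("akka.watch.heartbeat.pause", "60 s",
                 "akka.watch.heartbeat.interval", "10 s");
        try {
            // A pause equal to the interval, as in the new AkkaUtilsTest cases, is rejected.
            validate("akka.watch.heartbeat.pause", "10 s",
                     "akka.watch.heartbeat.interval", "10 s");
        } catch (HeartbeatConfigException e) {
            System.out.println(e.getMessage());
        }
    }
}
```

Running main prints the rejection message for the equal-values case. This is the same situation the two new AkkaUtilsTest cases exercise by setting the pause to the interval's default value.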
