[FLINK-3361] [jobmanager] Fix error messages about execution retry delay and max heartbeat pause
Project: http://git-wip-us.apache.org/repos/asf/flink/repo Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/af3e6890 Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/af3e6890 Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/af3e6890 Branch: refs/heads/master Commit: af3e689010f3d8e08b6ff70dd5eb6d45429d4981 Parents: 4cc4f60 Author: Stephan Ewen <se...@apache.org> Authored: Mon Feb 8 11:46:35 2016 +0100 Committer: Stephan Ewen <se...@apache.org> Committed: Mon Feb 8 16:57:57 2016 +0100 ---------------------------------------------------------------------- .../flink/runtime/jobmanager/JobManager.scala | 28 +++++++++++++------- 1 file changed, 19 insertions(+), 9 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/flink/blob/af3e6890/flink-runtime/src/main/scala/org/apache/flink/runtime/jobmanager/JobManager.scala ---------------------------------------------------------------------- diff --git a/flink-runtime/src/main/scala/org/apache/flink/runtime/jobmanager/JobManager.scala b/flink-runtime/src/main/scala/org/apache/flink/runtime/jobmanager/JobManager.scala index 4410ec3..bc7c134 100644 --- a/flink-runtime/src/main/scala/org/apache/flink/runtime/jobmanager/JobManager.scala +++ b/flink-runtime/src/main/scala/org/apache/flink/runtime/jobmanager/JobManager.scala @@ -25,8 +25,10 @@ import java.util.UUID import akka.actor.Status.Failure import akka.actor._ import akka.pattern.ask + import grizzled.slf4j.Logger -import org.apache.flink.api.common.{ApplicationID, ExecutionConfig, JobID} + +import org.apache.flink.api.common.{ExecutionConfig, JobID} import org.apache.flink.configuration.{ConfigConstants, Configuration, GlobalConfiguration} import org.apache.flink.core.io.InputSplitAssigner import org.apache.flink.runtime.accumulators.AccumulatorSnapshot @@ -61,6 +63,7 @@ import org.apache.flink.runtime.util._ import org.apache.flink.runtime.webmonitor.{WebMonitor, WebMonitorUtils} 
import org.apache.flink.runtime.{FlinkActor, LeaderSessionMessageFilter, LogMessages} import org.apache.flink.util.{ExceptionUtils, InstantiationUtil, NetUtils} + import org.jboss.netty.channel.ChannelException import scala.annotation.tailrec @@ -1712,9 +1715,9 @@ object JobManager { maxSleepBetweenRetries : Long = 0 ) : scala.util.Try[T] = { - def sleepBeforeRetry : Unit = { + def sleepBeforeRetry() : Unit = { if (maxSleepBetweenRetries > 0) { - val sleepTime = ((Math.random() * maxSleepBetweenRetries).asInstanceOf[Long]) + val sleepTime = (Math.random() * maxSleepBetweenRetries).asInstanceOf[Long] LOG.info(s"Retrying after bind exception. Sleeping for ${sleepTime} ms.") Thread.sleep(sleepTime) } @@ -1728,7 +1731,7 @@ object JobManager { scala.util.Failure(new RuntimeException( "Unable to do further retries starting the actor system")) } else { - sleepBeforeRetry + sleepBeforeRetry() retryOnBindException(fn, stopCond) } case scala.util.Failure(x: Exception) => x.getCause match { @@ -1737,7 +1740,7 @@ object JobManager { scala.util.Failure(new RuntimeException( "Unable to do further retries starting the actor system")) } else { - sleepBeforeRetry + sleepBeforeRetry() retryOnBindException(fn, stopCond) } case _ => scala.util.Failure(x) @@ -2044,14 +2047,21 @@ object JobManager { ConfigConstants.DEFAULT_AKKA_ASK_TIMEOUT) val delayString = configuration.getString(ConfigConstants.EXECUTION_RETRY_DELAY_KEY, pauseString) - + val delayBetweenRetries: Long = try { Duration(delayString).toMillis } catch { - case n: NumberFormatException => throw new Exception( - s"Invalid config value for ${ConfigConstants.EXECUTION_RETRY_DELAY_KEY}: " + - s"$pauseString. Value must be a valid duration (such as 100 milli or 1 min)"); + case n: NumberFormatException => + if (delayString.equals(pauseString)) { + throw new Exception( + s"Invalid config value for ${ConfigConstants.AKKA_WATCH_HEARTBEAT_PAUSE}: " + + s"$pauseString. 
Value must be a valid duration (such as '10 s' or '1 min')") + } else { + throw new Exception( + s"Invalid config value for ${ConfigConstants.EXECUTION_RETRY_DELAY_KEY}: " + + s"$delayString. Value must be a valid duration (such as '100 milli' or '10 s')") + } } val executionContext = ExecutionContext.fromExecutor(new ForkJoinPool())