[FLINK-3361] [jobmanager] Fix error messages about execution retry delay and max heartbeat pause


Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/af3e6890
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/af3e6890
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/af3e6890

Branch: refs/heads/master
Commit: af3e689010f3d8e08b6ff70dd5eb6d45429d4981
Parents: 4cc4f60
Author: Stephan Ewen <se...@apache.org>
Authored: Mon Feb 8 11:46:35 2016 +0100
Committer: Stephan Ewen <se...@apache.org>
Committed: Mon Feb 8 16:57:57 2016 +0100

----------------------------------------------------------------------
 .../flink/runtime/jobmanager/JobManager.scala   | 28 +++++++++++++-------
 1 file changed, 19 insertions(+), 9 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/flink/blob/af3e6890/flink-runtime/src/main/scala/org/apache/flink/runtime/jobmanager/JobManager.scala
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/scala/org/apache/flink/runtime/jobmanager/JobManager.scala b/flink-runtime/src/main/scala/org/apache/flink/runtime/jobmanager/JobManager.scala
index 4410ec3..bc7c134 100644
--- a/flink-runtime/src/main/scala/org/apache/flink/runtime/jobmanager/JobManager.scala
+++ b/flink-runtime/src/main/scala/org/apache/flink/runtime/jobmanager/JobManager.scala
@@ -25,8 +25,10 @@ import java.util.UUID
 import akka.actor.Status.Failure
 import akka.actor._
 import akka.pattern.ask
+
 import grizzled.slf4j.Logger
-import org.apache.flink.api.common.{ApplicationID, ExecutionConfig, JobID}
+
+import org.apache.flink.api.common.{ExecutionConfig, JobID}
 import org.apache.flink.configuration.{ConfigConstants, Configuration, 
GlobalConfiguration}
 import org.apache.flink.core.io.InputSplitAssigner
 import org.apache.flink.runtime.accumulators.AccumulatorSnapshot
@@ -61,6 +63,7 @@ import org.apache.flink.runtime.util._
 import org.apache.flink.runtime.webmonitor.{WebMonitor, WebMonitorUtils}
 import org.apache.flink.runtime.{FlinkActor, LeaderSessionMessageFilter, 
LogMessages}
 import org.apache.flink.util.{ExceptionUtils, InstantiationUtil, NetUtils}
+
 import org.jboss.netty.channel.ChannelException
 
 import scala.annotation.tailrec
@@ -1712,9 +1715,9 @@ object JobManager {
       maxSleepBetweenRetries : Long = 0 )
     : scala.util.Try[T] = {
 
-    def sleepBeforeRetry : Unit = {
+    def sleepBeforeRetry() : Unit = {
       if (maxSleepBetweenRetries > 0) {
-        val sleepTime = ((Math.random() * 
maxSleepBetweenRetries).asInstanceOf[Long])
+        val sleepTime = (Math.random() * 
maxSleepBetweenRetries).asInstanceOf[Long]
         LOG.info(s"Retrying after bind exception. Sleeping for ${sleepTime} 
ms.")
         Thread.sleep(sleepTime)
       }
@@ -1728,7 +1731,7 @@ object JobManager {
           scala.util.Failure(new RuntimeException(
             "Unable to do further retries starting the actor system"))
         } else {
-          sleepBeforeRetry
+          sleepBeforeRetry()
           retryOnBindException(fn, stopCond)
         }
       case scala.util.Failure(x: Exception) => x.getCause match {
@@ -1737,7 +1740,7 @@ object JobManager {
             scala.util.Failure(new RuntimeException(
               "Unable to do further retries starting the actor system"))
           } else {
-            sleepBeforeRetry
+            sleepBeforeRetry()
             retryOnBindException(fn, stopCond)
           }
         case _ => scala.util.Failure(x)
@@ -2044,14 +2047,21 @@ object JobManager {
                                               
ConfigConstants.DEFAULT_AKKA_ASK_TIMEOUT)
     val delayString = 
configuration.getString(ConfigConstants.EXECUTION_RETRY_DELAY_KEY,
                                               pauseString)
-
+    
     val delayBetweenRetries: Long = try {
         Duration(delayString).toMillis
       }
       catch {
-        case n: NumberFormatException => throw new Exception(
-          s"Invalid config value for 
${ConfigConstants.EXECUTION_RETRY_DELAY_KEY}: " +
-            s"$pauseString. Value must be a valid duration (such as 100 milli 
or 1 min)");
+        case n: NumberFormatException =>
+          if (delayString.equals(pauseString)) {
+            throw new Exception(
+              s"Invalid config value for 
${ConfigConstants.AKKA_WATCH_HEARTBEAT_PAUSE}: " +
+                s"$pauseString. Value must be a valid duration (such as '10 s' 
or '1 min')")
+          } else {
+            throw new Exception(
+              s"Invalid config value for 
${ConfigConstants.EXECUTION_RETRY_DELAY_KEY}: " +
+                s"$delayString. Value must be a valid duration (such as '100 
milli' or '10 s')")
+          }
       }
 
     val executionContext = ExecutionContext.fromExecutor(new ForkJoinPool())

Reply via email to