Repository: flink
Updated Branches:
  refs/heads/master b489c3673 -> 8d62033c2


[FLINK-2066][core] Add configuration of delay between execution retries at job 
level

This closes #1223


Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/a437a2b3
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/a437a2b3
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/a437a2b3

Branch: refs/heads/master
Commit: a437a2b396eb473db5e649c3879c34b67fffc943
Parents: b489c36
Author: wangchx <[email protected]>
Authored: Sat Oct 3 17:53:31 2015 -0700
Committer: Fabian Hueske <[email protected]>
Committed: Wed Oct 7 12:56:46 2015 +0200

----------------------------------------------------------------------
 docs/apis/programming_guide.md                  |  2 ++
 .../flink/api/common/ExecutionConfig.java       | 25 +++++++++++++++++-
 .../java/org/apache/flink/api/common/Plan.java  |  7 +++++
 .../flink/api/java/ExecutionEnvironment.java    | 25 ++++++++++++++++++
 .../plantranslate/JobGraphGenerator.java        |  1 +
 .../apache/flink/runtime/jobgraph/JobGraph.java | 27 ++++++++++++++++++++
 .../flink/api/scala/ExecutionEnvironment.scala  | 18 ++++++++++++-
 .../environment/StreamExecutionEnvironment.java | 24 +++++++++++++++++
 .../api/graph/StreamingJobGraphGenerator.java   | 15 +++++++++++
 .../api/scala/StreamExecutionEnvironment.scala  | 17 ++++++++++++
 10 files changed, 159 insertions(+), 2 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/flink/blob/a437a2b3/docs/apis/programming_guide.md
----------------------------------------------------------------------
diff --git a/docs/apis/programming_guide.md b/docs/apis/programming_guide.md
index 3959dc9..da141a9 100644
--- a/docs/apis/programming_guide.md
+++ b/docs/apis/programming_guide.md
@@ -1992,6 +1992,8 @@ With the closure cleaner disabled, it might happen that 
an anonymous user functi
 
 - `getNumberOfExecutionRetries()` / `setNumberOfExecutionRetries(int 
numberOfExecutionRetries)` Sets the number of times that failed tasks are 
re-executed. A value of zero effectively disables fault tolerance. A value of 
`-1` indicates that the system default value (as defined in the configuration) 
should be used.
 
+- `getExecutionRetryDelay()` / `setExecutionRetryDelay(long 
executionRetryDelay)` Sets the delay in milliseconds that the system waits 
after a job has failed, before re-executing it. The delay starts after all 
tasks have been successfully stopped on the TaskManagers, and once the 
delay has passed, the tasks are restarted. This parameter is useful to delay 
re-execution in order to let certain time-out related failures surface fully 
(like broken connections that have not fully timed out), before attempting a 
re-execution and immediately failing again due to the same problem. This 
parameter only has an effect if the number of execution retries is one or more. 
A value of `-1` indicates that the system default value should be used.
+
 - `getExecutionMode()` / `setExecutionMode()`. The default execution mode is 
PIPELINED. Sets the execution mode to execute the program. The execution mode 
defines whether data exchanges are performed in a batch or on a pipelined 
manner. 
 
 - `enableForceKryo()` / **`disableForceKryo`**. Kryo is not forced by default. 
Forces the GenericTypeInformation to use the Kryo serializer for POJOS even 
though we could analyze them as a POJO. In some cases this might be preferable. 
For example, when Flink's internal serializers fail to handle a POJO properly.

http://git-wip-us.apache.org/repos/asf/flink/blob/a437a2b3/flink-core/src/main/java/org/apache/flink/api/common/ExecutionConfig.java
----------------------------------------------------------------------
diff --git 
a/flink-core/src/main/java/org/apache/flink/api/common/ExecutionConfig.java 
b/flink-core/src/main/java/org/apache/flink/api/common/ExecutionConfig.java
index df0248a..9ed3e92 100644
--- a/flink-core/src/main/java/org/apache/flink/api/common/ExecutionConfig.java
+++ b/flink-core/src/main/java/org/apache/flink/api/common/ExecutionConfig.java
@@ -34,6 +34,7 @@ import java.util.Objects;
  *     <li>The default parallelism of the program, i.e., how many parallel 
tasks to use for
  *         all functions that do not define a specific value directly.</li>
  *     <li>The number of retries in the case of failed executions.</li>
+ *     <li>The delay between execution retries.</li>
  *     <li>The {@link ExecutionMode} of the program: Batch or Pipelined.
  *         The default execution mode is {@link ExecutionMode#PIPELINED}</li>
  *     <li>Enabling or disabling the "closure cleaner". The closure cleaner 
pre-processes
@@ -92,6 +93,8 @@ public class ExecutionConfig implements Serializable {
        private long autoWatermarkInterval = 0;
 
        private boolean timestampsEnabled = false;
+       
+       private long executionRetryDelay = -1;
 
        // Serializers and types registered with Kryo and the PojoSerializer
        // we store them in linked maps/sets to ensure they are registered in 
order in all kryo instances.
@@ -242,6 +245,13 @@ public class ExecutionConfig implements Serializable {
        public int getNumberOfExecutionRetries() {
                return numberOfExecutionRetries;
        }
+       
+       /**
+        * @return The delay between retries in milliseconds, or {@code -1} if the system default should be used.
+        */
+       public long getExecutionRetryDelay() {
+               return executionRetryDelay;
+       }
 
        /**
         * Sets the number of times that failed tasks are re-executed. A value 
of zero
@@ -258,7 +268,20 @@ public class ExecutionConfig implements Serializable {
                this.numberOfExecutionRetries = numberOfExecutionRetries;
                return this;
        }
-
+       
+       /**
+        * Sets the delay between execution retries. A value of {@code -1} indicates 
that the system default value 
+        * should be used. Values below {@code -1} are rejected.
+        * @param executionRetryDelay The number of milliseconds the system 
will wait to retry.
+        */
+       public ExecutionConfig setExecutionRetryDelay(long executionRetryDelay) 
{
+               if (executionRetryDelay < -1 ) {
+                       throw new IllegalArgumentException(
+                                       "The delay between reties must be 
non-negative, or -1 (use system default)");
+               }
+               this.executionRetryDelay = executionRetryDelay;
+               return this;
+       }
        /**
         * Sets the execution mode to execute the program. The execution mode 
defines whether
         * data exchanges are performed in a batch or on a pipelined manner.

http://git-wip-us.apache.org/repos/asf/flink/blob/a437a2b3/flink-core/src/main/java/org/apache/flink/api/common/Plan.java
----------------------------------------------------------------------
diff --git a/flink-core/src/main/java/org/apache/flink/api/common/Plan.java 
b/flink-core/src/main/java/org/apache/flink/api/common/Plan.java
index e0d1eb8..dc8d152 100644
--- a/flink-core/src/main/java/org/apache/flink/api/common/Plan.java
+++ b/flink-core/src/main/java/org/apache/flink/api/common/Plan.java
@@ -303,6 +303,13 @@ public class Plan implements Visitable<Operator<?>> {
        }
        
        /**
+        * Gets the delay, in milliseconds, before failed tasks are retried.
+        * @return The delay the system will wait before retrying failed tasks.
+        */
+       public long getExecutionRetryDelay() {
+               return getExecutionConfig().getExecutionRetryDelay();
+       }
+       /**
         * Gets the optimizer post-pass class for this job. The post-pass 
typically creates utility classes
         * for data types and is specific to a particular data model (record, 
tuple, Scala, ...)
         *

http://git-wip-us.apache.org/repos/asf/flink/blob/a437a2b3/flink-java/src/main/java/org/apache/flink/api/java/ExecutionEnvironment.java
----------------------------------------------------------------------
diff --git 
a/flink-java/src/main/java/org/apache/flink/api/java/ExecutionEnvironment.java 
b/flink-java/src/main/java/org/apache/flink/api/java/ExecutionEnvironment.java
index a596765..01fb15c 100644
--- 
a/flink-java/src/main/java/org/apache/flink/api/java/ExecutionEnvironment.java
+++ 
b/flink-java/src/main/java/org/apache/flink/api/java/ExecutionEnvironment.java
@@ -199,6 +199,31 @@ public abstract class ExecutionEnvironment {
        }
        
        /**
+        * Sets the delay in milliseconds that the system waits before 
re-executing
+        * failed tasks. A value of {@code -1}
+        * indicates that the system default value 
+        * should be used.
+        *
+        * @param executionRetryDelay
+        *              The delay in milliseconds the system will wait to re-execute 
failed
+        *              tasks.
+        */
+       public void setExecutionRetryDelay(long executionRetryDelay) {
+               config.setExecutionRetryDelay(executionRetryDelay);
+       }
+       
+       /**
+        * Gets the delay time in milliseconds the system will wait to 
re-execute failed tasks.
+        * A value of {@code -1} indicates that the system default value (as 
defined
+        * in the configuration) should be used.
+        *
+        * @return The delay time the system will wait to re-execute failed 
tasks.
+        */
+       public long getExecutionRetryDelay() {
+               return config.getExecutionRetryDelay();
+       }
+       
+       /**
         * Returns the {@link org.apache.flink.api.common.JobExecutionResult} 
of the last executed job.
         * 
         * @return The execution result from the latest job execution.

http://git-wip-us.apache.org/repos/asf/flink/blob/a437a2b3/flink-optimizer/src/main/java/org/apache/flink/optimizer/plantranslate/JobGraphGenerator.java
----------------------------------------------------------------------
diff --git 
a/flink-optimizer/src/main/java/org/apache/flink/optimizer/plantranslate/JobGraphGenerator.java
 
b/flink-optimizer/src/main/java/org/apache/flink/optimizer/plantranslate/JobGraphGenerator.java
index c15e47a..afd0682 100644
--- 
a/flink-optimizer/src/main/java/org/apache/flink/optimizer/plantranslate/JobGraphGenerator.java
+++ 
b/flink-optimizer/src/main/java/org/apache/flink/optimizer/plantranslate/JobGraphGenerator.java
@@ -219,6 +219,7 @@ public class JobGraphGenerator implements Visitor<PlanNode> 
{
                // create the job graph object
                JobGraph graph = new JobGraph(jobId, program.getJobName());
                
graph.setNumberOfExecutionRetries(program.getOriginalPlan().getNumberOfExecutionRetries());
+               
graph.setExecutionRetryDelay(program.getOriginalPlan().getExecutionRetryDelay());
                graph.setAllowQueuedScheduling(false);
                
graph.setSessionTimeout(program.getOriginalPlan().getSessionTimeout());
 

http://git-wip-us.apache.org/repos/asf/flink/blob/a437a2b3/flink-runtime/src/main/java/org/apache/flink/runtime/jobgraph/JobGraph.java
----------------------------------------------------------------------
diff --git 
a/flink-runtime/src/main/java/org/apache/flink/runtime/jobgraph/JobGraph.java 
b/flink-runtime/src/main/java/org/apache/flink/runtime/jobgraph/JobGraph.java
index e4a0209..4014a76 100644
--- 
a/flink-runtime/src/main/java/org/apache/flink/runtime/jobgraph/JobGraph.java
+++ 
b/flink-runtime/src/main/java/org/apache/flink/runtime/jobgraph/JobGraph.java
@@ -79,6 +79,8 @@ public class JobGraph implements Serializable {
        
        /** The number of times that failed tasks should be re-executed */
        private int numExecutionRetries;
+       
+       private long executionRetryDelay;
 
        /** The number of seconds after which the corresponding ExecutionGraph 
is removed at the
         * job manager after it has been executed. */
@@ -211,6 +213,31 @@ public class JobGraph implements Serializable {
        public int getNumberOfExecutionRetries() {
                return numExecutionRetries;
        }
+       
+       /**
+        * Gets the delay in milliseconds the system waits before re-executing failed 
tasks. A value of
+        * {@code -1} indicates that the system default value 
+        * should be used.
+        * @return The delay in milliseconds the system waits before re-executing 
failed tasks.
+        */
+       public long getExecutionRetryDelay() {
+               return executionRetryDelay;
+       }
+       
+       /**
+        * Sets the delay in milliseconds that the system waits before re-executing
+        * failed tasks. A value of {@code -1} 
indicates that the system
+        * default value should be used.
+        * 
+        * @param executionRetryDelay The delay in milliseconds the system will wait to 
re-execute failed tasks.
+        */
+       public void setExecutionRetryDelay(long executionRetryDelay){
+               if (executionRetryDelay < -1) {
+                       throw new IllegalArgumentException(
+                                       "The delay between reties must be 
non-negative, or -1 (use system default)");
+               }
+               this.executionRetryDelay = executionRetryDelay;
+       }
 
        /**
         * Gets the timeout after which the corresponding ExecutionGraph is 
removed at the

http://git-wip-us.apache.org/repos/asf/flink/blob/a437a2b3/flink-scala/src/main/scala/org/apache/flink/api/scala/ExecutionEnvironment.scala
----------------------------------------------------------------------
diff --git 
a/flink-scala/src/main/scala/org/apache/flink/api/scala/ExecutionEnvironment.scala
 
b/flink-scala/src/main/scala/org/apache/flink/api/scala/ExecutionEnvironment.scala
index 3427225..e27d55a 100644
--- 
a/flink-scala/src/main/scala/org/apache/flink/api/scala/ExecutionEnvironment.scala
+++ 
b/flink-scala/src/main/scala/org/apache/flink/api/scala/ExecutionEnvironment.scala
@@ -60,7 +60,7 @@ import scala.reflect.ClassTag
  *  - [[ExecutionEnvironment#createRemoteEnvironment]]
  *
  *  Use [[ExecutionEnvironment#getExecutionEnvironment]] to get the correct 
environment depending
- *  on where the program is executed. If it is run inside an IDE a loca 
environment will be
+ *  on where the program is executed. If it is run inside an IDE a local 
environment will be
  *  created. If the program is submitted to a cluster a remote execution 
environment will
  *  be created.
  */
@@ -110,6 +110,22 @@ class ExecutionEnvironment(javaEnv: JavaEnv) {
   def getNumberOfExecutionRetries = javaEnv.getNumberOfExecutionRetries
 
   /**
+   * Sets the delay in milliseconds that the system waits before re-executing
+   * failed tasks. A value of "-1"
+   * indicates that the system default value
+   * should be used.
+   */
+  def setExecutionRetryDelay(executionRetryDelay: Long): Unit = {
+    javaEnv.setExecutionRetryDelay(executionRetryDelay)
+  }
+
+  /**
+   * Gets the delay time in milliseconds the system will wait to re-execute 
failed tasks.
+   * A value of "-1" indicates that the system default value (as defined
+   * in the configuration) should be used.
+   */
+  def getExecutionRetryDelay = javaEnv.getExecutionRetryDelay
+  /**
    * Gets the UUID by which this environment is identified. The UUID sets the 
execution context
    * in the cluster or local environment.
    */

http://git-wip-us.apache.org/repos/asf/flink/blob/a437a2b3/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/environment/StreamExecutionEnvironment.java
----------------------------------------------------------------------
diff --git 
a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/environment/StreamExecutionEnvironment.java
 
b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/environment/StreamExecutionEnvironment.java
index 598d0df..c2e2880 100644
--- 
a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/environment/StreamExecutionEnvironment.java
+++ 
b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/environment/StreamExecutionEnvironment.java
@@ -414,6 +414,30 @@ public abstract class StreamExecutionEnvironment {
        }
 
        /**
+        * Sets the delay in milliseconds that the system waits before re-executing
+        * failed tasks. A value of {@code -1}
+        * indicates that the system default value 
+        * should be used.
+        *
+        * @param executionRetryDelay
+        *              The delay in milliseconds the system will wait to re-execute 
failed
+        *              tasks.
+        */
+       public void setExecutionRetryDelay(long executionRetryDelay){
+               config.setExecutionRetryDelay(executionRetryDelay);
+       }
+       
+       /**
+        * Gets the delay time in milliseconds the system will wait to 
re-execute failed tasks.
+        * A value of {@code -1} indicates that the system default value (as 
defined
+        * in the configuration) should be used.
+        *
+        * @return The delay time the system will wait to re-execute failed 
tasks.
+        */
+       public long getExecutionRetryDelay(){
+               return config.getExecutionRetryDelay();
+       }
+       /**
         * Sets the default parallelism that will be used for the local 
execution
         * environment created by {@link #createLocalEnvironment()}.
         *

http://git-wip-us.apache.org/repos/asf/flink/blob/a437a2b3/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/graph/StreamingJobGraphGenerator.java
----------------------------------------------------------------------
diff --git 
a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/graph/StreamingJobGraphGenerator.java
 
b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/graph/StreamingJobGraphGenerator.java
index 8eb91a2..d8e81cf 100644
--- 
a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/graph/StreamingJobGraphGenerator.java
+++ 
b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/graph/StreamingJobGraphGenerator.java
@@ -99,6 +99,8 @@ public class StreamingJobGraphGenerator {
                configureCheckpointing();
 
                configureExecutionRetries();
+               
+               configureExecutionRetryDelay();
 
                try {
                        
InstantiationUtil.writeObjectToConfig(this.streamGraph.getExecutionConfig(), 
this.jobGraph.getJobConfiguration(), ExecutionConfig.CONFIG_KEY);
@@ -419,6 +421,10 @@ public class StreamingJobGraphGenerator {
                        if(executionRetries == -1) {
                                
streamGraph.getExecutionConfig().setNumberOfExecutionRetries(Integer.MAX_VALUE);
                        }
+                       long executionRetryDelay = 
streamGraph.getExecutionConfig().getExecutionRetryDelay();
+                       if(executionRetryDelay == -1) {
+                               
streamGraph.getExecutionConfig().setExecutionRetryDelay(100 * 1000);
+                       }
                }
        }
 
@@ -431,4 +437,13 @@ public class StreamingJobGraphGenerator {
                        jobGraph.setNumberOfExecutionRetries(0);
                }
        }
+       
+       private void configureExecutionRetryDelay() {
+               long executionRetryDelay = 
streamGraph.getExecutionConfig().getExecutionRetryDelay();
+               if (executionRetryDelay != -1) {
+                       jobGraph.setExecutionRetryDelay(executionRetryDelay);
+               } else {
+                       jobGraph.setExecutionRetryDelay(100 * 1000);
+               }
+       }
 }

http://git-wip-us.apache.org/repos/asf/flink/blob/a437a2b3/flink-staging/flink-streaming/flink-streaming-scala/src/main/scala/org/apache/flink/streaming/api/scala/StreamExecutionEnvironment.scala
----------------------------------------------------------------------
diff --git 
a/flink-staging/flink-streaming/flink-streaming-scala/src/main/scala/org/apache/flink/streaming/api/scala/StreamExecutionEnvironment.scala
 
b/flink-staging/flink-streaming/flink-streaming-scala/src/main/scala/org/apache/flink/streaming/api/scala/StreamExecutionEnvironment.scala
index 2474d8c..7492e48 100644
--- 
a/flink-staging/flink-streaming/flink-streaming-scala/src/main/scala/org/apache/flink/streaming/api/scala/StreamExecutionEnvironment.scala
+++ 
b/flink-staging/flink-streaming/flink-streaming-scala/src/main/scala/org/apache/flink/streaming/api/scala/StreamExecutionEnvironment.scala
@@ -210,6 +210,23 @@ class StreamExecutionEnvironment(javaEnv: JavaEnv) {
    */
   def getNumberOfExecutionRetries = javaEnv.getNumberOfExecutionRetries
 
+  /**
+   * Sets the delay in milliseconds that the system waits before re-executing
+   * failed tasks. A value of "-1"
+   * indicates that the system default value
+   * should be used.
+   */
+  def setExecutionRetryDelay(executionRetryDelay: Long): Unit = {
+    javaEnv.setExecutionRetryDelay(executionRetryDelay)
+  }
+
+  /**
+   * Gets the delay time in milliseconds the system will wait to re-execute 
failed tasks.
+   * A value of "-1" indicates that the system default value (as defined
+   * in the configuration) should be used.
+   */
+  def getExecutionRetryDelay = javaEnv.getExecutionRetryDelay
+
   // 
--------------------------------------------------------------------------------------------
   // Registry for types and serializers
   // 
--------------------------------------------------------------------------------------------

Reply via email to