Repository: flink
Updated Branches:
  refs/heads/master c0d5ba388 -> 4f11b4016


[FLINK-2523] Make the task cancellation interval configurable

This closes #1662


Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/4f11b401
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/4f11b401
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/4f11b401

Branch: refs/heads/master
Commit: 4f11b401688ac6ba78a1e32a556a2419affa2946
Parents: c0d5ba3
Author: kl0u <kklou...@gmail.com>
Authored: Mon Mar 14 10:46:42 2016 +0100
Committer: zentol <ches...@apache.org>
Committed: Mon Mar 14 14:42:05 2016 +0100

----------------------------------------------------------------------
 docs/apis/common/index.md                       |  2 ++
 .../flink/api/common/ExecutionConfig.java       | 32 ++++++++++++++++++--
 .../flink/configuration/ConfigConstants.java    |  5 +++
 .../apache/flink/runtime/taskmanager/Task.java  | 13 +++++---
 .../flink/test/javaApiOperators/MapITCase.java  |  2 ++
 5 files changed, 48 insertions(+), 6 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/flink/blob/4f11b401/docs/apis/common/index.md
----------------------------------------------------------------------
diff --git a/docs/apis/common/index.md b/docs/apis/common/index.md
index 6e1b27d..35a1832 100644
--- a/docs/apis/common/index.md
+++ b/docs/apis/common/index.md
@@ -999,6 +999,8 @@ Note that types registered with `registerKryoType()` are 
not available to Flink'
 
 - `disableAutoTypeRegistration()` Automatic type registration is enabled by 
default. The automatic type registration is registering all types (including 
sub-types) used by usercode with Kryo and the POJO serializer.
 
+- `setTaskCancellationInterval(long interval)` Sets the interval (in 
milliseconds) to wait between consecutive attempts to cancel a running task. 
When a task is canceled a new thread is created which periodically calls 
`interrupt()` on the task thread, if the task thread does not terminate within 
a certain time. This parameter refers to the time between consecutive calls to 
`interrupt()` and is set by default to **30000** milliseconds, or **30 
seconds**.
+
 The `RuntimeContext` which is accessible in `Rich*` functions through the 
`getRuntimeContext()` method also allows to access the `ExecutionConfig` in all 
user defined functions.
 
 {% top %}

http://git-wip-us.apache.org/repos/asf/flink/blob/4f11b401/flink-core/src/main/java/org/apache/flink/api/common/ExecutionConfig.java
----------------------------------------------------------------------
diff --git 
a/flink-core/src/main/java/org/apache/flink/api/common/ExecutionConfig.java 
b/flink-core/src/main/java/org/apache/flink/api/common/ExecutionConfig.java
index 9b0ff52..205593c 100644
--- a/flink-core/src/main/java/org/apache/flink/api/common/ExecutionConfig.java
+++ b/flink-core/src/main/java/org/apache/flink/api/common/ExecutionConfig.java
@@ -22,6 +22,8 @@ import com.esotericsoftware.kryo.Serializer;
 import org.apache.flink.annotation.PublicEvolving;
 import org.apache.flink.annotation.Public;
 import org.apache.flink.api.common.restartstrategy.RestartStrategies;
+import org.apache.flink.configuration.ConfigConstants;
+
 
 import java.io.Serializable;
 import java.util.LinkedHashMap;
@@ -107,6 +109,8 @@ public class ExecutionConfig implements Serializable {
 
        private RestartStrategies.RestartStrategyConfiguration 
restartStrategyConfiguration;
        
+       private long taskCancellationIntervalMillis = 
ConfigConstants.DEFAULT_TASK_CANCELLATION_INTERVAL_MILLIS;
+
        // Serializers and types registered with Kryo and the PojoSerializer
        // we store them in linked maps/sets to ensure they are registered in 
order in all kryo instances.
 
@@ -214,6 +218,28 @@ public class ExecutionConfig implements Serializable {
        }
 
        /**
+        * Gets the interval (in milliseconds) between consecutive attempts to 
cancel a running task.
+        */
+       public long getTaskCancellationInterval() {
+               return this.taskCancellationIntervalMillis;
+       }
+
+       /**
+        * Sets the configuration parameter specifying the interval (in 
milliseconds)
+        * between consecutive attempts to cancel a running task.
+        * @param interval the interval (in milliseconds).
+        */
+       public ExecutionConfig setTaskCancellationInterval(long interval) {
+               if(interval < 0) {
+                       throw new IllegalArgumentException(
+                               "The task cancellation interval cannot be 
negative."
+                       );
+               }
+               this.taskCancellationIntervalMillis = interval;
+               return this;
+       }
+
+       /**
         * Sets the restart strategy to be used for recovery.
         *
         * <pre>{@code
@@ -658,7 +684,8 @@ public class ExecutionConfig implements Serializable {
                                
registeredTypesWithKryoSerializerClasses.equals(other.registeredTypesWithKryoSerializerClasses)
 &&
                                
defaultKryoSerializerClasses.equals(other.defaultKryoSerializerClasses) &&
                                
registeredKryoTypes.equals(other.registeredKryoTypes) &&
-                               
registeredPojoTypes.equals(other.registeredPojoTypes);
+                               
registeredPojoTypes.equals(other.registeredPojoTypes) &&
+                               taskCancellationIntervalMillis == 
other.taskCancellationIntervalMillis;
 
                } else {
                        return false;
@@ -683,7 +710,8 @@ public class ExecutionConfig implements Serializable {
                        registeredTypesWithKryoSerializerClasses,
                        defaultKryoSerializerClasses,
                        registeredKryoTypes,
-                       registeredPojoTypes);
+                       registeredPojoTypes,
+                       taskCancellationIntervalMillis);
        }
 
        public boolean canEqual(Object obj) {

http://git-wip-us.apache.org/repos/asf/flink/blob/4f11b401/flink-core/src/main/java/org/apache/flink/configuration/ConfigConstants.java
----------------------------------------------------------------------
diff --git 
a/flink-core/src/main/java/org/apache/flink/configuration/ConfigConstants.java 
b/flink-core/src/main/java/org/apache/flink/configuration/ConfigConstants.java
index f4e13f6..38b684c 100644
--- 
a/flink-core/src/main/java/org/apache/flink/configuration/ConfigConstants.java
+++ 
b/flink-core/src/main/java/org/apache/flink/configuration/ConfigConstants.java
@@ -660,6 +660,11 @@ public final class ConfigConstants {
         */
        public static final boolean DEFAULT_TASK_MANAGER_MEMORY_PRE_ALLOCATE = 
false;
 
+       /**
+        * The default interval (in milliseconds) to wait between consecutive 
task cancellation attempts (= 30000 msec).
+        * */
+       public static final long DEFAULT_TASK_CANCELLATION_INTERVAL_MILLIS = 
30000;
+
        // ------------------------ Runtime Algorithms ------------------------
        
        /**

http://git-wip-us.apache.org/repos/asf/flink/blob/4f11b401/flink-runtime/src/main/java/org/apache/flink/runtime/taskmanager/Task.java
----------------------------------------------------------------------
diff --git 
a/flink-runtime/src/main/java/org/apache/flink/runtime/taskmanager/Task.java 
b/flink-runtime/src/main/java/org/apache/flink/runtime/taskmanager/Task.java
index d531e43..f3beae3 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/taskmanager/Task.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/taskmanager/Task.java
@@ -836,7 +836,9 @@ public class Task implements Runnable {
                                                // because the canceling may 
block on user code, we cancel from a separate thread
                                                // we do not reuse the async 
call handler, because that one may be blocked, in which
                                                // case the canceling could not 
continue
-                                               Runnable canceler = new 
TaskCanceler(LOG, invokable, executingThread, taskNameWithSubtask);
+                                               long taskCancellationInterval = 
this.executionConfig.getTaskCancellationInterval();
+                                               Runnable canceler = new 
TaskCanceler(LOG, invokable, executingThread, taskNameWithSubtask,
+                                                       
taskCancellationInterval);
                                                Thread cancelThread = new 
Thread(executingThread.getThreadGroup(), canceler,
                                                                "Canceler for " 
+ taskNameWithSubtask);
                                                cancelThread.setDaemon(true);
@@ -1080,12 +1082,15 @@ public class Task implements Runnable {
                private final AbstractInvokable invokable;
                private final Thread executer;
                private final String taskName;
+               private final long taskCancellationIntervalMillis;
 
-               public TaskCanceler(Logger logger, AbstractInvokable invokable, 
Thread executer, String taskName) {
+               public TaskCanceler(Logger logger, AbstractInvokable invokable,
+                                                       Thread executer, String 
taskName, long cancelationInterval) {
                        this.logger = logger;
                        this.invokable = invokable;
                        this.executer = executer;
                        this.taskName = taskName;
+                       this.taskCancellationIntervalMillis = 
cancelationInterval;
                }
 
                @Override
@@ -1103,7 +1108,7 @@ public class Task implements Runnable {
                                // interrupt the running thread initially
                                executer.interrupt();
                                try {
-                                       executer.join(30000);
+                                       
executer.join(taskCancellationIntervalMillis);
                                }
                                catch (InterruptedException e) {
                                        // we can ignore this
@@ -1126,7 +1131,7 @@ public class Task implements Runnable {
 
                                        executer.interrupt();
                                        try {
-                                               executer.join(30000);
+                                               
executer.join(taskCancellationIntervalMillis);
                                        }
                                        catch (InterruptedException e) {
                                                // we can ignore this

http://git-wip-us.apache.org/repos/asf/flink/blob/4f11b401/flink-tests/src/test/java/org/apache/flink/test/javaApiOperators/MapITCase.java
----------------------------------------------------------------------
diff --git 
a/flink-tests/src/test/java/org/apache/flink/test/javaApiOperators/MapITCase.java
 
b/flink-tests/src/test/java/org/apache/flink/test/javaApiOperators/MapITCase.java
index c1bd5e2..fb3e589 100644
--- 
a/flink-tests/src/test/java/org/apache/flink/test/javaApiOperators/MapITCase.java
+++ 
b/flink-tests/src/test/java/org/apache/flink/test/javaApiOperators/MapITCase.java
@@ -76,6 +76,7 @@ public class MapITCase extends MultipleProgramsTestBase {
 
                final ExecutionEnvironment env = 
ExecutionEnvironment.getExecutionEnvironment();
                env.getConfig().setNumberOfExecutionRetries(1000);
+               env.getConfig().setTaskCancellationInterval(50000);
 
                DataSet<String> ds = CollectionDataSets.getStringDataSet(env);
                DataSet<String> identityMapDs = ds.
@@ -83,6 +84,7 @@ public class MapITCase extends MultipleProgramsTestBase {
                                @Override
                                public String map(String value) throws 
Exception {
                                        Assert.assertTrue(1000 == 
getRuntimeContext().getExecutionConfig().getNumberOfExecutionRetries());
+                                       Assert.assertTrue(50000 == 
getRuntimeContext().getExecutionConfig().getTaskCancellationInterval());
                                        return value;
                                }
                        });

Reply via email to