Repository: flink Updated Branches: refs/heads/master c0d5ba388 -> 4f11b4016
[FLINK-2523] Make the task cancellation interval configurable This closes #1662 Project: http://git-wip-us.apache.org/repos/asf/flink/repo Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/4f11b401 Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/4f11b401 Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/4f11b401 Branch: refs/heads/master Commit: 4f11b401688ac6ba78a1e32a556a2419affa2946 Parents: c0d5ba3 Author: kl0u <kklou...@gmail.com> Authored: Mon Mar 14 10:46:42 2016 +0100 Committer: zentol <ches...@apache.org> Committed: Mon Mar 14 14:42:05 2016 +0100 ---------------------------------------------------------------------- docs/apis/common/index.md | 2 ++ .../flink/api/common/ExecutionConfig.java | 32 ++++++++++++++++++-- .../flink/configuration/ConfigConstants.java | 5 +++ .../apache/flink/runtime/taskmanager/Task.java | 13 +++++--- .../flink/test/javaApiOperators/MapITCase.java | 2 ++ 5 files changed, 48 insertions(+), 6 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/flink/blob/4f11b401/docs/apis/common/index.md ---------------------------------------------------------------------- diff --git a/docs/apis/common/index.md b/docs/apis/common/index.md index 6e1b27d..35a1832 100644 --- a/docs/apis/common/index.md +++ b/docs/apis/common/index.md @@ -999,6 +999,8 @@ Note that types registered with `registerKryoType()` are not available to Flink' - `disableAutoTypeRegistration()` Automatic type registration is enabled by default. The automatic type registration is registering all types (including sub-types) used by usercode with Kryo and the POJO serializer. +- `setTaskCancellationInterval(long interval)` Sets the interval (in milliseconds) to wait between consecutive attempts to cancel a running task. When a task is canceled, a new thread is created that calls `interrupt()` on the task thread and, if the task thread does not terminate within a certain time, keeps calling `interrupt()` periodically.
This parameter refers to the time between consecutive calls to `interrupt()` and is set by default to **30000** milliseconds, or **30 seconds**. + The `RuntimeContext` which is accessible in `Rich*` functions through the `getRuntimeContext()` method also allows to access the `ExecutionConfig` in all user defined functions. {% top %} http://git-wip-us.apache.org/repos/asf/flink/blob/4f11b401/flink-core/src/main/java/org/apache/flink/api/common/ExecutionConfig.java ---------------------------------------------------------------------- diff --git a/flink-core/src/main/java/org/apache/flink/api/common/ExecutionConfig.java b/flink-core/src/main/java/org/apache/flink/api/common/ExecutionConfig.java index 9b0ff52..205593c 100644 --- a/flink-core/src/main/java/org/apache/flink/api/common/ExecutionConfig.java +++ b/flink-core/src/main/java/org/apache/flink/api/common/ExecutionConfig.java @@ -22,6 +22,8 @@ import com.esotericsoftware.kryo.Serializer; import org.apache.flink.annotation.PublicEvolving; import org.apache.flink.annotation.Public; import org.apache.flink.api.common.restartstrategy.RestartStrategies; +import org.apache.flink.configuration.ConfigConstants; + import java.io.Serializable; import java.util.LinkedHashMap; @@ -107,6 +109,8 @@ public class ExecutionConfig implements Serializable { private RestartStrategies.RestartStrategyConfiguration restartStrategyConfiguration; + private long taskCancellationIntervalMillis = ConfigConstants.DEFAULT_TASK_CANCELLATION_INTERVAL_MILLIS; + // Serializers and types registered with Kryo and the PojoSerializer // we store them in linked maps/sets to ensure they are registered in order in all kryo instances. @@ -214,6 +218,28 @@ public class ExecutionConfig implements Serializable { } /** + * Gets the interval (in milliseconds) between consecutive attempts to cancel a running task. 
+ */ + public long getTaskCancellationInterval() { + return this.taskCancellationIntervalMillis; + } + + /** + * Sets the configuration parameter specifying the interval (in milliseconds) + * between consecutive attempts to cancel a running task. + * @param interval the interval (in milliseconds). + */ + public ExecutionConfig setTaskCancellationInterval(long interval) { + if(interval < 0) { + throw new IllegalArgumentException( + "The task cancellation interval cannot be negative." + ); + } + this.taskCancellationIntervalMillis = interval; + return this; + } + + /** * Sets the restart strategy to be used for recovery. * * <pre>{@code @@ -658,7 +684,8 @@ public class ExecutionConfig implements Serializable { registeredTypesWithKryoSerializerClasses.equals(other.registeredTypesWithKryoSerializerClasses) && defaultKryoSerializerClasses.equals(other.defaultKryoSerializerClasses) && registeredKryoTypes.equals(other.registeredKryoTypes) && - registeredPojoTypes.equals(other.registeredPojoTypes); + registeredPojoTypes.equals(other.registeredPojoTypes) && + taskCancellationIntervalMillis == other.taskCancellationIntervalMillis; } else { return false; @@ -683,7 +710,8 @@ public class ExecutionConfig implements Serializable { registeredTypesWithKryoSerializerClasses, defaultKryoSerializerClasses, registeredKryoTypes, - registeredPojoTypes); + registeredPojoTypes, + taskCancellationIntervalMillis); } public boolean canEqual(Object obj) { http://git-wip-us.apache.org/repos/asf/flink/blob/4f11b401/flink-core/src/main/java/org/apache/flink/configuration/ConfigConstants.java ---------------------------------------------------------------------- diff --git a/flink-core/src/main/java/org/apache/flink/configuration/ConfigConstants.java b/flink-core/src/main/java/org/apache/flink/configuration/ConfigConstants.java index f4e13f6..38b684c 100644 --- a/flink-core/src/main/java/org/apache/flink/configuration/ConfigConstants.java +++ 
b/flink-core/src/main/java/org/apache/flink/configuration/ConfigConstants.java @@ -660,6 +660,11 @@ public final class ConfigConstants { */ public static final boolean DEFAULT_TASK_MANAGER_MEMORY_PRE_ALLOCATE = false; + /** + * The default interval (in milliseconds) to wait between consecutive task cancellation attempts (= 30000 msec). + * */ + public static final long DEFAULT_TASK_CANCELLATION_INTERVAL_MILLIS = 30000; + // ------------------------ Runtime Algorithms ------------------------ /** http://git-wip-us.apache.org/repos/asf/flink/blob/4f11b401/flink-runtime/src/main/java/org/apache/flink/runtime/taskmanager/Task.java ---------------------------------------------------------------------- diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/taskmanager/Task.java b/flink-runtime/src/main/java/org/apache/flink/runtime/taskmanager/Task.java index d531e43..f3beae3 100644 --- a/flink-runtime/src/main/java/org/apache/flink/runtime/taskmanager/Task.java +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/taskmanager/Task.java @@ -836,7 +836,9 @@ public class Task implements Runnable { // because the canceling may block on user code, we cancel from a separate thread // we do not reuse the async call handler, because that one may be blocked, in which // case the canceling could not continue - Runnable canceler = new TaskCanceler(LOG, invokable, executingThread, taskNameWithSubtask); + long taskCancellationInterval = this.executionConfig.getTaskCancellationInterval(); + Runnable canceler = new TaskCanceler(LOG, invokable, executingThread, taskNameWithSubtask, + taskCancellationInterval); Thread cancelThread = new Thread(executingThread.getThreadGroup(), canceler, "Canceler for " + taskNameWithSubtask); cancelThread.setDaemon(true); @@ -1080,12 +1082,15 @@ public class Task implements Runnable { private final AbstractInvokable invokable; private final Thread executer; private final String taskName; + private final long 
taskCancellationIntervalMillis; - public TaskCanceler(Logger logger, AbstractInvokable invokable, Thread executer, String taskName) { + public TaskCanceler(Logger logger, AbstractInvokable invokable, + Thread executer, String taskName, long cancelationInterval) { this.logger = logger; this.invokable = invokable; this.executer = executer; this.taskName = taskName; + this.taskCancellationIntervalMillis = cancelationInterval; } @Override @@ -1103,7 +1108,7 @@ public class Task implements Runnable { // interrupt the running thread initially executer.interrupt(); try { - executer.join(30000); + executer.join(taskCancellationIntervalMillis); } catch (InterruptedException e) { // we can ignore this @@ -1126,7 +1131,7 @@ public class Task implements Runnable { executer.interrupt(); try { - executer.join(30000); + executer.join(taskCancellationIntervalMillis); } catch (InterruptedException e) { // we can ignore this http://git-wip-us.apache.org/repos/asf/flink/blob/4f11b401/flink-tests/src/test/java/org/apache/flink/test/javaApiOperators/MapITCase.java ---------------------------------------------------------------------- diff --git a/flink-tests/src/test/java/org/apache/flink/test/javaApiOperators/MapITCase.java b/flink-tests/src/test/java/org/apache/flink/test/javaApiOperators/MapITCase.java index c1bd5e2..fb3e589 100644 --- a/flink-tests/src/test/java/org/apache/flink/test/javaApiOperators/MapITCase.java +++ b/flink-tests/src/test/java/org/apache/flink/test/javaApiOperators/MapITCase.java @@ -76,6 +76,7 @@ public class MapITCase extends MultipleProgramsTestBase { final ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment(); env.getConfig().setNumberOfExecutionRetries(1000); + env.getConfig().setTaskCancellationInterval(50000); DataSet<String> ds = CollectionDataSets.getStringDataSet(env); DataSet<String> identityMapDs = ds. 
@@ -83,6 +84,7 @@ public class MapITCase extends MultipleProgramsTestBase { @Override public String map(String value) throws Exception { Assert.assertTrue(1000 == getRuntimeContext().getExecutionConfig().getNumberOfExecutionRetries()); + Assert.assertTrue(50000 == getRuntimeContext().getExecutionConfig().getTaskCancellationInterval()); return value; } });