Repository: flink
Updated Branches:
  refs/heads/release-1.1 529534f33 -> cc6655b7b


[FLINK-4715] Fail TaskManager with fatal error if task cancellation is stuck

- Splits the cancellation up into two threads:
  * The `TaskCanceler` calls `cancel` on the invokable and `interrupt`
    on the executing Thread. It then exits.
  * The `TaskCancelerWatchDog` kicks in after the task cancellation
    interval (current default: 30 secs) and periodically calls `interrupt`
    on the executing Thread. If the Thread does not terminate within
    the task cancellation timeout (new config value, default 3 mins on master,
    deactivated by default in this backport), the task
    manager is notified about a fatal error, leading to termination of the JVM.
- The new configuration is exposed via 
`ConfigConstants.TASK_CANCELLATION_TIMEOUT_MILLIS`
(default: 3 mins on master; 0, i.e. deactivated, in this backport) and the 
`ExecutionConfig` (similar to the cancellation interval).

Backported with slight adjustments from the master branch.


Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/cc6655b7
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/cc6655b7
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/cc6655b7

Branch: refs/heads/release-1.1
Commit: cc6655b7b4518a105971c7965d313a7aefcfc613
Parents: 529534f
Author: Ufuk Celebi <[email protected]>
Authored: Tue Oct 18 09:50:36 2016 +0200
Committer: Ufuk Celebi <[email protected]>
Committed: Thu Oct 27 17:41:44 2016 +0200

----------------------------------------------------------------------
 .../flink/api/common/ExecutionConfig.java       |  46 +++-
 .../flink/configuration/ConfigConstants.java    |  14 +
 .../apache/flink/runtime/taskmanager/Task.java  | 157 +++++++++--
 ...TaskManagerProcessReapingFatalErrorTest.java |  40 +++
 .../TaskManagerProcessReapingTest.java          | 241 +---------------
 .../TaskManagerProcessReapingTestBase.java      | 275 +++++++++++++++++++
 .../flink/runtime/taskmanager/TaskTest.java     | 247 ++++++++++++++++-
 .../flink/core/testutils/OneShotLatch.java      |   9 +
 8 files changed, 748 insertions(+), 281 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/flink/blob/cc6655b7/flink-core/src/main/java/org/apache/flink/api/common/ExecutionConfig.java
----------------------------------------------------------------------
diff --git 
a/flink-core/src/main/java/org/apache/flink/api/common/ExecutionConfig.java 
b/flink-core/src/main/java/org/apache/flink/api/common/ExecutionConfig.java
index 5b69794..cbcafed 100644
--- a/flink-core/src/main/java/org/apache/flink/api/common/ExecutionConfig.java
+++ b/flink-core/src/main/java/org/apache/flink/api/common/ExecutionConfig.java
@@ -19,11 +19,9 @@
 package org.apache.flink.api.common;
 
 import com.esotericsoftware.kryo.Serializer;
-import org.apache.flink.annotation.PublicEvolving;
 import org.apache.flink.annotation.Public;
+import org.apache.flink.annotation.PublicEvolving;
 import org.apache.flink.api.common.restartstrategy.RestartStrategies;
-import org.apache.flink.util.Preconditions;
-
 
 import java.io.Serializable;
 import java.util.Collections;
@@ -32,6 +30,8 @@ import java.util.LinkedHashSet;
 import java.util.Map;
 import java.util.Objects;
 
+import static org.apache.flink.util.Preconditions.checkArgument;
+
 /**
  * A config to define the behavior of the program execution. It allows to 
define (among other
  * options) the following settings:
@@ -116,6 +116,12 @@ public class ExecutionConfig implements Serializable {
        
        private long taskCancellationIntervalMillis = -1;
 
+       /**
+        * Timeout after which an ongoing task cancellation will lead to a fatal
+        * TaskManager error, usually killing the JVM.
+        */
+       private long taskCancellationTimeoutMillis = -1;
+
        // ------------------------------- User code values 
--------------------------------------------
 
        private GlobalJobParameters globalJobParameters;
@@ -219,7 +225,7 @@ public class ExecutionConfig implements Serializable {
         * @param parallelism The parallelism to use
         */
        public ExecutionConfig setParallelism(int parallelism) {
-               Preconditions.checkArgument(parallelism > 0 || parallelism == 
PARALLELISM_DEFAULT,
+               checkArgument(parallelism > 0 || parallelism == 
PARALLELISM_DEFAULT,
                        "The parallelism of an operator must be at least 1.");
 
                this.parallelism = parallelism;
@@ -245,6 +251,38 @@ public class ExecutionConfig implements Serializable {
        }
 
        /**
+        * Returns the timeout (in milliseconds) after which an ongoing task
+        * cancellation leads to a fatal TaskManager error.
+        *
+        * <p>The value <code>0</code> disables the timeout. In this case a 
stuck
+        * cancellation will not lead to a fatal error.
+        */
+       @PublicEvolving
+       public long getTaskCancellationTimeout() {
+               return this.taskCancellationTimeoutMillis;
+       }
+
+       /**
+        * Sets the timeout (in milliseconds) after which an ongoing task 
cancellation
+        * is considered failed, leading to a fatal TaskManager error.
+        *
+        * <p>By default, this is deactivated.
+        *
+        * <p>The cluster default is configured via {@link 
org.apache.flink.configuration.ConfigConstants#TASK_CANCELLATION_TIMEOUT_MILLIS}.
+        *
+        * <p>The value <code>0</code> disables the timeout. In this case a 
stuck
+        * cancellation will not lead to a fatal error.
+        *
+        * @param timeout The task cancellation timeout (in milliseconds).
+        */
+       @PublicEvolving
+       public ExecutionConfig setTaskCancellationTimeout(long timeout) {
+               checkArgument(timeout >= 0, "Timeout needs to be >= 0.");
+               this.taskCancellationTimeoutMillis = timeout;
+               return this;
+       }
+
+       /**
         * Sets the restart strategy to be used for recovery.
         *
         * <pre>{@code

http://git-wip-us.apache.org/repos/asf/flink/blob/cc6655b7/flink-core/src/main/java/org/apache/flink/configuration/ConfigConstants.java
----------------------------------------------------------------------
diff --git 
a/flink-core/src/main/java/org/apache/flink/configuration/ConfigConstants.java 
b/flink-core/src/main/java/org/apache/flink/configuration/ConfigConstants.java
index 9902350..d1ad1c4 100644
--- 
a/flink-core/src/main/java/org/apache/flink/configuration/ConfigConstants.java
+++ 
b/flink-core/src/main/java/org/apache/flink/configuration/ConfigConstants.java
@@ -270,6 +270,13 @@ public final class ConfigConstants {
        @PublicEvolving
        public static final String TASK_CANCELLATION_INTERVAL_MILLIS = 
"task.cancellation-interval";
 
+       /**
+        * Timeout in milliseconds after which a task cancellation times out and
+        * leads to a fatal TaskManager error.
+        */
+       @PublicEvolving
+       public static final String TASK_CANCELLATION_TIMEOUT_MILLIS = 
"task.cancellation.timeout";
+
        // --------------------------- Runtime Algorithms 
-------------------------------
        
        /**
@@ -859,6 +866,13 @@ public final class ConfigConstants {
         * */
        public static final long DEFAULT_TASK_CANCELLATION_INTERVAL_MILLIS = 
30000;
 
+       /**
+        * Default timeout in milliseconds after which a task cancellation 
times out
+        * and leads to a fatal TaskManager error. This has been backported 
from 1.2 and
+        * deactivated by default.
+        */
+       public static final long DEFAULT_TASK_CANCELLATION_TIMEOUT_MILLIS = 0; 
// deactivated
+
        // ------------------------ Runtime Algorithms ------------------------
        
        /**

http://git-wip-us.apache.org/repos/asf/flink/blob/cc6655b7/flink-runtime/src/main/java/org/apache/flink/runtime/taskmanager/Task.java
----------------------------------------------------------------------
diff --git 
a/flink-runtime/src/main/java/org/apache/flink/runtime/taskmanager/Task.java 
b/flink-runtime/src/main/java/org/apache/flink/runtime/taskmanager/Task.java
index 25a7e29..4b47cba 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/taskmanager/Task.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/taskmanager/Task.java
@@ -18,6 +18,7 @@
 
 package org.apache.flink.runtime.taskmanager;
 
+import com.google.common.annotations.VisibleForTesting;
 import org.apache.flink.api.common.ExecutionConfig;
 import org.apache.flink.api.common.JobID;
 import org.apache.flink.api.common.TaskInfo;
@@ -25,7 +26,6 @@ import org.apache.flink.api.common.cache.DistributedCache;
 import org.apache.flink.configuration.ConfigConstants;
 import org.apache.flink.configuration.Configuration;
 import org.apache.flink.core.fs.Path;
-import org.apache.flink.runtime.metrics.groups.TaskMetricGroup;
 import org.apache.flink.runtime.accumulators.AccumulatorRegistry;
 import org.apache.flink.runtime.blob.BlobKey;
 import org.apache.flink.runtime.broadcast.BroadcastVariableManager;
@@ -52,18 +52,18 @@ import 
org.apache.flink.runtime.jobgraph.tasks.AbstractInvokable;
 import org.apache.flink.runtime.jobgraph.tasks.StatefulTask;
 import org.apache.flink.runtime.jobgraph.tasks.StoppableTask;
 import org.apache.flink.runtime.memory.MemoryManager;
+import org.apache.flink.runtime.messages.TaskManagerMessages;
 import org.apache.flink.runtime.messages.TaskManagerMessages.FatalError;
 import org.apache.flink.runtime.messages.TaskMessages.FailTask;
 import org.apache.flink.runtime.messages.TaskMessages.TaskInFinalState;
 import org.apache.flink.runtime.messages.TaskMessages.UpdateTaskExecutionState;
 import org.apache.flink.runtime.messages.checkpoint.DeclineCheckpoint;
+import org.apache.flink.runtime.metrics.groups.TaskMetricGroup;
 import org.apache.flink.runtime.state.StateHandle;
 import org.apache.flink.runtime.state.StateUtils;
 import org.apache.flink.util.SerializedValue;
-
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
-
 import scala.concurrent.duration.FiniteDuration;
 
 import java.io.IOException;
@@ -76,6 +76,7 @@ import java.util.concurrent.ExecutorService;
 import java.util.concurrent.Executors;
 import java.util.concurrent.Future;
 import java.util.concurrent.RejectedExecutionException;
+import java.util.concurrent.TimeUnit;
 import java.util.concurrent.atomic.AtomicBoolean;
 import java.util.concurrent.atomic.AtomicReferenceFieldUpdater;
 
@@ -229,6 +230,9 @@ public class Task implements Runnable {
        /** Initialized from the Flink configuration. May also be set at the 
ExecutionConfig */
        private long taskCancellationInterval;
 
+       /** Initialized from the Flink configuration. May also be set at the 
ExecutionConfig */
+       private long taskCancellationTimeout;
+
        /**
         * <p><b>IMPORTANT:</b> This constructor may not start any work that 
would need to
         * be undone in the case of a failing task deployment.</p>
@@ -259,9 +263,14 @@ public class Task implements Runnable {
                this.operatorState = tdd.getOperatorState();
                this.serializedExecutionConfig = 
checkNotNull(tdd.getSerializedExecutionConfig());
 
-               this.taskCancellationInterval = jobConfiguration.getLong(
-                       ConfigConstants.TASK_CANCELLATION_INTERVAL_MILLIS,
-                       
ConfigConstants.DEFAULT_TASK_CANCELLATION_INTERVAL_MILLIS);
+               Configuration taskConfig = tdd.getTaskConfiguration();
+               this.taskCancellationInterval = taskConfig.getLong(
+                               
ConfigConstants.TASK_CANCELLATION_INTERVAL_MILLIS,
+                               
ConfigConstants.DEFAULT_TASK_CANCELLATION_INTERVAL_MILLIS);
+
+               this.taskCancellationTimeout = taskConfig.getLong(
+                               
ConfigConstants.TASK_CANCELLATION_TIMEOUT_MILLIS,
+                               
ConfigConstants.DEFAULT_TASK_CANCELLATION_TIMEOUT_MILLIS);
 
                this.memoryManager = checkNotNull(memManager);
                this.ioManager = checkNotNull(ioManager);
@@ -381,6 +390,16 @@ public class Task implements Runnable {
                return executingThread;
        }
 
+       @VisibleForTesting
+       long getTaskCancellationInterval() {
+               return taskCancellationInterval;
+       }
+
+       @VisibleForTesting
+       long getTaskCancellationTimeout() {
+               return taskCancellationTimeout;
+       }
+
        // 
------------------------------------------------------------------------
        //  Task Execution
        // 
------------------------------------------------------------------------
@@ -478,6 +497,11 @@ public class Task implements Runnable {
                                taskCancellationInterval = 
executionConfig.getTaskCancellationInterval();
                        }
 
+                       if (executionConfig.getTaskCancellationTimeout() >= 0) {
+                               // override task cancellation timeout from 
Flink config if set in ExecutionConfig
+                               taskCancellationTimeout = 
executionConfig.getTaskCancellationTimeout();
+                       }
+
                        // now load the task's invokable code
                        invokable = 
loadAndInstantiateInvokable(userCodeClassLoader, nameOfInvokableClass);
 
@@ -868,12 +892,16 @@ public class Task implements Runnable {
                                                // because the canceling may 
block on user code, we cancel from a separate thread
                                                // we do not reuse the async 
call handler, because that one may be blocked, in which
                                                // case the canceling could not 
continue
+
+                                               // The canceller calls cancel 
and interrupts the executing thread once
                                                Runnable canceler = new 
TaskCanceler(
                                                                LOG,
                                                                invokable,
                                                                executingThread,
                                                                
taskNameWithSubtask,
                                                                
taskCancellationInterval,
+                                                               
taskCancellationTimeout,
+                                                               taskManager,
                                                                
producedPartitions,
                                                                inputGates);
                                                Thread cancelThread = new 
Thread(executingThread.getThreadGroup(), canceler,
@@ -1119,16 +1147,29 @@ public class Task implements Runnable {
                private final AbstractInvokable invokable;
                private final Thread executer;
                private final String taskName;
-               private final long taskCancellationIntervalMillis;
                private final ResultPartition[] producedPartitions;
                private final SingleInputGate[] inputGates;
 
+               /** Interrupt interval. */
+               private final long interruptInterval;
+
+               /** Timeout after which a fatal error notification happens. */
+               private final long interruptTimeout;
+
+               /** TaskManager to notify about a timeout */
+               private final ActorGateway taskManager;
+
+               /** Watch Dog thread */
+               private final Thread watchDogThread;
+
                public TaskCanceler(
                                Logger logger,
                                AbstractInvokable invokable,
                                Thread executer,
                                String taskName,
-                               long cancelationInterval,
+                               long cancellationInterval,
+                               long cancellationTimeout,
+                               ActorGateway taskManager,
                                ResultPartition[] producedPartitions,
                                SingleInputGate[] inputGates) {
 
@@ -1136,26 +1177,46 @@ public class Task implements Runnable {
                        this.invokable = invokable;
                        this.executer = executer;
                        this.taskName = taskName;
-                       this.taskCancellationIntervalMillis = 
cancelationInterval;
+                       this.interruptInterval = cancellationInterval;
+                       this.interruptTimeout = cancellationTimeout;
+                       this.taskManager = taskManager;
                        this.producedPartitions = producedPartitions;
                        this.inputGates = inputGates;
+
+                       if (cancellationTimeout > 0) {
+                               // The watch dog repeatedly interrupts the 
executor until
+                               // the cancellation timeout kicks in (at which 
point the
+                               // task manager is notified about a fatal 
error) or the
+                               // executor has terminated.
+                               this.watchDogThread = new Thread(
+                                               executer.getThreadGroup(),
+                                               new TaskCancelerWatchDog(),
+                                               "WatchDog for " + taskName + " 
cancellation");
+                               this.watchDogThread.setDaemon(true);
+                       } else {
+                               this.watchDogThread = null;
+                       }
                }
 
                @Override
                public void run() {
                        try {
+                               if (watchDogThread != null) {
+                                       watchDogThread.start();
+                               }
+
                                // the user-defined cancel method may throw 
errors.
                                // we need do continue despite that
                                try {
                                        invokable.cancel();
-                               }
-                               catch (Throwable t) {
+                               } catch (Throwable t) {
                                        logger.error("Error while canceling the 
task", t);
                                }
 
                                // Early release of input and output buffer 
pools. We do this
                                // in order to unblock async Threads, which 
produce/consume the
-                               // intermediate streams outside of the main 
Task Thread.
+                               // intermediate streams outside of the main 
Task Thread (like
+                               // the Kafka consumer).
                                //
                                // Don't do this before cancelling the 
invokable. Otherwise we
                                // will get misleading errors in the logs.
@@ -1178,16 +1239,45 @@ public class Task implements Runnable {
                                // interrupt the running thread initially
                                executer.interrupt();
                                try {
-                                       
executer.join(taskCancellationIntervalMillis);
+                                       executer.join(interruptInterval);
                                }
                                catch (InterruptedException e) {
                                        // we can ignore this
                                }
 
-                               // it is possible that the user code does not 
react immediately. for that
-                               // reason, we spawn a separate thread that 
repeatedly interrupts the user code until
-                               // it exits
+                               if (watchDogThread != null) {
+                                       watchDogThread.interrupt();
+                                       watchDogThread.join();
+                               }
+                       } catch (Throwable t) {
+                               logger.error("Error in the task canceler", t);
+                       }
+               }
+
+               /**
+                * Watchdog for the cancellation. If the task is stuck in 
cancellation,
+                * we notify the task manager about a fatal error.
+                */
+               private class TaskCancelerWatchDog implements Runnable {
+
+                       @Override
+                       public void run() {
+                               long intervalNanos = 
TimeUnit.NANOSECONDS.convert(interruptInterval, TimeUnit.MILLISECONDS);
+                               long timeoutNanos = 
TimeUnit.NANOSECONDS.convert(interruptTimeout, TimeUnit.MILLISECONDS);
+                               long deadline = System.nanoTime() + 
timeoutNanos;
+
+                               try {
+                                       // Initial wait before interrupting 
periodically
+                                       Thread.sleep(interruptInterval);
+                               } catch (InterruptedException ignored) {
+                               }
+
+                               // It is possible that the user code does not 
react to the task canceller.
+                               // for that reason, we spawn this separate 
thread that repeatedly interrupts
+                               // the user code until it exits. If the 
user code does not exit within
+                               // the timeout, we notify the task manager about 
a fatal error.
                                while (executer.isAlive()) {
+                                       long now = System.nanoTime();
 
                                        // build the stack trace of where the 
thread is stuck, for the log
                                        StringBuilder bld = new StringBuilder();
@@ -1196,21 +1286,34 @@ public class Task implements Runnable {
                                                bld.append(e).append('\n');
                                        }
 
-                                       logger.warn("Task '{}' did not react to 
cancelling signal, but is stuck in method:\n {}",
-                                                       taskName, 
bld.toString());
+                                       if (now >= deadline) {
+                                               long duration = 
TimeUnit.SECONDS.convert(interruptInterval, TimeUnit.MILLISECONDS);
+                                               String msg = 
String.format("Task '%s' did not react to cancelling signal in " +
+                                                                               
"the last %d seconds, but is stuck in method:\n %s",
+                                                               taskName,
+                                                               duration,
+                                                               bld.toString());
 
-                                       executer.interrupt();
-                                       try {
-                                               
executer.join(taskCancellationIntervalMillis);
-                                       }
-                                       catch (InterruptedException e) {
-                                               // we can ignore this
+                                               taskManager.tell(new 
TaskManagerMessages.FatalError(msg, null));
+
+                                               return; // done, don't forget 
to leave the loop
+                                       } else {
+                                               logger.warn("Task '{}' did not 
react to cancelling signal, but is stuck in method:\n {}",
+                                                               taskName, 
bld.toString());
+
+                                               executer.interrupt();
+                                               try {
+                                                       long timeLeftNanos = 
Math.min(intervalNanos, deadline - now);
+                                                       long timeLeftMillis = 
TimeUnit.MILLISECONDS.convert(timeLeftNanos, TimeUnit.NANOSECONDS);
+
+                                                       if (timeLeftMillis > 0) 
{
+                                                               
executer.join(timeLeftMillis);
+                                                       }
+                                               } catch (InterruptedException 
ignored) {
+                                               }
                                        }
                                }
                        }
-                       catch (Throwable t) {
-                               logger.error("Error in the task canceler", t);
-                       }
                }
        }
 }

http://git-wip-us.apache.org/repos/asf/flink/blob/cc6655b7/flink-runtime/src/test/java/org/apache/flink/runtime/taskmanager/TaskManagerProcessReapingFatalErrorTest.java
----------------------------------------------------------------------
diff --git 
a/flink-runtime/src/test/java/org/apache/flink/runtime/taskmanager/TaskManagerProcessReapingFatalErrorTest.java
 
b/flink-runtime/src/test/java/org/apache/flink/runtime/taskmanager/TaskManagerProcessReapingFatalErrorTest.java
new file mode 100644
index 0000000..1f0e84d
--- /dev/null
+++ 
b/flink-runtime/src/test/java/org/apache/flink/runtime/taskmanager/TaskManagerProcessReapingFatalErrorTest.java
@@ -0,0 +1,40 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.taskmanager;
+
+import akka.actor.ActorRef;
+import org.apache.flink.runtime.messages.TaskManagerMessages;
+
+import static org.junit.Assert.assertTrue;
+
+/**
+ * Tests that the TaskManager process properly exits when the TaskManager 
actor is notified about a fatal error.
+ */
+public class TaskManagerProcessReapingFatalErrorTest extends 
TaskManagerProcessReapingTestBase {
+
+       @Override
+       void onTaskManagerProcessRunning(ActorRef taskManager) {
+               taskManager.tell(new TaskManagerMessages.FatalError("ouch", 
null), ActorRef.noSender());
+       }
+
+       @Override
+       void onTaskManagerProcessTerminated(String processOutput) {
+               assertTrue("Did not log expected message", 
processOutput.contains("ouch"));
+       }
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/cc6655b7/flink-runtime/src/test/java/org/apache/flink/runtime/taskmanager/TaskManagerProcessReapingTest.java
----------------------------------------------------------------------
diff --git 
a/flink-runtime/src/test/java/org/apache/flink/runtime/taskmanager/TaskManagerProcessReapingTest.java
 
b/flink-runtime/src/test/java/org/apache/flink/runtime/taskmanager/TaskManagerProcessReapingTest.java
index 85d6ede..8aed021 100644
--- 
a/flink-runtime/src/test/java/org/apache/flink/runtime/taskmanager/TaskManagerProcessReapingTest.java
+++ 
b/flink-runtime/src/test/java/org/apache/flink/runtime/taskmanager/TaskManagerProcessReapingTest.java
@@ -19,248 +19,17 @@
 package org.apache.flink.runtime.taskmanager;
 
 import akka.actor.ActorRef;
-import akka.actor.ActorSystem;
 import akka.actor.PoisonPill;
 
-import org.apache.flink.configuration.ConfigConstants;
-import org.apache.flink.configuration.Configuration;
-import org.apache.flink.runtime.akka.AkkaUtils;
-import org.apache.flink.runtime.clusterframework.FlinkResourceManager;
-import 
org.apache.flink.runtime.clusterframework.standalone.StandaloneResourceManager;
-import org.apache.flink.runtime.clusterframework.types.ResourceID;
-import org.apache.flink.runtime.jobmanager.JobManager;
-import org.apache.flink.runtime.jobmanager.MemoryArchivist;
-import 
org.apache.flink.runtime.leaderretrieval.StandaloneLeaderRetrievalService;
-import org.apache.flink.runtime.testutils.CommonTestUtils;
-import org.apache.flink.runtime.util.LeaderRetrievalUtils;
-import org.apache.flink.util.NetUtils;
-
-import org.junit.Test;
-
-import scala.Some;
-import scala.Tuple2;
-import scala.concurrent.duration.FiniteDuration;
-
-import java.io.File;
-import java.io.IOException;
-import java.io.InputStream;
-import java.io.StringWriter;
-import java.net.InetAddress;
-import java.util.concurrent.TimeUnit;
-
-import static org.junit.Assert.assertEquals;
-import static org.junit.Assert.assertFalse;
-import static org.junit.Assert.assertTrue;
-import static org.junit.Assert.fail;
-
-import static 
org.apache.flink.runtime.testutils.CommonTestUtils.getCurrentClasspath;
-import static 
org.apache.flink.runtime.testutils.CommonTestUtils.getJavaCommandPath;
-import static 
org.apache.flink.runtime.testutils.CommonTestUtils.isProcessAlive;
-
 /**
  * Tests that the TaskManager process properly exits when the TaskManager 
actor dies.
  */
-public class TaskManagerProcessReapingTest {
-
-       @Test
-       public void testReapProcessOnFailure() {
-               Process taskManagerProcess = null;
-               ActorSystem jmActorSystem = null;
-
-               final StringWriter processOutput = new StringWriter();
-
-               try {
-                       String javaCommand = getJavaCommandPath();
-
-                       // check that we run this test only if the java command
-                       // is available on this machine
-                       if (javaCommand == null) {
-                               System.out.println("---- Skipping 
TaskManagerProcessReapingTest : Could not find java executable ----");
-                               return;
-                       }
-
-                       // create a logging file for the process
-                       File tempLogFile = File.createTempFile("testlogconfig", 
"properties");
-                       tempLogFile.deleteOnExit();
-                       CommonTestUtils.printLog4jDebugConfig(tempLogFile);
-
-                       final InetAddress localhost = 
InetAddress.getByName("localhost");
-                       final int jobManagerPort = NetUtils.getAvailablePort();
-
-                       // start a JobManager
-                       Tuple2<String, Object> localAddress = new 
Tuple2<String, Object>(localhost.getHostAddress(), jobManagerPort);
-                       jmActorSystem = AkkaUtils.createActorSystem(
-                                       new Configuration(), new 
Some<Tuple2<String, Object>>(localAddress));
-
-                       ActorRef jmActor = JobManager.startJobManagerActors(
-                               new Configuration(),
-                               jmActorSystem,
-                               JobManager.class,
-                               MemoryArchivist.class)._1;
-
-                       // start a ResourceManager
-                       StandaloneLeaderRetrievalService 
standaloneLeaderRetrievalService =
-                               new 
StandaloneLeaderRetrievalService(AkkaUtils.getAkkaURL(jmActorSystem, jmActor));
-
-                       FlinkResourceManager.startResourceManagerActors(
-                               new Configuration(),
-                               jmActorSystem,
-                               standaloneLeaderRetrievalService,
-                               StandaloneResourceManager.class);
-
-                       final int taskManagerPort = NetUtils.getAvailablePort();
-
-                       // start the task manager process
-                       String[] command = new String[] {
-                                       javaCommand,
-                                       "-Dlog.level=DEBUG",
-                                       "-Dlog4j.configuration=file:" + 
tempLogFile.getAbsolutePath(),
-                                       "-Xms256m", "-Xmx256m",
-                                       "-classpath", getCurrentClasspath(),
-                                       
TaskManagerTestEntryPoint.class.getName(),
-                                       String.valueOf(jobManagerPort), 
String.valueOf(taskManagerPort)
-                       };
-
-                       ProcessBuilder bld = new ProcessBuilder(command);
-                       taskManagerProcess = bld.start();
-                       new PipeForwarder(taskManagerProcess.getErrorStream(), 
processOutput);
-
-                       // grab the reference to the TaskManager. try multiple 
times, until the process
-                       // is started and the TaskManager is up
-                       String taskManagerActorName = 
String.format("akka.tcp://flink@%s/user/%s",
-                                       
org.apache.flink.util.NetUtils.ipAddressAndPortToUrlString(localhost, 
taskManagerPort),
-                                       TaskManager.TASK_MANAGER_NAME());
-
-                       ActorRef taskManagerRef = null;
-                       Throwable lastError = null;
-                       for (int i = 0; i < 40; i++) {
-                               try {
-                                       taskManagerRef = 
TaskManager.getTaskManagerRemoteReference(
-                                                       taskManagerActorName, 
jmActorSystem, new FiniteDuration(25, TimeUnit.SECONDS));
-                                       break;
-                               }
-                               catch (Throwable t) {
-                                       // TaskManager probably not ready yet
-                                       lastError = t;
-                               }
-                               Thread.sleep(500);
-                       }
+public class TaskManagerProcessReapingTest extends 
TaskManagerProcessReapingTestBase {
 
-                       assertTrue("TaskManager process died", 
isProcessAlive(taskManagerProcess));
-
-                       if (taskManagerRef == null) {
-                               if (lastError != null) {
-                                       lastError.printStackTrace();
-                               }
-                               fail("TaskManager process did not launch the 
TaskManager properly. Failed to look up "
-                                               + taskManagerActorName);
-                       }
-
-                       // kill the TaskManager actor
-                       taskManagerRef.tell(PoisonPill.getInstance(), 
ActorRef.noSender());
-
-                       // wait for max 5 seconds for the process to terminate
-                       {
-                               long now = System.currentTimeMillis();
-                               long deadline = now + 10000;
-
-                               while (now < deadline && 
isProcessAlive(taskManagerProcess)) {
-                                       Thread.sleep(100);
-                                       now = System.currentTimeMillis();
-                               }
-                       }
-
-                       assertFalse("TaskManager process did not terminate upon 
actor death", isProcessAlive(taskManagerProcess));
-
-                       int returnCode = taskManagerProcess.exitValue();
-                       assertEquals("TaskManager died, but not because of the 
process reaper",
-                                       
TaskManager.RUNTIME_FAILURE_RETURN_CODE(), returnCode);
-               }
-               catch (Exception e) {
-                       e.printStackTrace();
-                       printProcessLog(processOutput.toString());
-                       fail(e.getMessage());
-               }
-               catch (Error e) {
-                       e.printStackTrace();
-                       printProcessLog(processOutput.toString());
-                       throw e;
-               }
-               finally {
-                       if (taskManagerProcess != null) {
-                               taskManagerProcess.destroy();
-                       }
-                       if (jmActorSystem != null) {
-                               jmActorSystem.shutdown();
-                       }
-               }
+       @Override
+       void onTaskManagerProcessRunning(ActorRef taskManager) {
+               // kill the TaskManager actor
+               taskManager.tell(PoisonPill.getInstance(), ActorRef.noSender());
        }
 
-       private static void printProcessLog(String log) {
-               System.out.println("-----------------------------------------");
-               System.out.println("       BEGIN SPAWNED PROCESS LOG");
-               System.out.println("-----------------------------------------");
-               System.out.println(log);
-               System.out.println("-----------------------------------------");
-               System.out.println("        END SPAWNED PROCESS LOG");
-               System.out.println("-----------------------------------------");
-       }
-
-       // 
--------------------------------------------------------------------------------------------
-
-       public static class TaskManagerTestEntryPoint {
-
-               public static void main(String[] args) {
-                       try {
-                               int jobManagerPort = Integer.parseInt(args[0]);
-                               int taskManagerPort = Integer.parseInt(args[1]);
-
-                               Configuration cfg = new Configuration();
-                               
cfg.setString(ConfigConstants.JOB_MANAGER_IPC_ADDRESS_KEY, "localhost");
-                               
cfg.setInteger(ConfigConstants.JOB_MANAGER_IPC_PORT_KEY, jobManagerPort);
-                               
cfg.setInteger(ConfigConstants.TASK_MANAGER_MEMORY_SIZE_KEY, 4);
-                               
cfg.setInteger(ConfigConstants.TASK_MANAGER_NETWORK_NUM_BUFFERS_KEY, 256);
-
-                               TaskManager.runTaskManager("localhost", 
ResourceID.generate(), taskManagerPort, cfg);
-
-                               // wait forever
-                               Object lock = new Object();
-                               synchronized (lock) {
-                                       lock.wait();
-                               }
-                       }
-                       catch (Throwable t) {
-                               System.exit(1);
-                       }
-               }
-       }
-
-       private static class PipeForwarder extends Thread {
-
-               private final StringWriter target;
-               private final InputStream source;
-
-               public PipeForwarder(InputStream source, StringWriter target) {
-                       super("Pipe Forwarder");
-                       setDaemon(true);
-
-                       this.source = source;
-                       this.target = target;
-
-                       start();
-               }
-
-               @Override
-               public void run() {
-                       try {
-                               int next;
-                               while ((next = source.read()) != -1) {
-                                       target.write(next);
-                               }
-                       }
-                       catch (IOException e) {
-                               // terminate
-                       }
-               }
-       }
 }

http://git-wip-us.apache.org/repos/asf/flink/blob/cc6655b7/flink-runtime/src/test/java/org/apache/flink/runtime/taskmanager/TaskManagerProcessReapingTestBase.java
----------------------------------------------------------------------
diff --git 
a/flink-runtime/src/test/java/org/apache/flink/runtime/taskmanager/TaskManagerProcessReapingTestBase.java
 
b/flink-runtime/src/test/java/org/apache/flink/runtime/taskmanager/TaskManagerProcessReapingTestBase.java
new file mode 100644
index 0000000..c7913f7
--- /dev/null
+++ 
b/flink-runtime/src/test/java/org/apache/flink/runtime/taskmanager/TaskManagerProcessReapingTestBase.java
@@ -0,0 +1,275 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.taskmanager;
+
+import akka.actor.ActorRef;
+import akka.actor.ActorSystem;
+import org.apache.flink.configuration.ConfigConstants;
+import org.apache.flink.configuration.Configuration;
+import org.apache.flink.runtime.akka.AkkaUtils;
+import org.apache.flink.runtime.clusterframework.FlinkResourceManager;
+import 
org.apache.flink.runtime.clusterframework.standalone.StandaloneResourceManager;
+import org.apache.flink.runtime.clusterframework.types.ResourceID;
+import org.apache.flink.runtime.jobmanager.JobManager;
+import org.apache.flink.runtime.jobmanager.MemoryArchivist;
+import 
org.apache.flink.runtime.leaderretrieval.StandaloneLeaderRetrievalService;
+import org.apache.flink.runtime.testutils.CommonTestUtils;
+import org.apache.flink.util.NetUtils;
+import org.junit.Test;
+import scala.Some;
+import scala.Tuple2;
+import scala.concurrent.duration.FiniteDuration;
+
+import java.io.File;
+import java.io.IOException;
+import java.io.InputStream;
+import java.io.StringWriter;
+import java.net.InetAddress;
+import java.util.concurrent.TimeUnit;
+
+import static 
org.apache.flink.runtime.testutils.CommonTestUtils.getCurrentClasspath;
+import static 
org.apache.flink.runtime.testutils.CommonTestUtils.getJavaCommandPath;
+import static 
org.apache.flink.runtime.testutils.CommonTestUtils.isProcessAlive;
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertFalse;
+import static org.junit.Assert.assertTrue;
+import static org.junit.Assert.fail;
+
+/**
+ * Base for tests checking that the TaskManager process exits after a subclass-triggered failure.
+ */
+public abstract class TaskManagerProcessReapingTestBase {
+
+       /**
+        * Called after the task manager has been started up. After calling this
+        * method, the test base checks that the process exits.
+        */
+       abstract void onTaskManagerProcessRunning(ActorRef taskManager);
+
+       /**
+        * Called after the task manager has successfully terminated.
+        */
+       void onTaskManagerProcessTerminated(String processOutput) {
+               // Default does nothing
+       }
+
+       @Test
+       public void testReapProcessOnFailure() {
+               Process taskManagerProcess = null;
+               ActorSystem jmActorSystem = null;
+
+               final StringWriter processOutput = new StringWriter();
+
+               try {
+                       String javaCommand = getJavaCommandPath();
+
+                       // check that we run this test only if the java command
+                       // is available on this machine
+                       if (javaCommand == null) {
+                               System.out.println("---- Skipping 
TaskManagerProcessReapingTest : Could not find java executable ----");
+                               return;
+                       }
+
+                       // create a logging file for the process
+                       File tempLogFile = File.createTempFile("testlogconfig", 
"properties");
+                       tempLogFile.deleteOnExit();
+                       CommonTestUtils.printLog4jDebugConfig(tempLogFile);
+
+                       final InetAddress localhost = 
InetAddress.getByName("localhost");
+                       final int jobManagerPort = NetUtils.getAvailablePort();
+
+                       // start a JobManager
+                       Tuple2<String, Object> localAddress = new 
Tuple2<String, Object>(localhost.getHostAddress(), jobManagerPort);
+                       jmActorSystem = AkkaUtils.createActorSystem(
+                                       new Configuration(), new 
Some<Tuple2<String, Object>>(localAddress));
+
+                       ActorRef jmActor = JobManager.startJobManagerActors(
+                               new Configuration(),
+                               jmActorSystem,
+                               JobManager.class,
+                               MemoryArchivist.class)._1;
+
+                       // start a ResourceManager
+                       StandaloneLeaderRetrievalService 
standaloneLeaderRetrievalService =
+                               new 
StandaloneLeaderRetrievalService(AkkaUtils.getAkkaURL(jmActorSystem, jmActor));
+
+                       FlinkResourceManager.startResourceManagerActors(
+                               new Configuration(),
+                               jmActorSystem,
+                               standaloneLeaderRetrievalService,
+                               StandaloneResourceManager.class);
+
+                       final int taskManagerPort = NetUtils.getAvailablePort();
+
+                       // start the task manager process
+                       String[] command = new String[] {
+                                       javaCommand,
+                                       "-Dlog.level=DEBUG",
+                                       "-Dlog4j.configuration=file:" + 
tempLogFile.getAbsolutePath(),
+                                       "-Xms256m", "-Xmx256m",
+                                       "-classpath", getCurrentClasspath(),
+                                       
TaskManagerTestEntryPoint.class.getName(),
+                                       String.valueOf(jobManagerPort), 
String.valueOf(taskManagerPort)
+                       };
+
+                       ProcessBuilder bld = new ProcessBuilder(command);
+                       taskManagerProcess = bld.start();
+                       new PipeForwarder(taskManagerProcess.getErrorStream(), 
processOutput);
+
+                       // grab the reference to the TaskManager. try multiple 
times, until the process
+                       // is started and the TaskManager is up
+                       String taskManagerActorName = 
String.format("akka.tcp://flink@%s/user/%s",
+                                       
org.apache.flink.util.NetUtils.ipAddressAndPortToUrlString(localhost, 
taskManagerPort),
+                                       TaskManager.TASK_MANAGER_NAME());
+
+                       ActorRef taskManagerRef = null;
+                       Throwable lastError = null;
+                       for (int i = 0; i < 40; i++) {
+                               try {
+                                       taskManagerRef = 
TaskManager.getTaskManagerRemoteReference(
+                                                       taskManagerActorName, 
jmActorSystem, new FiniteDuration(25, TimeUnit.SECONDS));
+                                       break;
+                               }
+                               catch (Throwable t) {
+                                       // TaskManager probably not ready yet
+                                       lastError = t;
+                               }
+                               Thread.sleep(500);
+                       }
+
+                       assertTrue("TaskManager process died", 
isProcessAlive(taskManagerProcess));
+
+                       if (taskManagerRef == null) {
+                               if (lastError != null) {
+                                       lastError.printStackTrace();
+                               }
+                               fail("TaskManager process did not launch the 
TaskManager properly. Failed to look up "
+                                               + taskManagerActorName);
+                       }
+
+                       // let the subclass trigger the failure that should terminate the process
+                       onTaskManagerProcessRunning(taskManagerRef);
+
+                       // wait for max 10 seconds for the process to terminate
+                       {
+                               long now = System.currentTimeMillis();
+                               long deadline = now + 10000;
+
+                               while (now < deadline && 
isProcessAlive(taskManagerProcess)) {
+                                       Thread.sleep(100);
+                                       now = System.currentTimeMillis();
+                               }
+                       }
+
+                       assertFalse("TaskManager process did not terminate upon 
actor death", isProcessAlive(taskManagerProcess));
+
+                       int returnCode = taskManagerProcess.exitValue();
+                       assertEquals("TaskManager died, but not because of the 
process reaper",
+                                       
TaskManager.RUNTIME_FAILURE_RETURN_CODE(), returnCode);
+
+                       
onTaskManagerProcessTerminated(processOutput.toString());
+               }
+               catch (Exception e) {
+                       e.printStackTrace();
+                       printProcessLog(processOutput.toString());
+                       fail(e.getMessage());
+               }
+               catch (Error e) {
+                       e.printStackTrace();
+                       printProcessLog(processOutput.toString());
+                       throw e;
+               }
+               finally {
+                       if (taskManagerProcess != null) {
+                               taskManagerProcess.destroy();
+                       }
+                       if (jmActorSystem != null) {
+                               jmActorSystem.shutdown();
+                       }
+               }
+       }
+
+       private static void printProcessLog(String log) {
+               System.out.println("-----------------------------------------");
+               System.out.println("       BEGIN SPAWNED PROCESS LOG");
+               System.out.println("-----------------------------------------");
+               System.out.println(log);
+               System.out.println("-----------------------------------------");
+               System.out.println("        END SPAWNED PROCESS LOG");
+               System.out.println("-----------------------------------------");
+       }
+
+       // 
--------------------------------------------------------------------------------------------
+
+       public static class TaskManagerTestEntryPoint {
+
+               public static void main(String[] args) {
+                       try {
+                               int jobManagerPort = Integer.parseInt(args[0]);
+                               int taskManagerPort = Integer.parseInt(args[1]);
+
+                               Configuration cfg = new Configuration();
+                               
cfg.setString(ConfigConstants.JOB_MANAGER_IPC_ADDRESS_KEY, "localhost");
+                               
cfg.setInteger(ConfigConstants.JOB_MANAGER_IPC_PORT_KEY, jobManagerPort);
+                               
cfg.setInteger(ConfigConstants.TASK_MANAGER_MEMORY_SIZE_KEY, 4);
+                               
cfg.setInteger(ConfigConstants.TASK_MANAGER_NETWORK_NUM_BUFFERS_KEY, 256);
+
+                               TaskManager.runTaskManager("localhost", 
ResourceID.generate(), taskManagerPort, cfg);
+
+                               // wait forever
+                               Object lock = new Object();
+                               synchronized (lock) {
+                                       lock.wait();
+                               }
+                       }
+                       catch (Throwable t) {
+                               System.exit(1);
+                       }
+               }
+       }
+
+       private static class PipeForwarder extends Thread {
+
+               private final StringWriter target;
+               private final InputStream source;
+
+               public PipeForwarder(InputStream source, StringWriter target) {
+                       super("Pipe Forwarder");
+                       setDaemon(true);
+
+                       this.source = source;
+                       this.target = target;
+
+                       start();
+               }
+
+               @Override
+               public void run() {
+                       try {
+                               int next;
+                               while ((next = source.read()) != -1) {
+                                       target.write(next);
+                               }
+                       }
+                       catch (IOException e) {
+                               // terminate
+                       }
+               }
+       }
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/cc6655b7/flink-runtime/src/test/java/org/apache/flink/runtime/taskmanager/TaskTest.java
----------------------------------------------------------------------
diff --git 
a/flink-runtime/src/test/java/org/apache/flink/runtime/taskmanager/TaskTest.java
 
b/flink-runtime/src/test/java/org/apache/flink/runtime/taskmanager/TaskTest.java
index fec9ef3..56ab9c6 100644
--- 
a/flink-runtime/src/test/java/org/apache/flink/runtime/taskmanager/TaskTest.java
+++ 
b/flink-runtime/src/test/java/org/apache/flink/runtime/taskmanager/TaskTest.java
@@ -19,9 +19,10 @@
 package org.apache.flink.runtime.taskmanager;
 
 import org.apache.flink.api.common.ExecutionConfig;
+import org.apache.flink.api.common.JobID;
+import org.apache.flink.configuration.ConfigConstants;
 import org.apache.flink.configuration.Configuration;
 import org.apache.flink.core.testutils.OneShotLatch;
-import org.apache.flink.runtime.metrics.groups.TaskMetricGroup;
 import org.apache.flink.runtime.blob.BlobKey;
 import org.apache.flink.runtime.broadcast.BroadcastVariableManager;
 import org.apache.flink.runtime.deployment.InputGateDeploymentDescriptor;
@@ -34,7 +35,6 @@ import 
org.apache.flink.runtime.executiongraph.ExecutionAttemptID;
 import org.apache.flink.runtime.filecache.FileCache;
 import org.apache.flink.runtime.instance.ActorGateway;
 import org.apache.flink.runtime.io.disk.iomanager.IOManager;
-import org.apache.flink.api.common.JobID;
 import org.apache.flink.runtime.io.network.NetworkEnvironment;
 import 
org.apache.flink.runtime.io.network.partition.ResultPartitionConsumableNotifier;
 import org.apache.flink.runtime.io.network.partition.ResultPartitionID;
@@ -44,13 +44,14 @@ import 
org.apache.flink.runtime.jobgraph.IntermediateDataSetID;
 import org.apache.flink.runtime.jobgraph.JobVertexID;
 import org.apache.flink.runtime.jobgraph.tasks.AbstractInvokable;
 import org.apache.flink.runtime.memory.MemoryManager;
+import org.apache.flink.runtime.messages.TaskManagerMessages;
 import org.apache.flink.runtime.messages.TaskMessages;
-
+import org.apache.flink.runtime.metrics.groups.TaskMetricGroup;
 import org.apache.flink.util.SerializedValue;
+import org.apache.flink.util.TestLogger;
 import org.junit.After;
 import org.junit.Before;
 import org.junit.Test;
-
 import scala.concurrent.duration.FiniteDuration;
 
 import java.io.IOException;
@@ -69,7 +70,6 @@ import static org.junit.Assert.assertNotNull;
 import static org.junit.Assert.assertNull;
 import static org.junit.Assert.assertTrue;
 import static org.junit.Assert.fail;
-
 import static org.mockito.Matchers.any;
 import static org.mockito.Matchers.eq;
 import static org.mockito.Mockito.doThrow;
@@ -86,11 +86,12 @@ import static org.mockito.Mockito.when;
  * execution listener, which simply put the messages in a queue to be picked
  * up by the test and validated.
  */
-public class TaskTest {
+public class TaskTest extends TestLogger {
        
        private static OneShotLatch awaitLatch;
        private static OneShotLatch triggerLatch;
-       
+       private static OneShotLatch cancelLatch;
+
        private ActorGateway taskManagerGateway;
        private ActorGateway jobManagerGateway;
        private ActorGateway listenerGateway;
@@ -110,6 +111,7 @@ public class TaskTest {
                
                awaitLatch = new OneShotLatch();
                triggerLatch = new OneShotLatch();
+               cancelLatch = new OneShotLatch();
        }
 
        @After
@@ -557,6 +559,123 @@ public class TaskTest {
                verify(inputGate, 
times(1)).retriggerPartitionRequest(eq(partitionId.getPartitionId()));
        }
 
+       /**
+        * Checks that the task thread is interrupted by the cancellation watch dog
+        * when the canceller cannot do it: the invokable's cancel() call blocks the
+        * canceller, so the interrupt that ends the task has to come from the watch
+        * dog. No fatal error may be reported to the task manager.
+        */
+       @Test
+       public void testWatchDogInterruptsTask() throws Exception {
+               final Configuration taskConfig = new Configuration();
+               taskConfig.setLong(ConfigConstants.TASK_CANCELLATION_INTERVAL_MILLIS, 5);
+               taskConfig.setLong(ConfigConstants.TASK_CANCELLATION_TIMEOUT_MILLIS, 50);
+
+               final Task task = createTask(InvokableBlockingInCancel.class, taskConfig);
+               task.startTaskThread();
+
+               // Only cancel once invoke() has signalled that it is running
+               awaitLatch.await();
+
+               task.cancelExecution();
+
+               final Thread taskThread = task.getExecutingThread();
+               taskThread.join();
+
+               // No fatal error
+               for (Object received : taskManagerMessages) {
+                       final boolean isFatalError = received instanceof TaskManagerMessages.FatalError;
+                       assertFalse("Unexpected FatalError message", isFatalError);
+               }
+       }
+
+       /**
+        * invoke() takes a lock (and triggers awaitLatch once it owns it); cancel()
+        * needs the very same lock and hence cannot complete on its own. The watch
+        * dog is expected to resolve the situation without a fatal error.
+        */
+       @Test
+       public void testInterruptableSharedLockInInvokeAndCancel() throws Exception {
+               final Configuration taskConfig = new Configuration();
+               taskConfig.setLong(ConfigConstants.TASK_CANCELLATION_INTERVAL_MILLIS, 5);
+               taskConfig.setLong(ConfigConstants.TASK_CANCELLATION_TIMEOUT_MILLIS, 50);
+
+               final Task task = createTask(
+                               InvokableInterruptableSharedLockInInvokeAndCancel.class, taskConfig);
+               task.startTaskThread();
+
+               // Only cancel once invoke() has signalled that it owns the lock
+               awaitLatch.await();
+
+               task.cancelExecution();
+
+               final Thread taskThread = task.getExecutingThread();
+               taskThread.join();
+
+               // No fatal error
+               for (Object received : taskManagerMessages) {
+                       final boolean isFatalError = received instanceof TaskManagerMessages.FatalError;
+                       assertFalse("Unexpected FatalError message", isFatalError);
+               }
+       }
+
+       /**
+        * invoke() never returns on interrupt while cancel() returns immediately.
+        * The only way out is a fatal error message sent to the task manager, which
+        * this test waits for (up to ten polls of one second each).
+        */
+       @Test
+       public void testFatalErrorAfterUninterruptibleInvoke() throws Exception {
+               final Configuration taskConfig = new Configuration();
+               taskConfig.setLong(ConfigConstants.TASK_CANCELLATION_INTERVAL_MILLIS, 5);
+               taskConfig.setLong(ConfigConstants.TASK_CANCELLATION_TIMEOUT_MILLIS, 50);
+
+               final Task task = createTask(InvokableUninterruptibleBlockingInvoke.class, taskConfig);
+
+               try {
+                       task.startTaskThread();
+
+                       awaitLatch.await();
+
+                       task.cancelExecution();
+
+                       boolean fatalErrorReceived = false;
+                       for (int attempt = 0; attempt < 10 && !fatalErrorReceived; attempt++) {
+                               // poll() yields null on timeout, which is not a FatalError
+                               final Object received = taskManagerMessages.poll(1, TimeUnit.SECONDS);
+                               fatalErrorReceived = received instanceof TaskManagerMessages.FatalError;
+                       }
+
+                       if (!fatalErrorReceived) {
+                               fail("Did not receive expected task manager message");
+                       }
+               } finally {
+                       // Let the invokable's loop terminate and interrupt once more to clean up the Thread
+                       cancelLatch.trigger();
+                       final Thread taskThread = task.getExecutingThread();
+                       taskThread.interrupt();
+                       taskThread.join();
+               }
+       }
+
+       /**
+        * Verifies that the cancellation interval and timeout are first taken from
+        * the task configuration and are later overwritten by the values of the
+        * execution config.
+        */
+       @Test
+       public void testTaskConfig() throws Exception {
+               final long configuredInterval = 28218123;
+               final long configuredTimeout = configuredInterval + 19292;
+
+               final Configuration taskConfig = new Configuration();
+               taskConfig.setLong(ConfigConstants.TASK_CANCELLATION_INTERVAL_MILLIS, configuredInterval);
+               taskConfig.setLong(ConfigConstants.TASK_CANCELLATION_TIMEOUT_MILLIS, configuredTimeout);
+
+               // Deliberately different from the task configuration values
+               final ExecutionConfig executionConfig = new ExecutionConfig();
+               executionConfig.setTaskCancellationInterval(configuredInterval + 1337);
+               executionConfig.setTaskCancellationTimeout(configuredTimeout - 1337);
+
+               final Task task = createTask(InvokableBlockingInInvoke.class, taskConfig, executionConfig);
+
+               // Before the task thread is started the task configuration applies
+               assertEquals(configuredInterval, task.getTaskCancellationInterval());
+               assertEquals(configuredTimeout, task.getTaskCancellationTimeout());
+
+               task.startTaskThread();
+
+               awaitLatch.await();
+
+               // After awaitLatch has been triggered the execution config values are expected
+               assertEquals(
+                               executionConfig.getTaskCancellationInterval(),
+                               task.getTaskCancellationInterval());
+               assertEquals(
+                               executionConfig.getTaskCancellationTimeout(),
+                               task.getTaskCancellationTimeout());
+
+               final Thread taskThread = task.getExecutingThread();
+               taskThread.interrupt();
+               taskThread.join();
+       }
+
        // 
------------------------------------------------------------------------
 
        private void setInputGate(Task task, SingleInputGate inputGate) {
@@ -589,13 +708,28 @@ public class TaskTest {
        }
 
        private Task createTask(Class<? extends AbstractInvokable> invokable) {
+               return createTask(invokable, new Configuration(), new 
ExecutionConfig());
+       }
+
+       private Task createTask(Class<? extends AbstractInvokable> invokable, 
Configuration config) {
+               return createTask(invokable, config, new ExecutionConfig());
+       }
+
+       private Task createTask(Class<? extends AbstractInvokable> invokable, 
Configuration config, ExecutionConfig execConfig) {
                LibraryCacheManager libCache = mock(LibraryCacheManager.class);
                
when(libCache.getClassLoader(any(JobID.class))).thenReturn(getClass().getClassLoader());
-               return createTask(invokable, libCache);
+               return createTask(invokable, libCache, config, execConfig);
        }
 
        private Task createTask(Class<? extends AbstractInvokable> invokable,
                                                        LibraryCacheManager 
libCache) {
+               return createTask(invokable, libCache, new Configuration(), new 
ExecutionConfig());
+       }
+
+       private Task createTask(Class<? extends AbstractInvokable> invokable,
+                                                       LibraryCacheManager 
libCache,
+                                                       Configuration config,
+                                                       ExecutionConfig 
execConfig) {
 
                ResultPartitionManager partitionManager = 
mock(ResultPartitionManager.class);
                ResultPartitionConsumableNotifier consumableNotifier = 
mock(ResultPartitionConsumableNotifier.class);
@@ -604,14 +738,23 @@ public class TaskTest {
                
when(network.getPartitionConsumableNotifier()).thenReturn(consumableNotifier);
                
when(network.getDefaultIOMode()).thenReturn(IOManager.IOMode.SYNC);
                
-               return createTask(invokable, libCache, network);
+               return createTask(invokable, libCache, network, config, 
execConfig);
        }
        
        private Task createTask(Class<? extends AbstractInvokable> invokable,
                                                        LibraryCacheManager 
libCache,
                                                        NetworkEnvironment 
networkEnvironment) {
+
+               return createTask(invokable, libCache, networkEnvironment, new 
Configuration(), new ExecutionConfig());
+       }
+
+       private Task createTask(Class<? extends AbstractInvokable> invokable,
+                                                       LibraryCacheManager 
libCache,
+                                                       NetworkEnvironment 
networkEnvironment,
+                                                       Configuration config,
+                                                       ExecutionConfig 
execConfig) {
                
-               TaskDeploymentDescriptor tdd = 
createTaskDeploymentDescriptor(invokable);
+               TaskDeploymentDescriptor tdd = 
createTaskDeploymentDescriptor(invokable, config, execConfig);
                
                return new Task(
                                tdd,
@@ -629,18 +772,26 @@ public class TaskTest {
        }
 
        private TaskDeploymentDescriptor createTaskDeploymentDescriptor(Class<? 
extends AbstractInvokable> invokable) {
-               SerializedValue<ExecutionConfig> execConfig;
+               return createTaskDeploymentDescriptor(invokable, new 
Configuration(), new ExecutionConfig());
+       }
+
+       private TaskDeploymentDescriptor createTaskDeploymentDescriptor(
+                       Class<? extends AbstractInvokable> invokable,
+                       Configuration taskConfig,
+                       ExecutionConfig execConfig) {
+
+               SerializedValue<ExecutionConfig> serializedExecConfig;
                try {
-                       execConfig = new SerializedValue<>(new 
ExecutionConfig());
+                       serializedExecConfig = new 
SerializedValue<>(execConfig);
                } catch (IOException e) {
                        throw new RuntimeException(e);
                }
                
                return new TaskDeploymentDescriptor(
                                new JobID(), "Test Job", new JobVertexID(), new 
ExecutionAttemptID(),
-                               execConfig,
+                               serializedExecConfig,
                                "Test Task", 0, 1, 0,
-                               new Configuration(), new Configuration(),
+                               new Configuration(), taskConfig,
                                invokable.getName(),
                                
Collections.<ResultPartitionDeploymentDescriptor>emptyList(),
                                
Collections.<InputGateDeploymentDescriptor>emptyList(),
@@ -842,4 +993,72 @@ public class TaskTest {
                        throw new CancelTaskException();
                }
        }
+
+       /**
+        * Invokable whose invoke() and cancel() contend for the same lock.
+        *
+        * <p>invoke() acquires the lock, triggers {@code awaitLatch} and then blocks
+        * interruptibly while still holding the lock. cancel() can therefore only
+        * enter its synchronized section (and trigger {@code cancelLatch}) after the
+        * thread running invoke() has been interrupted and has left the section.
+        */
+       public static final class InvokableInterruptableSharedLockInInvokeAndCancel extends AbstractInvokable {
+
+               private final Object lock = new Object();
+
+               @Override
+               public void invoke() throws Exception {
+                       synchronized (lock) {
+                               awaitLatch.trigger();
+                               // A bare wait() would wait on 'this', whose monitor is not held
+                               // here, and fail right away with IllegalMonitorStateException.
+                               // lock.wait() is no alternative because it releases the lock.
+                               // Sleeping blocks interruptibly and keeps the lock held, which is
+                               // the contention this invokable is meant to create.
+                               Thread.sleep(Long.MAX_VALUE);
+                       }
+               }
+
+               @Override
+               public void cancel() throws Exception {
+                       // Blocks for as long as invoke() holds the lock
+                       synchronized (lock) {
+                               cancelLatch.trigger();
+                       }
+               }
+       }
+
+       /**
+        * Invokable whose cancel() blocks until the thread running invoke() has
+        * been interrupted.
+        *
+        * <p>invoke() triggers {@code awaitLatch}, waits for {@code cancelLatch} and
+        * then waits on this object's monitor. cancel() triggers {@code cancelLatch}
+        * and waits on the same monitor. When invoke() is interrupted it calls
+        * notifyAll() on the monitor, which wakes up the thread waiting in cancel().
+        */
+       public static final class InvokableBlockingInCancel extends 
AbstractInvokable {
+
+               @Override
+               public void invoke() throws Exception {
+                       // Signal that invoke() has been entered
+                       awaitLatch.trigger();
+
+                       try {
+                               // Proceed only after cancel() has been called ...
+                               cancelLatch.await();
+                               // ... then block until interrupted. cancel() triggers the latch
+                               // while holding this monitor, so this section is entered only
+                               // once cancel() has reached its own wait().
+                               synchronized (this) {
+                                       wait();
+                               }
+                       } catch (InterruptedException ignored) {
+                               synchronized (this) {
+                                       // notify all that are stuck in cancel
+                                       notifyAll();
+                               }
+                       }
+               }
+
+               @Override
+               public void cancel() throws Exception {
+
+                       synchronized (this) {
+                               // Let invoke() continue, then block until invoke() calls notifyAll()
+                               cancelLatch.trigger();
+                               wait();
+                       }
+               }
+       }
+
+       /**
+        * Invokable whose invoke() cannot be ended by interrupting its thread: every
+        * InterruptedException is swallowed and the wait is resumed. invoke() only
+        * returns after {@code cancelLatch} has been triggered. cancel() does nothing.
+        */
+       public static final class InvokableUninterruptibleBlockingInvoke extends AbstractInvokable {
+
+               @Override
+               public void invoke() throws Exception {
+                       for (;;) {
+                               if (cancelLatch.isTriggered()) {
+                                       return;
+                               }
+
+                               try {
+                                       synchronized (this) {
+                                               awaitLatch.trigger();
+                                               wait();
+                                       }
+                               } catch (InterruptedException swallowed) {
+                                       // Ignored on purpose: go around the loop and block again
+                               }
+                       }
+               }
+
+               @Override
+               public void cancel() throws Exception {
+                       // Intentionally empty: cancelling does not unblock invoke()
+               }
+       }
 }

http://git-wip-us.apache.org/repos/asf/flink/blob/cc6655b7/flink-test-utils-parent/flink-test-utils-junit/src/main/java/org/apache/flink/core/testutils/OneShotLatch.java
----------------------------------------------------------------------
diff --git 
a/flink-test-utils-parent/flink-test-utils-junit/src/main/java/org/apache/flink/core/testutils/OneShotLatch.java
 
b/flink-test-utils-parent/flink-test-utils-junit/src/main/java/org/apache/flink/core/testutils/OneShotLatch.java
index d802860..9ad2c30 100644
--- 
a/flink-test-utils-parent/flink-test-utils-junit/src/main/java/org/apache/flink/core/testutils/OneShotLatch.java
+++ 
b/flink-test-utils-parent/flink-test-utils-junit/src/main/java/org/apache/flink/core/testutils/OneShotLatch.java
@@ -52,4 +52,13 @@ public final class OneShotLatch {
                        }
                }
        }
+
+       /**
+        * Checks if the latch was triggered. Returns the current value of the
+        * {@code triggered} field directly, without waiting for the latch.
+        *
+        * <p>NOTE(review): callers poll this from threads other than the triggering
+        * one; cross-thread visibility depends on how {@code triggered} is declared
+        * (presumably volatile) - confirm against the field declaration.
+        *
+        * @return True, if the latch was triggered, false if not.
+        */
+       public boolean isTriggered() {
+               return triggered;
+       }
 }

Reply via email to