Repository: flink
Updated Branches:
refs/heads/release-1.1 529534f33 -> cc6655b7b
[FLINK-4715] Fail TaskManager with fatal error if task cancellation is stuck
- Splits the cancellation up into two threads:
* The `TaskCanceler` calls `cancel` on the invokable and `interrupt`
on the executing Thread. It then exits.
* The `TaskCancelerWatchDog` kicks in after the task cancellation
interval (current default: 30 secs) and periodically calls `interrupt`
on the executing Thread. If the Thread does not terminate within
the task cancellation timeout (new config value; 3 mins by default on
master, deactivated by default in this backport), the task
manager is notified about a fatal error, leading to termination of the JVM.
- The new configuration is exposed via
`ConfigConstants.TASK_CANCELLATION_TIMEOUT_MILLIS`
(default in this backport: 0, i.e. deactivated; 3 mins on master) and the
`ExecutionConfig` (similar to the cancellation interval).
Backported with slight adjustments from the master branch.
Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/cc6655b7
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/cc6655b7
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/cc6655b7
Branch: refs/heads/release-1.1
Commit: cc6655b7b4518a105971c7965d313a7aefcfc613
Parents: 529534f
Author: Ufuk Celebi <[email protected]>
Authored: Tue Oct 18 09:50:36 2016 +0200
Committer: Ufuk Celebi <[email protected]>
Committed: Thu Oct 27 17:41:44 2016 +0200
----------------------------------------------------------------------
.../flink/api/common/ExecutionConfig.java | 46 +++-
.../flink/configuration/ConfigConstants.java | 14 +
.../apache/flink/runtime/taskmanager/Task.java | 157 +++++++++--
...TaskManagerProcessReapingFatalErrorTest.java | 40 +++
.../TaskManagerProcessReapingTest.java | 241 +---------------
.../TaskManagerProcessReapingTestBase.java | 275 +++++++++++++++++++
.../flink/runtime/taskmanager/TaskTest.java | 247 ++++++++++++++++-
.../flink/core/testutils/OneShotLatch.java | 9 +
8 files changed, 748 insertions(+), 281 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/flink/blob/cc6655b7/flink-core/src/main/java/org/apache/flink/api/common/ExecutionConfig.java
----------------------------------------------------------------------
diff --git
a/flink-core/src/main/java/org/apache/flink/api/common/ExecutionConfig.java
b/flink-core/src/main/java/org/apache/flink/api/common/ExecutionConfig.java
index 5b69794..cbcafed 100644
--- a/flink-core/src/main/java/org/apache/flink/api/common/ExecutionConfig.java
+++ b/flink-core/src/main/java/org/apache/flink/api/common/ExecutionConfig.java
@@ -19,11 +19,9 @@
package org.apache.flink.api.common;
import com.esotericsoftware.kryo.Serializer;
-import org.apache.flink.annotation.PublicEvolving;
import org.apache.flink.annotation.Public;
+import org.apache.flink.annotation.PublicEvolving;
import org.apache.flink.api.common.restartstrategy.RestartStrategies;
-import org.apache.flink.util.Preconditions;
-
import java.io.Serializable;
import java.util.Collections;
@@ -32,6 +30,8 @@ import java.util.LinkedHashSet;
import java.util.Map;
import java.util.Objects;
+import static org.apache.flink.util.Preconditions.checkArgument;
+
/**
* A config to define the behavior of the program execution. It allows to
define (among other
* options) the following settings:
@@ -116,6 +116,12 @@ public class ExecutionConfig implements Serializable {
private long taskCancellationIntervalMillis = -1;
+ /**
+ * Timeout after which an ongoing task cancellation will lead to a fatal
+ * TaskManager error, usually killing the JVM.
+ */
+ private long taskCancellationTimeoutMillis = -1;
+
// ------------------------------- User code values
--------------------------------------------
private GlobalJobParameters globalJobParameters;
@@ -219,7 +225,7 @@ public class ExecutionConfig implements Serializable {
* @param parallelism The parallelism to use
*/
public ExecutionConfig setParallelism(int parallelism) {
- Preconditions.checkArgument(parallelism > 0 || parallelism ==
PARALLELISM_DEFAULT,
+ checkArgument(parallelism > 0 || parallelism ==
PARALLELISM_DEFAULT,
"The parallelism of an operator must be at least 1.");
this.parallelism = parallelism;
@@ -245,6 +251,38 @@ public class ExecutionConfig implements Serializable {
}
/**
+ * Returns the timeout (in milliseconds) after which an ongoing task
+ * cancellation leads to a fatal TaskManager error.
+ *
+ * <p>The value <code>0</code> disables the timeout. In this case a
stuck
+ * cancellation will not lead to a fatal error.
+ */
+ @PublicEvolving
+ public long getTaskCancellationTimeout() {
+ return this.taskCancellationTimeoutMillis;
+ }
+
+ /**
+ * Sets the timeout (in milliseconds) after which an ongoing task
cancellation
+ * is considered failed, leading to a fatal TaskManager error.
+ *
+ * <p>By default, the cluster-wide value is used, which deactivates the timeout unless configured otherwise.
+ *
+ * <p>The cluster default is configured via {@link
org.apache.flink.configuration.ConfigConstants#TASK_CANCELLATION_TIMEOUT_MILLIS}.
+ *
+ * <p>The value <code>0</code> disables the timeout. In this case a
stuck
+ * cancellation will not lead to a fatal error.
+ *
+ * @param timeout The task cancellation timeout (in milliseconds).
+ */
+ @PublicEvolving
+ public ExecutionConfig setTaskCancellationTimeout(long timeout) {
+ checkArgument(timeout >= 0, "Timeout needs to be >= 0.");
+ this.taskCancellationTimeoutMillis = timeout;
+ return this;
+ }
+
+ /**
* Sets the restart strategy to be used for recovery.
*
* <pre>{@code
http://git-wip-us.apache.org/repos/asf/flink/blob/cc6655b7/flink-core/src/main/java/org/apache/flink/configuration/ConfigConstants.java
----------------------------------------------------------------------
diff --git
a/flink-core/src/main/java/org/apache/flink/configuration/ConfigConstants.java
b/flink-core/src/main/java/org/apache/flink/configuration/ConfigConstants.java
index 9902350..d1ad1c4 100644
---
a/flink-core/src/main/java/org/apache/flink/configuration/ConfigConstants.java
+++
b/flink-core/src/main/java/org/apache/flink/configuration/ConfigConstants.java
@@ -270,6 +270,13 @@ public final class ConfigConstants {
@PublicEvolving
public static final String TASK_CANCELLATION_INTERVAL_MILLIS =
"task.cancellation-interval";
+ /**
+ * Timeout in milliseconds after which a task cancellation times out and
+ * leads to a fatal TaskManager error.
+ */
+ @PublicEvolving
+ public static final String TASK_CANCELLATION_TIMEOUT_MILLIS =
"task.cancellation.timeout";
+
// --------------------------- Runtime Algorithms
-------------------------------
/**
@@ -859,6 +866,13 @@ public final class ConfigConstants {
* */
public static final long DEFAULT_TASK_CANCELLATION_INTERVAL_MILLIS =
30000;
+ /**
+ * Default timeout in milliseconds after which a task cancellation
times out
+ * and leads to a fatal TaskManager error. This has been backported
from 1.2 and
+ * deactivated by default.
+ */
+ public static final long DEFAULT_TASK_CANCELLATION_TIMEOUT_MILLIS = 0;
// deactivated
+
// ------------------------ Runtime Algorithms ------------------------
/**
http://git-wip-us.apache.org/repos/asf/flink/blob/cc6655b7/flink-runtime/src/main/java/org/apache/flink/runtime/taskmanager/Task.java
----------------------------------------------------------------------
diff --git
a/flink-runtime/src/main/java/org/apache/flink/runtime/taskmanager/Task.java
b/flink-runtime/src/main/java/org/apache/flink/runtime/taskmanager/Task.java
index 25a7e29..4b47cba 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/taskmanager/Task.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/taskmanager/Task.java
@@ -18,6 +18,7 @@
package org.apache.flink.runtime.taskmanager;
+import com.google.common.annotations.VisibleForTesting;
import org.apache.flink.api.common.ExecutionConfig;
import org.apache.flink.api.common.JobID;
import org.apache.flink.api.common.TaskInfo;
@@ -25,7 +26,6 @@ import org.apache.flink.api.common.cache.DistributedCache;
import org.apache.flink.configuration.ConfigConstants;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.core.fs.Path;
-import org.apache.flink.runtime.metrics.groups.TaskMetricGroup;
import org.apache.flink.runtime.accumulators.AccumulatorRegistry;
import org.apache.flink.runtime.blob.BlobKey;
import org.apache.flink.runtime.broadcast.BroadcastVariableManager;
@@ -52,18 +52,18 @@ import
org.apache.flink.runtime.jobgraph.tasks.AbstractInvokable;
import org.apache.flink.runtime.jobgraph.tasks.StatefulTask;
import org.apache.flink.runtime.jobgraph.tasks.StoppableTask;
import org.apache.flink.runtime.memory.MemoryManager;
+import org.apache.flink.runtime.messages.TaskManagerMessages;
import org.apache.flink.runtime.messages.TaskManagerMessages.FatalError;
import org.apache.flink.runtime.messages.TaskMessages.FailTask;
import org.apache.flink.runtime.messages.TaskMessages.TaskInFinalState;
import org.apache.flink.runtime.messages.TaskMessages.UpdateTaskExecutionState;
import org.apache.flink.runtime.messages.checkpoint.DeclineCheckpoint;
+import org.apache.flink.runtime.metrics.groups.TaskMetricGroup;
import org.apache.flink.runtime.state.StateHandle;
import org.apache.flink.runtime.state.StateUtils;
import org.apache.flink.util.SerializedValue;
-
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
-
import scala.concurrent.duration.FiniteDuration;
import java.io.IOException;
@@ -76,6 +76,7 @@ import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;
import java.util.concurrent.RejectedExecutionException;
+import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicReferenceFieldUpdater;
@@ -229,6 +230,9 @@ public class Task implements Runnable {
/** Initialized from the Flink configuration. May also be set at the
ExecutionConfig */
private long taskCancellationInterval;
+ /** Initialized from the Flink configuration. May also be set at the
ExecutionConfig */
+ private long taskCancellationTimeout;
+
/**
* <p><b>IMPORTANT:</b> This constructor may not start any work that
would need to
* be undone in the case of a failing task deployment.</p>
@@ -259,9 +263,14 @@ public class Task implements Runnable {
this.operatorState = tdd.getOperatorState();
this.serializedExecutionConfig =
checkNotNull(tdd.getSerializedExecutionConfig());
- this.taskCancellationInterval = jobConfiguration.getLong(
- ConfigConstants.TASK_CANCELLATION_INTERVAL_MILLIS,
-
ConfigConstants.DEFAULT_TASK_CANCELLATION_INTERVAL_MILLIS);
+ Configuration taskConfig = tdd.getTaskConfiguration();
+ this.taskCancellationInterval = taskConfig.getLong(
+
ConfigConstants.TASK_CANCELLATION_INTERVAL_MILLIS,
+
ConfigConstants.DEFAULT_TASK_CANCELLATION_INTERVAL_MILLIS);
+
+ this.taskCancellationTimeout = taskConfig.getLong(
+
ConfigConstants.TASK_CANCELLATION_TIMEOUT_MILLIS,
+
ConfigConstants.DEFAULT_TASK_CANCELLATION_TIMEOUT_MILLIS);
this.memoryManager = checkNotNull(memManager);
this.ioManager = checkNotNull(ioManager);
@@ -381,6 +390,16 @@ public class Task implements Runnable {
return executingThread;
}
+ @VisibleForTesting
+ long getTaskCancellationInterval() {
+ return taskCancellationInterval;
+ }
+
+ @VisibleForTesting
+ long getTaskCancellationTimeout() {
+ return taskCancellationTimeout;
+ }
+
//
------------------------------------------------------------------------
// Task Execution
//
------------------------------------------------------------------------
@@ -478,6 +497,11 @@ public class Task implements Runnable {
taskCancellationInterval =
executionConfig.getTaskCancellationInterval();
}
+ if (executionConfig.getTaskCancellationTimeout() >= 0) {
+ // override task cancellation timeout from
Flink config if set in ExecutionConfig
+ taskCancellationTimeout =
executionConfig.getTaskCancellationTimeout();
+ }
+
// now load the task's invokable code
invokable =
loadAndInstantiateInvokable(userCodeClassLoader, nameOfInvokableClass);
@@ -868,12 +892,16 @@ public class Task implements Runnable {
// because the canceling may
block on user code, we cancel from a separate thread
// we do not reuse the async
call handler, because that one may be blocked, in which
// case the canceling could not
continue
+
+ // The canceller calls cancel
and interrupts the executing thread once
Runnable canceler = new
TaskCanceler(
LOG,
invokable,
executingThread,
taskNameWithSubtask,
taskCancellationInterval,
+
taskCancellationTimeout,
+ taskManager,
producedPartitions,
inputGates);
Thread cancelThread = new
Thread(executingThread.getThreadGroup(), canceler,
@@ -1119,16 +1147,29 @@ public class Task implements Runnable {
private final AbstractInvokable invokable;
private final Thread executer;
private final String taskName;
- private final long taskCancellationIntervalMillis;
private final ResultPartition[] producedPartitions;
private final SingleInputGate[] inputGates;
+ /** Interrupt interval. */
+ private final long interruptInterval;
+
+ /** Timeout after which a fatal error notification happens. */
+ private final long interruptTimeout;
+
+ /** TaskManager to notify about a timeout */
+ private final ActorGateway taskManager;
+
+ /** Watch Dog thread */
+ private final Thread watchDogThread;
+
public TaskCanceler(
Logger logger,
AbstractInvokable invokable,
Thread executer,
String taskName,
- long cancelationInterval,
+ long cancellationInterval,
+ long cancellationTimeout,
+ ActorGateway taskManager,
ResultPartition[] producedPartitions,
SingleInputGate[] inputGates) {
@@ -1136,26 +1177,46 @@ public class Task implements Runnable {
this.invokable = invokable;
this.executer = executer;
this.taskName = taskName;
- this.taskCancellationIntervalMillis =
cancelationInterval;
+ this.interruptInterval = cancellationInterval;
+ this.interruptTimeout = cancellationTimeout;
+ this.taskManager = taskManager;
this.producedPartitions = producedPartitions;
this.inputGates = inputGates;
+
+ if (cancellationTimeout > 0) {
+ // The watch dog repeatedly interrupts the
executor until
+ // the cancellation timeout kicks in (at which
point the
+ // task manager is notified about a fatal
error) or the
+ // executor has terminated.
+ this.watchDogThread = new Thread(
+ executer.getThreadGroup(),
+ new TaskCancelerWatchDog(),
+ "WatchDog for " + taskName + "
cancellation");
+ this.watchDogThread.setDaemon(true);
+ } else {
+ this.watchDogThread = null;
+ }
}
@Override
public void run() {
try {
+ if (watchDogThread != null) {
+ watchDogThread.start();
+ }
+
// the user-defined cancel method may throw
errors.
// we need do continue despite that
try {
invokable.cancel();
- }
- catch (Throwable t) {
+ } catch (Throwable t) {
logger.error("Error while canceling the
task", t);
}
// Early release of input and output buffer
pools. We do this
// in order to unblock async Threads, which
produce/consume the
- // intermediate streams outside of the main
Task Thread.
+ // intermediate streams outside of the main
Task Thread (like
+ // the Kafka consumer).
//
// Don't do this before cancelling the
invokable. Otherwise we
// will get misleading errors in the logs.
@@ -1178,16 +1239,45 @@ public class Task implements Runnable {
// interrupt the running thread initially
executer.interrupt();
try {
-
executer.join(taskCancellationIntervalMillis);
+ executer.join(interruptInterval);
}
catch (InterruptedException e) {
// we can ignore this
}
- // it is possible that the user code does not
react immediately. for that
- // reason, we spawn a separate thread that
repeatedly interrupts the user code until
- // it exits
+ if (watchDogThread != null) {
+ watchDogThread.interrupt();
+ watchDogThread.join();
+ }
+ } catch (Throwable t) {
+ logger.error("Error in the task canceler", t);
+ }
+ }
+
+ /**
+ * Watchdog for the cancellation. If the task is stuck in
cancellation,
+ * we notify the task manager about a fatal error.
+ */
+ private class TaskCancelerWatchDog implements Runnable {
+
+ @Override
+ public void run() {
+ long intervalNanos =
TimeUnit.NANOSECONDS.convert(interruptInterval, TimeUnit.MILLISECONDS);
+ long timeoutNanos =
TimeUnit.NANOSECONDS.convert(interruptTimeout, TimeUnit.MILLISECONDS);
+ long deadline = System.nanoTime() +
timeoutNanos;
+
+ try {
+ // Initial wait before interrupting
periodically
+ Thread.sleep(interruptInterval);
+ } catch (InterruptedException ignored) {
+ }
+
+ // It is possible that the user code does not
react to the task canceller.
+ // For that reason, we spawn this separate
thread that repeatedly interrupts
+ // the user code until it exits. If the
user code does not exit within
+ // the timeout, we notify the task manager about
a fatal error.
while (executer.isAlive()) {
+ long now = System.nanoTime();
// build the stack trace of where the
thread is stuck, for the log
StringBuilder bld = new StringBuilder();
@@ -1196,21 +1286,34 @@ public class Task implements Runnable {
bld.append(e).append('\n');
}
- logger.warn("Task '{}' did not react to
cancelling signal, but is stuck in method:\n {}",
- taskName,
bld.toString());
+ if (now >= deadline) {
+ long duration =
TimeUnit.SECONDS.convert(interruptInterval, TimeUnit.MILLISECONDS);
+ String msg =
String.format("Task '%s' did not react to cancelling signal in " +
+
"the last %d seconds, but is stuck in method:\n %s",
+ taskName,
+ duration,
+ bld.toString());
- executer.interrupt();
- try {
-
executer.join(taskCancellationIntervalMillis);
- }
- catch (InterruptedException e) {
- // we can ignore this
+ taskManager.tell(new
TaskManagerMessages.FatalError(msg, null));
+
+ return; // done, don't forget
to leave the loop
+ } else {
+ logger.warn("Task '{}' did not
react to cancelling signal, but is stuck in method:\n {}",
+ taskName,
bld.toString());
+
+ executer.interrupt();
+ try {
+ long timeLeftNanos =
Math.min(intervalNanos, deadline - now);
+ long timeLeftMillis =
TimeUnit.MILLISECONDS.convert(timeLeftNanos, TimeUnit.NANOSECONDS);
+
+ if (timeLeftMillis > 0)
{
+
executer.join(timeLeftMillis);
+ }
+ } catch (InterruptedException
ignored) {
+ }
}
}
}
- catch (Throwable t) {
- logger.error("Error in the task canceler", t);
- }
}
}
}
http://git-wip-us.apache.org/repos/asf/flink/blob/cc6655b7/flink-runtime/src/test/java/org/apache/flink/runtime/taskmanager/TaskManagerProcessReapingFatalErrorTest.java
----------------------------------------------------------------------
diff --git
a/flink-runtime/src/test/java/org/apache/flink/runtime/taskmanager/TaskManagerProcessReapingFatalErrorTest.java
b/flink-runtime/src/test/java/org/apache/flink/runtime/taskmanager/TaskManagerProcessReapingFatalErrorTest.java
new file mode 100644
index 0000000..1f0e84d
--- /dev/null
+++
b/flink-runtime/src/test/java/org/apache/flink/runtime/taskmanager/TaskManagerProcessReapingFatalErrorTest.java
@@ -0,0 +1,40 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.taskmanager;
+
+import akka.actor.ActorRef;
+import org.apache.flink.runtime.messages.TaskManagerMessages;
+
+import static org.junit.Assert.assertTrue;
+
+/**
+ * Tests that the TaskManager process properly exits when the TaskManager
actor receives a fatal error message.
+ */
+public class TaskManagerProcessReapingFatalErrorTest extends
TaskManagerProcessReapingTestBase {
+
+ @Override
+ void onTaskManagerProcessRunning(ActorRef taskManager) {
+ taskManager.tell(new TaskManagerMessages.FatalError("ouch",
null), ActorRef.noSender());
+ }
+
+ @Override
+ void onTaskManagerProcessTerminated(String processOutput) {
+ assertTrue("Did not log expected message",
processOutput.contains("ouch"));
+ }
+}
http://git-wip-us.apache.org/repos/asf/flink/blob/cc6655b7/flink-runtime/src/test/java/org/apache/flink/runtime/taskmanager/TaskManagerProcessReapingTest.java
----------------------------------------------------------------------
diff --git
a/flink-runtime/src/test/java/org/apache/flink/runtime/taskmanager/TaskManagerProcessReapingTest.java
b/flink-runtime/src/test/java/org/apache/flink/runtime/taskmanager/TaskManagerProcessReapingTest.java
index 85d6ede..8aed021 100644
---
a/flink-runtime/src/test/java/org/apache/flink/runtime/taskmanager/TaskManagerProcessReapingTest.java
+++
b/flink-runtime/src/test/java/org/apache/flink/runtime/taskmanager/TaskManagerProcessReapingTest.java
@@ -19,248 +19,17 @@
package org.apache.flink.runtime.taskmanager;
import akka.actor.ActorRef;
-import akka.actor.ActorSystem;
import akka.actor.PoisonPill;
-import org.apache.flink.configuration.ConfigConstants;
-import org.apache.flink.configuration.Configuration;
-import org.apache.flink.runtime.akka.AkkaUtils;
-import org.apache.flink.runtime.clusterframework.FlinkResourceManager;
-import
org.apache.flink.runtime.clusterframework.standalone.StandaloneResourceManager;
-import org.apache.flink.runtime.clusterframework.types.ResourceID;
-import org.apache.flink.runtime.jobmanager.JobManager;
-import org.apache.flink.runtime.jobmanager.MemoryArchivist;
-import
org.apache.flink.runtime.leaderretrieval.StandaloneLeaderRetrievalService;
-import org.apache.flink.runtime.testutils.CommonTestUtils;
-import org.apache.flink.runtime.util.LeaderRetrievalUtils;
-import org.apache.flink.util.NetUtils;
-
-import org.junit.Test;
-
-import scala.Some;
-import scala.Tuple2;
-import scala.concurrent.duration.FiniteDuration;
-
-import java.io.File;
-import java.io.IOException;
-import java.io.InputStream;
-import java.io.StringWriter;
-import java.net.InetAddress;
-import java.util.concurrent.TimeUnit;
-
-import static org.junit.Assert.assertEquals;
-import static org.junit.Assert.assertFalse;
-import static org.junit.Assert.assertTrue;
-import static org.junit.Assert.fail;
-
-import static
org.apache.flink.runtime.testutils.CommonTestUtils.getCurrentClasspath;
-import static
org.apache.flink.runtime.testutils.CommonTestUtils.getJavaCommandPath;
-import static
org.apache.flink.runtime.testutils.CommonTestUtils.isProcessAlive;
-
/**
* Tests that the TaskManager process properly exits when the TaskManager
actor dies.
*/
-public class TaskManagerProcessReapingTest {
-
- @Test
- public void testReapProcessOnFailure() {
- Process taskManagerProcess = null;
- ActorSystem jmActorSystem = null;
-
- final StringWriter processOutput = new StringWriter();
-
- try {
- String javaCommand = getJavaCommandPath();
-
- // check that we run this test only if the java command
- // is available on this machine
- if (javaCommand == null) {
- System.out.println("---- Skipping
TaskManagerProcessReapingTest : Could not find java executable ----");
- return;
- }
-
- // create a logging file for the process
- File tempLogFile = File.createTempFile("testlogconfig",
"properties");
- tempLogFile.deleteOnExit();
- CommonTestUtils.printLog4jDebugConfig(tempLogFile);
-
- final InetAddress localhost =
InetAddress.getByName("localhost");
- final int jobManagerPort = NetUtils.getAvailablePort();
-
- // start a JobManager
- Tuple2<String, Object> localAddress = new
Tuple2<String, Object>(localhost.getHostAddress(), jobManagerPort);
- jmActorSystem = AkkaUtils.createActorSystem(
- new Configuration(), new
Some<Tuple2<String, Object>>(localAddress));
-
- ActorRef jmActor = JobManager.startJobManagerActors(
- new Configuration(),
- jmActorSystem,
- JobManager.class,
- MemoryArchivist.class)._1;
-
- // start a ResourceManager
- StandaloneLeaderRetrievalService
standaloneLeaderRetrievalService =
- new
StandaloneLeaderRetrievalService(AkkaUtils.getAkkaURL(jmActorSystem, jmActor));
-
- FlinkResourceManager.startResourceManagerActors(
- new Configuration(),
- jmActorSystem,
- standaloneLeaderRetrievalService,
- StandaloneResourceManager.class);
-
- final int taskManagerPort = NetUtils.getAvailablePort();
-
- // start the task manager process
- String[] command = new String[] {
- javaCommand,
- "-Dlog.level=DEBUG",
- "-Dlog4j.configuration=file:" +
tempLogFile.getAbsolutePath(),
- "-Xms256m", "-Xmx256m",
- "-classpath", getCurrentClasspath(),
-
TaskManagerTestEntryPoint.class.getName(),
- String.valueOf(jobManagerPort),
String.valueOf(taskManagerPort)
- };
-
- ProcessBuilder bld = new ProcessBuilder(command);
- taskManagerProcess = bld.start();
- new PipeForwarder(taskManagerProcess.getErrorStream(),
processOutput);
-
- // grab the reference to the TaskManager. try multiple
times, until the process
- // is started and the TaskManager is up
- String taskManagerActorName =
String.format("akka.tcp://flink@%s/user/%s",
-
org.apache.flink.util.NetUtils.ipAddressAndPortToUrlString(localhost,
taskManagerPort),
- TaskManager.TASK_MANAGER_NAME());
-
- ActorRef taskManagerRef = null;
- Throwable lastError = null;
- for (int i = 0; i < 40; i++) {
- try {
- taskManagerRef =
TaskManager.getTaskManagerRemoteReference(
- taskManagerActorName,
jmActorSystem, new FiniteDuration(25, TimeUnit.SECONDS));
- break;
- }
- catch (Throwable t) {
- // TaskManager probably not ready yet
- lastError = t;
- }
- Thread.sleep(500);
- }
+public class TaskManagerProcessReapingTest extends
TaskManagerProcessReapingTestBase {
- assertTrue("TaskManager process died",
isProcessAlive(taskManagerProcess));
-
- if (taskManagerRef == null) {
- if (lastError != null) {
- lastError.printStackTrace();
- }
- fail("TaskManager process did not launch the
TaskManager properly. Failed to look up "
- + taskManagerActorName);
- }
-
- // kill the TaskManager actor
- taskManagerRef.tell(PoisonPill.getInstance(),
ActorRef.noSender());
-
- // wait for max 5 seconds for the process to terminate
- {
- long now = System.currentTimeMillis();
- long deadline = now + 10000;
-
- while (now < deadline &&
isProcessAlive(taskManagerProcess)) {
- Thread.sleep(100);
- now = System.currentTimeMillis();
- }
- }
-
- assertFalse("TaskManager process did not terminate upon
actor death", isProcessAlive(taskManagerProcess));
-
- int returnCode = taskManagerProcess.exitValue();
- assertEquals("TaskManager died, but not because of the
process reaper",
-
TaskManager.RUNTIME_FAILURE_RETURN_CODE(), returnCode);
- }
- catch (Exception e) {
- e.printStackTrace();
- printProcessLog(processOutput.toString());
- fail(e.getMessage());
- }
- catch (Error e) {
- e.printStackTrace();
- printProcessLog(processOutput.toString());
- throw e;
- }
- finally {
- if (taskManagerProcess != null) {
- taskManagerProcess.destroy();
- }
- if (jmActorSystem != null) {
- jmActorSystem.shutdown();
- }
- }
+ @Override
+ void onTaskManagerProcessRunning(ActorRef taskManager) {
+ // kill the TaskManager actor
+ taskManager.tell(PoisonPill.getInstance(), ActorRef.noSender());
}
- private static void printProcessLog(String log) {
- System.out.println("-----------------------------------------");
- System.out.println(" BEGIN SPAWNED PROCESS LOG");
- System.out.println("-----------------------------------------");
- System.out.println(log);
- System.out.println("-----------------------------------------");
- System.out.println(" END SPAWNED PROCESS LOG");
- System.out.println("-----------------------------------------");
- }
-
- //
--------------------------------------------------------------------------------------------
-
- public static class TaskManagerTestEntryPoint {
-
- public static void main(String[] args) {
- try {
- int jobManagerPort = Integer.parseInt(args[0]);
- int taskManagerPort = Integer.parseInt(args[1]);
-
- Configuration cfg = new Configuration();
-
cfg.setString(ConfigConstants.JOB_MANAGER_IPC_ADDRESS_KEY, "localhost");
-
cfg.setInteger(ConfigConstants.JOB_MANAGER_IPC_PORT_KEY, jobManagerPort);
-
cfg.setInteger(ConfigConstants.TASK_MANAGER_MEMORY_SIZE_KEY, 4);
-
cfg.setInteger(ConfigConstants.TASK_MANAGER_NETWORK_NUM_BUFFERS_KEY, 256);
-
- TaskManager.runTaskManager("localhost",
ResourceID.generate(), taskManagerPort, cfg);
-
- // wait forever
- Object lock = new Object();
- synchronized (lock) {
- lock.wait();
- }
- }
- catch (Throwable t) {
- System.exit(1);
- }
- }
- }
-
- private static class PipeForwarder extends Thread {
-
- private final StringWriter target;
- private final InputStream source;
-
- public PipeForwarder(InputStream source, StringWriter target) {
- super("Pipe Forwarder");
- setDaemon(true);
-
- this.source = source;
- this.target = target;
-
- start();
- }
-
- @Override
- public void run() {
- try {
- int next;
- while ((next = source.read()) != -1) {
- target.write(next);
- }
- }
- catch (IOException e) {
- // terminate
- }
- }
- }
}
http://git-wip-us.apache.org/repos/asf/flink/blob/cc6655b7/flink-runtime/src/test/java/org/apache/flink/runtime/taskmanager/TaskManagerProcessReapingTestBase.java
----------------------------------------------------------------------
diff --git
a/flink-runtime/src/test/java/org/apache/flink/runtime/taskmanager/TaskManagerProcessReapingTestBase.java
b/flink-runtime/src/test/java/org/apache/flink/runtime/taskmanager/TaskManagerProcessReapingTestBase.java
new file mode 100644
index 0000000..c7913f7
--- /dev/null
+++
b/flink-runtime/src/test/java/org/apache/flink/runtime/taskmanager/TaskManagerProcessReapingTestBase.java
@@ -0,0 +1,275 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.taskmanager;
+
+import akka.actor.ActorRef;
+import akka.actor.ActorSystem;
+import org.apache.flink.configuration.ConfigConstants;
+import org.apache.flink.configuration.Configuration;
+import org.apache.flink.runtime.akka.AkkaUtils;
+import org.apache.flink.runtime.clusterframework.FlinkResourceManager;
+import
org.apache.flink.runtime.clusterframework.standalone.StandaloneResourceManager;
+import org.apache.flink.runtime.clusterframework.types.ResourceID;
+import org.apache.flink.runtime.jobmanager.JobManager;
+import org.apache.flink.runtime.jobmanager.MemoryArchivist;
+import
org.apache.flink.runtime.leaderretrieval.StandaloneLeaderRetrievalService;
+import org.apache.flink.runtime.testutils.CommonTestUtils;
+import org.apache.flink.util.NetUtils;
+import org.junit.Test;
+import scala.Some;
+import scala.Tuple2;
+import scala.concurrent.duration.FiniteDuration;
+
+import java.io.File;
+import java.io.IOException;
+import java.io.InputStream;
+import java.io.StringWriter;
+import java.net.InetAddress;
+import java.util.concurrent.TimeUnit;
+
+import static
org.apache.flink.runtime.testutils.CommonTestUtils.getCurrentClasspath;
+import static
org.apache.flink.runtime.testutils.CommonTestUtils.getJavaCommandPath;
+import static
org.apache.flink.runtime.testutils.CommonTestUtils.isProcessAlive;
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertFalse;
+import static org.junit.Assert.assertTrue;
+import static org.junit.Assert.fail;
+
+/**
+ * Tests that the TaskManager process properly exits when the TaskManager
actor dies.
+ */
+public abstract class TaskManagerProcessReapingTestBase {
+
+ /**
+ * Called after the task manager has been started up. After calling this
+ * method, the test base checks that the process exits.
+ */
+ abstract void onTaskManagerProcessRunning(ActorRef taskManager);
+
+ /**
+ * Called after the task manager has successfully terminated.
+ */
+ void onTaskManagerProcessTerminated(String processOutput) {
+ // Default does nothing
+ }
+
+ @Test
+ public void testReapProcessOnFailure() {
+ Process taskManagerProcess = null;
+ ActorSystem jmActorSystem = null;
+
+ final StringWriter processOutput = new StringWriter();
+
+ try {
+ String javaCommand = getJavaCommandPath();
+
+ // check that we run this test only if the java command
+ // is available on this machine
+ if (javaCommand == null) {
+ System.out.println("---- Skipping
TaskManagerProcessReapingTest : Could not find java executable ----");
+ return;
+ }
+
+ // create a logging file for the process
+ File tempLogFile = File.createTempFile("testlogconfig",
"properties");
+ tempLogFile.deleteOnExit();
+ CommonTestUtils.printLog4jDebugConfig(tempLogFile);
+
+ final InetAddress localhost =
InetAddress.getByName("localhost");
+ final int jobManagerPort = NetUtils.getAvailablePort();
+
+ // start a JobManager
+ Tuple2<String, Object> localAddress = new
Tuple2<String, Object>(localhost.getHostAddress(), jobManagerPort);
+ jmActorSystem = AkkaUtils.createActorSystem(
+ new Configuration(), new
Some<Tuple2<String, Object>>(localAddress));
+
+ ActorRef jmActor = JobManager.startJobManagerActors(
+ new Configuration(),
+ jmActorSystem,
+ JobManager.class,
+ MemoryArchivist.class)._1;
+
+ // start a ResourceManager
+ StandaloneLeaderRetrievalService
standaloneLeaderRetrievalService =
+ new
StandaloneLeaderRetrievalService(AkkaUtils.getAkkaURL(jmActorSystem, jmActor));
+
+ FlinkResourceManager.startResourceManagerActors(
+ new Configuration(),
+ jmActorSystem,
+ standaloneLeaderRetrievalService,
+ StandaloneResourceManager.class);
+
+ final int taskManagerPort = NetUtils.getAvailablePort();
+
+ // start the task manager process
+ String[] command = new String[] {
+ javaCommand,
+ "-Dlog.level=DEBUG",
+ "-Dlog4j.configuration=file:" +
tempLogFile.getAbsolutePath(),
+ "-Xms256m", "-Xmx256m",
+ "-classpath", getCurrentClasspath(),
+
TaskManagerTestEntryPoint.class.getName(),
+ String.valueOf(jobManagerPort),
String.valueOf(taskManagerPort)
+ };
+
+ ProcessBuilder bld = new ProcessBuilder(command);
+ taskManagerProcess = bld.start();
+ new PipeForwarder(taskManagerProcess.getErrorStream(),
processOutput);
+
+ // grab the reference to the TaskManager. try multiple
times, until the process
+ // is started and the TaskManager is up
+ String taskManagerActorName =
String.format("akka.tcp://flink@%s/user/%s",
+
org.apache.flink.util.NetUtils.ipAddressAndPortToUrlString(localhost,
taskManagerPort),
+ TaskManager.TASK_MANAGER_NAME());
+
+ ActorRef taskManagerRef = null;
+ Throwable lastError = null;
+ for (int i = 0; i < 40; i++) {
+ try {
+ taskManagerRef =
TaskManager.getTaskManagerRemoteReference(
+ taskManagerActorName,
jmActorSystem, new FiniteDuration(25, TimeUnit.SECONDS));
+ break;
+ }
+ catch (Throwable t) {
+ // TaskManager probably not ready yet
+ lastError = t;
+ }
+ Thread.sleep(500);
+ }
+
+ assertTrue("TaskManager process died",
isProcessAlive(taskManagerProcess));
+
+ if (taskManagerRef == null) {
+ if (lastError != null) {
+ lastError.printStackTrace();
+ }
+ fail("TaskManager process did not launch the
TaskManager properly. Failed to look up "
+ + taskManagerActorName);
+ }
+
+ // kill the TaskManager actor
+ onTaskManagerProcessRunning(taskManagerRef);
+
+ // wait for max 5 seconds for the process to terminate
+ {
+ long now = System.currentTimeMillis();
+ long deadline = now + 10000;
+
+ while (now < deadline &&
isProcessAlive(taskManagerProcess)) {
+ Thread.sleep(100);
+ now = System.currentTimeMillis();
+ }
+ }
+
+ assertFalse("TaskManager process did not terminate upon
actor death", isProcessAlive(taskManagerProcess));
+
+ int returnCode = taskManagerProcess.exitValue();
+ assertEquals("TaskManager died, but not because of the
process reaper",
+
TaskManager.RUNTIME_FAILURE_RETURN_CODE(), returnCode);
+
+
onTaskManagerProcessTerminated(processOutput.toString());
+ }
+ catch (Exception e) {
+ e.printStackTrace();
+ printProcessLog(processOutput.toString());
+ fail(e.getMessage());
+ }
+ catch (Error e) {
+ e.printStackTrace();
+ printProcessLog(processOutput.toString());
+ throw e;
+ }
+ finally {
+ if (taskManagerProcess != null) {
+ taskManagerProcess.destroy();
+ }
+ if (jmActorSystem != null) {
+ jmActorSystem.shutdown();
+ }
+ }
+ }
+
+ private static void printProcessLog(String log) {
+ System.out.println("-----------------------------------------");
+ System.out.println(" BEGIN SPAWNED PROCESS LOG");
+ System.out.println("-----------------------------------------");
+ System.out.println(log);
+ System.out.println("-----------------------------------------");
+ System.out.println(" END SPAWNED PROCESS LOG");
+ System.out.println("-----------------------------------------");
+ }
+
+ //
--------------------------------------------------------------------------------------------
+
+ public static class TaskManagerTestEntryPoint {
+
+ public static void main(String[] args) {
+ try {
+ int jobManagerPort = Integer.parseInt(args[0]);
+ int taskManagerPort = Integer.parseInt(args[1]);
+
+ Configuration cfg = new Configuration();
+
cfg.setString(ConfigConstants.JOB_MANAGER_IPC_ADDRESS_KEY, "localhost");
+
cfg.setInteger(ConfigConstants.JOB_MANAGER_IPC_PORT_KEY, jobManagerPort);
+
cfg.setInteger(ConfigConstants.TASK_MANAGER_MEMORY_SIZE_KEY, 4);
+
cfg.setInteger(ConfigConstants.TASK_MANAGER_NETWORK_NUM_BUFFERS_KEY, 256);
+
+ TaskManager.runTaskManager("localhost",
ResourceID.generate(), taskManagerPort, cfg);
+
+ // wait forever
+ Object lock = new Object();
+ synchronized (lock) {
+ lock.wait();
+ }
+ }
+ catch (Throwable t) {
+ System.exit(1);
+ }
+ }
+ }
+
+ private static class PipeForwarder extends Thread {
+
+ private final StringWriter target;
+ private final InputStream source;
+
+ public PipeForwarder(InputStream source, StringWriter target) {
+ super("Pipe Forwarder");
+ setDaemon(true);
+
+ this.source = source;
+ this.target = target;
+
+ start();
+ }
+
+ @Override
+ public void run() {
+ try {
+ int next;
+ while ((next = source.read()) != -1) {
+ target.write(next);
+ }
+ }
+ catch (IOException e) {
+ // terminate
+ }
+ }
+ }
+}
http://git-wip-us.apache.org/repos/asf/flink/blob/cc6655b7/flink-runtime/src/test/java/org/apache/flink/runtime/taskmanager/TaskTest.java
----------------------------------------------------------------------
diff --git
a/flink-runtime/src/test/java/org/apache/flink/runtime/taskmanager/TaskTest.java
b/flink-runtime/src/test/java/org/apache/flink/runtime/taskmanager/TaskTest.java
index fec9ef3..56ab9c6 100644
---
a/flink-runtime/src/test/java/org/apache/flink/runtime/taskmanager/TaskTest.java
+++
b/flink-runtime/src/test/java/org/apache/flink/runtime/taskmanager/TaskTest.java
@@ -19,9 +19,10 @@
package org.apache.flink.runtime.taskmanager;
import org.apache.flink.api.common.ExecutionConfig;
+import org.apache.flink.api.common.JobID;
+import org.apache.flink.configuration.ConfigConstants;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.core.testutils.OneShotLatch;
-import org.apache.flink.runtime.metrics.groups.TaskMetricGroup;
import org.apache.flink.runtime.blob.BlobKey;
import org.apache.flink.runtime.broadcast.BroadcastVariableManager;
import org.apache.flink.runtime.deployment.InputGateDeploymentDescriptor;
@@ -34,7 +35,6 @@ import
org.apache.flink.runtime.executiongraph.ExecutionAttemptID;
import org.apache.flink.runtime.filecache.FileCache;
import org.apache.flink.runtime.instance.ActorGateway;
import org.apache.flink.runtime.io.disk.iomanager.IOManager;
-import org.apache.flink.api.common.JobID;
import org.apache.flink.runtime.io.network.NetworkEnvironment;
import
org.apache.flink.runtime.io.network.partition.ResultPartitionConsumableNotifier;
import org.apache.flink.runtime.io.network.partition.ResultPartitionID;
@@ -44,13 +44,14 @@ import
org.apache.flink.runtime.jobgraph.IntermediateDataSetID;
import org.apache.flink.runtime.jobgraph.JobVertexID;
import org.apache.flink.runtime.jobgraph.tasks.AbstractInvokable;
import org.apache.flink.runtime.memory.MemoryManager;
+import org.apache.flink.runtime.messages.TaskManagerMessages;
import org.apache.flink.runtime.messages.TaskMessages;
-
+import org.apache.flink.runtime.metrics.groups.TaskMetricGroup;
import org.apache.flink.util.SerializedValue;
+import org.apache.flink.util.TestLogger;
import org.junit.After;
import org.junit.Before;
import org.junit.Test;
-
import scala.concurrent.duration.FiniteDuration;
import java.io.IOException;
@@ -69,7 +70,6 @@ import static org.junit.Assert.assertNotNull;
import static org.junit.Assert.assertNull;
import static org.junit.Assert.assertTrue;
import static org.junit.Assert.fail;
-
import static org.mockito.Matchers.any;
import static org.mockito.Matchers.eq;
import static org.mockito.Mockito.doThrow;
@@ -86,11 +86,12 @@ import static org.mockito.Mockito.when;
* execution listener, which simply put the messages in a queue to be picked
* up by the test and validated.
*/
-public class TaskTest {
+public class TaskTest extends TestLogger {
private static OneShotLatch awaitLatch;
private static OneShotLatch triggerLatch;
-
+ private static OneShotLatch cancelLatch;
+
private ActorGateway taskManagerGateway;
private ActorGateway jobManagerGateway;
private ActorGateway listenerGateway;
@@ -110,6 +111,7 @@ public class TaskTest {
awaitLatch = new OneShotLatch();
triggerLatch = new OneShotLatch();
+ cancelLatch = new OneShotLatch();
}
@After
@@ -557,6 +559,123 @@ public class TaskTest {
verify(inputGate,
times(1)).retriggerPartitionRequest(eq(partitionId.getPartitionId()));
}
+	/**
+	 * Tests that the interrupt happens via the cancellation watch dog if the task
+	 * canceller is stuck in {@code cancel()}: the invokable blocks in cancel, so the
+	 * executing thread can only be woken up by the watch dog. No fatal error is expected.
+	 */
+	@Test
+	public void testWatchDogInterruptsTask() throws Exception {
+		Configuration config = new Configuration();
+		config.setLong(ConfigConstants.TASK_CANCELLATION_INTERVAL_MILLIS, 5);
+		config.setLong(ConfigConstants.TASK_CANCELLATION_TIMEOUT_MILLIS, 50);
+
+		Task task = createTask(InvokableBlockingInCancel.class, config);
+		task.startTaskThread();
+
+		awaitLatch.await();
+
+		task.cancelExecution();
+
+		// Bounded join: if the watch dog does not interrupt the task, fail instead of
+		// hanging the build forever.
+		Thread executingThread = task.getExecutingThread();
+		executingThread.join(10000L);
+		assertFalse("Task thread did not terminate after cancellation", executingThread.isAlive());
+
+		// No fatal error
+		for (Object msg : taskManagerMessages) {
+			assertFalse("Unexpected FatalError message", msg instanceof TaskManagerMessages.FatalError);
+		}
+	}
+
+	/**
+	 * The invoke() method holds a lock (and triggers awaitLatch after acquiring it)
+	 * and cancel cannot complete because it also tries to acquire the same lock.
+	 * This is resolved by the watch dog interrupting the executing thread; no fatal
+	 * error is expected.
+	 */
+	@Test
+	public void testInterruptableSharedLockInInvokeAndCancel() throws Exception {
+		Configuration config = new Configuration();
+		config.setLong(ConfigConstants.TASK_CANCELLATION_INTERVAL_MILLIS, 5);
+		config.setLong(ConfigConstants.TASK_CANCELLATION_TIMEOUT_MILLIS, 50);
+
+		Task task = createTask(InvokableInterruptableSharedLockInInvokeAndCancel.class, config);
+		task.startTaskThread();
+
+		awaitLatch.await();
+
+		task.cancelExecution();
+
+		// Bounded join: if the watch dog does not interrupt the task, fail instead of
+		// hanging the build forever.
+		Thread executingThread = task.getExecutingThread();
+		executingThread.join(10000L);
+		assertFalse("Task thread did not terminate after cancellation", executingThread.isAlive());
+
+		// No fatal error
+		for (Object msg : taskManagerMessages) {
+			assertFalse("Unexpected FatalError message", msg instanceof TaskManagerMessages.FatalError);
+		}
+	}
+
+ /**
+ * The invoke() method blocks infinitely, but cancel() does not block.
Only
+ * resolved by a fatal error.
+ */
+ @Test
+ public void testFatalErrorAfterUninterruptibleInvoke() throws Exception
{
+ Configuration config = new Configuration();
+
config.setLong(ConfigConstants.TASK_CANCELLATION_INTERVAL_MILLIS, 5);
+
config.setLong(ConfigConstants.TASK_CANCELLATION_TIMEOUT_MILLIS, 50);
+
+ Task task =
createTask(InvokableUninterruptibleBlockingInvoke.class, config);
+
+ try {
+ task.startTaskThread();
+
+ awaitLatch.await();
+
+ task.cancelExecution();
+
+ for (int i = 0; i < 10; i++) {
+ Object msg = taskManagerMessages.poll(1,
TimeUnit.SECONDS);
+ if (msg instanceof
TaskManagerMessages.FatalError) {
+ return; // success
+ }
+ }
+
+ fail("Did not receive expected task manager message");
+ } finally {
+ // Interrupt again to clean up Thread
+ cancelLatch.trigger();
+ task.getExecutingThread().interrupt();
+ task.getExecutingThread().join();
+ }
+ }
+
+	/**
+	 * Tests that the cancellation interval and timeout are initially taken from the task
+	 * configuration and, once the task is running, overwritten by the execution config.
+	 */
+	@Test
+	public void testTaskConfig() throws Exception {
+		final long interval = 28218123;
+		final long timeout = interval + 19292;
+
+		final Configuration taskConfig = new Configuration();
+		taskConfig.setLong(ConfigConstants.TASK_CANCELLATION_INTERVAL_MILLIS, interval);
+		taskConfig.setLong(ConfigConstants.TASK_CANCELLATION_TIMEOUT_MILLIS, timeout);
+
+		final ExecutionConfig executionConfig = new ExecutionConfig();
+		executionConfig.setTaskCancellationInterval(interval + 1337);
+		executionConfig.setTaskCancellationTimeout(timeout - 1337);
+
+		final Task task = createTask(InvokableBlockingInInvoke.class, taskConfig, executionConfig);
+
+		// before the task runs: values from the task configuration
+		assertEquals(interval, task.getTaskCancellationInterval());
+		assertEquals(timeout, task.getTaskCancellationTimeout());
+
+		task.startTaskThread();
+		awaitLatch.await();
+
+		// while the task runs: values from the execution config
+		assertEquals(executionConfig.getTaskCancellationInterval(), task.getTaskCancellationInterval());
+		assertEquals(executionConfig.getTaskCancellationTimeout(), task.getTaskCancellationTimeout());
+
+		final Thread executingThread = task.getExecutingThread();
+		executingThread.interrupt();
+		executingThread.join();
+	}
+
//
------------------------------------------------------------------------
private void setInputGate(Task task, SingleInputGate inputGate) {
@@ -589,13 +708,28 @@ public class TaskTest {
}
+ /**
+ * Creates a task for the given invokable with an empty task {@link Configuration}
+ * and a default {@link ExecutionConfig}.
+ */
private Task createTask(Class<? extends AbstractInvokable> invokable) {
+ return createTask(invokable, new Configuration(), new
ExecutionConfig());
+ }
+
+ /**
+ * Creates a task for the given invokable with the given task configuration
+ * and a default {@link ExecutionConfig}.
+ */
+ private Task createTask(Class<? extends AbstractInvokable> invokable,
Configuration config) {
+ return createTask(invokable, config, new ExecutionConfig());
+ }
+
+ /**
+ * Creates a task for the given invokable, task configuration and execution config.
+ * The library cache is a mock that returns this test class' class loader for any job.
+ */
+ private Task createTask(Class<? extends AbstractInvokable> invokable,
Configuration config, ExecutionConfig execConfig) {
LibraryCacheManager libCache = mock(LibraryCacheManager.class);
when(libCache.getClassLoader(any(JobID.class))).thenReturn(getClass().getClassLoader());
- return createTask(invokable, libCache);
+ return createTask(invokable, libCache, config, execConfig);
}
+ /**
+ * Creates a task for the given invokable and library cache with an empty task
+ * {@link Configuration} and a default {@link ExecutionConfig}.
+ */
private Task createTask(Class<? extends AbstractInvokable> invokable,
LibraryCacheManager
libCache) {
+ return createTask(invokable, libCache, new Configuration(), new
ExecutionConfig());
+ }
+
+ private Task createTask(Class<? extends AbstractInvokable> invokable,
+ LibraryCacheManager
libCache,
+ Configuration config,
+ ExecutionConfig
execConfig) {
ResultPartitionManager partitionManager =
mock(ResultPartitionManager.class);
ResultPartitionConsumableNotifier consumableNotifier =
mock(ResultPartitionConsumableNotifier.class);
@@ -604,14 +738,23 @@ public class TaskTest {
when(network.getPartitionConsumableNotifier()).thenReturn(consumableNotifier);
when(network.getDefaultIOMode()).thenReturn(IOManager.IOMode.SYNC);
- return createTask(invokable, libCache, network);
+ return createTask(invokable, libCache, network, config,
execConfig);
}
+ /**
+ * Creates a task for the given invokable, library cache and network environment
+ * with an empty task {@link Configuration} and a default {@link ExecutionConfig}.
+ */
private Task createTask(Class<? extends AbstractInvokable> invokable,
LibraryCacheManager
libCache,
NetworkEnvironment
networkEnvironment) {
+
+ return createTask(invokable, libCache, networkEnvironment, new
Configuration(), new ExecutionConfig());
+ }
+
+ private Task createTask(Class<? extends AbstractInvokable> invokable,
+ LibraryCacheManager
libCache,
+ NetworkEnvironment
networkEnvironment,
+ Configuration config,
+ ExecutionConfig
execConfig) {
- TaskDeploymentDescriptor tdd =
createTaskDeploymentDescriptor(invokable);
+ TaskDeploymentDescriptor tdd =
createTaskDeploymentDescriptor(invokable, config, execConfig);
return new Task(
tdd,
@@ -629,18 +772,26 @@ public class TaskTest {
}
+ /**
+ * Creates a deployment descriptor for the given invokable with an empty task
+ * {@link Configuration} and a default {@link ExecutionConfig}.
+ */
private TaskDeploymentDescriptor createTaskDeploymentDescriptor(Class<?
extends AbstractInvokable> invokable) {
- SerializedValue<ExecutionConfig> execConfig;
+ return createTaskDeploymentDescriptor(invokable, new
Configuration(), new ExecutionConfig());
+ }
+
+ private TaskDeploymentDescriptor createTaskDeploymentDescriptor(
+ Class<? extends AbstractInvokable> invokable,
+ Configuration taskConfig,
+ ExecutionConfig execConfig) {
+
+ SerializedValue<ExecutionConfig> serializedExecConfig;
try {
- execConfig = new SerializedValue<>(new
ExecutionConfig());
+ serializedExecConfig = new
SerializedValue<>(execConfig);
} catch (IOException e) {
throw new RuntimeException(e);
}
return new TaskDeploymentDescriptor(
new JobID(), "Test Job", new JobVertexID(), new
ExecutionAttemptID(),
- execConfig,
+ serializedExecConfig,
"Test Task", 0, 1, 0,
- new Configuration(), new Configuration(),
+ new Configuration(), taskConfig,
invokable.getName(),
Collections.<ResultPartitionDeploymentDescriptor>emptyList(),
Collections.<InputGateDeploymentDescriptor>emptyList(),
@@ -842,4 +993,72 @@ public class TaskTest {
throw new CancelTaskException();
}
}
+
+	/**
+	 * Invokable whose {@code invoke()} holds a lock while blocking interruptibly and whose
+	 * {@code cancel()} needs the same lock. A canceller calling {@code cancel()} therefore
+	 * stays stuck until the executing thread is interrupted and releases the lock.
+	 */
+	public static final class InvokableInterruptableSharedLockInInvokeAndCancel extends AbstractInvokable {
+
+		private final Object lock = new Object();
+
+		@Override
+		public void invoke() throws Exception {
+			synchronized (lock) {
+				awaitLatch.trigger();
+
+				// Block while *keeping* 'lock': wait on this instance's monitor instead of
+				// 'lock' (lock.wait() would release 'lock' and let cancel() through). A bare
+				// wait() directly inside synchronized (lock) does not own the monitor of
+				// 'this' and fails immediately with an IllegalMonitorStateException.
+				// The loop guards against spurious wake-ups; only an interrupt ends it.
+				synchronized (this) {
+					while (true) {
+						wait();
+					}
+				}
+			}
+		}
+
+		@Override
+		public void cancel() throws Exception {
+			synchronized (lock) {
+				cancelLatch.trigger();
+			}
+		}
+	}
+
+	/**
+	 * Invokable whose {@code cancel()} blocks on this instance's monitor until
+	 * {@code invoke()} is interrupted and wakes it up again.
+	 */
+	public static final class InvokableBlockingInCancel extends AbstractInvokable {
+
+		@Override
+		public void invoke() throws Exception {
+			awaitLatch.trigger();
+
+			try {
+				// wait until cancel() has been entered, then park on this instance's monitor
+				cancelLatch.await();
+				parkOnMonitor();
+			} catch (InterruptedException interrupted) {
+				// an interrupt of the executing thread releases everyone stuck in cancel()
+				wakeUpAllWaiters();
+			}
+		}
+
+		@Override
+		public synchronized void cancel() throws Exception {
+			cancelLatch.trigger();
+			wait();
+		}
+
+		/** Waits on this instance's monitor until notified or interrupted. */
+		private synchronized void parkOnMonitor() throws InterruptedException {
+			wait();
+		}
+
+		/** Wakes up all threads waiting on this instance's monitor. */
+		private synchronized void wakeUpAllWaiters() {
+			notifyAll();
+		}
+	}
+
+	/**
+	 * Invokable that ignores interrupts and only leaves {@code invoke()} once the
+	 * {@code cancelLatch} has been triggered. Its {@code cancel()} does nothing, so
+	 * regular cancellation cannot stop it.
+	 */
+	public static final class InvokableUninterruptibleBlockingInvoke extends AbstractInvokable {
+
+		@Override
+		public void invoke() throws Exception {
+			for (;;) {
+				if (cancelLatch.isTriggered()) {
+					return;
+				}
+				try {
+					blockUntilWokenUp();
+				} catch (InterruptedException ignored) {
+					// deliberately swallowed: simulates a task that cannot be interrupted
+				}
+			}
+		}
+
+		@Override
+		public void cancel() throws Exception {
+			// intentionally a no-op
+		}
+
+		/** Signals that invoke() is running, then waits on this instance's monitor. */
+		private synchronized void blockUntilWokenUp() throws InterruptedException {
+			awaitLatch.trigger();
+			wait();
+		}
+	}
}
http://git-wip-us.apache.org/repos/asf/flink/blob/cc6655b7/flink-test-utils-parent/flink-test-utils-junit/src/main/java/org/apache/flink/core/testutils/OneShotLatch.java
----------------------------------------------------------------------
diff --git
a/flink-test-utils-parent/flink-test-utils-junit/src/main/java/org/apache/flink/core/testutils/OneShotLatch.java
b/flink-test-utils-parent/flink-test-utils-junit/src/main/java/org/apache/flink/core/testutils/OneShotLatch.java
index d802860..9ad2c30 100644
---
a/flink-test-utils-parent/flink-test-utils-junit/src/main/java/org/apache/flink/core/testutils/OneShotLatch.java
+++
b/flink-test-utils-parent/flink-test-utils-junit/src/main/java/org/apache/flink/core/testutils/OneShotLatch.java
@@ -52,4 +52,13 @@ public final class OneShotLatch {
}
}
}
+
+	/**
+	 * Returns whether this latch has been triggered (see {@link #trigger()}).
+	 * This is a plain read of the latch state and never blocks.
+	 *
+	 * @return {@code true} if the latch has been triggered, {@code false} otherwise.
+	 */
+	public boolean isTriggered() {
+		return this.triggered;
+	}
}