Repository: flink Updated Branches: refs/heads/release-1.1 fe2c4ba6a -> 4dd3efea4
[FLINK-5028] [streaming] StreamTask skips clean shutdown logic upon cancellation Project: http://git-wip-us.apache.org/repos/asf/flink/repo Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/4dd3efea Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/4dd3efea Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/4dd3efea Branch: refs/heads/release-1.1 Commit: 4dd3efea41cf4395f0cff096cce1e74f3f12b68b Parents: fe2c4ba Author: Stephan Ewen <[email protected]> Authored: Mon Nov 7 15:47:03 2016 +0100 Committer: Stephan Ewen <[email protected]> Committed: Mon Nov 7 17:53:44 2016 +0100 ---------------------------------------------------------------------- .../streaming/runtime/tasks/StreamTask.java | 8 +- .../streaming/runtime/tasks/StreamTaskTest.java | 104 +++++++++++++++++++ 2 files changed, 111 insertions(+), 1 deletion(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/flink/blob/4dd3efea/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/tasks/StreamTask.java ---------------------------------------------------------------------- diff --git a/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/tasks/StreamTask.java b/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/tasks/StreamTask.java index d56c9bf..8f28cef 100644 --- a/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/tasks/StreamTask.java +++ b/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/tasks/StreamTask.java @@ -238,7 +238,7 @@ public abstract class StreamTask<OUT, Operator extends StreamOperator<OUT>> // task specific initialization init(); - // save the work of reloadig state, etc, if the task is already canceled + // save the work of reloading state, etc, if the task is already canceled if (canceled) { throw new CancelTaskException(); } @@ -265,6 +265,12 @@ public abstract class StreamTask<OUT, Operator extends StreamOperator<OUT>> isRunning = true; run(); + // if this left the run() method cleanly despite the fact that this was canceled, + // make sure the "clean shutdown" is not attempted + if (canceled) { + throw new CancelTaskException(); + } + LOG.debug("Finished task {}", getName()); // make sure no further checkpoint and notification actions happen. http://git-wip-us.apache.org/repos/asf/flink/blob/4dd3efea/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/tasks/StreamTaskTest.java ---------------------------------------------------------------------- diff --git a/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/tasks/StreamTaskTest.java b/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/tasks/StreamTaskTest.java index 83eb4bb..f6b350b 100644 --- a/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/tasks/StreamTaskTest.java +++ b/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/tasks/StreamTaskTest.java @@ -24,6 +24,7 @@ import org.apache.flink.api.common.ExecutionConfig; import org.apache.flink.api.common.JobID; import org.apache.flink.configuration.ConfigConstants; import org.apache.flink.configuration.Configuration; +import org.apache.flink.core.testutils.OneShotLatch; import org.apache.flink.runtime.blob.BlobKey; import org.apache.flink.runtime.broadcast.BroadcastVariableManager; import org.apache.flink.runtime.deployment.InputGateDeploymentDescriptor; @@ -49,6 +50,7 @@ import org.apache.flink.runtime.taskmanager.TaskManagerRuntimeInfo; import org.apache.flink.streaming.api.TimeCharacteristic; import org.apache.flink.streaming.api.functions.source.SourceFunction; import org.apache.flink.streaming.api.graph.StreamConfig; +import org.apache.flink.streaming.api.operators.AbstractStreamOperator; import org.apache.flink.streaming.api.operators.Output; import org.apache.flink.streaming.api.operators.StreamSource; import org.apache.flink.streaming.runtime.streamrecord.StreamRecord; @@ -79,6 +81,8 @@ import static org.junit.Assert.fail; public class StreamTaskTest { + private static OneShotLatch SYNC_LATCH; + /** * This test checks that cancel calls that are issued before the operator is * instantiated still lead to proper canceling. @@ -142,6 +146,26 @@ public class StreamTaskTest { assertEquals(ExecutionState.FINISHED, task.getExecutionState()); } + @Test + public void testCancellationNotBlockedOnLock() throws Exception { + SYNC_LATCH = new OneShotLatch(); + + StreamConfig cfg = new StreamConfig(new Configuration()); + cfg.setTimeCharacteristic(TimeCharacteristic.ProcessingTime); + Task task = createTask(CancelLockingTask.class, cfg, new Configuration()); + + // start the task and wait until it runs + // execution state RUNNING is not enough, we need to wait until the stream task's run() method + // is entered + task.startTaskThread(); + SYNC_LATCH.await(); + + // cancel the execution - this should lead to smooth shutdown + task.cancelExecution(); + task.getExecutingThread().join(); + + assertEquals(ExecutionState.CANCELED, task.getExecutionState()); + } // ------------------------------------------------------------------------ // Test Utilities @@ -290,4 +314,84 @@ public class StreamTaskTest { return mock(AbstractStateBackend.class); } } + + // ------------------------------------------------------------------------ + // ------------------------------------------------------------------------ + + /** + * A task that locks if cancellation attempts to cleanly shut down + */ + public static class CancelLockingTask extends StreamTask<String, AbstractStreamOperator<String>> { + + private final OneShotLatch latch = new OneShotLatch(); + + private LockHolder holder; + + @Override + protected void init() {} + + @Override + protected void run() throws Exception { + holder = new LockHolder(getCheckpointLock(), latch); + holder.start(); + latch.await(); + + // we are at the point where cancelling can happen + SYNC_LATCH.trigger(); + + // just freeze this task until it is interrupted + try { + Thread.sleep(100000000); + } catch (InterruptedException ignored) { + // restore interruption state + Thread.currentThread().interrupt(); + } + } + + @Override + protected void cleanup() { + holder.cancel(); + holder.interrupt(); + } + + @Override + protected void cancelTask() { + holder.cancel(); + // do not interrupt the lock holder here, to simulate spawned threads that + // we cannot properly interrupt on cancellation + } + + + private static final class LockHolder extends Thread { + + private final OneShotLatch trigger; + private final Object lock; + private volatile boolean canceled; + + private LockHolder(Object lock, OneShotLatch trigger) { + this.lock = lock; + this.trigger = trigger; + } + + @Override + public void run() { + synchronized (lock) { + while (!canceled) { + // signal that we grabbed the lock + trigger.trigger(); + + // basically freeze this thread + try { + //noinspection SleepWhileHoldingLock + Thread.sleep(1000000000); + } catch (InterruptedException ignored) {} + } + } + } + + public void cancel() { + canceled = true; + } + } + } }
