Repository: flink Updated Branches: refs/heads/master 891950eab -> 1a578657d
[FLINK-5028] [streaming] StreamTask skips clean shutdown logic upon cancellation Project: http://git-wip-us.apache.org/repos/asf/flink/repo Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/1a578657 Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/1a578657 Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/1a578657 Branch: refs/heads/master Commit: 1a578657d078dfb2d26a6f6e60876271d6f4c2ff Parents: 891950e Author: Stephan Ewen <[email protected]> Authored: Mon Nov 7 15:47:03 2016 +0100 Committer: Stephan Ewen <[email protected]> Committed: Mon Nov 7 17:04:18 2016 +0100 ---------------------------------------------------------------------- .../streaming/runtime/tasks/StreamTask.java | 8 +- .../streaming/runtime/tasks/StreamTaskTest.java | 132 ++++++++++++++++--- 2 files changed, 124 insertions(+), 16 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/flink/blob/1a578657/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/tasks/StreamTask.java ---------------------------------------------------------------------- diff --git a/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/tasks/StreamTask.java b/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/tasks/StreamTask.java index fa7d1b0..c75458e 100644 --- a/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/tasks/StreamTask.java +++ b/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/tasks/StreamTask.java @@ -240,7 +240,7 @@ public abstract class StreamTask<OUT, OP extends StreamOperator<OUT>> // task specific initialization init(); - // save the work of reloadig state, etc, if the task is already canceled + // save the work of reloading state, etc, if the task is already canceled if (canceled) { throw new CancelTaskException(); } @@ -266,6 +266,12 @@ public abstract class StreamTask<OUT, OP extends StreamOperator<OUT>> isRunning = true; run(); + // if this left the run() method cleanly despite the fact that this was canceled, + // make sure the "clean shutdown" is not attempted + if (canceled) { + throw new CancelTaskException(); + } + // make sure all timers finish and no new timers can come timerService.quiesceAndAwaitPending(); http://git-wip-us.apache.org/repos/asf/flink/blob/1a578657/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/tasks/StreamTaskTest.java ---------------------------------------------------------------------- diff --git a/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/tasks/StreamTaskTest.java b/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/tasks/StreamTaskTest.java index 94f6d5a..603bdd2 100644 --- a/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/tasks/StreamTaskTest.java +++ b/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/tasks/StreamTaskTest.java @@ -25,6 +25,7 @@ import org.apache.flink.api.common.JobID; import org.apache.flink.api.java.tuple.Tuple2; import org.apache.flink.configuration.ConfigConstants; import org.apache.flink.configuration.Configuration; +import org.apache.flink.core.testutils.OneShotLatch; import org.apache.flink.runtime.blob.BlobKey; import org.apache.flink.runtime.broadcast.BroadcastVariableManager; import org.apache.flink.runtime.deployment.InputGateDeploymentDescriptor; @@ -56,6 +57,7 @@ import org.apache.flink.runtime.taskmanager.TaskManagerRuntimeInfo; import org.apache.flink.streaming.api.TimeCharacteristic; import org.apache.flink.streaming.api.functions.source.SourceFunction; import org.apache.flink.streaming.api.graph.StreamConfig; +import org.apache.flink.streaming.api.operators.AbstractStreamOperator; import org.apache.flink.streaming.api.operators.Output; import org.apache.flink.streaming.api.operators.StreamSource; import org.apache.flink.streaming.runtime.streamrecord.StreamRecord; @@ -89,6 +91,8 @@ import static org.mockito.Mockito.when; public class StreamTaskTest { + private static OneShotLatch SYNC_LATCH; + /** * This test checks that cancel calls that are issued before the operator is * instantiated still lead to proper canceling. @@ -154,6 +158,25 @@ public class StreamTaskTest { assertEquals(ExecutionState.FINISHED, task.getExecutionState()); } + @Test + public void testCancellationNotBlockedOnLock() throws Exception { + SYNC_LATCH = new OneShotLatch(); + + StreamConfig cfg = new StreamConfig(new Configuration()); + Task task = createTask(CancelLockingTask.class, cfg, new Configuration()); + + // start the task and wait until it runs + // execution state RUNNING is not enough, we need to wait until the stream task's run() method + // is entered + task.startTaskThread(); + SYNC_LATCH.await(); + + // cancel the execution - this should lead to smooth shutdown + task.cancelExecution(); + task.getExecutingThread().join(); + + assertEquals(ExecutionState.CANCELED, task.getExecutionState()); + } // ------------------------------------------------------------------------ // Test Utilities @@ -233,21 +256,21 @@ public class StreamTaskTest { 0); return new Task( - tdd, - mock(MemoryManager.class), - mock(IOManager.class), - network, - mock(BroadcastVariableManager.class), - mock(TaskManagerConnection.class), - mock(InputSplitProvider.class), - mock(CheckpointResponder.class), - libCache, - mock(FileCache.class), - new TaskManagerRuntimeInfo("localhost", taskManagerConfig, System.getProperty("java.io.tmpdir")), - new UnregisteredTaskMetricsGroup(), - consumableNotifier, - partitionStateChecker, - executor); + tdd, + mock(MemoryManager.class), + mock(IOManager.class), + network, + mock(BroadcastVariableManager.class), + mock(TaskManagerConnection.class), + mock(InputSplitProvider.class), + mock(CheckpointResponder.class), + libCache, + mock(FileCache.class), + new TaskManagerRuntimeInfo("localhost", taskManagerConfig, System.getProperty("java.io.tmpdir")), + new UnregisteredTaskMetricsGroup(), + consumableNotifier, + partitionStateChecker, + executor); } // ------------------------------------------------------------------------ @@ -309,4 +332,83 @@ public class StreamTaskTest { return mock(AbstractStateBackend.class); } } + + // ------------------------------------------------------------------------ + // ------------------------------------------------------------------------ + + /** + * A task that locks if cancellation attempts to cleanly shut down + */ + public static class CancelLockingTask extends StreamTask<String, AbstractStreamOperator<String>> { + + private final OneShotLatch latch = new OneShotLatch(); + + private LockHolder holder; + + @Override + protected void init() {} + + @Override + protected void run() throws Exception { + holder = new LockHolder(getCheckpointLock(), latch); + holder.start(); + latch.await(); + + // we are at the point where cancelling can happen + SYNC_LATCH.trigger(); + + // just put this to sleep until it is interrupted + try { + Thread.sleep(100000000); + } catch (InterruptedException ignored) { + // restore interruption state + Thread.currentThread().interrupt(); + } + } + + @Override + protected void cleanup() { + holder.cancel(); + holder.interrupt(); + } + + @Override + protected void cancelTask() { + holder.cancel(); + // do not interrupt the lock holder here, to simulate spawned threads that + // we cannot properly interrupt on cancellation + } + + + private static final class LockHolder extends Thread { + + private final OneShotLatch trigger; + private final Object lock; + private volatile boolean canceled; + + private LockHolder(Object lock, OneShotLatch trigger) { + this.lock = lock; + this.trigger = trigger; + } + + @Override + public void run() { + synchronized (lock) { + while (!canceled) { + // signal that we grabbed the lock + trigger.trigger(); + + // basically freeze this thread + try { + Thread.sleep(1000000000); + } catch (InterruptedException ignored) {} + } + } + } + + public void cancel() { + canceled = true; + } + } + } }
