Repository: flink
Updated Branches:
  refs/heads/master 891950eab -> 1a578657d


[FLINK-5028] [streaming] StreamTask skips clean shutdown logic upon cancellation


Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/1a578657
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/1a578657
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/1a578657

Branch: refs/heads/master
Commit: 1a578657d078dfb2d26a6f6e60876271d6f4c2ff
Parents: 891950e
Author: Stephan Ewen <[email protected]>
Authored: Mon Nov 7 15:47:03 2016 +0100
Committer: Stephan Ewen <[email protected]>
Committed: Mon Nov 7 17:04:18 2016 +0100

----------------------------------------------------------------------
 .../streaming/runtime/tasks/StreamTask.java     |   8 +-
 .../streaming/runtime/tasks/StreamTaskTest.java | 132 ++++++++++++++++---
 2 files changed, 124 insertions(+), 16 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/flink/blob/1a578657/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/tasks/StreamTask.java
----------------------------------------------------------------------
diff --git a/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/tasks/StreamTask.java b/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/tasks/StreamTask.java
index fa7d1b0..c75458e 100644
--- a/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/tasks/StreamTask.java
+++ b/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/tasks/StreamTask.java
@@ -240,7 +240,7 @@ public abstract class StreamTask<OUT, OP extends StreamOperator<OUT>>
                        // task specific initialization
                        init();
 
-                       // save the work of reloadig state, etc, if the task is already canceled
+                       // save the work of reloading state, etc, if the task is already canceled
                        if (canceled) {
                                throw new CancelTaskException();
                        }
@@ -266,6 +266,12 @@ public abstract class StreamTask<OUT, OP extends StreamOperator<OUT>>
                        isRunning = true;
                        run();
 
+                       // if this left the run() method cleanly despite the fact that this was canceled,
+                       // make sure the "clean shutdown" is not attempted
+                       if (canceled) {
+                               throw new CancelTaskException();
+                       }
+
                        // make sure all timers finish and no new timers can come
                        timerService.quiesceAndAwaitPending();
 

http://git-wip-us.apache.org/repos/asf/flink/blob/1a578657/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/tasks/StreamTaskTest.java
----------------------------------------------------------------------
diff --git a/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/tasks/StreamTaskTest.java b/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/tasks/StreamTaskTest.java
index 94f6d5a..603bdd2 100644
--- a/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/tasks/StreamTaskTest.java
+++ b/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/tasks/StreamTaskTest.java
@@ -25,6 +25,7 @@ import org.apache.flink.api.common.JobID;
 import org.apache.flink.api.java.tuple.Tuple2;
 import org.apache.flink.configuration.ConfigConstants;
 import org.apache.flink.configuration.Configuration;
+import org.apache.flink.core.testutils.OneShotLatch;
 import org.apache.flink.runtime.blob.BlobKey;
 import org.apache.flink.runtime.broadcast.BroadcastVariableManager;
 import org.apache.flink.runtime.deployment.InputGateDeploymentDescriptor;
@@ -56,6 +57,7 @@ import org.apache.flink.runtime.taskmanager.TaskManagerRuntimeInfo;
 import org.apache.flink.streaming.api.TimeCharacteristic;
 import org.apache.flink.streaming.api.functions.source.SourceFunction;
 import org.apache.flink.streaming.api.graph.StreamConfig;
+import org.apache.flink.streaming.api.operators.AbstractStreamOperator;
 import org.apache.flink.streaming.api.operators.Output;
 import org.apache.flink.streaming.api.operators.StreamSource;
 import org.apache.flink.streaming.runtime.streamrecord.StreamRecord;
@@ -89,6 +91,8 @@ import static org.mockito.Mockito.when;
 
 public class StreamTaskTest {
 
+       private static OneShotLatch SYNC_LATCH;
+
        /**
         * This test checks that cancel calls that are issued before the operator is
         * instantiated still lead to proper canceling.
@@ -154,6 +158,25 @@ public class StreamTaskTest {
                assertEquals(ExecutionState.FINISHED, task.getExecutionState());
        }
 
+       @Test
+       public void testCancellationNotBlockedOnLock() throws Exception {
+               SYNC_LATCH = new OneShotLatch();
+
+               StreamConfig cfg = new StreamConfig(new Configuration());
+               Task task = createTask(CancelLockingTask.class, cfg, new Configuration());
+
+               // start the task and wait until it runs
+               // execution state RUNNING is not enough, we need to wait until the stream task's run() method
+               // is entered
+               task.startTaskThread();
+               SYNC_LATCH.await();
+
+               // cancel the execution - this should lead to smooth shutdown
+               task.cancelExecution();
+               task.getExecutingThread().join();
+
+               assertEquals(ExecutionState.CANCELED, task.getExecutionState());
+       }
 
        // ------------------------------------------------------------------------
        //  Test Utilities
@@ -233,21 +256,21 @@ public class StreamTaskTest {
                                0);
 
                return new Task(
-                       tdd,
-                       mock(MemoryManager.class),
-                       mock(IOManager.class),
-                       network,
-                       mock(BroadcastVariableManager.class),
-                       mock(TaskManagerConnection.class),
-                       mock(InputSplitProvider.class),
-                       mock(CheckpointResponder.class),
-                       libCache,
-                       mock(FileCache.class),
-                       new TaskManagerRuntimeInfo("localhost", taskManagerConfig, System.getProperty("java.io.tmpdir")),
-                       new UnregisteredTaskMetricsGroup(),
-                       consumableNotifier,
-                       partitionStateChecker,
-                       executor);
+                               tdd,
+                               mock(MemoryManager.class),
+                               mock(IOManager.class),
+                               network,
+                               mock(BroadcastVariableManager.class),
+                               mock(TaskManagerConnection.class),
+                               mock(InputSplitProvider.class),
+                               mock(CheckpointResponder.class),
+                               libCache,
+                               mock(FileCache.class),
+                               new TaskManagerRuntimeInfo("localhost", taskManagerConfig, System.getProperty("java.io.tmpdir")),
+                               new UnregisteredTaskMetricsGroup(),
+                               consumableNotifier,
+                               partitionStateChecker,
+                               executor);
        }
        
        // ------------------------------------------------------------------------
@@ -309,4 +332,83 @@ public class StreamTaskTest {
                        return mock(AbstractStateBackend.class);
                }
        }
+
+       // ------------------------------------------------------------------------
+       // ------------------------------------------------------------------------
+
+       /**
+        * A task that locks if cancellation attempts to cleanly shut down 
+        */
+       public static class CancelLockingTask extends StreamTask<String, AbstractStreamOperator<String>> {
+
+               private final OneShotLatch latch = new OneShotLatch();
+
+               private LockHolder holder;
+
+               @Override
+               protected void init() {}
+
+               @Override
+               protected void run() throws Exception {
+                       holder = new LockHolder(getCheckpointLock(), latch);
+                       holder.start();
+                       latch.await();
+
+                       // we are at the point where cancelling can happen
+                       SYNC_LATCH.trigger();
+
+                       // just put this to sleep until it is interrupted
+                       try {
+                               Thread.sleep(100000000);
+                       } catch (InterruptedException ignored) {
+                               // restore interruption state
+                               Thread.currentThread().interrupt();
+                       }
+               }
+
+               @Override
+               protected void cleanup() {
+                       holder.cancel();
+                       holder.interrupt();
+               }
+
+               @Override
+               protected void cancelTask() {
+                       holder.cancel();
+                       // do not interrupt the lock holder here, to simulate spawned threads that
+                       // we cannot properly interrupt on cancellation
+               }
+
+
+               private static final class LockHolder extends Thread {
+
+                       private final OneShotLatch trigger;
+                       private final Object lock;
+                       private volatile boolean canceled;
+
+                       private LockHolder(Object lock, OneShotLatch trigger) {
+                               this.lock = lock;
+                               this.trigger = trigger;
+                       }
+
+                       @Override
+                       public void run() {
+                               synchronized (lock) {
+                                       while (!canceled) {
+                                               // signal that we grabbed the lock
+                                               trigger.trigger();
+
+                                               // basically freeze this thread
+                                               try {
+                                                       Thread.sleep(1000000000);
+                                               } catch (InterruptedException ignored) {}
+                                       }
+                               }
+                       }
+
+                       public void cancel() {
+                               canceled = true;
+                       }
+               }
+       }
 }

Reply via email to