Repository: flink
Updated Branches:
  refs/heads/release-1.1 fe2c4ba6a -> 4dd3efea4


[FLINK-5028] [streaming] StreamTask skips clean shutdown logic upon cancellation


Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/4dd3efea
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/4dd3efea
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/4dd3efea

Branch: refs/heads/release-1.1
Commit: 4dd3efea41cf4395f0cff096cce1e74f3f12b68b
Parents: fe2c4ba
Author: Stephan Ewen <[email protected]>
Authored: Mon Nov 7 15:47:03 2016 +0100
Committer: Stephan Ewen <[email protected]>
Committed: Mon Nov 7 17:53:44 2016 +0100

----------------------------------------------------------------------
 .../streaming/runtime/tasks/StreamTask.java     |   8 +-
 .../streaming/runtime/tasks/StreamTaskTest.java | 104 +++++++++++++++++++
 2 files changed, 111 insertions(+), 1 deletion(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/flink/blob/4dd3efea/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/tasks/StreamTask.java
----------------------------------------------------------------------
diff --git 
a/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/tasks/StreamTask.java
 
b/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/tasks/StreamTask.java
index d56c9bf..8f28cef 100644
--- 
a/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/tasks/StreamTask.java
+++ 
b/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/tasks/StreamTask.java
@@ -238,7 +238,7 @@ public abstract class StreamTask<OUT, Operator extends 
StreamOperator<OUT>>
                        // task specific initialization
                        init();
                        
-                       // save the work of reloadig state, etc, if the task is 
already canceled
+                       // save the work of reloading state, etc, if the task 
is already canceled
                        if (canceled) {
                                throw new CancelTaskException();
                        }
@@ -265,6 +265,12 @@ public abstract class StreamTask<OUT, Operator extends 
StreamOperator<OUT>>
                        isRunning = true;
                        run();
 
+                       // if this left the run() method cleanly despite the 
fact that this was canceled,
+                       // make sure the "clean shutdown" is not attempted
+                       if (canceled) {
+                               throw new CancelTaskException();
+                       }
+
                        LOG.debug("Finished task {}", getName());
                        
                        // make sure no further checkpoint and notification 
actions happen.

http://git-wip-us.apache.org/repos/asf/flink/blob/4dd3efea/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/tasks/StreamTaskTest.java
----------------------------------------------------------------------
diff --git 
a/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/tasks/StreamTaskTest.java
 
b/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/tasks/StreamTaskTest.java
index 83eb4bb..f6b350b 100644
--- 
a/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/tasks/StreamTaskTest.java
+++ 
b/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/tasks/StreamTaskTest.java
@@ -24,6 +24,7 @@ import org.apache.flink.api.common.ExecutionConfig;
 import org.apache.flink.api.common.JobID;
 import org.apache.flink.configuration.ConfigConstants;
 import org.apache.flink.configuration.Configuration;
+import org.apache.flink.core.testutils.OneShotLatch;
 import org.apache.flink.runtime.blob.BlobKey;
 import org.apache.flink.runtime.broadcast.BroadcastVariableManager;
 import org.apache.flink.runtime.deployment.InputGateDeploymentDescriptor;
@@ -49,6 +50,7 @@ import 
org.apache.flink.runtime.taskmanager.TaskManagerRuntimeInfo;
 import org.apache.flink.streaming.api.TimeCharacteristic;
 import org.apache.flink.streaming.api.functions.source.SourceFunction;
 import org.apache.flink.streaming.api.graph.StreamConfig;
+import org.apache.flink.streaming.api.operators.AbstractStreamOperator;
 import org.apache.flink.streaming.api.operators.Output;
 import org.apache.flink.streaming.api.operators.StreamSource;
 import org.apache.flink.streaming.runtime.streamrecord.StreamRecord;
@@ -79,6 +81,8 @@ import static org.junit.Assert.fail;
 
 public class StreamTaskTest {
 
+       private static OneShotLatch SYNC_LATCH;
+
        /**
         * This test checks that cancel calls that are issued before the 
operator is
         * instantiated still lead to proper canceling.
@@ -142,6 +146,26 @@ public class StreamTaskTest {
                assertEquals(ExecutionState.FINISHED, task.getExecutionState());
        }
 
+       @Test
+       public void testCancellationNotBlockedOnLock() throws Exception {
+               SYNC_LATCH = new OneShotLatch();
+
+               StreamConfig cfg = new StreamConfig(new Configuration());
+               cfg.setTimeCharacteristic(TimeCharacteristic.ProcessingTime);
+               Task task = createTask(CancelLockingTask.class, cfg, new 
Configuration());
+
+               // start the task and wait until it runs
+               // execution state RUNNING is not enough, we need to wait until 
the stream task's run() method
+               // is entered
+               task.startTaskThread();
+               SYNC_LATCH.await();
+
+               // cancel the execution - this should lead to smooth shutdown
+               task.cancelExecution();
+               task.getExecutingThread().join();
+
+               assertEquals(ExecutionState.CANCELED, task.getExecutionState());
+       }
 
        // 
------------------------------------------------------------------------
        //  Test Utilities
@@ -290,4 +314,84 @@ public class StreamTaskTest {
                        return mock(AbstractStateBackend.class);
                }
        }
+
+       // 
------------------------------------------------------------------------
+       // 
------------------------------------------------------------------------
+
+       /**
+        * A task that locks if cancellation attempts to cleanly shut down 
+        */
+       public static class CancelLockingTask extends StreamTask<String, 
AbstractStreamOperator<String>> {
+
+               private final OneShotLatch latch = new OneShotLatch();
+
+               private LockHolder holder;
+
+               @Override
+               protected void init() {}
+
+               @Override
+               protected void run() throws Exception {
+                       holder = new LockHolder(getCheckpointLock(), latch);
+                       holder.start();
+                       latch.await();
+
+                       // we are at the point where cancelling can happen
+                       SYNC_LATCH.trigger();
+
+                       // just freeze this task until it is interrupted
+                       try {
+                               Thread.sleep(100000000);
+                       } catch (InterruptedException ignored) {
+                               // restore interruption state
+                               Thread.currentThread().interrupt();
+                       }
+               }
+
+               @Override
+               protected void cleanup() {
+                       holder.cancel();
+                       holder.interrupt();
+               }
+
+               @Override
+               protected void cancelTask() {
+                       holder.cancel();
+                       // do not interrupt the lock holder here, to simulate 
spawned threads that
+                       // we cannot properly interrupt on cancellation
+               }
+
+
+               private static final class LockHolder extends Thread {
+
+                       private final OneShotLatch trigger;
+                       private final Object lock;
+                       private volatile boolean canceled;
+
+                       private LockHolder(Object lock, OneShotLatch trigger) {
+                               this.lock = lock;
+                               this.trigger = trigger;
+                       }
+
+                       @Override
+                       public void run() {
+                               synchronized (lock) {
+                                       while (!canceled) {
+                                               // signal that we grabbed the 
lock
+                                               trigger.trigger();
+
+                                               // basically freeze this thread
+                                               try {
+                                                       //noinspection 
SleepWhileHoldingLock
+                                                       
Thread.sleep(1000000000);
+                                               } catch (InterruptedException 
ignored) {}
+                                       }
+                               }
+                       }
+
+                       public void cancel() {
+                               canceled = true;
+                       }
+               }
+       }
 }

Reply via email to