Repository: flink
Updated Branches:
  refs/heads/release-1.1 2041ba02b -> 290f8a25f


[FLINK-5038] [streaming runtime] Make sure Cancelables are canceled even if 
"cancelTask" throws an exception


Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/290f8a25
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/290f8a25
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/290f8a25

Branch: refs/heads/release-1.1
Commit: 290f8a25fc4127b9734f45e782391506207748bc
Parents: 32f7efc
Author: Stephan Ewen <se...@apache.org>
Authored: Wed Nov 9 13:09:37 2016 +0100
Committer: Stephan Ewen <se...@apache.org>
Committed: Wed Nov 9 20:24:10 2016 +0100

----------------------------------------------------------------------
 .../streaming/runtime/tasks/StreamTask.java     |  17 ++-
 .../streaming/runtime/tasks/StreamTaskTest.java | 136 +++++++++++++++----
 2 files changed, 125 insertions(+), 28 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/flink/blob/290f8a25/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/tasks/StreamTask.java
----------------------------------------------------------------------
diff --git 
a/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/tasks/StreamTask.java
 
b/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/tasks/StreamTask.java
index 4f0839f..aaaead0 100644
--- 
a/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/tasks/StreamTask.java
+++ 
b/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/tasks/StreamTask.java
@@ -357,8 +357,15 @@ public abstract class StreamTask<OUT, Operator extends 
StreamOperator<OUT>>
        public final void cancel() throws Exception {
                isRunning = false;
                canceled = true;
-               cancelTask();
-               closeAllClosables();
+
+               // the "cancel task" call must come first, but the cancelables 
must be
+               // closed no matter what
+               try {
+                       cancelTask();
+               }
+               finally {
+                       closeAllClosables();
+               }
        }
 
        public final boolean isRunning() {
@@ -519,6 +526,12 @@ public abstract class StreamTask<OUT, Operator extends 
StreamOperator<OUT>>
        public RecordWriterOutput<?>[] getStreamOutputs() {
                return operatorChain.getStreamOutputs();
        }
+       
+       // visible for testing!
+       Set<Closeable> getCancelables() {
+               return cancelables;
+       }
+
 
        // 
------------------------------------------------------------------------
        //  Checkpoint and Restore

http://git-wip-us.apache.org/repos/asf/flink/blob/290f8a25/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/tasks/StreamTaskTest.java
----------------------------------------------------------------------
diff --git 
a/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/tasks/StreamTaskTest.java
 
b/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/tasks/StreamTaskTest.java
index f6b350b..4bae710 100644
--- 
a/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/tasks/StreamTaskTest.java
+++ 
b/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/tasks/StreamTaskTest.java
@@ -55,18 +55,20 @@ import org.apache.flink.streaming.api.operators.Output;
 import org.apache.flink.streaming.api.operators.StreamSource;
 import org.apache.flink.streaming.runtime.streamrecord.StreamRecord;
 import org.apache.flink.util.ExceptionUtils;
-
 import org.apache.flink.util.SerializedValue;
+
 import org.junit.Test;
 
 import scala.concurrent.ExecutionContext;
 import scala.concurrent.Future;
 import scala.concurrent.duration.FiniteDuration;
 
+import java.io.Closeable;
 import java.io.IOException;
 import java.io.ObjectInputStream;
 import java.net.URL;
 import java.util.Collections;
+import java.util.Set;
 import java.util.UUID;
 import java.util.concurrent.TimeUnit;
 
@@ -167,6 +169,27 @@ public class StreamTaskTest {
                assertEquals(ExecutionState.CANCELED, task.getExecutionState());
        }
 
+       @Test
+       public void testCancellationFailsWithBlockingLock() throws Exception {
+               SYNC_LATCH = new OneShotLatch();
+
+               StreamConfig cfg = new StreamConfig(new Configuration());
+               cfg.setTimeCharacteristic(TimeCharacteristic.ProcessingTime);
+               Task task = createTask(CancelFailingTask.class, cfg, new 
Configuration());
+
+               // start the task and wait until it runs
+               // execution state RUNNING is not enough, we need to wait until 
the stream task's run() method
+               // is entered
+               task.startTaskThread();
+               SYNC_LATCH.await();
+
+               // cancel the execution - this should lead to smooth shutdown
+               task.cancelExecution();
+               task.getExecutingThread().join();
+
+               assertEquals(ExecutionState.CANCELED, task.getExecutionState());
+       }
+
        // 
------------------------------------------------------------------------
        //  Test Utilities
        // 
------------------------------------------------------------------------
@@ -339,7 +362,7 @@ public class StreamTaskTest {
                        // we are at the point where cancelling can happen
                        SYNC_LATCH.trigger();
 
-                       // just freeze this task until it is interrupted
+                       // just put this to sleep until it is interrupted
                        try {
                                Thread.sleep(100000000);
                        } catch (InterruptedException ignored) {
@@ -350,8 +373,7 @@ public class StreamTaskTest {
 
                @Override
                protected void cleanup() {
-                       holder.cancel();
-                       holder.interrupt();
+                       holder.close();
                }
 
                @Override
@@ -360,38 +382,100 @@ public class StreamTaskTest {
                        // do not interrupt the lock holder here, to simulate 
spawned threads that
                        // we cannot properly interrupt on cancellation
                }
+       }
 
+       /**
+        * A task whose cancelTask() always throws, so it can only finish if its cancelables are still closed
+        */
+       public static class CancelFailingTask extends StreamTask<String, 
AbstractStreamOperator<String>> {
 
-               private static final class LockHolder extends Thread {
+               @Override
+               protected void init() {}
 
-                       private final OneShotLatch trigger;
-                       private final Object lock;
-                       private volatile boolean canceled;
+               
@SuppressWarnings("SynchronizationOnLocalVariableOrMethodParameter")
+               @Override
+               protected void run() throws Exception {
+                       final OneShotLatch latch = new OneShotLatch();
+                       final Object lock = new Object();
 
-                       private LockHolder(Object lock, OneShotLatch trigger) {
-                               this.lock = lock;
-                               this.trigger = trigger;
-                       }
+                       LockHolder holder = new LockHolder(lock, latch);
+                       holder.start();
+                       try {
+                               // cancellation should try and cancel this
+                               Set<Closeable> canceleables = getCancelables();
+                               synchronized (canceleables) {
+                                       canceleables.add(holder);
+                               }
+
+                               // wait till the lock holder has the lock
+                               latch.await();
 
-                       @Override
-                       public void run() {
+                               // we are at the point where cancelling can 
happen
+                               SYNC_LATCH.trigger();
+       
+                               // try to acquire the lock - this is not 
possible as long as the lock holder
+                               // thread lives
                                synchronized (lock) {
-                                       while (!canceled) {
-                                               // signal that we grabbed the 
lock
-                                               trigger.trigger();
-
-                                               // basically freeze this thread
-                                               try {
-                                                       //noinspection 
SleepWhileHoldingLock
-                                                       
Thread.sleep(1000000000);
-                                               } catch (InterruptedException 
ignored) {}
-                                       }
+                                       // nothing
                                }
                        }
+                       finally {
+                               holder.close();
+                       }
+
+               }
+
+               @Override
+               protected void cleanup() {}
+
+               @Override
+               protected void cancelTask() throws Exception {
+                       throw new Exception("test exception");
+               }
+
+       }
+
+       // 
------------------------------------------------------------------------
+       // 
------------------------------------------------------------------------
 
-                       public void cancel() {
-                               canceled = true;
+       /**
+        * A thread that holds a lock as long as it lives
+        */
+       private static final class LockHolder extends Thread implements 
Closeable {
+
+               private final OneShotLatch trigger;
+               private final Object lock;
+               private volatile boolean canceled;
+
+               private LockHolder(Object lock, OneShotLatch trigger) {
+                       this.lock = lock;
+                       this.trigger = trigger;
+               }
+
+               @Override
+               public void run() {
+                       synchronized (lock) {
+                               while (!canceled) {
+                                       // signal that we grabbed the lock
+                                       trigger.trigger();
+
+                                       // basically freeze this thread
+                                       try {
+                                               //noinspection 
SleepWhileHoldingLock
+                                               Thread.sleep(1000000000);
+                                       } catch (InterruptedException ignored) 
{}
+                               }
                        }
                }
+
+               public void cancel() {
+                       canceled = true;
+               }
+
+               @Override
+               public void close() {
+                       canceled = true;
+                       interrupt();
+               }
        }
 }

Reply via email to