Repository: flink Updated Branches: refs/heads/release-1.1 2041ba02b -> 290f8a25f
[FLINK-5038] [streaming runtime] Make sure cancelables are canceled even when "cancelTask" throws an exception Project: http://git-wip-us.apache.org/repos/asf/flink/repo Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/290f8a25 Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/290f8a25 Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/290f8a25 Branch: refs/heads/release-1.1 Commit: 290f8a25fc4127b9734f45e782391506207748bc Parents: 32f7efc Author: Stephan Ewen <se...@apache.org> Authored: Wed Nov 9 13:09:37 2016 +0100 Committer: Stephan Ewen <se...@apache.org> Committed: Wed Nov 9 20:24:10 2016 +0100 ---------------------------------------------------------------------- .../streaming/runtime/tasks/StreamTask.java | 17 ++- .../streaming/runtime/tasks/StreamTaskTest.java | 136 +++++++++++++++---- 2 files changed, 125 insertions(+), 28 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/flink/blob/290f8a25/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/tasks/StreamTask.java ---------------------------------------------------------------------- diff --git a/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/tasks/StreamTask.java b/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/tasks/StreamTask.java index 4f0839f..aaaead0 100644 --- a/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/tasks/StreamTask.java +++ b/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/tasks/StreamTask.java @@ -357,8 +357,15 @@ public abstract class StreamTask<OUT, Operator extends StreamOperator<OUT>> public final void cancel() throws Exception { isRunning = false; canceled = true; - cancelTask(); - closeAllClosables(); + + // the "cancel task" call must come first, but the cancelables must be + // closed no matter what + try { + cancelTask(); + } + finally { + closeAllClosables(); + } } public 
final boolean isRunning() { @@ -519,6 +526,12 @@ public abstract class StreamTask<OUT, Operator extends StreamOperator<OUT>> public RecordWriterOutput<?>[] getStreamOutputs() { return operatorChain.getStreamOutputs(); } + + // visible for testing! + Set<Closeable> getCancelables() { + return cancelables; + } + // ------------------------------------------------------------------------ // Checkpoint and Restore http://git-wip-us.apache.org/repos/asf/flink/blob/290f8a25/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/tasks/StreamTaskTest.java ---------------------------------------------------------------------- diff --git a/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/tasks/StreamTaskTest.java b/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/tasks/StreamTaskTest.java index f6b350b..4bae710 100644 --- a/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/tasks/StreamTaskTest.java +++ b/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/tasks/StreamTaskTest.java @@ -55,18 +55,20 @@ import org.apache.flink.streaming.api.operators.Output; import org.apache.flink.streaming.api.operators.StreamSource; import org.apache.flink.streaming.runtime.streamrecord.StreamRecord; import org.apache.flink.util.ExceptionUtils; - import org.apache.flink.util.SerializedValue; + import org.junit.Test; import scala.concurrent.ExecutionContext; import scala.concurrent.Future; import scala.concurrent.duration.FiniteDuration; +import java.io.Closeable; import java.io.IOException; import java.io.ObjectInputStream; import java.net.URL; import java.util.Collections; +import java.util.Set; import java.util.UUID; import java.util.concurrent.TimeUnit; @@ -167,6 +169,27 @@ public class StreamTaskTest { assertEquals(ExecutionState.CANCELED, task.getExecutionState()); } + @Test + public void testCancellationFailsWithBlockingLock() throws Exception { + SYNC_LATCH = new OneShotLatch(); + + 
StreamConfig cfg = new StreamConfig(new Configuration()); + cfg.setTimeCharacteristic(TimeCharacteristic.ProcessingTime); + Task task = createTask(CancelFailingTask.class, cfg, new Configuration()); + + // start the task and wait until it runs + // execution state RUNNING is not enough, we need to wait until the stream task's run() method + // is entered + task.startTaskThread(); + SYNC_LATCH.await(); + + // cancel the execution - this should lead to smooth shutdown + task.cancelExecution(); + task.getExecutingThread().join(); + + assertEquals(ExecutionState.CANCELED, task.getExecutionState()); + } + // ------------------------------------------------------------------------ // Test Utilities // ------------------------------------------------------------------------ @@ -339,7 +362,7 @@ public class StreamTaskTest { // we are at the point where cancelling can happen SYNC_LATCH.trigger(); - // just freeze this task until it is interrupted + // just put this to sleep until it is interrupted try { Thread.sleep(100000000); } catch (InterruptedException ignored) { @@ -350,8 +373,7 @@ public class StreamTaskTest { @Override protected void cleanup() { - holder.cancel(); - holder.interrupt(); + holder.close(); } @Override @@ -360,38 +382,100 @@ public class StreamTaskTest { // do not interrupt the lock holder here, to simulate spawned threads that // we cannot properly interrupt on cancellation } + } + /** + * A task that locks if cancellation attempts to cleanly shut down + */ + public static class CancelFailingTask extends StreamTask<String, AbstractStreamOperator<String>> { - private static final class LockHolder extends Thread { + @Override + protected void init() {} - private final OneShotLatch trigger; - private final Object lock; - private volatile boolean canceled; + @SuppressWarnings("SynchronizationOnLocalVariableOrMethodParameter") + @Override + protected void run() throws Exception { + final OneShotLatch latch = new OneShotLatch(); + final Object lock = new 
Object(); - private LockHolder(Object lock, OneShotLatch trigger) { - this.lock = lock; - this.trigger = trigger; - } + LockHolder holder = new LockHolder(lock, latch); + holder.start(); + try { + // cancellation should try and cancel this + Set<Closeable> canceleables = getCancelables(); + synchronized (canceleables) { + canceleables.add(holder); + } + + // wait till the lock holder has the lock + latch.await(); - @Override - public void run() { + // we are at the point where cancelling can happen + SYNC_LATCH.trigger(); + + // try to acquire the lock - this is not possible as long as the lock holder + // thread lives synchronized (lock) { - while (!canceled) { - // signal that we grabbed the lock - trigger.trigger(); - - // basically freeze this thread - try { - //noinspection SleepWhileHoldingLock - Thread.sleep(1000000000); - } catch (InterruptedException ignored) {} - } + // nothing } } + finally { + holder.close(); + } + + } + + @Override + protected void cleanup() {} + + @Override + protected void cancelTask() throws Exception { + throw new Exception("test exception"); + } + + } + + // ------------------------------------------------------------------------ + // ------------------------------------------------------------------------ - public void cancel() { - canceled = true; + /** + * A thread that holds a lock as long as it lives + */ + private static final class LockHolder extends Thread implements Closeable { + + private final OneShotLatch trigger; + private final Object lock; + private volatile boolean canceled; + + private LockHolder(Object lock, OneShotLatch trigger) { + this.lock = lock; + this.trigger = trigger; + } + + @Override + public void run() { + synchronized (lock) { + while (!canceled) { + // signal that we grabbed the lock + trigger.trigger(); + + // basically freeze this thread + try { + //noinspection SleepWhileHoldingLock + Thread.sleep(1000000000); + } catch (InterruptedException ignored) {} + } } } + + public void cancel() { + 
canceled = true; + } + + @Override + public void close() { + canceled = true; + interrupt(); + } } }