Repository: flink Updated Branches: refs/heads/release-1.0 a405b55b0 -> 3d8c7c1ea
[FLINK-3594] [runtime] Make sure exceptions during checkpoints are handled properly - For the asynchronous trigger, exceptions are suppressed if the task is no longer running - The task cannot go to "not running" while a checkpoint is still in progress. Project: http://git-wip-us.apache.org/repos/asf/flink/repo Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/f905503f Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/f905503f Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/f905503f Branch: refs/heads/release-1.0 Commit: f905503f4c4028e444c0f8e28a2c12b3651b556a Parents: a405b55 Author: Stephan Ewen <[email protected]> Authored: Thu Mar 10 21:30:49 2016 +0100 Committer: Stephan Ewen <[email protected]> Committed: Fri Mar 11 15:53:18 2016 +0100 ---------------------------------------------------------------------- .../streaming/runtime/tasks/StreamTask.java | 36 +++++++++++++++----- 1 file changed, 28 insertions(+), 8 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/flink/blob/f905503f/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/tasks/StreamTask.java ---------------------------------------------------------------------- diff --git a/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/tasks/StreamTask.java b/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/tasks/StreamTask.java index 7cd4cf3..7138d53 100644 --- a/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/tasks/StreamTask.java +++ b/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/tasks/StreamTask.java @@ -207,6 +207,7 @@ public abstract class StreamTask<OUT, Operator extends StreamOperator<OUT>> // first order of business is to give operators back their state restoreState(); + lazyRestoreState = null; // GC friendliness // we need to make sure that any triggers scheduled in open() cannot be // executed before all operators are opened @@ -219,23 +220,25 @@ public abstract class StreamTask<OUT, Operator extends StreamOperator<OUT>> throw new CancelTaskException(); } - // let the task do its work + // let the task do its work isRunning = true; run(); - isRunning = false; - - if (LOG.isDebugEnabled()) { - LOG.debug("Finished task {}", getName()); - } + + LOG.debug("Finished task {}", getName()); // make sure no further checkpoint and notification actions happen. // we make sure that no other thread is currently in the locked scope before // we close the operators by trying to acquire the checkpoint scope lock // we also need to make sure that no triggers fire concurrently with the close logic + // at the same time, this makes sure that during any "regular" exit where still synchronized (lock) { + isRunning = false; + // this is part of the main logic, so if this fails, the task is considered failed closeAllOperators(); } + + LOG.debug("Closed operators for task {}", getName()); // make sure all buffered data is flushed operatorChain.flushOutputs(); @@ -457,7 +460,21 @@ public abstract class StreamTask<OUT, Operator extends StreamOperator<OUT>> } @Override - public boolean triggerCheckpoint(final long checkpointId, final long timestamp) throws Exception { + public boolean triggerCheckpoint(long checkpointId, long timestamp) throws Exception { + try { + return performCheckpoint(checkpointId, timestamp); + } + catch (Exception e) { + // propagate exceptions only if the task is still in "running" state + if (isRunning) { + throw e; + } else { + return false; + } + } + } + + protected boolean performCheckpoint(final long checkpointId, final long timestamp) throws Exception { LOG.debug("Starting checkpoint {} on task {}", checkpointId, getName()); synchronized (lock) { @@ -675,7 +692,10 @@ public abstract class StreamTask<OUT, Operator extends StreamOperator<OUT>> @Override public void onEvent(CheckpointBarrier barrier) { try { - triggerCheckpoint(barrier.getId(), barrier.getTimestamp()); + performCheckpoint(barrier.getId(), barrier.getTimestamp()); + } + catch (CancelTaskException e) { + throw e; } catch (Exception e) { throw new RuntimeException("Error triggering a checkpoint as the result of receiving checkpoint barrier", e);
