Repository: flink
Updated Branches:
  refs/heads/release-1.0 a405b55b0 -> 3d8c7c1ea


[FLINK-3594] [runtime] Make sure exceptions during checkpoints are handled properly

  - For the asynchronous trigger, exceptions are suppressed if the task is no longer running.
  - The task cannot go to "not running" while a checkpoint is still in progress.


Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/f905503f
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/f905503f
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/f905503f

Branch: refs/heads/release-1.0
Commit: f905503f4c4028e444c0f8e28a2c12b3651b556a
Parents: a405b55
Author: Stephan Ewen <[email protected]>
Authored: Thu Mar 10 21:30:49 2016 +0100
Committer: Stephan Ewen <[email protected]>
Committed: Fri Mar 11 15:53:18 2016 +0100

----------------------------------------------------------------------
 .../streaming/runtime/tasks/StreamTask.java     | 36 +++++++++++++++-----
 1 file changed, 28 insertions(+), 8 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/flink/blob/f905503f/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/tasks/StreamTask.java
----------------------------------------------------------------------
diff --git a/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/tasks/StreamTask.java b/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/tasks/StreamTask.java
index 7cd4cf3..7138d53 100644
--- a/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/tasks/StreamTask.java
+++ b/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/tasks/StreamTask.java
@@ -207,6 +207,7 @@ public abstract class StreamTask<OUT, Operator extends 
StreamOperator<OUT>>
 
                        // first order of business is to give operators back 
their state
                        restoreState();
+                       lazyRestoreState = null; // GC friendliness
                        
                        // we need to make sure that any triggers scheduled in 
open() cannot be
                        // executed before all operators are opened
@@ -219,23 +220,25 @@ public abstract class StreamTask<OUT, Operator extends 
StreamOperator<OUT>>
                                throw new CancelTaskException();
                        }
 
-                               // let the task do its work
+                       // let the task do its work
                        isRunning = true;
                        run();
-                       isRunning = false;
-                       
-                       if (LOG.isDebugEnabled()) {
-                               LOG.debug("Finished task {}", getName());
-                       }
+
+                       LOG.debug("Finished task {}", getName());
                        
                        // make sure no further checkpoint and notification 
actions happen.
                        // we make sure that no other thread is currently in 
the locked scope before
                        // we close the operators by trying to acquire the 
checkpoint scope lock
                        // we also need to make sure that no triggers fire 
concurrently with the close logic
+                       // at the same time, this makes sure that during any 
"regular" exit where still
                        synchronized (lock) {
+                               isRunning = false;
+                               
                                // this is part of the main logic, so if this 
fails, the task is considered failed
                                closeAllOperators();
                        }
+
+                       LOG.debug("Closed operators for task {}", getName());
                        
                        // make sure all buffered data is flushed
                        operatorChain.flushOutputs();
@@ -457,7 +460,21 @@ public abstract class StreamTask<OUT, Operator extends 
StreamOperator<OUT>>
        }
 
        @Override
-       public boolean triggerCheckpoint(final long checkpointId, final long 
timestamp) throws Exception {
+       public boolean triggerCheckpoint(long checkpointId, long timestamp) 
throws Exception {
+               try {
+                       return performCheckpoint(checkpointId, timestamp);
+               }
+               catch (Exception e) {
+                       // propagate exceptions only if the task is still in 
"running" state
+                       if (isRunning) {
+                               throw e;
+                       } else {
+                               return false;
+                       }
+               }
+       }
+
+       protected boolean performCheckpoint(final long checkpointId, final long 
timestamp) throws Exception {
                LOG.debug("Starting checkpoint {} on task {}", checkpointId, 
getName());
                
                synchronized (lock) {
@@ -675,7 +692,10 @@ public abstract class StreamTask<OUT, Operator extends 
StreamOperator<OUT>>
                        @Override
                        public void onEvent(CheckpointBarrier barrier) {
                                try {
-                                       triggerCheckpoint(barrier.getId(), 
barrier.getTimestamp());
+                                       performCheckpoint(barrier.getId(), 
barrier.getTimestamp());
+                               }
+                               catch (CancelTaskException e) {
+                                       throw e;
                                }
                                catch (Exception e) {
                                        throw new RuntimeException("Error 
triggering a checkpoint as the result of receiving checkpoint barrier", e);

Reply via email to