[FLINK-5278] Improve task and checkpoint related logging

Add more logging

This closes #2959.


Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/ea708071
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/ea708071
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/ea708071

Branch: refs/heads/master
Commit: ea7080712f2dcbdf125b806007c80aa3d120f30a
Parents: d3f19a5
Author: Till Rohrmann <[email protected]>
Authored: Wed Dec 7 16:22:23 2016 +0100
Committer: Till Rohrmann <[email protected]>
Committed: Fri Dec 9 14:42:13 2016 +0100

----------------------------------------------------------------------
 .../checkpoint/CheckpointCoordinator.java       |  32 ++--
 .../runtime/checkpoint/CompletedCheckpoint.java |   3 +-
 .../ZooKeeperCompletedCheckpointStore.java      |  75 ++++++++-
 .../flink/runtime/executiongraph/Execution.java |   9 +-
 .../apache/flink/runtime/taskmanager/Task.java  | 165 ++++++++++++-------
 .../ZooKeeperCompletedCheckpointStoreTest.java  |  36 ++++
 .../streaming/runtime/tasks/StreamTask.java     |  25 ++-
 7 files changed, 260 insertions(+), 85 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/flink/blob/ea708071/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/CheckpointCoordinator.java
----------------------------------------------------------------------
diff --git 
a/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/CheckpointCoordinator.java
 
b/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/CheckpointCoordinator.java
index 8ca4b2e..5f0fd74 100644
--- 
a/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/CheckpointCoordinator.java
+++ 
b/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/CheckpointCoordinator.java
@@ -613,6 +613,7 @@ public class CheckpointCoordinator {
                if (shutdown || message == null) {
                        return false;
                }
+
                if (!job.equals(message.getJob())) {
                        LOG.error("Received AcknowledgeCheckpoint message for 
wrong job: {}", message);
                        return false;
@@ -641,6 +642,9 @@ public class CheckpointCoordinator {
 
                                switch 
(checkpoint.acknowledgeTask(message.getTaskExecutionId(), 
message.getSubtaskState())) {
                                        case SUCCESS:
+                                               LOG.debug("Received acknowledge 
message for checkpoint {} from task {} of job {}.",
+                                                       checkpointId, 
message.getTaskExecutionId(), message.getJob());
+
                                                if 
(checkpoint.isFullyAcknowledged()) {
 
                                                        // record the time when 
this was completed, to calculate
@@ -651,8 +655,8 @@ public class CheckpointCoordinator {
                                                        completed = 
checkpoint.finalizeCheckpoint();
                                                        
completedCheckpointStore.addCheckpoint(completed);
 
-                                                       LOG.info("Completed 
checkpoint " + checkpointId + " (in " +
-                                                               
completed.getDuration() + " ms)");
+                                                       LOG.info("Completed 
checkpoint {} ({} bytes in {} ms).", checkpointId,
+                                                               
completed.getStateSize(), completed.getDuration());
 
                                                        if 
(LOG.isDebugEnabled()) {
                                                                StringBuilder 
builder = new StringBuilder();
@@ -685,7 +689,7 @@ public class CheckpointCoordinator {
                                                                "the state 
handle to avoid lingering state.", message.getCheckpointId(),
                                                        
message.getTaskExecutionId(), message.getJob());
 
-                                               
discardState(message.getSubtaskState());
+                                               discardState(message.getJob(), 
message.getTaskExecutionId(), message.getCheckpointId(), 
message.getSubtaskState());
 
                                                break;
                                        case DISCARDED:
@@ -694,7 +698,7 @@ public class CheckpointCoordinator {
                                                                "state handle 
tp avoid lingering state.",
                                                        
message.getCheckpointId(), message.getTaskExecutionId(), message.getJob());
 
-                                               
discardState(message.getSubtaskState());
+                                               discardState(message.getJob(), 
message.getTaskExecutionId(), message.getCheckpointId(), 
message.getSubtaskState());
                                }
                        }
                        else if (checkpoint != null) {
@@ -706,15 +710,17 @@ public class CheckpointCoordinator {
                                // message is for an unknown checkpoint, or 
comes too late (checkpoint disposed)
                                if 
(recentPendingCheckpoints.contains(checkpointId)) {
                                        isPendingCheckpoint = true;
-                                       LOG.warn("Received late message for now 
expired checkpoint attempt {}.", checkpointId);
+                                       LOG.warn("Received late message for now 
expired checkpoint attempt {} from " +
+                                               "{} of job {}.", checkpointId, 
message.getTaskExecutionId(), message.getJob());
                                }
                                else {
-                                       LOG.debug("Received message for an 
unknown checkpoint {}.", checkpointId);
+                                       LOG.debug("Received message for an 
unknown checkpoint {} from {} of job {}.",
+                                               checkpointId, 
message.getTaskExecutionId(), message.getJob());
                                        isPendingCheckpoint = false;
                                }
 
                                // try to discard the state so that we don't 
have lingering state lying around
-                               discardState(message.getSubtaskState());
+                               discardState(message.getJob(), 
message.getTaskExecutionId(), message.getCheckpointId(), 
message.getSubtaskState());
                        }
                }
 
@@ -947,19 +953,25 @@ public class CheckpointCoordinator {
                                triggerCheckpoint(System.currentTimeMillis(), 
true);
                        }
                        catch (Exception e) {
-                               LOG.error("Exception while triggering 
checkpoint", e);
+                               LOG.error("Exception while triggering 
checkpoint.", e);
                        }
                }
        }
 
-       private void discardState(final StateObject stateObject) {
+       private void discardState(
+               final JobID jobId,
+               final ExecutionAttemptID executionAttemptID,
+               final long checkpointId,
+               final StateObject stateObject) {
                executor.execute(new Runnable() {
                        @Override
                        public void run() {
                                try {
                                        stateObject.discardState();
                                } catch (Exception e) {
-                                       LOG.warn("Could not properly discard 
state object.", e);
+                                       LOG.warn("Could not properly discard 
state object of checkpoint {} " +
+                                               "belonging to task {} of job 
{}.", checkpointId, executionAttemptID, jobId,
+                                               e);
                                }
                        }
                });

http://git-wip-us.apache.org/repos/asf/flink/blob/ea708071/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/CompletedCheckpoint.java
----------------------------------------------------------------------
diff --git 
a/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/CompletedCheckpoint.java
 
b/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/CompletedCheckpoint.java
index 3c33ce3..ed65011 100644
--- 
a/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/CompletedCheckpoint.java
+++ 
b/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/CompletedCheckpoint.java
@@ -26,7 +26,6 @@ import org.apache.flink.runtime.state.StateUtil;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
-import java.io.IOException;
 import java.io.Serializable;
 import java.util.Map;
 import java.util.Objects;
@@ -164,7 +163,7 @@ public class CompletedCheckpoint implements Serializable {
                }
        }
 
-       public long getStateSize() throws IOException {
+       public long getStateSize() {
                long result = 0L;
 
                for (TaskState taskState : taskStates.values()) {

http://git-wip-us.apache.org/repos/asf/flink/blob/ea708071/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/ZooKeeperCompletedCheckpointStore.java
----------------------------------------------------------------------
diff --git 
a/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/ZooKeeperCompletedCheckpointStore.java
 
b/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/ZooKeeperCompletedCheckpointStore.java
index 4add504..fdd0d40 100644
--- 
a/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/ZooKeeperCompletedCheckpointStore.java
+++ 
b/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/ZooKeeperCompletedCheckpointStore.java
@@ -163,7 +163,17 @@ public class ZooKeeperCompletedCheckpointStore implements 
CompletedCheckpointSto
                        Tuple2<RetrievableStateHandle<CompletedCheckpoint>, 
String> latest = initialCheckpoints
                                        .get(numberOfInitialCheckpoints - 1);
 
-                       CompletedCheckpoint latestCheckpoint = 
latest.f0.retrieveState();
+                       CompletedCheckpoint latestCheckpoint;
+                       long checkpointId = pathToCheckpointId(latest.f1);
+
+                       LOG.info("Trying to retrieve checkpoint {}.", 
checkpointId);
+
+                       try {
+                               latestCheckpoint = latest.f0.retrieveState();
+                       } catch (Exception e) {
+                               throw new Exception("Could not retrieve the 
completed checkpoint " + checkpointId +
+                               " from the state storage.", e);
+                       }
 
                        checkpointStateHandles.add(latest);
 
@@ -190,7 +200,7 @@ public class ZooKeeperCompletedCheckpointStore implements 
CompletedCheckpointSto
                checkNotNull(checkpoint, "Checkpoint");
 
                // First add the new one. If it fails, we don't want to loose 
existing data.
-               String path = String.format("/%s", 
checkpoint.getCheckpointID());
+               String path = checkpointIdToPath(checkpoint.getCheckpointID());
 
                final RetrievableStateHandle<CompletedCheckpoint> stateHandle =
                                checkpointsInZooKeeper.add(path, checkpoint);
@@ -298,14 +308,36 @@ public class ZooKeeperCompletedCheckpointStore implements 
CompletedCheckpointSto
                BackgroundCallback callback = new BackgroundCallback() {
                        @Override
                        public void processResult(CuratorFramework client, 
CuratorEvent event) throws Exception {
+                               final long checkpointId = 
pathToCheckpointId(stateHandleAndPath.f1);
+
                                try {
                                        if (event.getType() == 
CuratorEventType.DELETE) {
                                                if (event.getResultCode() == 0) 
{
+                                                       Exception exception = 
null;
+
                                                        try {
                                                                action.call();
-                                                       } finally {
+                                                       } catch (Exception e) {
+                                                               exception = new 
Exception("Could not execute callable action " +
+                                                                       "for 
checkpoint " + checkpointId + '.', e);
+                                                       }
+
+                                                       try {
                                                                // Discard the 
state handle
                                                                
stateHandleAndPath.f0.discardState();
+                                                       } catch (Exception e) {
+                                                               Exception 
newException = new Exception("Could not discard meta " +
+                                                                       "data 
for completed checkpoint " + checkpointId + '.', e);
+
+                                                               if (exception 
== null) {
+                                                                       
exception = newException;
+                                                               } else {
+                                                                       
exception.addSuppressed(newException);
+                                                               }
+                                                       }
+
+                                                       if (exception != null) {
+                                                               throw exception;
                                                        }
                                                } else {
                                                        throw new 
IllegalStateException("Unexpected result code " +
@@ -316,7 +348,7 @@ public class ZooKeeperCompletedCheckpointStore implements 
CompletedCheckpointSto
                                                                event.getType() 
+ " in '" + event + "' callback.");
                                        }
                                } catch (Exception e) {
-                                       LOG.error("Failed to discard 
checkpoint.", e);
+                                       LOG.warn("Failed to discard checkpoint 
{}.", checkpointId, e);
                                }
                        }
                };
@@ -326,4 +358,39 @@ public class ZooKeeperCompletedCheckpointStore implements 
CompletedCheckpointSto
                // inconsistent state.
                checkpointsInZooKeeper.remove(stateHandleAndPath.f1, callback);
        }
+
+       /**
+        * Builds the ZooKeeper node path under which a checkpoint is stored.
+        *
+        * @param checkpointId id of the checkpoint
+        * @return the checkpoint id prefixed with a single slash
+        */
+       protected static String checkpointIdToPath(long checkpointId) {
+               return "/" + checkpointId;
+       }
+
+       /**
+        * Converts a path to the checkpoint id.
+        *
+        * @param path in ZooKeeper, with or without a leading slash
+        * @return Checkpoint id parsed from the path, or -1 if the path (including an empty one) is not a number
+        */
+       protected static long pathToCheckpointId(String path) {
+               try {
+                       String numberString;
+
+                       // strip a leading slash; startsWith (unlike charAt(0)) is safe for an empty path
+                       if (path.startsWith("/")) {
+                               numberString = path.substring(1);
+                       } else {
+                               numberString = path;
+                       }
+                       return Long.parseLong(numberString);
+               } catch (NumberFormatException e) {
+                       LOG.warn("Could not parse checkpoint id from {}. This 
indicates that the " +
+                               "checkpoint id to path conversion has 
changed.", path);
+
+                       return -1L;
+               }
+       }
 }

http://git-wip-us.apache.org/repos/asf/flink/blob/ea708071/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/Execution.java
----------------------------------------------------------------------
diff --git 
a/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/Execution.java
 
b/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/Execution.java
index 219d71d..16aebce 100644
--- 
a/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/Execution.java
+++ 
b/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/Execution.java
@@ -1015,14 +1015,17 @@ public class Execution implements AccessExecution, 
Archiveable<ArchivedExecution
        private boolean transitionState(ExecutionState currentState, 
ExecutionState targetState, Throwable error) {
                // sanity check
                if (currentState.isTerminal()) {
-                       throw new IllegalStateException("Cannot leave terminal 
state " + currentState + " to transition to " + targetState + ".");
+                       throw new IllegalStateException("Cannot leave terminal 
state " + currentState + " to transition to " + targetState + '.');
                }
 
                if (STATE_UPDATER.compareAndSet(this, currentState, 
targetState)) {
                        markTimestamp(targetState);
 
-                       LOG.info(getVertex().getTaskNameWithSubtaskIndex() + " 
("  + getAttemptId() + ") switched from "
-                               + currentState + " to " + targetState);
+                       if (error == null) {
+                               LOG.info("{} ({}) switched from {} to {}.", 
getVertex().getTaskNameWithSubtaskIndex(), getAttemptId(), currentState, 
targetState);
+                       } else {
+                               LOG.info("{} ({}) switched from {} to {}.", 
getVertex().getTaskNameWithSubtaskIndex(), getAttemptId(), currentState, 
targetState, error);
+                       }
 
                        // make sure that the state transition completes 
normally.
                        // potential errors (in listeners may not affect the 
main logic)

http://git-wip-us.apache.org/repos/asf/flink/blob/ea708071/flink-runtime/src/main/java/org/apache/flink/runtime/taskmanager/Task.java
----------------------------------------------------------------------
diff --git 
a/flink-runtime/src/main/java/org/apache/flink/runtime/taskmanager/Task.java 
b/flink-runtime/src/main/java/org/apache/flink/runtime/taskmanager/Task.java
index 14ef1bf..184c3b1 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/taskmanager/Task.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/taskmanager/Task.java
@@ -504,7 +504,7 @@ public class Task implements Runnable, TaskActions {
                while (true) {
                        ExecutionState current = this.executionState;
                        if (current == ExecutionState.CREATED) {
-                               if (STATE_UPDATER.compareAndSet(this, 
ExecutionState.CREATED, ExecutionState.DEPLOYING)) {
+                               if (transitionState(ExecutionState.CREATED, 
ExecutionState.DEPLOYING)) {
                                        // success, we can start our work
                                        break;
                                }
@@ -515,14 +515,14 @@ public class Task implements Runnable, TaskActions {
                                return;
                        }
                        else if (current == ExecutionState.CANCELING) {
-                               if (STATE_UPDATER.compareAndSet(this, 
ExecutionState.CANCELING, ExecutionState.CANCELED)) {
+                               if (transitionState(ExecutionState.CANCELING, 
ExecutionState.CANCELED)) {
                                        // we were immediately canceled. tell 
the TaskManager that we reached our final state
                                        notifyFinalState();
                                        return;
                                }
                        }
                        else {
-                               throw new IllegalStateException("Invalid state 
for beginning of task operation");
+                               throw new IllegalStateException("Invalid state 
for beginning of operation of task " + this + '.');
                        }
                }
 
@@ -543,7 +543,7 @@ public class Task implements Runnable, TaskActions {
 
                        // first of all, get a user-code classloader
                        // this may involve downloading the job's JAR files 
and/or classes
-                       LOG.info("Loading JAR files for task " + 
taskNameWithSubtask);
+                       LOG.info("Loading JAR files for task {}.", this);
 
                        userCodeClassLoader = 
createUserCodeClassloader(libraryCache);
                        final ExecutionConfig executionConfig = 
serializedExecutionConfig.deserializeValue(userCodeClassLoader);
@@ -572,7 +572,7 @@ public class Task implements Runnable, TaskActions {
                        // the registration must also strictly be undone
                        // 
----------------------------------------------------------------
 
-                       LOG.info("Registering task at network: " + this);
+                       LOG.info("Registering task at network: {}.", this);
 
                        network.registerTask(this);
 
@@ -581,13 +581,15 @@ public class Task implements Runnable, TaskActions {
                                for (Map.Entry<String, 
DistributedCache.DistributedCacheEntry> entry :
                                                
DistributedCache.readFileInfoFromConfig(jobConfiguration))
                                {
-                                       LOG.info("Obtaining local cache file 
for '" + entry.getKey() + '\'');
+                                       LOG.info("Obtaining local cache file 
for '{}'.", entry.getKey());
                                        Future<Path> cp = 
fileCache.createTmpFile(entry.getKey(), entry.getValue(), jobId);
                                        
distributedCacheEntries.put(entry.getKey(), cp);
                                }
                        }
                        catch (Exception e) {
-                               throw new Exception("Exception while adding 
files to distributed cache.", e);
+                               throw new Exception(
+                                       String.format("Exception while adding 
files to distributed cache of task %s (%s).", taskNameWithSubtask, executionId),
+                                       e);
                        }
 
                        if (isCanceledOrFailed()) {
@@ -638,7 +640,7 @@ public class Task implements Runnable, TaskActions {
                        this.invokable = invokable;
 
                        // switch to the RUNNING state, if that fails, we have 
been canceled/failed in the meantime
-                       if (!STATE_UPDATER.compareAndSet(this, 
ExecutionState.DEPLOYING, ExecutionState.RUNNING)) {
+                       if (!transitionState(ExecutionState.DEPLOYING, 
ExecutionState.RUNNING)) {
                                throw new CancelTaskException();
                        }
 
@@ -671,7 +673,7 @@ public class Task implements Runnable, TaskActions {
 
                        // try to mark the task as finished
                        // if that fails, the task was canceled/failed in the 
meantime
-                       if (STATE_UPDATER.compareAndSet(this, 
ExecutionState.RUNNING, ExecutionState.FINISHED)) {
+                       if (transitionState(ExecutionState.RUNNING, 
ExecutionState.FINISHED)) {
                                notifyObservers(ExecutionState.FINISHED, null);
                        }
                        else {
@@ -694,7 +696,7 @@ public class Task implements Runnable, TaskActions {
 
                                        if (current == ExecutionState.RUNNING 
|| current == ExecutionState.DEPLOYING) {
                                                if (t instanceof 
CancelTaskException) {
-                                                       if 
(STATE_UPDATER.compareAndSet(this, current, ExecutionState.CANCELED)) {
+                                                       if 
(transitionState(current, ExecutionState.CANCELED)) {
                                                                
cancelInvokable();
 
                                                                
notifyObservers(ExecutionState.CANCELED, null);
@@ -702,19 +704,19 @@ public class Task implements Runnable, TaskActions {
                                                        }
                                                }
                                                else {
-                                                       if 
(STATE_UPDATER.compareAndSet(this, current, ExecutionState.FAILED)) {
+                                                       if 
(transitionState(current, ExecutionState.FAILED, t)) {
                                                                // proper 
failure of the task. record the exception as the root cause
-                                                               LOG.error("Task 
execution failed. ", t);
+                                                               String 
errorMessage = String.format("Execution of %s (%s) failed.", 
taskNameWithSubtask, executionId);
                                                                failureCause = 
t;
                                                                
cancelInvokable();
 
-                                                               
notifyObservers(ExecutionState.FAILED, t);
+                                                               
notifyObservers(ExecutionState.FAILED, new Exception(errorMessage, t));
                                                                break;
                                                        }
                                                }
                                        }
                                        else if (current == 
ExecutionState.CANCELING) {
-                                               if 
(STATE_UPDATER.compareAndSet(this, current, ExecutionState.CANCELED)) {
+                                               if (transitionState(current, 
ExecutionState.CANCELED)) {
                                                        
notifyObservers(ExecutionState.CANCELED, null);
                                                        break;
                                                }
@@ -724,22 +726,22 @@ public class Task implements Runnable, TaskActions {
                                                break;
                                        }
                                        // unexpected state, go to failed
-                                       else if 
(STATE_UPDATER.compareAndSet(this, current, ExecutionState.FAILED)) {
-                                               LOG.error("Unexpected state in 
Task during an exception: " + current);
+                                       else if (transitionState(current, 
ExecutionState.FAILED, t)) {
+                                               LOG.error("Unexpected state in 
task {} ({}) during an exception: {}.", taskNameWithSubtask, executionId, 
current);
                                                break;
                                        }
                                        // else fall through the loop and
                                }
                        }
                        catch (Throwable tt) {
-                               String message = "FATAL - exception in task 
exception handler";
+                               String message = String.format("FATAL - 
exception in exception handler of task %s (%s).", taskNameWithSubtask, 
executionId);
                                LOG.error(message, tt);
                                notifyFatalError(message, tt);
                        }
                }
                finally {
                        try {
-                               LOG.info("Freeing task resources for " + 
taskNameWithSubtask);
+                               LOG.info("Freeing task resources for {} ({}).", 
taskNameWithSubtask, executionId);
 
                                // stop the async dispatcher.
                                // copy dispatcher reference to stack, against 
concurrent release
@@ -767,7 +769,7 @@ public class Task implements Runnable, TaskActions {
                        }
                        catch (Throwable t) {
                                // an error in the resource cleanup is fatal
-                               String message = "FATAL - exception in task 
resource cleanup";
+                               String message = String.format("FATAL - 
exception in resource cleanup of task %s (%s).", taskNameWithSubtask, 
executionId);
                                LOG.error(message, t);
                                notifyFatalError(message, t);
                        }
@@ -779,7 +781,7 @@ public class Task implements Runnable, TaskActions {
                                metrics.close();
                        }
                        catch (Throwable t) {
-                               LOG.error("Error during metrics 
de-registration", t);
+                               LOG.error("Error during metrics de-registration 
of task {} ({}).", taskNameWithSubtask, executionId, t);
                        }
                }
        }
@@ -845,6 +847,39 @@ public class Task implements Runnable, TaskActions {
                taskManagerConnection.notifyFatalError(message, cause);
        }
 
+       /**
+        * Try to transition the execution state from the current state to the new state, passing no cause.
+        *
+        * @param currentState the state the execution is expected to be in
+        * @param newState the state to switch the execution to
+        * @return true if the transition was successful, otherwise false
+        */
+       private boolean transitionState(ExecutionState currentState, 
ExecutionState newState) {
+               return transitionState(currentState, newState, null);
+       }
+
+       /**
+        * Switches the execution state via compare-and-set and logs the switch at INFO level when it succeeds.
+        *
+        * @param currentState the state the execution is expected to be in
+        * @param newState the state to switch the execution to
+        * @param cause of the state switch, passed to the logger when present; may be null
+        * @return true if the state was switched, false if the compare-and-set did not succeed
+        */
+       private boolean transitionState(ExecutionState currentState, ExecutionState newState, Throwable cause) {
+               if (!STATE_UPDATER.compareAndSet(this, currentState, newState)) {
+                       return false;
+               }
+
+               if (cause != null) {
+                       LOG.info("{} ({}) switched from {} to {}.", taskNameWithSubtask, executionId, currentState, newState, cause);
+               } else {
+                       LOG.info("{} ({}) switched from {} to {}.", taskNameWithSubtask, executionId, currentState, newState);
+               }
+
+               return true;
+       }
+
        // 
----------------------------------------------------------------------------------------------------------------
        //  Stopping / Canceling / Failing the task from the outside
        // 
----------------------------------------------------------------------------------------------------------------
@@ -859,22 +894,22 @@ public class Task implements Runnable, TaskActions {
         *             if the {@link AbstractInvokable} does not implement {@link StoppableTask}
         */
        public void stopExecution() throws UnsupportedOperationException {
-               LOG.info("Attempting to stop task " + taskNameWithSubtask);
-               if(this.invokable instanceof StoppableTask) {
+               LOG.info("Attempting to stop task {} ({}).", 
taskNameWithSubtask, executionId);
+               if (invokable instanceof StoppableTask) {
                        Runnable runnable = new Runnable() {
                                @Override
                                public void run() {
                                        try {
-                                               
((StoppableTask)Task.this.invokable).stop();
+                                               
((StoppableTask)invokable).stop();
                                        } catch(RuntimeException e) {
-                                               LOG.error("Stopping task " + 
taskNameWithSubtask + " failed.", e);
+                                               LOG.error("Stopping task {} 
({}) failed.", taskNameWithSubtask, executionId, e);
                                                
taskManagerConnection.failTask(executionId, e);
                                        }
                                }
                        };
-                       executeAsyncCallRunnable(runnable, "Stopping source 
task " + this.taskNameWithSubtask);
+                       executeAsyncCallRunnable(runnable, 
String.format("Stopping source task %s (%s).", taskNameWithSubtask, 
executionId));
                } else {
-                       throw new UnsupportedOperationException("Stopping not 
supported by this task.");
+                       throw new 
UnsupportedOperationException(String.format("Stopping not supported by task %s 
(%s).", taskNameWithSubtask, executionId));
                }
        }
 
@@ -887,7 +922,7 @@ public class Task implements Runnable, TaskActions {
         * <p>This method never blocks.</p>
         */
        public void cancelExecution() {
-               LOG.info("Attempting to cancel task " + taskNameWithSubtask);
+               LOG.info("Attempting to cancel task {} ({}).", 
taskNameWithSubtask, executionId);
                cancelOrFailAndCancelInvokable(ExecutionState.CANCELING, null);
        }
 
@@ -902,37 +937,52 @@ public class Task implements Runnable, TaskActions {
         */
        @Override
        public void failExternally(Throwable cause) {
-               LOG.info("Attempting to fail task externally " + 
taskNameWithSubtask);
+               LOG.info("Attempting to fail task externally {} ({}).", 
taskNameWithSubtask, executionId);
                cancelOrFailAndCancelInvokable(ExecutionState.FAILED, cause);
        }
 
        private void cancelOrFailAndCancelInvokable(ExecutionState targetState, 
Throwable cause) {
                while (true) {
-                       ExecutionState current = this.executionState;
+                       ExecutionState current = executionState;
 
                        // if the task is already canceled (or canceling) or 
finished or failed,
                        // then we need not do anything
                        if (current.isTerminal() || current == 
ExecutionState.CANCELING) {
-                               LOG.info("Task " + taskNameWithSubtask + " is 
already in state " + current);
+                               LOG.info("Task {} is already in state {}", 
taskNameWithSubtask, current);
                                return;
                        }
 
                        if (current == ExecutionState.DEPLOYING || current == 
ExecutionState.CREATED) {
-                               if (STATE_UPDATER.compareAndSet(this, current, 
targetState)) {
+                               if (transitionState(current, targetState, 
cause)) {
                                        // if we manage this state transition, 
then the invokable gets never called
                                        // we need not call cancel on it
                                        this.failureCause = cause;
-                                       notifyObservers(targetState, cause);
+                                       notifyObservers(
+                                               targetState,
+                                               new Exception(
+                                                       String.format(
+                                                               "Cancel or fail 
execution of %s (%s).",
+                                                               
taskNameWithSubtask,
+                                                               executionId),
+                                                       cause));
                                        return;
                                }
                        }
                        else if (current == ExecutionState.RUNNING) {
-                               if (STATE_UPDATER.compareAndSet(this, 
ExecutionState.RUNNING, targetState)) {
+                               if (transitionState(ExecutionState.RUNNING, 
targetState, cause)) {
                                        // we are canceling / failing out of 
the running state
                                        // we need to cancel the invokable
                                        if (invokable != null && 
invokableHasBeenCanceled.compareAndSet(false, true)) {
                                                this.failureCause = cause;
-                                               notifyObservers(targetState, 
cause);
+                                               notifyObservers(
+                                                       targetState,
+                                                       new Exception(
+                                                               String.format(
+                                                                       "Cancel 
or fail execution of %s (%s).",
+                                                                       
taskNameWithSubtask,
+                                                                       
executionId),
+                                                               cause));
+
                                                LOG.info("Triggering 
cancellation of task code {} ({}).", taskNameWithSubtask, executionId);
 
                                                // because the canceling may 
block on user code, we cancel from a separate thread
@@ -951,7 +1001,7 @@ public class Task implements Runnable, TaskActions {
                                                                
producedPartitions,
                                                                inputGates);
                                                Thread cancelThread = new 
Thread(executingThread.getThreadGroup(), canceler,
-                                                               "Canceler for " 
+ taskNameWithSubtask);
+                                                               
String.format("Canceler for %s (%s).", taskNameWithSubtask, executionId));
                                                cancelThread.setDaemon(true);
                                                cancelThread.start();
                                        }
@@ -959,7 +1009,8 @@ public class Task implements Runnable, TaskActions {
                                }
                        }
                        else {
-                               throw new IllegalStateException("Unexpected 
task state: " + current);
+                               throw new 
IllegalStateException(String.format("Unexpected state: %s of task %s (%s).",
+                                       current, taskNameWithSubtask, 
executionId));
                        }
                }
        }
@@ -973,13 +1024,6 @@ public class Task implements Runnable, TaskActions {
        }
 
        private void notifyObservers(ExecutionState newState, Throwable error) {
-               if (error == null) {
-                       LOG.info(taskNameWithSubtask + " switched to " + 
newState);
-               }
-               else {
-                       LOG.info(taskNameWithSubtask + " switched to " + 
newState + " with exception.", error);
-               }
-
                TaskExecutionState stateUpdate = new TaskExecutionState(jobId, 
executionId, newState, error);
 
                for (TaskExecutionStateListener listener : 
taskExecutionStateListeners) {
@@ -1066,24 +1110,29 @@ public class Task implements Runnable, TaskActions {
                                                catch (Throwable t) {
                                                        if (getExecutionState() 
== ExecutionState.RUNNING) {
                                                                
failExternally(new Exception(
-                                                                       "Error 
while triggering checkpoint for " + taskName,
-                                                                       t));
+                                                                       "Error 
while triggering checkpoint " + checkpointID + " for " +
+                                                                               
taskNameWithSubtask, t));
+                                                       } else {
+                                                               
LOG.debug("Encountered error while triggering checkpoint {} for " +
+                                                                       "{} 
({}) while being not in state running.", checkpointID,
+                                                                       
taskNameWithSubtask, executionId, t);
                                                        }
                                                }
                                        }
                                };
-                               executeAsyncCallRunnable(runnable, "Checkpoint 
Trigger for " + taskName);
+                               executeAsyncCallRunnable(runnable, 
String.format("Checkpoint Trigger for %s (%s).", taskNameWithSubtask, 
executionId));
                        }
                        else {
                                checkpointResponder.declineCheckpoint(jobId, 
executionId, checkpointID,
                                                new 
CheckpointDeclineTaskNotCheckpointingException(taskNameWithSubtask));
+                               
+                               LOG.error("Task received a checkpoint request, 
but is not a checkpointing task - {} ({}).",
+                                               taskNameWithSubtask, 
executionId);
 
-                               LOG.error("Task received a checkpoint request, 
but is not a checkpointing task - "
-                                               + taskNameWithSubtask);
                        }
                }
                else {
-                       LOG.debug("Declining checkpoint request for non-running 
task");
+                       LOG.debug("Declining checkpoint request for non-running 
task {} ({}).", taskNameWithSubtask, executionId);
 
                        // send back a message that we did not do the checkpoint
                        checkpointResponder.declineCheckpoint(jobId, 
executionId, checkpointID,
@@ -1120,12 +1169,12 @@ public class Task implements Runnable, TaskActions {
                                executeAsyncCallRunnable(runnable, "Checkpoint 
Confirmation for " + taskName);
                        }
                        else {
-                               LOG.error("Task received a checkpoint commit 
notification, but is not a checkpoint committing task - "
-                                               + taskNameWithSubtask);
+                               LOG.error("Task received a checkpoint commit 
notification, but is not a checkpoint committing task - {}.",
+                                               taskNameWithSubtask);
                        }
                }
                else {
-                       LOG.debug("Ignoring checkpoint commit notification for 
non-running task.");
+                       LOG.debug("Ignoring checkpoint commit notification for 
non-running task {}.", taskNameWithSubtask);
                }
        }
 
@@ -1228,14 +1277,14 @@ public class Task implements Runnable, TaskActions {
                                invokable.cancel();
                        }
                        catch (Throwable t) {
-                               LOG.error("Error while canceling task " + 
taskNameWithSubtask, t);
+                               LOG.error("Error while canceling task {}.", 
taskNameWithSubtask, t);
                        }
                }
        }
 
        @Override
        public String toString() {
-               return taskNameWithSubtask + " [" + executionState + ']';
+               return String.format("%s (%s) [%s]", taskNameWithSubtask, 
executionId, executionState);
        }
 
        /**
@@ -1312,7 +1361,7 @@ public class Task implements Runnable, TaskActions {
                                try {
                                        invokable.cancel();
                                } catch (Throwable t) {
-                                       logger.error("Error while canceling the 
task", t);
+                                       logger.error("Error while canceling the 
task {}.", taskName, t);
                                }
 
                                // Early release of input and output buffer 
pools. We do this
@@ -1326,7 +1375,7 @@ public class Task implements Runnable, TaskActions {
                                        try {
                                                partition.destroyBufferPool();
                                        } catch (Throwable t) {
-                                               LOG.error("Failed to release 
result partition buffer pool.", t);
+                                               LOG.error("Failed to release 
result partition buffer pool for task {}.", taskName, t);
                                        }
                                }
 
@@ -1334,7 +1383,7 @@ public class Task implements Runnable, TaskActions {
                                        try {
                                                inputGate.releaseAllResources();
                                        } catch (Throwable t) {
-                                               LOG.error("Failed to release 
input gate.", t);
+                                               LOG.error("Failed to release 
input gate for task {}.", taskName, t);
                                        }
                                }
 
@@ -1352,7 +1401,7 @@ public class Task implements Runnable, TaskActions {
                                        watchDogThread.join();
                                }
                        } catch (Throwable t) {
-                               logger.error("Error in the task canceler", t);
+                               logger.error("Error in the task canceler for 
task {}.", taskName, t);
                        }
                }
 

http://git-wip-us.apache.org/repos/asf/flink/blob/ea708071/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/ZooKeeperCompletedCheckpointStoreTest.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/ZooKeeperCompletedCheckpointStoreTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/ZooKeeperCompletedCheckpointStoreTest.java
new file mode 100644
index 0000000..6ee0141
--- /dev/null
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/ZooKeeperCompletedCheckpointStoreTest.java
@@ -0,0 +1,36 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.checkpoint;
+
+import org.apache.flink.util.TestLogger;
+import org.junit.Test;
+
+import static org.junit.Assert.assertEquals;
+
+public class ZooKeeperCompletedCheckpointStoreTest extends TestLogger {
+
+       @Test
+       public void testPathConversion() {
+               final long checkpointId = 42L;
+
+               final String path = 
ZooKeeperCompletedCheckpointStore.checkpointIdToPath(checkpointId);
+
+               assertEquals(checkpointId, 
ZooKeeperCompletedCheckpointStore.pathToCheckpointId(path));
+       }
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/ea708071/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/tasks/StreamTask.java
----------------------------------------------------------------------
diff --git a/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/tasks/StreamTask.java b/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/tasks/StreamTask.java
index 54f6c10..88a29ab 100644
--- a/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/tasks/StreamTask.java
+++ b/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/tasks/StreamTask.java
@@ -211,7 +211,7 @@ public abstract class StreamTask<OUT, OP extends StreamOperator<OUT>>
                boolean disposed = false;
                try {
                        // -------- Initialize ---------
-                       LOG.debug("Initializing {}", getName());
+                       LOG.debug("Initializing {}.", getName());
 
                        asyncOperationsThreadPool = 
Executors.newCachedThreadPool();
 
@@ -528,8 +528,11 @@ public abstract class StreamTask<OUT, OP extends StreamOperator<OUT>>
                catch (Exception e) {
                        // propagate exceptions only if the task is still in 
"running" state
                        if (isRunning) {
-                               throw e;
+                               throw new Exception("Could not perform 
checkpoint " + checkpointMetaData.getCheckpointId() +
+                                       "for operator " + getName() + '.', e);
                        } else {
+                               LOG.debug("Could not perform checkpoint {} for 
operator {} while the " +
+                                       "invokable was not in state running.", 
checkpointMetaData.getCheckpointId(), getName(), e);
                                return false;
                        }
                }
@@ -541,10 +544,12 @@ public abstract class StreamTask<OUT, OP extends StreamOperator<OUT>>
                        performCheckpoint(checkpointMetaData);
                }
                catch (CancelTaskException e) {
-                       throw e;
+                       throw new Exception("Operator " + getName() + " was 
cancelled while performing checkpoint " +
+                               checkpointMetaData.getCheckpointId() + '.');
                }
                catch (Exception e) {
-                       throw new Exception("Error while performing a 
checkpoint", e);
+                       throw new Exception("Could not perform checkpoint " + 
checkpointMetaData.getCheckpointId() + " for operator " +
+                               getName() + '.', e);
                }
        }
 
@@ -678,7 +683,7 @@ public abstract class StreamTask<OUT, OP extends StreamOperator<OUT>>
 
                if (stateBackend != null) {
                        // backend has been configured on the environment
-                       LOG.info("Using user-defined state backend: " + 
stateBackend);
+                       LOG.info("Using user-defined state backend: {}.", 
stateBackend);
                } else {
                        // see if we have a backend specified in the 
configuration
                        Configuration flinkConfig = 
getEnvironment().getTaskManagerInfo().getConfiguration();
@@ -697,8 +702,8 @@ public abstract class StreamTask<OUT, OP extends StreamOperator<OUT>>
 
                                case "filesystem":
                                        FsStateBackend backend = new 
FsStateBackendFactory().createFromConfig(flinkConfig);
-                                       LOG.info("State backend is set to heap 
memory (checkpoints to filesystem \""
-                                                       + backend.getBasePath() 
+ "\")");
+                                       LOG.info("State backend is set to heap 
memory (checkpoints to filesystem \"{}\")",
+                                               backend.getBasePath());
                                        stateBackend = backend;
                                        break;
 
@@ -933,7 +938,11 @@ public abstract class StreamTask<OUT, OP extends StreamOperator<OUT>>
                                }
                        } catch (Exception e) {
                                // registers the exception and tries to fail 
the whole task
-                               AsynchronousException asyncException = new 
AsynchronousException(e);
+                               AsynchronousException asyncException = new 
AsynchronousException(
+                                       new Exception(
+                                               "Could not materialize 
checkpoint " + checkpointMetaData.getCheckpointId() +
+                                                       " for operator " + 
owner.getName() + '.',
+                                               e));
                                owner.handleAsyncException("Failure in 
asynchronous checkpoint materialization", asyncException);
                        } finally {
                                owner.cancelables.unregisterClosable(this);

Reply via email to