[FLINK-5278] Improve task and checkpoint related logging

Add more logging.
This closes #2959. Project: http://git-wip-us.apache.org/repos/asf/flink/repo Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/ea708071 Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/ea708071 Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/ea708071 Branch: refs/heads/master Commit: ea7080712f2dcbdf125b806007c80aa3d120f30a Parents: d3f19a5 Author: Till Rohrmann <[email protected]> Authored: Wed Dec 7 16:22:23 2016 +0100 Committer: Till Rohrmann <[email protected]> Committed: Fri Dec 9 14:42:13 2016 +0100 ---------------------------------------------------------------------- .../checkpoint/CheckpointCoordinator.java | 32 ++-- .../runtime/checkpoint/CompletedCheckpoint.java | 3 +- .../ZooKeeperCompletedCheckpointStore.java | 75 ++++++++- .../flink/runtime/executiongraph/Execution.java | 9 +- .../apache/flink/runtime/taskmanager/Task.java | 165 ++++++++++++------- .../ZooKeeperCompletedCheckpointStoreTest.java | 36 ++++ .../streaming/runtime/tasks/StreamTask.java | 25 ++- 7 files changed, 260 insertions(+), 85 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/flink/blob/ea708071/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/CheckpointCoordinator.java ---------------------------------------------------------------------- diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/CheckpointCoordinator.java b/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/CheckpointCoordinator.java index 8ca4b2e..5f0fd74 100644 --- a/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/CheckpointCoordinator.java +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/CheckpointCoordinator.java @@ -613,6 +613,7 @@ public class CheckpointCoordinator { if (shutdown || message == null) { return false; } + if (!job.equals(message.getJob())) { LOG.error("Received AcknowledgeCheckpoint message for wrong job: {}", 
message); return false; @@ -641,6 +642,9 @@ public class CheckpointCoordinator { switch (checkpoint.acknowledgeTask(message.getTaskExecutionId(), message.getSubtaskState())) { case SUCCESS: + LOG.debug("Received acknowledge message for checkpoint {} from task {} of job {}.", + checkpointId, message.getTaskExecutionId(), message.getJob()); + if (checkpoint.isFullyAcknowledged()) { // record the time when this was completed, to calculate @@ -651,8 +655,8 @@ public class CheckpointCoordinator { completed = checkpoint.finalizeCheckpoint(); completedCheckpointStore.addCheckpoint(completed); - LOG.info("Completed checkpoint " + checkpointId + " (in " + - completed.getDuration() + " ms)"); + LOG.info("Completed checkpoint {} ({} bytes in {} ms).", checkpointId, + completed.getStateSize(), completed.getDuration()); if (LOG.isDebugEnabled()) { StringBuilder builder = new StringBuilder(); @@ -685,7 +689,7 @@ public class CheckpointCoordinator { "the state handle to avoid lingering state.", message.getCheckpointId(), message.getTaskExecutionId(), message.getJob()); - discardState(message.getSubtaskState()); + discardState(message.getJob(), message.getTaskExecutionId(), message.getCheckpointId(), message.getSubtaskState()); break; case DISCARDED: @@ -694,7 +698,7 @@ public class CheckpointCoordinator { "state handle tp avoid lingering state.", message.getCheckpointId(), message.getTaskExecutionId(), message.getJob()); - discardState(message.getSubtaskState()); + discardState(message.getJob(), message.getTaskExecutionId(), message.getCheckpointId(), message.getSubtaskState()); } } else if (checkpoint != null) { @@ -706,15 +710,17 @@ public class CheckpointCoordinator { // message is for an unknown checkpoint, or comes too late (checkpoint disposed) if (recentPendingCheckpoints.contains(checkpointId)) { isPendingCheckpoint = true; - LOG.warn("Received late message for now expired checkpoint attempt {}.", checkpointId); + LOG.warn("Received late message for now expired checkpoint 
attempt {} from " + + "{} of job {}.", checkpointId, message.getTaskExecutionId(), message.getJob()); } else { - LOG.debug("Received message for an unknown checkpoint {}.", checkpointId); + LOG.debug("Received message for an unknown checkpoint {} from {} of job {}.", + checkpointId, message.getTaskExecutionId(), message.getJob()); isPendingCheckpoint = false; } // try to discard the state so that we don't have lingering state lying around - discardState(message.getSubtaskState()); + discardState(message.getJob(), message.getTaskExecutionId(), message.getCheckpointId(), message.getSubtaskState()); } } @@ -947,19 +953,25 @@ public class CheckpointCoordinator { triggerCheckpoint(System.currentTimeMillis(), true); } catch (Exception e) { - LOG.error("Exception while triggering checkpoint", e); + LOG.error("Exception while triggering checkpoint.", e); } } } - private void discardState(final StateObject stateObject) { + private void discardState( + final JobID jobId, + final ExecutionAttemptID executionAttemptID, + final long checkpointId, + final StateObject stateObject) { executor.execute(new Runnable() { @Override public void run() { try { stateObject.discardState(); } catch (Exception e) { - LOG.warn("Could not properly discard state object.", e); + LOG.warn("Could not properly discard state object of checkpoint {} " + + "belonging to task {} of job {}.", checkpointId, executionAttemptID, jobId, + e); } } }); http://git-wip-us.apache.org/repos/asf/flink/blob/ea708071/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/CompletedCheckpoint.java ---------------------------------------------------------------------- diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/CompletedCheckpoint.java b/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/CompletedCheckpoint.java index 3c33ce3..ed65011 100644 --- a/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/CompletedCheckpoint.java +++ 
b/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/CompletedCheckpoint.java @@ -26,7 +26,6 @@ import org.apache.flink.runtime.state.StateUtil; import org.slf4j.Logger; import org.slf4j.LoggerFactory; -import java.io.IOException; import java.io.Serializable; import java.util.Map; import java.util.Objects; @@ -164,7 +163,7 @@ public class CompletedCheckpoint implements Serializable { } } - public long getStateSize() throws IOException { + public long getStateSize() { long result = 0L; for (TaskState taskState : taskStates.values()) { http://git-wip-us.apache.org/repos/asf/flink/blob/ea708071/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/ZooKeeperCompletedCheckpointStore.java ---------------------------------------------------------------------- diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/ZooKeeperCompletedCheckpointStore.java b/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/ZooKeeperCompletedCheckpointStore.java index 4add504..fdd0d40 100644 --- a/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/ZooKeeperCompletedCheckpointStore.java +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/ZooKeeperCompletedCheckpointStore.java @@ -163,7 +163,17 @@ public class ZooKeeperCompletedCheckpointStore implements CompletedCheckpointSto Tuple2<RetrievableStateHandle<CompletedCheckpoint>, String> latest = initialCheckpoints .get(numberOfInitialCheckpoints - 1); - CompletedCheckpoint latestCheckpoint = latest.f0.retrieveState(); + CompletedCheckpoint latestCheckpoint; + long checkpointId = pathToCheckpointId(latest.f1); + + LOG.info("Trying to retrieve checkpoint {}.", checkpointId); + + try { + latestCheckpoint = latest.f0.retrieveState(); + } catch (Exception e) { + throw new Exception("Could not retrieve the completed checkpoint " + checkpointId + + " from the state storage.", e); + } checkpointStateHandles.add(latest); @@ -190,7 +200,7 @@ public class 
ZooKeeperCompletedCheckpointStore implements CompletedCheckpointSto checkNotNull(checkpoint, "Checkpoint"); // First add the new one. If it fails, we don't want to loose existing data. - String path = String.format("/%s", checkpoint.getCheckpointID()); + String path = checkpointIdToPath(checkpoint.getCheckpointID()); final RetrievableStateHandle<CompletedCheckpoint> stateHandle = checkpointsInZooKeeper.add(path, checkpoint); @@ -298,14 +308,36 @@ public class ZooKeeperCompletedCheckpointStore implements CompletedCheckpointSto BackgroundCallback callback = new BackgroundCallback() { @Override public void processResult(CuratorFramework client, CuratorEvent event) throws Exception { + final long checkpointId = pathToCheckpointId(stateHandleAndPath.f1); + try { if (event.getType() == CuratorEventType.DELETE) { if (event.getResultCode() == 0) { + Exception exception = null; + try { action.call(); - } finally { + } catch (Exception e) { + exception = new Exception("Could not execute callable action " + + "for checkpoint " + checkpointId + '.', e); + } + + try { // Discard the state handle stateHandleAndPath.f0.discardState(); + } catch (Exception e) { + Exception newException = new Exception("Could not discard meta " + + "data for completed checkpoint " + checkpointId + '.', e); + + if (exception == null) { + exception = newException; + } else { + exception.addSuppressed(newException); + } + } + + if (exception != null) { + throw exception; } } else { throw new IllegalStateException("Unexpected result code " + @@ -316,7 +348,7 @@ public class ZooKeeperCompletedCheckpointStore implements CompletedCheckpointSto event.getType() + " in '" + event + "' callback."); } } catch (Exception e) { - LOG.error("Failed to discard checkpoint.", e); + LOG.warn("Failed to discard checkpoint {}.", checkpointId, e); } } }; @@ -326,4 +358,39 @@ public class ZooKeeperCompletedCheckpointStore implements CompletedCheckpointSto // inconsistent state. 
checkpointsInZooKeeper.remove(stateHandleAndPath.f1, callback); } + + /** + * Convert a checkpoint id into a ZooKeeper path. + * + * @param checkpointId to convert to the path + * @return Path created from the given checkpoint id + */ + protected static String checkpointIdToPath(long checkpointId) { + return String.format("/%s", checkpointId); + } + + /** + * Converts a path to the checkpoint id. + * + * @param path in ZooKeeper + * @return Checkpoint id parsed from the path, or -1 if it cannot be parsed + */ + protected static long pathToCheckpointId(String path) { + try { + String numberString; + + // check if we have a leading slash (guard against empty paths, which would otherwise throw StringIndexOutOfBoundsException) + if (!path.isEmpty() && '/' == path.charAt(0)) { + numberString = path.substring(1); + } else { + numberString = path; + } + return Long.parseLong(numberString); + } catch (NumberFormatException e) { + LOG.warn("Could not parse checkpoint id from {}. This indicates that the " + + "checkpoint id to path conversion has changed.", path); + + return -1L; + } + } } http://git-wip-us.apache.org/repos/asf/flink/blob/ea708071/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/Execution.java ---------------------------------------------------------------------- diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/Execution.java b/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/Execution.java index 219d71d..16aebce 100644 --- a/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/Execution.java +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/Execution.java @@ -1015,14 +1015,17 @@ public class Execution implements AccessExecution, Archiveable<ArchivedExecution private boolean transitionState(ExecutionState currentState, ExecutionState targetState, Throwable error) { // sanity check if (currentState.isTerminal()) { 
- throw new IllegalStateException("Cannot leave terminal state " + currentState + " to transition to " + targetState + "."); + throw new IllegalStateException("Cannot leave
terminal state " + currentState + " to transition to " + targetState + '.'); } if (STATE_UPDATER.compareAndSet(this, currentState, targetState)) { markTimestamp(targetState); - LOG.info(getVertex().getTaskNameWithSubtaskIndex() + " (" + getAttemptId() + ") switched from " - + currentState + " to " + targetState); + if (error == null) { + LOG.info("{} ({}) switched from {} to {}.", getVertex().getTaskNameWithSubtaskIndex(), getAttemptId(), currentState, targetState); + } else { + LOG.info("{} ({}) switched from {} to {}.", getVertex().getTaskNameWithSubtaskIndex(), getAttemptId(), currentState, targetState, error); + } // make sure that the state transition completes normally. // potential errors (in listeners may not affect the main logic) http://git-wip-us.apache.org/repos/asf/flink/blob/ea708071/flink-runtime/src/main/java/org/apache/flink/runtime/taskmanager/Task.java ---------------------------------------------------------------------- diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/taskmanager/Task.java b/flink-runtime/src/main/java/org/apache/flink/runtime/taskmanager/Task.java index 14ef1bf..184c3b1 100644 --- a/flink-runtime/src/main/java/org/apache/flink/runtime/taskmanager/Task.java +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/taskmanager/Task.java @@ -504,7 +504,7 @@ public class Task implements Runnable, TaskActions { while (true) { ExecutionState current = this.executionState; if (current == ExecutionState.CREATED) { - if (STATE_UPDATER.compareAndSet(this, ExecutionState.CREATED, ExecutionState.DEPLOYING)) { + if (transitionState(ExecutionState.CREATED, ExecutionState.DEPLOYING)) { // success, we can start our work break; } @@ -515,14 +515,14 @@ public class Task implements Runnable, TaskActions { return; } else if (current == ExecutionState.CANCELING) { - if (STATE_UPDATER.compareAndSet(this, ExecutionState.CANCELING, ExecutionState.CANCELED)) { + if (transitionState(ExecutionState.CANCELING, ExecutionState.CANCELED)) 
{ // we were immediately canceled. tell the TaskManager that we reached our final state notifyFinalState(); return; } } else { - throw new IllegalStateException("Invalid state for beginning of task operation"); + throw new IllegalStateException("Invalid state for beginning of operation of task " + this + '.'); } } @@ -543,7 +543,7 @@ public class Task implements Runnable, TaskActions { // first of all, get a user-code classloader // this may involve downloading the job's JAR files and/or classes - LOG.info("Loading JAR files for task " + taskNameWithSubtask); + LOG.info("Loading JAR files for task {}.", this); userCodeClassLoader = createUserCodeClassloader(libraryCache); final ExecutionConfig executionConfig = serializedExecutionConfig.deserializeValue(userCodeClassLoader); @@ -572,7 +572,7 @@ public class Task implements Runnable, TaskActions { // the registration must also strictly be undone // ---------------------------------------------------------------- - LOG.info("Registering task at network: " + this); + LOG.info("Registering task at network: {}.", this); network.registerTask(this); @@ -581,13 +581,15 @@ public class Task implements Runnable, TaskActions { for (Map.Entry<String, DistributedCache.DistributedCacheEntry> entry : DistributedCache.readFileInfoFromConfig(jobConfiguration)) { - LOG.info("Obtaining local cache file for '" + entry.getKey() + '\''); + LOG.info("Obtaining local cache file for '{}'.", entry.getKey()); Future<Path> cp = fileCache.createTmpFile(entry.getKey(), entry.getValue(), jobId); distributedCacheEntries.put(entry.getKey(), cp); } } catch (Exception e) { - throw new Exception("Exception while adding files to distributed cache.", e); + throw new Exception( + String.format("Exception while adding files to distributed cache of task %s (%s).", taskNameWithSubtask, executionId), + e); } if (isCanceledOrFailed()) { @@ -638,7 +640,7 @@ public class Task implements Runnable, TaskActions { this.invokable = invokable; // switch to the 
RUNNING state, if that fails, we have been canceled/failed in the meantime - if (!STATE_UPDATER.compareAndSet(this, ExecutionState.DEPLOYING, ExecutionState.RUNNING)) { + if (!transitionState(ExecutionState.DEPLOYING, ExecutionState.RUNNING)) { throw new CancelTaskException(); } @@ -671,7 +673,7 @@ public class Task implements Runnable, TaskActions { // try to mark the task as finished // if that fails, the task was canceled/failed in the meantime - if (STATE_UPDATER.compareAndSet(this, ExecutionState.RUNNING, ExecutionState.FINISHED)) { + if (transitionState(ExecutionState.RUNNING, ExecutionState.FINISHED)) { notifyObservers(ExecutionState.FINISHED, null); } else { @@ -694,7 +696,7 @@ public class Task implements Runnable, TaskActions { if (current == ExecutionState.RUNNING || current == ExecutionState.DEPLOYING) { if (t instanceof CancelTaskException) { - if (STATE_UPDATER.compareAndSet(this, current, ExecutionState.CANCELED)) { + if (transitionState(current, ExecutionState.CANCELED)) { cancelInvokable(); notifyObservers(ExecutionState.CANCELED, null); @@ -702,19 +704,19 @@ public class Task implements Runnable, TaskActions { } } else { - if (STATE_UPDATER.compareAndSet(this, current, ExecutionState.FAILED)) { + if (transitionState(current, ExecutionState.FAILED, t)) { // proper failure of the task. record the exception as the root cause - LOG.error("Task execution failed. 
", t); + String errorMessage = String.format("Execution of %s (%s) failed.", taskNameWithSubtask, executionId); failureCause = t; cancelInvokable(); - notifyObservers(ExecutionState.FAILED, t); + notifyObservers(ExecutionState.FAILED, new Exception(errorMessage, t)); break; } } } else if (current == ExecutionState.CANCELING) { - if (STATE_UPDATER.compareAndSet(this, current, ExecutionState.CANCELED)) { + if (transitionState(current, ExecutionState.CANCELED)) { notifyObservers(ExecutionState.CANCELED, null); break; } @@ -724,22 +726,22 @@ public class Task implements Runnable, TaskActions { break; } // unexpected state, go to failed - else if (STATE_UPDATER.compareAndSet(this, current, ExecutionState.FAILED)) { - LOG.error("Unexpected state in Task during an exception: " + current); + else if (transitionState(current, ExecutionState.FAILED, t)) { + LOG.error("Unexpected state in task {} ({}) during an exception: {}.", taskNameWithSubtask, executionId, current); break; } // else fall through the loop and } } catch (Throwable tt) { - String message = "FATAL - exception in task exception handler"; + String message = String.format("FATAL - exception in exception handler of task %s (%s).", taskNameWithSubtask, executionId); LOG.error(message, tt); notifyFatalError(message, tt); } } finally { try { - LOG.info("Freeing task resources for " + taskNameWithSubtask); + LOG.info("Freeing task resources for {} ({}).", taskNameWithSubtask, executionId); // stop the async dispatcher.
// copy dispatcher reference to stack, against concurrent release @@ -767,7 +769,7 @@ public class Task implements Runnable, TaskActions { } catch (Throwable t) { // an error in the resource cleanup is fatal - String message = "FATAL - exception in task resource cleanup"; + String message = String.format("FATAL - exception in resource cleanup of task %s (%s).", taskNameWithSubtask, executionId); LOG.error(message, t); notifyFatalError(message, t); } @@ -779,7 +781,7 @@ public class Task implements Runnable, TaskActions { metrics.close(); } catch (Throwable t) { - LOG.error("Error during metrics de-registration", t); + LOG.error("Error during metrics de-registration of task {} ({}).", taskNameWithSubtask, executionId, t); } } } @@ -845,6 +847,39 @@ public class Task implements Runnable, TaskActions { taskManagerConnection.notifyFatalError(message, cause); } + /** + * Try to transition the execution state from the current state to the new state. + * + * @param currentState of the execution + * @param newState of the execution + * @return true if the transition was successful, otherwise false + */ + private boolean transitionState(ExecutionState currentState, ExecutionState newState) { + return transitionState(currentState, newState, null); + } + + /** + * Try to transition the execution state from the current state to the new state. 
+ * + * @param currentState of the execution + * @param newState of the execution + * @param cause of the transition change or null + * @return true if the transition was successful, otherwise false + */ + private boolean transitionState(ExecutionState currentState, ExecutionState newState, Throwable cause) { + if (STATE_UPDATER.compareAndSet(this, currentState, newState)) { + if (cause == null) { + LOG.info("{} ({}) switched from {} to {}.", taskNameWithSubtask, executionId, currentState, newState); + } else { + LOG.info("{} ({}) switched from {} to {}.", taskNameWithSubtask, executionId, currentState, newState, cause); + } + + return true; + } else { + return false; + } + } + // ---------------------------------------------------------------------------------------------------------------- // Stopping / Canceling / Failing the task from the outside // ---------------------------------------------------------------------------------------------------------------- @@ -859,22 +894,22 @@ public class Task implements Runnable, TaskActions { * if the {@link AbstractInvokable} does not implement {@link StoppableTask} */ public void stopExecution() throws UnsupportedOperationException { - LOG.info("Attempting to stop task " + taskNameWithSubtask); - if(this.invokable instanceof StoppableTask) { + LOG.info("Attempting to stop task {} ({}).", taskNameWithSubtask, executionId); + if (invokable instanceof StoppableTask) { Runnable runnable = new Runnable() { @Override public void run() { try { - ((StoppableTask)Task.this.invokable).stop(); + ((StoppableTask)invokable).stop(); } catch(RuntimeException e) { - LOG.error("Stopping task " + taskNameWithSubtask + " failed.", e); + LOG.error("Stopping task {} ({}) failed.", taskNameWithSubtask, executionId, e); taskManagerConnection.failTask(executionId, e); } } }; - executeAsyncCallRunnable(runnable, "Stopping source task " + this.taskNameWithSubtask); + executeAsyncCallRunnable(runnable, String.format("Stopping source task %s 
(%s).", taskNameWithSubtask, executionId)); } else { - throw new UnsupportedOperationException("Stopping not supported by this task."); + throw new UnsupportedOperationException(String.format("Stopping not supported by task %s (%s).", taskNameWithSubtask, executionId)); } } @@ -887,7 +922,7 @@ public class Task implements Runnable, TaskActions { * <p>This method never blocks.</p> */ public void cancelExecution() { - LOG.info("Attempting to cancel task " + taskNameWithSubtask); + LOG.info("Attempting to cancel task {} ({}).", taskNameWithSubtask, executionId); cancelOrFailAndCancelInvokable(ExecutionState.CANCELING, null); } @@ -902,37 +937,52 @@ public class Task implements Runnable, TaskActions { */ @Override public void failExternally(Throwable cause) { - LOG.info("Attempting to fail task externally " + taskNameWithSubtask); + LOG.info("Attempting to fail task externally {} ({}).", taskNameWithSubtask, executionId); cancelOrFailAndCancelInvokable(ExecutionState.FAILED, cause); } private void cancelOrFailAndCancelInvokable(ExecutionState targetState, Throwable cause) { while (true) { - ExecutionState current = this.executionState; + ExecutionState current = executionState; // if the task is already canceled (or canceling) or finished or failed, // then we need not do anything if (current.isTerminal() || current == ExecutionState.CANCELING) { - LOG.info("Task " + taskNameWithSubtask + " is already in state " + current); + LOG.info("Task {} is already in state {}", taskNameWithSubtask, current); return; } if (current == ExecutionState.DEPLOYING || current == ExecutionState.CREATED) { - if (STATE_UPDATER.compareAndSet(this, current, targetState)) { + if (transitionState(current, targetState, cause)) { // if we manage this state transition, then the invokable gets never called // we need not call cancel on it this.failureCause = cause; - notifyObservers(targetState, cause); + notifyObservers( + targetState, + new Exception( + String.format( + "Cancel or fail execution 
of %s (%s).", + taskNameWithSubtask, + executionId), + cause)); return; } } else if (current == ExecutionState.RUNNING) { - if (STATE_UPDATER.compareAndSet(this, ExecutionState.RUNNING, targetState)) { + if (transitionState(ExecutionState.RUNNING, targetState, cause)) { // we are canceling / failing out of the running state // we need to cancel the invokable if (invokable != null && invokableHasBeenCanceled.compareAndSet(false, true)) { this.failureCause = cause; - notifyObservers(targetState, cause); + notifyObservers( + targetState, + new Exception( + String.format( + "Cancel or fail execution of %s (%s).", + taskNameWithSubtask, + executionId), + cause)); + LOG.info("Triggering cancellation of task code {} ({}).", taskNameWithSubtask, executionId); // because the canceling may block on user code, we cancel from a separate thread @@ -951,7 +1001,7 @@ public class Task implements Runnable, TaskActions { producedPartitions, inputGates); Thread cancelThread = new Thread(executingThread.getThreadGroup(), canceler, - "Canceler for " + taskNameWithSubtask); + String.format("Canceler for %s (%s).", taskNameWithSubtask, executionId)); cancelThread.setDaemon(true); cancelThread.start(); } @@ -959,7 +1009,8 @@ public class Task implements Runnable, TaskActions { } } else { - throw new IllegalStateException("Unexpected task state: " + current); + throw new IllegalStateException(String.format("Unexpected state: %s of task %s (%s).", + current, taskNameWithSubtask, executionId)); } } } @@ -973,13 +1024,6 @@ public class Task implements Runnable, TaskActions { } private void notifyObservers(ExecutionState newState, Throwable error) { - if (error == null) { - LOG.info(taskNameWithSubtask + " switched to " + newState); - } - else { - LOG.info(taskNameWithSubtask + " switched to " + newState + " with exception.", error); - } - TaskExecutionState stateUpdate = new TaskExecutionState(jobId, executionId, newState, error); for (TaskExecutionStateListener listener : 
taskExecutionStateListeners) { @@ -1066,24 +1110,29 @@ public class Task implements Runnable, TaskActions { catch (Throwable t) { if (getExecutionState() == ExecutionState.RUNNING) { failExternally(new Exception( - "Error while triggering checkpoint for " + taskName, - t)); + "Error while triggering checkpoint " + checkpointID + " for " + + taskNameWithSubtask, t)); + } else { + LOG.debug("Encountered error while triggering checkpoint {} for " + + "{} ({}) while being not in state running.", checkpointID, + taskNameWithSubtask, executionId, t); } } } }; - executeAsyncCallRunnable(runnable, "Checkpoint Trigger for " + taskName); + executeAsyncCallRunnable(runnable, String.format("Checkpoint Trigger for %s (%s).", taskNameWithSubtask, executionId)); } else { checkpointResponder.declineCheckpoint(jobId, executionId, checkpointID, new CheckpointDeclineTaskNotCheckpointingException(taskNameWithSubtask)); + + LOG.error("Task received a checkpoint request, but is not a checkpointing task - {} ({}).", + taskNameWithSubtask, executionId); - LOG.error("Task received a checkpoint request, but is not a checkpointing task - " - + taskNameWithSubtask); } } else { - LOG.debug("Declining checkpoint request for non-running task"); + LOG.debug("Declining checkpoint request for non-running task {} ({}).", taskNameWithSubtask, executionId); // send back a message that we did not do the checkpoint checkpointResponder.declineCheckpoint(jobId, executionId, checkpointID, @@ -1120,12 +1169,12 @@ public class Task implements Runnable, TaskActions { executeAsyncCallRunnable(runnable, "Checkpoint Confirmation for " + taskName); } else { - LOG.error("Task received a checkpoint commit notification, but is not a checkpoint committing task - " - + taskNameWithSubtask); + LOG.error("Task received a checkpoint commit notification, but is not a checkpoint committing task - {}.", + taskNameWithSubtask); } } else { - LOG.debug("Ignoring checkpoint commit notification for non-running task."); + 
LOG.debug("Ignoring checkpoint commit notification for non-running task {}.", taskNameWithSubtask); } } @@ -1228,14 +1277,14 @@ public class Task implements Runnable, TaskActions { invokable.cancel(); } catch (Throwable t) { - LOG.error("Error while canceling task " + taskNameWithSubtask, t); + LOG.error("Error while canceling task {}.", taskNameWithSubtask, t); } } } @Override public String toString() { - return taskNameWithSubtask + " [" + executionState + ']'; + return String.format("%s (%s) [%s]", taskNameWithSubtask, executionId, executionState); } /** @@ -1312,7 +1361,7 @@ public class Task implements Runnable, TaskActions { try { invokable.cancel(); } catch (Throwable t) { - logger.error("Error while canceling the task", t); + logger.error("Error while canceling the task {}.", taskName, t); } // Early release of input and output buffer pools. We do this @@ -1326,7 +1375,7 @@ public class Task implements Runnable, TaskActions { try { partition.destroyBufferPool(); } catch (Throwable t) { - LOG.error("Failed to release result partition buffer pool.", t); + LOG.error("Failed to release result partition buffer pool for task {}.", taskName, t); } } @@ -1334,7 +1383,7 @@ public class Task implements Runnable, TaskActions { try { inputGate.releaseAllResources(); } catch (Throwable t) { - LOG.error("Failed to release input gate.", t); + LOG.error("Failed to release input gate for task {}.", taskName, t); } } @@ -1352,7 +1401,7 @@ public class Task implements Runnable, TaskActions { watchDogThread.join(); } } catch (Throwable t) { - logger.error("Error in the task canceler", t); + logger.error("Error in the task canceler for task {}.", taskName, t); } } http://git-wip-us.apache.org/repos/asf/flink/blob/ea708071/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/ZooKeeperCompletedCheckpointStoreTest.java ---------------------------------------------------------------------- diff --git 
a/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/ZooKeeperCompletedCheckpointStoreTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/ZooKeeperCompletedCheckpointStoreTest.java new file mode 100644 index 0000000..6ee0141 --- /dev/null +++ b/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/ZooKeeperCompletedCheckpointStoreTest.java @@ -0,0 +1,36 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.flink.runtime.checkpoint; + +import org.apache.flink.util.TestLogger; +import org.junit.Test; + +import static org.junit.Assert.assertEquals; + +public class ZooKeeperCompletedCheckpointStoreTest extends TestLogger { + + @Test + public void testPathConversion() { + final long checkpointId = 42L; + + final String path = ZooKeeperCompletedCheckpointStore.checkpointIdToPath(checkpointId); + + assertEquals(checkpointId, ZooKeeperCompletedCheckpointStore.pathToCheckpointId(path)); + } +} http://git-wip-us.apache.org/repos/asf/flink/blob/ea708071/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/tasks/StreamTask.java ---------------------------------------------------------------------- diff --git a/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/tasks/StreamTask.java b/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/tasks/StreamTask.java index 54f6c10..88a29ab 100644 --- a/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/tasks/StreamTask.java +++ b/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/tasks/StreamTask.java @@ -211,7 +211,7 @@ public abstract class StreamTask<OUT, OP extends StreamOperator<OUT>> boolean disposed = false; try { // -------- Initialize --------- - LOG.debug("Initializing {}", getName()); + LOG.debug("Initializing {}.", getName()); asyncOperationsThreadPool = Executors.newCachedThreadPool(); @@ -528,8 +528,11 @@ public abstract class StreamTask<OUT, OP extends StreamOperator<OUT>> catch (Exception e) { // propagate exceptions only if the task is still in "running" state if (isRunning) { - throw e; + throw new Exception("Could not perform checkpoint " + checkpointMetaData.getCheckpointId() + + " for operator " + getName() + '.', e); } else { + LOG.debug("Could not perform checkpoint {} for operator {} while the " + + "invokable was not in state running.", checkpointMetaData.getCheckpointId(), getName(), e); 
return
false; } } @@ -541,10 +544,12 @@ public abstract class StreamTask<OUT, OP extends StreamOperator<OUT>> performCheckpoint(checkpointMetaData); } catch (CancelTaskException e) { - throw e; + LOG.info("Operator {} was cancelled while performing checkpoint {}.", getName(), checkpointMetaData.getCheckpointId()); + throw e; } catch (Exception e) { - throw new Exception("Error while performing a checkpoint", e); + throw new Exception("Could not perform checkpoint " + checkpointMetaData.getCheckpointId() + " for operator " + + getName() + '.', e); } } @@ -678,7 +683,7 @@ public abstract class StreamTask<OUT, OP extends StreamOperator<OUT>> if (stateBackend != null) { // backend has been configured on the environment - LOG.info("Using user-defined state backend: " + stateBackend); + LOG.info("Using user-defined state backend: {}.", stateBackend); } else { // see if we have a backend specified in the configuration Configuration flinkConfig = getEnvironment().getTaskManagerInfo().getConfiguration(); @@ -697,8 +702,8 @@ public abstract class StreamTask<OUT, OP extends StreamOperator<OUT>> case "filesystem": FsStateBackend backend = new FsStateBackendFactory().createFromConfig(flinkConfig); - LOG.info("State backend is set to heap memory (checkpoints to filesystem \"" - + backend.getBasePath() + "\")"); + LOG.info("State backend is set to heap memory (checkpoints to filesystem \"{}\")", + backend.getBasePath()); stateBackend = backend; break; @@ -933,7 +938,11 @@ public abstract class StreamTask<OUT, OP extends StreamOperator<OUT>> } } catch (Exception e) { // registers the exception and tries to fail the whole task - AsynchronousException asyncException = new AsynchronousException(e); + AsynchronousException asyncException = new AsynchronousException( + new Exception( + "Could not materialize checkpoint " + checkpointMetaData.getCheckpointId() + + " for operator " + owner.getName() + '.', + e)); owner.handleAsyncException("Failure in asynchronous checkpoint
materialization", asyncException); } finally { owner.cancelables.unregisterClosable(this);
