[hotfix] Small improvements in logging for local recovery
Project: http://git-wip-us.apache.org/repos/asf/flink/repo Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/89935997 Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/89935997 Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/89935997 Branch: refs/heads/master Commit: 89935997de03b0f6db89d111a087b0f0f210695d Parents: 2bc1eaa Author: Stefan Richter <s.rich...@data-artisans.com> Authored: Wed May 16 17:09:28 2018 +0200 Committer: Stefan Richter <s.rich...@data-artisans.com> Committed: Thu May 17 10:03:04 2018 +0200 ---------------------------------------------------------------------- .../TaskExecutorLocalStateStoresManager.java | 14 ++++++-------- .../runtime/state/TaskLocalStateStoreImpl.java | 20 ++++++++++++-------- .../runtime/state/TaskStateManagerImpl.java | 7 ++----- 3 files changed, 20 insertions(+), 21 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/flink/blob/89935997/flink-runtime/src/main/java/org/apache/flink/runtime/state/TaskExecutorLocalStateStoresManager.java ---------------------------------------------------------------------- diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/state/TaskExecutorLocalStateStoresManager.java b/flink-runtime/src/main/java/org/apache/flink/runtime/state/TaskExecutorLocalStateStoresManager.java index cb3b680..6826fbd 100644 --- a/flink-runtime/src/main/java/org/apache/flink/runtime/state/TaskExecutorLocalStateStoresManager.java +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/state/TaskExecutorLocalStateStoresManager.java @@ -158,15 +158,13 @@ public class TaskExecutorLocalStateStoresManager { taskStateManagers.put(taskKey, taskLocalStateStore); - if (LOG.isTraceEnabled()) { - LOG.trace("Registered new local state store with configuration {} for {} - {} - {} under allocation id {}.", - localRecoveryConfig, jobId, jobVertexID, subtaskIndex, allocationID); - } + + LOG.debug("Registered new local state store with configuration {} for {} - {} - {} under allocation " + + "id {}.", localRecoveryConfig, jobId, jobVertexID, subtaskIndex, allocationID); + } else { - if (LOG.isTraceEnabled()) { - LOG.trace("Found existing local state store for {} - {} - {} under allocation id {}.", - jobId, jobVertexID, subtaskIndex, allocationID); - } + LOG.debug("Found existing local state store for {} - {} - {} under allocation id {}: {}", + jobId, jobVertexID, subtaskIndex, allocationID, taskLocalStateStore); } return taskLocalStateStore; http://git-wip-us.apache.org/repos/asf/flink/blob/89935997/flink-runtime/src/main/java/org/apache/flink/runtime/state/TaskLocalStateStoreImpl.java ---------------------------------------------------------------------- diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/state/TaskLocalStateStoreImpl.java b/flink-runtime/src/main/java/org/apache/flink/runtime/state/TaskLocalStateStoreImpl.java index 9d105e6..df9147c 100644 --- a/flink-runtime/src/main/java/org/apache/flink/runtime/state/TaskLocalStateStoreImpl.java +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/state/TaskLocalStateStoreImpl.java @@ -190,17 +190,20 @@ public class TaskLocalStateStoreImpl implements OwnedTaskLocalStateStore { snapshot = storedTaskStateByCheckpointID.get(checkpointID); } - snapshot = (snapshot != NULL_DUMMY) ? snapshot : null; - - if (LOG.isTraceEnabled()) { - LOG.trace("Found entry for local state for checkpoint {} in subtask ({} - {} - {}) : {}", - checkpointID, jobID, jobVertexID, subtaskIndex, snapshot); - } else if (LOG.isDebugEnabled()) { - LOG.debug("Found entry for local state for checkpoint {} in subtask ({} - {} - {})", + if (snapshot != null) { + if (LOG.isTraceEnabled()) { + LOG.trace("Found registered local state for checkpoint {} in subtask ({} - {} - {}) : {}", + checkpointID, jobID, jobVertexID, subtaskIndex, snapshot); + } else if (LOG.isDebugEnabled()) { + LOG.debug("Found registered local state for checkpoint {} in subtask ({} - {} - {})", + checkpointID, jobID, jobVertexID, subtaskIndex); + } + } else { + LOG.debug("Did not find registered local state for checkpoint {} in subtask ({} - {} - {})", checkpointID, jobID, jobVertexID, subtaskIndex); } - return snapshot; + return (snapshot != NULL_DUMMY) ? snapshot : null; } @Override @@ -357,6 +360,7 @@ public class TaskLocalStateStoreImpl implements OwnedTaskLocalStateStore { ", allocationID=" + allocationID + ", subtaskIndex=" + subtaskIndex + ", localRecoveryConfig=" + localRecoveryConfig + + ", storedCheckpointIDs=" + storedTaskStateByCheckpointID.keySet() + '}'; } } http://git-wip-us.apache.org/repos/asf/flink/blob/89935997/flink-runtime/src/main/java/org/apache/flink/runtime/state/TaskStateManagerImpl.java ---------------------------------------------------------------------- diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/state/TaskStateManagerImpl.java b/flink-runtime/src/main/java/org/apache/flink/runtime/state/TaskStateManagerImpl.java index e542ba1..a0aeb3c 100644 --- a/flink-runtime/src/main/java/org/apache/flink/runtime/state/TaskStateManagerImpl.java +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/state/TaskStateManagerImpl.java @@ -136,11 +136,8 @@ public class TaskStateManagerImpl implements TaskStateManager { } } - if (LOG.isTraceEnabled()) { - LOG.trace("Operator {} has remote state {} from job manager and local state alternatives {} from local " + - "state store {}.", - operatorID, jobManagerSubtaskState, alternativesByPriority, localStateStore); - } + LOG.debug("Operator {} has remote state {} from job manager and local state alternatives {} from local " + + "state store {}.", operatorID, jobManagerSubtaskState, alternativesByPriority, localStateStore); PrioritizedOperatorSubtaskState.Builder builder = new PrioritizedOperatorSubtaskState.Builder( jobManagerSubtaskState,