[hotfix] Small improvements in logging for local recovery

Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/89935997
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/89935997
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/89935997

Branch: refs/heads/master
Commit: 89935997de03b0f6db89d111a087b0f0f210695d
Parents: 2bc1eaa
Author: Stefan Richter <s.rich...@data-artisans.com>
Authored: Wed May 16 17:09:28 2018 +0200
Committer: Stefan Richter <s.rich...@data-artisans.com>
Committed: Thu May 17 10:03:04 2018 +0200

----------------------------------------------------------------------
 .../TaskExecutorLocalStateStoresManager.java    | 14 ++++++--------
 .../runtime/state/TaskLocalStateStoreImpl.java  | 20 ++++++++++++--------
 .../runtime/state/TaskStateManagerImpl.java     |  7 ++-----
 3 files changed, 20 insertions(+), 21 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/flink/blob/89935997/flink-runtime/src/main/java/org/apache/flink/runtime/state/TaskExecutorLocalStateStoresManager.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/state/TaskExecutorLocalStateStoresManager.java b/flink-runtime/src/main/java/org/apache/flink/runtime/state/TaskExecutorLocalStateStoresManager.java
index cb3b680..6826fbd 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/state/TaskExecutorLocalStateStoresManager.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/state/TaskExecutorLocalStateStoresManager.java
@@ -158,15 +158,13 @@ public class TaskExecutorLocalStateStoresManager {
 
                                taskStateManagers.put(taskKey, 
taskLocalStateStore);
 
-                               if (LOG.isTraceEnabled()) {
-                                       LOG.trace("Registered new local state 
store with configuration {} for {} - {} - {} under allocation id {}.",
-                                               localRecoveryConfig, jobId, 
jobVertexID, subtaskIndex, allocationID);
-                               }
+
+                               LOG.debug("Registered new local state store 
with configuration {} for {} - {} - {} under allocation " +
+                                               "id {}.", localRecoveryConfig, 
jobId, jobVertexID, subtaskIndex, allocationID);
+
                        } else {
-                               if (LOG.isTraceEnabled()) {
-                                       LOG.trace("Found existing local state 
store for {} - {} - {} under allocation id {}.",
-                                               jobId, jobVertexID, 
subtaskIndex, allocationID);
-                               }
+                               LOG.debug("Found existing local state store for 
{} - {} - {} under allocation id {}: {}",
+                                       jobId, jobVertexID, subtaskIndex, 
allocationID, taskLocalStateStore);
                        }
 
                        return taskLocalStateStore;

http://git-wip-us.apache.org/repos/asf/flink/blob/89935997/flink-runtime/src/main/java/org/apache/flink/runtime/state/TaskLocalStateStoreImpl.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/state/TaskLocalStateStoreImpl.java b/flink-runtime/src/main/java/org/apache/flink/runtime/state/TaskLocalStateStoreImpl.java
index 9d105e6..df9147c 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/state/TaskLocalStateStoreImpl.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/state/TaskLocalStateStoreImpl.java
@@ -190,17 +190,20 @@ public class TaskLocalStateStoreImpl implements OwnedTaskLocalStateStore {
                        snapshot = 
storedTaskStateByCheckpointID.get(checkpointID);
                }
 
-               snapshot = (snapshot != NULL_DUMMY) ? snapshot : null;
-
-               if (LOG.isTraceEnabled()) {
-                       LOG.trace("Found entry for local state for checkpoint 
{} in subtask ({} - {} - {}) : {}",
-                               checkpointID, jobID, jobVertexID, subtaskIndex, 
snapshot);
-               } else if (LOG.isDebugEnabled()) {
-                       LOG.debug("Found entry for local state for checkpoint 
{} in subtask ({} - {} - {})",
+               if (snapshot != null) {
+                       if (LOG.isTraceEnabled()) {
+                               LOG.trace("Found registered local state for 
checkpoint {} in subtask ({} - {} - {}) : {}",
+                                       checkpointID, jobID, jobVertexID, 
subtaskIndex, snapshot);
+                       } else if (LOG.isDebugEnabled()) {
+                               LOG.debug("Found registered local state for 
checkpoint {} in subtask ({} - {} - {})",
+                                       checkpointID, jobID, jobVertexID, 
subtaskIndex);
+                       }
+               } else {
+                       LOG.debug("Did not find registered local state for 
checkpoint {} in subtask ({} - {} - {})",
                                checkpointID, jobID, jobVertexID, subtaskIndex);
                }
 
-               return snapshot;
+               return (snapshot != NULL_DUMMY) ? snapshot : null;
        }
 
        @Override
@@ -357,6 +360,7 @@ public class TaskLocalStateStoreImpl implements 
OwnedTaskLocalStateStore {
                        ", allocationID=" + allocationID +
                        ", subtaskIndex=" + subtaskIndex +
                        ", localRecoveryConfig=" + localRecoveryConfig +
+                       ", storedCheckpointIDs=" + 
storedTaskStateByCheckpointID.keySet() +
                        '}';
        }
 }

http://git-wip-us.apache.org/repos/asf/flink/blob/89935997/flink-runtime/src/main/java/org/apache/flink/runtime/state/TaskStateManagerImpl.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/state/TaskStateManagerImpl.java b/flink-runtime/src/main/java/org/apache/flink/runtime/state/TaskStateManagerImpl.java
index e542ba1..a0aeb3c 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/state/TaskStateManagerImpl.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/state/TaskStateManagerImpl.java
@@ -136,11 +136,8 @@ public class TaskStateManagerImpl implements TaskStateManager {
                        }
                }
 
-               if (LOG.isTraceEnabled()) {
-                       LOG.trace("Operator {} has remote state {} from job 
manager and local state alternatives {} from local " +
-                                       "state store {}.",
-                               operatorID, jobManagerSubtaskState, 
alternativesByPriority, localStateStore);
-               }
+               LOG.debug("Operator {} has remote state {} from job manager and 
local state alternatives {} from local " +
+                               "state store {}.", operatorID, 
jobManagerSubtaskState, alternativesByPriority, localStateStore);
 
                PrioritizedOperatorSubtaskState.Builder builder = new 
PrioritizedOperatorSubtaskState.Builder(
                        jobManagerSubtaskState,

Reply via email to