[FLINK-5199] [logging] Improve logging in ZooKeeperSubmittedJobGraphStore

Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/f91dd9fb
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/f91dd9fb
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/f91dd9fb

Branch: refs/heads/master
Commit: f91dd9fbaf392bc2968e974dddba4cda2a4f3be2
Parents: dc7d8ec
Author: Ufuk Celebi <u...@apache.org>
Authored: Tue Nov 29 16:35:14 2016 +0100
Committer: Ufuk Celebi <u...@apache.org>
Committed: Thu Dec 1 10:44:23 2016 +0100

----------------------------------------------------------------------
 .../runtime/jobmanager/SubmittedJobGraph.java   |  2 +-
 .../ZooKeeperSubmittedJobGraphStore.java        | 86 +++++++++++++-------
 2 files changed, 56 insertions(+), 32 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/flink/blob/f91dd9fb/flink-runtime/src/main/java/org/apache/flink/runtime/jobmanager/SubmittedJobGraph.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/jobmanager/SubmittedJobGraph.java b/flink-runtime/src/main/java/org/apache/flink/runtime/jobmanager/SubmittedJobGraph.java
index faacc93..e868da7 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/jobmanager/SubmittedJobGraph.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/jobmanager/SubmittedJobGraph.java
@@ -72,6 +72,6 @@ public class SubmittedJobGraph implements Serializable {
 
        @Override
        public String toString() {
-               return String.format("SubmittedJobGraph(%s, %s)", jobGraph, jobInfo);
+               return String.format("SubmittedJobGraph(%s, %s)", jobGraph.getJobID(), jobInfo);
        }
 }

http://git-wip-us.apache.org/repos/asf/flink/blob/f91dd9fb/flink-runtime/src/main/java/org/apache/flink/runtime/jobmanager/ZooKeeperSubmittedJobGraphStore.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/jobmanager/ZooKeeperSubmittedJobGraphStore.java b/flink-runtime/src/main/java/org/apache/flink/runtime/jobmanager/ZooKeeperSubmittedJobGraphStore.java
index b241712..c1dc656 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/jobmanager/ZooKeeperSubmittedJobGraphStore.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/jobmanager/ZooKeeperSubmittedJobGraphStore.java
@@ -82,6 +82,9 @@ public class ZooKeeperSubmittedJobGraphStore implements SubmittedJobGraphStore {
         */
        private final PathChildrenCache pathCache;
 
+       /** The full configured base path including the namespace. */
+       private final String zooKeeperFullBasePath;
+
        /** The external listener to be notified on races. */
        private SubmittedJobGraphListener jobGraphListener;
 
@@ -117,6 +120,7 @@ public class ZooKeeperSubmittedJobGraphStore implements SubmittedJobGraphStore {
                // All operations will have the path as root
                CuratorFramework facade = 
client.usingNamespace(client.getNamespace() + currentJobsPath);
 
+               this.zooKeeperFullBasePath = client.getNamespace() + 
currentJobsPath;
                this.jobGraphsInZooKeeper = new 
ZooKeeperStateHandleStore<>(facade, stateStorage, executor);
 
                this.pathCache = new PathChildrenCache(facade, "/", false);
@@ -156,6 +160,7 @@ public class ZooKeeperSubmittedJobGraphStore implements SubmittedJobGraphStore {
                synchronized (cacheLock) {
                        verifyIsRunning();
 
+                       LOG.debug("Recovering all job graphs from ZooKeeper at 
{}.", zooKeeperFullBasePath);
                        List<Tuple2<RetrievableStateHandle<SubmittedJobGraph>, 
String>> submitted;
 
                        while (true) {
@@ -168,6 +173,8 @@ public class ZooKeeperSubmittedJobGraphStore implements SubmittedJobGraphStore {
                                }
                        }
 
+                       LOG.info("Found {} job graphs.", submitted.size());
+
                        if (submitted.size() != 0) {
                                List<SubmittedJobGraph> jobGraphs = new 
ArrayList<>(submitted.size());
 
@@ -193,6 +200,8 @@ public class ZooKeeperSubmittedJobGraphStore implements SubmittedJobGraphStore {
                checkNotNull(jobId, "Job ID");
                String path = getPathForJob(jobId);
 
+               LOG.debug("Recovering job graph {} from {}{}.", jobId, 
zooKeeperFullBasePath, path);
+
                synchronized (cacheLock) {
                        verifyIsRunning();
 
@@ -215,6 +224,8 @@ public class ZooKeeperSubmittedJobGraphStore implements SubmittedJobGraphStore {
                checkNotNull(jobGraph, "Job graph");
                String path = getPathForJob(jobGraph.getJobId());
 
+               LOG.debug("Adding job graph {} to {}{}.", jobGraph.getJobId(), 
zooKeeperFullBasePath, path);
+
                boolean success = false;
 
                while (!success) {
@@ -229,8 +240,6 @@ public class ZooKeeperSubmittedJobGraphStore implements SubmittedJobGraphStore {
 
                                                
addedJobGraphs.add(jobGraph.getJobId());
 
-                                               LOG.info("Added {} to 
ZooKeeper.", jobGraph);
-
                                                success = true;
                                        }
                                        catch 
(KeeperException.NodeExistsException ignored) {
@@ -252,6 +261,8 @@ public class ZooKeeperSubmittedJobGraphStore implements SubmittedJobGraphStore {
                                }
                        }
                }
+
+               LOG.info("Added {} to ZooKeeper.", jobGraph);
        }
 
        @Override
@@ -259,14 +270,17 @@ public class ZooKeeperSubmittedJobGraphStore implements SubmittedJobGraphStore {
                checkNotNull(jobId, "Job ID");
                String path = getPathForJob(jobId);
 
+               LOG.debug("Removing job graph {} from {}{}.", jobId, 
zooKeeperFullBasePath, path);
+
                synchronized (cacheLock) {
                        if (addedJobGraphs.contains(jobId)) {
                                
jobGraphsInZooKeeper.removeAndDiscardState(path);
 
                                addedJobGraphs.remove(jobId);
-                               LOG.info("Removed job graph {} from 
ZooKeeper.", jobId);
                        }
                }
+
+               LOG.info("Removed job graph {} from ZooKeeper.", jobId);
        }
 
        /**
@@ -291,70 +305,80 @@ public class ZooKeeperSubmittedJobGraphStore implements SubmittedJobGraphStore {
                        }
 
                        switch (event.getType()) {
-                               case CHILD_ADDED:
+                               case CHILD_ADDED: {
+                                       JobID jobId = fromEvent(event);
+
+                                       LOG.debug("Received CHILD_ADDED event 
notification for job {}", jobId);
+
                                        synchronized (cacheLock) {
                                                try {
-                                                       JobID jobId = 
fromEvent(event);
                                                        if (jobGraphListener != 
null && !addedJobGraphs.contains(jobId)) {
                                                                try {
                                                                        // 
Whoa! This has been added by someone else. Or we were fast
                                                                        // to 
remove it (false positive).
                                                                        
jobGraphListener.onAddedJobGraph(jobId);
-                                                               }
-                                                               catch 
(Throwable t) {
+                                                               } catch 
(Throwable t) {
                                                                        
LOG.error("Error in callback", t);
                                                                }
                                                        }
-                                               }
-                                               catch (Exception e) {
+                                               } catch (Exception e) {
                                                        LOG.error("Error in 
SubmittedJobGraphsPathCacheListener", e);
                                                }
                                        }
+                               }
+                               break;
 
-                                       break;
-
-                               case CHILD_UPDATED:
+                               case CHILD_UPDATED: {
                                        // Nothing to do
-                                       break;
+                               }
+                               break;
+
+                               case CHILD_REMOVED: {
+                                       JobID jobId = fromEvent(event);
+
+                                       LOG.debug("Received CHILD_REMOVED event 
notification for job {}", jobId);
 
-                               case CHILD_REMOVED:
                                        synchronized (cacheLock) {
                                                try {
-                                                       JobID jobId = 
fromEvent(event);
                                                        if (jobGraphListener != 
null && addedJobGraphs.contains(jobId)) {
                                                                try {
                                                                        // Oh 
oh. Someone else removed one of our job graphs. Mean!
                                                                        
jobGraphListener.onRemovedJobGraph(jobId);
-                                                               }
-                                                               catch 
(Throwable t) {
+                                                               } catch 
(Throwable t) {
                                                                        
LOG.error("Error in callback", t);
                                                                }
                                                        }
 
                                                        break;
-                                               }
-                                               catch (Exception e) {
+                                               } catch (Exception e) {
                                                        LOG.error("Error in 
SubmittedJobGraphsPathCacheListener", e);
                                                }
                                        }
-                                       break;
+                               }
+                               break;
 
-                               case CONNECTION_SUSPENDED:
+                               case CONNECTION_SUSPENDED: {
                                        LOG.warn("ZooKeeper connection 
SUSPENDED. Changes to the submitted job " +
-                                                       "graphs are not 
monitored (temporarily).");
-                                       break;
-                               case CONNECTION_LOST:
+                                               "graphs are not monitored 
(temporarily).");
+                               }
+                               break;
+
+                               case CONNECTION_LOST: {
                                        LOG.warn("ZooKeeper connection LOST. 
Changes to the submitted job " +
-                                                       "graphs are not 
monitored (permanently).");
-                                       break;
+                                               "graphs are not monitored 
(permanently).");
+                               }
+                               break;
 
-                               case CONNECTION_RECONNECTED:
+                               case CONNECTION_RECONNECTED: {
                                        LOG.info("ZooKeeper connection 
RECONNECTED. Changes to the submitted job " +
-                                                       "graphs are monitored 
again.");
-                                       break;
-                               case INITIALIZED:
+                                               "graphs are monitored again.");
+                               }
+                               break;
+
+                               case INITIALIZED: {
                                        
LOG.info("SubmittedJobGraphsPathCacheListener initialized");
-                                       break;
+                               }
+                               break;
                        }
                }
 

Reply via email to