Repository: flink
Updated Branches:
  refs/heads/master ccf35cf20 -> 7a5189525


[hotfix] Move logging of JobStatus changes into the ExecutionGraph

Prior the JobManager was responsible for logging the JobStatus changes. This 
introduced
out of order logging since the JM was a mere job status listener which was 
notified by
an asynchronous message.


Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/7a518952
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/7a518952
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/7a518952

Branch: refs/heads/master
Commit: 7a518952512238099ea8350156d38cfdbe9871ea
Parents: ccf35cf
Author: Till Rohrmann <trohrm...@apache.org>
Authored: Wed Nov 9 12:19:10 2016 +0100
Committer: Till Rohrmann <trohrm...@apache.org>
Committed: Wed Nov 9 12:19:10 2016 +0100

----------------------------------------------------------------------
 .../flink/runtime/executiongraph/ExecutionGraph.java  | 14 ++++++--------
 .../apache/flink/runtime/jobmanager/JobManager.scala  |  4 ----
 2 files changed, 6 insertions(+), 12 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/flink/blob/7a518952/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/ExecutionGraph.java
----------------------------------------------------------------------
diff --git 
a/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/ExecutionGraph.java
 
b/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/ExecutionGraph.java
index 8a4f3ef..1231b45 100644
--- 
a/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/ExecutionGraph.java
+++ 
b/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/ExecutionGraph.java
@@ -970,9 +970,7 @@ public class ExecutionGraph implements 
AccessExecutionGraph, Archiveable<Archive
 
        private boolean transitionState(JobStatus current, JobStatus newState, 
Throwable error) {
                if (STATE_UPDATER.compareAndSet(this, current, newState)) {
-                       if (LOG.isDebugEnabled()) {
-                               LOG.debug("{} switched from {} to {}.", 
this.getJobName(), current, newState);
-                       }
+                       LOG.info("Job {} ({}) switched from state {} to {}.", 
jobName, jobID, current, newState, error);
 
                        stateTimestamps[newState.ordinal()] = 
System.currentTimeMillis();
                        notifyJobStatusChange(newState, error);
@@ -1051,20 +1049,20 @@ public class ExecutionGraph implements 
AccessExecutionGraph, Archiveable<Archive
                if (currentState == JobStatus.FAILING || currentState == 
JobStatus.RESTARTING) {
                        synchronized (progressLock) {
                                if (LOG.isDebugEnabled()) {
-                                       LOG.debug("Try to restart the job or 
fail it if no longer possible.", failureCause);
+                                       LOG.debug("Try to restart or fail the 
job {} ({}) if no longer possible.", jobName, jobID, failureCause);
                                } else {
-                                       LOG.info("Try to restart the job or 
fail it if no longer possible.");
+                                       LOG.info("Try to restart or fail the 
job {} ({}) if no longer possible.", jobName, jobID);
                                }
 
                                boolean isRestartable = !(failureCause 
instanceof SuppressRestartsException) && restartStrategy.canRestart();
 
                                if (isRestartable && 
transitionState(currentState, JobStatus.RESTARTING)) {
-                                       LOG.info("Restarting the job...");
+                                       LOG.info("Restarting the job {} ({}).", 
jobName, jobID);
                                        restartStrategy.restart(this);
 
                                        return true;
                                } else if (!isRestartable && 
transitionState(currentState, JobStatus.FAILED, failureCause)) {
-                                       LOG.info("Could not restart the job.", 
failureCause);
+                                       LOG.info("Could not restart the job {} 
({}).", jobName, jobID, failureCause);
                                        postRunCleanup();
 
                                        return true;
@@ -1195,7 +1193,7 @@ public class ExecutionGraph implements 
AccessExecutionGraph, Archiveable<Archive
                        if (execution != null) {
                                execution.setAccumulators(userAccumulators);
                        } else {
-                               LOG.warn("Received accumulator result for 
unknown execution {}.", execID);
+                               LOG.debug("Received accumulator result for 
unknown execution {}.", execID);
                        }
                } catch (Exception e) {
                        LOG.error("Cannot update accumulators for job {}.", 
jobID, e);

http://git-wip-us.apache.org/repos/asf/flink/blob/7a518952/flink-runtime/src/main/scala/org/apache/flink/runtime/jobmanager/JobManager.scala
----------------------------------------------------------------------
diff --git 
a/flink-runtime/src/main/scala/org/apache/flink/runtime/jobmanager/JobManager.scala
 
b/flink-runtime/src/main/scala/org/apache/flink/runtime/jobmanager/JobManager.scala
index 31f9dd7..9af5355 100644
--- 
a/flink-runtime/src/main/scala/org/apache/flink/runtime/jobmanager/JobManager.scala
+++ 
b/flink-runtime/src/main/scala/org/apache/flink/runtime/jobmanager/JobManager.scala
@@ -842,10 +842,6 @@ class JobManager(
       currentJobs.get(jobID) match {
         case Some((executionGraph, jobInfo)) => executionGraph.getJobName
 
-          log.info(
-            s"Status of job $jobID (${executionGraph.getJobName}) changed to 
$newJobStatus.",
-            error)
-
           if (newJobStatus.isGloballyTerminalState()) {
             jobInfo.end = timeStamp
 

Reply via email to