Repository: flink
Updated Branches:
  refs/heads/release-0.9 ecfde6dd9 -> f5f0709c9


[FLINK-2206] Fix incorrect counts of finished, canceled, and failed jobs in 
webinterface

This closes #826


Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/e513be72
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/e513be72
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/e513be72

Branch: refs/heads/release-0.9
Commit: e513be72a486b4f2e13c617eb6d4d08c03503ae7
Parents: ecfde6d
Author: Fabian Hueske <fhue...@apache.org>
Authored: Fri Jun 12 01:45:03 2015 +0200
Committer: Fabian Hueske <fhue...@apache.org>
Committed: Fri Jun 12 14:26:47 2015 +0200

----------------------------------------------------------------------
 .../jobmanager/web/JobManagerInfoServlet.java   | 31 +++++++++++++++++
 .../js/jobmanagerFrontend.js                    | 36 +++++++++-----------
 .../runtime/jobmanager/MemoryArchivist.scala    | 17 +++++++++
 .../runtime/messages/ArchiveMessages.scala      | 11 +++++-
 4 files changed, 75 insertions(+), 20 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/flink/blob/e513be72/flink-runtime/src/main/java/org/apache/flink/runtime/jobmanager/web/JobManagerInfoServlet.java
----------------------------------------------------------------------
diff --git 
a/flink-runtime/src/main/java/org/apache/flink/runtime/jobmanager/web/JobManagerInfoServlet.java
 
b/flink-runtime/src/main/java/org/apache/flink/runtime/jobmanager/web/JobManagerInfoServlet.java
index 6d58306..3fc3c82 100644
--- 
a/flink-runtime/src/main/java/org/apache/flink/runtime/jobmanager/web/JobManagerInfoServlet.java
+++ 
b/flink-runtime/src/main/java/org/apache/flink/runtime/jobmanager/web/JobManagerInfoServlet.java
@@ -66,6 +66,7 @@ import org.apache.flink.util.ExceptionUtils;
 import org.apache.flink.util.StringUtils;
 import org.eclipse.jetty.io.EofException;
 
+import scala.Tuple3;
 import scala.concurrent.Await;
 import scala.concurrent.Future;
 import scala.concurrent.duration.FiniteDuration;
@@ -117,6 +118,20 @@ public class JobManagerInfoServlet extends HttpServlet {
                                        writeJsonForArchive(resp.getWriter(), 
archivedJobs);
                                }
                        }
+                       else if("jobcounts".equals(req.getParameter("get"))) {
+                               response = Patterns.ask(archive, 
ArchiveMessages.getRequestJobCounts(),
+                                               new Timeout(timeout));
+
+                               result = Await.result(response, timeout);
+
+                               if(!(result instanceof Tuple3)) {
+                                       throw new 
RuntimeException("RequestJobCounts requires a response of type " +
+                                                       "Tuple3. Instead the 
response is of type " + result.getClass() +
+                                                       ".");
+                               } else {
+                                       writeJsonForJobCounts(resp.getWriter(), 
(Tuple3)result);
+                               }
+                       }
                        else if("job".equals(req.getParameter("get"))) {
                                String jobId = req.getParameter("job");
 
@@ -341,6 +356,22 @@ public class JobManagerInfoServlet extends HttpServlet {
        }
 
        /**
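+        * Writes the counts of finished, canceled, and failed jobs as a JSON object.
+        *
+        * @param wrt writer that receives the JSON output
+        * @param jobCounts numbers of finished, canceled, and failed jobs, in that order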
+        */
+       private void writeJsonForJobCounts(PrintWriter wrt, Tuple3<Integer, 
Integer, Integer> jobCounts) {
+
+               wrt.write("{");
+               wrt.write("\"finished\": " + jobCounts._1() + ",");
+               wrt.write("\"canceled\": " + jobCounts._2() + ",");
+               wrt.write("\"failed\": "   + jobCounts._3());
+               wrt.write("}");
+
+       }
+
+       /**
         * Writes infos about archived job in Json format, including 
groupvertices and groupverticetimes
         *
         * @param wrt

http://git-wip-us.apache.org/repos/asf/flink/blob/e513be72/flink-runtime/src/main/resources/web-docs-infoserver/js/jobmanagerFrontend.js
----------------------------------------------------------------------
diff --git 
a/flink-runtime/src/main/resources/web-docs-infoserver/js/jobmanagerFrontend.js 
b/flink-runtime/src/main/resources/web-docs-infoserver/js/jobmanagerFrontend.js
index 92f6979..63d287c 100644
--- 
a/flink-runtime/src/main/resources/web-docs-infoserver/js/jobmanagerFrontend.js
+++ 
b/flink-runtime/src/main/resources/web-docs-infoserver/js/jobmanagerFrontend.js
@@ -81,6 +81,22 @@ function poll(jobId) {
 })();
 
 /*
+ * Polls the job execution counts on page load and every 2 seconds
+ */
+(function pollJobCounts() {
+       $.ajax({ url : "jobsInfo?get=jobcounts", cache: false, type : "GET",
+           success : function(json) {
+
+               $("#jobs-finished").html(json.finished);
+               $("#jobs-canceled").html(json.canceled);
+               $("#jobs-failed").html(json.failed);
+
+           }, dataType : "json",
+       });
+       setTimeout(pollJobCounts, 2000);
+})();
+
+/*
  * Polls the number of taskmanagers on page load
  */
 (function pollTaskmanagers() {
@@ -418,20 +434,12 @@ function updateTable(json) {
        }
 }
 
-var archive_finished = 0;
-var archive_failed = 0;
-var archive_canceled = 0;
-
 /*
  * Creates job history table
  */
 function fillTableArchive(table, json) {
        $(table).html("");
-       
-       $("#jobs-finished").html(archive_finished);
-       $("#jobs-failed").html(archive_failed);
-       $("#jobs-canceled").html(archive_canceled);
-       
+
        $.each(json, function(i, job) {
                _fillTableArchive(table, job, false)
        });
@@ -459,14 +467,4 @@ function _fillTableArchive(table, job, prepend) {
                                                + job.jobname + " ("
                                                + 
formattedTimeFromTimestamp(parseInt(job.time))
                                                + ")</a></li>");
-       if (job.status == "FINISHED")
-               archive_finished++;
-       if (job.status == "FAILED")
-               archive_failed++;
-       if (job.status == "CANCELED")
-               archive_canceled++;
-       
-       $("#jobs-finished").html(archive_finished);
-       $("#jobs-failed").html(archive_failed);
-       $("#jobs-canceled").html(archive_canceled);
 }

http://git-wip-us.apache.org/repos/asf/flink/blob/e513be72/flink-runtime/src/main/scala/org/apache/flink/runtime/jobmanager/MemoryArchivist.scala
----------------------------------------------------------------------
diff --git 
a/flink-runtime/src/main/scala/org/apache/flink/runtime/jobmanager/MemoryArchivist.scala
 
b/flink-runtime/src/main/scala/org/apache/flink/runtime/jobmanager/MemoryArchivist.scala
index 62ea435..54d2f2f 100644
--- 
a/flink-runtime/src/main/scala/org/apache/flink/runtime/jobmanager/MemoryArchivist.scala
+++ 
b/flink-runtime/src/main/scala/org/apache/flink/runtime/jobmanager/MemoryArchivist.scala
@@ -20,6 +20,7 @@ package org.apache.flink.runtime.jobmanager
 
 import akka.actor.Actor
 import org.apache.flink.api.common.JobID
+import org.apache.flink.runtime.jobgraph.JobStatus
 import org.apache.flink.runtime.{ActorSynchronousLogging, ActorLogMessages}
 import org.apache.flink.runtime.executiongraph.ExecutionGraph
 import org.apache.flink.runtime.messages.ArchiveMessages._
@@ -45,6 +46,8 @@ import scala.collection.mutable
  *  then a [[CurrentJobStatus]] message with the last state is returned to the 
sender, otherwise
  *  a [[JobNotFound]] message is returned
  *
+ *  - [[RequestJobCounts]] returns the number of finished, canceled, and 
failed jobs as a Tuple3
+ *
  * @param max_entries Maximum number of stored Flink jobs
  */
 class MemoryArchivist(private val max_entries: Int)
@@ -57,12 +60,23 @@ class MemoryArchivist(private val max_entries: Int)
    */
   val graphs = mutable.LinkedHashMap[JobID, ExecutionGraph]()
 
+  /* Counters for finished, canceled, and failed jobs */
+  var finishedCnt: Int = 0
+  var canceledCnt: Int = 0
+  var failedCnt: Int = 0
+
   override def receiveWithLogMessages: Receive = {
     
     /* Receive Execution Graph to archive */
     case ArchiveExecutionGraph(jobID, graph) => 
       // wrap graph inside a soft reference
       graphs.update(jobID, graph)
+      // update job counters
+      graph.getState match {
+        case JobStatus.FINISHED => finishedCnt += 1
+        case JobStatus.CANCELED => canceledCnt += 1
+        case JobStatus.FAILED => failedCnt += 1
+      }
       trimHistory()
 
     case RequestArchivedJob(jobID: JobID) =>
@@ -83,6 +97,9 @@ class MemoryArchivist(private val max_entries: Int)
         case Some(graph) => sender ! CurrentJobStatus(jobID, graph.getState)
         case None => sender ! JobNotFound(jobID)
       }
+
+    case RequestJobCounts =>
+      sender ! (finishedCnt, canceledCnt, failedCnt)
   }
 
   /**

http://git-wip-us.apache.org/repos/asf/flink/blob/e513be72/flink-runtime/src/main/scala/org/apache/flink/runtime/messages/ArchiveMessages.scala
----------------------------------------------------------------------
diff --git 
a/flink-runtime/src/main/scala/org/apache/flink/runtime/messages/ArchiveMessages.scala
 
b/flink-runtime/src/main/scala/org/apache/flink/runtime/messages/ArchiveMessages.scala
index e9e7dec..c4e3f3e 100644
--- 
a/flink-runtime/src/main/scala/org/apache/flink/runtime/messages/ArchiveMessages.scala
+++ 
b/flink-runtime/src/main/scala/org/apache/flink/runtime/messages/ArchiveMessages.scala
@@ -34,6 +34,11 @@ object ArchiveMessages {
   case object RequestArchivedJobs
 
   /**
+   * Requests the number of finished, canceled, and failed jobs
+   */
+  case object RequestJobCounts
+
+  /**
    * Reqeuest a specific ExecutionGraph by JobID. The response is 
[[RequestArchivedJob]]
    * @param jobID
    */
@@ -56,7 +61,7 @@ object ArchiveMessages {
       jobs.asJavaCollection
     }
   }
-  
+
   // --------------------------------------------------------------------------
   // Utility methods to allow simpler case object access from Java
   // --------------------------------------------------------------------------
@@ -64,4 +69,8 @@ object ArchiveMessages {
   def getRequestArchivedJobs : AnyRef = {
     RequestArchivedJobs
   }
+
+  def getRequestJobCounts : AnyRef = {
+    RequestJobCounts
+  }
 }

Reply via email to