Repository: flink
Updated Branches:
  refs/heads/master 416ff589e -> fab61a195


[FLINK-2409] [webserver] Replaces ActorRefs with ActorGateways in the web 
server to automatically decorate messages with a leader session ID.

Refactors MiniCluster to also store a reference to the web server so that it 
can be stopped. Adds support for the new web interface for YARN.

Fixes the web server start condition.

This closes #959.


Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/fab61a19
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/fab61a19
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/fab61a19

Branch: refs/heads/master
Commit: fab61a1954ff1554448e826e1d273689ed520fc3
Parents: 416ff58
Author: Till Rohrmann <[email protected]>
Authored: Wed Jul 29 18:03:52 2015 +0200
Committer: Till Rohrmann <[email protected]>
Committed: Mon Aug 3 13:47:31 2015 +0200

----------------------------------------------------------------------
 .../webmonitor/ExecutionGraphHolder.java        | 14 +--
 .../runtime/webmonitor/WebRuntimeMonitor.java   |  5 +-
 .../handlers/RequestJobIdsHandler.java          | 14 +--
 .../handlers/RequestOverviewHandler.java        | 14 +--
 .../legacy/JobManagerInfoHandler.java           | 55 ++++++-----
 .../jobmanager/web/JobManagerInfoServlet.java   | 52 +++++------
 .../jobmanager/web/SetupInfoServlet.java        | 18 ++--
 .../runtime/jobmanager/web/WebInfoServer.java   |  8 +-
 .../flink/runtime/jobmanager/JobManager.scala   | 97 +++++++++----------
 .../runtime/minicluster/FlinkMiniCluster.scala  | 50 +++++++++-
 .../minicluster/LocalFlinkMiniCluster.scala     | 45 +++++----
 .../flink/runtime/taskmanager/TaskManager.scala |  2 +-
 .../runtime/testingUtils/TestingCluster.scala   | 21 +++--
 .../test/util/ForkableFlinkMiniCluster.scala    | 41 ++++----
 .../apache/flink/yarn/ApplicationMaster.scala   | 98 +++++++++++++-------
 15 files changed, 293 insertions(+), 241 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/flink/blob/fab61a19/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/ExecutionGraphHolder.java
----------------------------------------------------------------------
diff --git 
a/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/ExecutionGraphHolder.java
 
b/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/ExecutionGraphHolder.java
index 18a548c..a017f3a 100644
--- 
a/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/ExecutionGraphHolder.java
+++ 
b/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/ExecutionGraphHolder.java
@@ -18,12 +18,9 @@
 
 package org.apache.flink.runtime.webmonitor;
 
-import akka.actor.ActorRef;
-import akka.pattern.Patterns;
-import akka.util.Timeout;
-
 import org.apache.flink.api.common.JobID;
 import org.apache.flink.runtime.executiongraph.ExecutionGraph;
+import org.apache.flink.runtime.instance.ActorGateway;
 import org.apache.flink.runtime.messages.JobManagerMessages;
 
 import scala.concurrent.Await;
@@ -42,18 +39,18 @@ import java.util.WeakHashMap;
  */
 public class ExecutionGraphHolder {
        
-       private final ActorRef source;
+       private final ActorGateway source;
        
        private final FiniteDuration timeout;
        
        private final WeakHashMap<JobID, ExecutionGraph> cache = new 
WeakHashMap<JobID, ExecutionGraph>();
        
        
-       public ExecutionGraphHolder(ActorRef source) {
+       public ExecutionGraphHolder(ActorGateway source) {
                this(source, WebRuntimeMonitor.DEFAULT_REQUEST_TIMEOUT);
        }
 
-       public ExecutionGraphHolder(ActorRef source, FiniteDuration timeout) {
+       public ExecutionGraphHolder(ActorGateway source, FiniteDuration 
timeout) {
                if (source == null || timeout == null) {
                        throw new NullPointerException();
                }
@@ -69,8 +66,7 @@ public class ExecutionGraphHolder {
                }
                
                try {
-                       Timeout to = new Timeout(timeout);
-                       Future<Object> future = Patterns.ask(source, new 
JobManagerMessages.RequestJob(jid), to);
+                       Future<Object> future = source.ask(new 
JobManagerMessages.RequestJob(jid), timeout);
                        Object result = Await.result(future, timeout);
                        if (result instanceof JobManagerMessages.JobNotFound) {
                                return null;

http://git-wip-us.apache.org/repos/asf/flink/blob/fab61a19/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/WebRuntimeMonitor.java
----------------------------------------------------------------------
diff --git 
a/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/WebRuntimeMonitor.java
 
b/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/WebRuntimeMonitor.java
index a2095d4..006d18d 100644
--- 
a/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/WebRuntimeMonitor.java
+++ 
b/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/WebRuntimeMonitor.java
@@ -18,8 +18,6 @@
 
 package org.apache.flink.runtime.webmonitor;
 
-import akka.actor.ActorRef;
-
 import io.netty.bootstrap.ServerBootstrap;
 import io.netty.channel.Channel;
 import io.netty.channel.ChannelInitializer;
@@ -35,6 +33,7 @@ import io.netty.handler.stream.ChunkedWriteHandler;
 import org.apache.flink.configuration.ConfigConstants;
 import org.apache.flink.configuration.Configuration;
 import org.apache.flink.configuration.IllegalConfigurationException;
+import org.apache.flink.runtime.instance.ActorGateway;
 import org.apache.flink.runtime.webmonitor.files.StaticFileServerHandler;
 import org.apache.flink.runtime.webmonitor.handlers.ExecutionPlanHandler;
 import org.apache.flink.runtime.webmonitor.handlers.JobConfigHandler;
@@ -88,7 +87,7 @@ public class WebRuntimeMonitor implements WebMonitor {
        private Channel serverChannel;
 
        
-       public WebRuntimeMonitor(Configuration config, ActorRef jobManager, 
ActorRef archive) throws IOException {
+       public WebRuntimeMonitor(Configuration config, ActorGateway jobManager, 
ActorGateway archive) throws IOException {
                // figure out where our static contents is
                final String configuredWebRoot = 
config.getString(ConfigConstants.JOB_MANAGER_WEB_DOC_ROOT_KEY, null);
                final String flinkRoot = 
config.getString(ConfigConstants.FLINK_BASE_DIR_PATH_KEY, null);

http://git-wip-us.apache.org/repos/asf/flink/blob/fab61a19/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/handlers/RequestJobIdsHandler.java
----------------------------------------------------------------------
diff --git 
a/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/handlers/RequestJobIdsHandler.java
 
b/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/handlers/RequestJobIdsHandler.java
index 1f28a01..aa1a39f 100644
--- 
a/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/handlers/RequestJobIdsHandler.java
+++ 
b/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/handlers/RequestJobIdsHandler.java
@@ -18,10 +18,7 @@
 
 package org.apache.flink.runtime.webmonitor.handlers;
 
-import akka.actor.ActorRef;
-import akka.pattern.Patterns;
-import akka.util.Timeout;
-
+import org.apache.flink.runtime.instance.ActorGateway;
 import org.apache.flink.runtime.messages.webmonitor.JobsWithIDsOverview;
 import org.apache.flink.runtime.messages.webmonitor.RequestJobsWithIDsOverview;
 import org.apache.flink.runtime.webmonitor.JsonFactory;
@@ -40,15 +37,15 @@ import java.util.Map;
  */
 public class RequestJobIdsHandler implements RequestHandler, 
RequestHandler.JsonResponse {
        
-       private final ActorRef target;
+       private final ActorGateway target;
        
        private final FiniteDuration timeout;
        
-       public RequestJobIdsHandler(ActorRef target) {
+       public RequestJobIdsHandler(ActorGateway target) {
                this(target, WebRuntimeMonitor.DEFAULT_REQUEST_TIMEOUT);
        }
        
-       public RequestJobIdsHandler(ActorRef target, FiniteDuration timeout) {
+       public RequestJobIdsHandler(ActorGateway target, FiniteDuration 
timeout) {
                if (target == null || timeout == null) {
                        throw new NullPointerException();
                }
@@ -60,8 +57,7 @@ public class RequestJobIdsHandler implements RequestHandler, 
RequestHandler.Json
        public String handleRequest(Map<String, String> params) throws 
Exception {
                // we need no parameters, get all requests
                try {
-                       Timeout to = new Timeout(timeout); 
-                       Future<Object> future = Patterns.ask(target, 
RequestJobsWithIDsOverview.getInstance(), to);
+                       Future<Object> future = 
target.ask(RequestJobsWithIDsOverview.getInstance(), timeout);
                        JobsWithIDsOverview result = (JobsWithIDsOverview) 
Await.result(future, timeout);
                        return JsonFactory.generateJobsOverviewJSON(result);
                }

http://git-wip-us.apache.org/repos/asf/flink/blob/fab61a19/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/handlers/RequestOverviewHandler.java
----------------------------------------------------------------------
diff --git 
a/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/handlers/RequestOverviewHandler.java
 
b/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/handlers/RequestOverviewHandler.java
index e51a4d1..c2c00c7 100644
--- 
a/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/handlers/RequestOverviewHandler.java
+++ 
b/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/handlers/RequestOverviewHandler.java
@@ -18,10 +18,7 @@
 
 package org.apache.flink.runtime.webmonitor.handlers;
 
-import akka.actor.ActorRef;
-import akka.pattern.Patterns;
-import akka.util.Timeout;
-
+import org.apache.flink.runtime.instance.ActorGateway;
 import 
org.apache.flink.runtime.messages.webmonitor.RequestStatusWithJobIDsOverview;
 import org.apache.flink.runtime.messages.webmonitor.StatusWithJobIDsOverview;
 import org.apache.flink.runtime.webmonitor.JsonFactory;
@@ -39,16 +36,16 @@ import java.util.Map;
  */
 public class RequestOverviewHandler implements  RequestHandler, 
RequestHandler.JsonResponse {
        
-       private final ActorRef jobManager;
+       private final ActorGateway jobManager;
        
        private final FiniteDuration timeout;
        
        
-       public RequestOverviewHandler(ActorRef jobManager) {
+       public RequestOverviewHandler(ActorGateway jobManager) {
                this(jobManager, WebRuntimeMonitor.DEFAULT_REQUEST_TIMEOUT);
        }
        
-       public RequestOverviewHandler(ActorRef jobManager, FiniteDuration 
timeout) {
+       public RequestOverviewHandler(ActorGateway jobManager, FiniteDuration 
timeout) {
                if (jobManager == null || timeout == null) {
                        throw new NullPointerException();
                }
@@ -59,8 +56,7 @@ public class RequestOverviewHandler implements  
RequestHandler, RequestHandler.J
        @Override
        public String handleRequest(Map<String, String> params) throws 
Exception {
                try {
-                       Timeout to = new Timeout(timeout); 
-                       Future<Object> future = Patterns.ask(jobManager, 
RequestStatusWithJobIDsOverview.getInstance(), to);
+                       Future<Object> future = 
jobManager.ask(RequestStatusWithJobIDsOverview.getInstance(), timeout);
                        StatusWithJobIDsOverview result = 
(StatusWithJobIDsOverview) Await.result(future, timeout);
                        return 
JsonFactory.generateOverviewWithJobIDsJSON(result);
                }

http://git-wip-us.apache.org/repos/asf/flink/blob/fab61a19/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/legacy/JobManagerInfoHandler.java
----------------------------------------------------------------------
diff --git 
a/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/legacy/JobManagerInfoHandler.java
 
b/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/legacy/JobManagerInfoHandler.java
index 0a1e08c..9b52736 100644
--- 
a/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/legacy/JobManagerInfoHandler.java
+++ 
b/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/legacy/JobManagerInfoHandler.java
@@ -18,10 +18,6 @@
 
 package org.apache.flink.runtime.webmonitor.legacy;
 
-import akka.actor.ActorRef;
-
-import akka.pattern.Patterns;
-import akka.util.Timeout;
 import io.netty.buffer.Unpooled;
 import io.netty.channel.ChannelHandler;
 import io.netty.channel.ChannelHandlerContext;
@@ -41,6 +37,7 @@ import org.apache.flink.runtime.executiongraph.Execution;
 import org.apache.flink.runtime.executiongraph.ExecutionGraph;
 import org.apache.flink.runtime.executiongraph.ExecutionJobVertex;
 import org.apache.flink.runtime.executiongraph.ExecutionVertex;
+import org.apache.flink.runtime.instance.ActorGateway;
 import org.apache.flink.runtime.instance.InstanceConnectionInfo;
 import org.apache.flink.runtime.jobgraph.JobStatus;
 import org.apache.flink.runtime.jobgraph.JobVertexID;
@@ -78,12 +75,12 @@ public class JobManagerInfoHandler extends 
SimpleChannelInboundHandler<Routed> {
        private static final Charset ENCODING = Charset.forName("UTF-8");
 
        /** Underlying JobManager */
-       private final ActorRef jobmanager;
-       private final ActorRef archive;
+       private final ActorGateway jobmanager;
+       private final ActorGateway archive;
        private final FiniteDuration timeout;
 
 
-       public JobManagerInfoHandler(ActorRef jobmanager, ActorRef archive, 
FiniteDuration timeout) {
+       public JobManagerInfoHandler(ActorGateway jobmanager, ActorGateway 
archive, FiniteDuration timeout) {
                this.jobmanager = jobmanager;
                this.archive = archive;
                this.timeout = timeout;
@@ -118,8 +115,7 @@ public class JobManagerInfoHandler extends 
SimpleChannelInboundHandler<Routed> {
        @SuppressWarnings("unchecked")
        private String handleRequest(Routed routed) throws Exception {
                if ("archive".equals(routed.queryParam("get"))) {
-                       Future<Object> response = Patterns.ask(archive, 
ArchiveMessages.getRequestArchivedJobs(),
-                                       new Timeout(timeout));
+                       Future<Object> response = 
archive.ask(ArchiveMessages.getRequestArchivedJobs(), timeout);
 
                        Object result = Await.result(response, timeout);
 
@@ -135,8 +131,7 @@ public class JobManagerInfoHandler extends 
SimpleChannelInboundHandler<Routed> {
                        }
                }
                else if ("jobcounts".equals(routed.queryParam("get"))) {
-                       Future<Object> response = Patterns.ask(archive, 
ArchiveMessages.getRequestJobCounts(),
-                                       new Timeout(timeout));
+                       Future<Object> response = 
archive.ask(ArchiveMessages.getRequestJobCounts(), timeout);
 
                        Object result = Await.result(response, timeout);
 
@@ -152,8 +147,8 @@ public class JobManagerInfoHandler extends 
SimpleChannelInboundHandler<Routed> {
                else if ("job".equals(routed.queryParam("get"))) {
                        String jobId = routed.queryParam("job");
 
-                       Future<Object> response = Patterns.ask(archive, new 
JobManagerMessages.RequestJob(JobID.fromHexString(jobId)),
-                                       new Timeout(timeout));
+                       Future<Object> response = archive.ask(new 
JobManagerMessages.RequestJob(JobID.fromHexString(jobId)),
+                                       timeout);
 
                        Object result = Await.result(response, timeout);
 
@@ -182,8 +177,8 @@ public class JobManagerInfoHandler extends 
SimpleChannelInboundHandler<Routed> {
                                throw new Exception("Found null groupVertexId");
                        }
 
-                       Future<Object> response = Patterns.ask(archive, new 
JobManagerMessages.RequestJob(JobID.fromHexString(jobId)),
-                                       new Timeout(timeout));
+                       Future<Object> response = archive.ask(new 
JobManagerMessages.RequestJob(JobID.fromHexString(jobId)),
+                                       timeout);
 
                        Object result = Await.result(response, timeout);
 
@@ -205,9 +200,9 @@ public class JobManagerInfoHandler extends 
SimpleChannelInboundHandler<Routed> {
                        }
                }
                else if ("taskmanagers".equals(routed.queryParam("get"))) {
-                       Future<Object> response = Patterns.ask(jobmanager,
+                       Future<Object> response = jobmanager.ask(
                                        
JobManagerMessages.getRequestNumberRegisteredTaskManager(),
-                                       new Timeout(timeout));
+                                       timeout);
 
                        Object result = Await.result(response, timeout);
 
@@ -219,9 +214,9 @@ public class JobManagerInfoHandler extends 
SimpleChannelInboundHandler<Routed> {
                        else {
                                final int numberOfTaskManagers = 
(Integer)result;
 
-                               final Future<Object> responseRegisteredSlots = 
Patterns.ask(jobmanager,
+                               final Future<Object> responseRegisteredSlots = 
jobmanager.ask(
                                                
JobManagerMessages.getRequestTotalNumberOfSlots(),
-                                               new Timeout(timeout));
+                                               timeout);
 
                                final Object resultRegisteredSlots = 
Await.result(responseRegisteredSlots,
                                                timeout);
@@ -242,8 +237,8 @@ public class JobManagerInfoHandler extends 
SimpleChannelInboundHandler<Routed> {
                else if ("cancel".equals(routed.queryParam("get"))) {
                        String jobId = routed.queryParam("job");
 
-                       Future<Object> response = Patterns.ask(jobmanager, new 
JobManagerMessages.CancelJob(JobID.fromHexString(jobId)),
-                                       new Timeout(timeout));
+                       Future<Object> response = jobmanager.ask(new 
JobManagerMessages.CancelJob(JobID.fromHexString(jobId)),
+                                       timeout);
 
                        Await.ready(response, timeout);
                        return "{}";
@@ -256,8 +251,8 @@ public class JobManagerInfoHandler extends 
SimpleChannelInboundHandler<Routed> {
                        return writeJsonForVersion();
                }
                else{
-                       Future<Object> response = Patterns.ask(jobmanager, 
JobManagerMessages.getRequestRunningJobs(),
-                                       new Timeout(timeout));
+                       Future<Object> response = 
jobmanager.ask(JobManagerMessages.getRequestRunningJobs(),
+                                       timeout);
 
                        Object result = Await.result(response, timeout);
 
@@ -454,8 +449,9 @@ public class JobManagerInfoHandler extends 
SimpleChannelInboundHandler<Routed> {
                }
 
                // write accumulators
-               final Future<Object> response = Patterns.ask(jobmanager,
-                               new 
RequestAccumulatorResultsStringified(graph.getJobID()), new Timeout(timeout));
+               final Future<Object> response = jobmanager.ask(
+                               new 
RequestAccumulatorResultsStringified(graph.getJobID()),
+                               timeout);
 
                Object result;
                try {
@@ -549,9 +545,9 @@ public class JobManagerInfoHandler extends 
SimpleChannelInboundHandler<Routed> {
 
 
        private String writeJsonUpdatesForJob(JobID jobId) {
-               final Future<Object> responseArchivedJobs = 
Patterns.ask(jobmanager,
+               final Future<Object> responseArchivedJobs = jobmanager.ask(
                                JobManagerMessages.getRequestRunningJobs(),
-                               new Timeout(timeout));
+                               timeout);
 
                Object resultArchivedJobs;
                try{
@@ -591,8 +587,9 @@ public class JobManagerInfoHandler extends 
SimpleChannelInboundHandler<Routed> {
                        }
                        bld.append("],");
 
-                       final Future<Object> responseJob = 
Patterns.ask(jobmanager, new JobManagerMessages.RequestJob(jobId),
-                                       new Timeout(timeout));
+                       final Future<Object> responseJob = jobmanager.ask(
+                                       new 
JobManagerMessages.RequestJob(jobId),
+                                       timeout);
 
                        Object resultJob;
                        try{

http://git-wip-us.apache.org/repos/asf/flink/blob/fab61a19/flink-runtime/src/main/java/org/apache/flink/runtime/jobmanager/web/JobManagerInfoServlet.java
----------------------------------------------------------------------
diff --git 
a/flink-runtime/src/main/java/org/apache/flink/runtime/jobmanager/web/JobManagerInfoServlet.java
 
b/flink-runtime/src/main/java/org/apache/flink/runtime/jobmanager/web/JobManagerInfoServlet.java
index 82ab63e..ce57714 100644
--- 
a/flink-runtime/src/main/java/org/apache/flink/runtime/jobmanager/web/JobManagerInfoServlet.java
+++ 
b/flink-runtime/src/main/java/org/apache/flink/runtime/jobmanager/web/JobManagerInfoServlet.java
@@ -32,12 +32,9 @@ import javax.servlet.http.HttpServlet;
 import javax.servlet.http.HttpServletRequest;
 import javax.servlet.http.HttpServletResponse;
 
-import akka.actor.ActorRef;
-
-import akka.pattern.Patterns;
-import akka.util.Timeout;
 import org.apache.flink.api.common.ExecutionConfig;
 import org.apache.flink.runtime.accumulators.StringifiedAccumulatorResult;
+import org.apache.flink.runtime.instance.ActorGateway;
 import org.apache.flink.runtime.instance.InstanceConnectionInfo;
 import org.apache.flink.runtime.messages.ArchiveMessages.ArchivedJobs;
 import org.apache.flink.runtime.messages.ArchiveMessages;
@@ -78,12 +75,12 @@ public class JobManagerInfoServlet extends HttpServlet {
        private static final Logger LOG = 
LoggerFactory.getLogger(JobManagerInfoServlet.class);
 
        /** Underlying JobManager */
-       private final ActorRef jobmanager;
-       private final ActorRef archive;
+       private final ActorGateway jobmanager;
+       private final ActorGateway archive;
        private final FiniteDuration timeout;
 
 
-       public JobManagerInfoServlet(ActorRef jobmanager, ActorRef archive, 
FiniteDuration timeout) {
+       public JobManagerInfoServlet(ActorGateway jobmanager, ActorGateway 
archive, FiniteDuration timeout) {
                this.jobmanager = jobmanager;
                this.archive = archive;
                this.timeout = timeout;
@@ -102,8 +99,7 @@ public class JobManagerInfoServlet extends HttpServlet {
 
                try {
                        if("archive".equals(req.getParameter("get"))) {
-                               response = Patterns.ask(archive, 
ArchiveMessages.getRequestArchivedJobs(),
-                                               new Timeout(timeout));
+                               response = 
archive.ask(ArchiveMessages.getRequestArchivedJobs(), timeout);
 
                                result = Await.result(response, timeout);
 
@@ -119,8 +115,7 @@ public class JobManagerInfoServlet extends HttpServlet {
                                }
                        }
                        else if("jobcounts".equals(req.getParameter("get"))) {
-                               response = Patterns.ask(archive, 
ArchiveMessages.getRequestJobCounts(),
-                                               new Timeout(timeout));
+                               response = 
archive.ask(ArchiveMessages.getRequestJobCounts(), timeout);
 
                                result = Await.result(response, timeout);
 
@@ -135,8 +130,7 @@ public class JobManagerInfoServlet extends HttpServlet {
                        else if("job".equals(req.getParameter("get"))) {
                                String jobId = req.getParameter("job");
 
-                               response = Patterns.ask(archive, new 
RequestJob(JobID.fromHexString(jobId)),
-                                               new Timeout(timeout));
+                               response = archive.ask(new 
RequestJob(JobID.fromHexString(jobId)), timeout);
 
                                result = Await.result(response, timeout);
 
@@ -163,8 +157,7 @@ public class JobManagerInfoServlet extends HttpServlet {
                                        return;
                                }
 
-                               response = Patterns.ask(archive, new 
RequestJob(JobID.fromHexString(jobId)),
-                                               new Timeout(timeout));
+                               response = archive.ask(new 
RequestJob(JobID.fromHexString(jobId)), timeout);
 
                                result = Await.result(response, timeout);
 
@@ -186,9 +179,9 @@ public class JobManagerInfoServlet extends HttpServlet {
                        }
                        else if("taskmanagers".equals(req.getParameter("get"))) 
{
 
-                               response = Patterns.ask(jobmanager,
+                               response = jobmanager.ask(
                                                
JobManagerMessages.getRequestNumberRegisteredTaskManager(),
-                                               new Timeout(timeout));
+                                               timeout);
 
                                result = Await.result(response, timeout);
 
@@ -199,9 +192,9 @@ public class JobManagerInfoServlet extends HttpServlet {
                                } else {
                                        final int numberOfTaskManagers = 
(Integer)result;
 
-                                       final Future<Object> 
responseRegisteredSlots = Patterns.ask(jobmanager,
+                                       final Future<Object> 
responseRegisteredSlots = jobmanager.ask(
                                                        
JobManagerMessages.getRequestTotalNumberOfSlots(),
-                                                       new Timeout(timeout));
+                                                       timeout);
 
                                        final Object resultRegisteredSlots = 
Await.result(responseRegisteredSlots,
                                                        timeout);
@@ -221,8 +214,9 @@ public class JobManagerInfoServlet extends HttpServlet {
                        else if("cancel".equals(req.getParameter("get"))) {
                                String jobId = req.getParameter("job");
 
-                               response = Patterns.ask(jobmanager, new 
CancelJob(JobID.fromHexString(jobId)),
-                                               new Timeout(timeout));
+                               response = jobmanager.ask(
+                                               new 
CancelJob(JobID.fromHexString(jobId)),
+                                               timeout);
 
                                Await.ready(response, timeout);
                        }
@@ -233,8 +227,9 @@ public class JobManagerInfoServlet extends HttpServlet {
                                writeJsonForVersion(resp.getWriter());
                        }
                        else{
-                               response = Patterns.ask(jobmanager, 
JobManagerMessages.getRequestRunningJobs(),
-                                               new Timeout(timeout));
+                               response = jobmanager.ask(
+                                               
JobManagerMessages.getRequestRunningJobs(),
+                                               timeout);
 
                                result = Await.result(response, timeout);
 
@@ -471,8 +466,8 @@ public class JobManagerInfoServlet extends HttpServlet {
                        }
 
                        // write accumulators
-                       final Future<Object> response = Patterns.ask(jobmanager,
-                                       new 
RequestAccumulatorResultsStringified(graph.getJobID()), new Timeout(timeout));
+                       final Future<Object> response = jobmanager.ask(
+                                       new 
RequestAccumulatorResultsStringified(graph.getJobID()), timeout);
 
                        Object result;
                        try {
@@ -575,9 +570,9 @@ public class JobManagerInfoServlet extends HttpServlet {
        private void writeJsonUpdatesForJob(PrintWriter wrt, JobID jobId) {
 
                try {
-                       final Future<Object> responseArchivedJobs = 
Patterns.ask(jobmanager,
+                       final Future<Object> responseArchivedJobs = 
jobmanager.ask(
                                        
JobManagerMessages.getRequestRunningJobs(),
-                                       new Timeout(timeout));
+                                       timeout);
 
                        Object resultArchivedJobs = null;
 
@@ -615,8 +610,7 @@ public class JobManagerInfoServlet extends HttpServlet {
 
                                wrt.write("],");
 
-                               final Future<Object> responseJob = 
Patterns.ask(jobmanager, new RequestJob(jobId),
-                                               new Timeout(timeout));
+                               final Future<Object> responseJob = 
jobmanager.ask(new RequestJob(jobId), timeout);
 
                                Object resultJob = null;
 

http://git-wip-us.apache.org/repos/asf/flink/blob/fab61a19/flink-runtime/src/main/java/org/apache/flink/runtime/jobmanager/web/SetupInfoServlet.java
----------------------------------------------------------------------
diff --git 
a/flink-runtime/src/main/java/org/apache/flink/runtime/jobmanager/web/SetupInfoServlet.java
 
b/flink-runtime/src/main/java/org/apache/flink/runtime/jobmanager/web/SetupInfoServlet.java
index c3df253..1f2bfe0 100644
--- 
a/flink-runtime/src/main/java/org/apache/flink/runtime/jobmanager/web/SetupInfoServlet.java
+++ 
b/flink-runtime/src/main/java/org/apache/flink/runtime/jobmanager/web/SetupInfoServlet.java
@@ -32,10 +32,8 @@ import javax.servlet.http.HttpServlet;
 import javax.servlet.http.HttpServletRequest;
 import javax.servlet.http.HttpServletResponse;
 
-import akka.actor.ActorRef;
-import akka.pattern.Patterns;
-import akka.util.Timeout;
 import org.apache.flink.configuration.Configuration;
+import org.apache.flink.runtime.instance.ActorGateway;
 import org.apache.flink.runtime.instance.Instance;
 
 import org.apache.flink.runtime.instance.InstanceID;
@@ -67,13 +65,13 @@ public class SetupInfoServlet extends HttpServlet {
 
 
        final private Configuration configuration;
-       final private ActorRef jobmanager;
+       final private ActorGateway jobmanager;
        final private FiniteDuration timeout;
 
 
-       public SetupInfoServlet(Configuration conf, ActorRef jm, FiniteDuration 
timeout) {
+       public SetupInfoServlet(Configuration conf, ActorGateway jobManager, 
FiniteDuration timeout) {
                configuration = conf;
-               this.jobmanager = jm;
+               this.jobmanager = jobManager;
                this.timeout = timeout;
        }
 
@@ -114,9 +112,9 @@ public class SetupInfoServlet extends HttpServlet {
 
        private void writeTaskmanagers(HttpServletResponse resp) throws 
IOException {
 
-               final Future<Object> response = Patterns.ask(jobmanager,
+               final Future<Object> response = jobmanager.ask(
                                
JobManagerMessages.getRequestRegisteredTaskManagers(),
-                               new Timeout(timeout));
+                               timeout);
 
                Object obj = null;
 
@@ -183,9 +181,9 @@ public class SetupInfoServlet extends HttpServlet {
                StackTrace message = null;
                Throwable exception = null;
 
-               final Future<Object> response = Patterns.ask(jobmanager,
+               final Future<Object> response = jobmanager.ask(
                                new RequestStackTrace(instanceID),
-                               new Timeout(timeout));
+                               timeout);
 
                try {
                        message = (StackTrace) Await.result(response, timeout);

http://git-wip-us.apache.org/repos/asf/flink/blob/fab61a19/flink-runtime/src/main/java/org/apache/flink/runtime/jobmanager/web/WebInfoServer.java
----------------------------------------------------------------------
diff --git 
a/flink-runtime/src/main/java/org/apache/flink/runtime/jobmanager/web/WebInfoServer.java
 
b/flink-runtime/src/main/java/org/apache/flink/runtime/jobmanager/web/WebInfoServer.java
index a414cf6..4383b65 100644
--- 
a/flink-runtime/src/main/java/org/apache/flink/runtime/jobmanager/web/WebInfoServer.java
+++ 
b/flink-runtime/src/main/java/org/apache/flink/runtime/jobmanager/web/WebInfoServer.java
@@ -23,12 +23,12 @@ import java.io.FileNotFoundException;
 import java.io.IOException;
 import java.net.URL;
 
-import akka.actor.ActorRef;
-
 import org.apache.flink.runtime.akka.AkkaUtils;
 import org.apache.flink.configuration.ConfigConstants;
 import org.apache.flink.configuration.Configuration;
 
+import org.apache.flink.runtime.instance.ActorGateway;
+import org.apache.flink.runtime.webmonitor.WebMonitor;
 import org.eclipse.jetty.server.Connector;
 import org.eclipse.jetty.server.handler.ResourceHandler;
 import org.eclipse.jetty.server.Server;
@@ -45,7 +45,7 @@ import scala.concurrent.duration.FiniteDuration;
  * This class sets up a web-server that contains a web frontend to display 
information about running jobs.
  * It instantiates and configures an embedded jetty server.
  */
-public class WebInfoServer {
+public class WebInfoServer implements WebMonitor {
 
        /** Web root dir in the jar */
        private static final String WEB_ROOT_DIR = "web-docs-infoserver";
@@ -70,7 +70,7 @@ public class WebInfoServer {
         * @throws IOException
         *         Thrown, if the server setup failed for an I/O related reason.
         */
-       public WebInfoServer(Configuration config, ActorRef jobmanager, 
ActorRef archive) throws IOException {
+       public WebInfoServer(Configuration config, ActorGateway jobmanager, 
ActorGateway archive) throws IOException {
                if (config == null) {
                        throw new IllegalArgumentException("No Configuration 
has been passed to the web server");
                }

http://git-wip-us.apache.org/repos/asf/flink/blob/fab61a19/flink-runtime/src/main/scala/org/apache/flink/runtime/jobmanager/JobManager.scala
----------------------------------------------------------------------
diff --git 
a/flink-runtime/src/main/scala/org/apache/flink/runtime/jobmanager/JobManager.scala
 
b/flink-runtime/src/main/scala/org/apache/flink/runtime/jobmanager/JobManager.scala
index 7bf4447..5c0f468 100644
--- 
a/flink-runtime/src/main/scala/org/apache/flink/runtime/jobmanager/JobManager.scala
+++ 
b/flink-runtime/src/main/scala/org/apache/flink/runtime/jobmanager/JobManager.scala
@@ -1128,17 +1128,27 @@ object JobManager {
           "TaskManager_Process_Reaper")
       }
 
-      // start the job manager web frontend
-      if 
(configuration.getBoolean(ConfigConstants.JOB_MANAGER_NEW_WEB_FRONTEND_KEY, 
false)) {
-        LOG.info("Starting NEW JobManger web frontend")
-        
-        // start the new web frontend. we need to load this dynamically
-        // because it is not in the same project/dependencies
-        startWebRuntimeMonitor(configuration, jobManager, archiver)
-      }
-      else if 
(configuration.getInteger(ConfigConstants.JOB_MANAGER_WEB_PORT_KEY, 0) != -1) {
-        LOG.info("Starting JobManger web frontend")
-        val webServer = new WebInfoServer(configuration, jobManager, archiver)
+      if(configuration.getInteger(ConfigConstants.JOB_MANAGER_WEB_PORT_KEY, 0) 
>= 0) {
+        val lookupTimeout = AkkaUtils.getLookupTimeout(configuration)
+        val jobManagerGateway = JobManager.getJobManagerGateway(jobManager, 
lookupTimeout)
+        val archiverGateway = new AkkaActorGateway(archiver, 
jobManagerGateway.leaderSessionID())
+
+        // start the job manager web frontend
+        val webServer = if (
+          configuration.getBoolean(
+            ConfigConstants.JOB_MANAGER_NEW_WEB_FRONTEND_KEY,
+            false)) {
+
+          LOG.info("Starting NEW JobManger web frontend")
+          // start the new web frontend. we need to load this dynamically
+          // because it is not in the same project/dependencies
+          startWebRuntimeMonitor(configuration, jobManagerGateway, 
archiverGateway)
+        }
+        else {
+          LOG.info("Starting JobManger web frontend")
+          new WebInfoServer(configuration, jobManagerGateway, archiverGateway)
+        }
+
         webServer.start()
       }
     }
@@ -1570,46 +1580,37 @@ object JobManager {
    * this method does not throw any exceptions, but only logs them.
    * 
    * @param config The configuration for the runtime monitor.
-   * @param jobManager The JobManager actor.
+   * @param jobManager The JobManager actor gateway.
    * @param archiver The execution graph archive actor.
    */
-  def startWebRuntimeMonitor(config: Configuration,
-                             jobManager: ActorRef,
-                             archiver: ActorRef): Unit = {
+  def startWebRuntimeMonitor(
+      config: Configuration,
+      jobManager: ActorGateway,
+      archiver: ActorGateway)
+    : WebMonitor = {
     // try to load and instantiate the class
-    val monitor: WebMonitor =
-      try {
-        val classname = "org.apache.flink.runtime.webmonitor.WebRuntimeMonitor"
-        val clazz: Class[_ <: WebMonitor] = Class.forName(classname)
-                                                 
.asSubclass(classOf[WebMonitor])
-        
-        val ctor: Constructor[_ <: WebMonitor] = 
clazz.getConstructor(classOf[Configuration],
-                                                                      
classOf[ActorRef],
-                                                                      
classOf[ActorRef])
-        ctor.newInstance(config, jobManager, archiver)
-      }
-      catch {
-        case e: ClassNotFoundException =>
-          LOG.error("Could not load web runtime monitor. " +
-              "Probably reason: flink-runtime-web is not in the classpath")
-          LOG.debug("Caught exception", e)
-          null
-        case e: InvocationTargetException =>
-          LOG.error("WebServer could not be created", e.getTargetException())
-          null
-        case t: Throwable =>
-          LOG.error("Failed to instantiate web runtime monitor.", t)
-          null
-      }
-    
-    if (monitor != null) {
-      try {
-        monitor.start()
-      }
-      catch {
-        case e: Exception => 
-          LOG.error("Failed to start web runtime monitor", e)
-      }
+    try {
+      val classname = "org.apache.flink.runtime.webmonitor.WebRuntimeMonitor"
+      val clazz: Class[_ <: WebMonitor] = Class.forName(classname)
+                                               .asSubclass(classOf[WebMonitor])
+
+      val ctor: Constructor[_ <: WebMonitor] = 
clazz.getConstructor(classOf[Configuration],
+                                                                    
classOf[ActorGateway],
+                                                                    
classOf[ActorGateway])
+      ctor.newInstance(config, jobManager, archiver)
+    }
+    catch {
+      case e: ClassNotFoundException =>
+        LOG.error("Could not load web runtime monitor. " +
+            "Probably reason: flink-runtime-web is not in the classpath")
+        LOG.debug("Caught exception", e)
+        null
+      case e: InvocationTargetException =>
+        LOG.error("WebServer could not be created", e.getTargetException())
+        null
+      case t: Throwable =>
+        LOG.error("Failed to instantiate web runtime monitor.", t)
+        null
     }
   }
 }

http://git-wip-us.apache.org/repos/asf/flink/blob/fab61a19/flink-runtime/src/main/scala/org/apache/flink/runtime/minicluster/FlinkMiniCluster.scala
----------------------------------------------------------------------
diff --git 
a/flink-runtime/src/main/scala/org/apache/flink/runtime/minicluster/FlinkMiniCluster.scala
 
b/flink-runtime/src/main/scala/org/apache/flink/runtime/minicluster/FlinkMiniCluster.scala
index 6f810fc..7c57233 100644
--- 
a/flink-runtime/src/main/scala/org/apache/flink/runtime/minicluster/FlinkMiniCluster.scala
+++ 
b/flink-runtime/src/main/scala/org/apache/flink/runtime/minicluster/FlinkMiniCluster.scala
@@ -30,10 +30,12 @@ import org.apache.flink.runtime.StreamingMode
 import org.apache.flink.runtime.akka.AkkaUtils
 import org.apache.flink.runtime.client.{JobExecutionException, JobClient,
 SerializedJobExecutionResult}
-import org.apache.flink.runtime.instance.ActorGateway
+import org.apache.flink.runtime.instance.{AkkaActorGateway, ActorGateway}
 import org.apache.flink.runtime.jobgraph.JobGraph
 import org.apache.flink.runtime.jobmanager.JobManager
+import org.apache.flink.runtime.jobmanager.web.WebInfoServer
 import 
org.apache.flink.runtime.messages.TaskManagerMessages.NotifyWhenRegisteredAtJobManager
+import org.apache.flink.runtime.webmonitor.WebMonitor
 import org.slf4j.LoggerFactory
 
 import scala.concurrent.duration.FiniteDuration
@@ -74,7 +76,7 @@ abstract class FlinkMiniCluster(
   val configuration = generateConfiguration(userConfiguration)
 
   var jobManagerActorSystem = startJobManagerActorSystem()
-  var jobManagerActor = startJobManager(jobManagerActorSystem)
+  var (jobManagerActor, webMonitor) = startJobManager(jobManagerActorSystem)
 
   val numTaskManagers = configuration.getInteger(
      ConfigConstants.LOCAL_INSTANCE_MANAGER_NUMBER_TASK_MANAGER, 1)
@@ -99,7 +101,7 @@ abstract class FlinkMiniCluster(
 
   def generateConfiguration(userConfiguration: Configuration): Configuration
 
-  def startJobManager(system: ActorSystem): ActorRef
+  def startJobManager(system: ActorSystem): (ActorRef, Option[WebMonitor])
 
   def startTaskManager(index: Int, system: ActorSystem): ActorRef
 
@@ -156,6 +158,10 @@ abstract class FlinkMiniCluster(
   }
 
   def shutdown(): Unit = {
+    webMonitor foreach {
+      _.stop()
+    }
+
     val futures = taskManagerActors map {
         gracefulStop(_, timeout)
     }
@@ -183,6 +189,44 @@ abstract class FlinkMiniCluster(
     }
   }
 
+  /**
+   * Starts the JobManager web frontend if LOCAL_INSTANCE_MANAGER_START_WEBSERVER is true and
+   * JOB_MANAGER_WEB_PORT_KEY is not negative. A monitor that cannot be created is skipped.
+   *
+   * @param config Configuration deciding whether and which web frontend is started.
+   * @param jobManager The JobManager actor; it is wrapped into an ActorGateway.
+   * @param archiver The archiver actor; its gateway uses the JobManager's leader session ID.
+   * @return The started web monitor, or None if none was started.
+   */
+  def startWebServer(
+      config: Configuration,
+      jobManager: ActorRef,
+      archiver: ActorRef)
+    : Option[WebMonitor] = {
+    if(
+      config.getBoolean(ConfigConstants.LOCAL_INSTANCE_MANAGER_START_WEBSERVER, false) &&
+      config.getInteger(ConfigConstants.JOB_MANAGER_WEB_PORT_KEY, 0) >= 0) {
+      val lookupTimeout = AkkaUtils.getLookupTimeout(config)
+      val jobManagerGateway = JobManager.getJobManagerGateway(jobManager, lookupTimeout)
+      val archiverGateway = new AkkaActorGateway(archiver, jobManagerGateway.leaderSessionID())
+
+      val webServer: Option[WebMonitor] =
+        if (config.getBoolean(ConfigConstants.JOB_MANAGER_NEW_WEB_FRONTEND_KEY, false)) {
+          LOG.info("Starting NEW JobManger web frontend")
+          // loaded reflectively; startWebRuntimeMonitor logs failures and returns null
+          Option(JobManager.startWebRuntimeMonitor(config, jobManagerGateway, archiverGateway))
+        } else {
+          LOG.info("Starting JobManger web frontend")
+          Some(new WebInfoServer(config, jobManagerGateway, archiverGateway))
+        }
+
+      webServer.foreach(_.start())
+      webServer
+    } else {
+      None
+    }
+  }
+
   def waitForTaskManagersToBeRegistered(): Unit = {
     implicit val executionContext = ExecutionContext.global
 

http://git-wip-us.apache.org/repos/asf/flink/blob/fab61a19/flink-runtime/src/main/scala/org/apache/flink/runtime/minicluster/LocalFlinkMiniCluster.scala
----------------------------------------------------------------------
diff --git 
a/flink-runtime/src/main/scala/org/apache/flink/runtime/minicluster/LocalFlinkMiniCluster.scala
 
b/flink-runtime/src/main/scala/org/apache/flink/runtime/minicluster/LocalFlinkMiniCluster.scala
index c056b63..54c457e 100644
--- 
a/flink-runtime/src/main/scala/org/apache/flink/runtime/minicluster/LocalFlinkMiniCluster.scala
+++ 
b/flink-runtime/src/main/scala/org/apache/flink/runtime/minicluster/LocalFlinkMiniCluster.scala
@@ -23,12 +23,15 @@ import akka.actor.{ActorRef, ActorSystem, 
ExtendedActorSystem}
 import org.apache.flink.api.common.io.FileOutputFormat
 import org.apache.flink.configuration.{ConfigConstants, Configuration}
 import org.apache.flink.runtime.StreamingMode
+import org.apache.flink.runtime.akka.AkkaUtils
 import org.apache.flink.runtime.client.JobClient
+import org.apache.flink.runtime.instance.AkkaActorGateway
 import org.apache.flink.runtime.io.network.netty.NettyConfig
 import org.apache.flink.runtime.jobmanager.JobManager
 import org.apache.flink.runtime.jobmanager.web.WebInfoServer
 import org.apache.flink.runtime.taskmanager.TaskManager
 import org.apache.flink.runtime.util.EnvironmentInformation
+import org.apache.flink.runtime.webmonitor.WebMonitor
 
 import org.slf4j.LoggerFactory
 
@@ -42,9 +45,10 @@ import org.slf4j.LoggerFactory
  * @param singleActorSystem true if all actors (JobManager and TaskManager) 
shall be run in the same
  *                          [[ActorSystem]], otherwise false
  */
-class LocalFlinkMiniCluster(userConfiguration: Configuration,
-                            singleActorSystem: Boolean,
-                            streamingMode: StreamingMode)
+class LocalFlinkMiniCluster(
+    userConfiguration: Configuration,
+    singleActorSystem: Boolean,
+    streamingMode: StreamingMode)
   extends FlinkMiniCluster(userConfiguration, singleActorSystem, 
streamingMode) {
 
   
@@ -74,23 +78,14 @@ class LocalFlinkMiniCluster(userConfiguration: 
Configuration,
     config
   }
 
-  override def startJobManager(system: ActorSystem): ActorRef = {
+  override def startJobManager(system: ActorSystem): (ActorRef, 
Option[WebMonitor]) = {
     val config = configuration.clone()
        
     val (jobManager, archiver) = JobManager.startJobManagerActors(config, 
system, streamingMode)
-    
-    if 
(config.getBoolean(ConfigConstants.LOCAL_INSTANCE_MANAGER_START_WEBSERVER, 
false)) {
-      if 
(userConfiguration.getBoolean(ConfigConstants.JOB_MANAGER_NEW_WEB_FRONTEND_KEY, 
false)) {
-        // new web frontend
-        JobManager.startWebRuntimeMonitor(userConfiguration, jobManager, 
archiver)
-      }
-      else {
-        // old web frontend
-        val webServer = new WebInfoServer(configuration, jobManager, archiver)
-        webServer.start()
-      }
-    }
-    jobManager
+
+    val webMonitorOption = startWebServer(config, jobManager, archiver)
+
+    (jobManager, webMonitorOption)
   }
 
   override def startTaskManager(index: Int, system: ActorSystem): ActorRef = {
@@ -125,13 +120,15 @@ class LocalFlinkMiniCluster(userConfiguration: 
Configuration,
       None
     }
     
-    TaskManager.startTaskManagerComponentsAndActor(config, system,
-                                                   hostname, // network 
interface to bind to
-                                                   Some(taskManagerActorName), 
// actor name
-                                                   jobManagerPath, // job 
manager akka URL
-                                                   localExecution, // start 
network stack?
-                                                   streamingMode,
-                                                   classOf[TaskManager])
+    TaskManager.startTaskManagerComponentsAndActor(
+      config,
+      system,
+      hostname, // network interface to bind to
+      Some(taskManagerActorName), // actor name
+      jobManagerPath, // job manager akka URL
+      localExecution, // start network stack?
+      streamingMode,
+      classOf[TaskManager])
   }
 
   def getJobClientActorSystem: ActorSystem = jobClientActorSystem

http://git-wip-us.apache.org/repos/asf/flink/blob/fab61a19/flink-runtime/src/main/scala/org/apache/flink/runtime/taskmanager/TaskManager.scala
----------------------------------------------------------------------
diff --git 
a/flink-runtime/src/main/scala/org/apache/flink/runtime/taskmanager/TaskManager.scala
 
b/flink-runtime/src/main/scala/org/apache/flink/runtime/taskmanager/TaskManager.scala
index f974946..0ec1040 100644
--- 
a/flink-runtime/src/main/scala/org/apache/flink/runtime/taskmanager/TaskManager.scala
+++ 
b/flink-runtime/src/main/scala/org/apache/flink/runtime/taskmanager/TaskManager.scala
@@ -171,7 +171,7 @@ class TaskManager(
 
   protected var leaderSessionID: Option[UUID] = None
 
-  private var currentRegistrationSessionID: UUID = UUID.randomUUID()
+  private val currentRegistrationSessionID: UUID = UUID.randomUUID()
 
   // --------------------------------------------------------------------------
   //  Actor messages and life cycle

http://git-wip-us.apache.org/repos/asf/flink/blob/fab61a19/flink-runtime/src/test/scala/org/apache/flink/runtime/testingUtils/TestingCluster.scala
----------------------------------------------------------------------
diff --git 
a/flink-runtime/src/test/scala/org/apache/flink/runtime/testingUtils/TestingCluster.scala
 
b/flink-runtime/src/test/scala/org/apache/flink/runtime/testingUtils/TestingCluster.scala
index ce0ef8d..f5a506d 100644
--- 
a/flink-runtime/src/test/scala/org/apache/flink/runtime/testingUtils/TestingCluster.scala
+++ 
b/flink-runtime/src/test/scala/org/apache/flink/runtime/testingUtils/TestingCluster.scala
@@ -26,6 +26,7 @@ import org.apache.flink.runtime.jobmanager.{MemoryArchivist, 
JobManager}
 import org.apache.flink.runtime.minicluster.FlinkMiniCluster
 import org.apache.flink.runtime.net.NetUtils
 import org.apache.flink.runtime.taskmanager.TaskManager
+import org.apache.flink.runtime.webmonitor.WebMonitor
 
 /**
  * Testing cluster which starts the [[JobManager]] and [[TaskManager]] actors 
with testing support
@@ -67,7 +68,7 @@ class TestingCluster(userConfiguration: Configuration,
     cfg
   }
 
-  override def startJobManager(actorSystem: ActorSystem): ActorRef = {
+  override def startJobManager(actorSystem: ActorSystem): (ActorRef, 
Option[WebMonitor]) = {
 
     val (executionContext,
       instanceManager,
@@ -103,7 +104,7 @@ class TestingCluster(userConfiguration: Configuration,
       jobManagerProps
     }
 
-    actorSystem.actorOf(dispatcherJobManagerProps, JobManager.JOB_MANAGER_NAME)
+    (actorSystem.actorOf(dispatcherJobManagerProps, 
JobManager.JOB_MANAGER_NAME), None)
   }
 
   override def startTaskManager(index: Int, system: ActorSystem) = {
@@ -116,12 +117,14 @@ class TestingCluster(userConfiguration: Configuration,
       None
     }
     
-    TaskManager.startTaskManagerComponentsAndActor(configuration, system,
-                                                   hostname,
-                                                   Some(tmActorName),
-                                                   jobManagerPath,
-                                                   numTaskManagers == 1,
-                                                   streamingMode,
-                                                   classOf[TestingTaskManager])
+    TaskManager.startTaskManagerComponentsAndActor(
+      configuration,
+      system,
+      hostname,
+      Some(tmActorName),
+      jobManagerPath,
+      numTaskManagers == 1,
+      streamingMode,
+      classOf[TestingTaskManager])
   }
 }

http://git-wip-us.apache.org/repos/asf/flink/blob/fab61a19/flink-test-utils/src/main/scala/org/apache/flink/test/util/ForkableFlinkMiniCluster.scala
----------------------------------------------------------------------
diff --git 
a/flink-test-utils/src/main/scala/org/apache/flink/test/util/ForkableFlinkMiniCluster.scala
 
b/flink-test-utils/src/main/scala/org/apache/flink/test/util/ForkableFlinkMiniCluster.scala
index cdf3960..e83c7a6 100644
--- 
a/flink-test-utils/src/main/scala/org/apache/flink/test/util/ForkableFlinkMiniCluster.scala
+++ 
b/flink-test-utils/src/main/scala/org/apache/flink/test/util/ForkableFlinkMiniCluster.scala
@@ -22,12 +22,15 @@ import akka.actor.{Props, ActorRef, ActorSystem}
 import akka.pattern.Patterns._
 import org.apache.flink.configuration.{ConfigConstants, Configuration}
 import org.apache.flink.runtime.StreamingMode
+import org.apache.flink.runtime.akka.AkkaUtils
+import org.apache.flink.runtime.instance.AkkaActorGateway
 import org.apache.flink.runtime.jobmanager.web.WebInfoServer
 import org.apache.flink.runtime.jobmanager.{MemoryArchivist, JobManager}
 import org.apache.flink.runtime.minicluster.LocalFlinkMiniCluster
 import org.apache.flink.runtime.taskmanager.TaskManager
 import org.apache.flink.runtime.testingUtils.{TestingUtils, TestingJobManager,
 TestingMemoryArchivist, TestingTaskManager}
+import org.apache.flink.runtime.webmonitor.WebMonitor
 
 import scala.concurrent.Await
 
@@ -40,9 +43,10 @@ import scala.concurrent.Await
  * @param singleActorSystem true, if all actors (JobManager and TaskManager) 
shall be run in the
  *                          same [[ActorSystem]], otherwise false.
  */
-class ForkableFlinkMiniCluster(userConfiguration: Configuration,
-                               singleActorSystem: Boolean,
-                               streamingMode: StreamingMode)
+class ForkableFlinkMiniCluster(
+    userConfiguration: Configuration,
+    singleActorSystem: Boolean,
+    streamingMode: StreamingMode)
   extends LocalFlinkMiniCluster(userConfiguration, singleActorSystem, 
streamingMode) {
   
 
@@ -78,7 +82,7 @@ class ForkableFlinkMiniCluster(userConfiguration: 
Configuration,
     super.generateConfiguration(config)
   }
 
-  override def startJobManager(actorSystem: ActorSystem): ActorRef = {
+  override def startJobManager(actorSystem: ActorSystem): (ActorRef, 
Option[WebMonitor]) = {
 
     val (executionContext,
       instanceManager,
@@ -95,7 +99,7 @@ class ForkableFlinkMiniCluster(userConfiguration: 
Configuration,
         archiveCount)
       with TestingMemoryArchivist)
 
-    val archive = actorSystem.actorOf(testArchiveProps, 
JobManager.ARCHIVE_NAME)
+    val archiver = actorSystem.actorOf(testArchiveProps, 
JobManager.ARCHIVE_NAME)
     
     val jobManagerProps = Props(
       new JobManager(
@@ -104,7 +108,7 @@ class ForkableFlinkMiniCluster(userConfiguration: 
Configuration,
         instanceManager,
         scheduler,
         libraryCacheManager,
-        archive,
+        archiver,
         executionRetries,
         delayBetweenRetries,
         timeout,
@@ -113,21 +117,9 @@ class ForkableFlinkMiniCluster(userConfiguration: 
Configuration,
 
     val jobManager = actorSystem.actorOf(jobManagerProps, 
JobManager.JOB_MANAGER_NAME)
 
-    if (userConfiguration.getBoolean(
-      ConfigConstants.LOCAL_INSTANCE_MANAGER_START_WEBSERVER, false))
-    {
-      if 
(userConfiguration.getBoolean(ConfigConstants.JOB_MANAGER_NEW_WEB_FRONTEND_KEY, 
false)) {
-        // new web frontend
-        JobManager.startWebRuntimeMonitor(userConfiguration, jobManager, 
archive)
-      }
-      else {
-        // old web frontend
-        val webServer = new WebInfoServer(configuration, jobManager, archive)
-        webServer.start()
-      }
-    }
+    val webMonitorOption = startWebServer(configuration, jobManager, archiver)
 
-    jobManager
+    (jobManager, webMonitorOption)
   }
 
   override def startTaskManager(index: Int, system: ActorSystem): ActorRef = {
@@ -163,11 +155,18 @@ class ForkableFlinkMiniCluster(userConfiguration: 
Configuration,
     val stopped = gracefulStop(jobManagerActor, TestingUtils.TESTING_DURATION)
     Await.result(stopped, TestingUtils.TESTING_DURATION)
 
+    webMonitor foreach {
+      _.stop()
+    }
+
     jobManagerActorSystem.shutdown()
     jobManagerActorSystem.awaitTermination()
 
     jobManagerActorSystem = startJobManagerActorSystem()
-    jobManagerActor = startJobManager(jobManagerActorSystem)
+    val (newJobManagerActor, newWebMonitor) = 
startJobManager(jobManagerActorSystem)
+
+    jobManagerActor = newJobManagerActor
+    webMonitor = newWebMonitor
   }
 
   def restartTaskManager(index: Int): Unit = {

http://git-wip-us.apache.org/repos/asf/flink/blob/fab61a19/flink-yarn/src/main/scala/org/apache/flink/yarn/ApplicationMaster.scala
----------------------------------------------------------------------
diff --git 
a/flink-yarn/src/main/scala/org/apache/flink/yarn/ApplicationMaster.scala 
b/flink-yarn/src/main/scala/org/apache/flink/yarn/ApplicationMaster.scala
index c497a90..9e0c976 100644
--- a/flink-yarn/src/main/scala/org/apache/flink/yarn/ApplicationMaster.scala
+++ b/flink-yarn/src/main/scala/org/apache/flink/yarn/ApplicationMaster.scala
@@ -26,9 +26,11 @@ import org.apache.flink.client.CliFrontend
 import org.apache.flink.configuration.{GlobalConfiguration, Configuration, 
ConfigConstants}
 import org.apache.flink.runtime.StreamingMode
 import org.apache.flink.runtime.akka.AkkaUtils
+import org.apache.flink.runtime.instance.AkkaActorGateway
 import org.apache.flink.runtime.jobmanager.JobManager
 import org.apache.flink.runtime.jobmanager.web.WebInfoServer
 import org.apache.flink.runtime.util.EnvironmentInformation
+import org.apache.flink.runtime.webmonitor.WebMonitor
 import org.apache.flink.yarn.Messages.StartYarnSession
 import org.apache.hadoop.security.UserGroupInformation
 import org.apache.hadoop.yarn.api.ApplicationConstants.Environment
@@ -68,7 +70,7 @@ object ApplicationMaster {
       override def run(): Object = {
 
         var actorSystem: ActorSystem = null
-        var webserver: WebInfoServer = null
+        var webserver: WebMonitor = null
 
         try {
           val conf = new YarnConfiguration()
@@ -99,25 +101,44 @@ object ApplicationMaster {
           val slots = env.get(FlinkYarnClient.ENV_SLOTS).toInt
           val dynamicPropertiesEncodedString = 
env.get(FlinkYarnClient.ENV_DYNAMIC_PROPERTIES)
 
-          val (config: Configuration,
-               system: ActorSystem,
-               jobManager: ActorRef,
-               archiver: ActorRef) = startJobManager(currDir, ownHostname,
-                                                     
dynamicPropertiesEncodedString,
-                                                     streamingMode)
+          val config = createConfiguration(currDir, 
dynamicPropertiesEncodedString)
+
+          val (
+            system: ActorSystem,
+            jobManager: ActorRef,
+            archiver: ActorRef) = startJobManager(
+              config,
+              ownHostname,
+              streamingMode)
+
           actorSystem = system
           val extActor = system.asInstanceOf[ExtendedActorSystem]
           val jobManagerPort = extActor.provider.getDefaultAddress.port.get
 
-          // start the web info server
           if (config.getInteger(ConfigConstants.JOB_MANAGER_WEB_PORT_KEY, 0) 
!= -1) {
+            // start the web info server
+            val lookupTimeout = AkkaUtils.getLookupTimeout(config)
+            val jobManagerGateway = 
JobManager.getJobManagerGateway(jobManager, lookupTimeout)
+            val archiverGateway = new AkkaActorGateway(
+              archiver,
+              jobManagerGateway.leaderSessionID())
+
             LOG.info("Starting Job Manger web frontend.")
             config.setString(ConfigConstants.JOB_MANAGER_WEB_LOG_PATH_KEY, 
logDirs)
             config.setInteger(ConfigConstants.JOB_MANAGER_WEB_PORT_KEY, 0); // 
set port to 0.
             // set JobManager host/port for web interface.
             config.setString(ConfigConstants.JOB_MANAGER_IPC_ADDRESS_KEY, 
ownHostname)
             config.setInteger(ConfigConstants.JOB_MANAGER_IPC_PORT_KEY, 
jobManagerPort)
-            webserver = new WebInfoServer(config, jobManager, archiver)
+
+            webserver = if(
+              config.getBoolean(
+                ConfigConstants.JOB_MANAGER_NEW_WEB_FRONTEND_KEY,
+                false)) {
+              JobManager.startWebRuntimeMonitor(config, jobManagerGateway, 
archiverGateway)
+            } else {
+              new WebInfoServer(config, jobManagerGateway, archiverGateway)
+            }
+
             webserver.start()
           }
 
@@ -160,11 +181,17 @@ object ApplicationMaster {
 
   }
 
-  def generateConfigurationFile(fileName: String, currDir: String, 
ownHostname: String,
-                               jobManagerPort: Int,
-                               jobManagerWebPort: Int, logDirs: String, slots: 
Int,
-                               taskManagerCount: Int, 
dynamicPropertiesEncodedString: String)
-  : Unit = {
+  def generateConfigurationFile(
+      fileName: String,
+      currDir: String,
+      ownHostname: String,
+      jobManagerPort: Int,
+      jobManagerWebPort: Int,
+      logDirs: String,
+      slots: Int,
+      taskManagerCount: Int,
+      dynamicPropertiesEncodedString: String)
+    : Unit = {
     LOG.info("Generate configuration file for application master.")
     val output = new PrintWriter(new BufferedWriter(
       new FileWriter(fileName))
@@ -208,26 +235,13 @@ object ApplicationMaster {
    *
    * @return (Configuration, JobManager ActorSystem, JobManager ActorRef, 
Archiver ActorRef)
    */
-  def startJobManager(currDir: String,
-                      hostname: String,
-                      dynamicPropertiesEncodedString: String,
-                      streamingMode: StreamingMode):
-    (Configuration, ActorSystem, ActorRef, ActorRef) = {
+  def startJobManager(
+      configuration: Configuration,
+      hostname: String,
+      streamingMode: StreamingMode)
+    : (ActorSystem, ActorRef, ActorRef) = {
 
     LOG.info("Starting JobManager for YARN")
-    LOG.info(s"Loading config from: $currDir.")
-
-    GlobalConfiguration.loadConfiguration(currDir)
-    val configuration = GlobalConfiguration.getConfiguration()
-
-    configuration.setString(ConfigConstants.FLINK_BASE_DIR_PATH_KEY, currDir)
-
-    // add dynamic properties to JobManager configuration.
-    val dynamicProperties = 
CliFrontend.getDynamicProperties(dynamicPropertiesEncodedString)
-    import scala.collection.JavaConverters._
-    for(property <- dynamicProperties.asScala){
-      configuration.setString(property.f0, property.f1)
-    }
 
     // set port to 0 to let Akka automatically determine the port.
     LOG.debug("Starting JobManager actor system")
@@ -265,7 +279,25 @@ object ApplicationMaster {
     LOG.debug("Starting JobManager actor")
     val jobManager = JobManager.startActor(jobManagerProps, jobManagerSystem)
 
-    (configuration, jobManagerSystem, jobManager, archiver)
+    (jobManagerSystem, jobManager, archiver)
+  }
+
+  /** Loads the configuration found in `curDir` and sets the given dynamic properties on it. */
+  def createConfiguration(curDir: String, dynamicPropertiesEncodedString: String): Configuration = {
+    import scala.collection.JavaConverters._
+
+    LOG.info(s"Loading config from: $curDir.")
+    GlobalConfiguration.loadConfiguration(curDir)
+
+    val loadedConfig = GlobalConfiguration.getConfiguration()
+    loadedConfig.setString(ConfigConstants.FLINK_BASE_DIR_PATH_KEY, curDir)
+
+    // the dynamic properties are set after the values loaded from curDir
+    CliFrontend.getDynamicProperties(dynamicPropertiesEncodedString).asScala.foreach {
+      property => loadedConfig.setString(property.f0, property.f1)
+    }
+
+    loadedConfig
   }
 
 

Reply via email to