Repository: flink Updated Branches: refs/heads/master 416ff589e -> fab61a195
[FLINK-2409] [webserver] Replaces ActorRefs with ActorGateways in the web server to automatically decorate messages with a leader session ID. Refactored MiniCluster to also store a reference to the web server to stop it. Adds support for the new web interface for YARN. Fixes the web server start condition. This closes #959. Project: http://git-wip-us.apache.org/repos/asf/flink/repo Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/fab61a19 Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/fab61a19 Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/fab61a19 Branch: refs/heads/master Commit: fab61a1954ff1554448e826e1d273689ed520fc3 Parents: 416ff58 Author: Till Rohrmann <[email protected]> Authored: Wed Jul 29 18:03:52 2015 +0200 Committer: Till Rohrmann <[email protected]> Committed: Mon Aug 3 13:47:31 2015 +0200 ---------------------------------------------------------------------- .../webmonitor/ExecutionGraphHolder.java | 14 +-- .../runtime/webmonitor/WebRuntimeMonitor.java | 5 +- .../handlers/RequestJobIdsHandler.java | 14 +-- .../handlers/RequestOverviewHandler.java | 14 +-- .../legacy/JobManagerInfoHandler.java | 55 ++++++----- .../jobmanager/web/JobManagerInfoServlet.java | 52 +++++------ .../jobmanager/web/SetupInfoServlet.java | 18 ++-- .../runtime/jobmanager/web/WebInfoServer.java | 8 +- .../flink/runtime/jobmanager/JobManager.scala | 97 +++++++++---------- .../runtime/minicluster/FlinkMiniCluster.scala | 50 +++++++++- .../minicluster/LocalFlinkMiniCluster.scala | 45 +++++---- .../flink/runtime/taskmanager/TaskManager.scala | 2 +- .../runtime/testingUtils/TestingCluster.scala | 21 +++-- .../test/util/ForkableFlinkMiniCluster.scala | 41 ++++---- .../apache/flink/yarn/ApplicationMaster.scala | 98 +++++++++++++------- 15 files changed, 293 insertions(+), 241 deletions(-) ---------------------------------------------------------------------- 
http://git-wip-us.apache.org/repos/asf/flink/blob/fab61a19/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/ExecutionGraphHolder.java ---------------------------------------------------------------------- diff --git a/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/ExecutionGraphHolder.java b/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/ExecutionGraphHolder.java index 18a548c..a017f3a 100644 --- a/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/ExecutionGraphHolder.java +++ b/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/ExecutionGraphHolder.java @@ -18,12 +18,9 @@ package org.apache.flink.runtime.webmonitor; -import akka.actor.ActorRef; -import akka.pattern.Patterns; -import akka.util.Timeout; - import org.apache.flink.api.common.JobID; import org.apache.flink.runtime.executiongraph.ExecutionGraph; +import org.apache.flink.runtime.instance.ActorGateway; import org.apache.flink.runtime.messages.JobManagerMessages; import scala.concurrent.Await; @@ -42,18 +39,18 @@ import java.util.WeakHashMap; */ public class ExecutionGraphHolder { - private final ActorRef source; + private final ActorGateway source; private final FiniteDuration timeout; private final WeakHashMap<JobID, ExecutionGraph> cache = new WeakHashMap<JobID, ExecutionGraph>(); - public ExecutionGraphHolder(ActorRef source) { + public ExecutionGraphHolder(ActorGateway source) { this(source, WebRuntimeMonitor.DEFAULT_REQUEST_TIMEOUT); } - public ExecutionGraphHolder(ActorRef source, FiniteDuration timeout) { + public ExecutionGraphHolder(ActorGateway source, FiniteDuration timeout) { if (source == null || timeout == null) { throw new NullPointerException(); } @@ -69,8 +66,7 @@ public class ExecutionGraphHolder { } try { - Timeout to = new Timeout(timeout); - Future<Object> future = Patterns.ask(source, new JobManagerMessages.RequestJob(jid), to); + Future<Object> future = source.ask(new 
JobManagerMessages.RequestJob(jid), timeout); Object result = Await.result(future, timeout); if (result instanceof JobManagerMessages.JobNotFound) { return null; http://git-wip-us.apache.org/repos/asf/flink/blob/fab61a19/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/WebRuntimeMonitor.java ---------------------------------------------------------------------- diff --git a/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/WebRuntimeMonitor.java b/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/WebRuntimeMonitor.java index a2095d4..006d18d 100644 --- a/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/WebRuntimeMonitor.java +++ b/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/WebRuntimeMonitor.java @@ -18,8 +18,6 @@ package org.apache.flink.runtime.webmonitor; -import akka.actor.ActorRef; - import io.netty.bootstrap.ServerBootstrap; import io.netty.channel.Channel; import io.netty.channel.ChannelInitializer; @@ -35,6 +33,7 @@ import io.netty.handler.stream.ChunkedWriteHandler; import org.apache.flink.configuration.ConfigConstants; import org.apache.flink.configuration.Configuration; import org.apache.flink.configuration.IllegalConfigurationException; +import org.apache.flink.runtime.instance.ActorGateway; import org.apache.flink.runtime.webmonitor.files.StaticFileServerHandler; import org.apache.flink.runtime.webmonitor.handlers.ExecutionPlanHandler; import org.apache.flink.runtime.webmonitor.handlers.JobConfigHandler; @@ -88,7 +87,7 @@ public class WebRuntimeMonitor implements WebMonitor { private Channel serverChannel; - public WebRuntimeMonitor(Configuration config, ActorRef jobManager, ActorRef archive) throws IOException { + public WebRuntimeMonitor(Configuration config, ActorGateway jobManager, ActorGateway archive) throws IOException { // figure out where our static contents is final String configuredWebRoot = 
config.getString(ConfigConstants.JOB_MANAGER_WEB_DOC_ROOT_KEY, null); final String flinkRoot = config.getString(ConfigConstants.FLINK_BASE_DIR_PATH_KEY, null); http://git-wip-us.apache.org/repos/asf/flink/blob/fab61a19/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/handlers/RequestJobIdsHandler.java ---------------------------------------------------------------------- diff --git a/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/handlers/RequestJobIdsHandler.java b/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/handlers/RequestJobIdsHandler.java index 1f28a01..aa1a39f 100644 --- a/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/handlers/RequestJobIdsHandler.java +++ b/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/handlers/RequestJobIdsHandler.java @@ -18,10 +18,7 @@ package org.apache.flink.runtime.webmonitor.handlers; -import akka.actor.ActorRef; -import akka.pattern.Patterns; -import akka.util.Timeout; - +import org.apache.flink.runtime.instance.ActorGateway; import org.apache.flink.runtime.messages.webmonitor.JobsWithIDsOverview; import org.apache.flink.runtime.messages.webmonitor.RequestJobsWithIDsOverview; import org.apache.flink.runtime.webmonitor.JsonFactory; @@ -40,15 +37,15 @@ import java.util.Map; */ public class RequestJobIdsHandler implements RequestHandler, RequestHandler.JsonResponse { - private final ActorRef target; + private final ActorGateway target; private final FiniteDuration timeout; - public RequestJobIdsHandler(ActorRef target) { + public RequestJobIdsHandler(ActorGateway target) { this(target, WebRuntimeMonitor.DEFAULT_REQUEST_TIMEOUT); } - public RequestJobIdsHandler(ActorRef target, FiniteDuration timeout) { + public RequestJobIdsHandler(ActorGateway target, FiniteDuration timeout) { if (target == null || timeout == null) { throw new NullPointerException(); } @@ -60,8 +57,7 @@ public class RequestJobIdsHandler implements RequestHandler, 
RequestHandler.Json public String handleRequest(Map<String, String> params) throws Exception { // we need no parameters, get all requests try { - Timeout to = new Timeout(timeout); - Future<Object> future = Patterns.ask(target, RequestJobsWithIDsOverview.getInstance(), to); + Future<Object> future = target.ask(RequestJobsWithIDsOverview.getInstance(), timeout); JobsWithIDsOverview result = (JobsWithIDsOverview) Await.result(future, timeout); return JsonFactory.generateJobsOverviewJSON(result); } http://git-wip-us.apache.org/repos/asf/flink/blob/fab61a19/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/handlers/RequestOverviewHandler.java ---------------------------------------------------------------------- diff --git a/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/handlers/RequestOverviewHandler.java b/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/handlers/RequestOverviewHandler.java index e51a4d1..c2c00c7 100644 --- a/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/handlers/RequestOverviewHandler.java +++ b/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/handlers/RequestOverviewHandler.java @@ -18,10 +18,7 @@ package org.apache.flink.runtime.webmonitor.handlers; -import akka.actor.ActorRef; -import akka.pattern.Patterns; -import akka.util.Timeout; - +import org.apache.flink.runtime.instance.ActorGateway; import org.apache.flink.runtime.messages.webmonitor.RequestStatusWithJobIDsOverview; import org.apache.flink.runtime.messages.webmonitor.StatusWithJobIDsOverview; import org.apache.flink.runtime.webmonitor.JsonFactory; @@ -39,16 +36,16 @@ import java.util.Map; */ public class RequestOverviewHandler implements RequestHandler, RequestHandler.JsonResponse { - private final ActorRef jobManager; + private final ActorGateway jobManager; private final FiniteDuration timeout; - public RequestOverviewHandler(ActorRef jobManager) { + public 
RequestOverviewHandler(ActorGateway jobManager) { this(jobManager, WebRuntimeMonitor.DEFAULT_REQUEST_TIMEOUT); } - public RequestOverviewHandler(ActorRef jobManager, FiniteDuration timeout) { + public RequestOverviewHandler(ActorGateway jobManager, FiniteDuration timeout) { if (jobManager == null || timeout == null) { throw new NullPointerException(); } @@ -59,8 +56,7 @@ public class RequestOverviewHandler implements RequestHandler, RequestHandler.J @Override public String handleRequest(Map<String, String> params) throws Exception { try { - Timeout to = new Timeout(timeout); - Future<Object> future = Patterns.ask(jobManager, RequestStatusWithJobIDsOverview.getInstance(), to); + Future<Object> future = jobManager.ask(RequestStatusWithJobIDsOverview.getInstance(), timeout); StatusWithJobIDsOverview result = (StatusWithJobIDsOverview) Await.result(future, timeout); return JsonFactory.generateOverviewWithJobIDsJSON(result); } http://git-wip-us.apache.org/repos/asf/flink/blob/fab61a19/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/legacy/JobManagerInfoHandler.java ---------------------------------------------------------------------- diff --git a/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/legacy/JobManagerInfoHandler.java b/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/legacy/JobManagerInfoHandler.java index 0a1e08c..9b52736 100644 --- a/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/legacy/JobManagerInfoHandler.java +++ b/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/legacy/JobManagerInfoHandler.java @@ -18,10 +18,6 @@ package org.apache.flink.runtime.webmonitor.legacy; -import akka.actor.ActorRef; - -import akka.pattern.Patterns; -import akka.util.Timeout; import io.netty.buffer.Unpooled; import io.netty.channel.ChannelHandler; import io.netty.channel.ChannelHandlerContext; @@ -41,6 +37,7 @@ import org.apache.flink.runtime.executiongraph.Execution; import 
org.apache.flink.runtime.executiongraph.ExecutionGraph; import org.apache.flink.runtime.executiongraph.ExecutionJobVertex; import org.apache.flink.runtime.executiongraph.ExecutionVertex; +import org.apache.flink.runtime.instance.ActorGateway; import org.apache.flink.runtime.instance.InstanceConnectionInfo; import org.apache.flink.runtime.jobgraph.JobStatus; import org.apache.flink.runtime.jobgraph.JobVertexID; @@ -78,12 +75,12 @@ public class JobManagerInfoHandler extends SimpleChannelInboundHandler<Routed> { private static final Charset ENCODING = Charset.forName("UTF-8"); /** Underlying JobManager */ - private final ActorRef jobmanager; - private final ActorRef archive; + private final ActorGateway jobmanager; + private final ActorGateway archive; private final FiniteDuration timeout; - public JobManagerInfoHandler(ActorRef jobmanager, ActorRef archive, FiniteDuration timeout) { + public JobManagerInfoHandler(ActorGateway jobmanager, ActorGateway archive, FiniteDuration timeout) { this.jobmanager = jobmanager; this.archive = archive; this.timeout = timeout; @@ -118,8 +115,7 @@ public class JobManagerInfoHandler extends SimpleChannelInboundHandler<Routed> { @SuppressWarnings("unchecked") private String handleRequest(Routed routed) throws Exception { if ("archive".equals(routed.queryParam("get"))) { - Future<Object> response = Patterns.ask(archive, ArchiveMessages.getRequestArchivedJobs(), - new Timeout(timeout)); + Future<Object> response = archive.ask(ArchiveMessages.getRequestArchivedJobs(), timeout); Object result = Await.result(response, timeout); @@ -135,8 +131,7 @@ public class JobManagerInfoHandler extends SimpleChannelInboundHandler<Routed> { } } else if ("jobcounts".equals(routed.queryParam("get"))) { - Future<Object> response = Patterns.ask(archive, ArchiveMessages.getRequestJobCounts(), - new Timeout(timeout)); + Future<Object> response = archive.ask(ArchiveMessages.getRequestJobCounts(), timeout); Object result = Await.result(response, timeout); @@ 
-152,8 +147,8 @@ public class JobManagerInfoHandler extends SimpleChannelInboundHandler<Routed> { else if ("job".equals(routed.queryParam("get"))) { String jobId = routed.queryParam("job"); - Future<Object> response = Patterns.ask(archive, new JobManagerMessages.RequestJob(JobID.fromHexString(jobId)), - new Timeout(timeout)); + Future<Object> response = archive.ask(new JobManagerMessages.RequestJob(JobID.fromHexString(jobId)), + timeout); Object result = Await.result(response, timeout); @@ -182,8 +177,8 @@ public class JobManagerInfoHandler extends SimpleChannelInboundHandler<Routed> { throw new Exception("Found null groupVertexId"); } - Future<Object> response = Patterns.ask(archive, new JobManagerMessages.RequestJob(JobID.fromHexString(jobId)), - new Timeout(timeout)); + Future<Object> response = archive.ask(new JobManagerMessages.RequestJob(JobID.fromHexString(jobId)), + timeout); Object result = Await.result(response, timeout); @@ -205,9 +200,9 @@ public class JobManagerInfoHandler extends SimpleChannelInboundHandler<Routed> { } } else if ("taskmanagers".equals(routed.queryParam("get"))) { - Future<Object> response = Patterns.ask(jobmanager, + Future<Object> response = jobmanager.ask( JobManagerMessages.getRequestNumberRegisteredTaskManager(), - new Timeout(timeout)); + timeout); Object result = Await.result(response, timeout); @@ -219,9 +214,9 @@ public class JobManagerInfoHandler extends SimpleChannelInboundHandler<Routed> { else { final int numberOfTaskManagers = (Integer)result; - final Future<Object> responseRegisteredSlots = Patterns.ask(jobmanager, + final Future<Object> responseRegisteredSlots = jobmanager.ask( JobManagerMessages.getRequestTotalNumberOfSlots(), - new Timeout(timeout)); + timeout); final Object resultRegisteredSlots = Await.result(responseRegisteredSlots, timeout); @@ -242,8 +237,8 @@ public class JobManagerInfoHandler extends SimpleChannelInboundHandler<Routed> { else if ("cancel".equals(routed.queryParam("get"))) { String jobId = 
routed.queryParam("job"); - Future<Object> response = Patterns.ask(jobmanager, new JobManagerMessages.CancelJob(JobID.fromHexString(jobId)), - new Timeout(timeout)); + Future<Object> response = jobmanager.ask(new JobManagerMessages.CancelJob(JobID.fromHexString(jobId)), + timeout); Await.ready(response, timeout); return "{}"; @@ -256,8 +251,8 @@ public class JobManagerInfoHandler extends SimpleChannelInboundHandler<Routed> { return writeJsonForVersion(); } else{ - Future<Object> response = Patterns.ask(jobmanager, JobManagerMessages.getRequestRunningJobs(), - new Timeout(timeout)); + Future<Object> response = jobmanager.ask(JobManagerMessages.getRequestRunningJobs(), + timeout); Object result = Await.result(response, timeout); @@ -454,8 +449,9 @@ public class JobManagerInfoHandler extends SimpleChannelInboundHandler<Routed> { } // write accumulators - final Future<Object> response = Patterns.ask(jobmanager, - new RequestAccumulatorResultsStringified(graph.getJobID()), new Timeout(timeout)); + final Future<Object> response = jobmanager.ask( + new RequestAccumulatorResultsStringified(graph.getJobID()), + timeout); Object result; try { @@ -549,9 +545,9 @@ public class JobManagerInfoHandler extends SimpleChannelInboundHandler<Routed> { private String writeJsonUpdatesForJob(JobID jobId) { - final Future<Object> responseArchivedJobs = Patterns.ask(jobmanager, + final Future<Object> responseArchivedJobs = jobmanager.ask( JobManagerMessages.getRequestRunningJobs(), - new Timeout(timeout)); + timeout); Object resultArchivedJobs; try{ @@ -591,8 +587,9 @@ public class JobManagerInfoHandler extends SimpleChannelInboundHandler<Routed> { } bld.append("],"); - final Future<Object> responseJob = Patterns.ask(jobmanager, new JobManagerMessages.RequestJob(jobId), - new Timeout(timeout)); + final Future<Object> responseJob = jobmanager.ask( + new JobManagerMessages.RequestJob(jobId), + timeout); Object resultJob; try{ 
http://git-wip-us.apache.org/repos/asf/flink/blob/fab61a19/flink-runtime/src/main/java/org/apache/flink/runtime/jobmanager/web/JobManagerInfoServlet.java ---------------------------------------------------------------------- diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/jobmanager/web/JobManagerInfoServlet.java b/flink-runtime/src/main/java/org/apache/flink/runtime/jobmanager/web/JobManagerInfoServlet.java index 82ab63e..ce57714 100644 --- a/flink-runtime/src/main/java/org/apache/flink/runtime/jobmanager/web/JobManagerInfoServlet.java +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/jobmanager/web/JobManagerInfoServlet.java @@ -32,12 +32,9 @@ import javax.servlet.http.HttpServlet; import javax.servlet.http.HttpServletRequest; import javax.servlet.http.HttpServletResponse; -import akka.actor.ActorRef; - -import akka.pattern.Patterns; -import akka.util.Timeout; import org.apache.flink.api.common.ExecutionConfig; import org.apache.flink.runtime.accumulators.StringifiedAccumulatorResult; +import org.apache.flink.runtime.instance.ActorGateway; import org.apache.flink.runtime.instance.InstanceConnectionInfo; import org.apache.flink.runtime.messages.ArchiveMessages.ArchivedJobs; import org.apache.flink.runtime.messages.ArchiveMessages; @@ -78,12 +75,12 @@ public class JobManagerInfoServlet extends HttpServlet { private static final Logger LOG = LoggerFactory.getLogger(JobManagerInfoServlet.class); /** Underlying JobManager */ - private final ActorRef jobmanager; - private final ActorRef archive; + private final ActorGateway jobmanager; + private final ActorGateway archive; private final FiniteDuration timeout; - public JobManagerInfoServlet(ActorRef jobmanager, ActorRef archive, FiniteDuration timeout) { + public JobManagerInfoServlet(ActorGateway jobmanager, ActorGateway archive, FiniteDuration timeout) { this.jobmanager = jobmanager; this.archive = archive; this.timeout = timeout; @@ -102,8 +99,7 @@ public class JobManagerInfoServlet 
extends HttpServlet { try { if("archive".equals(req.getParameter("get"))) { - response = Patterns.ask(archive, ArchiveMessages.getRequestArchivedJobs(), - new Timeout(timeout)); + response = archive.ask(ArchiveMessages.getRequestArchivedJobs(), timeout); result = Await.result(response, timeout); @@ -119,8 +115,7 @@ public class JobManagerInfoServlet extends HttpServlet { } } else if("jobcounts".equals(req.getParameter("get"))) { - response = Patterns.ask(archive, ArchiveMessages.getRequestJobCounts(), - new Timeout(timeout)); + response = archive.ask(ArchiveMessages.getRequestJobCounts(), timeout); result = Await.result(response, timeout); @@ -135,8 +130,7 @@ public class JobManagerInfoServlet extends HttpServlet { else if("job".equals(req.getParameter("get"))) { String jobId = req.getParameter("job"); - response = Patterns.ask(archive, new RequestJob(JobID.fromHexString(jobId)), - new Timeout(timeout)); + response = archive.ask(new RequestJob(JobID.fromHexString(jobId)), timeout); result = Await.result(response, timeout); @@ -163,8 +157,7 @@ public class JobManagerInfoServlet extends HttpServlet { return; } - response = Patterns.ask(archive, new RequestJob(JobID.fromHexString(jobId)), - new Timeout(timeout)); + response = archive.ask(new RequestJob(JobID.fromHexString(jobId)), timeout); result = Await.result(response, timeout); @@ -186,9 +179,9 @@ public class JobManagerInfoServlet extends HttpServlet { } else if("taskmanagers".equals(req.getParameter("get"))) { - response = Patterns.ask(jobmanager, + response = jobmanager.ask( JobManagerMessages.getRequestNumberRegisteredTaskManager(), - new Timeout(timeout)); + timeout); result = Await.result(response, timeout); @@ -199,9 +192,9 @@ public class JobManagerInfoServlet extends HttpServlet { } else { final int numberOfTaskManagers = (Integer)result; - final Future<Object> responseRegisteredSlots = Patterns.ask(jobmanager, + final Future<Object> responseRegisteredSlots = jobmanager.ask( 
JobManagerMessages.getRequestTotalNumberOfSlots(), - new Timeout(timeout)); + timeout); final Object resultRegisteredSlots = Await.result(responseRegisteredSlots, timeout); @@ -221,8 +214,9 @@ public class JobManagerInfoServlet extends HttpServlet { else if("cancel".equals(req.getParameter("get"))) { String jobId = req.getParameter("job"); - response = Patterns.ask(jobmanager, new CancelJob(JobID.fromHexString(jobId)), - new Timeout(timeout)); + response = jobmanager.ask( + new CancelJob(JobID.fromHexString(jobId)), + timeout); Await.ready(response, timeout); } @@ -233,8 +227,9 @@ public class JobManagerInfoServlet extends HttpServlet { writeJsonForVersion(resp.getWriter()); } else{ - response = Patterns.ask(jobmanager, JobManagerMessages.getRequestRunningJobs(), - new Timeout(timeout)); + response = jobmanager.ask( + JobManagerMessages.getRequestRunningJobs(), + timeout); result = Await.result(response, timeout); @@ -471,8 +466,8 @@ public class JobManagerInfoServlet extends HttpServlet { } // write accumulators - final Future<Object> response = Patterns.ask(jobmanager, - new RequestAccumulatorResultsStringified(graph.getJobID()), new Timeout(timeout)); + final Future<Object> response = jobmanager.ask( + new RequestAccumulatorResultsStringified(graph.getJobID()), timeout); Object result; try { @@ -575,9 +570,9 @@ public class JobManagerInfoServlet extends HttpServlet { private void writeJsonUpdatesForJob(PrintWriter wrt, JobID jobId) { try { - final Future<Object> responseArchivedJobs = Patterns.ask(jobmanager, + final Future<Object> responseArchivedJobs = jobmanager.ask( JobManagerMessages.getRequestRunningJobs(), - new Timeout(timeout)); + timeout); Object resultArchivedJobs = null; @@ -615,8 +610,7 @@ public class JobManagerInfoServlet extends HttpServlet { wrt.write("],"); - final Future<Object> responseJob = Patterns.ask(jobmanager, new RequestJob(jobId), - new Timeout(timeout)); + final Future<Object> responseJob = jobmanager.ask(new RequestJob(jobId), 
timeout); Object resultJob = null; http://git-wip-us.apache.org/repos/asf/flink/blob/fab61a19/flink-runtime/src/main/java/org/apache/flink/runtime/jobmanager/web/SetupInfoServlet.java ---------------------------------------------------------------------- diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/jobmanager/web/SetupInfoServlet.java b/flink-runtime/src/main/java/org/apache/flink/runtime/jobmanager/web/SetupInfoServlet.java index c3df253..1f2bfe0 100644 --- a/flink-runtime/src/main/java/org/apache/flink/runtime/jobmanager/web/SetupInfoServlet.java +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/jobmanager/web/SetupInfoServlet.java @@ -32,10 +32,8 @@ import javax.servlet.http.HttpServlet; import javax.servlet.http.HttpServletRequest; import javax.servlet.http.HttpServletResponse; -import akka.actor.ActorRef; -import akka.pattern.Patterns; -import akka.util.Timeout; import org.apache.flink.configuration.Configuration; +import org.apache.flink.runtime.instance.ActorGateway; import org.apache.flink.runtime.instance.Instance; import org.apache.flink.runtime.instance.InstanceID; @@ -67,13 +65,13 @@ public class SetupInfoServlet extends HttpServlet { final private Configuration configuration; - final private ActorRef jobmanager; + final private ActorGateway jobmanager; final private FiniteDuration timeout; - public SetupInfoServlet(Configuration conf, ActorRef jm, FiniteDuration timeout) { + public SetupInfoServlet(Configuration conf, ActorGateway jobManager, FiniteDuration timeout) { configuration = conf; - this.jobmanager = jm; + this.jobmanager = jobManager; this.timeout = timeout; } @@ -114,9 +112,9 @@ public class SetupInfoServlet extends HttpServlet { private void writeTaskmanagers(HttpServletResponse resp) throws IOException { - final Future<Object> response = Patterns.ask(jobmanager, + final Future<Object> response = jobmanager.ask( JobManagerMessages.getRequestRegisteredTaskManagers(), - new Timeout(timeout)); + timeout); Object 
obj = null; @@ -183,9 +181,9 @@ public class SetupInfoServlet extends HttpServlet { StackTrace message = null; Throwable exception = null; - final Future<Object> response = Patterns.ask(jobmanager, + final Future<Object> response = jobmanager.ask( new RequestStackTrace(instanceID), - new Timeout(timeout)); + timeout); try { message = (StackTrace) Await.result(response, timeout); http://git-wip-us.apache.org/repos/asf/flink/blob/fab61a19/flink-runtime/src/main/java/org/apache/flink/runtime/jobmanager/web/WebInfoServer.java ---------------------------------------------------------------------- diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/jobmanager/web/WebInfoServer.java b/flink-runtime/src/main/java/org/apache/flink/runtime/jobmanager/web/WebInfoServer.java index a414cf6..4383b65 100644 --- a/flink-runtime/src/main/java/org/apache/flink/runtime/jobmanager/web/WebInfoServer.java +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/jobmanager/web/WebInfoServer.java @@ -23,12 +23,12 @@ import java.io.FileNotFoundException; import java.io.IOException; import java.net.URL; -import akka.actor.ActorRef; - import org.apache.flink.runtime.akka.AkkaUtils; import org.apache.flink.configuration.ConfigConstants; import org.apache.flink.configuration.Configuration; +import org.apache.flink.runtime.instance.ActorGateway; +import org.apache.flink.runtime.webmonitor.WebMonitor; import org.eclipse.jetty.server.Connector; import org.eclipse.jetty.server.handler.ResourceHandler; import org.eclipse.jetty.server.Server; @@ -45,7 +45,7 @@ import scala.concurrent.duration.FiniteDuration; * This class sets up a web-server that contains a web frontend to display information about running jobs. * It instantiates and configures an embedded jetty server. 
*/ -public class WebInfoServer { +public class WebInfoServer implements WebMonitor { /** Web root dir in the jar */ private static final String WEB_ROOT_DIR = "web-docs-infoserver"; @@ -70,7 +70,7 @@ public class WebInfoServer { * @throws IOException * Thrown, if the server setup failed for an I/O related reason. */ - public WebInfoServer(Configuration config, ActorRef jobmanager, ActorRef archive) throws IOException { + public WebInfoServer(Configuration config, ActorGateway jobmanager, ActorGateway archive) throws IOException { if (config == null) { throw new IllegalArgumentException("No Configuration has been passed to the web server"); } http://git-wip-us.apache.org/repos/asf/flink/blob/fab61a19/flink-runtime/src/main/scala/org/apache/flink/runtime/jobmanager/JobManager.scala ---------------------------------------------------------------------- diff --git a/flink-runtime/src/main/scala/org/apache/flink/runtime/jobmanager/JobManager.scala b/flink-runtime/src/main/scala/org/apache/flink/runtime/jobmanager/JobManager.scala index 7bf4447..5c0f468 100644 --- a/flink-runtime/src/main/scala/org/apache/flink/runtime/jobmanager/JobManager.scala +++ b/flink-runtime/src/main/scala/org/apache/flink/runtime/jobmanager/JobManager.scala @@ -1128,17 +1128,27 @@ object JobManager { "TaskManager_Process_Reaper") } - // start the job manager web frontend - if (configuration.getBoolean(ConfigConstants.JOB_MANAGER_NEW_WEB_FRONTEND_KEY, false)) { - LOG.info("Starting NEW JobManger web frontend") - - // start the new web frontend. 
we need to load this dynamically - // because it is not in the same project/dependencies - startWebRuntimeMonitor(configuration, jobManager, archiver) - } - else if (configuration.getInteger(ConfigConstants.JOB_MANAGER_WEB_PORT_KEY, 0) != -1) { - LOG.info("Starting JobManger web frontend") - val webServer = new WebInfoServer(configuration, jobManager, archiver) + if(configuration.getInteger(ConfigConstants.JOB_MANAGER_WEB_PORT_KEY, 0) >= 0) { + val lookupTimeout = AkkaUtils.getLookupTimeout(configuration) + val jobManagerGateway = JobManager.getJobManagerGateway(jobManager, lookupTimeout) + val archiverGateway = new AkkaActorGateway(archiver, jobManagerGateway.leaderSessionID()) + + // start the job manager web frontend + val webServer = if ( + configuration.getBoolean( + ConfigConstants.JOB_MANAGER_NEW_WEB_FRONTEND_KEY, + false)) { + + LOG.info("Starting NEW JobManger web frontend") + // start the new web frontend. we need to load this dynamically + // because it is not in the same project/dependencies + startWebRuntimeMonitor(configuration, jobManagerGateway, archiverGateway) + } + else { + LOG.info("Starting JobManger web frontend") + new WebInfoServer(configuration, jobManagerGateway, archiverGateway) + } + webServer.start() } } @@ -1570,46 +1580,37 @@ object JobManager { * this method does not throw any exceptions, but only logs them. * * @param config The configuration for the runtime monitor. - * @param jobManager The JobManager actor. + * @param jobManager The JobManager actor gateway. * @param archiver The execution graph archive actor. 
*/ - def startWebRuntimeMonitor(config: Configuration, - jobManager: ActorRef, - archiver: ActorRef): Unit = { + def startWebRuntimeMonitor( + config: Configuration, + jobManager: ActorGateway, + archiver: ActorGateway) + : WebMonitor = { // try to load and instantiate the class - val monitor: WebMonitor = - try { - val classname = "org.apache.flink.runtime.webmonitor.WebRuntimeMonitor" - val clazz: Class[_ <: WebMonitor] = Class.forName(classname) - .asSubclass(classOf[WebMonitor]) - - val ctor: Constructor[_ <: WebMonitor] = clazz.getConstructor(classOf[Configuration], - classOf[ActorRef], - classOf[ActorRef]) - ctor.newInstance(config, jobManager, archiver) - } - catch { - case e: ClassNotFoundException => - LOG.error("Could not load web runtime monitor. " + - "Probably reason: flink-runtime-web is not in the classpath") - LOG.debug("Caught exception", e) - null - case e: InvocationTargetException => - LOG.error("WebServer could not be created", e.getTargetException()) - null - case t: Throwable => - LOG.error("Failed to instantiate web runtime monitor.", t) - null - } - - if (monitor != null) { - try { - monitor.start() - } - catch { - case e: Exception => - LOG.error("Failed to start web runtime monitor", e) - } + try { + val classname = "org.apache.flink.runtime.webmonitor.WebRuntimeMonitor" + val clazz: Class[_ <: WebMonitor] = Class.forName(classname) + .asSubclass(classOf[WebMonitor]) + + val ctor: Constructor[_ <: WebMonitor] = clazz.getConstructor(classOf[Configuration], + classOf[ActorGateway], + classOf[ActorGateway]) + ctor.newInstance(config, jobManager, archiver) + } + catch { + case e: ClassNotFoundException => + LOG.error("Could not load web runtime monitor. 
" + + "Probably reason: flink-runtime-web is not in the classpath") + LOG.debug("Caught exception", e) + null + case e: InvocationTargetException => + LOG.error("WebServer could not be created", e.getTargetException()) + null + case t: Throwable => + LOG.error("Failed to instantiate web runtime monitor.", t) + null } } } http://git-wip-us.apache.org/repos/asf/flink/blob/fab61a19/flink-runtime/src/main/scala/org/apache/flink/runtime/minicluster/FlinkMiniCluster.scala ---------------------------------------------------------------------- diff --git a/flink-runtime/src/main/scala/org/apache/flink/runtime/minicluster/FlinkMiniCluster.scala b/flink-runtime/src/main/scala/org/apache/flink/runtime/minicluster/FlinkMiniCluster.scala index 6f810fc..7c57233 100644 --- a/flink-runtime/src/main/scala/org/apache/flink/runtime/minicluster/FlinkMiniCluster.scala +++ b/flink-runtime/src/main/scala/org/apache/flink/runtime/minicluster/FlinkMiniCluster.scala @@ -30,10 +30,12 @@ import org.apache.flink.runtime.StreamingMode import org.apache.flink.runtime.akka.AkkaUtils import org.apache.flink.runtime.client.{JobExecutionException, JobClient, SerializedJobExecutionResult} -import org.apache.flink.runtime.instance.ActorGateway +import org.apache.flink.runtime.instance.{AkkaActorGateway, ActorGateway} import org.apache.flink.runtime.jobgraph.JobGraph import org.apache.flink.runtime.jobmanager.JobManager +import org.apache.flink.runtime.jobmanager.web.WebInfoServer import org.apache.flink.runtime.messages.TaskManagerMessages.NotifyWhenRegisteredAtJobManager +import org.apache.flink.runtime.webmonitor.WebMonitor import org.slf4j.LoggerFactory import scala.concurrent.duration.FiniteDuration @@ -74,7 +76,7 @@ abstract class FlinkMiniCluster( val configuration = generateConfiguration(userConfiguration) var jobManagerActorSystem = startJobManagerActorSystem() - var jobManagerActor = startJobManager(jobManagerActorSystem) + var (jobManagerActor, webMonitor) = 
startJobManager(jobManagerActorSystem) val numTaskManagers = configuration.getInteger( ConfigConstants.LOCAL_INSTANCE_MANAGER_NUMBER_TASK_MANAGER, 1) @@ -99,7 +101,7 @@ abstract class FlinkMiniCluster( def generateConfiguration(userConfiguration: Configuration): Configuration - def startJobManager(system: ActorSystem): ActorRef + def startJobManager(system: ActorSystem): (ActorRef, Option[WebMonitor]) def startTaskManager(index: Int, system: ActorSystem): ActorRef @@ -156,6 +158,10 @@ abstract class FlinkMiniCluster( } def shutdown(): Unit = { + webMonitor foreach { + _.stop() + } + val futures = taskManagerActors map { gracefulStop(_, timeout) } @@ -183,6 +189,44 @@ abstract class FlinkMiniCluster( } } + def startWebServer( + config: Configuration, + jobManager: ActorRef, + archiver: ActorRef) + : Option[WebMonitor] = { + if( + config.getBoolean(ConfigConstants.LOCAL_INSTANCE_MANAGER_START_WEBSERVER, false) && + config.getInteger(ConfigConstants.JOB_MANAGER_WEB_PORT_KEY, 0) >= 0) { + + val lookupTimeout = AkkaUtils.getLookupTimeout(config) + + val jobManagerGateway = JobManager.getJobManagerGateway(jobManager, lookupTimeout) + val archiverGateway = new AkkaActorGateway(archiver, jobManagerGateway.leaderSessionID()) + + // start the job manager web frontend + val webServer = if ( + config.getBoolean( + ConfigConstants.JOB_MANAGER_NEW_WEB_FRONTEND_KEY, + false)) { + + LOG.info("Starting NEW JobManger web frontend") + // start the new web frontend. 
we need to load this dynamically + // because it is not in the same project/dependencies + JobManager.startWebRuntimeMonitor(config, jobManagerGateway, archiverGateway) + } + else { + LOG.info("Starting JobManger web frontend") + new WebInfoServer(config, jobManagerGateway, archiverGateway) + } + + webServer.start() + + Option(webServer) + } else { + None + } + } + def waitForTaskManagersToBeRegistered(): Unit = { implicit val executionContext = ExecutionContext.global http://git-wip-us.apache.org/repos/asf/flink/blob/fab61a19/flink-runtime/src/main/scala/org/apache/flink/runtime/minicluster/LocalFlinkMiniCluster.scala ---------------------------------------------------------------------- diff --git a/flink-runtime/src/main/scala/org/apache/flink/runtime/minicluster/LocalFlinkMiniCluster.scala b/flink-runtime/src/main/scala/org/apache/flink/runtime/minicluster/LocalFlinkMiniCluster.scala index c056b63..54c457e 100644 --- a/flink-runtime/src/main/scala/org/apache/flink/runtime/minicluster/LocalFlinkMiniCluster.scala +++ b/flink-runtime/src/main/scala/org/apache/flink/runtime/minicluster/LocalFlinkMiniCluster.scala @@ -23,12 +23,15 @@ import akka.actor.{ActorRef, ActorSystem, ExtendedActorSystem} import org.apache.flink.api.common.io.FileOutputFormat import org.apache.flink.configuration.{ConfigConstants, Configuration} import org.apache.flink.runtime.StreamingMode +import org.apache.flink.runtime.akka.AkkaUtils import org.apache.flink.runtime.client.JobClient +import org.apache.flink.runtime.instance.AkkaActorGateway import org.apache.flink.runtime.io.network.netty.NettyConfig import org.apache.flink.runtime.jobmanager.JobManager import org.apache.flink.runtime.jobmanager.web.WebInfoServer import org.apache.flink.runtime.taskmanager.TaskManager import org.apache.flink.runtime.util.EnvironmentInformation +import org.apache.flink.runtime.webmonitor.WebMonitor import org.slf4j.LoggerFactory @@ -42,9 +45,10 @@ import org.slf4j.LoggerFactory * @param singleActorSystem 
true if all actors (JobManager and TaskManager) shall be run in the same * [[ActorSystem]], otherwise false */ -class LocalFlinkMiniCluster(userConfiguration: Configuration, - singleActorSystem: Boolean, - streamingMode: StreamingMode) +class LocalFlinkMiniCluster( + userConfiguration: Configuration, + singleActorSystem: Boolean, + streamingMode: StreamingMode) extends FlinkMiniCluster(userConfiguration, singleActorSystem, streamingMode) { @@ -74,23 +78,14 @@ class LocalFlinkMiniCluster(userConfiguration: Configuration, config } - override def startJobManager(system: ActorSystem): ActorRef = { + override def startJobManager(system: ActorSystem): (ActorRef, Option[WebMonitor]) = { val config = configuration.clone() val (jobManager, archiver) = JobManager.startJobManagerActors(config, system, streamingMode) - - if (config.getBoolean(ConfigConstants.LOCAL_INSTANCE_MANAGER_START_WEBSERVER, false)) { - if (userConfiguration.getBoolean(ConfigConstants.JOB_MANAGER_NEW_WEB_FRONTEND_KEY, false)) { - // new web frontend - JobManager.startWebRuntimeMonitor(userConfiguration, jobManager, archiver) - } - else { - // old web frontend - val webServer = new WebInfoServer(configuration, jobManager, archiver) - webServer.start() - } - } - jobManager + + val webMonitorOption = startWebServer(config, jobManager, archiver) + + (jobManager, webMonitorOption) } override def startTaskManager(index: Int, system: ActorSystem): ActorRef = { @@ -125,13 +120,15 @@ class LocalFlinkMiniCluster(userConfiguration: Configuration, None } - TaskManager.startTaskManagerComponentsAndActor(config, system, - hostname, // network interface to bind to - Some(taskManagerActorName), // actor name - jobManagerPath, // job manager akka URL - localExecution, // start network stack? 
- streamingMode, - classOf[TaskManager]) + TaskManager.startTaskManagerComponentsAndActor( + config, + system, + hostname, // network interface to bind to + Some(taskManagerActorName), // actor name + jobManagerPath, // job manager akka URL + localExecution, // start network stack? + streamingMode, + classOf[TaskManager]) } def getJobClientActorSystem: ActorSystem = jobClientActorSystem http://git-wip-us.apache.org/repos/asf/flink/blob/fab61a19/flink-runtime/src/main/scala/org/apache/flink/runtime/taskmanager/TaskManager.scala ---------------------------------------------------------------------- diff --git a/flink-runtime/src/main/scala/org/apache/flink/runtime/taskmanager/TaskManager.scala b/flink-runtime/src/main/scala/org/apache/flink/runtime/taskmanager/TaskManager.scala index f974946..0ec1040 100644 --- a/flink-runtime/src/main/scala/org/apache/flink/runtime/taskmanager/TaskManager.scala +++ b/flink-runtime/src/main/scala/org/apache/flink/runtime/taskmanager/TaskManager.scala @@ -171,7 +171,7 @@ class TaskManager( protected var leaderSessionID: Option[UUID] = None - private var currentRegistrationSessionID: UUID = UUID.randomUUID() + private val currentRegistrationSessionID: UUID = UUID.randomUUID() // -------------------------------------------------------------------------- // Actor messages and life cycle http://git-wip-us.apache.org/repos/asf/flink/blob/fab61a19/flink-runtime/src/test/scala/org/apache/flink/runtime/testingUtils/TestingCluster.scala ---------------------------------------------------------------------- diff --git a/flink-runtime/src/test/scala/org/apache/flink/runtime/testingUtils/TestingCluster.scala b/flink-runtime/src/test/scala/org/apache/flink/runtime/testingUtils/TestingCluster.scala index ce0ef8d..f5a506d 100644 --- a/flink-runtime/src/test/scala/org/apache/flink/runtime/testingUtils/TestingCluster.scala +++ b/flink-runtime/src/test/scala/org/apache/flink/runtime/testingUtils/TestingCluster.scala @@ -26,6 +26,7 @@ import 
org.apache.flink.runtime.jobmanager.{MemoryArchivist, JobManager} import org.apache.flink.runtime.minicluster.FlinkMiniCluster import org.apache.flink.runtime.net.NetUtils import org.apache.flink.runtime.taskmanager.TaskManager +import org.apache.flink.runtime.webmonitor.WebMonitor /** * Testing cluster which starts the [[JobManager]] and [[TaskManager]] actors with testing support @@ -67,7 +68,7 @@ class TestingCluster(userConfiguration: Configuration, cfg } - override def startJobManager(actorSystem: ActorSystem): ActorRef = { + override def startJobManager(actorSystem: ActorSystem): (ActorRef, Option[WebMonitor]) = { val (executionContext, instanceManager, @@ -103,7 +104,7 @@ class TestingCluster(userConfiguration: Configuration, jobManagerProps } - actorSystem.actorOf(dispatcherJobManagerProps, JobManager.JOB_MANAGER_NAME) + (actorSystem.actorOf(dispatcherJobManagerProps, JobManager.JOB_MANAGER_NAME), None) } override def startTaskManager(index: Int, system: ActorSystem) = { @@ -116,12 +117,14 @@ class TestingCluster(userConfiguration: Configuration, None } - TaskManager.startTaskManagerComponentsAndActor(configuration, system, - hostname, - Some(tmActorName), - jobManagerPath, - numTaskManagers == 1, - streamingMode, - classOf[TestingTaskManager]) + TaskManager.startTaskManagerComponentsAndActor( + configuration, + system, + hostname, + Some(tmActorName), + jobManagerPath, + numTaskManagers == 1, + streamingMode, + classOf[TestingTaskManager]) } } http://git-wip-us.apache.org/repos/asf/flink/blob/fab61a19/flink-test-utils/src/main/scala/org/apache/flink/test/util/ForkableFlinkMiniCluster.scala ---------------------------------------------------------------------- diff --git a/flink-test-utils/src/main/scala/org/apache/flink/test/util/ForkableFlinkMiniCluster.scala b/flink-test-utils/src/main/scala/org/apache/flink/test/util/ForkableFlinkMiniCluster.scala index cdf3960..e83c7a6 100644 --- 
a/flink-test-utils/src/main/scala/org/apache/flink/test/util/ForkableFlinkMiniCluster.scala +++ b/flink-test-utils/src/main/scala/org/apache/flink/test/util/ForkableFlinkMiniCluster.scala @@ -22,12 +22,15 @@ import akka.actor.{Props, ActorRef, ActorSystem} import akka.pattern.Patterns._ import org.apache.flink.configuration.{ConfigConstants, Configuration} import org.apache.flink.runtime.StreamingMode +import org.apache.flink.runtime.akka.AkkaUtils +import org.apache.flink.runtime.instance.AkkaActorGateway import org.apache.flink.runtime.jobmanager.web.WebInfoServer import org.apache.flink.runtime.jobmanager.{MemoryArchivist, JobManager} import org.apache.flink.runtime.minicluster.LocalFlinkMiniCluster import org.apache.flink.runtime.taskmanager.TaskManager import org.apache.flink.runtime.testingUtils.{TestingUtils, TestingJobManager, TestingMemoryArchivist, TestingTaskManager} +import org.apache.flink.runtime.webmonitor.WebMonitor import scala.concurrent.Await @@ -40,9 +43,10 @@ import scala.concurrent.Await * @param singleActorSystem true, if all actors (JobManager and TaskManager) shall be run in the * same [[ActorSystem]], otherwise false. 
*/ -class ForkableFlinkMiniCluster(userConfiguration: Configuration, - singleActorSystem: Boolean, - streamingMode: StreamingMode) +class ForkableFlinkMiniCluster( + userConfiguration: Configuration, + singleActorSystem: Boolean, + streamingMode: StreamingMode) extends LocalFlinkMiniCluster(userConfiguration, singleActorSystem, streamingMode) { @@ -78,7 +82,7 @@ class ForkableFlinkMiniCluster(userConfiguration: Configuration, super.generateConfiguration(config) } - override def startJobManager(actorSystem: ActorSystem): ActorRef = { + override def startJobManager(actorSystem: ActorSystem): (ActorRef, Option[WebMonitor]) = { val (executionContext, instanceManager, @@ -95,7 +99,7 @@ class ForkableFlinkMiniCluster(userConfiguration: Configuration, archiveCount) with TestingMemoryArchivist) - val archive = actorSystem.actorOf(testArchiveProps, JobManager.ARCHIVE_NAME) + val archiver = actorSystem.actorOf(testArchiveProps, JobManager.ARCHIVE_NAME) val jobManagerProps = Props( new JobManager( @@ -104,7 +108,7 @@ class ForkableFlinkMiniCluster(userConfiguration: Configuration, instanceManager, scheduler, libraryCacheManager, - archive, + archiver, executionRetries, delayBetweenRetries, timeout, @@ -113,21 +117,9 @@ class ForkableFlinkMiniCluster(userConfiguration: Configuration, val jobManager = actorSystem.actorOf(jobManagerProps, JobManager.JOB_MANAGER_NAME) - if (userConfiguration.getBoolean( - ConfigConstants.LOCAL_INSTANCE_MANAGER_START_WEBSERVER, false)) - { - if (userConfiguration.getBoolean(ConfigConstants.JOB_MANAGER_NEW_WEB_FRONTEND_KEY, false)) { - // new web frontend - JobManager.startWebRuntimeMonitor(userConfiguration, jobManager, archive) - } - else { - // old web frontend - val webServer = new WebInfoServer(configuration, jobManager, archive) - webServer.start() - } - } + val webMonitorOption = startWebServer(configuration, jobManager, archiver) - jobManager + (jobManager, webMonitorOption) } override def startTaskManager(index: Int, system: ActorSystem): 
ActorRef = { @@ -163,11 +155,18 @@ class ForkableFlinkMiniCluster(userConfiguration: Configuration, val stopped = gracefulStop(jobManagerActor, TestingUtils.TESTING_DURATION) Await.result(stopped, TestingUtils.TESTING_DURATION) + webMonitor foreach { + _.stop() + } + jobManagerActorSystem.shutdown() jobManagerActorSystem.awaitTermination() jobManagerActorSystem = startJobManagerActorSystem() - jobManagerActor = startJobManager(jobManagerActorSystem) + val (newJobManagerActor, newWebMonitor) = startJobManager(jobManagerActorSystem) + + jobManagerActor = newJobManagerActor + webMonitor = newWebMonitor } def restartTaskManager(index: Int): Unit = { http://git-wip-us.apache.org/repos/asf/flink/blob/fab61a19/flink-yarn/src/main/scala/org/apache/flink/yarn/ApplicationMaster.scala ---------------------------------------------------------------------- diff --git a/flink-yarn/src/main/scala/org/apache/flink/yarn/ApplicationMaster.scala b/flink-yarn/src/main/scala/org/apache/flink/yarn/ApplicationMaster.scala index c497a90..9e0c976 100644 --- a/flink-yarn/src/main/scala/org/apache/flink/yarn/ApplicationMaster.scala +++ b/flink-yarn/src/main/scala/org/apache/flink/yarn/ApplicationMaster.scala @@ -26,9 +26,11 @@ import org.apache.flink.client.CliFrontend import org.apache.flink.configuration.{GlobalConfiguration, Configuration, ConfigConstants} import org.apache.flink.runtime.StreamingMode import org.apache.flink.runtime.akka.AkkaUtils +import org.apache.flink.runtime.instance.AkkaActorGateway import org.apache.flink.runtime.jobmanager.JobManager import org.apache.flink.runtime.jobmanager.web.WebInfoServer import org.apache.flink.runtime.util.EnvironmentInformation +import org.apache.flink.runtime.webmonitor.WebMonitor import org.apache.flink.yarn.Messages.StartYarnSession import org.apache.hadoop.security.UserGroupInformation import org.apache.hadoop.yarn.api.ApplicationConstants.Environment @@ -68,7 +70,7 @@ object ApplicationMaster { override def run(): Object = { var 
actorSystem: ActorSystem = null - var webserver: WebInfoServer = null + var webserver: WebMonitor = null try { val conf = new YarnConfiguration() @@ -99,25 +101,44 @@ object ApplicationMaster { val slots = env.get(FlinkYarnClient.ENV_SLOTS).toInt val dynamicPropertiesEncodedString = env.get(FlinkYarnClient.ENV_DYNAMIC_PROPERTIES) - val (config: Configuration, - system: ActorSystem, - jobManager: ActorRef, - archiver: ActorRef) = startJobManager(currDir, ownHostname, - dynamicPropertiesEncodedString, - streamingMode) + val config = createConfiguration(currDir, dynamicPropertiesEncodedString) + + val ( + system: ActorSystem, + jobManager: ActorRef, + archiver: ActorRef) = startJobManager( + config, + ownHostname, + streamingMode) + actorSystem = system val extActor = system.asInstanceOf[ExtendedActorSystem] val jobManagerPort = extActor.provider.getDefaultAddress.port.get - // start the web info server if (config.getInteger(ConfigConstants.JOB_MANAGER_WEB_PORT_KEY, 0) != -1) { + // start the web info server + val lookupTimeout = AkkaUtils.getLookupTimeout(config) + val jobManagerGateway = JobManager.getJobManagerGateway(jobManager, lookupTimeout) + val archiverGateway = new AkkaActorGateway( + archiver, + jobManagerGateway.leaderSessionID()) + LOG.info("Starting Job Manger web frontend.") config.setString(ConfigConstants.JOB_MANAGER_WEB_LOG_PATH_KEY, logDirs) config.setInteger(ConfigConstants.JOB_MANAGER_WEB_PORT_KEY, 0); // set port to 0. // set JobManager host/port for web interface. 
config.setString(ConfigConstants.JOB_MANAGER_IPC_ADDRESS_KEY, ownHostname) config.setInteger(ConfigConstants.JOB_MANAGER_IPC_PORT_KEY, jobManagerPort) - webserver = new WebInfoServer(config, jobManager, archiver) + + webserver = if( + config.getBoolean( + ConfigConstants.JOB_MANAGER_NEW_WEB_FRONTEND_KEY, + false)) { + JobManager.startWebRuntimeMonitor(config, jobManagerGateway, archiverGateway) + } else { + new WebInfoServer(config, jobManagerGateway, archiverGateway) + } + webserver.start() } @@ -160,11 +181,17 @@ object ApplicationMaster { } - def generateConfigurationFile(fileName: String, currDir: String, ownHostname: String, - jobManagerPort: Int, - jobManagerWebPort: Int, logDirs: String, slots: Int, - taskManagerCount: Int, dynamicPropertiesEncodedString: String) - : Unit = { + def generateConfigurationFile( + fileName: String, + currDir: String, + ownHostname: String, + jobManagerPort: Int, + jobManagerWebPort: Int, + logDirs: String, + slots: Int, + taskManagerCount: Int, + dynamicPropertiesEncodedString: String) + : Unit = { LOG.info("Generate configuration file for application master.") val output = new PrintWriter(new BufferedWriter( new FileWriter(fileName)) @@ -208,26 +235,13 @@ object ApplicationMaster { * * @return (Configuration, JobManager ActorSystem, JobManager ActorRef, Archiver ActorRef) */ - def startJobManager(currDir: String, - hostname: String, - dynamicPropertiesEncodedString: String, - streamingMode: StreamingMode): - (Configuration, ActorSystem, ActorRef, ActorRef) = { + def startJobManager( + configuration: Configuration, + hostname: String, + streamingMode: StreamingMode) + : (ActorSystem, ActorRef, ActorRef) = { LOG.info("Starting JobManager for YARN") - LOG.info(s"Loading config from: $currDir.") - - GlobalConfiguration.loadConfiguration(currDir) - val configuration = GlobalConfiguration.getConfiguration() - - configuration.setString(ConfigConstants.FLINK_BASE_DIR_PATH_KEY, currDir) - - // add dynamic properties to JobManager 
configuration. - val dynamicProperties = CliFrontend.getDynamicProperties(dynamicPropertiesEncodedString) - import scala.collection.JavaConverters._ - for(property <- dynamicProperties.asScala){ - configuration.setString(property.f0, property.f1) - } // set port to 0 to let Akka automatically determine the port. LOG.debug("Starting JobManager actor system") @@ -265,7 +279,25 @@ object ApplicationMaster { LOG.debug("Starting JobManager actor") val jobManager = JobManager.startActor(jobManagerProps, jobManagerSystem) - (configuration, jobManagerSystem, jobManager, archiver) + (jobManagerSystem, jobManager, archiver) + } + + def createConfiguration(curDir: String, dynamicPropertiesEncodedString: String): Configuration = { + LOG.info(s"Loading config from: $curDir.") + + GlobalConfiguration.loadConfiguration(curDir) + val configuration = GlobalConfiguration.getConfiguration() + + configuration.setString(ConfigConstants.FLINK_BASE_DIR_PATH_KEY, curDir) + + // add dynamic properties to JobManager configuration. + val dynamicProperties = CliFrontend.getDynamicProperties(dynamicPropertiesEncodedString) + import scala.collection.JavaConverters._ + for(property <- dynamicProperties.asScala){ + configuration.setString(property.f0, property.f1) + } + + configuration }
