[FLINK-7381] [web] Decouple WebRuntimeMonitor from ActorGateway

This PR decouples the WebRuntimeMonitor from the ActorGateway by introducing the JobManagerGateway interface, which can have multiple implementations. This is a preliminary step for the integration of the existing WebRuntimeMonitor with the Flip-6 JobMaster.
Add time unit for web.timeout This closes #4492. Project: http://git-wip-us.apache.org/repos/asf/flink/repo Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/9f790d3e Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/9f790d3e Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/9f790d3e Branch: refs/heads/master Commit: 9f790d3efe5e8267da05eb97bbe07ca8a0f859fe Parents: 00d5b62 Author: Till Rohrmann <[email protected]> Authored: Wed Aug 2 18:43:00 2017 +0200 Committer: Till Rohrmann <[email protected]> Committed: Fri Aug 11 13:48:14 2017 +0200 ---------------------------------------------------------------------- docs/ops/config.md | 2 + .../apache/flink/configuration/WebOptions.java | 7 + .../MesosApplicationMasterRunner.java | 13 +- .../webmonitor/ExecutionGraphHolder.java | 59 ++---- .../runtime/webmonitor/JobManagerRetriever.java | 197 ------------------ .../webmonitor/RuntimeMonitorHandler.java | 16 +- .../webmonitor/RuntimeMonitorHandlerBase.java | 36 ++-- .../runtime/webmonitor/WebRuntimeMonitor.java | 74 ++++--- .../files/StaticFileServerHandler.java | 35 ++-- .../AbstractExecutionGraphRequestHandler.java | 22 +- .../handlers/AbstractJsonRequestHandler.java | 10 +- .../handlers/ClusterOverviewHandler.java | 23 +-- .../handlers/CurrentJobIdsHandler.java | 22 +- .../handlers/CurrentJobsOverviewHandler.java | 24 +-- .../handlers/DashboardConfigHandler.java | 4 +- .../handlers/HandlerRedirectUtils.java | 41 ++-- .../handlers/JarAccessDeniedHandler.java | 4 +- .../webmonitor/handlers/JarDeleteHandler.java | 4 +- .../webmonitor/handlers/JarListHandler.java | 4 +- .../webmonitor/handlers/JarPlanHandler.java | 4 +- .../webmonitor/handlers/JarRunHandler.java | 20 +- .../webmonitor/handlers/JarUploadHandler.java | 8 +- .../handlers/JobCancellationHandler.java | 19 +- .../JobCancellationWithSavepointHandlers.java | 124 ++++++------ .../handlers/JobManagerConfigHandler.java | 4 +- .../webmonitor/handlers/JobStoppingHandler.java | 19 +- 
.../webmonitor/handlers/RequestHandler.java | 9 +- .../handlers/TaskManagerLogHandler.java | 66 +++--- .../handlers/TaskManagersHandler.java | 39 ++-- .../CheckpointStatsDetailsSubtasksHandler.java | 6 +- .../metrics/AbstractMetricsHandler.java | 4 +- .../webmonitor/metrics/MetricFetcher.java | 202 +++++++++---------- .../runtime/webmonitor/WebFrontendITCase.java | 27 +-- .../webmonitor/WebRuntimeMonitorITCase.java | 73 ++++--- .../handlers/ClusterOverviewHandlerTest.java | 8 +- .../handlers/CurrentJobIdsHandlerTest.java | 8 +- .../CurrentJobsOverviewHandlerTest.java | 10 +- .../handlers/HandlerRedirectUtilsTest.java | 39 ++-- .../webmonitor/handlers/JarRunHandlerTest.java | 5 +- .../handlers/JobAccumulatorsHandlerTest.java | 5 +- .../handlers/JobCancellationHandlerTest.java | 4 +- ...obCancellationWithSavepointHandlersTest.java | 82 ++++---- .../handlers/JobConfigHandlerTest.java | 5 +- .../handlers/JobDetailsHandlerTest.java | 5 +- .../handlers/JobExceptionsHandlerTest.java | 5 +- .../webmonitor/handlers/JobPlanHandlerTest.java | 5 +- .../handlers/JobStoppingHandlerTest.java | 7 +- .../JobVertexAccumulatorsHandlerTest.java | 5 +- .../JobVertexBackPressureHandlerTest.java | 2 +- .../handlers/JobVertexDetailsHandlerTest.java | 5 +- .../JobVertexTaskManagersHandlerTest.java | 5 +- ...SubtaskCurrentAttemptDetailsHandlerTest.java | 6 +- ...ExecutionAttemptAccumulatorsHandlerTest.java | 5 +- ...btaskExecutionAttemptDetailsHandlerTest.java | 5 +- .../SubtasksAllAccumulatorsHandlerTest.java | 5 +- .../handlers/SubtasksTimesHandlerTest.java | 5 +- .../handlers/TaskManagerLogHandlerTest.java | 54 ++--- .../handlers/TaskManagersHandlerTest.java | 7 +- .../metrics/AbstractMetricsHandlerTest.java | 26 ++- .../metrics/JobManagerMetricsHandlerTest.java | 20 +- .../metrics/JobMetricsHandlerTest.java | 20 +- .../metrics/JobVertexMetricsHandlerTest.java | 20 +- .../webmonitor/metrics/MetricFetcherTest.java | 111 ++++------ .../metrics/TaskManagerMetricsHandlerTest.java | 20 +- 
.../runtime/akka/AkkaJobManagerGateway.java | 190 +++++++++++++++-- .../apache/flink/runtime/client/JobClient.java | 6 +- .../clusterframework/BootstrapTools.java | 34 ++-- .../flink/runtime/concurrent/FutureUtils.java | 10 + .../runtime/jobmaster/JobManagerGateway.java | 122 ++++++++++- .../metrics/dump/MetricDumpSerialization.java | 1 + .../runtime/webmonitor/WebMonitorUtils.java | 29 ++- .../retriever/JobManagerRetriever.java | 123 +++++++++++ .../retriever/MetricQueryServiceGateway.java | 36 ++++ .../retriever/MetricQueryServiceRetriever.java | 35 ++++ .../retriever/impl/AkkaJobManagerRetriever.java | 69 +++++++ .../retriever/impl/AkkaQueryServiceGateway.java | 53 +++++ .../impl/AkkaQueryServiceRetriever.java | 51 +++++ .../retriever/impl/RpcJobManagerRetriever.java | 46 +++++ .../flink/runtime/jobmanager/JobManager.scala | 10 +- .../runtime/minicluster/FlinkMiniCluster.scala | 10 +- .../runtime/testingUtils/TestingUtils.scala | 2 + .../flink/yarn/YarnApplicationMasterRunner.java | 12 +- 82 files changed, 1551 insertions(+), 1018 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/flink/blob/9f790d3e/docs/ops/config.md ---------------------------------------------------------------------- diff --git a/docs/ops/config.md b/docs/ops/config.md index c8d5c92..4138b4d 100644 --- a/docs/ops/config.md +++ b/docs/ops/config.md @@ -389,6 +389,8 @@ These parameters allow for advanced tuning. The default values are sufficient wh - `web.access-control-allow-origin`: Enable custom access control parameter for allow origin header, default is `*`. +- `web.timeout`: Timeout for asynchronous operation executed by the web frontend in milliseconds (DEFAULT: `10000`, 10 s) + ### File Systems The parameters define the behavior of tasks that create result files. 
http://git-wip-us.apache.org/repos/asf/flink/blob/9f790d3e/flink-core/src/main/java/org/apache/flink/configuration/WebOptions.java ---------------------------------------------------------------------- diff --git a/flink-core/src/main/java/org/apache/flink/configuration/WebOptions.java b/flink-core/src/main/java/org/apache/flink/configuration/WebOptions.java index f499045..3733244 100644 --- a/flink-core/src/main/java/org/apache/flink/configuration/WebOptions.java +++ b/flink-core/src/main/java/org/apache/flink/configuration/WebOptions.java @@ -149,6 +149,13 @@ public class WebOptions { .defaultValue(50) .withDeprecatedKeys("jobmanager.web.backpressure.delay-between-samples"); + /** + * Timeout for asynchronous operations by the WebRuntimeMonitor in milliseconds. + */ + public static final ConfigOption<Long> TIMEOUT = ConfigOptions + .key("web.timeout") + .defaultValue(10L * 1000L); + private WebOptions() { throw new IllegalAccessError(); http://git-wip-us.apache.org/repos/asf/flink/blob/9f790d3e/flink-mesos/src/main/java/org/apache/flink/mesos/runtime/clusterframework/MesosApplicationMasterRunner.java ---------------------------------------------------------------------- diff --git a/flink-mesos/src/main/java/org/apache/flink/mesos/runtime/clusterframework/MesosApplicationMasterRunner.java b/flink-mesos/src/main/java/org/apache/flink/mesos/runtime/clusterframework/MesosApplicationMasterRunner.java index 260b7f3..7891386 100644 --- a/flink-mesos/src/main/java/org/apache/flink/mesos/runtime/clusterframework/MesosApplicationMasterRunner.java +++ b/flink-mesos/src/main/java/org/apache/flink/mesos/runtime/clusterframework/MesosApplicationMasterRunner.java @@ -18,10 +18,12 @@ package org.apache.flink.mesos.runtime.clusterframework; +import org.apache.flink.api.common.time.Time; import org.apache.flink.configuration.Configuration; import org.apache.flink.configuration.GlobalConfiguration; import org.apache.flink.configuration.IllegalConfigurationException; import 
org.apache.flink.configuration.JobManagerOptions; +import org.apache.flink.configuration.WebOptions; import org.apache.flink.core.fs.FileSystem; import org.apache.flink.mesos.configuration.MesosOptions; import org.apache.flink.mesos.runtime.clusterframework.services.MesosServices; @@ -52,6 +54,8 @@ import org.apache.flink.runtime.util.Hardware; import org.apache.flink.runtime.util.JvmShutdownSafeguard; import org.apache.flink.runtime.util.SignalHandler; import org.apache.flink.runtime.webmonitor.WebMonitor; +import org.apache.flink.runtime.webmonitor.retriever.impl.AkkaJobManagerRetriever; +import org.apache.flink.runtime.webmonitor.retriever.impl.AkkaQueryServiceRetriever; import akka.actor.ActorRef; import akka.actor.ActorSystem; @@ -319,11 +323,16 @@ public class MesosApplicationMasterRunner { // 2: the web monitor LOG.debug("Starting Web Frontend"); + Time webMonitorTimeout = Time.milliseconds(config.getLong(WebOptions.TIMEOUT)); + webMonitor = BootstrapTools.startWebMonitorIfConfigured( config, highAvailabilityServices, - actorSystem, - jobManager, + new AkkaJobManagerRetriever(actorSystem, webMonitorTimeout), + new AkkaQueryServiceRetriever(actorSystem, webMonitorTimeout), + webMonitorTimeout, + futureExecutor, + AkkaUtils.getAkkaURL(actorSystem, jobManager), LOG); if (webMonitor != null) { final URL webMonitorURL = new URL("http", appMasterHostname, webMonitor.getServerPort(), "/"); http://git-wip-us.apache.org/repos/asf/flink/blob/9f790d3e/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/ExecutionGraphHolder.java ---------------------------------------------------------------------- diff --git a/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/ExecutionGraphHolder.java b/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/ExecutionGraphHolder.java index 75b0475..739b375 100644 --- a/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/ExecutionGraphHolder.java +++ 
b/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/ExecutionGraphHolder.java @@ -19,20 +19,19 @@ package org.apache.flink.runtime.webmonitor; import org.apache.flink.api.common.JobID; +import org.apache.flink.api.common.time.Time; import org.apache.flink.runtime.executiongraph.AccessExecutionGraph; import org.apache.flink.runtime.executiongraph.ExecutionGraph; -import org.apache.flink.runtime.instance.ActorGateway; import org.apache.flink.runtime.jobgraph.JobStatus; -import org.apache.flink.runtime.messages.JobManagerMessages; +import org.apache.flink.runtime.jobmaster.JobManagerGateway; import org.slf4j.Logger; import org.slf4j.LoggerFactory; +import java.util.Optional; import java.util.WeakHashMap; - -import scala.concurrent.Await; -import scala.concurrent.Future; -import scala.concurrent.duration.FiniteDuration; +import java.util.concurrent.CompletableFuture; +import java.util.concurrent.TimeUnit; import static org.apache.flink.util.Preconditions.checkNotNull; @@ -48,7 +47,7 @@ public class ExecutionGraphHolder { private static final Logger LOG = LoggerFactory.getLogger(ExecutionGraphHolder.class); - private final FiniteDuration timeout; + private final Time timeout; private final WeakHashMap<JobID, AccessExecutionGraph> cache = new WeakHashMap<>(); @@ -56,50 +55,36 @@ public class ExecutionGraphHolder { this(WebRuntimeMonitor.DEFAULT_REQUEST_TIMEOUT); } - public ExecutionGraphHolder(FiniteDuration timeout) { + public ExecutionGraphHolder(Time timeout) { this.timeout = checkNotNull(timeout); } /** - * Retrieves the execution graph with {@link JobID} jid or null if it cannot be found. + * Retrieves the execution graph with {@link JobID} jid wrapped in {@link Optional} or + * {@link Optional#empty()} if it cannot be found. 
* * @param jid jobID of the execution graph to be retrieved - * @return the retrieved execution graph or null if it is not retrievable + * @return Optional ExecutionGraph if it has been retrievable, empty if there has been no ExecutionGraph + * @throws Exception if the ExecutionGraph retrieval failed. */ - public AccessExecutionGraph getExecutionGraph(JobID jid, ActorGateway jobManager) { + public Optional<AccessExecutionGraph> getExecutionGraph(JobID jid, JobManagerGateway jobManagerGateway) throws Exception { AccessExecutionGraph cached = cache.get(jid); if (cached != null) { if (cached.getState() == JobStatus.SUSPENDED) { cache.remove(jid); } else { - return cached; + return Optional.of(cached); } } - try { - if (jobManager != null) { - Future<Object> future = jobManager.ask(new JobManagerMessages.RequestJob(jid), timeout); - Object result = Await.result(future, timeout); - - if (result instanceof JobManagerMessages.JobNotFound) { - return null; - } - else if (result instanceof JobManagerMessages.JobFound) { - AccessExecutionGraph eg = ((JobManagerMessages.JobFound) result).executionGraph(); - cache.put(jid, eg); - return eg; - } - else { - throw new RuntimeException("Unknown response from JobManager / Archive: " + result); - } - } - else { - LOG.warn("No connection to the leading JobManager."); - return null; - } - } - catch (Exception e) { - throw new RuntimeException("Error requesting execution graph", e); - } + CompletableFuture<Optional<AccessExecutionGraph>> executionGraphFuture = jobManagerGateway.requestJob(jid, timeout); + + Optional<AccessExecutionGraph> result = executionGraphFuture.get(timeout.toMilliseconds(), TimeUnit.MILLISECONDS); + + return result.map((executionGraph) -> { + cache.put(jid, executionGraph); + + return executionGraph; + }); } } http://git-wip-us.apache.org/repos/asf/flink/blob/9f790d3e/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/JobManagerRetriever.java 
---------------------------------------------------------------------- diff --git a/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/JobManagerRetriever.java b/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/JobManagerRetriever.java deleted file mode 100644 index 175a4b8..0000000 --- a/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/JobManagerRetriever.java +++ /dev/null @@ -1,197 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. 
- */ - -package org.apache.flink.runtime.webmonitor; - -import org.apache.flink.runtime.akka.AkkaUtils; -import org.apache.flink.runtime.instance.ActorGateway; -import org.apache.flink.runtime.instance.AkkaActorGateway; -import org.apache.flink.runtime.leaderretrieval.LeaderRetrievalListener; -import org.apache.flink.runtime.messages.JobManagerMessages; -import org.apache.flink.runtime.messages.JobManagerMessages.ResponseWebMonitorPort; - -import akka.actor.ActorRef; -import akka.actor.ActorSystem; -import akka.dispatch.Futures; -import akka.dispatch.Mapper; -import akka.dispatch.OnComplete; -import org.slf4j.Logger; -import org.slf4j.LoggerFactory; - -import java.util.UUID; -import java.util.concurrent.TimeoutException; - -import scala.Option; -import scala.Tuple2; -import scala.concurrent.Await; -import scala.concurrent.Future; -import scala.concurrent.Promise; -import scala.concurrent.duration.Deadline; -import scala.concurrent.duration.FiniteDuration; - -import static org.apache.flink.util.Preconditions.checkNotNull; - -/** - * Retrieves and stores the actor gateway to the current leading JobManager. In case of an error, - * the {@link WebRuntimeMonitor} to which this instance is associated will be stopped. - * - * <p>The job manager gateway only works if the web monitor and the job manager run in the same - * actor system, because many execution graph structures are not serializable. This breaks the nice - * leader retrieval abstraction and we have a special code path in case that another job manager is - * leader (see {@link org.apache.flink.runtime.webmonitor.handlers.HandlerRedirectUtils}. In such a - * case, we get the address of the web monitor of the leading job manager and redirect to it - * (instead of directly communicating with it). 
- */ -public class JobManagerRetriever implements LeaderRetrievalListener { - - private static final Logger LOG = LoggerFactory.getLogger(JobManagerRetriever.class); - - private final Object waitLock = new Object(); - - private final WebMonitor webMonitor; - private final ActorSystem actorSystem; - private final FiniteDuration lookupTimeout; - private final FiniteDuration timeout; - - private volatile Future<Tuple2<ActorGateway, Integer>> leaderGatewayPortFuture; - - public JobManagerRetriever( - WebMonitor webMonitor, - ActorSystem actorSystem, - FiniteDuration lookupTimeout, - FiniteDuration timeout) { - - this.webMonitor = checkNotNull(webMonitor); - this.actorSystem = checkNotNull(actorSystem); - this.lookupTimeout = checkNotNull(lookupTimeout); - this.timeout = checkNotNull(timeout); - } - - /** - * Returns the currently known leading job manager gateway and its web monitor port. - */ - public Option<Tuple2<ActorGateway, Integer>> getJobManagerGatewayAndWebPort() throws Exception { - if (leaderGatewayPortFuture != null) { - Future<Tuple2<ActorGateway, Integer>> gatewayPortFuture = leaderGatewayPortFuture; - - if (gatewayPortFuture.isCompleted()) { - Tuple2<ActorGateway, Integer> gatewayPort = Await.result(gatewayPortFuture, timeout); - - return Option.apply(gatewayPort); - } else { - return Option.empty(); - } - } else { - return Option.empty(); - } - } - - /** - * Awaits the leading job manager gateway and its web monitor port. 
- */ - public Tuple2<ActorGateway, Integer> awaitJobManagerGatewayAndWebPort() throws Exception { - Future<Tuple2<ActorGateway, Integer>> gatewayPortFuture = null; - Deadline deadline = timeout.fromNow(); - - while (!deadline.isOverdue()) { - synchronized (waitLock) { - gatewayPortFuture = leaderGatewayPortFuture; - - if (gatewayPortFuture != null) { - break; - } - - waitLock.wait(deadline.timeLeft().toMillis()); - } - } - - if (gatewayPortFuture == null) { - throw new TimeoutException("There is no JobManager available."); - } else { - return Await.result(gatewayPortFuture, deadline.timeLeft()); - } - } - - @Override - public void notifyLeaderAddress(final String leaderAddress, final UUID leaderSessionID) { - if (leaderAddress != null && !leaderAddress.equals("")) { - try { - final Promise<Tuple2<ActorGateway, Integer>> leaderGatewayPortPromise = new scala.concurrent.impl.Promise.DefaultPromise<>(); - - synchronized (waitLock) { - leaderGatewayPortFuture = leaderGatewayPortPromise.future(); - waitLock.notifyAll(); - } - - LOG.info("New leader reachable under {}:{}.", leaderAddress, leaderSessionID); - - AkkaUtils.getActorRefFuture(leaderAddress, actorSystem, lookupTimeout) - // Resolve the actor ref - .flatMap(new Mapper<ActorRef, Future<Tuple2<ActorGateway, Object>>>() { - @Override - public Future<Tuple2<ActorGateway, Object>> apply(ActorRef jobManagerRef) { - ActorGateway leaderGateway = new AkkaActorGateway( - jobManagerRef, leaderSessionID); - - Future<Object> webMonitorPort = leaderGateway.ask( - JobManagerMessages.getRequestWebMonitorPort(), - timeout); - - return Futures.successful(leaderGateway).zip(webMonitorPort); - } - }, actorSystem.dispatcher()) - // Request the web monitor port - .onComplete(new OnComplete<Tuple2<ActorGateway, Object>>() { - @Override - public void onComplete(Throwable failure, Tuple2<ActorGateway, Object> success) throws Throwable { - if (failure == null) { - if (success._2() instanceof ResponseWebMonitorPort) { - int webMonitorPort 
= ((ResponseWebMonitorPort) success._2()).port(); - - leaderGatewayPortPromise.success(new Tuple2<>(success._1(), webMonitorPort)); - } else { - leaderGatewayPortPromise.failure(new Exception("Received the message " + - success._2() + " as response to " + JobManagerMessages.getRequestWebMonitorPort() + - ". But a message of type " + ResponseWebMonitorPort.class + " was expected.")); - } - } else { - LOG.warn("Failed to retrieve leader gateway and port.", failure); - leaderGatewayPortPromise.failure(failure); - } - } - }, actorSystem.dispatcher()); - } - catch (Exception e) { - handleError(e); - } - } - } - - @Override - public void handleError(Exception exception) { - LOG.error("Received error from LeaderRetrievalService.", exception); - - try { - // stop associated webMonitor - webMonitor.stop(); - } - catch (Exception e) { - LOG.error("Error while stopping the web server due to a LeaderRetrievalService error.", e); - } - } -} http://git-wip-us.apache.org/repos/asf/flink/blob/9f790d3e/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/RuntimeMonitorHandler.java ---------------------------------------------------------------------- diff --git a/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/RuntimeMonitorHandler.java b/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/RuntimeMonitorHandler.java index 4777202..35d13dd 100644 --- a/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/RuntimeMonitorHandler.java +++ b/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/RuntimeMonitorHandler.java @@ -18,9 +18,11 @@ package org.apache.flink.runtime.webmonitor; +import org.apache.flink.api.common.time.Time; import org.apache.flink.configuration.ConfigConstants; -import org.apache.flink.runtime.instance.ActorGateway; +import org.apache.flink.runtime.jobmaster.JobManagerGateway; import org.apache.flink.runtime.webmonitor.handlers.RequestHandler; +import 
org.apache.flink.runtime.webmonitor.retriever.JobManagerRetriever; import org.apache.flink.util.ExceptionUtils; import org.apache.flink.shaded.netty4.io.netty.buffer.ByteBuf; @@ -43,9 +45,7 @@ import java.net.URLDecoder; import java.nio.charset.Charset; import java.util.HashMap; import java.util.Map; - -import scala.concurrent.Future; -import scala.concurrent.duration.FiniteDuration; +import java.util.concurrent.CompletableFuture; import static org.apache.flink.util.Preconditions.checkNotNull; @@ -72,8 +72,8 @@ public class RuntimeMonitorHandler extends RuntimeMonitorHandlerBase { WebMonitorConfig cfg, RequestHandler handler, JobManagerRetriever retriever, - Future<String> localJobManagerAddressFuture, - FiniteDuration timeout, + CompletableFuture<String> localJobManagerAddressFuture, + Time timeout, boolean httpsEnabled) { super(retriever, localJobManagerAddressFuture, timeout, httpsEnabled); @@ -87,7 +87,7 @@ public class RuntimeMonitorHandler extends RuntimeMonitorHandlerBase { } @Override - protected void respondAsLeader(ChannelHandlerContext ctx, Routed routed, ActorGateway jobManager) { + protected void respondAsLeader(ChannelHandlerContext ctx, Routed routed, JobManagerGateway jobManagerGateway) { FullHttpResponse response; try { @@ -106,7 +106,7 @@ public class RuntimeMonitorHandler extends RuntimeMonitorHandlerBase { queryParams.put(WEB_MONITOR_ADDRESS_KEY, (httpsEnabled ? 
"https://" : "http://") + address.getHostName() + ":" + address.getPort()); - response = handler.handleRequest(pathParams, queryParams, jobManager); + response = handler.handleRequest(pathParams, queryParams, jobManagerGateway); } catch (NotFoundException e) { // this should result in a 404 error code (not found) http://git-wip-us.apache.org/repos/asf/flink/blob/9f790d3e/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/RuntimeMonitorHandlerBase.java ---------------------------------------------------------------------- diff --git a/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/RuntimeMonitorHandlerBase.java b/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/RuntimeMonitorHandlerBase.java index d524632..4cb55f1 100644 --- a/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/RuntimeMonitorHandlerBase.java +++ b/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/RuntimeMonitorHandlerBase.java @@ -18,9 +18,11 @@ package org.apache.flink.runtime.webmonitor; -import org.apache.flink.runtime.instance.ActorGateway; +import org.apache.flink.api.common.time.Time; +import org.apache.flink.runtime.jobmaster.JobManagerGateway; import org.apache.flink.runtime.webmonitor.handlers.HandlerRedirectUtils; import org.apache.flink.runtime.webmonitor.handlers.RequestHandler; +import org.apache.flink.runtime.webmonitor.retriever.JobManagerRetriever; import org.apache.flink.shaded.netty4.io.netty.channel.ChannelHandler; import org.apache.flink.shaded.netty4.io.netty.channel.ChannelHandlerContext; @@ -29,11 +31,9 @@ import org.apache.flink.shaded.netty4.io.netty.handler.codec.http.HttpResponse; import org.apache.flink.shaded.netty4.io.netty.handler.codec.http.router.KeepAliveWrite; import org.apache.flink.shaded.netty4.io.netty.handler.codec.http.router.Routed; -import scala.Option; -import scala.Tuple2; -import scala.concurrent.Await; -import scala.concurrent.Future; -import 
scala.concurrent.duration.FiniteDuration; +import java.util.Optional; +import java.util.concurrent.CompletableFuture; +import java.util.concurrent.TimeUnit; import static org.apache.flink.util.Preconditions.checkNotNull; @@ -48,9 +48,9 @@ public abstract class RuntimeMonitorHandlerBase extends SimpleChannelInboundHand private final JobManagerRetriever retriever; - protected final Future<String> localJobManagerAddressFuture; + protected final CompletableFuture<String> localJobManagerAddressFuture; - protected final FiniteDuration timeout; + protected final Time timeout; /** Whether the web service has https enabled. */ protected final boolean httpsEnabled; @@ -59,8 +59,8 @@ public abstract class RuntimeMonitorHandlerBase extends SimpleChannelInboundHand public RuntimeMonitorHandlerBase( JobManagerRetriever retriever, - Future<String> localJobManagerAddressFuture, - FiniteDuration timeout, + CompletableFuture<String> localJobManagerAddressFuture, + Time timeout, boolean httpsEnabled) { this.retriever = checkNotNull(retriever); @@ -78,17 +78,17 @@ public abstract class RuntimeMonitorHandlerBase extends SimpleChannelInboundHand @Override protected void channelRead0(ChannelHandlerContext ctx, Routed routed) throws Exception { - if (localJobManagerAddressFuture.isCompleted()) { + if (localJobManagerAddressFuture.isDone()) { if (localJobManagerAddress == null) { - localJobManagerAddress = Await.result(localJobManagerAddressFuture, timeout); + localJobManagerAddress = localJobManagerAddressFuture.get(timeout.toMilliseconds(), TimeUnit.MILLISECONDS); } - Option<Tuple2<ActorGateway, Integer>> jobManager = retriever.getJobManagerGatewayAndWebPort(); + Optional<JobManagerGateway> optJobManagerGateway = retriever.getJobManagerGatewayNow(); - if (jobManager.isDefined()) { - Tuple2<ActorGateway, Integer> gatewayPort = jobManager.get(); + if (optJobManagerGateway.isPresent()) { + JobManagerGateway jobManagerGateway = optJobManagerGateway.get(); String redirectAddress = 
HandlerRedirectUtils.getRedirectAddress( - localJobManagerAddress, gatewayPort); + localJobManagerAddress, jobManagerGateway, timeout); if (redirectAddress != null) { HttpResponse redirect = HandlerRedirectUtils.getRedirectResponse(redirectAddress, routed.path(), @@ -96,7 +96,7 @@ public abstract class RuntimeMonitorHandlerBase extends SimpleChannelInboundHand KeepAliveWrite.flush(ctx, routed.request(), redirect); } else { - respondAsLeader(ctx, routed, gatewayPort._1()); + respondAsLeader(ctx, routed, jobManagerGateway); } } else { KeepAliveWrite.flush(ctx, routed.request(), HandlerRedirectUtils.getUnavailableResponse()); @@ -106,5 +106,5 @@ public abstract class RuntimeMonitorHandlerBase extends SimpleChannelInboundHand } } - protected abstract void respondAsLeader(ChannelHandlerContext ctx, Routed routed, ActorGateway jobManager); + protected abstract void respondAsLeader(ChannelHandlerContext ctx, Routed routed, JobManagerGateway jobManagerGateway); } http://git-wip-us.apache.org/repos/asf/flink/blob/9f790d3e/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/WebRuntimeMonitor.java ---------------------------------------------------------------------- diff --git a/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/WebRuntimeMonitor.java b/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/WebRuntimeMonitor.java index e27a15f..17f02f0 100644 --- a/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/WebRuntimeMonitor.java +++ b/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/WebRuntimeMonitor.java @@ -22,7 +22,6 @@ import org.apache.flink.api.common.time.Time; import org.apache.flink.configuration.Configuration; import org.apache.flink.configuration.CoreOptions; import org.apache.flink.configuration.WebOptions; -import org.apache.flink.runtime.akka.AkkaUtils; import org.apache.flink.runtime.blob.BlobView; import org.apache.flink.runtime.jobmanager.MemoryArchivist; import 
org.apache.flink.runtime.leaderretrieval.LeaderRetrievalService; @@ -71,12 +70,14 @@ import org.apache.flink.runtime.webmonitor.metrics.JobMetricsHandler; import org.apache.flink.runtime.webmonitor.metrics.JobVertexMetricsHandler; import org.apache.flink.runtime.webmonitor.metrics.MetricFetcher; import org.apache.flink.runtime.webmonitor.metrics.TaskManagerMetricsHandler; +import org.apache.flink.runtime.webmonitor.retriever.JobManagerRetriever; +import org.apache.flink.runtime.webmonitor.retriever.MetricQueryServiceRetriever; import org.apache.flink.runtime.webmonitor.utils.WebFrontendBootstrap; import org.apache.flink.util.FileUtils; +import org.apache.flink.util.Preconditions; import org.apache.flink.shaded.netty4.io.netty.handler.codec.http.router.Router; -import akka.actor.ActorSystem; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -85,16 +86,11 @@ import javax.net.ssl.SSLContext; import java.io.File; import java.io.IOException; import java.util.UUID; -import java.util.concurrent.ExecutorService; -import java.util.concurrent.ForkJoinPool; +import java.util.concurrent.CompletableFuture; +import java.util.concurrent.Executor; import java.util.concurrent.TimeUnit; import java.util.concurrent.atomic.AtomicBoolean; -import scala.concurrent.ExecutionContext$; -import scala.concurrent.ExecutionContextExecutor; -import scala.concurrent.Promise; -import scala.concurrent.duration.FiniteDuration; - import static org.apache.flink.util.Preconditions.checkNotNull; /** @@ -108,7 +104,7 @@ import static org.apache.flink.util.Preconditions.checkNotNull; public class WebRuntimeMonitor implements WebMonitor { /** By default, all requests to the JobManager have a timeout of 10 seconds. */ - public static final FiniteDuration DEFAULT_REQUEST_TIMEOUT = new FiniteDuration(10, TimeUnit.SECONDS); + public static final Time DEFAULT_REQUEST_TIMEOUT = Time.seconds(10L); /** Logger for web frontend startup / shutdown messages. 
*/ private static final Logger LOG = LoggerFactory.getLogger(WebRuntimeMonitor.class); @@ -120,14 +116,15 @@ public class WebRuntimeMonitor implements WebMonitor { private final LeaderRetrievalService leaderRetrievalService; - /** LeaderRetrievalListener which stores the currently leading JobManager and its archive. */ + /** Service which retrieves the currently leading JobManager and opens a JobManagerGateway. */ private final JobManagerRetriever retriever; private final SSLContext serverSSLContext; - private final Promise<String> jobManagerAddressPromise = new scala.concurrent.impl.Promise.DefaultPromise<>(); + private final CompletableFuture<String> jobManagerAddressFuture = new CompletableFuture<>(); + + private final Time timeout; - private final FiniteDuration timeout; private final WebFrontendBootstrap netty; private final File webRootDir; @@ -142,7 +139,6 @@ public class WebRuntimeMonitor implements WebMonitor { private AtomicBoolean cleanedUp = new AtomicBoolean(); - private ExecutorService executorService; private MetricFetcher metricFetcher; @@ -150,11 +146,15 @@ public class WebRuntimeMonitor implements WebMonitor { Configuration config, LeaderRetrievalService leaderRetrievalService, BlobView blobView, - ActorSystem actorSystem) throws IOException, InterruptedException { + JobManagerRetriever jobManagerRetriever, + MetricQueryServiceRetriever queryServiceRetriever, + Time timeout, + Executor executor) throws IOException, InterruptedException { this.leaderRetrievalService = checkNotNull(leaderRetrievalService); - this.timeout = AkkaUtils.getTimeout(config); - this.retriever = new JobManagerRetriever(this, actorSystem, AkkaUtils.getTimeout(config), timeout); + this.retriever = Preconditions.checkNotNull(jobManagerRetriever); + this.timeout = Preconditions.checkNotNull(timeout); + this.cfg = new WebMonitorConfig(config); final String configuredAddress = cfg.getWebFrontendAddress(); @@ -191,7 +191,7 @@ public class WebRuntimeMonitor implements WebMonitor { 
// - Back pressure stats ---------------------------------------------- - stackTraceSamples = new StackTraceSampleCoordinator(actorSystem.dispatcher(), 60000); + stackTraceSamples = new StackTraceSampleCoordinator(executor, 60000); // Back pressure stats tracker config int cleanUpInterval = config.getInteger(WebOptions.BACKPRESSURE_CLEANUP_INTERVAL); @@ -209,10 +209,6 @@ public class WebRuntimeMonitor implements WebMonitor { // -------------------------------------------------------------------- - executorService = new ForkJoinPool(); - - ExecutionContextExecutor context = ExecutionContext$.MODULE$.fromExecutor(executorService); - // Config to enable https access to the web-ui boolean enableSSL = config.getBoolean(WebOptions.SSL_ENABLED) && SSLUtils.getSSLEnabled(config); @@ -226,11 +222,11 @@ public class WebRuntimeMonitor implements WebMonitor { } else { serverSSLContext = null; } - metricFetcher = new MetricFetcher(actorSystem, retriever, context); + metricFetcher = new MetricFetcher(retriever, queryServiceRetriever, executor, timeout); String defaultSavepointDir = config.getString(CoreOptions.SAVEPOINT_DIRECTORY); - JobCancellationWithSavepointHandlers cancelWithSavepoint = new JobCancellationWithSavepointHandlers(currentGraphs, context, defaultSavepointDir); + JobCancellationWithSavepointHandlers cancelWithSavepoint = new JobCancellationWithSavepointHandlers(currentGraphs, executor, defaultSavepointDir); RuntimeMonitorHandler triggerHandler = handler(cancelWithSavepoint.getTriggerHandler()); RuntimeMonitorHandler inProgressHandler = handler(cancelWithSavepoint.getInProgressHandler()); @@ -274,8 +270,8 @@ public class WebRuntimeMonitor implements WebMonitor { get(router, new TaskManagerLogHandler( retriever, - context, - jobManagerAddressPromise.future(), + executor, + jobManagerAddressFuture, timeout, TaskManagerLogHandler.FileMode.LOG, config, @@ -284,8 +280,8 @@ public class WebRuntimeMonitor implements WebMonitor { get(router, new TaskManagerLogHandler( 
retriever, - context, - jobManagerAddressPromise.future(), + executor, + jobManagerAddressFuture, timeout, TaskManagerLogHandler.FileMode.STDOUT, config, @@ -296,27 +292,27 @@ public class WebRuntimeMonitor implements WebMonitor { router // log and stdout .GET("/jobmanager/log", logFiles.logFile == null ? new ConstantTextHandler("(log file unavailable)") : - new StaticFileServerHandler(retriever, jobManagerAddressPromise.future(), timeout, logFiles.logFile, + new StaticFileServerHandler(retriever, jobManagerAddressFuture, timeout, logFiles.logFile, enableSSL)) .GET("/jobmanager/stdout", logFiles.stdOutFile == null ? new ConstantTextHandler("(stdout file unavailable)") : - new StaticFileServerHandler(retriever, jobManagerAddressPromise.future(), timeout, logFiles.stdOutFile, + new StaticFileServerHandler(retriever, jobManagerAddressFuture, timeout, logFiles.stdOutFile, enableSSL)); get(router, new JobManagerMetricsHandler(metricFetcher)); // Cancel a job via GET (for proper integration with YARN this has to be performed via GET) - get(router, new JobCancellationHandler()); + get(router, new JobCancellationHandler(timeout)); // DELETE is the preferred way of canceling a job (Rest-conform) - delete(router, new JobCancellationHandler()); + delete(router, new JobCancellationHandler(timeout)); get(router, triggerHandler); get(router, inProgressHandler); // stop a job via GET (for proper integration with YARN this has to be performed via GET) - get(router, new JobStoppingHandler()); + get(router, new JobStoppingHandler(timeout)); // DELETE is the preferred way of stopping a job (Rest-conform) - delete(router, new JobStoppingHandler()); + delete(router, new JobStoppingHandler(timeout)); int maxCachedEntries = config.getInteger(WebOptions.CHECKPOINTS_HISTORY_SIZE); CheckpointStatsCache cache = new CheckpointStatsCache(maxCachedEntries); @@ -351,7 +347,7 @@ public class WebRuntimeMonitor implements WebMonitor { } // this handler serves all the static contents - 
router.GET("/:*", new StaticFileServerHandler(retriever, jobManagerAddressPromise.future(), timeout, webRootDir, + router.GET("/:*", new StaticFileServerHandler(retriever, jobManagerAddressFuture, timeout, webRootDir, enableSSL)); // add shutdown hook for deleting the directories and remaining temp files on shutdown @@ -387,7 +383,7 @@ public class WebRuntimeMonitor implements WebMonitor { * @return array of all JsonArchivists relevant for the history server */ public static JsonArchivist[] getJsonArchivists() { - JsonArchivist[] archivists = new JsonArchivist[]{ + JsonArchivist[] archivists = { new CurrentJobsOverviewHandler.CurrentJobsOverviewJsonArchivist(), new JobPlanHandler.JobPlanJsonArchivist(), @@ -418,7 +414,7 @@ public class WebRuntimeMonitor implements WebMonitor { LOG.info("Starting with JobManager {} on port {}", jobManagerAkkaUrl, getServerPort()); synchronized (startupShutdownLock) { - jobManagerAddressPromise.success(jobManagerAkkaUrl); + jobManagerAddressFuture.complete(jobManagerAkkaUrl); leaderRetrievalService.start(retriever); long delay = backPressureStatsTracker.getCleanUpInterval(); @@ -451,8 +447,6 @@ public class WebRuntimeMonitor implements WebMonitor { backPressureStatsTracker.shutDown(); - executorService.shutdownNow(); - cleanup(); } } @@ -522,7 +516,7 @@ public class WebRuntimeMonitor implements WebMonitor { // ------------------------------------------------------------------------ private RuntimeMonitorHandler handler(RequestHandler handler) { - return new RuntimeMonitorHandler(cfg, handler, retriever, jobManagerAddressPromise.future(), timeout, + return new RuntimeMonitorHandler(cfg, handler, retriever, jobManagerAddressFuture, timeout, serverSSLContext != null); } http://git-wip-us.apache.org/repos/asf/flink/blob/9f790d3e/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/files/StaticFileServerHandler.java ---------------------------------------------------------------------- diff --git 
a/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/files/StaticFileServerHandler.java b/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/files/StaticFileServerHandler.java index be6928e..15acb00 100644 --- a/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/files/StaticFileServerHandler.java +++ b/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/files/StaticFileServerHandler.java @@ -26,9 +26,10 @@ package org.apache.flink.runtime.webmonitor.files; * https://github.com/netty/netty/blob/4.0/example/src/main/java/io/netty/example/http/file/HttpStaticFileServerHandler.java *****************************************************************************/ -import org.apache.flink.runtime.instance.ActorGateway; -import org.apache.flink.runtime.webmonitor.JobManagerRetriever; +import org.apache.flink.api.common.time.Time; +import org.apache.flink.runtime.jobmaster.JobManagerGateway; import org.apache.flink.runtime.webmonitor.handlers.HandlerRedirectUtils; +import org.apache.flink.runtime.webmonitor.retriever.JobManagerRetriever; import org.apache.flink.shaded.netty4.io.netty.buffer.Unpooled; import org.apache.flink.shaded.netty4.io.netty.channel.ChannelFuture; @@ -70,13 +71,9 @@ import java.util.Calendar; import java.util.Date; import java.util.GregorianCalendar; import java.util.Locale; +import java.util.Optional; import java.util.TimeZone; - -import scala.Option; -import scala.Tuple2; -import scala.concurrent.Await; -import scala.concurrent.Future; -import scala.concurrent.duration.FiniteDuration; +import java.util.concurrent.CompletableFuture; import static org.apache.flink.shaded.netty4.io.netty.handler.codec.http.HttpHeaders.Names.CACHE_CONTROL; import static org.apache.flink.shaded.netty4.io.netty.handler.codec.http.HttpHeaders.Names.CONNECTION; @@ -118,9 +115,9 @@ public class StaticFileServerHandler extends SimpleChannelInboundHandler<Routed> private final JobManagerRetriever retriever; - 
private final Future<String> localJobManagerAddressFuture; + private final CompletableFuture<String> localJobManagerAddressFuture; - private final FiniteDuration timeout; + private final Time timeout; /** The path in which the static documents are. */ private final File rootPath; @@ -135,8 +132,8 @@ public class StaticFileServerHandler extends SimpleChannelInboundHandler<Routed> public StaticFileServerHandler( JobManagerRetriever retriever, - Future<String> localJobManagerAddressPromise, - FiniteDuration timeout, + CompletableFuture<String> localJobManagerAddressPromise, + Time timeout, File rootPath, boolean httpsEnabled) throws IOException { @@ -145,8 +142,8 @@ public class StaticFileServerHandler extends SimpleChannelInboundHandler<Routed> public StaticFileServerHandler( JobManagerRetriever retriever, - Future<String> localJobManagerAddressFuture, - FiniteDuration timeout, + CompletableFuture<String> localJobManagerAddressFuture, + Time timeout, File rootPath, boolean httpsEnabled, Logger logger) throws IOException { @@ -165,9 +162,9 @@ public class StaticFileServerHandler extends SimpleChannelInboundHandler<Routed> @Override public void channelRead0(ChannelHandlerContext ctx, Routed routed) throws Exception { - if (localJobManagerAddressFuture.isCompleted()) { + if (localJobManagerAddressFuture.isDone()) { if (localJobManagerAddress == null) { - localJobManagerAddress = Await.result(localJobManagerAddressFuture, timeout); + localJobManagerAddress = localJobManagerAddressFuture.get(); } final HttpRequest request = routed.request(); @@ -183,12 +180,12 @@ public class StaticFileServerHandler extends SimpleChannelInboundHandler<Routed> requestPath = ""; } - Option<Tuple2<ActorGateway, Integer>> jobManager = retriever.getJobManagerGatewayAndWebPort(); + Optional<JobManagerGateway> optJobManagerGateway = retriever.getJobManagerGatewayNow(); - if (jobManager.isDefined()) { + if (optJobManagerGateway.isPresent()) { // Redirect to leader if necessary String 
redirectAddress = HandlerRedirectUtils.getRedirectAddress( - localJobManagerAddress, jobManager.get()); + localJobManagerAddress, optJobManagerGateway.get(), timeout); if (redirectAddress != null) { HttpResponse redirect = HandlerRedirectUtils.getRedirectResponse( http://git-wip-us.apache.org/repos/asf/flink/blob/9f790d3e/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/handlers/AbstractExecutionGraphRequestHandler.java ---------------------------------------------------------------------- diff --git a/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/handlers/AbstractExecutionGraphRequestHandler.java b/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/handlers/AbstractExecutionGraphRequestHandler.java index d6c17af..89108db 100644 --- a/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/handlers/AbstractExecutionGraphRequestHandler.java +++ b/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/handlers/AbstractExecutionGraphRequestHandler.java @@ -20,11 +20,14 @@ package org.apache.flink.runtime.webmonitor.handlers; import org.apache.flink.api.common.JobID; import org.apache.flink.runtime.executiongraph.AccessExecutionGraph; -import org.apache.flink.runtime.instance.ActorGateway; +import org.apache.flink.runtime.jobmaster.JobManagerGateway; import org.apache.flink.runtime.webmonitor.ExecutionGraphHolder; import org.apache.flink.runtime.webmonitor.NotFoundException; +import org.apache.flink.util.FlinkException; +import org.apache.flink.util.Preconditions; import java.util.Map; +import java.util.Optional; /** * Base class for request handlers whose response depends on an ExecutionGraph @@ -35,11 +38,11 @@ public abstract class AbstractExecutionGraphRequestHandler extends AbstractJsonR private final ExecutionGraphHolder executionGraphHolder; public AbstractExecutionGraphRequestHandler(ExecutionGraphHolder executionGraphHolder) { - this.executionGraphHolder = executionGraphHolder; 
+ this.executionGraphHolder = Preconditions.checkNotNull(executionGraphHolder); } @Override - public String handleJsonRequest(Map<String, String> pathParams, Map<String, String> queryParams, ActorGateway jobManager) throws Exception { + public String handleJsonRequest(Map<String, String> pathParams, Map<String, String> queryParams, JobManagerGateway jobManagerGateway) throws Exception { String jidString = pathParams.get("jobid"); if (jidString == null) { throw new RuntimeException("JobId parameter missing"); @@ -53,12 +56,17 @@ public abstract class AbstractExecutionGraphRequestHandler extends AbstractJsonR throw new RuntimeException("Invalid JobID string '" + jidString + "': " + e.getMessage()); } - AccessExecutionGraph eg = executionGraphHolder.getExecutionGraph(jid, jobManager); - if (eg == null) { - throw new NotFoundException("Could not find job with id " + jid); + final Optional<AccessExecutionGraph> optGraph; + + try { + optGraph = executionGraphHolder.getExecutionGraph(jid, jobManagerGateway); + } catch (Exception e) { + throw new FlinkException("Could not retrieve ExecutionGraph for job with jobId " + jid + " from the JobManager.", e); } - return handleRequest(eg, pathParams); + final AccessExecutionGraph graph = optGraph.orElseThrow(() -> new NotFoundException("Could not find job with jobId " + jid + '.')); + + return handleRequest(graph, pathParams); } public abstract String handleRequest(AccessExecutionGraph graph, Map<String, String> params) throws Exception; http://git-wip-us.apache.org/repos/asf/flink/blob/9f790d3e/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/handlers/AbstractJsonRequestHandler.java ---------------------------------------------------------------------- diff --git a/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/handlers/AbstractJsonRequestHandler.java b/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/handlers/AbstractJsonRequestHandler.java index 2b4a45f..266ffb0 100644 
--- a/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/handlers/AbstractJsonRequestHandler.java +++ b/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/handlers/AbstractJsonRequestHandler.java @@ -18,7 +18,7 @@ package org.apache.flink.runtime.webmonitor.handlers; -import org.apache.flink.runtime.instance.ActorGateway; +import org.apache.flink.runtime.jobmaster.JobManagerGateway; import org.apache.flink.shaded.netty4.io.netty.buffer.Unpooled; import org.apache.flink.shaded.netty4.io.netty.handler.codec.http.DefaultFullHttpResponse; @@ -38,8 +38,8 @@ public abstract class AbstractJsonRequestHandler implements RequestHandler { private static final Charset ENCODING = Charset.forName("UTF-8"); @Override - public FullHttpResponse handleRequest(Map<String, String> pathParams, Map<String, String> queryParams, ActorGateway jobManager) throws Exception { - String result = handleJsonRequest(pathParams, queryParams, jobManager); + public FullHttpResponse handleRequest(Map<String, String> pathParams, Map<String, String> queryParams, JobManagerGateway jobManagerGateway) throws Exception { + String result = handleJsonRequest(pathParams, queryParams, jobManagerGateway); byte[] bytes = result.getBytes(ENCODING); DefaultFullHttpResponse response = new DefaultFullHttpResponse( @@ -57,7 +57,7 @@ public abstract class AbstractJsonRequestHandler implements RequestHandler { * * @param pathParams The map of REST path parameters, decoded by the router. * @param queryParams The map of query parameters. - * @param jobManager The JobManager actor. + * @param jobManagerGateway to communicate with the JobManager. * * @return The JSON string that is the HTTP response. 
* @@ -69,6 +69,6 @@ public abstract class AbstractJsonRequestHandler implements RequestHandler { public abstract String handleJsonRequest( Map<String, String> pathParams, Map<String, String> queryParams, - ActorGateway jobManager) throws Exception; + JobManagerGateway jobManagerGateway) throws Exception; } http://git-wip-us.apache.org/repos/asf/flink/blob/9f790d3e/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/handlers/ClusterOverviewHandler.java ---------------------------------------------------------------------- diff --git a/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/handlers/ClusterOverviewHandler.java b/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/handlers/ClusterOverviewHandler.java index 816ef24..4ebc4e7 100644 --- a/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/handlers/ClusterOverviewHandler.java +++ b/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/handlers/ClusterOverviewHandler.java @@ -18,8 +18,8 @@ package org.apache.flink.runtime.webmonitor.handlers; -import org.apache.flink.runtime.instance.ActorGateway; -import org.apache.flink.runtime.messages.webmonitor.RequestStatusOverview; +import org.apache.flink.api.common.time.Time; +import org.apache.flink.runtime.jobmaster.JobManagerGateway; import org.apache.flink.runtime.messages.webmonitor.StatusOverview; import org.apache.flink.runtime.util.EnvironmentInformation; @@ -27,10 +27,8 @@ import com.fasterxml.jackson.core.JsonGenerator; import java.io.StringWriter; import java.util.Map; - -import scala.concurrent.Await; -import scala.concurrent.Future; -import scala.concurrent.duration.FiniteDuration; +import java.util.concurrent.CompletableFuture; +import java.util.concurrent.TimeUnit; import static org.apache.flink.util.Preconditions.checkNotNull; @@ -46,9 +44,9 @@ public class ClusterOverviewHandler extends AbstractJsonRequestHandler { private static final String commitID = 
EnvironmentInformation.getRevisionInformation().commitId; - private final FiniteDuration timeout; + private final Time timeout; - public ClusterOverviewHandler(FiniteDuration timeout) { + public ClusterOverviewHandler(Time timeout) { this.timeout = checkNotNull(timeout); } @@ -58,12 +56,13 @@ public class ClusterOverviewHandler extends AbstractJsonRequestHandler { } @Override - public String handleJsonRequest(Map<String, String> pathParams, Map<String, String> queryParams, ActorGateway jobManager) throws Exception { + public String handleJsonRequest(Map<String, String> pathParams, Map<String, String> queryParams, JobManagerGateway jobManagerGateway) throws Exception { // we need no parameters, get all requests try { - if (jobManager != null) { - Future<Object> future = jobManager.ask(RequestStatusOverview.getInstance(), timeout); - StatusOverview overview = (StatusOverview) Await.result(future, timeout); + if (jobManagerGateway != null) { + CompletableFuture<StatusOverview> overviewFuture = jobManagerGateway.requestStatusOverview(timeout); + + StatusOverview overview = overviewFuture.get(timeout.toMilliseconds(), TimeUnit.MILLISECONDS); StringWriter writer = new StringWriter(); JsonGenerator gen = JsonFactory.JACKSON_FACTORY.createGenerator(writer); http://git-wip-us.apache.org/repos/asf/flink/blob/9f790d3e/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/handlers/CurrentJobIdsHandler.java ---------------------------------------------------------------------- diff --git a/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/handlers/CurrentJobIdsHandler.java b/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/handlers/CurrentJobIdsHandler.java index 9d0b863..778a300 100644 --- a/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/handlers/CurrentJobIdsHandler.java +++ b/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/handlers/CurrentJobIdsHandler.java @@ -19,18 +19,16 @@ package 
org.apache.flink.runtime.webmonitor.handlers; import org.apache.flink.api.common.JobID; -import org.apache.flink.runtime.instance.ActorGateway; +import org.apache.flink.api.common.time.Time; +import org.apache.flink.runtime.jobmaster.JobManagerGateway; import org.apache.flink.runtime.messages.webmonitor.JobsWithIDsOverview; -import org.apache.flink.runtime.messages.webmonitor.RequestJobsWithIDsOverview; import com.fasterxml.jackson.core.JsonGenerator; import java.io.StringWriter; import java.util.Map; - -import scala.concurrent.Await; -import scala.concurrent.Future; -import scala.concurrent.duration.FiniteDuration; +import java.util.concurrent.CompletableFuture; +import java.util.concurrent.TimeUnit; import static java.util.Objects.requireNonNull; @@ -43,9 +41,9 @@ public class CurrentJobIdsHandler extends AbstractJsonRequestHandler { private static final String CURRENT_JOB_IDS_REST_PATH = "/jobs"; - private final FiniteDuration timeout; + private final Time timeout; - public CurrentJobIdsHandler(FiniteDuration timeout) { + public CurrentJobIdsHandler(Time timeout) { this.timeout = requireNonNull(timeout); } @@ -55,12 +53,12 @@ public class CurrentJobIdsHandler extends AbstractJsonRequestHandler { } @Override - public String handleJsonRequest(Map<String, String> pathParams, Map<String, String> queryParams, ActorGateway jobManager) throws Exception { + public String handleJsonRequest(Map<String, String> pathParams, Map<String, String> queryParams, JobManagerGateway jobManagerGateway) throws Exception { // we need no parameters, get all requests try { - if (jobManager != null) { - Future<Object> future = jobManager.ask(RequestJobsWithIDsOverview.getInstance(), timeout); - JobsWithIDsOverview overview = (JobsWithIDsOverview) Await.result(future, timeout); + if (jobManagerGateway != null) { + CompletableFuture<JobsWithIDsOverview> overviewFuture = jobManagerGateway.requestJobsOverview(timeout); + JobsWithIDsOverview overview = 
overviewFuture.get(timeout.toMilliseconds(), TimeUnit.MILLISECONDS); StringWriter writer = new StringWriter(); JsonGenerator gen = JsonFactory.JACKSON_FACTORY.createGenerator(writer); http://git-wip-us.apache.org/repos/asf/flink/blob/9f790d3e/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/handlers/CurrentJobsOverviewHandler.java ---------------------------------------------------------------------- diff --git a/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/handlers/CurrentJobsOverviewHandler.java b/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/handlers/CurrentJobsOverviewHandler.java index d0518c8..b324426 100644 --- a/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/handlers/CurrentJobsOverviewHandler.java +++ b/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/handlers/CurrentJobsOverviewHandler.java @@ -18,12 +18,12 @@ package org.apache.flink.runtime.webmonitor.handlers; +import org.apache.flink.api.common.time.Time; import org.apache.flink.runtime.execution.ExecutionState; import org.apache.flink.runtime.executiongraph.AccessExecutionGraph; -import org.apache.flink.runtime.instance.ActorGateway; +import org.apache.flink.runtime.jobmaster.JobManagerGateway; import org.apache.flink.runtime.messages.webmonitor.JobDetails; import org.apache.flink.runtime.messages.webmonitor.MultipleJobsDetails; -import org.apache.flink.runtime.messages.webmonitor.RequestJobDetails; import org.apache.flink.runtime.webmonitor.WebMonitorUtils; import org.apache.flink.runtime.webmonitor.history.ArchivedJson; import org.apache.flink.runtime.webmonitor.history.JsonArchivist; @@ -35,10 +35,8 @@ import java.io.StringWriter; import java.util.Collection; import java.util.Collections; import java.util.Map; - -import scala.concurrent.Await; -import scala.concurrent.Future; -import scala.concurrent.duration.FiniteDuration; +import java.util.concurrent.CompletableFuture; +import 
java.util.concurrent.TimeUnit; import static org.apache.flink.util.Preconditions.checkNotNull; @@ -51,13 +49,13 @@ public class CurrentJobsOverviewHandler extends AbstractJsonRequestHandler { private static final String RUNNING_JOBS_REST_PATH = "/joboverview/running"; private static final String COMPLETED_JOBS_REST_PATH = "/joboverview/completed"; - private final FiniteDuration timeout; + private final Time timeout; private final boolean includeRunningJobs; private final boolean includeFinishedJobs; public CurrentJobsOverviewHandler( - FiniteDuration timeout, + Time timeout, boolean includeRunningJobs, boolean includeFinishedJobs) { @@ -79,13 +77,11 @@ public class CurrentJobsOverviewHandler extends AbstractJsonRequestHandler { } @Override - public String handleJsonRequest(Map<String, String> pathParams, Map<String, String> queryParams, ActorGateway jobManager) throws Exception { + public String handleJsonRequest(Map<String, String> pathParams, Map<String, String> queryParams, JobManagerGateway jobManagerGateway) throws Exception { try { - if (jobManager != null) { - Future<Object> future = jobManager.ask( - new RequestJobDetails(includeRunningJobs, includeFinishedJobs), timeout); - - MultipleJobsDetails result = (MultipleJobsDetails) Await.result(future, timeout); + if (jobManagerGateway != null) { + CompletableFuture<MultipleJobsDetails> jobDetailsFuture = jobManagerGateway.requestJobDetails(includeRunningJobs, includeFinishedJobs, timeout); + MultipleJobsDetails result = jobDetailsFuture.get(timeout.toMilliseconds(), TimeUnit.MILLISECONDS); final long now = System.currentTimeMillis(); http://git-wip-us.apache.org/repos/asf/flink/blob/9f790d3e/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/handlers/DashboardConfigHandler.java ---------------------------------------------------------------------- diff --git a/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/handlers/DashboardConfigHandler.java 
b/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/handlers/DashboardConfigHandler.java index 312c890..fe1d06b 100644 --- a/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/handlers/DashboardConfigHandler.java +++ b/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/handlers/DashboardConfigHandler.java @@ -18,7 +18,7 @@ package org.apache.flink.runtime.webmonitor.handlers; -import org.apache.flink.runtime.instance.ActorGateway; +import org.apache.flink.runtime.jobmaster.JobManagerGateway; import org.apache.flink.runtime.util.EnvironmentInformation; import com.fasterxml.jackson.core.JsonGenerator; @@ -55,7 +55,7 @@ public class DashboardConfigHandler extends AbstractJsonRequestHandler { } @Override - public String handleJsonRequest(Map<String, String> pathParams, Map<String, String> queryParams, ActorGateway jobManager) throws Exception { + public String handleJsonRequest(Map<String, String> pathParams, Map<String, String> queryParams, JobManagerGateway jobManagerGateway) throws Exception { return this.configString; } http://git-wip-us.apache.org/repos/asf/flink/blob/9f790d3e/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/handlers/HandlerRedirectUtils.java ---------------------------------------------------------------------- diff --git a/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/handlers/HandlerRedirectUtils.java b/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/handlers/HandlerRedirectUtils.java index 9fbafb8..e27d125 100644 --- a/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/handlers/HandlerRedirectUtils.java +++ b/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/handlers/HandlerRedirectUtils.java @@ -18,9 +18,10 @@ package org.apache.flink.runtime.webmonitor.handlers; +import org.apache.flink.api.common.time.Time; import org.apache.flink.configuration.ConfigConstants; import 
org.apache.flink.runtime.akka.AkkaUtils; -import org.apache.flink.runtime.instance.ActorGateway; +import org.apache.flink.runtime.jobmaster.JobManagerGateway; import org.apache.flink.runtime.webmonitor.files.MimeTypes; import org.apache.flink.shaded.netty4.io.netty.buffer.Unpooled; @@ -30,13 +31,8 @@ import org.apache.flink.shaded.netty4.io.netty.handler.codec.http.HttpResponse; import org.apache.flink.shaded.netty4.io.netty.handler.codec.http.HttpResponseStatus; import org.apache.flink.shaded.netty4.io.netty.handler.codec.http.HttpVersion; -import org.slf4j.Logger; -import org.slf4j.LoggerFactory; - -import java.util.regex.Matcher; -import java.util.regex.Pattern; - -import scala.Tuple2; +import java.util.concurrent.CompletableFuture; +import java.util.concurrent.TimeUnit; import static org.apache.flink.util.Preconditions.checkNotNull; @@ -49,35 +45,26 @@ import static org.apache.flink.util.Preconditions.checkNotNull; */ public class HandlerRedirectUtils { - private static final Logger LOG = LoggerFactory.getLogger(HandlerRedirectUtils.class); - - /** Pattern to extract the host from an remote Akka URL. 
*/ - private static final Pattern LeaderAddressHostPattern = Pattern.compile("^.+@(.+):([0-9]+)/user/.+$"); - public static String getRedirectAddress( String localJobManagerAddress, - Tuple2<ActorGateway, Integer> leader) throws Exception { + JobManagerGateway jobManagerGateway, + Time timeout) throws Exception { - final String leaderAddress = leader._1().path(); - final int webMonitorPort = leader._2(); + final String leaderAddress = jobManagerGateway.getAddress(); final String jobManagerName = localJobManagerAddress.substring(localJobManagerAddress.lastIndexOf("/") + 1); if (!localJobManagerAddress.equals(leaderAddress) && !leaderAddress.equals(AkkaUtils.getLocalAkkaURL(jobManagerName))) { // We are not the leader and need to redirect - Matcher matcher = LeaderAddressHostPattern.matcher(leaderAddress); - - if (matcher.matches()) { - String redirectAddress = String.format("%s:%d", matcher.group(1), webMonitorPort); - return redirectAddress; - } - else { - LOG.warn("Unexpected leader address pattern {}. 
Cannot extract host.", leaderAddress); - } - } + final String hostname = jobManagerGateway.getHostname(); - return null; + final CompletableFuture<Integer> webMonitorPortFuture = jobManagerGateway.requestWebPort(timeout); + final int webMonitorPort = webMonitorPortFuture.get(timeout.toMilliseconds(), TimeUnit.MILLISECONDS); + return String.format("%s:%d", hostname, webMonitorPort); + } else { + return null; + } } public static HttpResponse getRedirectResponse(String redirectAddress, String path, boolean httpsEnabled) throws Exception { http://git-wip-us.apache.org/repos/asf/flink/blob/9f790d3e/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/handlers/JarAccessDeniedHandler.java ---------------------------------------------------------------------- diff --git a/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/handlers/JarAccessDeniedHandler.java b/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/handlers/JarAccessDeniedHandler.java index 4a21fec..db55169 100644 --- a/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/handlers/JarAccessDeniedHandler.java +++ b/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/handlers/JarAccessDeniedHandler.java @@ -18,7 +18,7 @@ package org.apache.flink.runtime.webmonitor.handlers; -import org.apache.flink.runtime.instance.ActorGateway; +import org.apache.flink.runtime.jobmaster.JobManagerGateway; import java.util.Map; @@ -42,7 +42,7 @@ public class JarAccessDeniedHandler extends AbstractJsonRequestHandler { } @Override - public String handleJsonRequest(Map<String, String> pathParams, Map<String, String> queryParams, ActorGateway jobManager) throws Exception { + public String handleJsonRequest(Map<String, String> pathParams, Map<String, String> queryParams, JobManagerGateway jobManagerGateway) throws Exception { return ERROR_MESSAGE; } } 
http://git-wip-us.apache.org/repos/asf/flink/blob/9f790d3e/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/handlers/JarDeleteHandler.java ---------------------------------------------------------------------- diff --git a/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/handlers/JarDeleteHandler.java b/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/handlers/JarDeleteHandler.java index 2572a76..73771bd 100644 --- a/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/handlers/JarDeleteHandler.java +++ b/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/handlers/JarDeleteHandler.java @@ -18,7 +18,7 @@ package org.apache.flink.runtime.webmonitor.handlers; -import org.apache.flink.runtime.instance.ActorGateway; +import org.apache.flink.runtime.jobmaster.JobManagerGateway; import com.fasterxml.jackson.core.JsonGenerator; @@ -46,7 +46,7 @@ public class JarDeleteHandler extends AbstractJsonRequestHandler { } @Override - public String handleJsonRequest(Map<String, String> pathParams, Map<String, String> queryParams, ActorGateway jobManager) throws Exception { + public String handleJsonRequest(Map<String, String> pathParams, Map<String, String> queryParams, JobManagerGateway jobManagerGateway) throws Exception { final String file = pathParams.get("jarid"); try { File[] list = jarDir.listFiles(new FilenameFilter() { http://git-wip-us.apache.org/repos/asf/flink/blob/9f790d3e/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/handlers/JarListHandler.java ---------------------------------------------------------------------- diff --git a/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/handlers/JarListHandler.java b/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/handlers/JarListHandler.java index 4dd20b1..4f9b188 100644 --- a/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/handlers/JarListHandler.java +++ 
b/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/handlers/JarListHandler.java @@ -19,7 +19,7 @@ package org.apache.flink.runtime.webmonitor.handlers; import org.apache.flink.client.program.PackagedProgram; -import org.apache.flink.runtime.instance.ActorGateway; +import org.apache.flink.runtime.jobmaster.JobManagerGateway; import org.apache.flink.runtime.webmonitor.RuntimeMonitorHandler; import com.fasterxml.jackson.core.JsonGenerator; @@ -51,7 +51,7 @@ public class JarListHandler extends AbstractJsonRequestHandler { } @Override - public String handleJsonRequest(Map<String, String> pathParams, Map<String, String> queryParams, ActorGateway jobManager) throws Exception { + public String handleJsonRequest(Map<String, String> pathParams, Map<String, String> queryParams, JobManagerGateway jobManagerGateway) throws Exception { try { StringWriter writer = new StringWriter(); JsonGenerator gen = JsonFactory.JACKSON_FACTORY.createGenerator(writer); http://git-wip-us.apache.org/repos/asf/flink/blob/9f790d3e/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/handlers/JarPlanHandler.java ---------------------------------------------------------------------- diff --git a/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/handlers/JarPlanHandler.java b/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/handlers/JarPlanHandler.java index 1b25e7f..b239160 100644 --- a/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/handlers/JarPlanHandler.java +++ b/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/handlers/JarPlanHandler.java @@ -18,9 +18,9 @@ package org.apache.flink.runtime.webmonitor.handlers; -import org.apache.flink.runtime.instance.ActorGateway; import org.apache.flink.runtime.jobgraph.JobGraph; import org.apache.flink.runtime.jobgraph.jsonplan.JsonPlanGenerator; +import org.apache.flink.runtime.jobmaster.JobManagerGateway; import 
com.fasterxml.jackson.core.JsonGenerator; @@ -45,7 +45,7 @@ public class JarPlanHandler extends JarActionHandler { } @Override - public String handleJsonRequest(Map<String, String> pathParams, Map<String, String> queryParams, ActorGateway jobManager) throws Exception { + public String handleJsonRequest(Map<String, String> pathParams, Map<String, String> queryParams, JobManagerGateway jobManagerGateway) throws Exception { try { JarActionHandlerConfig config = JarActionHandlerConfig.fromParams(pathParams, queryParams); JobGraph graph = getJobGraphAndClassLoader(config).f0; http://git-wip-us.apache.org/repos/asf/flink/blob/9f790d3e/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/handlers/JarRunHandler.java ---------------------------------------------------------------------- diff --git a/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/handlers/JarRunHandler.java b/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/handlers/JarRunHandler.java index 282fea8..12ffa4f 100644 --- a/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/handlers/JarRunHandler.java +++ b/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/handlers/JarRunHandler.java @@ -22,11 +22,11 @@ import org.apache.flink.api.common.time.Time; import org.apache.flink.api.java.tuple.Tuple2; import org.apache.flink.client.program.ProgramInvocationException; import org.apache.flink.configuration.Configuration; -import org.apache.flink.runtime.akka.AkkaJobManagerGateway; import org.apache.flink.runtime.client.JobClient; import org.apache.flink.runtime.client.JobExecutionException; -import org.apache.flink.runtime.instance.ActorGateway; import org.apache.flink.runtime.jobgraph.JobGraph; +import org.apache.flink.runtime.jobmaster.JobManagerGateway; +import org.apache.flink.util.Preconditions; import com.fasterxml.jackson.core.JsonGenerator; @@ -34,8 +34,6 @@ import java.io.File; import java.io.StringWriter; import 
java.util.Map; -import scala.concurrent.duration.FiniteDuration; - /** * This handler handles requests to fetch plan for a jar. */ @@ -43,13 +41,13 @@ public class JarRunHandler extends JarActionHandler { static final String JAR_RUN_REST_PATH = "/jars/:jarid/run"; - private final FiniteDuration timeout; + private final Time timeout; private final Configuration clientConfig; - public JarRunHandler(File jarDirectory, FiniteDuration timeout, Configuration clientConfig) { + public JarRunHandler(File jarDirectory, Time timeout, Configuration clientConfig) { super(jarDirectory); - this.timeout = timeout; - this.clientConfig = clientConfig; + this.timeout = Preconditions.checkNotNull(timeout); + this.clientConfig = Preconditions.checkNotNull(clientConfig); } @Override @@ -58,17 +56,17 @@ public class JarRunHandler extends JarActionHandler { } @Override - public String handleJsonRequest(Map<String, String> pathParams, Map<String, String> queryParams, ActorGateway jobManager) throws Exception { + public String handleJsonRequest(Map<String, String> pathParams, Map<String, String> queryParams, JobManagerGateway jobManagerGateway) throws Exception { try { JarActionHandlerConfig config = JarActionHandlerConfig.fromParams(pathParams, queryParams); Tuple2<JobGraph, ClassLoader> graph = getJobGraphAndClassLoader(config); try { JobClient.submitJobDetached( - new AkkaJobManagerGateway(jobManager), + jobManagerGateway, clientConfig, graph.f0, - Time.milliseconds(timeout.toMillis()), + timeout, graph.f1); } catch (JobExecutionException e) { throw new ProgramInvocationException("Failed to submit the job to the job manager", e); http://git-wip-us.apache.org/repos/asf/flink/blob/9f790d3e/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/handlers/JarUploadHandler.java ---------------------------------------------------------------------- diff --git a/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/handlers/JarUploadHandler.java 
b/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/handlers/JarUploadHandler.java index 745a110..705c321 100644 --- a/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/handlers/JarUploadHandler.java +++ b/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/handlers/JarUploadHandler.java @@ -18,7 +18,7 @@ package org.apache.flink.runtime.webmonitor.handlers; -import org.apache.flink.runtime.instance.ActorGateway; +import org.apache.flink.runtime.jobmaster.JobManagerGateway; import java.io.File; import java.util.Map; @@ -44,9 +44,9 @@ public class JarUploadHandler extends AbstractJsonRequestHandler { @Override public String handleJsonRequest( - Map<String, String> pathParams, - Map<String, String> queryParams, - ActorGateway jobManager) throws Exception { + Map<String, String> pathParams, + Map<String, String> queryParams, + JobManagerGateway jobManagerGateway) throws Exception { String tempFilePath = queryParams.get("filepath"); String filename = queryParams.get("filename"); http://git-wip-us.apache.org/repos/asf/flink/blob/9f790d3e/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/handlers/JobCancellationHandler.java ---------------------------------------------------------------------- diff --git a/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/handlers/JobCancellationHandler.java b/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/handlers/JobCancellationHandler.java index d9de7d7..513dc08 100644 --- a/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/handlers/JobCancellationHandler.java +++ b/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/handlers/JobCancellationHandler.java @@ -19,8 +19,9 @@ package org.apache.flink.runtime.webmonitor.handlers; import org.apache.flink.api.common.JobID; -import org.apache.flink.runtime.instance.ActorGateway; -import org.apache.flink.runtime.messages.JobManagerMessages; +import 
org.apache.flink.api.common.time.Time; +import org.apache.flink.runtime.jobmaster.JobManagerGateway; +import org.apache.flink.util.Preconditions; import org.apache.flink.util.StringUtils; import java.util.Map; @@ -33,17 +34,23 @@ public class JobCancellationHandler extends AbstractJsonRequestHandler { private static final String JOB_CONCELLATION_REST_PATH = "/jobs/:jobid/cancel"; private static final String JOB_CONCELLATION_YARN_REST_PATH = "/jobs/:jobid/yarn-cancel"; + private final Time timeout; + + public JobCancellationHandler(Time timeout) { + this.timeout = Preconditions.checkNotNull(timeout); + } + @Override public String[] getPaths() { return new String[]{JOB_CONCELLATION_REST_PATH, JOB_CONCELLATION_YARN_REST_PATH}; } @Override - public String handleJsonRequest(Map<String, String> pathParams, Map<String, String> queryParams, ActorGateway jobManager) throws Exception { + public String handleJsonRequest(Map<String, String> pathParams, Map<String, String> queryParams, JobManagerGateway jobManagerGateway) throws Exception { try { - JobID jobid = new JobID(StringUtils.hexStringToByte(pathParams.get("jobid"))); - if (jobManager != null) { - jobManager.tell(new JobManagerMessages.CancelJob(jobid)); + JobID jobId = new JobID(StringUtils.hexStringToByte(pathParams.get("jobid"))); + if (jobManagerGateway != null) { + jobManagerGateway.cancelJob(jobId, timeout); return "{}"; } else { http://git-wip-us.apache.org/repos/asf/flink/blob/9f790d3e/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/handlers/JobCancellationWithSavepointHandlers.java ---------------------------------------------------------------------- diff --git a/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/handlers/JobCancellationWithSavepointHandlers.java b/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/handlers/JobCancellationWithSavepointHandlers.java index 7dd4a52..9b474aa 100644 --- 
a/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/handlers/JobCancellationWithSavepointHandlers.java +++ b/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/handlers/JobCancellationWithSavepointHandlers.java @@ -19,16 +19,17 @@ package org.apache.flink.runtime.webmonitor.handlers; import org.apache.flink.api.common.JobID; +import org.apache.flink.api.common.time.Time; import org.apache.flink.api.java.tuple.Tuple2; import org.apache.flink.configuration.ConfigConstants; import org.apache.flink.configuration.CoreOptions; import org.apache.flink.runtime.checkpoint.CheckpointCoordinator; import org.apache.flink.runtime.executiongraph.AccessExecutionGraph; -import org.apache.flink.runtime.instance.ActorGateway; +import org.apache.flink.runtime.jobmaster.JobManagerGateway; import org.apache.flink.runtime.messages.JobManagerMessages.CancelJobWithSavepoint; -import org.apache.flink.runtime.messages.JobManagerMessages.CancellationFailure; -import org.apache.flink.runtime.messages.JobManagerMessages.CancellationSuccess; import org.apache.flink.runtime.webmonitor.ExecutionGraphHolder; +import org.apache.flink.runtime.webmonitor.NotFoundException; +import org.apache.flink.util.FlinkException; import org.apache.flink.shaded.netty4.io.netty.buffer.Unpooled; import org.apache.flink.shaded.netty4.io.netty.handler.codec.http.DefaultFullHttpResponse; @@ -37,7 +38,6 @@ import org.apache.flink.shaded.netty4.io.netty.handler.codec.http.HttpHeaders; import org.apache.flink.shaded.netty4.io.netty.handler.codec.http.HttpResponseStatus; import org.apache.flink.shaded.netty4.io.netty.handler.codec.http.HttpVersion; -import akka.dispatch.OnComplete; import com.fasterxml.jackson.core.JsonGenerator; import javax.annotation.Nullable; @@ -48,10 +48,9 @@ import java.nio.charset.Charset; import java.util.ArrayDeque; import java.util.HashMap; import java.util.Map; - -import scala.concurrent.ExecutionContext; -import scala.concurrent.Future; -import 
scala.concurrent.duration.FiniteDuration; +import java.util.Optional; +import java.util.concurrent.CompletableFuture; +import java.util.concurrent.Executor; import static org.apache.flink.util.Preconditions.checkNotNull; @@ -92,16 +91,16 @@ public class JobCancellationWithSavepointHandlers { public JobCancellationWithSavepointHandlers( ExecutionGraphHolder currentGraphs, - ExecutionContext executionContext) { - this(currentGraphs, executionContext, null); + Executor executor) { + this(currentGraphs, executor, null); } public JobCancellationWithSavepointHandlers( ExecutionGraphHolder currentGraphs, - ExecutionContext executionContext, + Executor executor, @Nullable String defaultSavepointDirectory) { - this.triggerHandler = new TriggerHandler(currentGraphs, executionContext); + this.triggerHandler = new TriggerHandler(currentGraphs, executor); this.inProgressHandler = new InProgressHandler(); this.defaultSavepointDirectory = defaultSavepointDirectory; } @@ -127,11 +126,11 @@ public class JobCancellationWithSavepointHandlers { private final ExecutionGraphHolder currentGraphs; /** Execution context for futures. 
*/ - private final ExecutionContext executionContext; + private final Executor executor; - public TriggerHandler(ExecutionGraphHolder currentGraphs, ExecutionContext executionContext) { + public TriggerHandler(ExecutionGraphHolder currentGraphs, Executor executor) { this.currentGraphs = checkNotNull(currentGraphs); - this.executionContext = checkNotNull(executionContext); + this.executor = checkNotNull(executor); } @Override @@ -144,35 +143,40 @@ public class JobCancellationWithSavepointHandlers { public FullHttpResponse handleRequest( Map<String, String> pathParams, Map<String, String> queryParams, - ActorGateway jobManager) throws Exception { + JobManagerGateway jobManagerGateway) throws Exception { try { - if (jobManager != null) { + if (jobManagerGateway != null) { JobID jobId = JobID.fromHexString(pathParams.get("jobid")); + final Optional<AccessExecutionGraph> optGraph; - AccessExecutionGraph graph = currentGraphs.getExecutionGraph(jobId, jobManager); - if (graph == null) { - throw new Exception("Cannot find ExecutionGraph for job."); - } else { - CheckpointCoordinator coord = graph.getCheckpointCoordinator(); - if (coord == null) { - throw new Exception("Cannot find CheckpointCoordinator for job."); - } + try { + optGraph = currentGraphs.getExecutionGraph(jobId, jobManagerGateway); + } catch (Exception e) { + throw new FlinkException("Could not retrieve the execution with jobId " + jobId + " from the JobManager.", e); + } - String targetDirectory = pathParams.get("targetDirectory"); - if (targetDirectory == null) { - if (defaultSavepointDirectory == null) { - throw new IllegalStateException("No savepoint directory configured. 
" + - "You can either specify a directory when triggering this savepoint or " + - "configure a cluster-wide default via key '" + - CoreOptions.SAVEPOINT_DIRECTORY.key() + "'."); - } else { - targetDirectory = defaultSavepointDirectory; - } - } + final AccessExecutionGraph graph = optGraph.orElseThrow( + () -> new NotFoundException("Could not find ExecutionGraph with jobId " + jobId + '.')); - return handleNewRequest(jobManager, jobId, targetDirectory, coord.getCheckpointTimeout()); + CheckpointCoordinator coord = graph.getCheckpointCoordinator(); + if (coord == null) { + throw new Exception("Cannot find CheckpointCoordinator for job."); } + + String targetDirectory = pathParams.get("targetDirectory"); + if (targetDirectory == null) { + if (defaultSavepointDirectory == null) { + throw new IllegalStateException("No savepoint directory configured. " + + "You can either specify a directory when triggering this savepoint or " + + "configure a cluster-wide default via key '" + + CoreOptions.SAVEPOINT_DIRECTORY.key() + "'."); + } else { + targetDirectory = defaultSavepointDirectory; + } + } + + return handleNewRequest(jobManagerGateway, jobId, targetDirectory, coord.getCheckpointTimeout()); } else { throw new Exception("No connection to the leading JobManager."); } @@ -182,7 +186,7 @@ public class JobCancellationWithSavepointHandlers { } @SuppressWarnings("unchecked") - private FullHttpResponse handleNewRequest(ActorGateway jobManager, final JobID jobId, String targetDirectory, long checkpointTimeout) throws IOException { + private FullHttpResponse handleNewRequest(JobManagerGateway jobManagerGateway, final JobID jobId, String targetDirectory, long checkpointTimeout) throws IOException { // Check whether a request exists final long requestId; final boolean isNewRequest; @@ -202,35 +206,21 @@ public class JobCancellationWithSavepointHandlers { try { // Trigger cancellation - Object msg = new CancelJobWithSavepoint(jobId, targetDirectory); - Future<Object> cancelFuture = 
jobManager - .ask(msg, FiniteDuration.apply(checkpointTimeout, "ms")); - - cancelFuture.onComplete(new OnComplete<Object>() { - @Override - public void onComplete(Throwable failure, Object resp) throws Throwable { - synchronized (lock) { - try { - if (resp != null) { - if (resp.getClass() == CancellationSuccess.class) { - String path = ((CancellationSuccess) resp).savepointPath(); - completed.put(requestId, path); - } else if (resp.getClass() == CancellationFailure.class) { - Throwable cause = ((CancellationFailure) resp).cause(); - completed.put(requestId, cause); - } else { - Throwable cause = new IllegalStateException("Unexpected CancellationResponse of type " + resp.getClass()); - completed.put(requestId, cause); - } - } else { - completed.put(requestId, failure); - } - } finally { - inProgress.remove(jobId); + CompletableFuture<String> cancelJobFuture = jobManagerGateway + .cancelJobWithSavepoint(jobId, targetDirectory, Time.milliseconds(checkpointTimeout)); + + cancelJobFuture.whenCompleteAsync( + (String path, Throwable throwable) -> { + try { + if (throwable != null) { + completed.put(requestId, throwable); + } else { + completed.put(requestId, path); } + } finally { + inProgress.remove(jobId); } - } - }, executionContext); + }, executor); success = true; } finally { @@ -298,9 +288,9 @@ public class JobCancellationWithSavepointHandlers { @Override @SuppressWarnings("unchecked") - public FullHttpResponse handleRequest(Map<String, String> pathParams, Map<String, String> queryParams, ActorGateway jobManager) throws Exception { + public FullHttpResponse handleRequest(Map<String, String> pathParams, Map<String, String> queryParams, JobManagerGateway jobManagerGateway) throws Exception { try { - if (jobManager != null) { + if (jobManagerGateway != null) { JobID jobId = JobID.fromHexString(pathParams.get("jobid")); long requestId = Long.parseLong(pathParams.get("requestId"));
