Repository: flink Updated Branches: refs/heads/master 00d5b6222 -> 9f790d3ef
http://git-wip-us.apache.org/repos/asf/flink/blob/9f790d3e/flink-runtime/src/main/java/org/apache/flink/runtime/webmonitor/retriever/impl/RpcJobManagerRetriever.java ---------------------------------------------------------------------- diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/webmonitor/retriever/impl/RpcJobManagerRetriever.java b/flink-runtime/src/main/java/org/apache/flink/runtime/webmonitor/retriever/impl/RpcJobManagerRetriever.java new file mode 100644 index 0000000..e608aa0 --- /dev/null +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/webmonitor/retriever/impl/RpcJobManagerRetriever.java @@ -0,0 +1,46 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.runtime.webmonitor.retriever.impl; + +import org.apache.flink.runtime.jobmaster.JobManagerGateway; +import org.apache.flink.runtime.rpc.RpcService; +import org.apache.flink.runtime.webmonitor.retriever.JobManagerRetriever; +import org.apache.flink.util.Preconditions; + +import java.util.UUID; +import java.util.concurrent.CompletableFuture; + +/** + * JobManagerRetriever implementation for Flip-6 JobManager. + */ +public class RpcJobManagerRetriever extends JobManagerRetriever { + + private final RpcService rpcService; + + public RpcJobManagerRetriever( + RpcService rpcService) { + + this.rpcService = Preconditions.checkNotNull(rpcService); + } + + @Override + protected CompletableFuture<JobManagerGateway> createJobManagerGateway(String leaderAddress, UUID leaderId) throws Exception { + return rpcService.connect(leaderAddress, JobManagerGateway.class); + } +} http://git-wip-us.apache.org/repos/asf/flink/blob/9f790d3e/flink-runtime/src/main/scala/org/apache/flink/runtime/jobmanager/JobManager.scala ---------------------------------------------------------------------- diff --git a/flink-runtime/src/main/scala/org/apache/flink/runtime/jobmanager/JobManager.scala b/flink-runtime/src/main/scala/org/apache/flink/runtime/jobmanager/JobManager.scala index e490b48..1616a7b 100644 --- a/flink-runtime/src/main/scala/org/apache/flink/runtime/jobmanager/JobManager.scala +++ b/flink-runtime/src/main/scala/org/apache/flink/runtime/jobmanager/JobManager.scala @@ -45,7 +45,7 @@ import org.apache.flink.runtime.clusterframework.FlinkResourceManager import org.apache.flink.runtime.clusterframework.messages._ import org.apache.flink.runtime.clusterframework.standalone.StandaloneResourceManager import org.apache.flink.runtime.clusterframework.types.ResourceID -import org.apache.flink.runtime.concurrent.{Executors => FlinkExecutors} +import org.apache.flink.runtime.concurrent.{FutureUtils, Executors => FlinkExecutors} import org.apache.flink.runtime.execution.SuppressRestartsException import org.apache.flink.runtime.execution.librarycache.{BlobLibraryCacheManager, LibraryCacheManager} import org.apache.flink.runtime.executiongraph.restart.RestartStrategyFactory @@ -82,6 +82,7 @@ import org.apache.flink.runtime.security.SecurityUtils.SecurityConfiguration import org.apache.flink.runtime.taskexecutor.TaskExecutor import org.apache.flink.runtime.taskmanager.TaskManager import org.apache.flink.runtime.util._ +import org.apache.flink.runtime.webmonitor.retriever.impl.{AkkaJobManagerRetriever, AkkaQueryServiceRetriever} import org.apache.flink.runtime.webmonitor.{WebMonitor, WebMonitorUtils} import org.apache.flink.runtime.{FlinkActor, LeaderSessionMessageFilter, LogMessages} import org.apache.flink.util.{InstantiationUtil, NetUtils, SerializedThrowable} @@ -2219,12 +2220,17 @@ object JobManager { if (configuration.getInteger(WebOptions.PORT, 0) >= 0) { LOG.info("Starting JobManager web frontend") + val timeout = FutureUtils.toTime(AkkaUtils.getTimeout(configuration)) + // start the web frontend. we need to load this dynamically // because it is not in the same project/dependencies val webServer = WebMonitorUtils.startWebRuntimeMonitor( configuration, highAvailabilityServices, - jobManagerSystem) + new AkkaJobManagerRetriever(jobManagerSystem, timeout), + new AkkaQueryServiceRetriever(jobManagerSystem, timeout), + timeout, + jobManagerSystem.dispatcher) Option(webServer) } http://git-wip-us.apache.org/repos/asf/flink/blob/9f790d3e/flink-runtime/src/main/scala/org/apache/flink/runtime/minicluster/FlinkMiniCluster.scala ---------------------------------------------------------------------- diff --git a/flink-runtime/src/main/scala/org/apache/flink/runtime/minicluster/FlinkMiniCluster.scala b/flink-runtime/src/main/scala/org/apache/flink/runtime/minicluster/FlinkMiniCluster.scala index bc323cc..831c026 100644 --- a/flink-runtime/src/main/scala/org/apache/flink/runtime/minicluster/FlinkMiniCluster.scala +++ b/flink-runtime/src/main/scala/org/apache/flink/runtime/minicluster/FlinkMiniCluster.scala @@ -32,7 +32,7 @@ import org.apache.flink.configuration._ import org.apache.flink.core.fs.Path import org.apache.flink.runtime.akka.{AkkaJobManagerGateway, AkkaUtils} import org.apache.flink.runtime.client.{JobClient, JobExecutionException} -import org.apache.flink.runtime.concurrent.{Executors => FlinkExecutors} +import org.apache.flink.runtime.concurrent.{FutureUtils, Executors => FlinkExecutors} import org.apache.flink.runtime.execution.librarycache.FlinkUserCodeClassLoader import org.apache.flink.runtime.highavailability.{HighAvailabilityServices, HighAvailabilityServicesUtils} import org.apache.flink.runtime.instance.{ActorGateway, AkkaActorGateway} @@ -41,6 +41,7 @@ import org.apache.flink.runtime.jobmanager.HighAvailabilityMode import org.apache.flink.runtime.leaderretrieval.{LeaderRetrievalListener, LeaderRetrievalService} import org.apache.flink.runtime.messages.TaskManagerMessages.NotifyWhenRegisteredAtJobManager import org.apache.flink.runtime.util.{ExecutorThreadFactory, Hardware} +import org.apache.flink.runtime.webmonitor.retriever.impl.{AkkaJobManagerRetriever, AkkaQueryServiceRetriever} import org.apache.flink.runtime.webmonitor.{WebMonitor, WebMonitorUtils} import org.apache.flink.util.NetUtils import org.slf4j.LoggerFactory @@ -389,6 +390,8 @@ abstract class FlinkMiniCluster( config.getBoolean(ConfigConstants.LOCAL_START_WEBSERVER, false) && config.getInteger(WebOptions.PORT, 0) >= 0) { + val flinkTimeout = FutureUtils.toTime(timeout) + LOG.info("Starting JobManger web frontend") // start the new web frontend. we need to load this dynamically // because it is not in the same project/dependencies @@ -396,7 +399,10 @@ abstract class FlinkMiniCluster( WebMonitorUtils.startWebRuntimeMonitor( config, highAvailabilityServices, - actorSystem) + new AkkaJobManagerRetriever(actorSystem, flinkTimeout), + new AkkaQueryServiceRetriever(actorSystem, flinkTimeout), + flinkTimeout, + actorSystem.dispatcher) ) webServer.foreach(_.start(jobManagerAkkaURL)) http://git-wip-us.apache.org/repos/asf/flink/blob/9f790d3e/flink-runtime/src/test/scala/org/apache/flink/runtime/testingUtils/TestingUtils.scala ---------------------------------------------------------------------- diff --git a/flink-runtime/src/test/scala/org/apache/flink/runtime/testingUtils/TestingUtils.scala b/flink-runtime/src/test/scala/org/apache/flink/runtime/testingUtils/TestingUtils.scala index 858bbbb..ddbb82d 100644 --- a/flink-runtime/src/test/scala/org/apache/flink/runtime/testingUtils/TestingUtils.scala +++ b/flink-runtime/src/test/scala/org/apache/flink/runtime/testingUtils/TestingUtils.scala @@ -62,6 +62,8 @@ object TestingUtils { val TESTING_TIMEOUT = 1 minute + val TIMEOUT = Time.minutes(1L) + val DEFAULT_AKKA_ASK_TIMEOUT = "200 s" def getDefaultTestingActorSystemConfigString: String = { http://git-wip-us.apache.org/repos/asf/flink/blob/9f790d3e/flink-yarn/src/main/java/org/apache/flink/yarn/YarnApplicationMasterRunner.java ---------------------------------------------------------------------- diff --git a/flink-yarn/src/main/java/org/apache/flink/yarn/YarnApplicationMasterRunner.java b/flink-yarn/src/main/java/org/apache/flink/yarn/YarnApplicationMasterRunner.java index 88cc585..9130901 100644 --- a/flink-yarn/src/main/java/org/apache/flink/yarn/YarnApplicationMasterRunner.java +++ b/flink-yarn/src/main/java/org/apache/flink/yarn/YarnApplicationMasterRunner.java @@ -18,6 +18,7 @@ package org.apache.flink.yarn; +import org.apache.flink.api.common.time.Time; import org.apache.flink.configuration.ConfigConstants; import org.apache.flink.configuration.Configuration; import org.apache.flink.configuration.GlobalConfiguration; @@ -44,6 +45,8 @@ import org.apache.flink.runtime.util.Hardware; import org.apache.flink.runtime.util.JvmShutdownSafeguard; import org.apache.flink.runtime.util.SignalHandler; import org.apache.flink.runtime.webmonitor.WebMonitor; +import org.apache.flink.runtime.webmonitor.retriever.impl.AkkaJobManagerRetriever; +import org.apache.flink.runtime.webmonitor.retriever.impl.AkkaQueryServiceRetriever; import org.apache.flink.yarn.cli.FlinkYarnSessionCli; import org.apache.flink.yarn.configuration.YarnConfigOptions; @@ -358,11 +361,16 @@ public class YarnApplicationMasterRunner { // 2: the web monitor LOG.debug("Starting Web Frontend"); + Time webMonitorTimeout = Time.milliseconds(config.getLong(WebOptions.TIMEOUT)); + webMonitor = BootstrapTools.startWebMonitorIfConfigured( config, highAvailabilityServices, - actorSystem, - jobManager, + new AkkaJobManagerRetriever(actorSystem, webMonitorTimeout), + new AkkaQueryServiceRetriever(actorSystem, webMonitorTimeout), + webMonitorTimeout, + futureExecutor, + AkkaUtils.getAkkaURL(actorSystem, jobManager), LOG); String protocol = "http://";
