Repository: flink
Updated Branches:
  refs/heads/master 00d5b6222 -> 9f790d3ef


http://git-wip-us.apache.org/repos/asf/flink/blob/9f790d3e/flink-runtime/src/main/java/org/apache/flink/runtime/webmonitor/retriever/impl/RpcJobManagerRetriever.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/webmonitor/retriever/impl/RpcJobManagerRetriever.java b/flink-runtime/src/main/java/org/apache/flink/runtime/webmonitor/retriever/impl/RpcJobManagerRetriever.java
new file mode 100644
index 0000000..e608aa0
--- /dev/null
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/webmonitor/retriever/impl/RpcJobManagerRetriever.java
@@ -0,0 +1,46 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.webmonitor.retriever.impl;
+
+import org.apache.flink.runtime.jobmaster.JobManagerGateway;
+import org.apache.flink.runtime.rpc.RpcService;
+import org.apache.flink.runtime.webmonitor.retriever.JobManagerRetriever;
+import org.apache.flink.util.Preconditions;
+
+import java.util.UUID;
+import java.util.concurrent.CompletableFuture;
+
+/**
+ * JobManagerRetriever implementation for Flip-6 JobManager.
+ */
+public class RpcJobManagerRetriever extends JobManagerRetriever {
+
+       private final RpcService rpcService;
+
+       public RpcJobManagerRetriever(
+               RpcService rpcService) {
+
+               this.rpcService = Preconditions.checkNotNull(rpcService);
+       }
+
+       @Override
+       protected CompletableFuture<JobManagerGateway> createJobManagerGateway(String leaderAddress, UUID leaderId) throws Exception {
+               return rpcService.connect(leaderAddress, JobManagerGateway.class);
+       }
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/9f790d3e/flink-runtime/src/main/scala/org/apache/flink/runtime/jobmanager/JobManager.scala
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/scala/org/apache/flink/runtime/jobmanager/JobManager.scala b/flink-runtime/src/main/scala/org/apache/flink/runtime/jobmanager/JobManager.scala
index e490b48..1616a7b 100644
--- a/flink-runtime/src/main/scala/org/apache/flink/runtime/jobmanager/JobManager.scala
+++ b/flink-runtime/src/main/scala/org/apache/flink/runtime/jobmanager/JobManager.scala
@@ -45,7 +45,7 @@ import org.apache.flink.runtime.clusterframework.FlinkResourceManager
 import org.apache.flink.runtime.clusterframework.messages._
 import org.apache.flink.runtime.clusterframework.standalone.StandaloneResourceManager
 import org.apache.flink.runtime.clusterframework.types.ResourceID
-import org.apache.flink.runtime.concurrent.{Executors => FlinkExecutors}
+import org.apache.flink.runtime.concurrent.{FutureUtils, Executors => FlinkExecutors}
 import org.apache.flink.runtime.execution.SuppressRestartsException
 import org.apache.flink.runtime.execution.librarycache.{BlobLibraryCacheManager, LibraryCacheManager}
 import org.apache.flink.runtime.executiongraph.restart.RestartStrategyFactory
@@ -82,6 +82,7 @@ import org.apache.flink.runtime.security.SecurityUtils.SecurityConfiguration
 import org.apache.flink.runtime.taskexecutor.TaskExecutor
 import org.apache.flink.runtime.taskmanager.TaskManager
 import org.apache.flink.runtime.util._
+import org.apache.flink.runtime.webmonitor.retriever.impl.{AkkaJobManagerRetriever, AkkaQueryServiceRetriever}
 import org.apache.flink.runtime.webmonitor.{WebMonitor, WebMonitorUtils}
 import org.apache.flink.runtime.{FlinkActor, LeaderSessionMessageFilter, LogMessages}
 import org.apache.flink.util.{InstantiationUtil, NetUtils, SerializedThrowable}
@@ -2219,12 +2220,17 @@ object JobManager {
       if (configuration.getInteger(WebOptions.PORT, 0) >= 0) {
         LOG.info("Starting JobManager web frontend")
 
+        val timeout = FutureUtils.toTime(AkkaUtils.getTimeout(configuration))
+
         // start the web frontend. we need to load this dynamically
         // because it is not in the same project/dependencies
         val webServer = WebMonitorUtils.startWebRuntimeMonitor(
           configuration,
           highAvailabilityServices,
-          jobManagerSystem)
+          new AkkaJobManagerRetriever(jobManagerSystem, timeout),
+          new AkkaQueryServiceRetriever(jobManagerSystem, timeout),
+          timeout,
+          jobManagerSystem.dispatcher)
 
         Option(webServer)
       }

http://git-wip-us.apache.org/repos/asf/flink/blob/9f790d3e/flink-runtime/src/main/scala/org/apache/flink/runtime/minicluster/FlinkMiniCluster.scala
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/scala/org/apache/flink/runtime/minicluster/FlinkMiniCluster.scala b/flink-runtime/src/main/scala/org/apache/flink/runtime/minicluster/FlinkMiniCluster.scala
index bc323cc..831c026 100644
--- a/flink-runtime/src/main/scala/org/apache/flink/runtime/minicluster/FlinkMiniCluster.scala
+++ b/flink-runtime/src/main/scala/org/apache/flink/runtime/minicluster/FlinkMiniCluster.scala
@@ -32,7 +32,7 @@ import org.apache.flink.configuration._
 import org.apache.flink.core.fs.Path
 import org.apache.flink.runtime.akka.{AkkaJobManagerGateway, AkkaUtils}
 import org.apache.flink.runtime.client.{JobClient, JobExecutionException}
-import org.apache.flink.runtime.concurrent.{Executors => FlinkExecutors}
+import org.apache.flink.runtime.concurrent.{FutureUtils, Executors => FlinkExecutors}
 import org.apache.flink.runtime.execution.librarycache.FlinkUserCodeClassLoader
 import org.apache.flink.runtime.highavailability.{HighAvailabilityServices, HighAvailabilityServicesUtils}
 import org.apache.flink.runtime.instance.{ActorGateway, AkkaActorGateway}
@@ -41,6 +41,7 @@ import org.apache.flink.runtime.jobmanager.HighAvailabilityMode
 import org.apache.flink.runtime.leaderretrieval.{LeaderRetrievalListener, LeaderRetrievalService}
 import org.apache.flink.runtime.messages.TaskManagerMessages.NotifyWhenRegisteredAtJobManager
 import org.apache.flink.runtime.util.{ExecutorThreadFactory, Hardware}
+import org.apache.flink.runtime.webmonitor.retriever.impl.{AkkaJobManagerRetriever, AkkaQueryServiceRetriever}
 import org.apache.flink.runtime.webmonitor.{WebMonitor, WebMonitorUtils}
 import org.apache.flink.util.NetUtils
 import org.slf4j.LoggerFactory
@@ -389,6 +390,8 @@ abstract class FlinkMiniCluster(
       config.getBoolean(ConfigConstants.LOCAL_START_WEBSERVER, false) &&
         config.getInteger(WebOptions.PORT, 0) >= 0) {
 
+      val flinkTimeout = FutureUtils.toTime(timeout)
+
       LOG.info("Starting JobManger web frontend")
       // start the new web frontend. we need to load this dynamically
       // because it is not in the same project/dependencies
@@ -396,7 +399,10 @@ abstract class FlinkMiniCluster(
         WebMonitorUtils.startWebRuntimeMonitor(
           config,
           highAvailabilityServices,
-          actorSystem)
+          new AkkaJobManagerRetriever(actorSystem, flinkTimeout),
+          new AkkaQueryServiceRetriever(actorSystem, flinkTimeout),
+          flinkTimeout,
+          actorSystem.dispatcher)
       )
 
       webServer.foreach(_.start(jobManagerAkkaURL))

http://git-wip-us.apache.org/repos/asf/flink/blob/9f790d3e/flink-runtime/src/test/scala/org/apache/flink/runtime/testingUtils/TestingUtils.scala
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/scala/org/apache/flink/runtime/testingUtils/TestingUtils.scala b/flink-runtime/src/test/scala/org/apache/flink/runtime/testingUtils/TestingUtils.scala
index 858bbbb..ddbb82d 100644
--- a/flink-runtime/src/test/scala/org/apache/flink/runtime/testingUtils/TestingUtils.scala
+++ b/flink-runtime/src/test/scala/org/apache/flink/runtime/testingUtils/TestingUtils.scala
@@ -62,6 +62,8 @@ object TestingUtils {
 
   val TESTING_TIMEOUT = 1 minute
 
+  val TIMEOUT = Time.minutes(1L)
+
   val DEFAULT_AKKA_ASK_TIMEOUT = "200 s"
 
   def getDefaultTestingActorSystemConfigString: String = {

http://git-wip-us.apache.org/repos/asf/flink/blob/9f790d3e/flink-yarn/src/main/java/org/apache/flink/yarn/YarnApplicationMasterRunner.java
----------------------------------------------------------------------
diff --git a/flink-yarn/src/main/java/org/apache/flink/yarn/YarnApplicationMasterRunner.java b/flink-yarn/src/main/java/org/apache/flink/yarn/YarnApplicationMasterRunner.java
index 88cc585..9130901 100644
--- a/flink-yarn/src/main/java/org/apache/flink/yarn/YarnApplicationMasterRunner.java
+++ b/flink-yarn/src/main/java/org/apache/flink/yarn/YarnApplicationMasterRunner.java
@@ -18,6 +18,7 @@
 
 package org.apache.flink.yarn;
 
+import org.apache.flink.api.common.time.Time;
 import org.apache.flink.configuration.ConfigConstants;
 import org.apache.flink.configuration.Configuration;
 import org.apache.flink.configuration.GlobalConfiguration;
@@ -44,6 +45,8 @@ import org.apache.flink.runtime.util.Hardware;
 import org.apache.flink.runtime.util.JvmShutdownSafeguard;
 import org.apache.flink.runtime.util.SignalHandler;
 import org.apache.flink.runtime.webmonitor.WebMonitor;
+import org.apache.flink.runtime.webmonitor.retriever.impl.AkkaJobManagerRetriever;
+import org.apache.flink.runtime.webmonitor.retriever.impl.AkkaQueryServiceRetriever;
 import org.apache.flink.yarn.cli.FlinkYarnSessionCli;
 import org.apache.flink.yarn.configuration.YarnConfigOptions;
 
@@ -358,11 +361,16 @@ public class YarnApplicationMasterRunner {
                        // 2: the web monitor
                        LOG.debug("Starting Web Frontend");
 
+                       Time webMonitorTimeout = Time.milliseconds(config.getLong(WebOptions.TIMEOUT));
+
                        webMonitor = BootstrapTools.startWebMonitorIfConfigured(
                                config,
                                highAvailabilityServices,
-                               actorSystem,
-                               jobManager,
+                               new AkkaJobManagerRetriever(actorSystem, webMonitorTimeout),
+                               new AkkaQueryServiceRetriever(actorSystem, webMonitorTimeout),
+                               webMonitorTimeout,
+                               futureExecutor,
+                               AkkaUtils.getAkkaURL(actorSystem, jobManager),
                                LOG);
 
                        String protocol = "http://";

Reply via email to