[FLINK-2793] [runtime-web] Rework JobManagerRetriever to avoid race conditions

The JobManagerRetriever sets the new leaderGatewayPortFuture directly in the
notifyLeaderAddress method instead of in one of the futures. This avoids race
conditions between multiple futures which finish in a different order than they
were started. Furthermore, this replaces promises with futures where a promise
is not needed.

Add a logging statement

Fix WebRuntimeMonitorITCase to use a random port and a proper state backend

Add the ChannelHandler.Sharable annotation to RuntimeMonitorHandler

Remove the sanity check from WebInfoServer to let it work on YARN


Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/e3ad9621
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/e3ad9621
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/e3ad9621

Branch: refs/heads/master
Commit: e3ad96211ebcab4317a7bb1ba42dfb1a9302aafd
Parents: 77fc0cc
Author: Till Rohrmann <[email protected]>
Authored: Fri Oct 9 23:33:25 2015 +0200
Committer: Till Rohrmann <[email protected]>
Committed: Tue Oct 20 00:16:53 2015 +0200

----------------------------------------------------------------------
 flink-runtime-web/pom.xml                       |   8 ++
 .../runtime/webmonitor/JobManagerRetriever.java | 110 ++++++++++---------
 .../webmonitor/RuntimeMonitorHandler.java       |  54 +++++----
 .../runtime/webmonitor/WebRuntimeMonitor.java   |  10 +-
 .../files/StaticFileServerHandler.java          |  63 ++++++-----
 .../handlers/HandlerRedirectUtils.java          |   2 +-
 .../webmonitor/WebRuntimeMonitorITCase.java     |  23 +++-
 .../flink/runtime/jobmanager/JobManager.scala   |  21 ++--
 .../ExecutionGraphRestartTest.scala             |  16 ++-
 tools/log4j-travis.properties                   |   1 +
 10 files changed, 181 insertions(+), 127 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/flink/blob/e3ad9621/flink-runtime-web/pom.xml
----------------------------------------------------------------------
diff --git a/flink-runtime-web/pom.xml b/flink-runtime-web/pom.xml
index 727604f..f2ac818 100644
--- a/flink-runtime-web/pom.xml
+++ b/flink-runtime-web/pom.xml
@@ -109,6 +109,14 @@ under the License.
 
                <dependency>
                        <groupId>org.apache.flink</groupId>
+                       <artifactId>flink-core</artifactId>
+                       <version>${project.version}</version>
+                       <type>test-jar</type>
+                       <scope>test</scope>
+               </dependency>
+
+               <dependency>
+                       <groupId>org.apache.flink</groupId>
                        <artifactId>flink-runtime</artifactId>
                        <version>${project.version}</version>
                        <type>test-jar</type>

http://git-wip-us.apache.org/repos/asf/flink/blob/e3ad9621/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/JobManagerRetriever.java
----------------------------------------------------------------------
diff --git 
a/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/JobManagerRetriever.java
 
b/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/JobManagerRetriever.java
index 7162639..93db280 100644
--- 
a/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/JobManagerRetriever.java
+++ 
b/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/JobManagerRetriever.java
@@ -20,6 +20,7 @@ package org.apache.flink.runtime.webmonitor;
 
 import akka.actor.ActorRef;
 import akka.actor.ActorSystem;
+import akka.dispatch.Futures;
 import akka.dispatch.Mapper;
 import akka.dispatch.OnComplete;
 import org.apache.flink.runtime.akka.AkkaUtils;
@@ -35,9 +36,11 @@ import scala.Tuple2;
 import scala.concurrent.Await;
 import scala.concurrent.Future;
 import scala.concurrent.Promise;
+import scala.concurrent.duration.Deadline;
 import scala.concurrent.duration.FiniteDuration;
 
 import java.util.UUID;
+import java.util.concurrent.TimeoutException;
 
 import static com.google.common.base.Preconditions.checkNotNull;
 
@@ -56,17 +59,14 @@ public class JobManagerRetriever implements 
LeaderRetrievalListener {
 
        private static final Logger LOG = 
LoggerFactory.getLogger(JobManagerRetriever.class);
 
-       private final Object lock = new Object();
+       private final Object waitLock = new Object();
 
        private final WebMonitor webMonitor;
        private final ActorSystem actorSystem;
        private final FiniteDuration lookupTimeout;
        private final FiniteDuration timeout;
 
-       private volatile Tuple2<Promise<ActorGateway>, Promise<Integer>> 
leaderPromise =
-                       new Tuple2<Promise<ActorGateway>, Promise<Integer>>(
-                                       new 
scala.concurrent.impl.Promise.DefaultPromise<ActorGateway>(),
-                                       new 
scala.concurrent.impl.Promise.DefaultPromise<Integer>());
+       private volatile Future<Tuple2<ActorGateway, Integer>> 
leaderGatewayPortFuture;
 
        public JobManagerRetriever(
                        WebMonitor webMonitor,
@@ -81,22 +81,21 @@ public class JobManagerRetriever implements 
LeaderRetrievalListener {
        }
 
        /**
-        * Returns the leading job manager gateway and its web monitor port.
+        * Returns the currently known leading job manager gateway and its web 
monitor port.
         */
        public Option<Tuple2<ActorGateway, Integer>> 
getJobManagerGatewayAndWebPort() throws Exception {
-               Tuple2<Promise<ActorGateway>, Promise<Integer>> promise = 
leaderPromise;
+               if (leaderGatewayPortFuture != null) {
+                       Future<Tuple2<ActorGateway, Integer>> gatewayPortFuture 
= leaderGatewayPortFuture;
 
-               if (!promise._1().isCompleted() || !promise._1().isCompleted()) 
{
-                       return Option.empty();
-               }
-               else {
-                       Promise<ActorGateway> leaderGatewayPromise = 
promise._1();
-                       Promise<Integer> leaderWebPortPromise = promise._2();
+                       if (gatewayPortFuture.isCompleted()) {
+                               Tuple2<ActorGateway, Integer> gatewayPort = 
Await.result(gatewayPortFuture, timeout);
 
-                       ActorGateway leaderGateway = 
Await.result(leaderGatewayPromise.future(), timeout);
-                       int leaderWebPort = 
Await.result(leaderWebPortPromise.future(), timeout);
-
-                       return Option.apply(new Tuple2<>(leaderGateway, 
leaderWebPort));
+                               return Option.apply(gatewayPort);
+                       } else {
+                               return Option.empty();
+                       }
+               } else {
+                       return Option.empty();
                }
        }
 
@@ -104,66 +103,73 @@ public class JobManagerRetriever implements 
LeaderRetrievalListener {
         * Awaits the leading job manager gateway and its web monitor port.
         */
        public Tuple2<ActorGateway, Integer> awaitJobManagerGatewayAndWebPort() 
throws Exception {
-               Tuple2<Promise<ActorGateway>, Promise<Integer>> promise = 
leaderPromise;
+               Future<Tuple2<ActorGateway, Integer>> gatewayPortFuture = null;
+               Deadline deadline = timeout.fromNow();
 
-               Promise<ActorGateway> leaderGatewayPromise = promise._1();
-               Promise<Integer> leaderWebPortPromise = promise._2();
+               while(!deadline.isOverdue()) {
+                       synchronized (waitLock) {
+                               gatewayPortFuture = leaderGatewayPortFuture;
 
-               ActorGateway leaderGateway = 
Await.result(leaderGatewayPromise.future(), timeout);
-               int leaderWebPort = Await.result(leaderWebPortPromise.future(), 
timeout);
+                               if (gatewayPortFuture != null) {
+                                       break;
+                               }
 
-               return new Tuple2<>(leaderGateway, leaderWebPort);
+                               waitLock.wait(deadline.timeLeft().toMillis());
+                       }
+               }
+
+               if (gatewayPortFuture == null) {
+                       throw new TimeoutException("There is no JobManager 
available.");
+               } else {
+                       return Await.result(gatewayPortFuture, 
deadline.timeLeft());
+               }
        }
 
        @Override
        public void notifyLeaderAddress(final String leaderAddress, final UUID 
leaderSessionID) {
                if (leaderAddress != null && !leaderAddress.equals("")) {
                        try {
-                               final Promise<ActorGateway> gatewayPromise = 
new scala.concurrent.impl.Promise.DefaultPromise<>();
-                               final Promise<Integer> webPortPromise = new 
scala.concurrent.impl.Promise.DefaultPromise<>();
+                               final Promise<Tuple2<ActorGateway, Integer>> 
leaderGatewayPortPromise = new scala.concurrent.impl.Promise.DefaultPromise<>();
 
-                               final Tuple2<Promise<ActorGateway>, 
Promise<Integer>> newPromise = new Tuple2<>(
-                                               gatewayPromise, webPortPromise);
+                               synchronized (waitLock) {
+                                       leaderGatewayPortFuture = 
leaderGatewayPortPromise.future();
+                                       waitLock.notifyAll();
+                               }
 
-                               LOG.info("Retrieved leader notification 
{}:{}.", leaderAddress, leaderSessionID);
+                               LOG.info("New leader reachable under {}:{}.", 
leaderAddress, leaderSessionID);
 
                                AkkaUtils.getActorRefFuture(leaderAddress, 
actorSystem, lookupTimeout)
                                                // Resolve the actor ref
-                                               .flatMap(new Mapper<ActorRef, 
Future<Object>>() {
+                                               .flatMap(new Mapper<ActorRef, 
Future<Tuple2<ActorGateway, Object>>>() {
                                                        @Override
-                                                       public Future<Object> 
apply(ActorRef jobManagerRef) {
+                                                       public 
Future<Tuple2<ActorGateway, Object>> apply(ActorRef jobManagerRef) {
                                                                ActorGateway 
leaderGateway = new AkkaActorGateway(
                                                                                
jobManagerRef, leaderSessionID);
 
-                                                               
gatewayPromise.success(leaderGateway);
+                                                               Future<Object> 
webMonitorPort = leaderGateway.ask(
+                                                                       
JobManagerMessages.getRequestWebMonitorPort(),
+                                                                       
timeout);
 
-                                                               return 
leaderGateway.ask(JobManagerMessages
-                                                                               
.getRequestWebMonitorPort(), timeout);
+                                                               return 
Futures.successful(leaderGateway).zip(webMonitorPort);
                                                        }
                                                }, actorSystem.dispatcher())
                                                                // Request the 
web monitor port
-                                               .onComplete(new 
OnComplete<Object>() {
+                                               .onComplete(new 
OnComplete<Tuple2<ActorGateway, Object>>() {
                                                        @Override
-                                                       public void 
onComplete(Throwable failure, Object success) throws Throwable {
+                                                       public void 
onComplete(Throwable failure, Tuple2<ActorGateway, Object> success) throws 
Throwable {
                                                                if (failure == 
null) {
-                                                                       int 
webMonitorPort = ((ResponseWebMonitorPort) success).port();
-                                                                       
webPortPromise.success(webMonitorPort);
-
-                                                                       // 
Complete the promise
-                                                                       
synchronized (lock) {
-                                                                               
Tuple2<Promise<ActorGateway>, Promise<Integer>>
-                                                                               
                previousPromise = leaderPromise;
-
-                                                                               
leaderPromise = newPromise;
-
-                                                                               
if (!previousPromise._2().isCompleted()) {
-                                                                               
        previousPromise._1().completeWith(gatewayPromise.future());
-                                                                               
        previousPromise._2().completeWith(webPortPromise.future());
-                                                                               
}
+                                                                       if 
(success._2() instanceof ResponseWebMonitorPort) {
+                                                                               
int webMonitorPort = ((ResponseWebMonitorPort) success._2()).port();
+
+                                                                               
leaderGatewayPortPromise.success(new Tuple2<>(success._1(), webMonitorPort));
+                                                                       } else {
+                                                                               
leaderGatewayPortPromise.failure(new Exception("Received the message " +
+                                                                               
success._2() + " as response to " + 
JobManagerMessages.getRequestWebMonitorPort() +
+                                                                               
        ". But a message of type " + ResponseWebMonitorPort.class + " was 
expected."));
                                                                        }
-                                                               }
-                                                               else {
-                                                                       
LOG.warn("Failed to retrieve leader gateway and port.");
+                                                               } else {
+                                                                       
LOG.warn("Failed to retrieve leader gateway and port.", failure);
+                                                                       
leaderGatewayPortPromise.failure(failure);
                                                                }
                                                        }
                                                }, actorSystem.dispatcher());

http://git-wip-us.apache.org/repos/asf/flink/blob/e3ad9621/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/RuntimeMonitorHandler.java
----------------------------------------------------------------------
diff --git 
a/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/RuntimeMonitorHandler.java
 
b/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/RuntimeMonitorHandler.java
index b9369ea..e174463 100644
--- 
a/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/RuntimeMonitorHandler.java
+++ 
b/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/RuntimeMonitorHandler.java
@@ -20,6 +20,7 @@ package org.apache.flink.runtime.webmonitor;
 
 import io.netty.buffer.ByteBuf;
 import io.netty.buffer.Unpooled;
+import io.netty.channel.ChannelHandler;
 import io.netty.channel.ChannelHandlerContext;
 import io.netty.channel.SimpleChannelInboundHandler;
 import io.netty.handler.codec.http.DefaultFullHttpResponse;
@@ -35,7 +36,9 @@ import 
org.apache.flink.runtime.webmonitor.handlers.RequestHandler;
 import org.apache.flink.util.ExceptionUtils;
 import scala.Option;
 import scala.Tuple2;
-import scala.concurrent.Promise;
+import scala.concurrent.Await;
+import scala.concurrent.Future;
+import scala.concurrent.duration.FiniteDuration;
 
 import java.nio.charset.Charset;
 
@@ -47,6 +50,7 @@ import static 
com.google.common.base.Preconditions.checkNotNull;
  * This handler also deals with setting correct response MIME types and 
returning
  * proper codes, like OK, NOT_FOUND, or SERVER_ERROR.
  */
+@ChannelHandler.Sharable
 public class RuntimeMonitorHandler extends SimpleChannelInboundHandler<Routed> 
{
 
        private static final Charset ENCODING = Charset.forName("UTF-8");
@@ -55,7 +59,9 @@ public class RuntimeMonitorHandler extends 
SimpleChannelInboundHandler<Routed> {
 
        private final JobManagerRetriever retriever;
 
-       private final Promise<String> localJobManagerAddressPromise;
+       private final Future<String> localJobManagerAddressFuture;
+
+       private final FiniteDuration timeout;
 
        private final String contentType;
 
@@ -64,35 +70,41 @@ public class RuntimeMonitorHandler extends 
SimpleChannelInboundHandler<Routed> {
        public RuntimeMonitorHandler(
                        RequestHandler handler,
                        JobManagerRetriever retriever,
-                       Promise<String> localJobManagerAddressPromise) {
+                       Future<String> localJobManagerAddressFuture,
+                       FiniteDuration timeout) {
 
                this.handler = checkNotNull(handler);
                this.retriever = checkNotNull(retriever);
-               this.localJobManagerAddressPromise = 
checkNotNull(localJobManagerAddressPromise);
+               this.localJobManagerAddressFuture = 
checkNotNull(localJobManagerAddressFuture);
+               this.timeout = checkNotNull(timeout);
                this.contentType = (handler instanceof 
RequestHandler.JsonResponse) ? "application/json" : "text/plain";
        }
 
        @Override
        protected void channelRead0(ChannelHandlerContext ctx, Routed routed) 
throws Exception {
-               if (localJobManagerAddress == null) {
-                       localJobManagerAddress = 
localJobManagerAddressPromise.future().value().get().get();
-               }
-
-               Option<Tuple2<ActorGateway, Integer>> jobManager = 
retriever.getJobManagerGatewayAndWebPort();
-
-               if (jobManager.isDefined()) {
-                       String redirectAddress = 
HandlerRedirectUtils.getRedirectAddress(
-                                       localJobManagerAddress, 
jobManager.get());
-
-                       if (redirectAddress != null) {
-                               HttpResponse redirect = 
HandlerRedirectUtils.getRedirectResponse(redirectAddress, routed.path());
-                               KeepAliveWrite.flush(ctx, routed.request(), 
redirect);
+               if (localJobManagerAddressFuture.isCompleted()) {
+                       if (localJobManagerAddress == null) {
+                               localJobManagerAddress = 
Await.result(localJobManagerAddressFuture, timeout);
                        }
-                       else {
-                               respondAsLeader(ctx, routed, 
jobManager.get()._1());
+
+                       Option<Tuple2<ActorGateway, Integer>> jobManager = 
retriever.getJobManagerGatewayAndWebPort();
+
+                       if (jobManager.isDefined()) {
+                               Tuple2<ActorGateway, Integer> gatewayPort = 
jobManager.get();
+                               String redirectAddress = 
HandlerRedirectUtils.getRedirectAddress(
+                                       localJobManagerAddress, gatewayPort);
+
+                               if (redirectAddress != null) {
+                                       HttpResponse redirect = 
HandlerRedirectUtils.getRedirectResponse(redirectAddress, routed.path());
+                                       KeepAliveWrite.flush(ctx, 
routed.request(), redirect);
+                               }
+                               else {
+                                       respondAsLeader(ctx, routed, 
gatewayPort._1());
+                               }
+                       } else {
+                               KeepAliveWrite.flush(ctx, routed.request(), 
HandlerRedirectUtils.getUnavailableResponse());
                        }
-               }
-               else {
+               } else {
                        KeepAliveWrite.flush(ctx, routed.request(), 
HandlerRedirectUtils.getUnavailableResponse());
                }
        }

http://git-wip-us.apache.org/repos/asf/flink/blob/e3ad9621/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/WebRuntimeMonitor.java
----------------------------------------------------------------------
diff --git 
a/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/WebRuntimeMonitor.java
 
b/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/WebRuntimeMonitor.java
index 40ab6c1..ec973c7 100644
--- 
a/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/WebRuntimeMonitor.java
+++ 
b/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/WebRuntimeMonitor.java
@@ -115,6 +115,8 @@ public class WebRuntimeMonitor implements WebMonitor {
 
        private final Promise<String> jobManagerAddressPromise = new 
scala.concurrent.impl.Promise.DefaultPromise<>();
 
+       private final FiniteDuration timeout;
+
        private Channel serverChannel;
 
        private final File webRootDir;
@@ -174,7 +176,7 @@ public class WebRuntimeMonitor implements WebMonitor {
                        throw new IllegalArgumentException("Web frontend port 
is invalid: " + this.configuredPort);
                }
 
-               FiniteDuration timeout = AkkaUtils.getTimeout(config);
+               timeout = AkkaUtils.getTimeout(config);
                FiniteDuration lookupTimeout = AkkaUtils.getTimeout(config);
 
                retriever = new JobManagerRetriever(this, actorSystem, 
lookupTimeout, timeout);
@@ -218,10 +220,10 @@ public class WebRuntimeMonitor implements WebMonitor {
                        .GET("/jobs/:jobid/accumulators", handler(new 
JobAccumulatorsHandler(currentGraphs)))
 
                        .GET("/taskmanagers", handler(new 
TaskManagersHandler(DEFAULT_REQUEST_TIMEOUT)))
-                       .GET("/taskmanagers/:" + 
TaskManagersHandler.TASK_MANAGER_ID_KEY, handler(new 
TaskManagersHandler(retriever, DEFAULT_REQUEST_TIMEOUT)))
+                       .GET("/taskmanagers/:" + 
TaskManagersHandler.TASK_MANAGER_ID_KEY, handler(new 
TaskManagersHandler(DEFAULT_REQUEST_TIMEOUT)))
 
                        // this handler serves all the static contents
-                       .GET("/:*", new StaticFileServerHandler(retriever, 
jobManagerAddressPromise, webRootDir));
+                       .GET("/:*", new StaticFileServerHandler(retriever, 
jobManagerAddressPromise.future(), timeout, webRootDir));
 
                synchronized (startupShutdownLock) {
 
@@ -335,6 +337,6 @@ public class WebRuntimeMonitor implements WebMonitor {
        // 
------------------------------------------------------------------------
 
        private RuntimeMonitorHandler handler(RequestHandler handler) {
-               return new RuntimeMonitorHandler(handler, retriever, 
jobManagerAddressPromise);
+               return new RuntimeMonitorHandler(handler, retriever, 
jobManagerAddressPromise.future(), timeout);
        }
 }

http://git-wip-us.apache.org/repos/asf/flink/blob/e3ad9621/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/files/StaticFileServerHandler.java
----------------------------------------------------------------------
diff --git 
a/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/files/StaticFileServerHandler.java
 
b/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/files/StaticFileServerHandler.java
index 944407e..d46a900 100644
--- 
a/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/files/StaticFileServerHandler.java
+++ 
b/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/files/StaticFileServerHandler.java
@@ -52,7 +52,9 @@ import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 import scala.Option;
 import scala.Tuple2;
-import scala.concurrent.Promise;
+import scala.concurrent.Await;
+import scala.concurrent.Future;
+import scala.concurrent.duration.FiniteDuration;
 
 import java.io.File;
 import java.io.FileNotFoundException;
@@ -108,7 +110,9 @@ public class StaticFileServerHandler extends 
SimpleChannelInboundHandler<Routed>
        /** JobManager retriever */
        private final JobManagerRetriever retriever;
 
-       private final Promise<String> localJobManagerAddressPromise;
+       private final Future<String> localJobManagerAddressFuture;
+
+       private final FiniteDuration timeout;
 
        /** The path in which the static documents are */
        private final File rootPath;
@@ -120,20 +124,23 @@ public class StaticFileServerHandler extends 
SimpleChannelInboundHandler<Routed>
 
        public StaticFileServerHandler(
                        JobManagerRetriever retriever,
-                       Promise<String> localJobManagerAddressPromise,
+                       Future<String> localJobManagerAddressPromise,
+                       FiniteDuration timeout,
                        File rootPath) {
 
-               this(retriever, localJobManagerAddressPromise, rootPath, 
DEFAULT_LOGGER);
+               this(retriever, localJobManagerAddressPromise, timeout, 
rootPath, DEFAULT_LOGGER);
        }
 
        public StaticFileServerHandler(
                        JobManagerRetriever retriever,
-                       Promise<String> localJobManagerAddressPromise,
+                       Future<String> localJobManagerAddressFuture,
+                       FiniteDuration timeout,
                        File rootPath,
                        Logger logger) {
 
                this.retriever = checkNotNull(retriever);
-               this.localJobManagerAddressPromise = 
localJobManagerAddressPromise;
+               this.localJobManagerAddressFuture = 
checkNotNull(localJobManagerAddressFuture);
+               this.timeout = checkNotNull(timeout);
                this.rootPath = checkNotNull(rootPath);
                this.logger = checkNotNull(logger);
        }
@@ -144,41 +151,45 @@ public class StaticFileServerHandler extends 
SimpleChannelInboundHandler<Routed>
 
        @Override
        public void channelRead0(ChannelHandlerContext ctx, Routed routed) 
throws Exception {
-               if (localJobManagerAddress == null) {
-                       localJobManagerAddress = 
localJobManagerAddressPromise.future().value().get().get();
-               }
+               if (localJobManagerAddressFuture.isCompleted()) {
+                       if (localJobManagerAddress == null) {
+                               localJobManagerAddress = 
Await.result(localJobManagerAddressFuture, timeout);
+                       }
 
-               final HttpRequest request = routed.request();
-               String requestPath = routed.path();
+                       final HttpRequest request = routed.request();
+                       String requestPath = routed.path();
 
-               // make sure we request the "index.html" in case there is a 
directory request
-               if (requestPath.endsWith("/")) {
-                       requestPath = requestPath + "index.html";
-               }
+                       // make sure we request the "index.html" in case there 
is a directory request
+                       if (requestPath.endsWith("/")) {
+                               requestPath = requestPath + "index.html";
+                       }
 
                // in case the files being accessed are logs or stdout files, 
find appropriate paths.
                if (requestPath.equals("/jobmanager/log")) {
                        requestPath = "/" + getFileName(rootPath, 
WebRuntimeMonitor.LOG_FILE_PATTERN);
                } else if (requestPath.equals("/jobmanager/stdout")) {
                        requestPath = "/" + getFileName(rootPath, 
WebRuntimeMonitor.STDOUT_FILE_PATTERN);
-               }
+                       }
 
-               Option<Tuple2<ActorGateway, Integer>> jobManager = 
retriever.getJobManagerGatewayAndWebPort();
+                       Option<Tuple2<ActorGateway, Integer>> jobManager = 
retriever.getJobManagerGatewayAndWebPort();
 
-               if (jobManager.isDefined()) {
-                       // Redirect to leader if necessary
-                       String redirectAddress = 
HandlerRedirectUtils.getRedirectAddress(
+                       if (jobManager.isDefined()) {
+                               // Redirect to leader if necessary
+                               String redirectAddress = 
HandlerRedirectUtils.getRedirectAddress(
                                        localJobManagerAddress, 
jobManager.get());
 
-                       if (redirectAddress != null) {
-                               HttpResponse redirect = 
HandlerRedirectUtils.getRedirectResponse(redirectAddress, requestPath);
-                               KeepAliveWrite.flush(ctx, routed.request(), 
redirect);
+                               if (redirectAddress != null) {
+                                       HttpResponse redirect = 
HandlerRedirectUtils.getRedirectResponse(redirectAddress, requestPath);
+                                       KeepAliveWrite.flush(ctx, 
routed.request(), redirect);
+                               }
+                               else {
+                                       respondAsLeader(ctx, request, 
requestPath);
+                               }
                        }
                        else {
-                               respondAsLeader(ctx, request, requestPath);
+                               KeepAliveWrite.flush(ctx, routed.request(), 
HandlerRedirectUtils.getUnavailableResponse());
                        }
-               }
-               else {
+               } else {
                        KeepAliveWrite.flush(ctx, routed.request(), 
HandlerRedirectUtils.getUnavailableResponse());
                }
        }

http://git-wip-us.apache.org/repos/asf/flink/blob/e3ad9621/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/handlers/HandlerRedirectUtils.java
----------------------------------------------------------------------
diff --git 
a/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/handlers/HandlerRedirectUtils.java
 
b/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/handlers/HandlerRedirectUtils.java
index 887c46e..800c7c0 100644
--- 
a/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/handlers/HandlerRedirectUtils.java
+++ 
b/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/handlers/HandlerRedirectUtils.java
@@ -67,7 +67,7 @@ public class HandlerRedirectUtils {
                                return redirectAddress;
                        }
                        else {
-                               LOG.warn("Unexpected leader address pattern. 
Cannot extract host.");
+                               LOG.warn("Unexpected leader address pattern {}. 
Cannot extract host.", leaderAddress);
                        }
                }
 

http://git-wip-us.apache.org/repos/asf/flink/blob/e3ad9621/flink-runtime-web/src/test/java/org/apache/flink/runtime/webmonitor/WebRuntimeMonitorITCase.java
----------------------------------------------------------------------
diff --git a/flink-runtime-web/src/test/java/org/apache/flink/runtime/webmonitor/WebRuntimeMonitorITCase.java b/flink-runtime-web/src/test/java/org/apache/flink/runtime/webmonitor/WebRuntimeMonitorITCase.java
index 26f66b0..5167d13 100644
--- a/flink-runtime-web/src/test/java/org/apache/flink/runtime/webmonitor/WebRuntimeMonitorITCase.java
+++ b/flink-runtime-web/src/test/java/org/apache/flink/runtime/webmonitor/WebRuntimeMonitorITCase.java
@@ -27,13 +27,18 @@ import org.apache.flink.configuration.Configuration;
 import org.apache.flink.runtime.StreamingMode;
 import org.apache.flink.runtime.akka.AkkaUtils;
 import org.apache.flink.runtime.jobmanager.JobManager;
+import org.apache.flink.runtime.jobmanager.MemoryArchivist;
 import org.apache.flink.runtime.leaderelection.TestingListener;
 import org.apache.flink.runtime.leaderretrieval.LeaderRetrievalService;
 import org.apache.flink.runtime.testingUtils.TestingCluster;
+import org.apache.flink.runtime.testutils.ZooKeeperTestUtils;
 import org.apache.flink.runtime.util.ZooKeeperUtils;
 import org.apache.flink.runtime.webmonitor.files.MimeTypes;
 import org.apache.flink.runtime.webmonitor.testutils.HttpTestClient;
+import org.apache.flink.util.TestLogger;
+import org.junit.Rule;
 import org.junit.Test;
+import org.junit.rules.TemporaryFolder;
 import org.powermock.reflect.Whitebox;
 import scala.Some;
 import scala.Tuple2;
@@ -50,7 +55,10 @@ import static org.junit.Assert.assertEquals;
 import static org.junit.Assert.assertTrue;
 import static org.mockito.Mockito.mock;
 
-public class WebRuntimeMonitorITCase {
+public class WebRuntimeMonitorITCase extends TestLogger {
+
+       @Rule
+       public TemporaryFolder temporaryFolder = new TemporaryFolder();
 
        private final static FiniteDuration TestTimeout = new FiniteDuration(2, 
TimeUnit.MINUTES);
 
@@ -77,6 +85,7 @@ public class WebRuntimeMonitorITCase {
                        Configuration monitorConfig = new Configuration();
                        
monitorConfig.setString(WebMonitorConfig.JOB_MANAGER_WEB_DOC_ROOT_KEY, 
MAIN_RESOURCES_PATH);
                        
monitorConfig.setBoolean(ConfigConstants.JOB_MANAGER_NEW_WEB_FRONTEND_KEY, 
true);
+                       
monitorConfig.setInteger(ConfigConstants.JOB_MANAGER_WEB_PORT_KEY, 0);
 
                        // Needs to match the leader address from the leader 
retrieval service
                        String jobManagerAddress = 
AkkaUtils.getAkkaURL(jmActorSystem, jmActor);
@@ -131,12 +140,12 @@ public class WebRuntimeMonitorITCase {
                List<LeaderRetrievalService> leaderRetrievalServices = new 
ArrayList<>();
 
                try (TestingServer zooKeeper = new TestingServer()) {
-                       final Configuration config = new Configuration();
+                       final Configuration config = 
ZooKeeperTestUtils.createZooKeeperRecoveryModeConfig(
+                               zooKeeper.getConnectString(),
+                               temporaryFolder.getRoot().getPath());
                        
config.setString(WebMonitorConfig.JOB_MANAGER_WEB_DOC_ROOT_KEY, 
MAIN_RESOURCES_PATH);
                        
config.setBoolean(ConfigConstants.JOB_MANAGER_NEW_WEB_FRONTEND_KEY, true);
                        
config.setInteger(ConfigConstants.JOB_MANAGER_WEB_PORT_KEY, 0);
-                       config.setString(ConfigConstants.RECOVERY_MODE, 
"ZOOKEEPER");
-                       config.setString(ConfigConstants.ZOOKEEPER_QUORUM_KEY, 
zooKeeper.getConnectString());
 
                        for (int i = 0; i < jobManagerSystem.length; i++) {
                                jobManagerSystem[i] = 
AkkaUtils.createActorSystem(new Configuration(),
@@ -157,7 +166,11 @@ public class WebRuntimeMonitorITCase {
                                                webMonitor[i].getServerPort());
 
                                jobManager[i] = 
JobManager.startJobManagerActors(
-                                               jmConfig, jobManagerSystem[i], 
StreamingMode.STREAMING)._1();
+                                       jmConfig,
+                                       jobManagerSystem[i],
+                                       StreamingMode.STREAMING,
+                                       JobManager.class,
+                                       MemoryArchivist.class)._1();
 
                                jobManagerAddress[i] = 
AkkaUtils.getAkkaURL(jobManagerSystem[i], jobManager[i]);
                                webMonitor[i].start(jobManagerAddress[i]);

http://git-wip-us.apache.org/repos/asf/flink/blob/e3ad9621/flink-runtime/src/main/scala/org/apache/flink/runtime/jobmanager/JobManager.scala
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/scala/org/apache/flink/runtime/jobmanager/JobManager.scala b/flink-runtime/src/main/scala/org/apache/flink/runtime/jobmanager/JobManager.scala
index 161e8de..ebc0ea9 100644
--- a/flink-runtime/src/main/scala/org/apache/flink/runtime/jobmanager/JobManager.scala
+++ b/flink-runtime/src/main/scala/org/apache/flink/runtime/jobmanager/JobManager.scala
@@ -1624,7 +1624,6 @@ object JobManager {
         monitor =>
           val jobManagerAkkaUrl = 
JobManager.getRemoteJobManagerAkkaURL(configuration)
           monitor.start(jobManagerAkkaUrl)
-
         LOG.info("Starting JobManger web frontend")
         // start the web frontend. we need to load this dynamically
         // because it is not in the same project/dependencies
@@ -1632,11 +1631,9 @@ object JobManager {
           configuration,
           leaderRetrievalService,
           jobManagerSystem)
-        Option(webServer)
-      } else {
-        None
       }
 
+
       (jobManagerSystem, jobManager, archive, webMonitor)
     }
     catch {
@@ -1930,14 +1927,14 @@ object JobManager {
    * @return A tuple of references (JobManager Ref, Archiver Ref)
    */
   def startJobManagerActors(
-                             configuration: Configuration,
-                             actorSystem: ActorSystem,
-                             jobMangerActorName: Option[String],
-                             archiveActorName: Option[String],
-                             streamingMode: StreamingMode,
-                             jobManagerClass: Class[_ <: JobManager],
-                             archiveClass: Class[_ <: MemoryArchivist])
-  : (ActorRef, ActorRef) = {
+      configuration: Configuration,
+      actorSystem: ActorSystem,
+      jobMangerActorName: Option[String],
+      archiveActorName: Option[String],
+      streamingMode: StreamingMode,
+      jobManagerClass: Class[_ <: JobManager],
+      archiveClass: Class[_ <: MemoryArchivist])
+    : (ActorRef, ActorRef) = {
 
     val (executionContext,
     instanceManager,

http://git-wip-us.apache.org/repos/asf/flink/blob/e3ad9621/flink-runtime/src/test/scala/org/apache/flink/runtime/executiongraph/ExecutionGraphRestartTest.scala
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/scala/org/apache/flink/runtime/executiongraph/ExecutionGraphRestartTest.scala b/flink-runtime/src/test/scala/org/apache/flink/runtime/executiongraph/ExecutionGraphRestartTest.scala
index 9a1cde0..e41d7ff 100644
--- a/flink-runtime/src/test/scala/org/apache/flink/runtime/executiongraph/ExecutionGraphRestartTest.scala
+++ b/flink-runtime/src/test/scala/org/apache/flink/runtime/executiongraph/ExecutionGraphRestartTest.scala
@@ -144,15 +144,19 @@ class ExecutionGraphRestartTest extends WordSpecLike with Matchers {
         // Wait for deploying after async restart
         deadline = timeout.fromNow
         while (deadline.hasTimeLeft() && 
eg.getAllExecutionVertices.asScala.exists(
-          _.getCurrentExecutionAttempt.getState != ExecutionState.DEPLOYING)) {
+          _.getCurrentExecutionAttempt.getAssignedResource == null)) {
           Thread.sleep(100)
         }
-        
-        for (vertex <- eg.getAllExecutionVertices.asScala) {
-          vertex.getCurrentExecutionAttempt().markFinished()
-        }
 
-        eg.getState() should equal(JobStatus.FINISHED)
+        if (deadline.hasTimeLeft()) {
+          for (vertex <- eg.getAllExecutionVertices.asScala) {
+            vertex.getCurrentExecutionAttempt().markFinished()
+          }
+
+          eg.getState() should equal(JobStatus.FINISHED)
+        } else {
+          fail("Failed to wait until all execution attempts left the state 
DEPLOYING.")
+        }
       } catch {
         case t: Throwable =>
           t.printStackTrace()

http://git-wip-us.apache.org/repos/asf/flink/blob/e3ad9621/tools/log4j-travis.properties
----------------------------------------------------------------------
diff --git a/tools/log4j-travis.properties b/tools/log4j-travis.properties
index 53379b4..d55209e 100644
--- a/tools/log4j-travis.properties
+++ b/tools/log4j-travis.properties
@@ -40,6 +40,7 @@ log4j.logger.org.apache.zookeeper=ERROR
 log4j.logger.org.apache.zookeeper.server.quorum.QuorumCnxManager=OFF
 log4j.logger.org.apache.flink.runtime.leaderelection=DEBUG
 log4j.logger.org.apache.flink.runtime.leaderretrieval=DEBUG
+log4j.logger.org.apache.flink.runtime.executiongraph=DEBUG
 
 # Log a bit when running the flink-yarn-tests to avoid running into the 5 
minutes timeout for
 # the tests

Reply via email to