http://git-wip-us.apache.org/repos/asf/flink/blob/ac8225fd/flink-runtime/src/main/java/org/apache/flink/runtime/rest/RestClient.java
----------------------------------------------------------------------
diff --git 
a/flink-runtime/src/main/java/org/apache/flink/runtime/rest/RestClient.java 
b/flink-runtime/src/main/java/org/apache/flink/runtime/rest/RestClient.java
index 5af50b2..43a9981 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/rest/RestClient.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/rest/RestClient.java
@@ -21,8 +21,6 @@ package org.apache.flink.runtime.rest;
 import org.apache.flink.api.common.time.Time;
 import org.apache.flink.configuration.ConfigConstants;
 import org.apache.flink.runtime.rest.handler.PipelineErrorHandler;
-import org.apache.flink.runtime.rest.messages.EmptyMessageParameters;
-import org.apache.flink.runtime.rest.messages.EmptyRequestBody;
 import org.apache.flink.runtime.rest.messages.ErrorResponseBody;
 import org.apache.flink.runtime.rest.messages.MessageHeaders;
 import org.apache.flink.runtime.rest.messages.MessageParameters;
@@ -45,6 +43,7 @@ import 
org.apache.flink.shaded.netty4.io.netty.channel.Channel;
 import org.apache.flink.shaded.netty4.io.netty.channel.ChannelFuture;
 import org.apache.flink.shaded.netty4.io.netty.channel.ChannelHandlerContext;
 import org.apache.flink.shaded.netty4.io.netty.channel.ChannelInitializer;
+import org.apache.flink.shaded.netty4.io.netty.channel.ChannelOption;
 import 
org.apache.flink.shaded.netty4.io.netty.channel.SimpleChannelInboundHandler;
 import org.apache.flink.shaded.netty4.io.netty.channel.nio.NioEventLoopGroup;
 import org.apache.flink.shaded.netty4.io.netty.channel.socket.SocketChannel;
@@ -110,6 +109,7 @@ public class RestClient {
 
                bootstrap = new Bootstrap();
                bootstrap
+                       .option(ChannelOption.CONNECT_TIMEOUT_MILLIS, 
Math.toIntExact(configuration.getConnectionTimeout()))
                        .group(group)
                        .channel(NioSocketChannel.class)
                        .handler(initializer);
@@ -141,18 +141,6 @@ public class RestClient {
                }
        }
 
-       public <M extends MessageHeaders<EmptyRequestBody, P, U>, U extends 
MessageParameters, P extends ResponseBody> CompletableFuture<P> 
sendRequest(String targetAddress, int targetPort, M messageHeaders, U 
messageParameters) throws IOException {
-               return sendRequest(targetAddress, targetPort, messageHeaders, 
messageParameters, EmptyRequestBody.getInstance());
-       }
-
-       public <M extends MessageHeaders<R, P, EmptyMessageParameters>, R 
extends RequestBody, P extends ResponseBody> CompletableFuture<P> 
sendRequest(String targetAddress, int targetPort, M messageHeaders, R request) 
throws IOException {
-               return sendRequest(targetAddress, targetPort, messageHeaders, 
EmptyMessageParameters.getInstance(), request);
-       }
-
-       public <M extends MessageHeaders<EmptyRequestBody, P, 
EmptyMessageParameters>, P extends ResponseBody> CompletableFuture<P> 
sendRequest(String targetAddress, int targetPort, M messageHeaders) throws 
IOException {
-               return sendRequest(targetAddress, targetPort, messageHeaders, 
EmptyMessageParameters.getInstance(), EmptyRequestBody.getInstance());
-       }
-
        public <M extends MessageHeaders<R, P, U>, U extends MessageParameters, 
R extends RequestBody, P extends ResponseBody> CompletableFuture<P> 
sendRequest(String targetAddress, int targetPort, M messageHeaders, U 
messageParameters, R request) throws IOException {
                Preconditions.checkNotNull(targetAddress);
                Preconditions.checkArgument(0 <= targetPort && targetPort < 
65536, "The target port " + targetPort + " is not in the range (0, 65536].");
@@ -264,6 +252,12 @@ public class RestClient {
                        ctx.close();
                }
 
+               /**
+                * Completes {@code jsonFuture} exceptionally with the given cause and then closes the
+                * channel handler context, so a pipeline error reaches the caller waiting on the future.
+                *
+                * @param ctx context of the channel on which the error was caught
+                * @param cause error that is handed to {@code jsonFuture}
+                */
+               @Override
+               public void exceptionCaught(final ChannelHandlerContext ctx, 
final Throwable cause) throws Exception {
+                       jsonFuture.completeExceptionally(cause);
+                       ctx.close();
+               }
+
                private void readRawResponse(FullHttpResponse msg) {
                        ByteBuf content = msg.content();
 

http://git-wip-us.apache.org/repos/asf/flink/blob/ac8225fd/flink-runtime/src/main/java/org/apache/flink/runtime/rest/RestClientConfiguration.java
----------------------------------------------------------------------
diff --git 
a/flink-runtime/src/main/java/org/apache/flink/runtime/rest/RestClientConfiguration.java
 
b/flink-runtime/src/main/java/org/apache/flink/runtime/rest/RestClientConfiguration.java
index 7bf0307..86578a2 100644
--- 
a/flink-runtime/src/main/java/org/apache/flink/runtime/rest/RestClientConfiguration.java
+++ 
b/flink-runtime/src/main/java/org/apache/flink/runtime/rest/RestClientConfiguration.java
@@ -19,6 +19,7 @@
 package org.apache.flink.runtime.rest;
 
 import org.apache.flink.configuration.Configuration;
+import org.apache.flink.configuration.RestOptions;
 import org.apache.flink.configuration.SecurityOptions;
 import org.apache.flink.runtime.net.SSLUtils;
 import org.apache.flink.util.ConfigurationException;
@@ -36,8 +37,11 @@ public final class RestClientConfiguration {
        @Nullable
        private final SSLEngine sslEngine;
 
-       private RestClientConfiguration(@Nullable SSLEngine sslEngine) {
+       private final long connectionTimeout;
+
+       private RestClientConfiguration(@Nullable SSLEngine sslEngine, final 
long connectionTimeout) {
                this.sslEngine = sslEngine;
+               this.connectionTimeout = connectionTimeout;
        }
 
        /**
@@ -51,6 +55,13 @@ public final class RestClientConfiguration {
        }
 
        /**
+        * Returns the connection timeout that was read from {@link RestOptions#CONNECTION_TIMEOUT}.
+        *
+        * <p>{@code RestClient} passes this value to Netty's {@code CONNECT_TIMEOUT_MILLIS} channel
+        * option, so it is interpreted as milliseconds there.
+        *
+        * @return the configured connection timeout
+        * @see RestOptions#CONNECTION_TIMEOUT
+        */
+       public long getConnectionTimeout() {
+               return connectionTimeout;
+       }
+
+       /**
         * Creates and returns a new {@link RestClientConfiguration} from the 
given {@link Configuration}.
         *
         * @param config configuration from which the REST client endpoint 
configuration should be created from
@@ -76,6 +87,8 @@ public final class RestClientConfiguration {
                        }
                }
 
-               return new RestClientConfiguration(sslEngine);
+               final long connectionTimeout = 
config.getLong(RestOptions.CONNECTION_TIMEOUT);
+
+               return new RestClientConfiguration(sslEngine, 
connectionTimeout);
        }
 }

http://git-wip-us.apache.org/repos/asf/flink/blob/ac8225fd/flink-runtime/src/main/java/org/apache/flink/runtime/webmonitor/WebMonitorEndpoint.java
----------------------------------------------------------------------
diff --git 
a/flink-runtime/src/main/java/org/apache/flink/runtime/webmonitor/WebMonitorEndpoint.java
 
b/flink-runtime/src/main/java/org/apache/flink/runtime/webmonitor/WebMonitorEndpoint.java
index 30a68d1..a22e38c 100644
--- 
a/flink-runtime/src/main/java/org/apache/flink/runtime/webmonitor/WebMonitorEndpoint.java
+++ 
b/flink-runtime/src/main/java/org/apache/flink/runtime/webmonitor/WebMonitorEndpoint.java
@@ -22,6 +22,8 @@ import org.apache.flink.api.common.time.Time;
 import org.apache.flink.api.java.tuple.Tuple2;
 import org.apache.flink.configuration.CheckpointingOptions;
 import org.apache.flink.configuration.Configuration;
+import org.apache.flink.runtime.leaderelection.LeaderContender;
+import org.apache.flink.runtime.leaderelection.LeaderElectionService;
 import org.apache.flink.runtime.resourcemanager.ResourceManagerGateway;
 import org.apache.flink.runtime.rest.RestServerEndpoint;
 import org.apache.flink.runtime.rest.RestServerEndpointConfiguration;
@@ -91,6 +93,7 @@ import 
org.apache.flink.runtime.rest.messages.job.savepoints.SavepointStatusHead
 import 
org.apache.flink.runtime.rest.messages.job.savepoints.SavepointTriggerHeaders;
 import 
org.apache.flink.runtime.rest.messages.taskmanager.TaskManagerDetailsHeaders;
 import org.apache.flink.runtime.rest.messages.taskmanager.TaskManagersHeaders;
+import org.apache.flink.runtime.rpc.FatalErrorHandler;
 import org.apache.flink.runtime.webmonitor.retriever.GatewayRetriever;
 import 
org.apache.flink.runtime.webmonitor.retriever.MetricQueryServiceRetriever;
 import org.apache.flink.util.FileUtils;
@@ -104,6 +107,7 @@ import java.util.ArrayList;
 import java.util.List;
 import java.util.Map;
 import java.util.Optional;
+import java.util.UUID;
 import java.util.concurrent.CompletableFuture;
 import java.util.concurrent.Executor;
 
@@ -112,7 +116,7 @@ import java.util.concurrent.Executor;
  *
  * @param <T> type of the leader gateway
  */
-public class WebMonitorEndpoint<T extends RestfulGateway> extends 
RestServerEndpoint {
+public class WebMonitorEndpoint<T extends RestfulGateway> extends 
RestServerEndpoint implements LeaderContender {
 
        protected final GatewayRetriever<T> leaderRetriever;
        private final Configuration clusterConfiguration;
@@ -125,6 +129,10 @@ public class WebMonitorEndpoint<T extends RestfulGateway> 
extends RestServerEndp
 
        private final MetricFetcher<T> metricFetcher;
 
+       private final LeaderElectionService leaderElectionService;
+
+       private final FatalErrorHandler fatalErrorHandler;
+
        public WebMonitorEndpoint(
                        RestServerEndpointConfiguration endpointConfiguration,
                        GatewayRetriever<T> leaderRetriever,
@@ -132,7 +140,9 @@ public class WebMonitorEndpoint<T extends RestfulGateway> 
extends RestServerEndp
                        RestHandlerConfiguration restConfiguration,
                        GatewayRetriever<ResourceManagerGateway> 
resourceManagerRetriever,
                        Executor executor,
-                       MetricQueryServiceRetriever 
metricQueryServiceRetriever) {
+                       MetricQueryServiceRetriever metricQueryServiceRetriever,
+                       LeaderElectionService leaderElectionService,
+                       FatalErrorHandler fatalErrorHandler) {
                super(endpointConfiguration);
                this.leaderRetriever = 
Preconditions.checkNotNull(leaderRetriever);
                this.clusterConfiguration = 
Preconditions.checkNotNull(clusterConfiguration);
@@ -152,6 +162,9 @@ public class WebMonitorEndpoint<T extends RestfulGateway> 
extends RestServerEndp
                        metricQueryServiceRetriever,
                        executor,
                        restConfiguration.getTimeout());
+
+               this.leaderElectionService = 
Preconditions.checkNotNull(leaderElectionService);
+               this.fatalErrorHandler = 
Preconditions.checkNotNull(fatalErrorHandler);
        }
 
        @Override
@@ -468,6 +481,12 @@ public class WebMonitorEndpoint<T extends RestfulGateway> 
extends RestServerEndp
        }
 
        @Override
+       public void start() throws Exception {
+               // Start the parent RestServerEndpoint first, then register this endpoint as a
+               // contender with the leader election service.
+               super.start();
+               leaderElectionService.start(this);
+       }
+
+       @Override
        public void shutdown(Time timeout) {
                super.shutdown(timeout);
 
@@ -481,5 +500,37 @@ public class WebMonitorEndpoint<T extends RestfulGateway> 
extends RestServerEndp
                } catch (Throwable t) {
                        log.warn("Error while deleting cache directory {}", 
tmpDir, t);
                }
+
+               try {
+                       leaderElectionService.stop();
+               } catch (Exception e) {
+                       log.warn("Error while stopping leaderElectionService", 
e);
+               }
        }
+
+       
//-------------------------------------------------------------------------
+       // LeaderContender
+       
//-------------------------------------------------------------------------
+
+       /**
+        * Logs the granted leadership and immediately confirms the session ID with the leader
+        * election service; no further work is done before confirming.
+        *
+        * @param leaderSessionID session ID of the leadership that was granted
+        */
+       @Override
+       public void grantLeadership(final UUID leaderSessionID) {
+               log.info("{} was granted leadership with leaderSessionID={}", 
getRestAddress(), leaderSessionID);
+               leaderElectionService.confirmLeaderSessionID(leaderSessionID);
+       }
+
+       /**
+        * Only logs that leadership was lost; this method changes no other state.
+        */
+       @Override
+       public void revokeLeadership() {
+               log.info("{} lost leadership", getRestAddress());
+       }
+
+       /**
+        * Returns the REST address of this endpoint as the contender's address.
+        *
+        * @return the value of {@code getRestAddress()}
+        */
+       @Override
+       public String getAddress() {
+               return getRestAddress();
+       }
+
+       /**
+        * Forwards the given exception to the {@link FatalErrorHandler} passed to the constructor.
+        *
+        * @param exception error to report as fatal
+        */
+       @Override
+       public void handleError(final Exception exception) {
+               fatalErrorHandler.onFatalError(exception);
+       }
+
 }

http://git-wip-us.apache.org/repos/asf/flink/blob/ac8225fd/flink-runtime/src/test/java/org/apache/flink/runtime/concurrent/FutureUtilsTest.java
----------------------------------------------------------------------
diff --git 
a/flink-runtime/src/test/java/org/apache/flink/runtime/concurrent/FutureUtilsTest.java
 
b/flink-runtime/src/test/java/org/apache/flink/runtime/concurrent/FutureUtilsTest.java
index eb0ce2a..7d14ff2 100644
--- 
a/flink-runtime/src/test/java/org/apache/flink/runtime/concurrent/FutureUtilsTest.java
+++ 
b/flink-runtime/src/test/java/org/apache/flink/runtime/concurrent/FutureUtilsTest.java
@@ -31,14 +31,19 @@ import org.mockito.invocation.InvocationOnMock;
 import java.util.concurrent.CompletableFuture;
 import java.util.concurrent.CompletionException;
 import java.util.concurrent.ExecutionException;
+import java.util.concurrent.Executors;
+import java.util.concurrent.ScheduledExecutorService;
 import java.util.concurrent.ScheduledFuture;
 import java.util.concurrent.TimeUnit;
 import java.util.concurrent.TimeoutException;
 import java.util.concurrent.atomic.AtomicInteger;
 import java.util.concurrent.atomic.AtomicReference;
+import java.util.function.Supplier;
 
+import static org.hamcrest.CoreMatchers.containsString;
 import static org.junit.Assert.assertEquals;
 import static org.junit.Assert.assertFalse;
+import static org.junit.Assert.assertThat;
 import static org.junit.Assert.assertTrue;
 import static org.mockito.Matchers.any;
 import static org.mockito.Matchers.anyBoolean;
@@ -241,4 +246,36 @@ public class FutureUtilsTest extends TestLogger {
                        assertTrue(ExceptionUtils.stripExecutionException(e) 
instanceof TimeoutException);
                }
        }
+
+       /**
+        * Tests that {@code FutureUtils.retryWithDelay} only retries failures accepted by the retry
+        * predicate: the supplier's first failure matches the predicate, every later failure does
+        * not, so the returned future is expected to complete exceptionally.
+        */
+       @Test
+       public void testRetryWithDelayAndPredicate() throws Exception {
+               final ScheduledExecutorService retryExecutor = Executors.newSingleThreadScheduledExecutor();
+               final String retryableExceptionMessage = "first exception";
+               class TestStringSupplier implements Supplier<CompletableFuture<String>> {
+                       private final AtomicInteger counter = new AtomicInteger();
+
+                       @Override
+                       public CompletableFuture<String> get() {
+                               if (counter.getAndIncrement() == 0) {
+                                       return FutureUtils.completedExceptionally(new RuntimeException(retryableExceptionMessage));
+                               } else {
+                                       return FutureUtils.completedExceptionally(new RuntimeException("should propagate"));
+                               }
+                       }
+               }
+
+               try {
+                       FutureUtils.retryWithDelay(
+                               new TestStringSupplier(),
+                               1,
+                               Time.seconds(0),
+                               throwable ->
+                                       throwable instanceof RuntimeException && throwable.getMessage().contains(retryableExceptionMessage),
+                               new ScheduledExecutorServiceAdapter(retryExecutor)).get();
+                       // Reaching this line means the future completed normally; without an explicit
+                       // failure the test would pass without checking anything.
+                       throw new AssertionError("Expected the retried operation to fail with an ExecutionException.");
+               } catch (final ExecutionException e) {
+                       assertThat(e.getMessage(), containsString("Could not complete the operation"));
+               } finally {
+                       retryExecutor.shutdownNow();
+               }
+       }
 }

http://git-wip-us.apache.org/repos/asf/flink/blob/ac8225fd/flink-runtime/src/test/java/org/apache/flink/runtime/highavailability/TestingHighAvailabilityServices.java
----------------------------------------------------------------------
diff --git 
a/flink-runtime/src/test/java/org/apache/flink/runtime/highavailability/TestingHighAvailabilityServices.java
 
b/flink-runtime/src/test/java/org/apache/flink/runtime/highavailability/TestingHighAvailabilityServices.java
index db0b88e..98d5d74 100644
--- 
a/flink-runtime/src/test/java/org/apache/flink/runtime/highavailability/TestingHighAvailabilityServices.java
+++ 
b/flink-runtime/src/test/java/org/apache/flink/runtime/highavailability/TestingHighAvailabilityServices.java
@@ -40,6 +40,8 @@ public class TestingHighAvailabilityServices implements 
HighAvailabilityServices
 
        private volatile LeaderRetrievalService dispatcherLeaderRetriever;
 
+       private volatile LeaderRetrievalService 
webMonitorEndpointLeaderRetriever;
+
        private ConcurrentHashMap<JobID, LeaderRetrievalService> 
jobMasterLeaderRetrievers = new ConcurrentHashMap<>();
 
        private ConcurrentHashMap<JobID, LeaderElectionService> 
jobManagerLeaderElectionServices = new ConcurrentHashMap<>();
@@ -48,6 +50,8 @@ public class TestingHighAvailabilityServices implements 
HighAvailabilityServices
 
        private volatile LeaderElectionService dispatcherLeaderElectionService;
 
+       private volatile LeaderElectionService 
webMonitorEndpointLeaderElectionService;
+
        private volatile CheckpointRecoveryFactory checkpointRecoveryFactory;
 
        private volatile SubmittedJobGraphStore submittedJobGraphStore;
@@ -66,6 +70,10 @@ public class TestingHighAvailabilityServices implements 
HighAvailabilityServices
                this.dispatcherLeaderRetriever = dispatcherLeaderRetriever;
        }
 
+       /**
+        * Sets the retrieval service that {@code getWebMonitorLeaderRetriever()} returns.
+        *
+        * @param webMonitorEndpointLeaderRetriever retrieval service to hand out for the web monitor endpoint
+        */
+       public void setWebMonitorEndpointLeaderRetriever(final 
LeaderRetrievalService webMonitorEndpointLeaderRetriever) {
+               this.webMonitorEndpointLeaderRetriever = 
webMonitorEndpointLeaderRetriever;
+       }
+
        public void setJobMasterLeaderRetriever(JobID jobID, 
LeaderRetrievalService jobMasterLeaderRetriever) {
                this.jobMasterLeaderRetrievers.put(jobID, 
jobMasterLeaderRetriever);
        }
@@ -82,6 +90,10 @@ public class TestingHighAvailabilityServices implements 
HighAvailabilityServices
                this.dispatcherLeaderElectionService = leaderElectionService;
        }
 
+       /**
+        * Sets the election service that {@code getWebMonitorLeaderElectionService()} returns.
+        *
+        * @param webMonitorEndpointLeaderElectionService election service to hand out for the web monitor endpoint
+        */
+       public void setWebMonitorEndpointLeaderElectionService(final 
LeaderElectionService webMonitorEndpointLeaderElectionService) {
+               this.webMonitorEndpointLeaderElectionService = 
webMonitorEndpointLeaderElectionService;
+       }
+
        public void setCheckpointRecoveryFactory(CheckpointRecoveryFactory 
checkpointRecoveryFactory) {
                this.checkpointRecoveryFactory = checkpointRecoveryFactory;
        }
@@ -130,6 +142,11 @@ public class TestingHighAvailabilityServices implements 
HighAvailabilityServices
        }
 
        @Override
+       public LeaderRetrievalService getWebMonitorLeaderRetriever() {
+               // Returns the field as-is, without a null check. NOTE(review): presumably null until
+               // setWebMonitorEndpointLeaderRetriever(...) has been called - confirm.
+               return webMonitorEndpointLeaderRetriever;
+       }
+
+       @Override
        public LeaderElectionService getResourceManagerLeaderElectionService() {
                LeaderElectionService service = 
resourceManagerLeaderElectionService;
 
@@ -163,6 +180,11 @@ public class TestingHighAvailabilityServices implements 
HighAvailabilityServices
        }
 
        @Override
+       public LeaderElectionService getWebMonitorLeaderElectionService() {
+               // Returns the field as-is, without a null check. NOTE(review): presumably null until
+               // setWebMonitorEndpointLeaderElectionService(...) has been called - confirm.
+               return webMonitorEndpointLeaderElectionService;
+       }
+
+       @Override
        public CheckpointRecoveryFactory getCheckpointRecoveryFactory() {
                CheckpointRecoveryFactory factory = checkpointRecoveryFactory;
 

http://git-wip-us.apache.org/repos/asf/flink/blob/ac8225fd/flink-runtime/src/test/java/org/apache/flink/runtime/highavailability/TestingManualHighAvailabilityServices.java
----------------------------------------------------------------------
diff --git 
a/flink-runtime/src/test/java/org/apache/flink/runtime/highavailability/TestingManualHighAvailabilityServices.java
 
b/flink-runtime/src/test/java/org/apache/flink/runtime/highavailability/TestingManualHighAvailabilityServices.java
index 1f319eb..512f0d6 100644
--- 
a/flink-runtime/src/test/java/org/apache/flink/runtime/highavailability/TestingManualHighAvailabilityServices.java
+++ 
b/flink-runtime/src/test/java/org/apache/flink/runtime/highavailability/TestingManualHighAvailabilityServices.java
@@ -47,10 +47,13 @@ public class TestingManualHighAvailabilityServices 
implements HighAvailabilitySe
 
        private final ManualLeaderService dispatcherLeaderService;
 
+       private final ManualLeaderService webMonitorEndpointLeaderService;
+
        public TestingManualHighAvailabilityServices() {
                jobManagerLeaderServices = new HashMap<>(4);
                resourceManagerLeaderService = new ManualLeaderService();
                dispatcherLeaderService = new ManualLeaderService();
+               webMonitorEndpointLeaderService = new ManualLeaderService();
        }
 
        @Override
@@ -76,6 +79,11 @@ public class TestingManualHighAvailabilityServices 
implements HighAvailabilitySe
        }
 
        @Override
+       public LeaderRetrievalService getWebMonitorLeaderRetriever() {
+               // Delegates to createLeaderRetrievalService() on the web monitor endpoint's
+               // ManualLeaderService instance.
+               return 
webMonitorEndpointLeaderService.createLeaderRetrievalService();
+       }
+
+       @Override
        public LeaderElectionService getResourceManagerLeaderElectionService() {
                return 
resourceManagerLeaderService.createLeaderElectionService();
        }
@@ -93,6 +101,11 @@ public class TestingManualHighAvailabilityServices 
implements HighAvailabilitySe
        }
 
        @Override
+       public LeaderElectionService getWebMonitorLeaderElectionService() {
+               // Delegates to createLeaderElectionService() on the same ManualLeaderService instance
+               // that getWebMonitorLeaderRetriever() uses.
+               return 
webMonitorEndpointLeaderService.createLeaderElectionService();
+       }
+
+       @Override
        public CheckpointRecoveryFactory getCheckpointRecoveryFactory() {
                return new StandaloneCheckpointRecoveryFactory();
        }

http://git-wip-us.apache.org/repos/asf/flink/blob/ac8225fd/flink-runtime/src/test/java/org/apache/flink/runtime/highavailability/nonha/standalone/StandaloneHaServicesTest.java
----------------------------------------------------------------------
diff --git 
a/flink-runtime/src/test/java/org/apache/flink/runtime/highavailability/nonha/standalone/StandaloneHaServicesTest.java
 
b/flink-runtime/src/test/java/org/apache/flink/runtime/highavailability/nonha/standalone/StandaloneHaServicesTest.java
index 1cf2e5b..c93dbcc 100644
--- 
a/flink-runtime/src/test/java/org/apache/flink/runtime/highavailability/nonha/standalone/StandaloneHaServicesTest.java
+++ 
b/flink-runtime/src/test/java/org/apache/flink/runtime/highavailability/nonha/standalone/StandaloneHaServicesTest.java
@@ -41,6 +41,7 @@ public class StandaloneHaServicesTest extends TestLogger {
        private final String jobManagerAddress = "jobManager";
        private final String dispatcherAddress = "dispatcher";
        private final String resourceManagerAddress = "resourceManager";
+       private final String webMonitorAddress = "webMonitor";
 
        private StandaloneHaServices standaloneHaServices;
 
@@ -50,7 +51,8 @@ public class StandaloneHaServicesTest extends TestLogger {
                standaloneHaServices = new StandaloneHaServices(
                        resourceManagerAddress,
                        dispatcherAddress,
-                       jobManagerAddress);
+                       jobManagerAddress,
+                       webMonitorAddress);
        }
 
        @After

http://git-wip-us.apache.org/repos/asf/flink/blob/ac8225fd/flink-runtime/src/test/java/org/apache/flink/runtime/rest/RestClientTest.java
----------------------------------------------------------------------
diff --git 
a/flink-runtime/src/test/java/org/apache/flink/runtime/rest/RestClientTest.java 
b/flink-runtime/src/test/java/org/apache/flink/runtime/rest/RestClientTest.java
new file mode 100644
index 0000000..eb77af1
--- /dev/null
+++ 
b/flink-runtime/src/test/java/org/apache/flink/runtime/rest/RestClientTest.java
@@ -0,0 +1,103 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.rest;
+
+import org.apache.flink.configuration.Configuration;
+import org.apache.flink.configuration.RestOptions;
+import org.apache.flink.runtime.concurrent.Executors;
+import org.apache.flink.runtime.rest.messages.EmptyMessageParameters;
+import org.apache.flink.runtime.rest.messages.EmptyRequestBody;
+import org.apache.flink.runtime.rest.messages.EmptyResponseBody;
+import org.apache.flink.runtime.rest.messages.MessageHeaders;
+import org.apache.flink.util.ExceptionUtils;
+import org.apache.flink.util.TestLogger;
+
+import org.apache.flink.shaded.netty4.io.netty.channel.ConnectTimeoutException;
+import 
org.apache.flink.shaded.netty4.io.netty.handler.codec.http.HttpResponseStatus;
+
+import org.junit.Test;
+
+import java.util.concurrent.ExecutionException;
+import java.util.concurrent.TimeUnit;
+
+import static org.hamcrest.Matchers.containsString;
+import static org.hamcrest.Matchers.instanceOf;
+import static org.junit.Assert.assertThat;
+
+/**
+ * Tests for {@link RestClient}.
+ */
+public class RestClientTest extends TestLogger {
+
+       /**
+        * Verifies that a request to an unroutable address fails with a
+        * {@link ConnectTimeoutException} once the configured connection timeout (1 ms) elapses.
+        */
+       @Test
+       public void testConnectionTimeout() throws Exception {
+               final Configuration config = new Configuration();
+               config.setLong(RestOptions.CONNECTION_TIMEOUT, 1);
+               final RestClient restClient = new RestClient(RestClientConfiguration.fromConfiguration(config), Executors.directExecutor());
+               final String unroutableIp = "10.255.255.1";
+               try {
+                       restClient.sendRequest(
+                               unroutableIp,
+                               80,
+                               new TestMessageHeaders(),
+                               EmptyMessageParameters.getInstance(),
+                               EmptyRequestBody.getInstance())
+                               .get(60, TimeUnit.SECONDS);
+                       // Reaching this line means the request succeeded; without an explicit failure
+                       // the test would pass without verifying the timeout at all.
+                       throw new AssertionError("Expected the request to " + unroutableIp + " to fail with a connection timeout.");
+               } catch (final ExecutionException e) {
+                       final Throwable throwable = ExceptionUtils.stripExecutionException(e);
+                       assertThat(throwable, instanceOf(ConnectTimeoutException.class));
+                       assertThat(throwable.getMessage(), containsString(unroutableIp));
+               } finally {
+                       // Shut the client down so it does not outlive the test. Time is written
+                       // fully qualified because this file does not import it.
+                       restClient.shutdown(org.apache.flink.api.common.time.Time.seconds(10L));
+               }
+       }
+
+       /**
+        * Minimal headers for a GET on "/" with empty request body, response body and parameters.
+        */
+       private static class TestMessageHeaders implements MessageHeaders<EmptyRequestBody, EmptyResponseBody, EmptyMessageParameters> {
+
+               @Override
+               public Class<EmptyRequestBody> getRequestClass() {
+                       return EmptyRequestBody.class;
+               }
+
+               @Override
+               public Class<EmptyResponseBody> getResponseClass() {
+                       return EmptyResponseBody.class;
+               }
+
+               @Override
+               public HttpResponseStatus getResponseStatusCode() {
+                       return HttpResponseStatus.OK;
+               }
+
+               @Override
+               public EmptyMessageParameters getUnresolvedMessageParameters() {
+                       return EmptyMessageParameters.getInstance();
+               }
+
+               @Override
+               public HttpMethodWrapper getHttpMethod() {
+                       return HttpMethodWrapper.GET;
+               }
+
+               @Override
+               public String getTargetRestEndpointURL() {
+                       return "/";
+               }
+
+       }
+
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/ac8225fd/flink-runtime/src/test/java/org/apache/flink/runtime/rest/RestEndpointITCase.java
----------------------------------------------------------------------
diff --git 
a/flink-runtime/src/test/java/org/apache/flink/runtime/rest/RestEndpointITCase.java
 
b/flink-runtime/src/test/java/org/apache/flink/runtime/rest/RestEndpointITCase.java
index 9954ab7..1cdc70a 100644
--- 
a/flink-runtime/src/test/java/org/apache/flink/runtime/rest/RestEndpointITCase.java
+++ 
b/flink-runtime/src/test/java/org/apache/flink/runtime/rest/RestEndpointITCase.java
@@ -22,6 +22,7 @@ import org.apache.flink.api.common.JobID;
 import org.apache.flink.api.common.time.Time;
 import org.apache.flink.api.java.tuple.Tuple2;
 import org.apache.flink.configuration.Configuration;
+import org.apache.flink.configuration.RestOptions;
 import org.apache.flink.runtime.rest.handler.AbstractRestHandler;
 import org.apache.flink.runtime.rest.handler.HandlerRequest;
 import org.apache.flink.runtime.rest.handler.RestHandlerException;
@@ -80,11 +81,12 @@ public class RestEndpointITCase extends TestLogger {
        private static final Time timeout = Time.seconds(10L);
 
        private RestServerEndpoint serverEndpoint;
-       private RestClient clientEndpoint;
+       private RestClient restClient;
 
        @Before
        public void setup() throws Exception {
                Configuration config = new Configuration();
+               config.setInteger(RestOptions.REST_PORT, 0);
 
                RestServerEndpointConfiguration serverConfig = 
RestServerEndpointConfiguration.fromConfiguration(config);
                RestClientConfiguration clientConfig = 
RestClientConfiguration.fromConfiguration(config);
@@ -101,16 +103,16 @@ public class RestEndpointITCase extends TestLogger {
                        RpcUtils.INF_TIMEOUT);
 
                serverEndpoint = new TestRestServerEndpoint(serverConfig, 
testHandler);
-               clientEndpoint = new TestRestClient(clientConfig);
+               restClient = new TestRestClient(clientConfig);
 
                serverEndpoint.start();
        }
 
        @After
        public void teardown() {
-               if (clientEndpoint != null) {
-                       clientEndpoint.shutdown(timeout);
-                       clientEndpoint = null;
+               if (restClient != null) {
+                       restClient.shutdown(timeout);
+                       restClient = null;
                }
 
                if (serverEndpoint != null) {
@@ -135,7 +137,7 @@ public class RestEndpointITCase extends TestLogger {
                final InetSocketAddress serverAddress = 
serverEndpoint.getServerAddress();
 
                synchronized (TestHandler.LOCK) {
-                       response1 = clientEndpoint.sendRequest(
+                       response1 = restClient.sendRequest(
                                serverAddress.getHostName(),
                                serverAddress.getPort(),
                                new TestHeaders(),
@@ -145,7 +147,7 @@ public class RestEndpointITCase extends TestLogger {
                }
 
                // send second request and verify response
-               CompletableFuture<TestResponse> response2 = 
clientEndpoint.sendRequest(
+               CompletableFuture<TestResponse> response2 = 
restClient.sendRequest(
                        serverAddress.getHostName(),
                        serverAddress.getPort(),
                        new TestHeaders(),
@@ -176,7 +178,7 @@ public class RestEndpointITCase extends TestLogger {
                parameters.faultyJobIDPathParameter.resolve(PATH_JOB_ID);
                ((TestParameters) parameters).jobIDQueryParameter.resolve(Collections.singletonList(QUERY_JOB_ID));
 
-               CompletableFuture<TestResponse> response = clientEndpoint.sendRequest(
+               CompletableFuture<TestResponse> response = restClient.sendRequest(
                        serverAddress.getHostName(),
                        serverAddress.getPort(),
                        new TestHeaders(),

http://git-wip-us.apache.org/repos/asf/flink/blob/ac8225fd/flink-runtime/src/test/java/org/apache/flink/runtime/taskexecutor/TaskExecutorTest.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/taskexecutor/TaskExecutorTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/taskexecutor/TaskExecutorTest.java
index 0f08512..0c3adae 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/taskexecutor/TaskExecutorTest.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/taskexecutor/TaskExecutorTest.java
@@ -522,6 +522,7 @@ public class TaskExecutorTest extends TestLogger {
                final ResourceID resourceManagerResourceId = new ResourceID(resourceManagerAddress);
                final String dispatcherAddress = "localhost";
                final String jobManagerAddress = "localhost";
+               final String webMonitorAddress = "localhost";
 
                // register a mock resource manager gateway
                ResourceManagerGateway rmGateway = mock(ResourceManagerGateway.class);
@@ -540,8 +541,9 @@ public class TaskExecutorTest extends TestLogger {
 
                StandaloneHaServices haServices = new StandaloneHaServices(
                        resourceManagerAddress,
-                               dispatcherAddress,
-                       jobManagerAddress);
+                       dispatcherAddress,
+                       jobManagerAddress,
+                       webMonitorAddress);
 
                final TaskSlotTable taskSlotTable = mock(TaskSlotTable.class);
                final SlotReport slotReport = new SlotReport();

http://git-wip-us.apache.org/repos/asf/flink/blob/ac8225fd/flink-yarn/src/main/java/org/apache/flink/yarn/highavailability/YarnIntraNonHaMasterServices.java
----------------------------------------------------------------------
diff --git a/flink-yarn/src/main/java/org/apache/flink/yarn/highavailability/YarnIntraNonHaMasterServices.java b/flink-yarn/src/main/java/org/apache/flink/yarn/highavailability/YarnIntraNonHaMasterServices.java
index 86db1c4..ce00741 100644
--- a/flink-yarn/src/main/java/org/apache/flink/yarn/highavailability/YarnIntraNonHaMasterServices.java
+++ b/flink-yarn/src/main/java/org/apache/flink/yarn/highavailability/YarnIntraNonHaMasterServices.java
@@ -177,6 +177,17 @@ public class YarnIntraNonHaMasterServices extends AbstractYarnNonHaServices {
        }
 
        @Override
+       public LeaderElectionService getWebMonitorLeaderElectionService() {
+               enter();
+               try {
+                       throw new UnsupportedOperationException();
+               }
+               finally {
+                       exit();
+               }
+       }
+
+       @Override
        public LeaderRetrievalService getJobManagerLeaderRetriever(JobID jobID) {
                enter();
                try {
@@ -198,6 +209,17 @@ public class YarnIntraNonHaMasterServices extends AbstractYarnNonHaServices {
                }
        }
 
+       @Override
+       public LeaderRetrievalService getWebMonitorLeaderRetriever() {
+               enter();
+               try {
+                       throw new UnsupportedOperationException();
+               }
+               finally {
+                       exit();
+               }
+       }
+
        // ------------------------------------------------------------------------
        //  shutdown
        // ------------------------------------------------------------------------

http://git-wip-us.apache.org/repos/asf/flink/blob/ac8225fd/flink-yarn/src/main/java/org/apache/flink/yarn/highavailability/YarnPreConfiguredMasterNonHaServices.java
----------------------------------------------------------------------
diff --git a/flink-yarn/src/main/java/org/apache/flink/yarn/highavailability/YarnPreConfiguredMasterNonHaServices.java b/flink-yarn/src/main/java/org/apache/flink/yarn/highavailability/YarnPreConfiguredMasterNonHaServices.java
index c1466d2..9d60d89 100644
--- a/flink-yarn/src/main/java/org/apache/flink/yarn/highavailability/YarnPreConfiguredMasterNonHaServices.java
+++ b/flink-yarn/src/main/java/org/apache/flink/yarn/highavailability/YarnPreConfiguredMasterNonHaServices.java
@@ -199,6 +199,17 @@ public class YarnPreConfiguredMasterNonHaServices extends AbstractYarnNonHaServi
        }
 
        @Override
+       public LeaderElectionService getWebMonitorLeaderElectionService() {
+               enter();
+               try {
+                       throw new UnsupportedOperationException();
+               }
+               finally {
+                       exit();
+               }
+       }
+
+       @Override
        public LeaderRetrievalService getJobManagerLeaderRetriever(JobID jobID) {
                enter();
                try {
@@ -218,4 +229,15 @@ public class YarnPreConfiguredMasterNonHaServices extends AbstractYarnNonHaServi
                        exit();
                }
        }
+
+       @Override
+       public LeaderRetrievalService getWebMonitorLeaderRetriever() {
+               enter();
+               try {
+                       throw new UnsupportedOperationException();
+               }
+               finally {
+                       exit();
+               }
+       }
 }

Reply via email to