[FLINK-8344][flip6] Retrieve leading WebMonitor in RestClusterClient

Make WebMonitorEndpoint instances participate in leader election.
Use the leading instance's base URL to issue HTTP requests from RestClusterClient.
Make the polling of JobExecutionResults and savepoints fault-tolerant.

[FLINK-8344][flip6] Add TestLogger to unit tests

[FLINK-8344][flip6] Update RestOptions

Declare timeouts and delays with the long data type.
Add descriptions to ConfigOptions.

[FLINK-8344][flip6] Rename methods in RestClusterClient

Rename waitForSavepointCompletion to pollSavepointAsync.
Rename waitForResource to pollResourceAsync.

This closes #5312.


Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/ac8225fd
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/ac8225fd
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/ac8225fd

Branch: refs/heads/master
Commit: ac8225fd56f16b1766724aefbd44babbe322d2ac
Parents: d33aed3
Author: gyao <[email protected]>
Authored: Thu Jan 18 18:17:40 2018 +0100
Committer: Till Rohrmann <[email protected]>
Committed: Fri Jan 26 13:50:25 2018 +0100

----------------------------------------------------------------------
 .../client/program/rest/RestClusterClient.java  | 322 ++++++++++++++-----
 .../rest/RestClusterClientConfiguration.java    |  65 ++--
 .../RestClusterClientConfigurationTest.java     |  52 +++
 .../program/rest/RestClusterClientTest.java     | 140 ++++++--
 .../apache/flink/configuration/RestOptions.java |  44 ++-
 .../org/apache/flink/util/ExceptionUtils.java   |   4 +-
 .../flink/util/function/CheckedSupplier.java    |  39 +++
 .../apache/flink/util/ExceptionUtilsTest.java   |   8 +
 .../flink/docs/rest/RestAPIDocGenerator.java    |  38 ++-
 .../webmonitor/RuntimeMonitorHandler.java       |   2 +-
 .../flink/runtime/concurrent/FutureUtils.java   |  40 ++-
 .../dispatcher/DispatcherRestEndpoint.java      |  22 +-
 .../entrypoint/JobClusterEntrypoint.java        |  11 +-
 .../entrypoint/SessionClusterEntrypoint.java    |  19 +-
 .../HighAvailabilityServices.java               |   4 +
 .../HighAvailabilityServicesUtils.java          |  10 +-
 .../nonha/embedded/EmbeddedHaServices.java      |  16 +
 .../nonha/standalone/StandaloneHaServices.java  |  32 +-
 .../zookeeper/ZooKeeperHaServices.java          |  16 +-
 .../jobmaster/JobMasterRestEndpoint.java        |  20 +-
 .../ZooKeeperLeaderElectionService.java         |   7 +
 .../LeaderRetrievalListener.java                |   4 +-
 .../apache/flink/runtime/rest/RestClient.java   |  22 +-
 .../runtime/rest/RestClientConfiguration.java   |  17 +-
 .../runtime/webmonitor/WebMonitorEndpoint.java  |  55 +++-
 .../runtime/concurrent/FutureUtilsTest.java     |  37 +++
 .../TestingHighAvailabilityServices.java        |  22 ++
 .../TestingManualHighAvailabilityServices.java  |  13 +
 .../standalone/StandaloneHaServicesTest.java    |   4 +-
 .../flink/runtime/rest/RestClientTest.java      | 103 ++++++
 .../flink/runtime/rest/RestEndpointITCase.java  |  18 +-
 .../runtime/taskexecutor/TaskExecutorTest.java  |   6 +-
 .../YarnIntraNonHaMasterServices.java           |  22 ++
 .../YarnPreConfiguredMasterNonHaServices.java   |  22 ++
 34 files changed, 1053 insertions(+), 203 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/flink/blob/ac8225fd/flink-clients/src/main/java/org/apache/flink/client/program/rest/RestClusterClient.java
----------------------------------------------------------------------
diff --git 
a/flink-clients/src/main/java/org/apache/flink/client/program/rest/RestClusterClient.java
 
b/flink-clients/src/main/java/org/apache/flink/client/program/rest/RestClusterClient.java
index 348e647..141af71 100644
--- 
a/flink-clients/src/main/java/org/apache/flink/client/program/rest/RestClusterClient.java
+++ 
b/flink-clients/src/main/java/org/apache/flink/client/program/rest/RestClusterClient.java
@@ -34,18 +34,25 @@ import org.apache.flink.runtime.blob.PermanentBlobKey;
 import org.apache.flink.runtime.client.JobStatusMessage;
 import org.apache.flink.runtime.client.JobSubmissionException;
 import 
org.apache.flink.runtime.clusterframework.messages.GetClusterStatusResponse;
+import org.apache.flink.runtime.concurrent.FutureUtils;
+import org.apache.flink.runtime.concurrent.ScheduledExecutorServiceAdapter;
 import org.apache.flink.runtime.jobgraph.JobGraph;
 import org.apache.flink.runtime.jobmaster.JobResult;
-import org.apache.flink.runtime.messages.webmonitor.JobDetails;
-import org.apache.flink.runtime.messages.webmonitor.MultipleJobsDetails;
+import org.apache.flink.runtime.leaderretrieval.LeaderRetrievalService;
 import org.apache.flink.runtime.rest.RestClient;
 import org.apache.flink.runtime.rest.messages.BlobServerPortHeaders;
 import org.apache.flink.runtime.rest.messages.BlobServerPortResponseBody;
+import org.apache.flink.runtime.rest.messages.EmptyMessageParameters;
+import org.apache.flink.runtime.rest.messages.EmptyRequestBody;
 import org.apache.flink.runtime.rest.messages.EmptyResponseBody;
 import org.apache.flink.runtime.rest.messages.JobMessageParameters;
 import org.apache.flink.runtime.rest.messages.JobTerminationHeaders;
 import org.apache.flink.runtime.rest.messages.JobTerminationMessageParameters;
 import org.apache.flink.runtime.rest.messages.JobsOverviewHeaders;
+import org.apache.flink.runtime.rest.messages.MessageHeaders;
+import org.apache.flink.runtime.rest.messages.MessageParameters;
+import org.apache.flink.runtime.rest.messages.RequestBody;
+import org.apache.flink.runtime.rest.messages.ResponseBody;
 import org.apache.flink.runtime.rest.messages.TerminationModeQueryParameter;
 import org.apache.flink.runtime.rest.messages.job.JobExecutionResultHeaders;
 import org.apache.flink.runtime.rest.messages.job.JobSubmitHeaders;
@@ -61,29 +68,42 @@ import 
org.apache.flink.runtime.rest.messages.job.savepoints.SavepointTriggerReq
 import 
org.apache.flink.runtime.rest.messages.job.savepoints.SavepointTriggerResponseBody;
 import 
org.apache.flink.runtime.rest.messages.queue.AsynchronouslyCreatedResource;
 import org.apache.flink.runtime.rest.messages.queue.QueueStatus;
+import org.apache.flink.runtime.rest.util.RestClientException;
 import org.apache.flink.runtime.util.ExecutorThreadFactory;
+import org.apache.flink.runtime.webmonitor.retriever.LeaderRetriever;
+import org.apache.flink.util.ExceptionUtils;
 import org.apache.flink.util.ExecutorUtils;
 import org.apache.flink.util.FlinkException;
 import org.apache.flink.util.Preconditions;
 import org.apache.flink.util.SerializedThrowable;
-import org.apache.flink.util.function.SupplierWithException;
+import org.apache.flink.util.function.CheckedSupplier;
+
+import org.apache.flink.shaded.netty4.io.netty.channel.ConnectTimeoutException;
+
+import akka.actor.AddressFromURIString;
 
 import javax.annotation.Nullable;
 
 import java.io.IOException;
 import java.net.InetSocketAddress;
+import java.net.MalformedURLException;
 import java.net.URL;
-import java.util.ArrayList;
 import java.util.Collection;
 import java.util.Collections;
 import java.util.List;
 import java.util.concurrent.CompletableFuture;
 import java.util.concurrent.CompletionException;
-import java.util.concurrent.ExecutionException;
 import java.util.concurrent.ExecutorService;
 import java.util.concurrent.Executors;
+import java.util.concurrent.ScheduledExecutorService;
 import java.util.concurrent.TimeUnit;
-import java.util.concurrent.TimeoutException;
+import java.util.function.Predicate;
+import java.util.function.Supplier;
+import java.util.stream.Collectors;
+
+import scala.Option;
+
+import static org.apache.flink.util.Preconditions.checkArgument;
 
 /**
  * A {@link ClusterClient} implementation that communicates via HTTP REST 
requests.
@@ -100,20 +120,48 @@ public class RestClusterClient<T> extends 
ClusterClient<T> {
 
        private final T clusterId;
 
+       private final LeaderRetrievalService webMonitorRetrievalService;
+
+       private final LeaderRetrievalService dispatcherRetrievalService;
+
+       private final LeaderRetriever webMonitorLeaderRetriever = new 
LeaderRetriever();
+
+       private final LeaderRetriever dispatcherLeaderRetriever = new 
LeaderRetriever();
+
+       /** ExecutorService to run operations that can be retried on 
exceptions. */
+       private ScheduledExecutorService retryExecutorService;
+
        public RestClusterClient(Configuration config, T clusterId) throws 
Exception {
                this(
                        config,
+                       null,
                        clusterId,
                        new ExponentialWaitStrategy(10L, 2000L));
        }
 
        @VisibleForTesting
-       RestClusterClient(Configuration configuration, T clusterId, 
WaitStrategy waitStrategy) throws Exception {
+       RestClusterClient(Configuration configuration, @Nullable RestClient 
restClient, T clusterId, WaitStrategy waitStrategy) throws Exception {
                super(configuration);
                this.restClusterClientConfiguration = 
RestClusterClientConfiguration.fromConfiguration(configuration);
-               this.restClient = new 
RestClient(restClusterClientConfiguration.getRestClientConfiguration(), 
executorService);
+
+               if (restClient != null) {
+                       this.restClient = restClient;
+               } else {
+                       this.restClient = new 
RestClient(restClusterClientConfiguration.getRestClientConfiguration(), 
executorService);
+               }
+
                this.waitStrategy = Preconditions.checkNotNull(waitStrategy);
                this.clusterId = Preconditions.checkNotNull(clusterId);
+
+               this.webMonitorRetrievalService = 
highAvailabilityServices.getWebMonitorLeaderRetriever();
+               this.dispatcherRetrievalService = 
highAvailabilityServices.getDispatcherLeaderRetriever();
+               this.retryExecutorService = 
Executors.newSingleThreadScheduledExecutor(new 
ExecutorThreadFactory("Flink-RestClusterClient-Retry"));
+               startLeaderRetrievers();
+       }
+
+       private void startLeaderRetrievers() throws Exception {
+               
this.webMonitorRetrievalService.start(webMonitorLeaderRetriever);
+               
this.dispatcherRetrievalService.start(dispatcherLeaderRetriever);
        }
 
        @Override
@@ -124,8 +172,23 @@ public class RestClusterClient<T> extends ClusterClient<T> 
{
                } catch (Exception e) {
                        log.error("An error occurred during the client 
shutdown.", e);
                }
+
+               
ExecutorUtils.gracefulShutdown(restClusterClientConfiguration.getRetryDelay(), 
TimeUnit.MILLISECONDS, retryExecutorService);
+
                this.restClient.shutdown(Time.seconds(5));
                ExecutorUtils.gracefulShutdown(5, TimeUnit.SECONDS, 
this.executorService);
+
+               try {
+                       webMonitorRetrievalService.stop();
+               } catch (Exception e) {
+                       log.error("An error occurred during stopping the 
webMonitorRetrievalService", e);
+               }
+
+               try {
+                       dispatcherRetrievalService.stop();
+               } catch (Exception e) {
+                       log.error("An error occurred during stopping the 
dispatcherLeaderRetriever", e);
+               }
        }
 
        @Override
@@ -141,16 +204,16 @@ public class RestClusterClient<T> extends 
ClusterClient<T> {
 
                final JobResult jobExecutionResult;
                try {
-                       jobExecutionResult = waitForResource(
+                       jobExecutionResult = pollResourceAsync(
                                () -> {
                                        final JobMessageParameters 
messageParameters = new JobMessageParameters();
                                        
messageParameters.jobPathParameter.resolve(jobGraph.getJobID());
-                                       return restClient.sendRequest(
-                                               
restClusterClientConfiguration.getRestServerAddress(),
-                                               
restClusterClientConfiguration.getRestServerPort(),
+                                       return sendRetryableRequest(
                                                
JobExecutionResultHeaders.getInstance(),
-                                               messageParameters);
-                               });
+                                               messageParameters,
+                                               EmptyRequestBody.getInstance(),
+                                               
isConnectionProblemException().or(isHttpStatusUnsuccessfulException()));
+                               }).get();
                } catch (final Exception e) {
                        throw new ProgramInvocationException(e);
                }
@@ -180,9 +243,7 @@ public class RestClusterClient<T> extends ClusterClient<T> {
                log.info("Requesting blob server port.");
                int blobServerPort;
                try {
-                       CompletableFuture<BlobServerPortResponseBody> 
portFuture = restClient.sendRequest(
-                               
restClusterClientConfiguration.getRestServerAddress(),
-                               
restClusterClientConfiguration.getRestServerPort(),
+                       CompletableFuture<BlobServerPortResponseBody> 
portFuture = sendRequest(
                                BlobServerPortHeaders.getInstance());
                        blobServerPort = portFuture.get(timeout.toMillis(), 
TimeUnit.MILLISECONDS).port;
                } catch (Exception e) {
@@ -191,7 +252,7 @@ public class RestClusterClient<T> extends ClusterClient<T> {
 
                log.info("Uploading jar files.");
                try {
-                       InetSocketAddress address = new 
InetSocketAddress(restClusterClientConfiguration.getBlobServerAddress(), 
blobServerPort);
+                       InetSocketAddress address = new 
InetSocketAddress(getDispatcherAddress().get(), blobServerPort);
                        List<PermanentBlobKey> keys = 
BlobClient.uploadJarFiles(address, this.flinkConfig, jobGraph.getJobID(), 
jobGraph.getUserJars());
                        for (PermanentBlobKey key : keys) {
                                jobGraph.addBlob(key);
@@ -202,9 +263,7 @@ public class RestClusterClient<T> extends ClusterClient<T> {
 
                log.info("Submitting job graph.");
                try {
-                       CompletableFuture<JobSubmitResponseBody> responseFuture 
= restClient.sendRequest(
-                               
restClusterClientConfiguration.getRestServerAddress(),
-                               
restClusterClientConfiguration.getRestServerPort(),
+                       CompletableFuture<JobSubmitResponseBody> responseFuture 
= sendRequest(
                                JobSubmitHeaders.getInstance(),
                                new JobSubmitRequestBody(jobGraph));
                        responseFuture.get(timeout.toMillis(), 
TimeUnit.MILLISECONDS);
@@ -218,9 +277,7 @@ public class RestClusterClient<T> extends ClusterClient<T> {
                JobTerminationMessageParameters params = new 
JobTerminationMessageParameters();
                params.jobPathParameter.resolve(jobID);
                
params.terminationModeQueryParameter.resolve(Collections.singletonList(TerminationModeQueryParameter.TerminationMode.STOP));
-               CompletableFuture<EmptyResponseBody> responseFuture = 
restClient.sendRequest(
-                       restClusterClientConfiguration.getRestServerAddress(),
-                       restClusterClientConfiguration.getRestServerPort(),
+               CompletableFuture<EmptyResponseBody> responseFuture = 
sendRequest(
                        JobTerminationHeaders.getInstance(),
                        params
                );
@@ -232,9 +289,7 @@ public class RestClusterClient<T> extends ClusterClient<T> {
                JobTerminationMessageParameters params = new 
JobTerminationMessageParameters();
                params.jobPathParameter.resolve(jobID);
                
params.terminationModeQueryParameter.resolve(Collections.singletonList(TerminationModeQueryParameter.TerminationMode.CANCEL));
-               CompletableFuture<EmptyResponseBody> responseFuture = 
restClient.sendRequest(
-                       restClusterClientConfiguration.getRestServerAddress(),
-                       restClusterClientConfiguration.getRestServerPort(),
+               CompletableFuture<EmptyResponseBody> responseFuture = 
sendRequest(
                        JobTerminationHeaders.getInstance(),
                        params
                );
@@ -257,25 +312,15 @@ public class RestClusterClient<T> extends 
ClusterClient<T> {
 
                final CompletableFuture<SavepointTriggerResponseBody> 
responseFuture;
 
-               try {
-                       responseFuture = restClient.sendRequest(
-                               
restClusterClientConfiguration.getRestServerAddress(),
-                               
restClusterClientConfiguration.getRestServerPort(),
-                               savepointTriggerHeaders,
-                               savepointTriggerMessageParameters,
-                               new 
SavepointTriggerRequestBody(savepointDirectory));
-               } catch (IOException e) {
-                       throw new FlinkException("Could not send trigger 
savepoint request to Flink cluster.", e);
-               }
+               responseFuture = sendRequest(
+                       savepointTriggerHeaders,
+                       savepointTriggerMessageParameters,
+                       new SavepointTriggerRequestBody(savepointDirectory));
 
-               return responseFuture.thenApply(savepointTriggerResponseBody -> 
{
+               return responseFuture.thenCompose(savepointTriggerResponseBody 
-> {
                        final SavepointTriggerId savepointTriggerId = 
savepointTriggerResponseBody.getSavepointTriggerId();
-                       final SavepointInfo savepointInfo;
-                       try {
-                               savepointInfo = 
waitForSavepointCompletion(jobId, savepointTriggerId);
-                       } catch (Exception e) {
-                               throw new CompletionException(e);
-                       }
+                       return pollSavepointAsync(jobId, savepointTriggerId);
+               }).thenApply(savepointInfo -> {
                        if (savepointInfo.getFailureCause() != null) {
                                throw new 
CompletionException(savepointInfo.getFailureCause());
                        }
@@ -283,41 +328,36 @@ public class RestClusterClient<T> extends 
ClusterClient<T> {
                });
        }
 
-       private SavepointInfo waitForSavepointCompletion(
+       private CompletableFuture<SavepointInfo> pollSavepointAsync(
                        final JobID jobId,
-                       final SavepointTriggerId savepointTriggerId) throws 
Exception {
-               return waitForResource(() -> {
+                       final SavepointTriggerId savepointTriggerId) {
+               return pollResourceAsync(() -> {
                        final SavepointStatusHeaders savepointStatusHeaders = 
SavepointStatusHeaders.getInstance();
                        final SavepointStatusMessageParameters 
savepointStatusMessageParameters =
                                
savepointStatusHeaders.getUnresolvedMessageParameters();
                        
savepointStatusMessageParameters.jobIdPathParameter.resolve(jobId);
                        
savepointStatusMessageParameters.savepointTriggerIdPathParameter.resolve(savepointTriggerId);
-                       return restClient.sendRequest(
-                               
restClusterClientConfiguration.getRestServerAddress(),
-                               
restClusterClientConfiguration.getRestServerPort(),
+                       return sendRetryableRequest(
                                savepointStatusHeaders,
-                               savepointStatusMessageParameters
-                       );
+                               savepointStatusMessageParameters,
+                               EmptyRequestBody.getInstance(),
+                               isConnectionProblemException());
                });
        }
 
        @Override
        public CompletableFuture<Collection<JobStatusMessage>> listJobs() 
throws Exception {
-               JobsOverviewHeaders headers = JobsOverviewHeaders.getInstance();
-               CompletableFuture<MultipleJobsDetails> jobDetailsFuture = 
restClient.sendRequest(
-                       restClusterClientConfiguration.getRestServerAddress(),
-                       restClusterClientConfiguration.getRestServerPort(),
-                       headers
-               );
-               return jobDetailsFuture
+               return sendRequest(JobsOverviewHeaders.getInstance())
                        .thenApply(
-                               (MultipleJobsDetails multipleJobsDetails) -> {
-                                       final Collection<JobDetails> jobDetails 
= multipleJobsDetails.getJobs();
-                                       Collection<JobStatusMessage> 
flattenedDetails = new ArrayList<>(jobDetails.size());
-                                       jobDetails.forEach(detail -> 
flattenedDetails.add(new JobStatusMessage(detail.getJobId(), 
detail.getJobName(), detail.getStatus(), detail.getStartTime())));
-
-                                       return flattenedDetails;
-                       });
+                               (multipleJobsDetails) -> multipleJobsDetails
+                                       .getJobs()
+                                       .stream()
+                                       .map(detail -> new JobStatusMessage(
+                                               detail.getJobId(),
+                                               detail.getJobName(),
+                                               detail.getStatus(),
+                                               detail.getStartTime()))
+                                       .collect(Collectors.toList()));
        }
 
        @Override
@@ -325,21 +365,43 @@ public class RestClusterClient<T> extends 
ClusterClient<T> {
                return clusterId;
        }
 
-       private <R, A extends AsynchronouslyCreatedResource<R>> R 
waitForResource(
-                       final SupplierWithException<CompletableFuture<A>, 
IOException> resourceFutureSupplier)
-                               throws IOException, InterruptedException, 
ExecutionException, TimeoutException {
-               A asynchronouslyCreatedResource;
-               long attempt = 0;
-               while (true) {
-                       final CompletableFuture<A> responseFuture = 
resourceFutureSupplier.get();
-                       asynchronouslyCreatedResource = 
responseFuture.get(timeout.toMillis(), TimeUnit.MILLISECONDS);
-                       if (asynchronouslyCreatedResource.queueStatus().getId() 
== QueueStatus.Id.COMPLETED) {
-                               break;
+       /**
+        * Creates a {@code CompletableFuture} that polls a {@code 
AsynchronouslyCreatedResource} until
+        * its {@link AsynchronouslyCreatedResource#queueStatus() QueueStatus} 
becomes
+        * {@link QueueStatus.Id#COMPLETED COMPLETED}. The future completes 
with the result of
+        * {@link AsynchronouslyCreatedResource#resource()}.
+        *
+        * @param resourceFutureSupplier The operation which polls for the
+        *                               {@code AsynchronouslyCreatedResource}.
+        * @param <R>                    The type of the resource.
+        * @param <A>                    The type of the {@code 
AsynchronouslyCreatedResource}.
+        * @return A {@code CompletableFuture} delivering the resource.
+        */
+       private <R, A extends AsynchronouslyCreatedResource<R>> 
CompletableFuture<R> pollResourceAsync(
+                       final Supplier<CompletableFuture<A>> 
resourceFutureSupplier) {
+               return pollResourceAsync(resourceFutureSupplier, new 
CompletableFuture<>(), 0);
+       }
+
+       private <R, A extends AsynchronouslyCreatedResource<R>> 
CompletableFuture<R> pollResourceAsync(
+                       final Supplier<CompletableFuture<A>> 
resourceFutureSupplier,
+                       final CompletableFuture<R> resultFuture,
+                       final long attempt) {
+
+               
resourceFutureSupplier.get().whenComplete((asynchronouslyCreatedResource, 
throwable) -> {
+                       if (throwable != null) {
+                               resultFuture.completeExceptionally(throwable);
+                       } else {
+                               if 
(asynchronouslyCreatedResource.queueStatus().getId() == 
QueueStatus.Id.COMPLETED) {
+                                       
resultFuture.complete(asynchronouslyCreatedResource.resource());
+                               } else {
+                                       retryExecutorService.schedule(() -> {
+                                               
pollResourceAsync(resourceFutureSupplier, resultFuture, attempt + 1);
+                                       }, waitStrategy.sleepTime(attempt), 
TimeUnit.MILLISECONDS);
+                               }
                        }
-                       Thread.sleep(waitStrategy.sleepTime(attempt));
-                       attempt++;
-               }
-               return asynchronouslyCreatedResource.resource();
+               });
+
+               return resultFuture;
        }
 
        // ======================================
@@ -358,7 +420,7 @@ public class RestClusterClient<T> extends ClusterClient<T> {
 
        @Override
        public String getWebInterfaceURL() {
-               return "http://"; + 
restClusterClientConfiguration.getRestServerAddress() + ':' + 
restClusterClientConfiguration.getRestServerPort();
+               return getWebMonitorBaseUrl().toString();
        }
 
        @Override
@@ -375,4 +437,102 @@ public class RestClusterClient<T> extends 
ClusterClient<T> {
        public int getMaxSlots() {
                return 0;
        }
+
+       
//-------------------------------------------------------------------------
+       // RestClient Helper
+       
//-------------------------------------------------------------------------
+
+       private <M extends MessageHeaders<EmptyRequestBody, P, U>, U extends 
MessageParameters, P extends ResponseBody> CompletableFuture<P>
+                       sendRequest(M messageHeaders, U messageParameters) {
+               return sendRequest(messageHeaders, messageParameters, 
EmptyRequestBody.getInstance());
+       }
+
+       private <M extends MessageHeaders<R, P, EmptyMessageParameters>, R 
extends RequestBody, P extends ResponseBody> CompletableFuture<P>
+                       sendRequest(M messageHeaders, R request) {
+               return sendRequest(messageHeaders, 
EmptyMessageParameters.getInstance(), request);
+       }
+
+       private <M extends MessageHeaders<EmptyRequestBody, P, 
EmptyMessageParameters>, P extends ResponseBody> CompletableFuture<P>
+                       sendRequest(M messageHeaders) {
+               return sendRequest(messageHeaders, 
EmptyMessageParameters.getInstance(), EmptyRequestBody.getInstance());
+       }
+
+       private <M extends MessageHeaders<R, P, U>, U extends 
MessageParameters, R extends RequestBody, P extends ResponseBody> 
CompletableFuture<P>
+                       sendRequest(M messageHeaders, U messageParameters, R 
request) {
+               return getWebMonitorBaseUrl().thenCompose(webMonitorBaseUrl -> {
+                       try {
+                               return 
restClient.sendRequest(webMonitorBaseUrl.getHost(), 
webMonitorBaseUrl.getPort(), messageHeaders, messageParameters, request);
+                       } catch (IOException e) {
+                               throw new CompletionException(e);
+                       }
+               });
+       }
+
+       private <M extends MessageHeaders<R, P, U>, U extends 
MessageParameters, R extends RequestBody, P extends ResponseBody> 
CompletableFuture<P>
+                       sendRetryableRequest(M messageHeaders, U 
messageParameters, R request, Predicate<Throwable> retryPredicate) {
+               return retry(() -> 
getWebMonitorBaseUrl().thenCompose(webMonitorBaseUrl -> {
+                       try {
+                               return 
restClient.sendRequest(webMonitorBaseUrl.getHost(), 
webMonitorBaseUrl.getPort(), messageHeaders, messageParameters, request);
+                       } catch (IOException e) {
+                               throw new CompletionException(e);
+                       }
+               }), retryPredicate);
+       }
+
+       private <C> CompletableFuture<C> retry(
+                       CheckedSupplier<CompletableFuture<C>> operation,
+                       Predicate<Throwable> retryPredicate) {
+               return FutureUtils.retryWithDelay(
+                       CheckedSupplier.unchecked(operation),
+                       restClusterClientConfiguration.getRetryMaxAttempts(),
+                       
Time.milliseconds(restClusterClientConfiguration.getRetryDelay()),
+                       retryPredicate,
+                       new 
ScheduledExecutorServiceAdapter(retryExecutorService));
+       }
+
+       private static Predicate<Throwable> isConnectionProblemException() {
+               return (throwable) ->
+                       ExceptionUtils.findThrowable(throwable, 
java.net.ConnectException.class).isPresent() ||
+                               ExceptionUtils.findThrowable(throwable, 
java.net.SocketTimeoutException.class).isPresent() ||
+                               ExceptionUtils.findThrowable(throwable, 
ConnectTimeoutException.class).isPresent() ||
+                               ExceptionUtils.findThrowable(throwable, 
IOException.class).isPresent();
+       }
+
+       private static Predicate<Throwable> isHttpStatusUnsuccessfulException() 
{
+               return (throwable) -> ExceptionUtils.findThrowable(throwable, 
RestClientException.class)
+                               .map(restClientException -> {
+                                       final int code = 
restClientException.getHttpResponseStatus().code();
+                                       return code < 200 || code > 299;
+                               })
+                               .orElse(false);
+       }
+
+       private CompletableFuture<URL> getWebMonitorBaseUrl() {
+               return FutureUtils.orTimeout(
+                               webMonitorLeaderRetriever.getLeaderFuture(),
+                               
restClusterClientConfiguration.getAwaitLeaderTimeout(),
+                               TimeUnit.MILLISECONDS)
+                       .thenApplyAsync(leaderAddressSessionId -> {
+                               final String url = leaderAddressSessionId.f0;
+                               try {
+                                       return new URL(url);
+                               } catch (MalformedURLException e) {
+                                       throw new 
IllegalArgumentException("Could not parse URL from " + url, e);
+                               }
+                       }, executorService);
+       }
+
+       private CompletableFuture<String> getDispatcherAddress() {
+               return FutureUtils.orTimeout(
+                               dispatcherLeaderRetriever.getLeaderFuture(),
+                               
restClusterClientConfiguration.getAwaitLeaderTimeout(),
+                               TimeUnit.MILLISECONDS)
+                       .thenApplyAsync(leaderAddressSessionId -> {
+                               final String address = 
leaderAddressSessionId.f0;
+                               final Option<String> host = 
AddressFromURIString.parse(address).host();
+                               checkArgument(host.isDefined(), "Could not 
parse host from %s", address);
+                               return host.get();
+                       }, executorService);
+       }
+
 }

http://git-wip-us.apache.org/repos/asf/flink/blob/ac8225fd/flink-clients/src/main/java/org/apache/flink/client/program/rest/RestClusterClientConfiguration.java
----------------------------------------------------------------------
diff --git 
a/flink-clients/src/main/java/org/apache/flink/client/program/rest/RestClusterClientConfiguration.java
 
b/flink-clients/src/main/java/org/apache/flink/client/program/rest/RestClusterClientConfiguration.java
index 788eba9..2d58dce 100644
--- 
a/flink-clients/src/main/java/org/apache/flink/client/program/rest/RestClusterClientConfiguration.java
+++ 
b/flink-clients/src/main/java/org/apache/flink/client/program/rest/RestClusterClientConfiguration.java
@@ -19,60 +19,73 @@
 package org.apache.flink.client.program.rest;
 
 import org.apache.flink.configuration.Configuration;
-import org.apache.flink.configuration.JobManagerOptions;
 import org.apache.flink.configuration.RestOptions;
 import org.apache.flink.runtime.rest.RestClientConfiguration;
 import org.apache.flink.util.ConfigurationException;
 import org.apache.flink.util.Preconditions;
 
+import static org.apache.flink.util.Preconditions.checkArgument;
+
 /**
  * A configuration object for {@link RestClusterClient}s.
  */
 public final class RestClusterClientConfiguration {
 
-       private final String blobServerAddress;
-
        private final RestClientConfiguration restClientConfiguration;
 
-       private final String restServerAddress;
+       private final long awaitLeaderTimeout;
+
+       private final int retryMaxAttempts;
 
-       private final int restServerPort;
+       private final long retryDelay;
 
+       /**
+        * Creates the configuration after validating that all numeric settings are non-negative.
+        *
+        * @param endpointConfiguration configuration of the underlying REST client; must not be null
+        *                              (checked via Preconditions#checkNotNull)
+        * @param awaitLeaderTimeout    time in ms to wait for the leader address
+        *                              (see {@link RestOptions#AWAIT_LEADER_TIMEOUT})
+        * @param retryMaxAttempts      number of retries for retryable operations
+        *                              (see {@link RestOptions#RETRY_MAX_ATTEMPTS})
+        * @param retryDelay            time in ms to wait between retries
+        *                              (see {@link RestOptions#RETRY_DELAY})
+        * @throws IllegalArgumentException if any of the numeric settings is negative
+        */
        private RestClusterClientConfiguration(
-                       String blobServerAddress,
-                       RestClientConfiguration endpointConfiguration,
-                       String restServerAddress,
-                       int restServerPort) {
-               this.blobServerAddress = 
Preconditions.checkNotNull(blobServerAddress);
+                       final RestClientConfiguration endpointConfiguration,
+                       final long awaitLeaderTimeout,
+                       final int retryMaxAttempts,
+                       final long retryDelay) {
+               checkArgument(awaitLeaderTimeout >= 0, "awaitLeaderTimeout must 
be equal to or greater than 0");
+               checkArgument(retryMaxAttempts >= 0, "retryMaxAttempts must be 
equal to or greater than 0");
+               checkArgument(retryDelay >= 0, "retryDelay must be equal to or 
greater than 0");
+
                this.restClientConfiguration = 
Preconditions.checkNotNull(endpointConfiguration);
-               this.restServerAddress = 
Preconditions.checkNotNull(restServerAddress);
-               this.restServerPort = restServerPort;
+               this.awaitLeaderTimeout = awaitLeaderTimeout;
+               this.retryMaxAttempts = retryMaxAttempts;
+               this.retryDelay = retryDelay;
        }
 
-       public String getBlobServerAddress() {
-               return blobServerAddress;
+       public RestClientConfiguration getRestClientConfiguration() {
+               return restClientConfiguration;
        }
 
-       public String getRestServerAddress() {
-               return restServerAddress;
+       /**
+        * @see RestOptions#AWAIT_LEADER_TIMEOUT
+        */
+       public long getAwaitLeaderTimeout() {
+               return awaitLeaderTimeout;
        }
 
-       public int getRestServerPort() {
-               return restServerPort;
+       /**
+        * @see RestOptions#RETRY_MAX_ATTEMPTS
+        */
+       public int getRetryMaxAttempts() {
+               return retryMaxAttempts;
        }
 
-       public RestClientConfiguration getRestClientConfiguration() {
-               return restClientConfiguration;
+       /**
+        * @see RestOptions#RETRY_DELAY
+        */
+       public long getRetryDelay() {
+               return retryDelay;
        }
 
+       /**
+        * Creates a {@link RestClusterClientConfiguration} from the given {@link Configuration}.
+        *
+        * <p>Besides the settings parsed by {@link RestClientConfiguration#fromConfiguration(Configuration)},
+        * this reads {@link RestOptions#AWAIT_LEADER_TIMEOUT}, {@link RestOptions#RETRY_MAX_ATTEMPTS}
+        * and {@link RestOptions#RETRY_DELAY}.
+        *
+        * @param config configuration to read the settings from
+        * @return the parsed configuration
+        * @throws ConfigurationException propagated from
+        *         {@link RestClientConfiguration#fromConfiguration(Configuration)}
+        * @throws IllegalArgumentException if any of the timeout/retry values is negative
+        */
        public static RestClusterClientConfiguration 
fromConfiguration(Configuration config) throws ConfigurationException {
-               String blobServerAddress = 
config.getString(JobManagerOptions.ADDRESS);
-
-               String serverAddress = 
config.getString(RestOptions.REST_ADDRESS);
-               int serverPort = config.getInteger(RestOptions.REST_PORT);
-
                RestClientConfiguration restClientConfiguration = 
RestClientConfiguration.fromConfiguration(config);
 
-               return new RestClusterClientConfiguration(blobServerAddress, 
restClientConfiguration, serverAddress, serverPort);
+               final long awaitLeaderTimeout = 
config.getLong(RestOptions.AWAIT_LEADER_TIMEOUT);
+               final int retryMaxAttempts = 
config.getInteger(RestOptions.RETRY_MAX_ATTEMPTS);
+               final long retryDelay = config.getLong(RestOptions.RETRY_DELAY);
+
+               return new 
RestClusterClientConfiguration(restClientConfiguration, awaitLeaderTimeout, 
retryMaxAttempts, retryDelay);
        }
 }

http://git-wip-us.apache.org/repos/asf/flink/blob/ac8225fd/flink-clients/src/test/java/org/apache/flink/client/program/rest/RestClusterClientConfigurationTest.java
----------------------------------------------------------------------
diff --git 
a/flink-clients/src/test/java/org/apache/flink/client/program/rest/RestClusterClientConfigurationTest.java
 
b/flink-clients/src/test/java/org/apache/flink/client/program/rest/RestClusterClientConfigurationTest.java
new file mode 100644
index 0000000..30cf926
--- /dev/null
+++ 
b/flink-clients/src/test/java/org/apache/flink/client/program/rest/RestClusterClientConfigurationTest.java
@@ -0,0 +1,52 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.client.program.rest;
+
+import org.apache.flink.configuration.Configuration;
+import org.apache.flink.configuration.RestOptions;
+import org.apache.flink.util.TestLogger;
+
+import org.junit.Before;
+import org.junit.Test;
+
+import static org.junit.Assert.assertEquals;
+
+/**
+ * Tests for {@link RestClusterClientConfiguration}.
+ */
+public class RestClusterClientConfigurationTest extends TestLogger {
+
+       private RestClusterClientConfiguration restClusterClientConfiguration;
+
+       @Before
+       public void setUp() throws Exception {
+               final Configuration config = new Configuration();
+               config.setLong(RestOptions.AWAIT_LEADER_TIMEOUT, 1);
+               config.setInteger(RestOptions.RETRY_MAX_ATTEMPTS, 2);
+               config.setLong(RestOptions.RETRY_DELAY, 3);
+               restClusterClientConfiguration = RestClusterClientConfiguration.fromConfiguration(config);
+       }
+
+       /**
+        * Verifies that the values set in the {@link Configuration} are exposed by the getters.
+        */
+       @Test
+       public void testConfiguration() {
+               assertEquals(1, restClusterClientConfiguration.getAwaitLeaderTimeout());
+               assertEquals(2, restClusterClientConfiguration.getRetryMaxAttempts());
+               assertEquals(3, restClusterClientConfiguration.getRetryDelay());
+       }
+
+       /**
+        * Verifies that a negative {@link RestOptions#AWAIT_LEADER_TIMEOUT} is rejected.
+        */
+       @Test(expected = IllegalArgumentException.class)
+       public void testNegativeAwaitLeaderTimeoutIsRejected() throws Exception {
+               final Configuration config = new Configuration();
+               config.setLong(RestOptions.AWAIT_LEADER_TIMEOUT, -1);
+               RestClusterClientConfiguration.fromConfiguration(config);
+       }
+
+       /**
+        * Verifies that a negative {@link RestOptions#RETRY_MAX_ATTEMPTS} is rejected.
+        */
+       @Test(expected = IllegalArgumentException.class)
+       public void testNegativeRetryMaxAttemptsIsRejected() throws Exception {
+               final Configuration config = new Configuration();
+               config.setInteger(RestOptions.RETRY_MAX_ATTEMPTS, -1);
+               RestClusterClientConfiguration.fromConfiguration(config);
+       }
+
+       /**
+        * Verifies that a negative {@link RestOptions#RETRY_DELAY} is rejected.
+        */
+       @Test(expected = IllegalArgumentException.class)
+       public void testNegativeRetryDelayIsRejected() throws Exception {
+               final Configuration config = new Configuration();
+               config.setLong(RestOptions.RETRY_DELAY, -1);
+               RestClusterClientConfiguration.fromConfiguration(config);
+       }
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/ac8225fd/flink-clients/src/test/java/org/apache/flink/client/program/rest/RestClusterClientTest.java
----------------------------------------------------------------------
diff --git 
a/flink-clients/src/test/java/org/apache/flink/client/program/rest/RestClusterClientTest.java
 
b/flink-clients/src/test/java/org/apache/flink/client/program/rest/RestClusterClientTest.java
index f7cda03..c880817 100644
--- 
a/flink-clients/src/test/java/org/apache/flink/client/program/rest/RestClusterClientTest.java
+++ 
b/flink-clients/src/test/java/org/apache/flink/client/program/rest/RestClusterClientTest.java
@@ -18,6 +18,7 @@
 
 package org.apache.flink.client.program.rest;
 
+import org.apache.flink.api.common.JobExecutionResult;
 import org.apache.flink.api.common.JobID;
 import org.apache.flink.api.common.time.Time;
 import org.apache.flink.api.java.tuple.Tuple2;
@@ -25,6 +26,7 @@ import org.apache.flink.client.deployment.StandaloneClusterId;
 import org.apache.flink.client.program.ProgramInvocationException;
 import org.apache.flink.configuration.Configuration;
 import org.apache.flink.configuration.JobManagerOptions;
+import org.apache.flink.configuration.RestOptions;
 import org.apache.flink.runtime.client.JobStatusMessage;
 import org.apache.flink.runtime.concurrent.FutureUtils;
 import org.apache.flink.runtime.dispatcher.Dispatcher;
@@ -34,6 +36,8 @@ import org.apache.flink.runtime.jobgraph.JobStatus;
 import org.apache.flink.runtime.jobmaster.JobResult;
 import org.apache.flink.runtime.messages.webmonitor.JobDetails;
 import org.apache.flink.runtime.messages.webmonitor.MultipleJobsDetails;
+import org.apache.flink.runtime.rest.RestClient;
+import org.apache.flink.runtime.rest.RestClientConfiguration;
 import org.apache.flink.runtime.rest.RestServerEndpoint;
 import org.apache.flink.runtime.rest.RestServerEndpointConfiguration;
 import org.apache.flink.runtime.rest.handler.AbstractRestHandler;
@@ -70,9 +74,12 @@ import 
org.apache.flink.runtime.rest.messages.job.savepoints.SavepointTriggerMes
 import 
org.apache.flink.runtime.rest.messages.job.savepoints.SavepointTriggerRequestBody;
 import 
org.apache.flink.runtime.rest.messages.job.savepoints.SavepointTriggerResponseBody;
 import org.apache.flink.runtime.rest.messages.queue.QueueStatus;
+import org.apache.flink.runtime.rest.util.RestClientException;
 import org.apache.flink.runtime.rpc.RpcUtils;
+import org.apache.flink.runtime.util.ExecutorThreadFactory;
 import org.apache.flink.runtime.webmonitor.retriever.GatewayRetriever;
 import org.apache.flink.testutils.category.Flip6;
+import org.apache.flink.util.ExceptionUtils;
 import org.apache.flink.util.SerializedThrowable;
 import org.apache.flink.util.SerializedValue;
 import org.apache.flink.util.TestLogger;
@@ -90,6 +97,7 @@ import org.mockito.MockitoAnnotations;
 
 import javax.annotation.Nonnull;
 
+import java.io.IOException;
 import java.util.ArrayList;
 import java.util.Arrays;
 import java.util.Collection;
@@ -99,12 +107,17 @@ import java.util.List;
 import java.util.Objects;
 import java.util.concurrent.CompletableFuture;
 import java.util.concurrent.ExecutionException;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Executors;
+import java.util.concurrent.atomic.AtomicBoolean;
 
+import static org.apache.flink.util.Preconditions.checkArgument;
 import static org.apache.flink.util.Preconditions.checkState;
 import static org.hamcrest.Matchers.equalTo;
 import static org.hamcrest.Matchers.instanceOf;
 import static org.junit.Assert.assertEquals;
 import static org.junit.Assert.assertThat;
+import static org.junit.Assert.assertTrue;
 import static org.junit.Assert.fail;
 import static org.mockito.Matchers.any;
 import static org.mockito.Mockito.when;
@@ -130,6 +143,10 @@ public class RestClusterClientTest extends TestLogger {
 
        private RestClusterClient<StandaloneClusterId> restClusterClient;
 
+       private volatile FailHttpRequestPredicate failHttpRequest = 
FailHttpRequestPredicate.never();
+
+       private ExecutorService executor;
+
        @Before
        public void setUp() throws Exception {
                MockitoAnnotations.initMocks(this);
@@ -137,9 +154,30 @@ public class RestClusterClientTest extends TestLogger {
 
                final Configuration config = new Configuration();
                config.setString(JobManagerOptions.ADDRESS, "localhost");
+               config.setInteger(RestOptions.RETRY_MAX_ATTEMPTS, 10);
+               config.setLong(RestOptions.RETRY_DELAY, 0);
+
                restServerEndpointConfiguration = 
RestServerEndpointConfiguration.fromConfiguration(config);
                mockGatewayRetriever = () -> 
CompletableFuture.completedFuture(mockRestfulGateway);
-               restClusterClient = new RestClusterClient(config, 
StandaloneClusterId.getInstance(), (attempt) -> 0);
+
+               executor = Executors.newSingleThreadExecutor(new 
ExecutorThreadFactory(RestClusterClientTest.class.getSimpleName()));
+               final RestClient restClient = new 
RestClient(RestClientConfiguration.fromConfiguration(config), executor) {
+                       @Override
+                       public <M extends MessageHeaders<R, P, U>, U extends 
MessageParameters, R extends RequestBody, P extends ResponseBody> 
CompletableFuture<P>
+                       sendRequest(
+                                       final String targetAddress,
+                                       final int targetPort,
+                                       final M messageHeaders,
+                                       final U messageParameters,
+                                       final R request) throws IOException {
+                               if (failHttpRequest.test(messageHeaders, 
messageParameters, request)) {
+                                       return 
FutureUtils.completedExceptionally(new IOException("expected"));
+                               } else {
+                                       return super.sendRequest(targetAddress, 
targetPort, messageHeaders, messageParameters, request);
+                               }
+                       }
+               };
+               restClusterClient = new RestClusterClient<>(config, restClient, 
StandaloneClusterId.getInstance(), (attempt) -> 0);
        }
 
        @After
@@ -147,6 +185,10 @@ public class RestClusterClientTest extends TestLogger {
                if (restClusterClient != null) {
                        restClusterClient.shutdown();
                }
+
+               if (executor != null) {
+                       executor.shutdown();
+               }
        }
 
        @Test
@@ -158,11 +200,11 @@ public class RestClusterClientTest extends TestLogger {
                TestJobSubmitHandler submitHandler = new TestJobSubmitHandler();
                TestJobTerminationHandler terminationHandler = new 
TestJobTerminationHandler();
                TestJobExecutionResultHandler testJobExecutionResultHandler =
-                       new 
TestJobExecutionResultHandler(Collections.singletonList(
+                       new TestJobExecutionResultHandler(
                                JobExecutionResultResponseBody.created(new 
JobResult.Builder()
                                        .jobId(id)
                                        .netRuntime(Long.MAX_VALUE)
-                                       .build())).iterator());
+                                       .build()));
 
                try (TestRestServerEndpoint ignored = createRestServerEndpoint(
                        portHandler,
@@ -239,14 +281,17 @@ public class RestClusterClientTest extends TestLogger {
        private class TestJobExecutionResultHandler
                extends TestHandler<EmptyRequestBody, 
JobExecutionResultResponseBody, JobMessageParameters> {
 
-               private final Iterator<JobExecutionResultResponseBody> 
jobExecutionResults;
+               private final Iterator<Object> jobExecutionResults;
 
-               private JobExecutionResultResponseBody lastJobExecutionResult;
+               private Object lastJobExecutionResult;
 
                private TestJobExecutionResultHandler(
-                               final Iterator<JobExecutionResultResponseBody> 
jobExecutionResults) {
+                               final Object... jobExecutionResults) {
                        super(JobExecutionResultHeaders.getInstance());
-                       this.jobExecutionResults = jobExecutionResults;
+                       checkArgument(Arrays.stream(jobExecutionResults)
+                               .allMatch(object -> object instanceof 
JobExecutionResultResponseBody
+                                       || object instanceof 
RestHandlerException));
+                       this.jobExecutionResults = 
Arrays.asList(jobExecutionResults).iterator();
                }
 
                @Override
@@ -257,7 +302,13 @@ public class RestClusterClientTest extends TestLogger {
                                lastJobExecutionResult = 
jobExecutionResults.next();
                        }
                        checkState(lastJobExecutionResult != null);
-                       return 
CompletableFuture.completedFuture(lastJobExecutionResult);
+                       if (lastJobExecutionResult instanceof 
JobExecutionResultResponseBody) {
+                               return 
CompletableFuture.completedFuture((JobExecutionResultResponseBody) 
lastJobExecutionResult);
+                       } else if (lastJobExecutionResult instanceof 
RestHandlerException) {
+                               return 
FutureUtils.completedExceptionally((RestHandlerException) 
lastJobExecutionResult);
+                       } else {
+                               throw new AssertionError();
+                       }
                }
        }
 
@@ -267,7 +318,8 @@ public class RestClusterClientTest extends TestLogger {
                final JobID jobId = jobGraph.getJobID();
 
                final TestJobExecutionResultHandler 
testJobExecutionResultHandler =
-                       new TestJobExecutionResultHandler(Arrays.asList(
+                       new TestJobExecutionResultHandler(
+                               new RestHandlerException("should trigger 
retry", HttpResponseStatus.NOT_FOUND),
                                JobExecutionResultResponseBody.inProgress(),
                                JobExecutionResultResponseBody.created(new 
JobResult.Builder()
                                        .jobId(jobId)
@@ -278,17 +330,23 @@ public class RestClusterClientTest extends TestLogger {
                                        .jobId(jobId)
                                        .netRuntime(Long.MAX_VALUE)
                                        .serializedThrowable(new 
SerializedThrowable(new RuntimeException("expected")))
-                                       .build())).iterator());
+                                       .build()));
+
+               // fail first HTTP polling attempt, which should not be a 
problem because of the retries
+               final AtomicBoolean firstPollFailed = new AtomicBoolean();
+               failHttpRequest = (messageHeaders, messageParameters, 
requestBody) ->
+                       messageHeaders instanceof JobExecutionResultHeaders && 
!firstPollFailed.getAndSet(true);
 
                try (TestRestServerEndpoint ignored = createRestServerEndpoint(
                        testJobExecutionResultHandler,
                        new TestBlobServerPortHandler(),
                        new TestJobSubmitHandler())) {
 
-                       final org.apache.flink.api.common.JobExecutionResult 
jobExecutionResult =
-                               
(org.apache.flink.api.common.JobExecutionResult) restClusterClient.submitJob(
-                                       jobGraph,
-                                       ClassLoader.getSystemClassLoader());
+                       JobExecutionResult jobExecutionResult;
+
+                       jobExecutionResult = (JobExecutionResult) 
restClusterClient.submitJob(
+                               jobGraph,
+                               ClassLoader.getSystemClassLoader());
                        assertThat(jobExecutionResult.getJobID(), 
equalTo(jobId));
                        assertThat(jobExecutionResult.getNetRuntime(), 
equalTo(Long.MAX_VALUE));
                        assertThat(
@@ -314,9 +372,9 @@ public class RestClusterClientTest extends TestLogger {
                final TestSavepointHandlers testSavepointHandlers = new 
TestSavepointHandlers();
                final TestSavepointHandlers.TestSavepointTriggerHandler 
triggerHandler =
                        testSavepointHandlers.new TestSavepointTriggerHandler(
-                               Arrays.asList(null, targetSavepointDirectory, 
null).iterator());
+                               null, targetSavepointDirectory, null);
                final TestSavepointHandlers.TestSavepointHandler 
savepointHandler =
-                       testSavepointHandlers.new 
TestSavepointHandler(Arrays.asList(
+                       testSavepointHandlers.new TestSavepointHandler(
                                new 
SavepointResponseBody(QueueStatus.completed(), new SavepointInfo(
                                        
testSavepointHandlers.testSavepointTriggerId,
                                        savepointLocationDefaultDir,
@@ -328,7 +386,14 @@ public class RestClusterClientTest extends TestLogger {
                                new 
SavepointResponseBody(QueueStatus.completed(), new SavepointInfo(
                                        
testSavepointHandlers.testSavepointTriggerId,
                                        null,
-                                       new SerializedThrowable(new 
RuntimeException("expected"))))).iterator());
+                                       new SerializedThrowable(new 
RuntimeException("expected")))),
+                               new RestHandlerException("not found", 
HttpResponseStatus.NOT_FOUND));
+
+               // fail first HTTP polling attempt, which should not be a 
problem because of the retries
+               final AtomicBoolean firstPollFailed = new AtomicBoolean();
+               failHttpRequest = (messageHeaders, messageParameters, 
requestBody) ->
+                       messageHeaders instanceof SavepointStatusHeaders && 
!firstPollFailed.getAndSet(true);
+
                try (TestRestServerEndpoint ignored = createRestServerEndpoint(
                        triggerHandler,
                        savepointHandler)) {
@@ -358,6 +423,14 @@ public class RestClusterClientTest extends TestLogger {
                                                .getMessage(), 
equalTo("expected"));
                                }
                        }
+
+                       try {
+                               restClusterClient.triggerSavepoint(new JobID(), 
null).get();
+                       } catch (final ExecutionException e) {
+                               assertTrue(
+                                       "RestClientException not in causal 
chain",
+                                       ExceptionUtils.findThrowable(e, 
RestClientException.class).isPresent());
+                       }
                }
        }
 
@@ -369,9 +442,9 @@ public class RestClusterClientTest extends TestLogger {
 
                        private final Iterator<String> 
expectedTargetDirectories;
 
-                       TestSavepointTriggerHandler(final Iterator<String> 
expectedTargetDirectories) {
+                       TestSavepointTriggerHandler(final String... 
expectedTargetDirectories) {
                                super(SavepointTriggerHeaders.getInstance());
-                               this.expectedTargetDirectories = 
expectedTargetDirectories;
+                               this.expectedTargetDirectories = 
Arrays.asList(expectedTargetDirectories).iterator();
                        }
 
                        @Override
@@ -393,11 +466,14 @@ public class RestClusterClientTest extends TestLogger {
                private class TestSavepointHandler
                                extends TestHandler<EmptyRequestBody, 
SavepointResponseBody, SavepointStatusMessageParameters> {
 
-                       private final Iterator<SavepointResponseBody> 
expectedSavepointResponseBodies;
+                       private final Iterator<Object> 
expectedSavepointResponseBodies;
 
-                       TestSavepointHandler(final 
Iterator<SavepointResponseBody> expectedSavepointResponseBodies) {
+                       TestSavepointHandler(final Object... 
expectedSavepointResponseBodies) {
                                super(SavepointStatusHeaders.getInstance());
-                               this.expectedSavepointResponseBodies = 
expectedSavepointResponseBodies;
+                               
checkArgument(Arrays.stream(expectedSavepointResponseBodies)
+                                       .allMatch(response -> response 
instanceof SavepointResponseBody ||
+                                               response instanceof 
RestHandlerException));
+                               this.expectedSavepointResponseBodies = 
Arrays.asList(expectedSavepointResponseBodies).iterator();
                        }
 
                        @Override
@@ -406,7 +482,14 @@ public class RestClusterClientTest extends TestLogger {
                                        @Nonnull DispatcherGateway gateway) 
throws RestHandlerException {
                                final SavepointTriggerId savepointTriggerId = 
request.getPathParameter(SavepointTriggerIdPathParameter.class);
                                if 
(testSavepointTriggerId.equals(savepointTriggerId)) {
-                                       return 
CompletableFuture.completedFuture(expectedSavepointResponseBodies.next());
+                                       final Object response = 
expectedSavepointResponseBodies.next();
+                                       if (response instanceof 
SavepointResponseBody) {
+                                               return 
CompletableFuture.completedFuture((SavepointResponseBody) response);
+                                       } else if (response instanceof 
RestHandlerException) {
+                                               return 
FutureUtils.completedExceptionally((RestHandlerException) response);
+                                       } else {
+                                               throw new AssertionError();
+                                       }
                                } else {
                                        return 
FutureUtils.completedExceptionally(
                                                new RestHandlerException(
@@ -490,4 +573,15 @@ public class RestClusterClientTest extends TestLogger {
                        shutdown(Time.seconds(5));
                }
        }
+
+       /**
+        * Decides whether a request issued through the test's {@link RestClient} should be failed
+        * with an {@link IOException} instead of actually being sent.
+        */
+       @FunctionalInterface
+       private interface FailHttpRequestPredicate {
+
+               /**
+                * Returns {@code true} if the request described by the arguments should fail.
+                */
+               boolean test(MessageHeaders<?, ?, ?> messageHeaders, MessageParameters messageParameters, RequestBody requestBody);
+
+               /**
+                * Returns a predicate that lets every request through.
+                */
+               static FailHttpRequestPredicate never() {
+                       return (headers, parameters, body) -> false;
+               }
+       }
+
 }

http://git-wip-us.apache.org/repos/asf/flink/blob/ac8225fd/flink-core/src/main/java/org/apache/flink/configuration/RestOptions.java
----------------------------------------------------------------------
diff --git 
a/flink-core/src/main/java/org/apache/flink/configuration/RestOptions.java 
b/flink-core/src/main/java/org/apache/flink/configuration/RestOptions.java
index 906e266..16fd40d 100644
--- a/flink-core/src/main/java/org/apache/flink/configuration/RestOptions.java
+++ b/flink-core/src/main/java/org/apache/flink/configuration/RestOptions.java
@@ -33,12 +33,52 @@ public class RestOptions {
         */
        public static final ConfigOption<String> REST_ADDRESS =
                key("rest.address")
-                       .defaultValue("localhost");
+                       .defaultValue("localhost")
+                       .withDescription("The address that the server binds 
itself to / the client connects to.");
 
        /**
         * The port that the server listens on / the client connects to.
         */
        public static final ConfigOption<Integer> REST_PORT =
                key("rest.port")
-                       .defaultValue(9067);
+                       .defaultValue(9067)
+                       .withDescription("The port that the server listens on / 
the client connects to.");
+
+       /**
+        * The time in ms that the client waits for the leader address, e.g., Dispatcher or
+        * WebMonitorEndpoint. Defaults to 30000 ms.
+        */
+       public static final ConfigOption<Long> AWAIT_LEADER_TIMEOUT =
+               key("rest.await-leader-timeout")
+                       .defaultValue(30_000L)
+                       .withDescription("The time in ms that the client waits for the leader address, e.g., " +
+                               "Dispatcher or WebMonitorEndpoint.");
+
+       /**
+        * The number of retries the client will attempt if a retryable operation fails.
+        * Defaults to 20.
+        * @see #RETRY_DELAY
+        */
+       public static final ConfigOption<Integer> RETRY_MAX_ATTEMPTS =
+               key("rest.retry.max-attempts")
+                       .defaultValue(20)
+                       .withDescription("The number of retries the client will attempt if a retryable " +
+                               "operation fails.");
+
+       /**
+        * Delay in milliseconds that the client waits between two consecutive retries
+        * (3000 ms by default).
+        * @see #RETRY_MAX_ATTEMPTS
+        */
+       public static final ConfigOption<Long> RETRY_DELAY = key("rest.retry.delay")
+               .defaultValue(3_000L)
+               .withDescription("The time in ms that the client waits between retries " +
+                       "(See also `" + RETRY_MAX_ATTEMPTS.key() + "`).");
+
+       /**
+        * Upper bound, in milliseconds, on how long the client may take to establish a TCP
+        * connection (15000 ms by default).
+        */
+       public static final ConfigOption<Long> CONNECTION_TIMEOUT = key("rest.connection-timeout")
+               .defaultValue(15_000L)
+               .withDescription("The maximum time in ms for the client to establish a TCP connection.");
 }

http://git-wip-us.apache.org/repos/asf/flink/blob/ac8225fd/flink-core/src/main/java/org/apache/flink/util/ExceptionUtils.java
----------------------------------------------------------------------
diff --git a/flink-core/src/main/java/org/apache/flink/util/ExceptionUtils.java 
b/flink-core/src/main/java/org/apache/flink/util/ExceptionUtils.java
index 5c69564..ca55c5b 100644
--- a/flink-core/src/main/java/org/apache/flink/util/ExceptionUtils.java
+++ b/flink-core/src/main/java/org/apache/flink/util/ExceptionUtils.java
@@ -286,7 +286,7 @@ public final class ExceptionUtils {
         * @param searchType the type of exception to search for in the chain.
         * @return Optional throwable of the requested type if available, 
otherwise empty
         */
-       public static Optional<Throwable> findThrowable(Throwable throwable, 
Class<?> searchType) {
+       public static <T extends Throwable> Optional<T> findThrowable(Throwable 
throwable, Class<T> searchType) {
                if (throwable == null || searchType == null) {
                        return Optional.empty();
                }
@@ -294,7 +294,7 @@ public final class ExceptionUtils {
                Throwable t = throwable;
                while (t != null) {
                        if (searchType.isAssignableFrom(t.getClass())) {
-                               return Optional.of(t);
+                               return Optional.of(searchType.cast(t));
                        } else {
                                t = t.getCause();
                        }

http://git-wip-us.apache.org/repos/asf/flink/blob/ac8225fd/flink-core/src/main/java/org/apache/flink/util/function/CheckedSupplier.java
----------------------------------------------------------------------
diff --git 
a/flink-core/src/main/java/org/apache/flink/util/function/CheckedSupplier.java 
b/flink-core/src/main/java/org/apache/flink/util/function/CheckedSupplier.java
new file mode 100644
index 0000000..a0bcc13
--- /dev/null
+++ 
b/flink-core/src/main/java/org/apache/flink/util/function/CheckedSupplier.java
@@ -0,0 +1,39 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.util.function;
+
+import java.util.function.Supplier;
+
+/**
+ * A {@link Supplier} variant whose {@code get()} is allowed to throw a checked {@link Exception}.
+ *
+ * @param <R> type of the supplied value
+ */
+@FunctionalInterface
+public interface CheckedSupplier<R> extends SupplierWithException<R, Exception> {
+
+       /**
+        * Adapts the given checked supplier to a plain {@link Supplier}.
+        *
+        * <p>Any {@link Exception} thrown by {@code checkedSupplier} is rethrown wrapped in a
+        * {@link RuntimeException}.
+        *
+        * @param checkedSupplier supplier that may throw a checked exception
+        * @param <R> type of the supplied value
+        * @return supplier delegating to {@code checkedSupplier}
+        */
+       static <R> Supplier<R> unchecked(CheckedSupplier<R> checkedSupplier) {
+               return () -> {
+                       final R value;
+                       try {
+                               value = checkedSupplier.get();
+                       } catch (Exception cause) {
+                               throw new RuntimeException(cause);
+                       }
+                       return value;
+               };
+       }
+
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/ac8225fd/flink-core/src/test/java/org/apache/flink/util/ExceptionUtilsTest.java
----------------------------------------------------------------------
diff --git 
a/flink-core/src/test/java/org/apache/flink/util/ExceptionUtilsTest.java 
b/flink-core/src/test/java/org/apache/flink/util/ExceptionUtilsTest.java
index 20a46ad..07978a5 100644
--- a/flink-core/src/test/java/org/apache/flink/util/ExceptionUtilsTest.java
+++ b/flink-core/src/test/java/org/apache/flink/util/ExceptionUtilsTest.java
@@ -61,4 +61,12 @@ public class ExceptionUtilsTest extends TestLogger {
                // non-fatal error is not rethrown
                ExceptionUtils.rethrowIfFatalError(new NoClassDefFoundError());
        }
+
+       @Test
+       public void testFindThrowableByType() {
+               final IllegalStateException cause = new IllegalStateException();
+               final RuntimeException wrapper = new RuntimeException(cause);
+
+               // the nested exception itself is returned, typed as the requested class
+               assertTrue(ExceptionUtils.findThrowable(wrapper, IllegalStateException.class)
+                       .filter(found -> found == cause)
+                       .isPresent());
+
+               // a type that does not occur anywhere in the cause chain yields an empty result
+               assertTrue(!ExceptionUtils.findThrowable(wrapper, IllegalArgumentException.class).isPresent());
+       }
+
 }

http://git-wip-us.apache.org/repos/asf/flink/blob/ac8225fd/flink-docs/src/main/java/org/apache/flink/docs/rest/RestAPIDocGenerator.java
----------------------------------------------------------------------
diff --git 
a/flink-docs/src/main/java/org/apache/flink/docs/rest/RestAPIDocGenerator.java 
b/flink-docs/src/main/java/org/apache/flink/docs/rest/RestAPIDocGenerator.java
index 964b63b..b5d327f 100644
--- 
a/flink-docs/src/main/java/org/apache/flink/docs/rest/RestAPIDocGenerator.java
+++ 
b/flink-docs/src/main/java/org/apache/flink/docs/rest/RestAPIDocGenerator.java
@@ -23,6 +23,8 @@ import org.apache.flink.configuration.Configuration;
 import org.apache.flink.runtime.concurrent.Executors;
 import org.apache.flink.runtime.dispatcher.DispatcherGateway;
 import org.apache.flink.runtime.dispatcher.DispatcherRestEndpoint;
+import org.apache.flink.runtime.leaderelection.LeaderContender;
+import org.apache.flink.runtime.leaderelection.LeaderElectionService;
 import org.apache.flink.runtime.resourcemanager.ResourceManagerGateway;
 import org.apache.flink.runtime.rest.RestServerEndpoint;
 import org.apache.flink.runtime.rest.RestServerEndpointConfiguration;
@@ -33,6 +35,7 @@ import 
org.apache.flink.runtime.rest.messages.EmptyResponseBody;
 import org.apache.flink.runtime.rest.messages.MessageHeaders;
 import org.apache.flink.runtime.rest.messages.MessagePathParameter;
 import org.apache.flink.runtime.rest.messages.MessageQueryParameter;
+import org.apache.flink.runtime.rpc.FatalErrorHandler;
 import org.apache.flink.runtime.webmonitor.retriever.GatewayRetriever;
 import 
org.apache.flink.runtime.webmonitor.retriever.MetricQueryServiceRetriever;
 import org.apache.flink.util.ConfigurationException;
@@ -54,6 +57,7 @@ import java.nio.file.Paths;
 import java.util.Collection;
 import java.util.Comparator;
 import java.util.List;
+import java.util.UUID;
 import java.util.concurrent.CompletableFuture;
 import java.util.concurrent.Executor;
 import java.util.stream.Collectors;
@@ -285,13 +289,45 @@ public class RestAPIDocGenerator {
                }
 
                private DocumentingDispatcherRestEndpoint() {
-                       super(restConfig, dispatcherGatewayRetriever, config, 
handlerConfig, resourceManagerGatewayRetriever, executor, 
metricQueryServiceRetriever);
+                       // leader election and fatal error handling are stubbed with the no-op enums NoOpElectionService / NoOpFatalErrorHandler
+                       super(restConfig, dispatcherGatewayRetriever, config, 
handlerConfig, resourceManagerGatewayRetriever, executor, 
metricQueryServiceRetriever, NoOpElectionService.INSTANCE, 
NoOpFatalErrorHandler.INSTANCE);
                }
 
                @Override
                public List<Tuple2<RestHandlerSpecification, 
ChannelInboundHandler>> initializeHandlers(CompletableFuture<String> 
restAddressFuture) {
                        return super.initializeHandlers(restAddressFuture);
                }
+
+               /**
+                * {@link LeaderElectionService} that ignores every call and never reports leadership.
+                */
+               private enum NoOpElectionService implements LeaderElectionService {
+                       INSTANCE;
+
+                       @Override
+                       public void start(final LeaderContender contender) throws Exception {
+                               // intentionally empty
+                       }
+
+                       @Override
+                       public void stop() throws Exception {
+                               // intentionally empty
+                       }
+
+                       @Override
+                       public void confirmLeaderSessionID(final UUID leaderSessionID) {
+                               // intentionally empty
+                       }
+
+                       @Override
+                       public boolean hasLeadership() {
+                               return false;
+                       }
+               }
+
+               /**
+                * {@link FatalErrorHandler} that silently ignores every reported error.
+                */
+               private enum NoOpFatalErrorHandler implements FatalErrorHandler {
+                       INSTANCE;
+
+                       @Override
+                       public void onFatalError(final Throwable exception) {
+                               // intentionally ignored
+                       }
+               }
        }
 
        /**

http://git-wip-us.apache.org/repos/asf/flink/blob/ac8225fd/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/RuntimeMonitorHandler.java
----------------------------------------------------------------------
diff --git 
a/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/RuntimeMonitorHandler.java
 
b/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/RuntimeMonitorHandler.java
index fd6b2ca..7109171 100644
--- 
a/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/RuntimeMonitorHandler.java
+++ 
b/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/RuntimeMonitorHandler.java
@@ -119,7 +119,7 @@ public class RuntimeMonitorHandler extends 
RedirectHandler<JobManagerGateway> im
                                if (throwable != null) {
                                        LOG.debug("Error while handling 
request.", throwable);
 
-                                       Optional<Throwable> optNotFound = 
ExceptionUtils.findThrowable(throwable, NotFoundException.class);
+                                       Optional<NotFoundException> optNotFound 
= ExceptionUtils.findThrowable(throwable, NotFoundException.class);
 
                                        if (optNotFound.isPresent()) {
                                                // this should result in a 404 
error code (not found)

http://git-wip-us.apache.org/repos/asf/flink/blob/ac8225fd/flink-runtime/src/main/java/org/apache/flink/runtime/concurrent/FutureUtils.java
----------------------------------------------------------------------
diff --git 
a/flink-runtime/src/main/java/org/apache/flink/runtime/concurrent/FutureUtils.java
 
b/flink-runtime/src/main/java/org/apache/flink/runtime/concurrent/FutureUtils.java
index 7195957..17381a9 100644
--- 
a/flink-runtime/src/main/java/org/apache/flink/runtime/concurrent/FutureUtils.java
+++ 
b/flink-runtime/src/main/java/org/apache/flink/runtime/concurrent/FutureUtils.java
@@ -37,6 +37,7 @@ import java.util.concurrent.TimeUnit;
 import java.util.concurrent.TimeoutException;
 import java.util.concurrent.atomic.AtomicInteger;
 import java.util.function.BiFunction;
+import java.util.function.Predicate;
 import java.util.function.Supplier;
 
 import scala.concurrent.Future;
@@ -127,6 +128,7 @@ public class FutureUtils {
         * @param operation to retry
         * @param retries number of retries
         * @param retryDelay delay between retries
+        * @param retryPredicate Predicate to test whether an exception is 
retryable
         * @param scheduledExecutor executor to be used for the retry operation
         * @param <T> type of the result
         * @return Future which retries the given operation a given amount of 
times and delays the retry in case of failures
@@ -135,6 +137,7 @@ public class FutureUtils {
                        final Supplier<CompletableFuture<T>> operation,
                        final int retries,
                        final Time retryDelay,
+                       final Predicate<Throwable> retryPredicate,
                        final ScheduledExecutor scheduledExecutor) {
 
                final CompletableFuture<T> resultFuture = new 
CompletableFuture<>();
@@ -144,16 +147,41 @@ public class FutureUtils {
                        operation,
                        retries,
                        retryDelay,
+                       retryPredicate,
                        scheduledExecutor);
 
                return resultFuture;
        }
 
+       /**
+        * Retries the given operation with the given delay in between failures, treating every
+        * failure as retryable.
+        *
+        * <p>Delegates to the overload taking a retry predicate, passing a predicate that accepts
+        * every {@link Throwable}.
+        *
+        * @param operation to retry
+        * @param retries number of retries
+        * @param retryDelay delay between retries
+        * @param scheduledExecutor executor to be used for the retry operation
+        * @param <T> type of the result
+        * @return Future which retries the given operation a given amount of times and delays the retry in case of failures
+        */
+       public static <T> CompletableFuture<T> retryWithDelay(
+                       final Supplier<CompletableFuture<T>> operation,
+                       final int retries,
+                       final Time retryDelay,
+                       final ScheduledExecutor scheduledExecutor) {
+               return retryWithDelay(operation, retries, retryDelay, ignored -> true, scheduledExecutor);
+       }
+
        private static <T> void retryOperationWithDelay(
                        final CompletableFuture<T> resultFuture,
                        final Supplier<CompletableFuture<T>> operation,
                        final int retries,
                        final Time retryDelay,
+                       final Predicate<Throwable> retryPredicate,
                        final ScheduledExecutor scheduledExecutor) {
 
                if (!resultFuture.isDone()) {
@@ -165,17 +193,21 @@ public class FutureUtils {
                                                if (throwable instanceof 
CancellationException) {
                                                        
resultFuture.completeExceptionally(new RetryException("Operation future was 
cancelled.", throwable));
                                                } else {
-                                                       if (retries > 0) {
+                                                       if (retries > 0 && 
retryPredicate.test(throwable)) {
                                                                final 
ScheduledFuture<?> scheduledFuture = scheduledExecutor.schedule(
-                                                                       () -> 
retryOperationWithDelay(resultFuture, operation, retries - 1, retryDelay, 
scheduledExecutor),
+                                                                       () -> 
retryOperationWithDelay(resultFuture, operation, retries - 1, retryDelay, 
retryPredicate, scheduledExecutor),
                                                                        
retryDelay.toMilliseconds(),
                                                                        
TimeUnit.MILLISECONDS);
 
                                                                
resultFuture.whenComplete(
                                                                        
(innerT, innerThrowable) -> scheduledFuture.cancel(false));
                                                        } else {
-                                                               
resultFuture.completeExceptionally(new RetryException("Could not complete the 
operation. Number of retries " +
-                                                                       "has 
been exhausted.", throwable));
+                                                               final String 
errorMsg = retries == 0 ?
+                                                                       "Number 
of retries has been exhausted." :
+                                                                       
"Exception is not retryable.";
+                                                               
resultFuture.completeExceptionally(new RetryException(
+                                                                       "Could 
not complete the operation. " + errorMsg,
+                                                                       
throwable));
                                                        }
                                                }
                                        } else {

http://git-wip-us.apache.org/repos/asf/flink/blob/ac8225fd/flink-runtime/src/main/java/org/apache/flink/runtime/dispatcher/DispatcherRestEndpoint.java
----------------------------------------------------------------------
diff --git 
a/flink-runtime/src/main/java/org/apache/flink/runtime/dispatcher/DispatcherRestEndpoint.java
 
b/flink-runtime/src/main/java/org/apache/flink/runtime/dispatcher/DispatcherRestEndpoint.java
index 2ab97e2..0ac64b5 100644
--- 
a/flink-runtime/src/main/java/org/apache/flink/runtime/dispatcher/DispatcherRestEndpoint.java
+++ 
b/flink-runtime/src/main/java/org/apache/flink/runtime/dispatcher/DispatcherRestEndpoint.java
@@ -21,6 +21,7 @@ package org.apache.flink.runtime.dispatcher;
 import org.apache.flink.api.common.time.Time;
 import org.apache.flink.api.java.tuple.Tuple2;
 import org.apache.flink.configuration.Configuration;
+import org.apache.flink.runtime.leaderelection.LeaderElectionService;
 import org.apache.flink.runtime.resourcemanager.ResourceManagerGateway;
 import org.apache.flink.runtime.rest.RestServerEndpointConfiguration;
 import org.apache.flink.runtime.rest.handler.RestHandlerConfiguration;
@@ -29,6 +30,7 @@ import 
org.apache.flink.runtime.rest.handler.job.BlobServerPortHandler;
 import org.apache.flink.runtime.rest.handler.job.JobSubmitHandler;
 import org.apache.flink.runtime.rest.handler.job.JobTerminationHandler;
 import org.apache.flink.runtime.rest.messages.JobTerminationHeaders;
+import org.apache.flink.runtime.rpc.FatalErrorHandler;
 import org.apache.flink.runtime.webmonitor.WebMonitorEndpoint;
 import org.apache.flink.runtime.webmonitor.retriever.GatewayRetriever;
 import 
org.apache.flink.runtime.webmonitor.retriever.MetricQueryServiceRetriever;
@@ -46,13 +48,15 @@ import java.util.concurrent.Executor;
 public class DispatcherRestEndpoint extends 
WebMonitorEndpoint<DispatcherGateway> {
 
        public DispatcherRestEndpoint(
-                       RestServerEndpointConfiguration endpointConfiguration,
-                       GatewayRetriever<DispatcherGateway> leaderRetriever,
-                       Configuration clusterConfiguration,
-                       RestHandlerConfiguration restConfiguration,
-                       GatewayRetriever<ResourceManagerGateway> 
resourceManagerRetriever,
-                       Executor executor,
-                       MetricQueryServiceRetriever 
metricQueryServiceRetriever) {
+               RestServerEndpointConfiguration endpointConfiguration,
+               GatewayRetriever<DispatcherGateway> leaderRetriever,
+               Configuration clusterConfiguration,
+               RestHandlerConfiguration restConfiguration,
+               GatewayRetriever<ResourceManagerGateway> 
resourceManagerRetriever,
+               Executor executor,
+               MetricQueryServiceRetriever metricQueryServiceRetriever,
+               LeaderElectionService leaderElectionService,
+               FatalErrorHandler fatalErrorHandler) {
                super(
                        endpointConfiguration,
                        leaderRetriever,
@@ -60,7 +64,9 @@ public class DispatcherRestEndpoint extends 
WebMonitorEndpoint<DispatcherGateway
                        restConfiguration,
                        resourceManagerRetriever,
                        executor,
-                       metricQueryServiceRetriever);
+                       metricQueryServiceRetriever,
+                       leaderElectionService,
+                       fatalErrorHandler);
        }
 
        @Override

http://git-wip-us.apache.org/repos/asf/flink/blob/ac8225fd/flink-runtime/src/main/java/org/apache/flink/runtime/entrypoint/JobClusterEntrypoint.java
----------------------------------------------------------------------
diff --git 
a/flink-runtime/src/main/java/org/apache/flink/runtime/entrypoint/JobClusterEntrypoint.java
 
b/flink-runtime/src/main/java/org/apache/flink/runtime/entrypoint/JobClusterEntrypoint.java
index b90253a..e52f113 100644
--- 
a/flink-runtime/src/main/java/org/apache/flink/runtime/entrypoint/JobClusterEntrypoint.java
+++ 
b/flink-runtime/src/main/java/org/apache/flink/runtime/entrypoint/JobClusterEntrypoint.java
@@ -36,6 +36,7 @@ import org.apache.flink.runtime.jobmaster.JobManagerServices;
 import org.apache.flink.runtime.jobmaster.JobMasterGateway;
 import org.apache.flink.runtime.jobmaster.JobMasterId;
 import org.apache.flink.runtime.jobmaster.JobMasterRestEndpoint;
+import org.apache.flink.runtime.leaderelection.LeaderElectionService;
 import org.apache.flink.runtime.leaderretrieval.LeaderRetrievalService;
 import org.apache.flink.runtime.metrics.MetricRegistry;
 import org.apache.flink.runtime.resourcemanager.ResourceManager;
@@ -119,7 +120,8 @@ public abstract class JobClusterEntrypoint extends 
ClusterEntrypoint {
                        jobMasterGatewayRetriever,
                        resourceManagerGatewayRetriever,
                        rpcService.getExecutor(),
-                       new AkkaQueryServiceRetriever(actorSystem, timeout));
+                       new AkkaQueryServiceRetriever(actorSystem, timeout),
+                       
highAvailabilityServices.getWebMonitorLeaderElectionService());
 
                LOG.debug("Starting JobMaster REST endpoint.");
                jobMasterRestEndpoint.start();
@@ -163,7 +165,8 @@ public abstract class JobClusterEntrypoint extends 
ClusterEntrypoint {
                        GatewayRetriever<JobMasterGateway> 
jobMasterGatewayRetriever,
                        GatewayRetriever<ResourceManagerGateway> 
resourceManagerGatewayRetriever,
                        Executor executor,
-                       MetricQueryServiceRetriever 
metricQueryServiceRetriever) throws ConfigurationException {
+                       MetricQueryServiceRetriever metricQueryServiceRetriever,
+                       LeaderElectionService leaderElectionService) throws 
ConfigurationException {
 
                final RestHandlerConfiguration restHandlerConfiguration = 
RestHandlerConfiguration.fromConfiguration(configuration);
 
@@ -174,7 +177,9 @@ public abstract class JobClusterEntrypoint extends 
ClusterEntrypoint {
                        restHandlerConfiguration,
                        resourceManagerGatewayRetriever,
                        executor,
-                       metricQueryServiceRetriever);
+                       metricQueryServiceRetriever,
+                       leaderElectionService,
+                       this);
 
        }
 

http://git-wip-us.apache.org/repos/asf/flink/blob/ac8225fd/flink-runtime/src/main/java/org/apache/flink/runtime/entrypoint/SessionClusterEntrypoint.java
----------------------------------------------------------------------
diff --git 
a/flink-runtime/src/main/java/org/apache/flink/runtime/entrypoint/SessionClusterEntrypoint.java
 
b/flink-runtime/src/main/java/org/apache/flink/runtime/entrypoint/SessionClusterEntrypoint.java
index 0b1cea0..fe9bd92 100644
--- 
a/flink-runtime/src/main/java/org/apache/flink/runtime/entrypoint/SessionClusterEntrypoint.java
+++ 
b/flink-runtime/src/main/java/org/apache/flink/runtime/entrypoint/SessionClusterEntrypoint.java
@@ -35,6 +35,7 @@ import 
org.apache.flink.runtime.dispatcher.FileArchivedExecutionGraphStore;
 import org.apache.flink.runtime.dispatcher.StandaloneDispatcher;
 import org.apache.flink.runtime.heartbeat.HeartbeatServices;
 import org.apache.flink.runtime.highavailability.HighAvailabilityServices;
+import org.apache.flink.runtime.leaderelection.LeaderElectionService;
 import org.apache.flink.runtime.leaderretrieval.LeaderRetrievalService;
 import org.apache.flink.runtime.metrics.MetricRegistry;
 import org.apache.flink.runtime.resourcemanager.ResourceManager;
@@ -121,7 +122,8 @@ public abstract class SessionClusterEntrypoint extends 
ClusterEntrypoint {
                        dispatcherGatewayRetriever,
                        resourceManagerGatewayRetriever,
                        rpcService.getExecutor(),
-                       new AkkaQueryServiceRetriever(actorSystem, timeout));
+                       new AkkaQueryServiceRetriever(actorSystem, timeout),
+                       
highAvailabilityServices.getWebMonitorLeaderElectionService());
 
                LOG.debug("Starting Dispatcher REST endpoint.");
                dispatcherRestEndpoint.start();
@@ -227,11 +229,12 @@ public abstract class SessionClusterEntrypoint extends 
ClusterEntrypoint {
        }
 
        protected DispatcherRestEndpoint createDispatcherRestEndpoint(
-                       Configuration configuration,
-                       LeaderGatewayRetriever<DispatcherGateway> 
dispatcherGatewayRetriever,
-                       LeaderGatewayRetriever<ResourceManagerGateway> 
resourceManagerGatewayRetriever,
-                       Executor executor,
-                       MetricQueryServiceRetriever 
metricQueryServiceRetriever) throws Exception {
+               Configuration configuration,
+               LeaderGatewayRetriever<DispatcherGateway> 
dispatcherGatewayRetriever,
+               LeaderGatewayRetriever<ResourceManagerGateway> 
resourceManagerGatewayRetriever,
+               Executor executor,
+               MetricQueryServiceRetriever metricQueryServiceRetriever,
+               LeaderElectionService leaderElectionService) throws Exception {
 
                final RestHandlerConfiguration restHandlerConfiguration = 
RestHandlerConfiguration.fromConfiguration(configuration);
 
@@ -242,7 +245,9 @@ public abstract class SessionClusterEntrypoint extends 
ClusterEntrypoint {
                        restHandlerConfiguration,
                        resourceManagerGatewayRetriever,
                        executor,
-                       metricQueryServiceRetriever);
+                       metricQueryServiceRetriever,
+                       leaderElectionService,
+                       this);
        }
 
        protected Dispatcher createDispatcher(

http://git-wip-us.apache.org/repos/asf/flink/blob/ac8225fd/flink-runtime/src/main/java/org/apache/flink/runtime/highavailability/HighAvailabilityServices.java
----------------------------------------------------------------------
diff --git 
a/flink-runtime/src/main/java/org/apache/flink/runtime/highavailability/HighAvailabilityServices.java
 
b/flink-runtime/src/main/java/org/apache/flink/runtime/highavailability/HighAvailabilityServices.java
index defe5cc..e65e952 100644
--- 
a/flink-runtime/src/main/java/org/apache/flink/runtime/highavailability/HighAvailabilityServices.java
+++ 
b/flink-runtime/src/main/java/org/apache/flink/runtime/highavailability/HighAvailabilityServices.java
@@ -98,6 +98,8 @@ public interface HighAvailabilityServices extends 
AutoCloseable {
         */
        LeaderRetrievalService getJobManagerLeaderRetriever(JobID jobID, String 
defaultJobManagerAddress);
 
+       /**
+        * Gets the leader retriever for the cluster's web monitor (REST endpoint).
+        *
+        * @return Leader retrieval service to retrieve the leading web monitor
+        */
+       LeaderRetrievalService getWebMonitorLeaderRetriever();
+
        /**
         * Gets the leader election service for the cluster's resource manager.
         *
@@ -120,6 +122,8 @@ public interface HighAvailabilityServices extends 
AutoCloseable {
         */
        LeaderElectionService getJobManagerLeaderElectionService(JobID jobID);
 
+       /**
+        * Gets the leader election service for the cluster's web monitor (REST endpoint).
+        *
+        * @return Leader election service for the web monitor leader election
+        */
+       LeaderElectionService getWebMonitorLeaderElectionService();
+
        /**
         * Gets the checkpoint recovery factory for the job manager
         *

http://git-wip-us.apache.org/repos/asf/flink/blob/ac8225fd/flink-runtime/src/main/java/org/apache/flink/runtime/highavailability/HighAvailabilityServicesUtils.java
----------------------------------------------------------------------
diff --git 
a/flink-runtime/src/main/java/org/apache/flink/runtime/highavailability/HighAvailabilityServicesUtils.java
 
b/flink-runtime/src/main/java/org/apache/flink/runtime/highavailability/HighAvailabilityServicesUtils.java
index 7a89ed8..4f12f2b 100644
--- 
a/flink-runtime/src/main/java/org/apache/flink/runtime/highavailability/HighAvailabilityServicesUtils.java
+++ 
b/flink-runtime/src/main/java/org/apache/flink/runtime/highavailability/HighAvailabilityServicesUtils.java
@@ -21,6 +21,8 @@ package org.apache.flink.runtime.highavailability;
 import org.apache.flink.api.java.tuple.Tuple2;
 import org.apache.flink.configuration.Configuration;
 import org.apache.flink.configuration.JobManagerOptions;
+import org.apache.flink.configuration.RestOptions;
+import org.apache.flink.configuration.SecurityOptions;
 import org.apache.flink.runtime.blob.BlobStoreService;
 import org.apache.flink.runtime.blob.BlobUtils;
 import org.apache.flink.runtime.dispatcher.Dispatcher;
@@ -95,10 +97,16 @@ public class HighAvailabilityServicesUtils {
                                        addressResolution,
                                        configuration);
 
+                               final String address = configuration.getString(RestOptions.REST_ADDRESS);
+                               final int port = configuration.getInteger(RestOptions.REST_PORT);
+                               final boolean enableSSL = configuration.getBoolean(SecurityOptions.SSL_ENABLED);
+                               final String protocol = enableSSL ? "https://" : "http://";
+
                                return new StandaloneHaServices(
                                        resourceManagerRpcUrl,
                                        dispatcherRpcUrl,
-                                       jobManagerRpcUrl);
+                                       jobManagerRpcUrl,
+                                       String.format("%s%s:%s", protocol, address, port));
                        case ZOOKEEPER:
                                BlobStoreService blobStoreService = BlobUtils.createBlobStoreFromConfig(configuration);
 

http://git-wip-us.apache.org/repos/asf/flink/blob/ac8225fd/flink-runtime/src/main/java/org/apache/flink/runtime/highavailability/nonha/embedded/EmbeddedHaServices.java
----------------------------------------------------------------------
diff --git 
a/flink-runtime/src/main/java/org/apache/flink/runtime/highavailability/nonha/embedded/EmbeddedHaServices.java
 
b/flink-runtime/src/main/java/org/apache/flink/runtime/highavailability/nonha/embedded/EmbeddedHaServices.java
index 4c30f87..7b2c69b 100644
--- 
a/flink-runtime/src/main/java/org/apache/flink/runtime/highavailability/nonha/embedded/EmbeddedHaServices.java
+++ 
b/flink-runtime/src/main/java/org/apache/flink/runtime/highavailability/nonha/embedded/EmbeddedHaServices.java
@@ -26,6 +26,7 @@ import 
org.apache.flink.runtime.leaderretrieval.LeaderRetrievalService;
 import org.apache.flink.util.Preconditions;
 
 import javax.annotation.concurrent.GuardedBy;
+
 import java.util.HashMap;
 import java.util.concurrent.Executor;
 
@@ -49,11 +50,14 @@ public class EmbeddedHaServices extends 
AbstractNonHaServices {
 
        private final HashMap<JobID, EmbeddedLeaderService> 
jobManagerLeaderServices;
 
+       private final EmbeddedLeaderService webMonitorLeaderService;
+
        public EmbeddedHaServices(Executor executor) {
                this.executor = Preconditions.checkNotNull(executor);
                this.resourceManagerLeaderService = new 
EmbeddedLeaderService(executor);
                this.dispatcherLeaderService = new 
EmbeddedLeaderService(executor);
                this.jobManagerLeaderServices = new HashMap<>();
+               // web monitor leader service; shut down in close() together with the other leader services
+               this.webMonitorLeaderService = new 
EmbeddedLeaderService(executor);
        }
 
        // 
------------------------------------------------------------------------
@@ -97,6 +101,11 @@ public class EmbeddedHaServices extends 
AbstractNonHaServices {
        }
 
        @Override
+       public LeaderRetrievalService getWebMonitorLeaderRetriever() {
+               // backed by the same embedded leader service as getWebMonitorLeaderElectionService()
+               final LeaderRetrievalService retrievalService = webMonitorLeaderService.createLeaderRetrievalService();
+               return retrievalService;
+       }
+
+       @Override
        public LeaderElectionService getJobManagerLeaderElectionService(JobID 
jobID) {
                checkNotNull(jobID);
 
@@ -107,6 +116,11 @@ public class EmbeddedHaServices extends 
AbstractNonHaServices {
                }
        }
 
+       @Override
+       public LeaderElectionService getWebMonitorLeaderElectionService() {
+               // backed by the same embedded leader service as getWebMonitorLeaderRetriever()
+               final LeaderElectionService electionService = webMonitorLeaderService.createLeaderElectionService();
+               return electionService;
+       }
+
        // 
------------------------------------------------------------------------
        // internal
        // 
------------------------------------------------------------------------
@@ -136,6 +150,8 @@ public class EmbeddedHaServices extends 
AbstractNonHaServices {
                                jobManagerLeaderServices.clear();
 
                                resourceManagerLeaderService.shutdown();
+
+                               webMonitorLeaderService.shutdown();
                        }
 
                        super.close();

http://git-wip-us.apache.org/repos/asf/flink/blob/ac8225fd/flink-runtime/src/main/java/org/apache/flink/runtime/highavailability/nonha/standalone/StandaloneHaServices.java
----------------------------------------------------------------------
diff --git 
a/flink-runtime/src/main/java/org/apache/flink/runtime/highavailability/nonha/standalone/StandaloneHaServices.java
 
b/flink-runtime/src/main/java/org/apache/flink/runtime/highavailability/nonha/standalone/StandaloneHaServices.java
index 617b351..cbfcd49 100644
--- 
a/flink-runtime/src/main/java/org/apache/flink/runtime/highavailability/nonha/standalone/StandaloneHaServices.java
+++ 
b/flink-runtime/src/main/java/org/apache/flink/runtime/highavailability/nonha/standalone/StandaloneHaServices.java
@@ -51,18 +51,26 @@ public class StandaloneHaServices extends 
AbstractNonHaServices {
        /** The fix address of the JobManager */
        private final String jobManagerAddress;
 
+       /** The fix address of the web monitor */
+       private final String webMonitorAddress;
+
        /**
         * Creates a new services class for the fix pre-defined leaders.
-        * 
+        *
         * @param resourceManagerAddress    The fix address of the 
ResourceManager
+        * @param dispatcherAddress         The fix address of the Dispatcher
+        * @param jobManagerAddress         The fix address of the JobManager
+        * @param webMonitorAddress         The fix address of the web monitor
         */
        public StandaloneHaServices(
-               String resourceManagerAddress,
-               String dispatcherAddress,
-               String jobManagerAddress) {
+                       String resourceManagerAddress,
+                       String dispatcherAddress,
+                       String jobManagerAddress,
+                       String webMonitorAddress) {
                this.resourceManagerAddress = 
checkNotNull(resourceManagerAddress, "resourceManagerAddress");
                this.dispatcherAddress = checkNotNull(dispatcherAddress, 
"dispatcherAddress");
                this.jobManagerAddress = checkNotNull(jobManagerAddress, 
"jobManagerAddress");
+               this.webMonitorAddress = checkNotNull(webMonitorAddress, "webMonitorAddress");
        }
 
        // 
------------------------------------------------------------------------
@@ -132,4 +140,25 @@ public class StandaloneHaServices extends 
AbstractNonHaServices {
                        return new StandaloneLeaderElectionService();
                }
        }
+
+       /** Creates a new retrieval service for the fix web monitor address and {@code DEFAULT_LEADER_ID}. */
+       @Override
+       public LeaderRetrievalService getWebMonitorLeaderRetriever() {
+               synchronized (lock) {
+                       checkNotShutdown();
+
+                       return new 
StandaloneLeaderRetrievalService(webMonitorAddress, DEFAULT_LEADER_ID);
+               }
+       }
+
+       /** Creates a new {@link StandaloneLeaderElectionService} for the web monitor. */
+       @Override
+       public LeaderElectionService getWebMonitorLeaderElectionService() {
+               synchronized (lock) {
+                       checkNotShutdown();
+
+                       return new StandaloneLeaderElectionService();
+               }
+       }
+
 }

http://git-wip-us.apache.org/repos/asf/flink/blob/ac8225fd/flink-runtime/src/main/java/org/apache/flink/runtime/highavailability/zookeeper/ZooKeeperHaServices.java
----------------------------------------------------------------------
diff --git 
a/flink-runtime/src/main/java/org/apache/flink/runtime/highavailability/zookeeper/ZooKeeperHaServices.java
 
b/flink-runtime/src/main/java/org/apache/flink/runtime/highavailability/zookeeper/ZooKeeperHaServices.java
index 04ab6d3..6d5c721 100644
--- 
a/flink-runtime/src/main/java/org/apache/flink/runtime/highavailability/zookeeper/ZooKeeperHaServices.java
+++ 
b/flink-runtime/src/main/java/org/apache/flink/runtime/highavailability/zookeeper/ZooKeeperHaServices.java
@@ -18,8 +18,6 @@
 
 package org.apache.flink.runtime.highavailability.zookeeper;
 
-import org.apache.curator.framework.CuratorFramework;
-
 import org.apache.flink.api.common.JobID;
 import org.apache.flink.configuration.Configuration;
 import org.apache.flink.configuration.HighAvailabilityOptions;
@@ -35,6 +33,8 @@ import 
org.apache.flink.runtime.leaderretrieval.LeaderRetrievalService;
 import org.apache.flink.runtime.util.ZooKeeperUtils;
 import org.apache.flink.util.ExceptionUtils;
 
+import org.apache.curator.framework.CuratorFramework;
+
 import java.io.IOException;
 import java.util.concurrent.Executor;
 
@@ -86,6 +86,8 @@ public class ZooKeeperHaServices implements 
HighAvailabilityServices {
 
        private static final String JOB_MANAGER_LEADER_PATH = 
"/job_manager_lock";
 
+       private static final String REST_SERVER_LEADER_PATH = 
"/rest_server_lock";
+
        // 
------------------------------------------------------------------------
        
        
@@ -142,6 +144,12 @@ public class ZooKeeperHaServices implements 
HighAvailabilityServices {
        }
 
+       /** Creates a ZooKeeper leader retrieval service for the web monitor on {@link #REST_SERVER_LEADER_PATH}. */
        @Override
+       public LeaderRetrievalService getWebMonitorLeaderRetriever() {
+               return ZooKeeperUtils.createLeaderRetrievalService(client, 
configuration, REST_SERVER_LEADER_PATH);
+       }
+
+       @Override
        public LeaderElectionService getResourceManagerLeaderElectionService() {
                return ZooKeeperUtils.createLeaderElectionService(client, 
configuration, RESOURCE_MANAGER_LEADER_PATH);
        }
@@ -157,6 +165,12 @@ public class ZooKeeperHaServices implements 
HighAvailabilityServices {
        }
 
+       /** Creates a ZooKeeper leader election service for the web monitor on {@link #REST_SERVER_LEADER_PATH}. */
        @Override
+       public LeaderElectionService getWebMonitorLeaderElectionService() {
+               return ZooKeeperUtils.createLeaderElectionService(client, 
configuration, REST_SERVER_LEADER_PATH);
+       }
+
+       @Override
        public CheckpointRecoveryFactory getCheckpointRecoveryFactory() {
                return new ZooKeeperCheckpointRecoveryFactory(client, 
configuration, executor);
        }

http://git-wip-us.apache.org/repos/asf/flink/blob/ac8225fd/flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/JobMasterRestEndpoint.java
----------------------------------------------------------------------
diff --git 
a/flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/JobMasterRestEndpoint.java
 
b/flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/JobMasterRestEndpoint.java
index 4baac95..1ed191f 100644
--- 
a/flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/JobMasterRestEndpoint.java
+++ 
b/flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/JobMasterRestEndpoint.java
@@ -19,9 +19,11 @@
 package org.apache.flink.runtime.jobmaster;
 
 import org.apache.flink.configuration.Configuration;
+import org.apache.flink.runtime.leaderelection.LeaderElectionService;
 import org.apache.flink.runtime.resourcemanager.ResourceManagerGateway;
 import org.apache.flink.runtime.rest.RestServerEndpointConfiguration;
 import org.apache.flink.runtime.rest.handler.RestHandlerConfiguration;
+import org.apache.flink.runtime.rpc.FatalErrorHandler;
 import org.apache.flink.runtime.webmonitor.WebMonitorEndpoint;
 import org.apache.flink.runtime.webmonitor.retriever.GatewayRetriever;
 import 
org.apache.flink.runtime.webmonitor.retriever.MetricQueryServiceRetriever;
@@ -34,13 +36,19 @@ import java.util.concurrent.Executor;
 public class JobMasterRestEndpoint extends 
WebMonitorEndpoint<JobMasterGateway> {
 
+       /**
+        * Creates a new JobMasterRestEndpoint. All arguments are passed unchanged to the
+        * {@link WebMonitorEndpoint} constructor.
+        */
        public JobMasterRestEndpoint(
-                       RestServerEndpointConfiguration endpointConfiguration,
-                       GatewayRetriever<JobMasterGateway> leaderRetriever,
-                       Configuration clusterConfiguration,
-                       RestHandlerConfiguration restConfiguration,
-                       GatewayRetriever<ResourceManagerGateway> 
resourceManagerRetriever,
-                       Executor executor,
-                       MetricQueryServiceRetriever 
metricQueryServiceRetriever) {
-               super(endpointConfiguration, leaderRetriever, 
clusterConfiguration, restConfiguration, resourceManagerRetriever, executor, 
metricQueryServiceRetriever);
+                       RestServerEndpointConfiguration endpointConfiguration,
+                       GatewayRetriever<JobMasterGateway> leaderRetriever,
+                       Configuration clusterConfiguration,
+                       RestHandlerConfiguration restConfiguration,
+                       GatewayRetriever<ResourceManagerGateway> resourceManagerRetriever,
+                       Executor executor,
+                       MetricQueryServiceRetriever metricQueryServiceRetriever,
+                       LeaderElectionService leaderElectionService,
+                       FatalErrorHandler fatalErrorHandler) {
+               super(endpointConfiguration, leaderRetriever, 
clusterConfiguration, restConfiguration, resourceManagerRetriever, executor, 
metricQueryServiceRetriever, leaderElectionService, fatalErrorHandler);
        }
 }

http://git-wip-us.apache.org/repos/asf/flink/blob/ac8225fd/flink-runtime/src/main/java/org/apache/flink/runtime/leaderelection/ZooKeeperLeaderElectionService.java
----------------------------------------------------------------------
diff --git 
a/flink-runtime/src/main/java/org/apache/flink/runtime/leaderelection/ZooKeeperLeaderElectionService.java
 
b/flink-runtime/src/main/java/org/apache/flink/runtime/leaderelection/ZooKeeperLeaderElectionService.java
index 2db1fce..59d3592 100644
--- 
a/flink-runtime/src/main/java/org/apache/flink/runtime/leaderelection/ZooKeeperLeaderElectionService.java
+++ 
b/flink-runtime/src/main/java/org/apache/flink/runtime/leaderelection/ZooKeeperLeaderElectionService.java
@@ -413,4 +413,11 @@ public class ZooKeeperLeaderElectionService implements 
LeaderElectionService, Le
        public void unhandledError(String message, Throwable e) {
                leaderContender.handleError(new FlinkException("Unhandled error 
in ZooKeeperLeaderElectionService: " + message, e));
        }
+
+       @Override
+       public String toString() {
+               // Renders as ZooKeeperLeaderElectionService{leaderPath='<leaderPath>'}
+               final String quotedLeaderPath = "leaderPath='" + leaderPath + '\'';
+               return "ZooKeeperLeaderElectionService{" + quotedLeaderPath + '}';
+       }
 }

http://git-wip-us.apache.org/repos/asf/flink/blob/ac8225fd/flink-runtime/src/main/java/org/apache/flink/runtime/leaderretrieval/LeaderRetrievalListener.java
----------------------------------------------------------------------
diff --git 
a/flink-runtime/src/main/java/org/apache/flink/runtime/leaderretrieval/LeaderRetrievalListener.java
 
b/flink-runtime/src/main/java/org/apache/flink/runtime/leaderretrieval/LeaderRetrievalListener.java
index b5ba4e9..a9c7e5d 100644
--- 
a/flink-runtime/src/main/java/org/apache/flink/runtime/leaderretrieval/LeaderRetrievalListener.java
+++ 
b/flink-runtime/src/main/java/org/apache/flink/runtime/leaderretrieval/LeaderRetrievalListener.java
@@ -18,6 +18,8 @@
 
 package org.apache.flink.runtime.leaderretrieval;
 
+import javax.annotation.Nullable;
+
 import java.util.UUID;
 
 /**
@@ -32,7 +34,7 @@ public interface LeaderRetrievalListener {
-        * @param leaderAddress The address of the new leader
-        * @param leaderSessionID The new leader session ID
+        * @param leaderAddress The address of the new leader, may be {@code null}
+        * @param leaderSessionID The new leader session ID, may be {@code null}
         */
-       void notifyLeaderAddress(String leaderAddress, UUID leaderSessionID);
+       void notifyLeaderAddress(@Nullable String leaderAddress, @Nullable UUID 
leaderSessionID);
 
        /**
         * This method is called by the {@link LeaderRetrievalService} in case 
of an exception. This

Reply via email to