Repository: flink
Updated Branches:
  refs/heads/master f9a583b72 -> 6fba46e6a


[FLINK-8776][flip6] Use correct port for job submission from Web UI.

Use address of local WebMonitorEndpoint for the job submission from the Web UI.
Rename TestingLeaderRetrievalService to SettableLeaderRetrievalService and move
class out of test directory.

This closes #5577.


Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/bbb63531
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/bbb63531
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/bbb63531

Branch: refs/heads/master
Commit: bbb63531b2118dfde1584a7f61ee908d1188698d
Parents: f9a583b
Author: gyao <[email protected]>
Authored: Sun Feb 25 12:53:53 2018 +0100
Committer: Till Rohrmann <[email protected]>
Committed: Mon Feb 26 10:51:30 2018 +0100

----------------------------------------------------------------------
 .../client/program/rest/RestClusterClient.java  | 28 +++++++-
 .../client/program/ClientConnectionTest.java    |  6 +-
 .../program/rest/RestClusterClientTest.java     |  7 +-
 .../MesosFlinkResourceManagerTest.java          |  4 +-
 .../MesosResourceManagerTest.java               |  6 +-
 .../webmonitor/WebSubmissionExtension.java      | 12 +++-
 .../SettableLeaderRetrievalService.java         | 73 ++++++++++++++++++++
 .../runtime/client/JobClientActorTest.java      | 30 ++++----
 .../clusterframework/ResourceManagerTest.java   |  8 +--
 .../runtime/dispatcher/DispatcherTest.java      |  4 +-
 .../highavailability/ManualLeaderService.java   | 14 ++--
 .../jobmanager/JobManagerHARecoveryTest.java    |  4 +-
 .../runtime/jobmaster/JobManagerRunnerTest.java |  4 +-
 .../flink/runtime/jobmaster/JobMasterTest.java  |  6 +-
 .../TestingLeaderRetrievalService.java          | 69 ------------------
 .../SettableLeaderRetrievalServiceTest.java     | 67 ++++++++++++++++++
 .../resourcemanager/JobLeaderIdServiceTest.java | 10 +--
 .../ResourceManagerJobMasterTest.java           | 12 ++--
 .../taskexecutor/TaskExecutorITCase.java        |  6 +-
 .../runtime/taskexecutor/TaskExecutorTest.java  | 10 +--
 .../TaskManagerRegistrationTest.java            | 13 ++--
 .../runtime/taskmanager/TaskManagerTest.java    |  4 +-
 .../retriever/LeaderGatewayRetrieverTest.java   |  8 +--
 .../impl/AkkaJobManagerRetrieverTest.java       | 10 +--
 .../retriever/impl/RpcGatewayRetrieverTest.java | 10 +--
 .../java/org/apache/flink/yarn/UtilsTest.java   |  4 +-
 26 files changed, 268 insertions(+), 161 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/flink/blob/bbb63531/flink-clients/src/main/java/org/apache/flink/client/program/rest/RestClusterClient.java
----------------------------------------------------------------------
diff --git 
a/flink-clients/src/main/java/org/apache/flink/client/program/rest/RestClusterClient.java
 
b/flink-clients/src/main/java/org/apache/flink/client/program/rest/RestClusterClient.java
index 8ad571f..3a377f3 100644
--- 
a/flink-clients/src/main/java/org/apache/flink/client/program/rest/RestClusterClient.java
+++ 
b/flink-clients/src/main/java/org/apache/flink/client/program/rest/RestClusterClient.java
@@ -142,11 +142,29 @@ public class RestClusterClient<T> extends 
ClusterClient<T> {
                        config,
                        null,
                        clusterId,
-                       new ExponentialWaitStrategy(10L, 2000L));
+                       new ExponentialWaitStrategy(10L, 2000L),
+                       null);
+       }
+
+       public RestClusterClient(
+                       Configuration config,
+                       T clusterId,
+                       LeaderRetrievalService webMonitorRetrievalService) 
throws Exception {
+               this(
+                       config,
+                       null,
+                       clusterId,
+                       new ExponentialWaitStrategy(10L, 2000L),
+                       webMonitorRetrievalService);
        }
 
        @VisibleForTesting
-       RestClusterClient(Configuration configuration, @Nullable RestClient 
restClient, T clusterId, WaitStrategy waitStrategy) throws Exception {
+       RestClusterClient(
+                       Configuration configuration,
+                       @Nullable RestClient restClient,
+                       T clusterId,
+                       WaitStrategy waitStrategy,
+                       @Nullable LeaderRetrievalService 
webMonitorRetrievalService) throws Exception {
                super(configuration);
                this.restClusterClientConfiguration = 
RestClusterClientConfiguration.fromConfiguration(configuration);
 
@@ -159,7 +177,11 @@ public class RestClusterClient<T> extends ClusterClient<T> 
{
                this.waitStrategy = Preconditions.checkNotNull(waitStrategy);
                this.clusterId = Preconditions.checkNotNull(clusterId);
 
-               this.webMonitorRetrievalService = 
highAvailabilityServices.getWebMonitorLeaderRetriever();
+               if (webMonitorRetrievalService == null) {
+                       this.webMonitorRetrievalService = 
highAvailabilityServices.getWebMonitorLeaderRetriever();
+               } else {
+                       this.webMonitorRetrievalService = 
webMonitorRetrievalService;
+               }
                this.dispatcherRetrievalService = 
highAvailabilityServices.getDispatcherLeaderRetriever();
                this.retryExecutorService = 
Executors.newSingleThreadScheduledExecutor(new 
ExecutorThreadFactory("Flink-RestClusterClient-Retry"));
                startLeaderRetrievers();

http://git-wip-us.apache.org/repos/asf/flink/blob/bbb63531/flink-clients/src/test/java/org/apache/flink/client/program/ClientConnectionTest.java
----------------------------------------------------------------------
diff --git 
a/flink-clients/src/test/java/org/apache/flink/client/program/ClientConnectionTest.java
 
b/flink-clients/src/test/java/org/apache/flink/client/program/ClientConnectionTest.java
index 72767c7..1dd4787 100644
--- 
a/flink-clients/src/test/java/org/apache/flink/client/program/ClientConnectionTest.java
+++ 
b/flink-clients/src/test/java/org/apache/flink/client/program/ClientConnectionTest.java
@@ -27,8 +27,8 @@ import org.apache.flink.runtime.client.JobClientActorTest;
 import org.apache.flink.runtime.highavailability.HighAvailabilityServices;
 import 
org.apache.flink.runtime.highavailability.TestingHighAvailabilityServices;
 import org.apache.flink.runtime.instance.ActorGateway;
-import org.apache.flink.runtime.leaderelection.TestingLeaderRetrievalService;
 import org.apache.flink.runtime.leaderretrieval.LeaderRetrievalException;
+import org.apache.flink.runtime.leaderretrieval.SettableLeaderRetrievalService;
 import org.apache.flink.runtime.testingUtils.TestingUtils;
 import org.apache.flink.util.NetUtils;
 import org.apache.flink.util.TestLogger;
@@ -136,9 +136,9 @@ public class ClientConnectionTest extends TestLogger {
 
                        final String expectedAddress = 
AkkaUtils.getAkkaURL(actorSystem, actorRef);
 
-                       final TestingLeaderRetrievalService 
testingLeaderRetrievalService = new 
TestingLeaderRetrievalService(expectedAddress, leaderId);
+                       final SettableLeaderRetrievalService 
settableLeaderRetrievalService = new 
SettableLeaderRetrievalService(expectedAddress, leaderId);
 
-                       
highAvailabilityServices.setJobMasterLeaderRetriever(HighAvailabilityServices.DEFAULT_JOB_ID,
 testingLeaderRetrievalService);
+                       
highAvailabilityServices.setJobMasterLeaderRetriever(HighAvailabilityServices.DEFAULT_JOB_ID,
 settableLeaderRetrievalService);
 
                        StandaloneClusterClient client = new 
StandaloneClusterClient(configuration, highAvailabilityServices);
 

http://git-wip-us.apache.org/repos/asf/flink/blob/bbb63531/flink-clients/src/test/java/org/apache/flink/client/program/rest/RestClusterClientTest.java
----------------------------------------------------------------------
diff --git 
a/flink-clients/src/test/java/org/apache/flink/client/program/rest/RestClusterClientTest.java
 
b/flink-clients/src/test/java/org/apache/flink/client/program/rest/RestClusterClientTest.java
index f587e33..ca2ba22 100644
--- 
a/flink-clients/src/test/java/org/apache/flink/client/program/rest/RestClusterClientTest.java
+++ 
b/flink-clients/src/test/java/org/apache/flink/client/program/rest/RestClusterClientTest.java
@@ -182,7 +182,12 @@ public class RestClusterClientTest extends TestLogger {
                                }
                        }
                };
-               restClusterClient = new RestClusterClient<>(config, restClient, 
StandaloneClusterId.getInstance(), (attempt) -> 0);
+               restClusterClient = new RestClusterClient<>(
+                       config,
+                       restClient,
+                       StandaloneClusterId.getInstance(),
+                       (attempt) -> 0,
+                       null);
 
                jobGraph = new JobGraph("testjob");
                jobId = jobGraph.getJobID();

http://git-wip-us.apache.org/repos/asf/flink/blob/bbb63531/flink-mesos/src/test/java/org/apache/flink/mesos/runtime/clusterframework/MesosFlinkResourceManagerTest.java
----------------------------------------------------------------------
diff --git 
a/flink-mesos/src/test/java/org/apache/flink/mesos/runtime/clusterframework/MesosFlinkResourceManagerTest.java
 
b/flink-mesos/src/test/java/org/apache/flink/mesos/runtime/clusterframework/MesosFlinkResourceManagerTest.java
index 56735ef..330a2c6 100644
--- 
a/flink-mesos/src/test/java/org/apache/flink/mesos/runtime/clusterframework/MesosFlinkResourceManagerTest.java
+++ 
b/flink-mesos/src/test/java/org/apache/flink/mesos/runtime/clusterframework/MesosFlinkResourceManagerTest.java
@@ -50,8 +50,8 @@ import 
org.apache.flink.runtime.highavailability.HighAvailabilityServices;
 import 
org.apache.flink.runtime.highavailability.TestingHighAvailabilityServices;
 import org.apache.flink.runtime.instance.ActorGateway;
 import org.apache.flink.runtime.instance.AkkaActorGateway;
-import org.apache.flink.runtime.leaderelection.TestingLeaderRetrievalService;
 import org.apache.flink.runtime.leaderretrieval.LeaderRetrievalService;
+import org.apache.flink.runtime.leaderretrieval.SettableLeaderRetrievalService;
 import org.apache.flink.runtime.testingUtils.TestingUtils;
 import org.apache.flink.util.TestLogger;
 
@@ -213,7 +213,7 @@ public class MesosFlinkResourceManagerTest extends 
TestLogger {
 
                                
highAvailabilityServices.setJobMasterLeaderRetriever(
                                        HighAvailabilityServices.DEFAULT_JOB_ID,
-                                       new TestingLeaderRetrievalService(
+                                       new SettableLeaderRetrievalService(
                                                jobManager.path(),
                                                
HighAvailabilityServices.DEFAULT_LEADER_ID));
 

http://git-wip-us.apache.org/repos/asf/flink/blob/bbb63531/flink-mesos/src/test/java/org/apache/flink/mesos/runtime/clusterframework/MesosResourceManagerTest.java
----------------------------------------------------------------------
diff --git 
a/flink-mesos/src/test/java/org/apache/flink/mesos/runtime/clusterframework/MesosResourceManagerTest.java
 
b/flink-mesos/src/test/java/org/apache/flink/mesos/runtime/clusterframework/MesosResourceManagerTest.java
index 2b38b85..412e18d 100644
--- 
a/flink-mesos/src/test/java/org/apache/flink/mesos/runtime/clusterframework/MesosResourceManagerTest.java
+++ 
b/flink-mesos/src/test/java/org/apache/flink/mesos/runtime/clusterframework/MesosResourceManagerTest.java
@@ -53,7 +53,7 @@ import org.apache.flink.runtime.jobmaster.JobMasterGateway;
 import org.apache.flink.runtime.jobmaster.JobMasterId;
 import org.apache.flink.runtime.jobmaster.JobMasterRegistrationSuccess;
 import org.apache.flink.runtime.leaderelection.TestingLeaderElectionService;
-import org.apache.flink.runtime.leaderelection.TestingLeaderRetrievalService;
+import org.apache.flink.runtime.leaderretrieval.SettableLeaderRetrievalService;
 import org.apache.flink.runtime.metrics.MetricRegistry;
 import org.apache.flink.runtime.metrics.MetricRegistryImpl;
 import org.apache.flink.runtime.registration.RegistrationResponse;
@@ -412,7 +412,7 @@ public class MesosResourceManagerTest extends TestLogger {
                        public final String address;
                        public final JobMasterGateway gateway;
                        public final JobMasterId jobMasterId;
-                       public final TestingLeaderRetrievalService 
leaderRetrievalService;
+                       public final SettableLeaderRetrievalService 
leaderRetrievalService;
 
                        MockJobMaster(JobID jobID) {
                                this.jobID = jobID;
@@ -420,7 +420,7 @@ public class MesosResourceManagerTest extends TestLogger {
                                this.address = "/" + jobID;
                                this.gateway = mock(JobMasterGateway.class);
                                this.jobMasterId = JobMasterId.generate();
-                               this.leaderRetrievalService = new 
TestingLeaderRetrievalService(this.address, this.jobMasterId.toUUID());
+                               this.leaderRetrievalService = new 
SettableLeaderRetrievalService(this.address, this.jobMasterId.toUUID());
                        }
                }
 

http://git-wip-us.apache.org/repos/asf/flink/blob/bbb63531/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/WebSubmissionExtension.java
----------------------------------------------------------------------
diff --git 
a/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/WebSubmissionExtension.java
 
b/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/WebSubmissionExtension.java
index e0ac250..df36483 100644
--- 
a/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/WebSubmissionExtension.java
+++ 
b/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/WebSubmissionExtension.java
@@ -22,6 +22,8 @@ import org.apache.flink.api.common.time.Time;
 import org.apache.flink.api.java.tuple.Tuple2;
 import org.apache.flink.client.program.rest.RestClusterClient;
 import org.apache.flink.configuration.Configuration;
+import org.apache.flink.runtime.highavailability.HighAvailabilityServices;
+import org.apache.flink.runtime.leaderretrieval.SettableLeaderRetrievalService;
 import org.apache.flink.runtime.rest.handler.RestHandlerSpecification;
 import org.apache.flink.runtime.webmonitor.handlers.JarDeleteHandler;
 import org.apache.flink.runtime.webmonitor.handlers.JarDeleteHeaders;
@@ -62,7 +64,15 @@ public class WebSubmissionExtension implements 
WebMonitorExtension {
                        Executor executor,
                        Time timeout) throws Exception {
 
-               restClusterClient = new RestClusterClient<>(configuration, 
"WebSubmissionHandlers");
+               final SettableLeaderRetrievalService 
settableLeaderRetrievalService = new SettableLeaderRetrievalService();
+               restAddressFuture.thenAccept(restAddress -> 
settableLeaderRetrievalService.notifyListener(
+                       restAddress,
+                       HighAvailabilityServices.DEFAULT_LEADER_ID));
+
+               restClusterClient = new RestClusterClient<>(
+                       configuration,
+                       "WebSubmissionHandlers",
+                       settableLeaderRetrievalService);
 
                webSubmissionHandlers = new ArrayList<>(3);
 

http://git-wip-us.apache.org/repos/asf/flink/blob/bbb63531/flink-runtime/src/main/java/org/apache/flink/runtime/leaderretrieval/SettableLeaderRetrievalService.java
----------------------------------------------------------------------
diff --git 
a/flink-runtime/src/main/java/org/apache/flink/runtime/leaderretrieval/SettableLeaderRetrievalService.java
 
b/flink-runtime/src/main/java/org/apache/flink/runtime/leaderretrieval/SettableLeaderRetrievalService.java
new file mode 100644
index 0000000..1f2711a
--- /dev/null
+++ 
b/flink-runtime/src/main/java/org/apache/flink/runtime/leaderretrieval/SettableLeaderRetrievalService.java
@@ -0,0 +1,73 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.leaderretrieval;
+
+import org.apache.flink.util.Preconditions;
+
+import javax.annotation.Nullable;
+
+import java.util.UUID;
+
+/**
+ * {@link LeaderRetrievalService} implementation which directly forwards calls 
of
+ * notifyListener to the listener.
+ */
+public class SettableLeaderRetrievalService implements LeaderRetrievalService {
+
+       private String leaderAddress;
+       private UUID leaderSessionID;
+
+       private LeaderRetrievalListener listener;
+
+       public SettableLeaderRetrievalService() {
+               this(null, null);
+       }
+
+       public SettableLeaderRetrievalService(
+                       @Nullable String leaderAddress,
+                       @Nullable UUID leaderSessionID) {
+               this.leaderAddress = leaderAddress;
+               this.leaderSessionID = leaderSessionID;
+       }
+
+       @Override
+       public synchronized void start(LeaderRetrievalListener listener) throws 
Exception {
+               this.listener = Preconditions.checkNotNull(listener);
+
+               if (leaderSessionID != null && leaderAddress != null) {
+                       listener.notifyLeaderAddress(leaderAddress, 
leaderSessionID);
+               }
+       }
+
+       @Override
+       public void stop() throws Exception {
+
+       }
+
+       public synchronized void notifyListener(
+                       @Nullable String address,
+                       @Nullable UUID leaderSessionID) {
+               this.leaderAddress = address;
+               this.leaderSessionID = leaderSessionID;
+
+               if (listener != null) {
+                       listener.notifyLeaderAddress(address, leaderSessionID);
+               }
+       }
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/bbb63531/flink-runtime/src/test/java/org/apache/flink/runtime/client/JobClientActorTest.java
----------------------------------------------------------------------
diff --git 
a/flink-runtime/src/test/java/org/apache/flink/runtime/client/JobClientActorTest.java
 
b/flink-runtime/src/test/java/org/apache/flink/runtime/client/JobClientActorTest.java
index 919a784..9050d12 100644
--- 
a/flink-runtime/src/test/java/org/apache/flink/runtime/client/JobClientActorTest.java
+++ 
b/flink-runtime/src/test/java/org/apache/flink/runtime/client/JobClientActorTest.java
@@ -31,7 +31,7 @@ import org.apache.flink.runtime.akka.FlinkUntypedActor;
 import org.apache.flink.runtime.highavailability.HighAvailabilityServices;
 import 
org.apache.flink.runtime.highavailability.TestingHighAvailabilityServices;
 import org.apache.flink.runtime.jobgraph.JobGraph;
-import org.apache.flink.runtime.leaderelection.TestingLeaderRetrievalService;
+import org.apache.flink.runtime.leaderretrieval.SettableLeaderRetrievalService;
 import org.apache.flink.runtime.messages.Acknowledge;
 import org.apache.flink.runtime.messages.JobClientMessages;
 import org.apache.flink.runtime.messages.JobClientMessages.AttachToJobAndWait;
@@ -85,13 +85,13 @@ public class JobClientActorTest extends TestLogger {
                                PlainActor.class,
                                leaderSessionID));
 
-               TestingLeaderRetrievalService testingLeaderRetrievalService = 
new TestingLeaderRetrievalService(
+               SettableLeaderRetrievalService settableLeaderRetrievalService = 
new SettableLeaderRetrievalService(
                        jobManager.path().toString(),
                        leaderSessionID
                );
 
                Props jobClientActorProps = 
JobSubmissionClientActor.createActorProps(
-                       testingLeaderRetrievalService,
+                       settableLeaderRetrievalService,
                        jobClientActorTimeout,
                        false,
                        clientConfig);
@@ -124,13 +124,13 @@ public class JobClientActorTest extends TestLogger {
                                PlainActor.class,
                                leaderSessionID));
 
-               TestingLeaderRetrievalService testingLeaderRetrievalService = 
new TestingLeaderRetrievalService(
+               SettableLeaderRetrievalService settableLeaderRetrievalService = 
new SettableLeaderRetrievalService(
                        jobManager.path().toString(),
                        leaderSessionID
                );
 
                Props jobClientActorProps = 
JobAttachmentClientActor.createActorProps(
-                       testingLeaderRetrievalService,
+                       settableLeaderRetrievalService,
                        jobClientActorTimeout,
                        false);
 
@@ -154,12 +154,12 @@ public class JobClientActorTest extends TestLogger {
                FiniteDuration jobClientActorTimeout = new FiniteDuration(1L, 
TimeUnit.SECONDS);
                FiniteDuration timeout = jobClientActorTimeout.$times(2);
 
-               TestingLeaderRetrievalService testingLeaderRetrievalService = 
new TestingLeaderRetrievalService(
+               SettableLeaderRetrievalService settableLeaderRetrievalService = 
new SettableLeaderRetrievalService(
                        "localhost",
                        HighAvailabilityServices.DEFAULT_LEADER_ID);
 
                Props jobClientActorProps = 
JobSubmissionClientActor.createActorProps(
-                       testingLeaderRetrievalService,
+                       settableLeaderRetrievalService,
                        jobClientActorTimeout,
                        false,
                        clientConfig);
@@ -183,12 +183,12 @@ public class JobClientActorTest extends TestLogger {
                FiniteDuration jobClientActorTimeout = new FiniteDuration(1L, 
TimeUnit.SECONDS);
                FiniteDuration timeout = jobClientActorTimeout.$times(2);
 
-               TestingLeaderRetrievalService testingLeaderRetrievalService = 
new TestingLeaderRetrievalService(
+               SettableLeaderRetrievalService settableLeaderRetrievalService = 
new SettableLeaderRetrievalService(
                        "localhost",
                        HighAvailabilityServices.DEFAULT_LEADER_ID);
 
                Props jobClientActorProps = 
JobAttachmentClientActor.createActorProps(
-                       testingLeaderRetrievalService,
+                       settableLeaderRetrievalService,
                        jobClientActorTimeout,
                        false);
 
@@ -219,13 +219,13 @@ public class JobClientActorTest extends TestLogger {
                                JobAcceptingActor.class,
                                leaderSessionID));
 
-               TestingLeaderRetrievalService testingLeaderRetrievalService = 
new TestingLeaderRetrievalService(
+               SettableLeaderRetrievalService settableLeaderRetrievalService = 
new SettableLeaderRetrievalService(
                        jobManager.path().toString(),
                        leaderSessionID
                );
 
                Props jobClientActorProps = 
JobSubmissionClientActor.createActorProps(
-                       testingLeaderRetrievalService,
+                       settableLeaderRetrievalService,
                        jobClientActorTimeout,
                        false,
                        clientConfig);
@@ -261,13 +261,13 @@ public class JobClientActorTest extends TestLogger {
                                JobAcceptingActor.class,
                                leaderSessionID));
 
-               TestingLeaderRetrievalService testingLeaderRetrievalService = 
new TestingLeaderRetrievalService(
+               SettableLeaderRetrievalService settableLeaderRetrievalService = 
new SettableLeaderRetrievalService(
                        jobManager.path().toString(),
                        leaderSessionID
                );
 
                Props jobClientActorProps = 
JobAttachmentClientActor.createActorProps(
-                       testingLeaderRetrievalService,
+                       settableLeaderRetrievalService,
                        jobClientActorTimeout,
                        false);
 
@@ -302,13 +302,13 @@ public class JobClientActorTest extends TestLogger {
                                JobAcceptingActor.class,
                                leaderSessionID));
 
-               TestingLeaderRetrievalService testingLeaderRetrievalService = 
new TestingLeaderRetrievalService(
+               SettableLeaderRetrievalService settableLeaderRetrievalService = 
new SettableLeaderRetrievalService(
                        jobManager.path().toString(),
                        leaderSessionID
                );
 
                TestingHighAvailabilityServices highAvailabilityServices = new 
TestingHighAvailabilityServices();
-               
highAvailabilityServices.setJobMasterLeaderRetriever(HighAvailabilityServices.DEFAULT_JOB_ID,
 testingLeaderRetrievalService);
+               
highAvailabilityServices.setJobMasterLeaderRetriever(HighAvailabilityServices.DEFAULT_JOB_ID,
 settableLeaderRetrievalService);
 
                JobListeningContext jobListeningContext =
                        JobClient.submitJob(

http://git-wip-us.apache.org/repos/asf/flink/blob/bbb63531/flink-runtime/src/test/java/org/apache/flink/runtime/clusterframework/ResourceManagerTest.java
----------------------------------------------------------------------
diff --git 
a/flink-runtime/src/test/java/org/apache/flink/runtime/clusterframework/ResourceManagerTest.java
 
b/flink-runtime/src/test/java/org/apache/flink/runtime/clusterframework/ResourceManagerTest.java
index 241da8f..a1b3fb6 100644
--- 
a/flink-runtime/src/test/java/org/apache/flink/runtime/clusterframework/ResourceManagerTest.java
+++ 
b/flink-runtime/src/test/java/org/apache/flink/runtime/clusterframework/ResourceManagerTest.java
@@ -42,7 +42,7 @@ import org.apache.flink.runtime.jobmaster.JobMasterGateway;
 import org.apache.flink.runtime.jobmaster.JobMasterId;
 import org.apache.flink.runtime.jobmaster.JobMasterRegistrationSuccess;
 import org.apache.flink.runtime.leaderelection.TestingLeaderElectionService;
-import org.apache.flink.runtime.leaderelection.TestingLeaderRetrievalService;
+import org.apache.flink.runtime.leaderretrieval.SettableLeaderRetrievalService;
 import org.apache.flink.runtime.messages.Acknowledge;
 import org.apache.flink.runtime.messages.JobManagerMessages;
 import org.apache.flink.runtime.metrics.MetricRegistryImpl;
@@ -107,7 +107,7 @@ public class ResourceManagerTest extends TestLogger {
        private final Time timeout = Time.seconds(10L);
 
        private TestingHighAvailabilityServices highAvailabilityServices;
-       private TestingLeaderRetrievalService jobManagerLeaderRetrievalService;
+       private SettableLeaderRetrievalService jobManagerLeaderRetrievalService;
 
        @BeforeClass
        public static void setup() {
@@ -121,7 +121,7 @@ public class ResourceManagerTest extends TestLogger {
 
        @Before
        public void setupTest() {
-               jobManagerLeaderRetrievalService = new 
TestingLeaderRetrievalService();
+               jobManagerLeaderRetrievalService = new 
SettableLeaderRetrievalService();
 
                highAvailabilityServices = new 
TestingHighAvailabilityServices();
 
@@ -602,7 +602,7 @@ public class ResourceManagerTest extends TestLogger {
                        Time.seconds(5L));
 
                final TestingLeaderElectionService rmLeaderElectionService = 
new TestingLeaderElectionService();
-               final TestingLeaderRetrievalService jmLeaderRetrievalService = 
new TestingLeaderRetrievalService(jobMasterAddress, jobMasterId.toUUID());
+               final SettableLeaderRetrievalService jmLeaderRetrievalService = 
new SettableLeaderRetrievalService(jobMasterAddress, jobMasterId.toUUID());
                final TestingHighAvailabilityServices highAvailabilityServices 
= new TestingHighAvailabilityServices();
                
highAvailabilityServices.setResourceManagerLeaderElectionService(rmLeaderElectionService);
                highAvailabilityServices.setJobMasterLeaderRetriever(jobId, 
jmLeaderRetrievalService);

http://git-wip-us.apache.org/repos/asf/flink/blob/bbb63531/flink-runtime/src/test/java/org/apache/flink/runtime/dispatcher/DispatcherTest.java
----------------------------------------------------------------------
diff --git 
a/flink-runtime/src/test/java/org/apache/flink/runtime/dispatcher/DispatcherTest.java
 
b/flink-runtime/src/test/java/org/apache/flink/runtime/dispatcher/DispatcherTest.java
index 5d264e2..8267921 100644
--- 
a/flink-runtime/src/test/java/org/apache/flink/runtime/dispatcher/DispatcherTest.java
+++ 
b/flink-runtime/src/test/java/org/apache/flink/runtime/dispatcher/DispatcherTest.java
@@ -42,7 +42,7 @@ import org.apache.flink.runtime.jobmaster.JobManagerRunner;
 import org.apache.flink.runtime.jobmaster.JobManagerSharedServices;
 import org.apache.flink.runtime.jobmaster.JobResult;
 import org.apache.flink.runtime.leaderelection.TestingLeaderElectionService;
-import org.apache.flink.runtime.leaderelection.TestingLeaderRetrievalService;
+import org.apache.flink.runtime.leaderretrieval.SettableLeaderRetrievalService;
 import org.apache.flink.runtime.messages.Acknowledge;
 import org.apache.flink.runtime.messages.FlinkJobNotFoundException;
 import org.apache.flink.runtime.metrics.MetricRegistry;
@@ -161,7 +161,7 @@ public class DispatcherTest extends TestLogger {
                haServices.setSubmittedJobGraphStore(submittedJobGraphStore);
                haServices.setJobMasterLeaderElectionService(TEST_JOB_ID, 
jobMasterLeaderElectionService);
                haServices.setCheckpointRecoveryFactory(new 
StandaloneCheckpointRecoveryFactory());
-               haServices.setResourceManagerLeaderRetriever(new 
TestingLeaderRetrievalService());
+               haServices.setResourceManagerLeaderRetriever(new 
SettableLeaderRetrievalService());
                runningJobsRegistry = haServices.getRunningJobsRegistry();
 
                final Configuration blobServerConfig = new Configuration();

http://git-wip-us.apache.org/repos/asf/flink/blob/bbb63531/flink-runtime/src/test/java/org/apache/flink/runtime/highavailability/ManualLeaderService.java
----------------------------------------------------------------------
diff --git 
a/flink-runtime/src/test/java/org/apache/flink/runtime/highavailability/ManualLeaderService.java
 
b/flink-runtime/src/test/java/org/apache/flink/runtime/highavailability/ManualLeaderService.java
index 423c0ce..a055edf 100644
--- 
a/flink-runtime/src/test/java/org/apache/flink/runtime/highavailability/ManualLeaderService.java
+++ 
b/flink-runtime/src/test/java/org/apache/flink/runtime/highavailability/ManualLeaderService.java
@@ -20,7 +20,7 @@ package org.apache.flink.runtime.highavailability;
 
 import org.apache.flink.runtime.leaderelection.LeaderElectionService;
 import org.apache.flink.runtime.leaderelection.TestingLeaderElectionService;
-import org.apache.flink.runtime.leaderelection.TestingLeaderRetrievalService;
+import org.apache.flink.runtime.leaderretrieval.SettableLeaderRetrievalService;
 import org.apache.flink.runtime.leaderretrieval.LeaderRetrievalService;
 import org.apache.flink.util.Preconditions;
 
@@ -32,13 +32,13 @@ import java.util.UUID;
 /**
  * Leader service for {@link TestingManualHighAvailabilityServices} 
implementation. The leader
  * service allows to create multiple {@link TestingLeaderElectionService} and
- * {@link TestingLeaderRetrievalService} and allows to manually trigger the 
services identified
+ * {@link SettableLeaderRetrievalService} and allows to manually trigger the 
services identified
  * by a continuous index.
  */
 public class ManualLeaderService {
 
        private final List<TestingLeaderElectionService> leaderElectionServices;
-       private final List<TestingLeaderRetrievalService> 
leaderRetrievalServices;
+       private final List<SettableLeaderRetrievalService> 
leaderRetrievalServices;
 
        private int currentLeaderIndex;
 
@@ -54,13 +54,13 @@ public class ManualLeaderService {
        }
 
        public LeaderRetrievalService createLeaderRetrievalService() {
-               final TestingLeaderRetrievalService 
testingLeaderRetrievalService = new TestingLeaderRetrievalService(
+               final SettableLeaderRetrievalService 
settableLeaderRetrievalService = new SettableLeaderRetrievalService(
                        getLeaderAddress(currentLeaderIndex),
                        currentLeaderId);
 
-               leaderRetrievalServices.add(testingLeaderRetrievalService);
+               leaderRetrievalServices.add(settableLeaderRetrievalService);
 
-               return testingLeaderRetrievalService;
+               return settableLeaderRetrievalService;
        }
 
        public LeaderElectionService createLeaderElectionService() {
@@ -100,7 +100,7 @@ public class ManualLeaderService {
        }
 
        public void notifyRetrievers(int index, UUID leaderId) {
-               for (TestingLeaderRetrievalService retrievalService: 
leaderRetrievalServices) {
+               for (SettableLeaderRetrievalService retrievalService: 
leaderRetrievalServices) {
                        
retrievalService.notifyListener(getLeaderAddress(index), leaderId);
                }
        }

http://git-wip-us.apache.org/repos/asf/flink/blob/bbb63531/flink-runtime/src/test/java/org/apache/flink/runtime/jobmanager/JobManagerHARecoveryTest.java
----------------------------------------------------------------------
diff --git 
a/flink-runtime/src/test/java/org/apache/flink/runtime/jobmanager/JobManagerHARecoveryTest.java
 
b/flink-runtime/src/test/java/org/apache/flink/runtime/jobmanager/JobManagerHARecoveryTest.java
index 005dd98..309ac12 100644
--- 
a/flink-runtime/src/test/java/org/apache/flink/runtime/jobmanager/JobManagerHARecoveryTest.java
+++ 
b/flink-runtime/src/test/java/org/apache/flink/runtime/jobmanager/JobManagerHARecoveryTest.java
@@ -64,7 +64,7 @@ import 
org.apache.flink.runtime.jobgraph.tasks.JobCheckpointingSettings;
 import org.apache.flink.runtime.jobmanager.scheduler.Scheduler;
 import org.apache.flink.runtime.leaderelection.LeaderElectionService;
 import org.apache.flink.runtime.leaderelection.TestingLeaderElectionService;
-import org.apache.flink.runtime.leaderelection.TestingLeaderRetrievalService;
+import org.apache.flink.runtime.leaderretrieval.SettableLeaderRetrievalService;
 import org.apache.flink.runtime.messages.JobManagerMessages;
 import org.apache.flink.runtime.metrics.NoOpMetricRegistry;
 import org.apache.flink.runtime.metrics.groups.JobManagerMetricGroup;
@@ -184,7 +184,7 @@ public class JobManagerHARecoveryTest extends TestLogger {
                        CheckpointIDCounter checkpointCounter = new 
StandaloneCheckpointIDCounter();
                        CheckpointRecoveryFactory checkpointStateFactory = new 
TestingCheckpointRecoveryFactory(checkpointStore, checkpointCounter);
                        TestingLeaderElectionService myLeaderElectionService = 
new TestingLeaderElectionService();
-                       TestingLeaderRetrievalService myLeaderRetrievalService 
= new TestingLeaderRetrievalService(
+                       SettableLeaderRetrievalService myLeaderRetrievalService 
= new SettableLeaderRetrievalService(
                                null,
                                null);
                        TestingHighAvailabilityServices 
testingHighAvailabilityServices = new TestingHighAvailabilityServices();

http://git-wip-us.apache.org/repos/asf/flink/blob/bbb63531/flink-runtime/src/test/java/org/apache/flink/runtime/jobmaster/JobManagerRunnerTest.java
----------------------------------------------------------------------
diff --git 
a/flink-runtime/src/test/java/org/apache/flink/runtime/jobmaster/JobManagerRunnerTest.java
 
b/flink-runtime/src/test/java/org/apache/flink/runtime/jobmaster/JobManagerRunnerTest.java
index 0c238ce..9730dde 100644
--- 
a/flink-runtime/src/test/java/org/apache/flink/runtime/jobmaster/JobManagerRunnerTest.java
+++ 
b/flink-runtime/src/test/java/org/apache/flink/runtime/jobmaster/JobManagerRunnerTest.java
@@ -31,7 +31,7 @@ import org.apache.flink.runtime.jobgraph.JobGraph;
 import org.apache.flink.runtime.jobgraph.JobStatus;
 import org.apache.flink.runtime.jobgraph.JobVertex;
 import org.apache.flink.runtime.leaderelection.TestingLeaderElectionService;
-import org.apache.flink.runtime.leaderelection.TestingLeaderRetrievalService;
+import org.apache.flink.runtime.leaderretrieval.SettableLeaderRetrievalService;
 import org.apache.flink.runtime.metrics.MetricRegistry;
 import org.apache.flink.runtime.metrics.NoOpMetricRegistry;
 import 
org.apache.flink.runtime.rest.handler.legacy.utils.ArchivedExecutionGraphBuilder;
@@ -113,7 +113,7 @@ public class JobManagerRunnerTest extends TestLogger {
        public void setup() {
                haServices = new TestingHighAvailabilityServices();
                
haServices.setJobMasterLeaderElectionService(jobGraph.getJobID(), new 
TestingLeaderElectionService());
-               haServices.setResourceManagerLeaderRetriever(new 
TestingLeaderRetrievalService());
+               haServices.setResourceManagerLeaderRetriever(new 
SettableLeaderRetrievalService());
                haServices.setCheckpointRecoveryFactory(new 
StandaloneCheckpointRecoveryFactory());
        }
 

http://git-wip-us.apache.org/repos/asf/flink/blob/bbb63531/flink-runtime/src/test/java/org/apache/flink/runtime/jobmaster/JobMasterTest.java
----------------------------------------------------------------------
diff --git 
a/flink-runtime/src/test/java/org/apache/flink/runtime/jobmaster/JobMasterTest.java
 
b/flink-runtime/src/test/java/org/apache/flink/runtime/jobmaster/JobMasterTest.java
index e401020..b543056 100644
--- 
a/flink-runtime/src/test/java/org/apache/flink/runtime/jobmaster/JobMasterTest.java
+++ 
b/flink-runtime/src/test/java/org/apache/flink/runtime/jobmaster/JobMasterTest.java
@@ -45,7 +45,7 @@ import 
org.apache.flink.runtime.jobgraph.SavepointRestoreSettings;
 import 
org.apache.flink.runtime.jobgraph.tasks.CheckpointCoordinatorConfiguration;
 import org.apache.flink.runtime.jobgraph.tasks.JobCheckpointingSettings;
 import org.apache.flink.runtime.jobmanager.OnCompletionActions;
-import org.apache.flink.runtime.leaderelection.TestingLeaderRetrievalService;
+import org.apache.flink.runtime.leaderretrieval.SettableLeaderRetrievalService;
 import org.apache.flink.runtime.messages.Acknowledge;
 import org.apache.flink.runtime.registration.RegistrationResponse;
 import org.apache.flink.runtime.resourcemanager.ResourceManagerId;
@@ -113,7 +113,7 @@ public class JobMasterTest extends TestLogger {
 
        private TestingHighAvailabilityServices haServices;
 
-       private TestingLeaderRetrievalService rmLeaderRetrievalService;
+       private SettableLeaderRetrievalService rmLeaderRetrievalService;
 
        private TestingFatalErrorHandler testingFatalErrorHandler;
 
@@ -135,7 +135,7 @@ public class JobMasterTest extends TestLogger {
 
                haServices.setCheckpointRecoveryFactory(new 
StandaloneCheckpointRecoveryFactory());
 
-               rmLeaderRetrievalService = new TestingLeaderRetrievalService(
+               rmLeaderRetrievalService = new SettableLeaderRetrievalService(
                        null,
                        null);
                
haServices.setResourceManagerLeaderRetriever(rmLeaderRetrievalService);

http://git-wip-us.apache.org/repos/asf/flink/blob/bbb63531/flink-runtime/src/test/java/org/apache/flink/runtime/leaderelection/TestingLeaderRetrievalService.java
----------------------------------------------------------------------
diff --git 
a/flink-runtime/src/test/java/org/apache/flink/runtime/leaderelection/TestingLeaderRetrievalService.java
 
b/flink-runtime/src/test/java/org/apache/flink/runtime/leaderelection/TestingLeaderRetrievalService.java
deleted file mode 100644
index 15d3bde..0000000
--- 
a/flink-runtime/src/test/java/org/apache/flink/runtime/leaderelection/TestingLeaderRetrievalService.java
+++ /dev/null
@@ -1,69 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.runtime.leaderelection;
-
-import org.apache.flink.runtime.leaderretrieval.LeaderRetrievalListener;
-import org.apache.flink.runtime.leaderretrieval.LeaderRetrievalService;
-import org.apache.flink.util.Preconditions;
-
-import java.util.UUID;
-
-/**
- * Test {@link LeaderRetrievalService} implementation which directly forwards 
calls of
- * notifyListener to the listener.
- */
-public class TestingLeaderRetrievalService implements LeaderRetrievalService {
-
-       private volatile String leaderAddress;
-       private volatile UUID leaderSessionID;
-
-       private volatile LeaderRetrievalListener listener;
-
-       public TestingLeaderRetrievalService() {
-               this(null, null);
-       }
-
-       public TestingLeaderRetrievalService(String leaderAddress, UUID 
leaderSessionID) {
-               this.leaderAddress = leaderAddress;
-               this.leaderSessionID = leaderSessionID;
-       }
-
-       @Override
-       public void start(LeaderRetrievalListener listener) throws Exception {
-               this.listener = Preconditions.checkNotNull(listener);
-
-               if (leaderSessionID != null && leaderAddress != null) {
-                       listener.notifyLeaderAddress(leaderAddress, 
leaderSessionID);
-               }
-       }
-
-       @Override
-       public void stop() throws Exception {
-
-       }
-
-       public void notifyListener(String address, UUID leaderSessionID) {
-               this.leaderAddress = address;
-               this.leaderSessionID = leaderSessionID;
-
-               if (listener != null) {
-                       listener.notifyLeaderAddress(address, leaderSessionID);
-               }
-       }
-}

http://git-wip-us.apache.org/repos/asf/flink/blob/bbb63531/flink-runtime/src/test/java/org/apache/flink/runtime/leaderretrieval/SettableLeaderRetrievalServiceTest.java
----------------------------------------------------------------------
diff --git 
a/flink-runtime/src/test/java/org/apache/flink/runtime/leaderretrieval/SettableLeaderRetrievalServiceTest.java
 
b/flink-runtime/src/test/java/org/apache/flink/runtime/leaderretrieval/SettableLeaderRetrievalServiceTest.java
new file mode 100644
index 0000000..667008b
--- /dev/null
+++ 
b/flink-runtime/src/test/java/org/apache/flink/runtime/leaderretrieval/SettableLeaderRetrievalServiceTest.java
@@ -0,0 +1,67 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.leaderretrieval;
+
+import org.apache.flink.runtime.highavailability.HighAvailabilityServices;
+import org.apache.flink.runtime.leaderelection.TestingListener;
+import org.apache.flink.util.TestLogger;
+
+import org.junit.Before;
+import org.junit.Test;
+
+import static org.hamcrest.Matchers.equalTo;
+import static org.junit.Assert.assertThat;
+
+/**
+ * Tests for {@link SettableLeaderRetrievalService}.
+ */
+public class SettableLeaderRetrievalServiceTest extends TestLogger {
+
+       private SettableLeaderRetrievalService settableLeaderRetrievalService;
+
+       @Before
+       public void setUp() {
+               settableLeaderRetrievalService = new 
SettableLeaderRetrievalService();
+       }
+
+       @Test
+       public void testNotifyListenerLater() throws Exception {
+               final String localhost = "localhost";
+               settableLeaderRetrievalService.notifyListener(localhost, 
HighAvailabilityServices.DEFAULT_LEADER_ID);
+
+               final TestingListener listener = new TestingListener();
+               settableLeaderRetrievalService.start(listener);
+
+               assertThat(listener.getAddress(), equalTo(localhost));
+               assertThat(listener.getLeaderSessionID(), 
equalTo(HighAvailabilityServices.DEFAULT_LEADER_ID));
+       }
+
+       @Test
+       public void testNotifyListenerImmediately() throws Exception {
+               final TestingListener listener = new TestingListener();
+               settableLeaderRetrievalService.start(listener);
+
+               final String localhost = "localhost";
+               settableLeaderRetrievalService.notifyListener(localhost, 
HighAvailabilityServices.DEFAULT_LEADER_ID);
+
+               assertThat(listener.getAddress(), equalTo(localhost));
+               assertThat(listener.getLeaderSessionID(), 
equalTo(HighAvailabilityServices.DEFAULT_LEADER_ID));
+       }
+
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/bbb63531/flink-runtime/src/test/java/org/apache/flink/runtime/resourcemanager/JobLeaderIdServiceTest.java
----------------------------------------------------------------------
diff --git 
a/flink-runtime/src/test/java/org/apache/flink/runtime/resourcemanager/JobLeaderIdServiceTest.java
 
b/flink-runtime/src/test/java/org/apache/flink/runtime/resourcemanager/JobLeaderIdServiceTest.java
index bb99a0c..301ab46 100644
--- 
a/flink-runtime/src/test/java/org/apache/flink/runtime/resourcemanager/JobLeaderIdServiceTest.java
+++ 
b/flink-runtime/src/test/java/org/apache/flink/runtime/resourcemanager/JobLeaderIdServiceTest.java
@@ -23,7 +23,7 @@ import org.apache.flink.api.common.time.Time;
 import org.apache.flink.runtime.concurrent.ScheduledExecutor;
 import 
org.apache.flink.runtime.highavailability.TestingHighAvailabilityServices;
 import org.apache.flink.runtime.jobmaster.JobMasterId;
-import org.apache.flink.runtime.leaderelection.TestingLeaderRetrievalService;
+import org.apache.flink.runtime.leaderretrieval.SettableLeaderRetrievalService;
 import org.apache.flink.testutils.category.Flip6;
 import org.apache.flink.util.TestLogger;
 import org.junit.Test;
@@ -68,7 +68,7 @@ public class JobLeaderIdServiceTest extends TestLogger {
                final String address = "foobar";
                final JobMasterId leaderId = JobMasterId.generate();
                TestingHighAvailabilityServices highAvailabilityServices = new 
TestingHighAvailabilityServices();
-               TestingLeaderRetrievalService leaderRetrievalService = new 
TestingLeaderRetrievalService(
+               SettableLeaderRetrievalService leaderRetrievalService = new 
SettableLeaderRetrievalService(
                        null,
                        null);
 
@@ -104,7 +104,7 @@ public class JobLeaderIdServiceTest extends TestLogger {
        public void testRemovingJob() throws Exception {
                final JobID jobId = new JobID();
                TestingHighAvailabilityServices highAvailabilityServices = new 
TestingHighAvailabilityServices();
-               TestingLeaderRetrievalService leaderRetrievalService = new 
TestingLeaderRetrievalService(null, null);
+               SettableLeaderRetrievalService leaderRetrievalService = new 
SettableLeaderRetrievalService(null, null);
 
                highAvailabilityServices.setJobMasterLeaderRetriever(jobId, 
leaderRetrievalService);
 
@@ -145,7 +145,7 @@ public class JobLeaderIdServiceTest extends TestLogger {
        public void testInitialJobTimeout() throws Exception {
                final JobID jobId = new JobID();
                TestingHighAvailabilityServices highAvailabilityServices = new 
TestingHighAvailabilityServices();
-               TestingLeaderRetrievalService leaderRetrievalService = new 
TestingLeaderRetrievalService(
+               SettableLeaderRetrievalService leaderRetrievalService = new 
SettableLeaderRetrievalService(
                        null,
                        null);
 
@@ -189,7 +189,7 @@ public class JobLeaderIdServiceTest extends TestLogger {
                final String address = "foobar";
                final JobMasterId leaderId = JobMasterId.generate();
                TestingHighAvailabilityServices highAvailabilityServices = new 
TestingHighAvailabilityServices();
-               TestingLeaderRetrievalService leaderRetrievalService = new 
TestingLeaderRetrievalService(
+               SettableLeaderRetrievalService leaderRetrievalService = new 
SettableLeaderRetrievalService(
                        null,
                        null);
 

http://git-wip-us.apache.org/repos/asf/flink/blob/bbb63531/flink-runtime/src/test/java/org/apache/flink/runtime/resourcemanager/ResourceManagerJobMasterTest.java
----------------------------------------------------------------------
diff --git 
a/flink-runtime/src/test/java/org/apache/flink/runtime/resourcemanager/ResourceManagerJobMasterTest.java
 
b/flink-runtime/src/test/java/org/apache/flink/runtime/resourcemanager/ResourceManagerJobMasterTest.java
index acd8774..9854f5a 100644
--- 
a/flink-runtime/src/test/java/org/apache/flink/runtime/resourcemanager/ResourceManagerJobMasterTest.java
+++ 
b/flink-runtime/src/test/java/org/apache/flink/runtime/resourcemanager/ResourceManagerJobMasterTest.java
@@ -31,7 +31,7 @@ import org.apache.flink.runtime.jobmaster.JobMasterId;
 import org.apache.flink.runtime.jobmaster.JobMasterRegistrationSuccess;
 import org.apache.flink.runtime.leaderelection.LeaderElectionService;
 import org.apache.flink.runtime.leaderelection.TestingLeaderElectionService;
-import org.apache.flink.runtime.leaderelection.TestingLeaderRetrievalService;
+import org.apache.flink.runtime.leaderretrieval.SettableLeaderRetrievalService;
 import org.apache.flink.runtime.leaderretrieval.LeaderRetrievalService;
 import org.apache.flink.runtime.metrics.MetricRegistryImpl;
 import 
org.apache.flink.runtime.resourcemanager.exceptions.ResourceManagerException;
@@ -86,7 +86,7 @@ public class ResourceManagerJobMasterTest extends TestLogger {
                JobID jobID = mockJobMaster(jobMasterAddress);
                JobMasterId jobMasterId = JobMasterId.generate();
                final ResourceID jmResourceId = new 
ResourceID(jobMasterAddress);
-               TestingLeaderRetrievalService jobMasterLeaderRetrievalService = 
new TestingLeaderRetrievalService(jobMasterAddress, jobMasterId.toUUID());
+               SettableLeaderRetrievalService jobMasterLeaderRetrievalService 
= new SettableLeaderRetrievalService(jobMasterAddress, jobMasterId.toUUID());
                TestingLeaderElectionService 
resourceManagerLeaderElectionService = new TestingLeaderElectionService();
                TestingFatalErrorHandler testingFatalErrorHandler = new 
TestingFatalErrorHandler();
                final ResourceManager<?> resourceManager = 
createAndStartResourceManager(resourceManagerLeaderElectionService, jobID, 
jobMasterLeaderRetrievalService, testingFatalErrorHandler);
@@ -119,7 +119,7 @@ public class ResourceManagerJobMasterTest extends 
TestLogger {
                JobID jobID = mockJobMaster(jobMasterAddress);
                JobMasterId jobMasterId = JobMasterId.generate();
                final ResourceID jmResourceId = new 
ResourceID(jobMasterAddress);
-               TestingLeaderRetrievalService jobMasterLeaderRetrievalService = 
new TestingLeaderRetrievalService(jobMasterAddress, jobMasterId.toUUID());
+               SettableLeaderRetrievalService jobMasterLeaderRetrievalService 
= new SettableLeaderRetrievalService(jobMasterAddress, jobMasterId.toUUID());
                TestingFatalErrorHandler testingFatalErrorHandler = new 
TestingFatalErrorHandler();
                final ResourceManager<?> resourceManager = 
createAndStartResourceManager(mock(LeaderElectionService.class), jobID, 
jobMasterLeaderRetrievalService, testingFatalErrorHandler);
                final ResourceManagerGateway wronglyFencedGateway = 
rpcService.connect(resourceManager.getAddress(), ResourceManagerId.generate(), 
ResourceManagerGateway.class)
@@ -153,7 +153,7 @@ public class ResourceManagerJobMasterTest extends 
TestLogger {
                String jobMasterAddress = "/jobMasterAddress1";
                JobID jobID = mockJobMaster(jobMasterAddress);
                TestingLeaderElectionService 
resourceManagerLeaderElectionService = new TestingLeaderElectionService();
-               TestingLeaderRetrievalService jobMasterLeaderRetrievalService = 
new TestingLeaderRetrievalService(
+               SettableLeaderRetrievalService jobMasterLeaderRetrievalService 
= new SettableLeaderRetrievalService(
                        "localhost",
                        HighAvailabilityServices.DEFAULT_LEADER_ID);
                TestingFatalErrorHandler testingFatalErrorHandler = new 
TestingFatalErrorHandler();
@@ -187,7 +187,7 @@ public class ResourceManagerJobMasterTest extends 
TestLogger {
                String jobMasterAddress = "/jobMasterAddress1";
                JobID jobID = mockJobMaster(jobMasterAddress);
                TestingLeaderElectionService 
resourceManagerLeaderElectionService = new TestingLeaderElectionService();
-               TestingLeaderRetrievalService jobMasterLeaderRetrievalService = 
new TestingLeaderRetrievalService(
+               SettableLeaderRetrievalService jobMasterLeaderRetrievalService 
= new SettableLeaderRetrievalService(
                        "localhost",
                        HighAvailabilityServices.DEFAULT_LEADER_ID);
                TestingFatalErrorHandler testingFatalErrorHandler = new 
TestingFatalErrorHandler();
@@ -221,7 +221,7 @@ public class ResourceManagerJobMasterTest extends 
TestLogger {
                String jobMasterAddress = "/jobMasterAddress1";
                JobID jobID = mockJobMaster(jobMasterAddress);
                TestingLeaderElectionService 
resourceManagerLeaderElectionService = new TestingLeaderElectionService();
-               TestingLeaderRetrievalService jobMasterLeaderRetrievalService = 
new TestingLeaderRetrievalService(
+               SettableLeaderRetrievalService jobMasterLeaderRetrievalService 
= new SettableLeaderRetrievalService(
                        "localhost",
                        HighAvailabilityServices.DEFAULT_LEADER_ID);
                TestingFatalErrorHandler testingFatalErrorHandler = new 
TestingFatalErrorHandler();

http://git-wip-us.apache.org/repos/asf/flink/blob/bbb63531/flink-runtime/src/test/java/org/apache/flink/runtime/taskexecutor/TaskExecutorITCase.java
----------------------------------------------------------------------
diff --git 
a/flink-runtime/src/test/java/org/apache/flink/runtime/taskexecutor/TaskExecutorITCase.java
 
b/flink-runtime/src/test/java/org/apache/flink/runtime/taskexecutor/TaskExecutorITCase.java
index d693f47..fc8337f 100644
--- 
a/flink-runtime/src/test/java/org/apache/flink/runtime/taskexecutor/TaskExecutorITCase.java
+++ 
b/flink-runtime/src/test/java/org/apache/flink/runtime/taskexecutor/TaskExecutorITCase.java
@@ -35,7 +35,7 @@ import org.apache.flink.runtime.jobmaster.JobMasterGateway;
 import org.apache.flink.runtime.jobmaster.JobMasterId;
 import org.apache.flink.runtime.jobmaster.JobMasterRegistrationSuccess;
 import org.apache.flink.runtime.leaderelection.TestingLeaderElectionService;
-import org.apache.flink.runtime.leaderelection.TestingLeaderRetrievalService;
+import org.apache.flink.runtime.leaderretrieval.SettableLeaderRetrievalService;
 import org.apache.flink.runtime.messages.Acknowledge;
 import org.apache.flink.runtime.metrics.MetricRegistry;
 import org.apache.flink.runtime.metrics.NoOpMetricRegistry;
@@ -100,7 +100,7 @@ public class TaskExecutorITCase extends TestLogger {
                final ResourceID taskManagerResourceId = new 
ResourceID("foobar");
                final UUID rmLeaderId = UUID.randomUUID();
                final TestingLeaderElectionService rmLeaderElectionService = 
new TestingLeaderElectionService();
-               final TestingLeaderRetrievalService rmLeaderRetrievalService = 
new TestingLeaderRetrievalService(null, null);
+               final SettableLeaderRetrievalService rmLeaderRetrievalService = 
new SettableLeaderRetrievalService(null, null);
                final String rmAddress = "rm";
                final String jmAddress = "jm";
                final JobMasterId jobMasterId = JobMasterId.generate();
@@ -111,7 +111,7 @@ public class TaskExecutorITCase extends TestLogger {
 
                
testingHAServices.setResourceManagerLeaderElectionService(rmLeaderElectionService);
                
testingHAServices.setResourceManagerLeaderRetriever(rmLeaderRetrievalService);
-               testingHAServices.setJobMasterLeaderRetriever(jobId, new 
TestingLeaderRetrievalService(jmAddress, jobMasterId.toUUID()));
+               testingHAServices.setJobMasterLeaderRetriever(jobId, new 
SettableLeaderRetrievalService(jmAddress, jobMasterId.toUUID()));
 
                TestingRpcService rpcService = new TestingRpcService();
                ResourceManagerConfiguration resourceManagerConfiguration = new 
ResourceManagerConfiguration(

http://git-wip-us.apache.org/repos/asf/flink/blob/bbb63531/flink-runtime/src/test/java/org/apache/flink/runtime/taskexecutor/TaskExecutorTest.java
----------------------------------------------------------------------
diff --git 
a/flink-runtime/src/test/java/org/apache/flink/runtime/taskexecutor/TaskExecutorTest.java
 
b/flink-runtime/src/test/java/org/apache/flink/runtime/taskexecutor/TaskExecutorTest.java
index e972feb..7aae287 100644
--- 
a/flink-runtime/src/test/java/org/apache/flink/runtime/taskexecutor/TaskExecutorTest.java
+++ 
b/flink-runtime/src/test/java/org/apache/flink/runtime/taskexecutor/TaskExecutorTest.java
@@ -59,7 +59,7 @@ import 
org.apache.flink.runtime.jobmaster.JMTMRegistrationSuccess;
 import org.apache.flink.runtime.jobmaster.JobMasterGateway;
 import org.apache.flink.runtime.jobmaster.JobMasterId;
 import org.apache.flink.runtime.jobmaster.utils.TestingJobMasterGateway;
-import org.apache.flink.runtime.leaderelection.TestingLeaderRetrievalService;
+import org.apache.flink.runtime.leaderretrieval.SettableLeaderRetrievalService;
 import org.apache.flink.runtime.leaderretrieval.LeaderRetrievalListener;
 import org.apache.flink.runtime.leaderretrieval.LeaderRetrievalService;
 import org.apache.flink.runtime.messages.Acknowledge;
@@ -165,9 +165,9 @@ public class TaskExecutorTest extends TestLogger {
 
        private TestingHighAvailabilityServices haServices;
 
-       private TestingLeaderRetrievalService resourceManagerLeaderRetriever;
+       private SettableLeaderRetrievalService resourceManagerLeaderRetriever;
 
-       private TestingLeaderRetrievalService jobManagerLeaderRetriever;
+       private SettableLeaderRetrievalService jobManagerLeaderRetriever;
 
        @Before
        public void setup() throws IOException {
@@ -188,8 +188,8 @@ public class TaskExecutorTest extends TestLogger {
                testingFatalErrorHandler = new TestingFatalErrorHandler();
 
                haServices = new TestingHighAvailabilityServices();
-               resourceManagerLeaderRetriever = new 
TestingLeaderRetrievalService();
-               jobManagerLeaderRetriever = new TestingLeaderRetrievalService();
+               resourceManagerLeaderRetriever = new 
SettableLeaderRetrievalService();
+               jobManagerLeaderRetriever = new 
SettableLeaderRetrievalService();
                
haServices.setResourceManagerLeaderRetriever(resourceManagerLeaderRetriever);
                haServices.setJobMasterLeaderRetriever(jobId, 
jobManagerLeaderRetriever);
        }

http://git-wip-us.apache.org/repos/asf/flink/blob/bbb63531/flink-runtime/src/test/java/org/apache/flink/runtime/taskmanager/TaskManagerRegistrationTest.java
----------------------------------------------------------------------
diff --git 
a/flink-runtime/src/test/java/org/apache/flink/runtime/taskmanager/TaskManagerRegistrationTest.java
 
b/flink-runtime/src/test/java/org/apache/flink/runtime/taskmanager/TaskManagerRegistrationTest.java
index ceb92c4..6b65095 100644
--- 
a/flink-runtime/src/test/java/org/apache/flink/runtime/taskmanager/TaskManagerRegistrationTest.java
+++ 
b/flink-runtime/src/test/java/org/apache/flink/runtime/taskmanager/TaskManagerRegistrationTest.java
@@ -24,7 +24,6 @@ import akka.actor.InvalidActorNameException;
 import akka.actor.Terminated;
 import akka.testkit.JavaTestKit;
 import org.apache.flink.configuration.AkkaOptions;
-import org.apache.flink.configuration.ConfigConstants;
 import org.apache.flink.configuration.Configuration;
 import org.apache.flink.configuration.TaskManagerOptions;
 import org.apache.flink.runtime.akka.AkkaUtils;
@@ -38,7 +37,7 @@ import 
org.apache.flink.runtime.highavailability.nonha.embedded.EmbeddedHaServic
 import org.apache.flink.runtime.instance.ActorGateway;
 import org.apache.flink.runtime.instance.AkkaActorGateway;
 import org.apache.flink.runtime.instance.InstanceID;
-import org.apache.flink.runtime.leaderelection.TestingLeaderRetrievalService;
+import org.apache.flink.runtime.leaderretrieval.SettableLeaderRetrievalService;
 import 
org.apache.flink.runtime.leaderretrieval.StandaloneLeaderRetrievalService;
 import org.apache.flink.runtime.messages.JobManagerMessages;
 import 
org.apache.flink.runtime.messages.JobManagerMessages.LeaderSessionMessage;
@@ -273,7 +272,7 @@ public class TaskManagerRegistrationTest extends TestLogger 
{
                                
highAvailabilityServices.setJobMasterLeaderRetriever(
                                        HighAvailabilityServices.DEFAULT_JOB_ID,
                                        // Give a non-existent job manager 
address to the task manager
-                                       new TestingLeaderRetrievalService(
+                                       new SettableLeaderRetrievalService(
                                                "foobar",
                                                
HighAvailabilityServices.DEFAULT_LEADER_ID));
 
@@ -330,7 +329,7 @@ public class TaskManagerRegistrationTest extends TestLogger 
{
 
                                
highAvailabilityServices.setJobMasterLeaderRetriever(
                                        HighAvailabilityServices.DEFAULT_JOB_ID,
-                                       new TestingLeaderRetrievalService(
+                                       new SettableLeaderRetrievalService(
                                                jm.path(),
                                                
HighAvailabilityServices.DEFAULT_LEADER_ID));
 
@@ -397,7 +396,7 @@ public class TaskManagerRegistrationTest extends TestLogger 
{
 
                                
highAvailabilityServices.setJobMasterLeaderRetriever(
                                        HighAvailabilityServices.DEFAULT_JOB_ID,
-                                       new TestingLeaderRetrievalService(
+                                       new SettableLeaderRetrievalService(
                                                jm.path(),
                                                
HighAvailabilityServices.DEFAULT_LEADER_ID));
 
@@ -496,13 +495,13 @@ public class TaskManagerRegistrationTest extends 
TestLogger {
                                        Option.apply(JOB_MANAGER_NAME));
                                final ActorGateway fakeJM1Gateway = 
fakeJobManager1Gateway;
 
-                               TestingLeaderRetrievalService 
testingLeaderRetrievalService = new TestingLeaderRetrievalService(
+                               SettableLeaderRetrievalService 
settableLeaderRetrievalService = new SettableLeaderRetrievalService(
                                        fakeJM1Gateway.path(),
                                        
HighAvailabilityServices.DEFAULT_LEADER_ID);
 
                                
highAvailabilityServices.setJobMasterLeaderRetriever(
                                        HighAvailabilityServices.DEFAULT_JOB_ID,
-                                       testingLeaderRetrievalService);
+                                       settableLeaderRetrievalService);
 
                                // we make the test actor (the test kit) the 
JobManager to intercept
                                // the messages

http://git-wip-us.apache.org/repos/asf/flink/blob/bbb63531/flink-runtime/src/test/java/org/apache/flink/runtime/taskmanager/TaskManagerTest.java
----------------------------------------------------------------------
diff --git 
a/flink-runtime/src/test/java/org/apache/flink/runtime/taskmanager/TaskManagerTest.java
 
b/flink-runtime/src/test/java/org/apache/flink/runtime/taskmanager/TaskManagerTest.java
index 9d41bfb..93ec3ad 100644
--- 
a/flink-runtime/src/test/java/org/apache/flink/runtime/taskmanager/TaskManagerTest.java
+++ 
b/flink-runtime/src/test/java/org/apache/flink/runtime/taskmanager/TaskManagerTest.java
@@ -53,7 +53,7 @@ import 
org.apache.flink.runtime.jobgraph.IntermediateResultPartitionID;
 import org.apache.flink.runtime.jobgraph.JobVertexID;
 import org.apache.flink.runtime.jobgraph.tasks.AbstractInvokable;
 import org.apache.flink.runtime.jobmanager.Tasks;
-import org.apache.flink.runtime.leaderelection.TestingLeaderRetrievalService;
+import org.apache.flink.runtime.leaderretrieval.SettableLeaderRetrievalService;
 import 
org.apache.flink.runtime.leaderretrieval.StandaloneLeaderRetrievalService;
 import org.apache.flink.runtime.messages.Acknowledge;
 import org.apache.flink.runtime.messages.RegistrationMessages;
@@ -1529,7 +1529,7 @@ public class TaskManagerTest extends TestLogger {
        public void testTerminationOnFatalError() {
                highAvailabilityServices.setJobMasterLeaderRetriever(
                        HighAvailabilityServices.DEFAULT_JOB_ID,
-                       new TestingLeaderRetrievalService());
+                       new SettableLeaderRetrievalService());
 
                new JavaTestKit(system){{
 

http://git-wip-us.apache.org/repos/asf/flink/blob/bbb63531/flink-runtime/src/test/java/org/apache/flink/runtime/webmonitor/retriever/LeaderGatewayRetrieverTest.java
----------------------------------------------------------------------
diff --git 
a/flink-runtime/src/test/java/org/apache/flink/runtime/webmonitor/retriever/LeaderGatewayRetrieverTest.java
 
b/flink-runtime/src/test/java/org/apache/flink/runtime/webmonitor/retriever/LeaderGatewayRetrieverTest.java
index 7be06f3..9fffc48 100644
--- 
a/flink-runtime/src/test/java/org/apache/flink/runtime/webmonitor/retriever/LeaderGatewayRetrieverTest.java
+++ 
b/flink-runtime/src/test/java/org/apache/flink/runtime/webmonitor/retriever/LeaderGatewayRetrieverTest.java
@@ -20,7 +20,7 @@ package org.apache.flink.runtime.webmonitor.retriever;
 
 import org.apache.flink.api.java.tuple.Tuple2;
 import org.apache.flink.runtime.concurrent.FutureUtils;
-import org.apache.flink.runtime.leaderelection.TestingLeaderRetrievalService;
+import org.apache.flink.runtime.leaderretrieval.SettableLeaderRetrievalService;
 import org.apache.flink.runtime.rpc.RpcGateway;
 import org.apache.flink.util.FlinkException;
 import org.apache.flink.util.TestLogger;
@@ -52,14 +52,14 @@ public class LeaderGatewayRetrieverTest extends TestLogger {
                RpcGateway rpcGateway = mock(RpcGateway.class);
 
                TestingLeaderGatewayRetriever leaderGatewayRetriever = new 
TestingLeaderGatewayRetriever(rpcGateway);
-               TestingLeaderRetrievalService testingLeaderRetrievalService = 
new TestingLeaderRetrievalService();
+               SettableLeaderRetrievalService settableLeaderRetrievalService = 
new SettableLeaderRetrievalService();
 
-               testingLeaderRetrievalService.start(leaderGatewayRetriever);
+               settableLeaderRetrievalService.start(leaderGatewayRetriever);
 
                CompletableFuture<RpcGateway> gatewayFuture = 
leaderGatewayRetriever.getFuture();
 
                // this triggers the first gateway retrieval attempt
-               testingLeaderRetrievalService.notifyListener(address, leaderId);
+               settableLeaderRetrievalService.notifyListener(address, 
leaderId);
 
                // check that the first future has been failed
                try {

http://git-wip-us.apache.org/repos/asf/flink/blob/bbb63531/flink-runtime/src/test/java/org/apache/flink/runtime/webmonitor/retriever/impl/AkkaJobManagerRetrieverTest.java
----------------------------------------------------------------------
diff --git 
a/flink-runtime/src/test/java/org/apache/flink/runtime/webmonitor/retriever/impl/AkkaJobManagerRetrieverTest.java
 
b/flink-runtime/src/test/java/org/apache/flink/runtime/webmonitor/retriever/impl/AkkaJobManagerRetrieverTest.java
index 5d01087..94473b9 100644
--- 
a/flink-runtime/src/test/java/org/apache/flink/runtime/webmonitor/retriever/impl/AkkaJobManagerRetrieverTest.java
+++ 
b/flink-runtime/src/test/java/org/apache/flink/runtime/webmonitor/retriever/impl/AkkaJobManagerRetrieverTest.java
@@ -23,7 +23,7 @@ import org.apache.flink.runtime.akka.AkkaUtils;
 import org.apache.flink.runtime.client.JobClientActorTest;
 import org.apache.flink.runtime.concurrent.FutureUtils;
 import org.apache.flink.runtime.jobmaster.JobManagerGateway;
-import org.apache.flink.runtime.leaderelection.TestingLeaderRetrievalService;
+import org.apache.flink.runtime.leaderretrieval.SettableLeaderRetrievalService;
 import org.apache.flink.runtime.testingUtils.TestingUtils;
 import org.apache.flink.util.TestLogger;
 
@@ -70,7 +70,7 @@ public class AkkaJobManagerRetrieverTest extends TestLogger {
        @Test
        public void testAkkaJobManagerRetrieval() throws Exception {
                AkkaJobManagerRetriever akkaJobManagerRetriever = new 
AkkaJobManagerRetriever(actorSystem, timeout, 0, Time.milliseconds(0L));
-               TestingLeaderRetrievalService testingLeaderRetrievalService = 
new TestingLeaderRetrievalService();
+               SettableLeaderRetrievalService settableLeaderRetrievalService = 
new SettableLeaderRetrievalService();
 
                CompletableFuture<JobManagerGateway> gatewayFuture = 
akkaJobManagerRetriever.getFuture();
                final UUID leaderSessionId = UUID.randomUUID();
@@ -83,18 +83,18 @@ public class AkkaJobManagerRetrieverTest extends TestLogger 
{
 
                        final String address = actorRef.path().toString();
 
-                       
testingLeaderRetrievalService.start(akkaJobManagerRetriever);
+                       
settableLeaderRetrievalService.start(akkaJobManagerRetriever);
 
                        // check that the gateway future has not been completed 
since there is no leader yet
                        assertFalse(gatewayFuture.isDone());
 
-                       testingLeaderRetrievalService.notifyListener(address, 
leaderSessionId);
+                       settableLeaderRetrievalService.notifyListener(address, 
leaderSessionId);
 
                        JobManagerGateway jobManagerGateway = 
gatewayFuture.get(timeout.toMilliseconds(), TimeUnit.MILLISECONDS);
 
                        assertEquals(address, jobManagerGateway.getAddress());
                } finally {
-                       testingLeaderRetrievalService.stop();
+                       settableLeaderRetrievalService.stop();
 
                        if (actorRef != null) {
                                TestingUtils.stopActorGracefully(actorRef);

http://git-wip-us.apache.org/repos/asf/flink/blob/bbb63531/flink-runtime/src/test/java/org/apache/flink/runtime/webmonitor/retriever/impl/RpcGatewayRetrieverTest.java
----------------------------------------------------------------------
diff --git 
a/flink-runtime/src/test/java/org/apache/flink/runtime/webmonitor/retriever/impl/RpcGatewayRetrieverTest.java
 
b/flink-runtime/src/test/java/org/apache/flink/runtime/webmonitor/retriever/impl/RpcGatewayRetrieverTest.java
index 5f59d59..f4d66d1 100644
--- 
a/flink-runtime/src/test/java/org/apache/flink/runtime/webmonitor/retriever/impl/RpcGatewayRetrieverTest.java
+++ 
b/flink-runtime/src/test/java/org/apache/flink/runtime/webmonitor/retriever/impl/RpcGatewayRetrieverTest.java
@@ -20,7 +20,7 @@ package org.apache.flink.runtime.webmonitor.retriever.impl;
 
 import org.apache.flink.api.common.time.Time;
 import org.apache.flink.runtime.highavailability.HighAvailabilityServices;
-import org.apache.flink.runtime.leaderelection.TestingLeaderRetrievalService;
+import org.apache.flink.runtime.leaderretrieval.SettableLeaderRetrievalService;
 import org.apache.flink.runtime.rpc.FencedRpcGateway;
 import org.apache.flink.runtime.rpc.RpcEndpoint;
 import org.apache.flink.runtime.rpc.RpcService;
@@ -74,7 +74,7 @@ public class RpcGatewayRetrieverTest extends TestLogger {
                final UUID leaderSessionId = UUID.randomUUID();
 
                RpcGatewayRetriever<UUID, DummyGateway> gatewayRetriever = new 
RpcGatewayRetriever<>(rpcService, DummyGateway.class, Function.identity(), 0, 
Time.milliseconds(0L));
-               TestingLeaderRetrievalService testingLeaderRetrievalService = 
new TestingLeaderRetrievalService();
+               SettableLeaderRetrievalService settableLeaderRetrievalService = 
new SettableLeaderRetrievalService();
                DummyRpcEndpoint dummyRpcEndpoint = new 
DummyRpcEndpoint(rpcService, "dummyRpcEndpoint1", expectedValue);
                DummyRpcEndpoint dummyRpcEndpoint2 = new 
DummyRpcEndpoint(rpcService, "dummyRpcEndpoint2", expectedValue2);
                rpcService.registerGateway(dummyRpcEndpoint.getAddress(), 
dummyRpcEndpoint.getSelfGateway(DummyGateway.class));
@@ -84,13 +84,13 @@ public class RpcGatewayRetrieverTest extends TestLogger {
                        dummyRpcEndpoint.start();
                        dummyRpcEndpoint2.start();
 
-                       testingLeaderRetrievalService.start(gatewayRetriever);
+                       settableLeaderRetrievalService.start(gatewayRetriever);
 
                        final CompletableFuture<DummyGateway> gatewayFuture = 
gatewayRetriever.getFuture();
 
                        assertFalse(gatewayFuture.isDone());
 
-                       
testingLeaderRetrievalService.notifyListener(dummyRpcEndpoint.getAddress(), 
leaderSessionId);
+                       
settableLeaderRetrievalService.notifyListener(dummyRpcEndpoint.getAddress(), 
leaderSessionId);
 
                        final DummyGateway dummyGateway = 
gatewayFuture.get(TIMEOUT.toMilliseconds(), TimeUnit.MILLISECONDS);
 
@@ -98,7 +98,7 @@ public class RpcGatewayRetrieverTest extends TestLogger {
                        assertEquals(expectedValue, 
dummyGateway.foobar(TIMEOUT).get(TIMEOUT.toMilliseconds(), 
TimeUnit.MILLISECONDS));
 
                        // elect a new leader
-                       
testingLeaderRetrievalService.notifyListener(dummyRpcEndpoint2.getAddress(), 
leaderSessionId);
+                       
settableLeaderRetrievalService.notifyListener(dummyRpcEndpoint2.getAddress(), 
leaderSessionId);
 
                        final CompletableFuture<DummyGateway> gatewayFuture2 = 
gatewayRetriever.getFuture();
                        final DummyGateway dummyGateway2 = 
gatewayFuture2.get(TIMEOUT.toMilliseconds(), TimeUnit.MILLISECONDS);

http://git-wip-us.apache.org/repos/asf/flink/blob/bbb63531/flink-yarn/src/test/java/org/apache/flink/yarn/UtilsTest.java
----------------------------------------------------------------------
diff --git a/flink-yarn/src/test/java/org/apache/flink/yarn/UtilsTest.java 
b/flink-yarn/src/test/java/org/apache/flink/yarn/UtilsTest.java
index aea3a67..578e8e2 100644
--- a/flink-yarn/src/test/java/org/apache/flink/yarn/UtilsTest.java
+++ b/flink-yarn/src/test/java/org/apache/flink/yarn/UtilsTest.java
@@ -25,7 +25,7 @@ import 
org.apache.flink.runtime.clusterframework.messages.NotifyResourceStarted;
 import 
org.apache.flink.runtime.clusterframework.messages.RegisterResourceManager;
 import 
org.apache.flink.runtime.clusterframework.messages.RegisterResourceManagerSuccessful;
 import org.apache.flink.runtime.instance.AkkaActorGateway;
-import org.apache.flink.runtime.leaderelection.TestingLeaderRetrievalService;
+import org.apache.flink.runtime.leaderretrieval.SettableLeaderRetrievalService;
 import org.apache.flink.runtime.messages.Acknowledge;
 import org.apache.flink.runtime.testingUtils.TestingUtils;
 import org.apache.flink.util.TestLogger;
@@ -98,7 +98,7 @@ public class UtilsTest extends TestLogger {
 
                        Configuration flinkConfig = new Configuration();
                        YarnConfiguration yarnConfig = new YarnConfiguration();
-                       TestingLeaderRetrievalService leaderRetrievalService = 
new TestingLeaderRetrievalService(
+                       SettableLeaderRetrievalService leaderRetrievalService = 
new SettableLeaderRetrievalService(
                                null,
                                null);
                        String applicationMasterHostName = "localhost";

Reply via email to