Repository: flink Updated Branches: refs/heads/master f9a583b72 -> 6fba46e6a
[FLINK-8776][flip6] Use correct port for job submission from Web UI. Use address of local WebMonitorEndpoint for the job submission from the Web UI. Rename TestingLeaderRetrievalService to SettableLeaderRetrievalService and move class out of test directory. This closes #5577. Project: http://git-wip-us.apache.org/repos/asf/flink/repo Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/bbb63531 Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/bbb63531 Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/bbb63531 Branch: refs/heads/master Commit: bbb63531b2118dfde1584a7f61ee908d1188698d Parents: f9a583b Author: gyao <[email protected]> Authored: Sun Feb 25 12:53:53 2018 +0100 Committer: Till Rohrmann <[email protected]> Committed: Mon Feb 26 10:51:30 2018 +0100 ---------------------------------------------------------------------- .../client/program/rest/RestClusterClient.java | 28 +++++++- .../client/program/ClientConnectionTest.java | 6 +- .../program/rest/RestClusterClientTest.java | 7 +- .../MesosFlinkResourceManagerTest.java | 4 +- .../MesosResourceManagerTest.java | 6 +- .../webmonitor/WebSubmissionExtension.java | 12 +++- .../SettableLeaderRetrievalService.java | 73 ++++++++++++++++++++ .../runtime/client/JobClientActorTest.java | 30 ++++---- .../clusterframework/ResourceManagerTest.java | 8 +-- .../runtime/dispatcher/DispatcherTest.java | 4 +- .../highavailability/ManualLeaderService.java | 14 ++-- .../jobmanager/JobManagerHARecoveryTest.java | 4 +- .../runtime/jobmaster/JobManagerRunnerTest.java | 4 +- .../flink/runtime/jobmaster/JobMasterTest.java | 6 +- .../TestingLeaderRetrievalService.java | 69 ------------------ .../SettableLeaderRetrievalServiceTest.java | 67 ++++++++++++++++++ .../resourcemanager/JobLeaderIdServiceTest.java | 10 +-- .../ResourceManagerJobMasterTest.java | 12 ++-- .../taskexecutor/TaskExecutorITCase.java | 6 +- .../runtime/taskexecutor/TaskExecutorTest.java | 10 +-- .../TaskManagerRegistrationTest.java | 13 ++-- .../runtime/taskmanager/TaskManagerTest.java | 4 +- .../retriever/LeaderGatewayRetrieverTest.java | 8 +-- .../impl/AkkaJobManagerRetrieverTest.java | 10 +-- .../retriever/impl/RpcGatewayRetrieverTest.java | 10 +-- .../java/org/apache/flink/yarn/UtilsTest.java | 4 +- 26 files changed, 268 insertions(+), 161 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/flink/blob/bbb63531/flink-clients/src/main/java/org/apache/flink/client/program/rest/RestClusterClient.java ---------------------------------------------------------------------- diff --git a/flink-clients/src/main/java/org/apache/flink/client/program/rest/RestClusterClient.java b/flink-clients/src/main/java/org/apache/flink/client/program/rest/RestClusterClient.java index 8ad571f..3a377f3 100644 --- a/flink-clients/src/main/java/org/apache/flink/client/program/rest/RestClusterClient.java +++ b/flink-clients/src/main/java/org/apache/flink/client/program/rest/RestClusterClient.java @@ -142,11 +142,29 @@ public class RestClusterClient<T> extends ClusterClient<T> { config, null, clusterId, - new ExponentialWaitStrategy(10L, 2000L)); + new ExponentialWaitStrategy(10L, 2000L), + null); + } + + public RestClusterClient( + Configuration config, + T clusterId, + LeaderRetrievalService webMonitorRetrievalService) throws Exception { + this( + config, + null, + clusterId, + new ExponentialWaitStrategy(10L, 2000L), + webMonitorRetrievalService); } @VisibleForTesting - RestClusterClient(Configuration configuration, @Nullable RestClient restClient, T clusterId, WaitStrategy waitStrategy) throws Exception { + RestClusterClient( + Configuration configuration, + @Nullable RestClient restClient, + T clusterId, + WaitStrategy waitStrategy, + @Nullable LeaderRetrievalService webMonitorRetrievalService) throws Exception { super(configuration); this.restClusterClientConfiguration = RestClusterClientConfiguration.fromConfiguration(configuration); @@ -159,7 +177,11 @@ public class RestClusterClient<T> extends ClusterClient<T> { this.waitStrategy = Preconditions.checkNotNull(waitStrategy); this.clusterId = Preconditions.checkNotNull(clusterId); - this.webMonitorRetrievalService = highAvailabilityServices.getWebMonitorLeaderRetriever(); + if (webMonitorRetrievalService == null) { + this.webMonitorRetrievalService = highAvailabilityServices.getWebMonitorLeaderRetriever(); + } else { + this.webMonitorRetrievalService = webMonitorRetrievalService; + } this.dispatcherRetrievalService = highAvailabilityServices.getDispatcherLeaderRetriever(); this.retryExecutorService = Executors.newSingleThreadScheduledExecutor(new ExecutorThreadFactory("Flink-RestClusterClient-Retry")); startLeaderRetrievers(); http://git-wip-us.apache.org/repos/asf/flink/blob/bbb63531/flink-clients/src/test/java/org/apache/flink/client/program/ClientConnectionTest.java ---------------------------------------------------------------------- diff --git a/flink-clients/src/test/java/org/apache/flink/client/program/ClientConnectionTest.java b/flink-clients/src/test/java/org/apache/flink/client/program/ClientConnectionTest.java index 72767c7..1dd4787 100644 --- a/flink-clients/src/test/java/org/apache/flink/client/program/ClientConnectionTest.java +++ b/flink-clients/src/test/java/org/apache/flink/client/program/ClientConnectionTest.java @@ -27,8 +27,8 @@ import org.apache.flink.runtime.client.JobClientActorTest; import org.apache.flink.runtime.highavailability.HighAvailabilityServices; import org.apache.flink.runtime.highavailability.TestingHighAvailabilityServices; import org.apache.flink.runtime.instance.ActorGateway; -import org.apache.flink.runtime.leaderelection.TestingLeaderRetrievalService; import org.apache.flink.runtime.leaderretrieval.LeaderRetrievalException; +import org.apache.flink.runtime.leaderretrieval.SettableLeaderRetrievalService; import org.apache.flink.runtime.testingUtils.TestingUtils; import org.apache.flink.util.NetUtils; import org.apache.flink.util.TestLogger; @@ -136,9 +136,9 @@ public class ClientConnectionTest extends TestLogger { final String expectedAddress = AkkaUtils.getAkkaURL(actorSystem, actorRef); - final TestingLeaderRetrievalService testingLeaderRetrievalService = new TestingLeaderRetrievalService(expectedAddress, leaderId); + final SettableLeaderRetrievalService settableLeaderRetrievalService = new SettableLeaderRetrievalService(expectedAddress, leaderId); - highAvailabilityServices.setJobMasterLeaderRetriever(HighAvailabilityServices.DEFAULT_JOB_ID, testingLeaderRetrievalService); + highAvailabilityServices.setJobMasterLeaderRetriever(HighAvailabilityServices.DEFAULT_JOB_ID, settableLeaderRetrievalService); StandaloneClusterClient client = new StandaloneClusterClient(configuration, highAvailabilityServices); http://git-wip-us.apache.org/repos/asf/flink/blob/bbb63531/flink-clients/src/test/java/org/apache/flink/client/program/rest/RestClusterClientTest.java ---------------------------------------------------------------------- diff --git a/flink-clients/src/test/java/org/apache/flink/client/program/rest/RestClusterClientTest.java b/flink-clients/src/test/java/org/apache/flink/client/program/rest/RestClusterClientTest.java index f587e33..ca2ba22 100644 --- a/flink-clients/src/test/java/org/apache/flink/client/program/rest/RestClusterClientTest.java +++ b/flink-clients/src/test/java/org/apache/flink/client/program/rest/RestClusterClientTest.java @@ -182,7 +182,12 @@ public class RestClusterClientTest extends TestLogger { } } }; - restClusterClient = new RestClusterClient<>(config, restClient, StandaloneClusterId.getInstance(), (attempt) -> 0); + restClusterClient = new RestClusterClient<>( + config, + restClient, + StandaloneClusterId.getInstance(), + (attempt) -> 0, + null); jobGraph = new JobGraph("testjob"); jobId = jobGraph.getJobID(); http://git-wip-us.apache.org/repos/asf/flink/blob/bbb63531/flink-mesos/src/test/java/org/apache/flink/mesos/runtime/clusterframework/MesosFlinkResourceManagerTest.java ---------------------------------------------------------------------- diff --git a/flink-mesos/src/test/java/org/apache/flink/mesos/runtime/clusterframework/MesosFlinkResourceManagerTest.java b/flink-mesos/src/test/java/org/apache/flink/mesos/runtime/clusterframework/MesosFlinkResourceManagerTest.java index 56735ef..330a2c6 100644 --- a/flink-mesos/src/test/java/org/apache/flink/mesos/runtime/clusterframework/MesosFlinkResourceManagerTest.java +++ b/flink-mesos/src/test/java/org/apache/flink/mesos/runtime/clusterframework/MesosFlinkResourceManagerTest.java @@ -50,8 +50,8 @@ import org.apache.flink.runtime.highavailability.HighAvailabilityServices; import org.apache.flink.runtime.highavailability.TestingHighAvailabilityServices; import org.apache.flink.runtime.instance.ActorGateway; import org.apache.flink.runtime.instance.AkkaActorGateway; -import org.apache.flink.runtime.leaderelection.TestingLeaderRetrievalService; import org.apache.flink.runtime.leaderretrieval.LeaderRetrievalService; +import org.apache.flink.runtime.leaderretrieval.SettableLeaderRetrievalService; import org.apache.flink.runtime.testingUtils.TestingUtils; import org.apache.flink.util.TestLogger; @@ -213,7 +213,7 @@ public class MesosFlinkResourceManagerTest extends TestLogger { highAvailabilityServices.setJobMasterLeaderRetriever( HighAvailabilityServices.DEFAULT_JOB_ID, - new TestingLeaderRetrievalService( + new SettableLeaderRetrievalService( jobManager.path(), HighAvailabilityServices.DEFAULT_LEADER_ID)); http://git-wip-us.apache.org/repos/asf/flink/blob/bbb63531/flink-mesos/src/test/java/org/apache/flink/mesos/runtime/clusterframework/MesosResourceManagerTest.java ---------------------------------------------------------------------- diff --git a/flink-mesos/src/test/java/org/apache/flink/mesos/runtime/clusterframework/MesosResourceManagerTest.java b/flink-mesos/src/test/java/org/apache/flink/mesos/runtime/clusterframework/MesosResourceManagerTest.java index 2b38b85..412e18d 100644 --- a/flink-mesos/src/test/java/org/apache/flink/mesos/runtime/clusterframework/MesosResourceManagerTest.java +++ b/flink-mesos/src/test/java/org/apache/flink/mesos/runtime/clusterframework/MesosResourceManagerTest.java @@ -53,7 +53,7 @@ import org.apache.flink.runtime.jobmaster.JobMasterGateway; import org.apache.flink.runtime.jobmaster.JobMasterId; import org.apache.flink.runtime.jobmaster.JobMasterRegistrationSuccess; import org.apache.flink.runtime.leaderelection.TestingLeaderElectionService; -import org.apache.flink.runtime.leaderelection.TestingLeaderRetrievalService; +import org.apache.flink.runtime.leaderretrieval.SettableLeaderRetrievalService; import org.apache.flink.runtime.metrics.MetricRegistry; import org.apache.flink.runtime.metrics.MetricRegistryImpl; import org.apache.flink.runtime.registration.RegistrationResponse; @@ -412,7 +412,7 @@ public class MesosResourceManagerTest extends TestLogger { public final String address; public final JobMasterGateway gateway; public final JobMasterId jobMasterId; - public final TestingLeaderRetrievalService leaderRetrievalService; + public final SettableLeaderRetrievalService leaderRetrievalService; MockJobMaster(JobID jobID) { this.jobID = jobID; @@ -420,7 +420,7 @@ public class MesosResourceManagerTest extends TestLogger { this.address = "/" + jobID; this.gateway = mock(JobMasterGateway.class); this.jobMasterId = JobMasterId.generate(); - this.leaderRetrievalService = new TestingLeaderRetrievalService(this.address, this.jobMasterId.toUUID()); + this.leaderRetrievalService = new SettableLeaderRetrievalService(this.address, this.jobMasterId.toUUID()); } } http://git-wip-us.apache.org/repos/asf/flink/blob/bbb63531/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/WebSubmissionExtension.java ---------------------------------------------------------------------- diff --git a/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/WebSubmissionExtension.java b/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/WebSubmissionExtension.java index e0ac250..df36483 100644 --- a/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/WebSubmissionExtension.java +++ b/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/WebSubmissionExtension.java @@ -22,6 +22,8 @@ import org.apache.flink.api.common.time.Time; import org.apache.flink.api.java.tuple.Tuple2; import org.apache.flink.client.program.rest.RestClusterClient; import org.apache.flink.configuration.Configuration; +import org.apache.flink.runtime.highavailability.HighAvailabilityServices; +import org.apache.flink.runtime.leaderretrieval.SettableLeaderRetrievalService; import org.apache.flink.runtime.rest.handler.RestHandlerSpecification; import org.apache.flink.runtime.webmonitor.handlers.JarDeleteHandler; import org.apache.flink.runtime.webmonitor.handlers.JarDeleteHeaders; @@ -62,7 +64,15 @@ public class WebSubmissionExtension implements WebMonitorExtension { Executor executor, Time timeout) throws Exception { - restClusterClient = new RestClusterClient<>(configuration, "WebSubmissionHandlers"); + final SettableLeaderRetrievalService settableLeaderRetrievalService = new SettableLeaderRetrievalService(); + restAddressFuture.thenAccept(restAddress -> settableLeaderRetrievalService.notifyListener( + restAddress, + HighAvailabilityServices.DEFAULT_LEADER_ID)); + + restClusterClient = new RestClusterClient<>( + configuration, + "WebSubmissionHandlers", + settableLeaderRetrievalService); webSubmissionHandlers = new ArrayList<>(3); http://git-wip-us.apache.org/repos/asf/flink/blob/bbb63531/flink-runtime/src/main/java/org/apache/flink/runtime/leaderretrieval/SettableLeaderRetrievalService.java ---------------------------------------------------------------------- diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/leaderretrieval/SettableLeaderRetrievalService.java b/flink-runtime/src/main/java/org/apache/flink/runtime/leaderretrieval/SettableLeaderRetrievalService.java new file mode 100644 index 0000000..1f2711a --- /dev/null +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/leaderretrieval/SettableLeaderRetrievalService.java @@ -0,0 +1,73 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.runtime.leaderretrieval; + +import org.apache.flink.util.Preconditions; + +import javax.annotation.Nullable; + +import java.util.UUID; + +/** + * {@link LeaderRetrievalService} implementation which directly forwards calls of + * notifyListener to the listener. + */ +public class SettableLeaderRetrievalService implements LeaderRetrievalService { + + private String leaderAddress; + private UUID leaderSessionID; + + private LeaderRetrievalListener listener; + + public SettableLeaderRetrievalService() { + this(null, null); + } + + public SettableLeaderRetrievalService( + @Nullable String leaderAddress, + @Nullable UUID leaderSessionID) { + this.leaderAddress = leaderAddress; + this.leaderSessionID = leaderSessionID; + } + + @Override + public synchronized void start(LeaderRetrievalListener listener) throws Exception { + this.listener = Preconditions.checkNotNull(listener); + + if (leaderSessionID != null && leaderAddress != null) { + listener.notifyLeaderAddress(leaderAddress, leaderSessionID); + } + } + + @Override + public void stop() throws Exception { + + } + + public synchronized void notifyListener( + @Nullable String address, + @Nullable UUID leaderSessionID) { + this.leaderAddress = address; + this.leaderSessionID = leaderSessionID; + + if (listener != null) { + listener.notifyLeaderAddress(address, leaderSessionID); + } + } +} http://git-wip-us.apache.org/repos/asf/flink/blob/bbb63531/flink-runtime/src/test/java/org/apache/flink/runtime/client/JobClientActorTest.java ---------------------------------------------------------------------- diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/client/JobClientActorTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/client/JobClientActorTest.java index 919a784..9050d12 100644 --- a/flink-runtime/src/test/java/org/apache/flink/runtime/client/JobClientActorTest.java +++ b/flink-runtime/src/test/java/org/apache/flink/runtime/client/JobClientActorTest.java @@ -31,7 +31,7 @@ import org.apache.flink.runtime.akka.FlinkUntypedActor; import org.apache.flink.runtime.highavailability.HighAvailabilityServices; import org.apache.flink.runtime.highavailability.TestingHighAvailabilityServices; import org.apache.flink.runtime.jobgraph.JobGraph; -import org.apache.flink.runtime.leaderelection.TestingLeaderRetrievalService; +import org.apache.flink.runtime.leaderretrieval.SettableLeaderRetrievalService; import org.apache.flink.runtime.messages.Acknowledge; import org.apache.flink.runtime.messages.JobClientMessages; import org.apache.flink.runtime.messages.JobClientMessages.AttachToJobAndWait; @@ -85,13 +85,13 @@ public class JobClientActorTest extends TestLogger { PlainActor.class, leaderSessionID)); - TestingLeaderRetrievalService testingLeaderRetrievalService = new TestingLeaderRetrievalService( + SettableLeaderRetrievalService settableLeaderRetrievalService = new SettableLeaderRetrievalService( jobManager.path().toString(), leaderSessionID ); Props jobClientActorProps = JobSubmissionClientActor.createActorProps( - testingLeaderRetrievalService, + settableLeaderRetrievalService, jobClientActorTimeout, false, clientConfig); @@ -124,13 +124,13 @@ public class JobClientActorTest extends TestLogger { PlainActor.class, leaderSessionID)); - TestingLeaderRetrievalService testingLeaderRetrievalService = new TestingLeaderRetrievalService( + SettableLeaderRetrievalService settableLeaderRetrievalService = new SettableLeaderRetrievalService( jobManager.path().toString(), leaderSessionID ); Props jobClientActorProps = JobAttachmentClientActor.createActorProps( - testingLeaderRetrievalService, + settableLeaderRetrievalService, jobClientActorTimeout, false); @@ -154,12 +154,12 @@ public class JobClientActorTest extends TestLogger { FiniteDuration jobClientActorTimeout = new FiniteDuration(1L, TimeUnit.SECONDS); FiniteDuration timeout = jobClientActorTimeout.$times(2); - TestingLeaderRetrievalService testingLeaderRetrievalService = new TestingLeaderRetrievalService( + SettableLeaderRetrievalService settableLeaderRetrievalService = new SettableLeaderRetrievalService( "localhost", HighAvailabilityServices.DEFAULT_LEADER_ID); Props jobClientActorProps = JobSubmissionClientActor.createActorProps( - testingLeaderRetrievalService, + settableLeaderRetrievalService, jobClientActorTimeout, false, clientConfig); @@ -183,12 +183,12 @@ public class JobClientActorTest extends TestLogger { FiniteDuration jobClientActorTimeout = new FiniteDuration(1L, TimeUnit.SECONDS); FiniteDuration timeout = jobClientActorTimeout.$times(2); - TestingLeaderRetrievalService testingLeaderRetrievalService = new TestingLeaderRetrievalService( + SettableLeaderRetrievalService settableLeaderRetrievalService = new SettableLeaderRetrievalService( "localhost", HighAvailabilityServices.DEFAULT_LEADER_ID); Props jobClientActorProps = JobAttachmentClientActor.createActorProps( - testingLeaderRetrievalService, + settableLeaderRetrievalService, jobClientActorTimeout, false); @@ -219,13 +219,13 @@ public class JobClientActorTest extends TestLogger { JobAcceptingActor.class, leaderSessionID)); - TestingLeaderRetrievalService testingLeaderRetrievalService = new TestingLeaderRetrievalService( + SettableLeaderRetrievalService settableLeaderRetrievalService = new SettableLeaderRetrievalService( jobManager.path().toString(), leaderSessionID ); Props jobClientActorProps = JobSubmissionClientActor.createActorProps( - testingLeaderRetrievalService, + settableLeaderRetrievalService, jobClientActorTimeout, false, clientConfig); @@ -261,13 +261,13 @@ public class JobClientActorTest extends TestLogger { JobAcceptingActor.class, leaderSessionID)); - TestingLeaderRetrievalService testingLeaderRetrievalService = new TestingLeaderRetrievalService( + SettableLeaderRetrievalService settableLeaderRetrievalService = new SettableLeaderRetrievalService( jobManager.path().toString(), leaderSessionID ); Props jobClientActorProps = JobAttachmentClientActor.createActorProps( - testingLeaderRetrievalService, + settableLeaderRetrievalService, jobClientActorTimeout, false); @@ -302,13 +302,13 @@ public class JobClientActorTest extends TestLogger { JobAcceptingActor.class, leaderSessionID)); - TestingLeaderRetrievalService testingLeaderRetrievalService = new TestingLeaderRetrievalService( + SettableLeaderRetrievalService settableLeaderRetrievalService = new SettableLeaderRetrievalService( jobManager.path().toString(), leaderSessionID ); TestingHighAvailabilityServices highAvailabilityServices = new TestingHighAvailabilityServices(); - highAvailabilityServices.setJobMasterLeaderRetriever(HighAvailabilityServices.DEFAULT_JOB_ID, testingLeaderRetrievalService); + highAvailabilityServices.setJobMasterLeaderRetriever(HighAvailabilityServices.DEFAULT_JOB_ID, settableLeaderRetrievalService); JobListeningContext jobListeningContext = JobClient.submitJob( http://git-wip-us.apache.org/repos/asf/flink/blob/bbb63531/flink-runtime/src/test/java/org/apache/flink/runtime/clusterframework/ResourceManagerTest.java ---------------------------------------------------------------------- diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/clusterframework/ResourceManagerTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/clusterframework/ResourceManagerTest.java index 241da8f..a1b3fb6 100644 --- a/flink-runtime/src/test/java/org/apache/flink/runtime/clusterframework/ResourceManagerTest.java +++ b/flink-runtime/src/test/java/org/apache/flink/runtime/clusterframework/ResourceManagerTest.java @@ -42,7 +42,7 @@ import org.apache.flink.runtime.jobmaster.JobMasterGateway; import org.apache.flink.runtime.jobmaster.JobMasterId; import org.apache.flink.runtime.jobmaster.JobMasterRegistrationSuccess; import org.apache.flink.runtime.leaderelection.TestingLeaderElectionService; -import org.apache.flink.runtime.leaderelection.TestingLeaderRetrievalService; +import org.apache.flink.runtime.leaderretrieval.SettableLeaderRetrievalService; import org.apache.flink.runtime.messages.Acknowledge; import org.apache.flink.runtime.messages.JobManagerMessages; import org.apache.flink.runtime.metrics.MetricRegistryImpl; @@ -107,7 +107,7 @@ public class ResourceManagerTest extends TestLogger { private final Time timeout = Time.seconds(10L); private TestingHighAvailabilityServices highAvailabilityServices; - private TestingLeaderRetrievalService jobManagerLeaderRetrievalService; + private SettableLeaderRetrievalService jobManagerLeaderRetrievalService; @BeforeClass public static void setup() { @@ -121,7 +121,7 @@ public class ResourceManagerTest extends TestLogger { @Before public void setupTest() { - jobManagerLeaderRetrievalService = new TestingLeaderRetrievalService(); + jobManagerLeaderRetrievalService = new SettableLeaderRetrievalService(); highAvailabilityServices = new TestingHighAvailabilityServices(); @@ -602,7 +602,7 @@ public class ResourceManagerTest extends TestLogger { Time.seconds(5L)); final TestingLeaderElectionService rmLeaderElectionService = new TestingLeaderElectionService(); - final TestingLeaderRetrievalService jmLeaderRetrievalService = new TestingLeaderRetrievalService(jobMasterAddress, jobMasterId.toUUID()); + final SettableLeaderRetrievalService jmLeaderRetrievalService = new SettableLeaderRetrievalService(jobMasterAddress, jobMasterId.toUUID()); final TestingHighAvailabilityServices highAvailabilityServices = new TestingHighAvailabilityServices(); highAvailabilityServices.setResourceManagerLeaderElectionService(rmLeaderElectionService); highAvailabilityServices.setJobMasterLeaderRetriever(jobId, jmLeaderRetrievalService); http://git-wip-us.apache.org/repos/asf/flink/blob/bbb63531/flink-runtime/src/test/java/org/apache/flink/runtime/dispatcher/DispatcherTest.java ---------------------------------------------------------------------- diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/dispatcher/DispatcherTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/dispatcher/DispatcherTest.java index 5d264e2..8267921 100644 --- a/flink-runtime/src/test/java/org/apache/flink/runtime/dispatcher/DispatcherTest.java +++ b/flink-runtime/src/test/java/org/apache/flink/runtime/dispatcher/DispatcherTest.java @@ -42,7 +42,7 @@ import org.apache.flink.runtime.jobmaster.JobManagerRunner; import org.apache.flink.runtime.jobmaster.JobManagerSharedServices; import org.apache.flink.runtime.jobmaster.JobResult; import org.apache.flink.runtime.leaderelection.TestingLeaderElectionService; -import org.apache.flink.runtime.leaderelection.TestingLeaderRetrievalService; +import org.apache.flink.runtime.leaderretrieval.SettableLeaderRetrievalService; import org.apache.flink.runtime.messages.Acknowledge; import org.apache.flink.runtime.messages.FlinkJobNotFoundException; import org.apache.flink.runtime.metrics.MetricRegistry; @@ -161,7 +161,7 @@ public class DispatcherTest extends TestLogger { haServices.setSubmittedJobGraphStore(submittedJobGraphStore); haServices.setJobMasterLeaderElectionService(TEST_JOB_ID, jobMasterLeaderElectionService); haServices.setCheckpointRecoveryFactory(new StandaloneCheckpointRecoveryFactory()); - haServices.setResourceManagerLeaderRetriever(new TestingLeaderRetrievalService()); + haServices.setResourceManagerLeaderRetriever(new SettableLeaderRetrievalService()); runningJobsRegistry = haServices.getRunningJobsRegistry(); final Configuration blobServerConfig = new Configuration(); http://git-wip-us.apache.org/repos/asf/flink/blob/bbb63531/flink-runtime/src/test/java/org/apache/flink/runtime/highavailability/ManualLeaderService.java ---------------------------------------------------------------------- diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/highavailability/ManualLeaderService.java b/flink-runtime/src/test/java/org/apache/flink/runtime/highavailability/ManualLeaderService.java index 423c0ce..a055edf 100644 --- a/flink-runtime/src/test/java/org/apache/flink/runtime/highavailability/ManualLeaderService.java +++ b/flink-runtime/src/test/java/org/apache/flink/runtime/highavailability/ManualLeaderService.java @@ -20,7 +20,7 @@ package org.apache.flink.runtime.highavailability; import org.apache.flink.runtime.leaderelection.LeaderElectionService; import org.apache.flink.runtime.leaderelection.TestingLeaderElectionService; -import org.apache.flink.runtime.leaderelection.TestingLeaderRetrievalService; +import org.apache.flink.runtime.leaderretrieval.SettableLeaderRetrievalService; import org.apache.flink.runtime.leaderretrieval.LeaderRetrievalService; import org.apache.flink.util.Preconditions; @@ -32,13 +32,13 @@ import java.util.UUID; /** * Leader service for {@link TestingManualHighAvailabilityServices} implementation. The leader * service allows to create multiple {@link TestingLeaderElectionService} and - * {@link TestingLeaderRetrievalService} and allows to manually trigger the services identified + * {@link SettableLeaderRetrievalService} and allows to manually trigger the services identified * by a continuous index. */ public class ManualLeaderService { private final List<TestingLeaderElectionService> leaderElectionServices; - private final List<TestingLeaderRetrievalService> leaderRetrievalServices; + private final List<SettableLeaderRetrievalService> leaderRetrievalServices; private int currentLeaderIndex; @@ -54,13 +54,13 @@ public class ManualLeaderService { } public LeaderRetrievalService createLeaderRetrievalService() { - final TestingLeaderRetrievalService testingLeaderRetrievalService = new TestingLeaderRetrievalService( + final SettableLeaderRetrievalService settableLeaderRetrievalService = new SettableLeaderRetrievalService( getLeaderAddress(currentLeaderIndex), currentLeaderId); - leaderRetrievalServices.add(testingLeaderRetrievalService); + leaderRetrievalServices.add(settableLeaderRetrievalService); - return testingLeaderRetrievalService; + return settableLeaderRetrievalService; } public LeaderElectionService createLeaderElectionService() { @@ -100,7 +100,7 @@ public class ManualLeaderService { } public void notifyRetrievers(int index, UUID leaderId) { - for (TestingLeaderRetrievalService retrievalService: leaderRetrievalServices) { + for (SettableLeaderRetrievalService retrievalService: leaderRetrievalServices) { retrievalService.notifyListener(getLeaderAddress(index), leaderId); } } http://git-wip-us.apache.org/repos/asf/flink/blob/bbb63531/flink-runtime/src/test/java/org/apache/flink/runtime/jobmanager/JobManagerHARecoveryTest.java ---------------------------------------------------------------------- diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/jobmanager/JobManagerHARecoveryTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/jobmanager/JobManagerHARecoveryTest.java index 005dd98..309ac12 100644 --- a/flink-runtime/src/test/java/org/apache/flink/runtime/jobmanager/JobManagerHARecoveryTest.java +++ b/flink-runtime/src/test/java/org/apache/flink/runtime/jobmanager/JobManagerHARecoveryTest.java @@ -64,7 +64,7 @@ import org.apache.flink.runtime.jobgraph.tasks.JobCheckpointingSettings; import org.apache.flink.runtime.jobmanager.scheduler.Scheduler; import org.apache.flink.runtime.leaderelection.LeaderElectionService; import org.apache.flink.runtime.leaderelection.TestingLeaderElectionService; -import org.apache.flink.runtime.leaderelection.TestingLeaderRetrievalService; +import org.apache.flink.runtime.leaderretrieval.SettableLeaderRetrievalService; import org.apache.flink.runtime.messages.JobManagerMessages; import org.apache.flink.runtime.metrics.NoOpMetricRegistry; import org.apache.flink.runtime.metrics.groups.JobManagerMetricGroup; @@ -184,7 +184,7 @@ public class JobManagerHARecoveryTest extends TestLogger { CheckpointIDCounter checkpointCounter = new StandaloneCheckpointIDCounter(); CheckpointRecoveryFactory checkpointStateFactory = new TestingCheckpointRecoveryFactory(checkpointStore, checkpointCounter); TestingLeaderElectionService myLeaderElectionService = new TestingLeaderElectionService(); - TestingLeaderRetrievalService myLeaderRetrievalService = new TestingLeaderRetrievalService( + SettableLeaderRetrievalService myLeaderRetrievalService = new SettableLeaderRetrievalService( null, null); TestingHighAvailabilityServices testingHighAvailabilityServices = new TestingHighAvailabilityServices(); http://git-wip-us.apache.org/repos/asf/flink/blob/bbb63531/flink-runtime/src/test/java/org/apache/flink/runtime/jobmaster/JobManagerRunnerTest.java ---------------------------------------------------------------------- diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/jobmaster/JobManagerRunnerTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/jobmaster/JobManagerRunnerTest.java index 0c238ce..9730dde 100644 --- a/flink-runtime/src/test/java/org/apache/flink/runtime/jobmaster/JobManagerRunnerTest.java +++ b/flink-runtime/src/test/java/org/apache/flink/runtime/jobmaster/JobManagerRunnerTest.java @@ -31,7 +31,7 @@ import org.apache.flink.runtime.jobgraph.JobGraph; import org.apache.flink.runtime.jobgraph.JobStatus; import org.apache.flink.runtime.jobgraph.JobVertex; import org.apache.flink.runtime.leaderelection.TestingLeaderElectionService; -import org.apache.flink.runtime.leaderelection.TestingLeaderRetrievalService; +import org.apache.flink.runtime.leaderretrieval.SettableLeaderRetrievalService; import org.apache.flink.runtime.metrics.MetricRegistry; import org.apache.flink.runtime.metrics.NoOpMetricRegistry; import org.apache.flink.runtime.rest.handler.legacy.utils.ArchivedExecutionGraphBuilder; @@ -113,7 +113,7 @@ public class JobManagerRunnerTest extends TestLogger { public void setup() { haServices = new TestingHighAvailabilityServices(); haServices.setJobMasterLeaderElectionService(jobGraph.getJobID(), new TestingLeaderElectionService()); - haServices.setResourceManagerLeaderRetriever(new TestingLeaderRetrievalService()); + haServices.setResourceManagerLeaderRetriever(new SettableLeaderRetrievalService()); haServices.setCheckpointRecoveryFactory(new StandaloneCheckpointRecoveryFactory()); } http://git-wip-us.apache.org/repos/asf/flink/blob/bbb63531/flink-runtime/src/test/java/org/apache/flink/runtime/jobmaster/JobMasterTest.java ---------------------------------------------------------------------- diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/jobmaster/JobMasterTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/jobmaster/JobMasterTest.java index e401020..b543056 100644 --- a/flink-runtime/src/test/java/org/apache/flink/runtime/jobmaster/JobMasterTest.java +++ b/flink-runtime/src/test/java/org/apache/flink/runtime/jobmaster/JobMasterTest.java @@ -45,7 +45,7 @@ import org.apache.flink.runtime.jobgraph.SavepointRestoreSettings; import org.apache.flink.runtime.jobgraph.tasks.CheckpointCoordinatorConfiguration; import org.apache.flink.runtime.jobgraph.tasks.JobCheckpointingSettings; import org.apache.flink.runtime.jobmanager.OnCompletionActions; -import org.apache.flink.runtime.leaderelection.TestingLeaderRetrievalService; +import org.apache.flink.runtime.leaderretrieval.SettableLeaderRetrievalService; import org.apache.flink.runtime.messages.Acknowledge; import org.apache.flink.runtime.registration.RegistrationResponse; import org.apache.flink.runtime.resourcemanager.ResourceManagerId; @@ -113,7 +113,7 @@ public class JobMasterTest extends TestLogger { private TestingHighAvailabilityServices haServices; - private TestingLeaderRetrievalService rmLeaderRetrievalService; + private SettableLeaderRetrievalService rmLeaderRetrievalService; private TestingFatalErrorHandler testingFatalErrorHandler; @@ -135,7 +135,7 @@ public class JobMasterTest extends TestLogger { haServices.setCheckpointRecoveryFactory(new StandaloneCheckpointRecoveryFactory()); - rmLeaderRetrievalService = new TestingLeaderRetrievalService( + rmLeaderRetrievalService = new SettableLeaderRetrievalService( null, null); haServices.setResourceManagerLeaderRetriever(rmLeaderRetrievalService); http://git-wip-us.apache.org/repos/asf/flink/blob/bbb63531/flink-runtime/src/test/java/org/apache/flink/runtime/leaderelection/TestingLeaderRetrievalService.java ---------------------------------------------------------------------- diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/leaderelection/TestingLeaderRetrievalService.java b/flink-runtime/src/test/java/org/apache/flink/runtime/leaderelection/TestingLeaderRetrievalService.java deleted file mode 100644 index 15d3bde..0000000 --- a/flink-runtime/src/test/java/org/apache/flink/runtime/leaderelection/TestingLeaderRetrievalService.java +++ /dev/null @@ -1,69 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.flink.runtime.leaderelection; - -import org.apache.flink.runtime.leaderretrieval.LeaderRetrievalListener; -import org.apache.flink.runtime.leaderretrieval.LeaderRetrievalService; -import org.apache.flink.util.Preconditions; - -import java.util.UUID; - -/** - * Test {@link LeaderRetrievalService} implementation which directly forwards calls of - * notifyListener to the listener. - */ -public class TestingLeaderRetrievalService implements LeaderRetrievalService { - - private volatile String leaderAddress; - private volatile UUID leaderSessionID; - - private volatile LeaderRetrievalListener listener; - - public TestingLeaderRetrievalService() { - this(null, null); - } - - public TestingLeaderRetrievalService(String leaderAddress, UUID leaderSessionID) { - this.leaderAddress = leaderAddress; - this.leaderSessionID = leaderSessionID; - } - - @Override - public void start(LeaderRetrievalListener listener) throws Exception { - this.listener = Preconditions.checkNotNull(listener); - - if (leaderSessionID != null && leaderAddress != null) { - listener.notifyLeaderAddress(leaderAddress, leaderSessionID); - } - } - - @Override - public void stop() throws Exception { - - } - - public void notifyListener(String address, UUID leaderSessionID) { - this.leaderAddress = address; - this.leaderSessionID = leaderSessionID; - - if (listener != null) { - listener.notifyLeaderAddress(address, leaderSessionID); - } - } -} http://git-wip-us.apache.org/repos/asf/flink/blob/bbb63531/flink-runtime/src/test/java/org/apache/flink/runtime/leaderretrieval/SettableLeaderRetrievalServiceTest.java ---------------------------------------------------------------------- diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/leaderretrieval/SettableLeaderRetrievalServiceTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/leaderretrieval/SettableLeaderRetrievalServiceTest.java new file mode 100644 index 0000000..667008b --- /dev/null +++ b/flink-runtime/src/test/java/org/apache/flink/runtime/leaderretrieval/SettableLeaderRetrievalServiceTest.java @@ -0,0 +1,67 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.runtime.leaderretrieval; + +import org.apache.flink.runtime.highavailability.HighAvailabilityServices; +import org.apache.flink.runtime.leaderelection.TestingListener; +import org.apache.flink.util.TestLogger; + +import org.junit.Before; +import org.junit.Test; + +import static org.hamcrest.Matchers.equalTo; +import static org.junit.Assert.assertThat; + +/** + * Tests for {@link SettableLeaderRetrievalService}. + */ +public class SettableLeaderRetrievalServiceTest extends TestLogger { + + private SettableLeaderRetrievalService settableLeaderRetrievalService; + + @Before + public void setUp() { + settableLeaderRetrievalService = new SettableLeaderRetrievalService(); + } + + @Test + public void testNotifyListenerLater() throws Exception { + final String localhost = "localhost"; + settableLeaderRetrievalService.notifyListener(localhost, HighAvailabilityServices.DEFAULT_LEADER_ID); + + final TestingListener listener = new TestingListener(); + settableLeaderRetrievalService.start(listener); + + assertThat(listener.getAddress(), equalTo(localhost)); + assertThat(listener.getLeaderSessionID(), equalTo(HighAvailabilityServices.DEFAULT_LEADER_ID)); + } + + @Test + public void testNotifyListenerImmediately() throws Exception { + final TestingListener listener = new TestingListener(); + settableLeaderRetrievalService.start(listener); + + final String localhost = "localhost"; + settableLeaderRetrievalService.notifyListener(localhost, HighAvailabilityServices.DEFAULT_LEADER_ID); + + assertThat(listener.getAddress(), equalTo(localhost)); + assertThat(listener.getLeaderSessionID(), equalTo(HighAvailabilityServices.DEFAULT_LEADER_ID)); + } + +} http://git-wip-us.apache.org/repos/asf/flink/blob/bbb63531/flink-runtime/src/test/java/org/apache/flink/runtime/resourcemanager/JobLeaderIdServiceTest.java ---------------------------------------------------------------------- diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/resourcemanager/JobLeaderIdServiceTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/resourcemanager/JobLeaderIdServiceTest.java index bb99a0c..301ab46 100644 --- a/flink-runtime/src/test/java/org/apache/flink/runtime/resourcemanager/JobLeaderIdServiceTest.java +++ b/flink-runtime/src/test/java/org/apache/flink/runtime/resourcemanager/JobLeaderIdServiceTest.java @@ -23,7 +23,7 @@ import org.apache.flink.api.common.time.Time; import org.apache.flink.runtime.concurrent.ScheduledExecutor; import org.apache.flink.runtime.highavailability.TestingHighAvailabilityServices; import org.apache.flink.runtime.jobmaster.JobMasterId; -import org.apache.flink.runtime.leaderelection.TestingLeaderRetrievalService; +import org.apache.flink.runtime.leaderretrieval.SettableLeaderRetrievalService; import org.apache.flink.testutils.category.Flip6; import org.apache.flink.util.TestLogger; import org.junit.Test; @@ -68,7 +68,7 @@ public class JobLeaderIdServiceTest extends TestLogger { final String address = "foobar"; final JobMasterId leaderId = JobMasterId.generate(); TestingHighAvailabilityServices highAvailabilityServices = new TestingHighAvailabilityServices(); - TestingLeaderRetrievalService leaderRetrievalService = new TestingLeaderRetrievalService( + SettableLeaderRetrievalService leaderRetrievalService = new SettableLeaderRetrievalService( null, null); @@ -104,7 +104,7 @@ public class JobLeaderIdServiceTest extends TestLogger { public void testRemovingJob() throws Exception { final JobID jobId = new JobID(); TestingHighAvailabilityServices highAvailabilityServices = new TestingHighAvailabilityServices(); - TestingLeaderRetrievalService leaderRetrievalService = new TestingLeaderRetrievalService(null, null); + SettableLeaderRetrievalService leaderRetrievalService = new SettableLeaderRetrievalService(null, null); highAvailabilityServices.setJobMasterLeaderRetriever(jobId, leaderRetrievalService); @@ -145,7 +145,7 @@ public class JobLeaderIdServiceTest extends TestLogger { public void testInitialJobTimeout() throws Exception { final JobID jobId = new JobID(); TestingHighAvailabilityServices highAvailabilityServices = new TestingHighAvailabilityServices(); - TestingLeaderRetrievalService leaderRetrievalService = new TestingLeaderRetrievalService( + SettableLeaderRetrievalService leaderRetrievalService = new SettableLeaderRetrievalService( null, null); @@ -189,7 +189,7 @@ public class JobLeaderIdServiceTest extends TestLogger { final String address = "foobar"; final JobMasterId leaderId = JobMasterId.generate(); TestingHighAvailabilityServices highAvailabilityServices = new TestingHighAvailabilityServices(); - TestingLeaderRetrievalService leaderRetrievalService = new TestingLeaderRetrievalService( + SettableLeaderRetrievalService leaderRetrievalService = new SettableLeaderRetrievalService( null, null); http://git-wip-us.apache.org/repos/asf/flink/blob/bbb63531/flink-runtime/src/test/java/org/apache/flink/runtime/resourcemanager/ResourceManagerJobMasterTest.java ---------------------------------------------------------------------- diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/resourcemanager/ResourceManagerJobMasterTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/resourcemanager/ResourceManagerJobMasterTest.java index acd8774..9854f5a 100644 --- a/flink-runtime/src/test/java/org/apache/flink/runtime/resourcemanager/ResourceManagerJobMasterTest.java +++ b/flink-runtime/src/test/java/org/apache/flink/runtime/resourcemanager/ResourceManagerJobMasterTest.java @@ -31,7 +31,7 @@ import org.apache.flink.runtime.jobmaster.JobMasterId; import org.apache.flink.runtime.jobmaster.JobMasterRegistrationSuccess; import org.apache.flink.runtime.leaderelection.LeaderElectionService; import org.apache.flink.runtime.leaderelection.TestingLeaderElectionService; -import org.apache.flink.runtime.leaderelection.TestingLeaderRetrievalService; +import org.apache.flink.runtime.leaderretrieval.SettableLeaderRetrievalService; import org.apache.flink.runtime.leaderretrieval.LeaderRetrievalService; import org.apache.flink.runtime.metrics.MetricRegistryImpl; import org.apache.flink.runtime.resourcemanager.exceptions.ResourceManagerException; @@ -86,7 +86,7 @@ public class ResourceManagerJobMasterTest extends TestLogger { JobID jobID = mockJobMaster(jobMasterAddress); JobMasterId jobMasterId = JobMasterId.generate(); final ResourceID jmResourceId = new ResourceID(jobMasterAddress); - TestingLeaderRetrievalService jobMasterLeaderRetrievalService = new TestingLeaderRetrievalService(jobMasterAddress, jobMasterId.toUUID()); + SettableLeaderRetrievalService jobMasterLeaderRetrievalService = new SettableLeaderRetrievalService(jobMasterAddress, jobMasterId.toUUID()); TestingLeaderElectionService resourceManagerLeaderElectionService = new TestingLeaderElectionService(); TestingFatalErrorHandler testingFatalErrorHandler = new TestingFatalErrorHandler(); final ResourceManager<?> resourceManager = createAndStartResourceManager(resourceManagerLeaderElectionService, jobID, jobMasterLeaderRetrievalService, testingFatalErrorHandler); @@ -119,7 +119,7 @@ public class ResourceManagerJobMasterTest extends TestLogger { JobID jobID = mockJobMaster(jobMasterAddress); JobMasterId jobMasterId = JobMasterId.generate(); final ResourceID jmResourceId = new ResourceID(jobMasterAddress); - TestingLeaderRetrievalService jobMasterLeaderRetrievalService = new TestingLeaderRetrievalService(jobMasterAddress, jobMasterId.toUUID()); + SettableLeaderRetrievalService jobMasterLeaderRetrievalService = new SettableLeaderRetrievalService(jobMasterAddress, jobMasterId.toUUID()); TestingFatalErrorHandler testingFatalErrorHandler = new TestingFatalErrorHandler(); final ResourceManager<?> resourceManager = createAndStartResourceManager(mock(LeaderElectionService.class), jobID, jobMasterLeaderRetrievalService, testingFatalErrorHandler); final ResourceManagerGateway wronglyFencedGateway = rpcService.connect(resourceManager.getAddress(), ResourceManagerId.generate(), ResourceManagerGateway.class) @@ -153,7 +153,7 @@ public class ResourceManagerJobMasterTest extends TestLogger { String jobMasterAddress = "/jobMasterAddress1"; JobID jobID = mockJobMaster(jobMasterAddress); TestingLeaderElectionService resourceManagerLeaderElectionService = new TestingLeaderElectionService(); - TestingLeaderRetrievalService jobMasterLeaderRetrievalService = new TestingLeaderRetrievalService( + SettableLeaderRetrievalService jobMasterLeaderRetrievalService = new SettableLeaderRetrievalService( "localhost", HighAvailabilityServices.DEFAULT_LEADER_ID); TestingFatalErrorHandler testingFatalErrorHandler = new TestingFatalErrorHandler(); @@ -187,7 +187,7 @@ public class ResourceManagerJobMasterTest extends TestLogger { String jobMasterAddress = "/jobMasterAddress1"; JobID jobID = mockJobMaster(jobMasterAddress); TestingLeaderElectionService resourceManagerLeaderElectionService = new TestingLeaderElectionService(); - TestingLeaderRetrievalService jobMasterLeaderRetrievalService = new TestingLeaderRetrievalService( + SettableLeaderRetrievalService jobMasterLeaderRetrievalService = new SettableLeaderRetrievalService( "localhost", HighAvailabilityServices.DEFAULT_LEADER_ID); TestingFatalErrorHandler testingFatalErrorHandler = new TestingFatalErrorHandler(); @@ -221,7 +221,7 @@ public class ResourceManagerJobMasterTest extends TestLogger { String jobMasterAddress = "/jobMasterAddress1"; JobID jobID = mockJobMaster(jobMasterAddress); TestingLeaderElectionService resourceManagerLeaderElectionService = new TestingLeaderElectionService(); - TestingLeaderRetrievalService jobMasterLeaderRetrievalService = new TestingLeaderRetrievalService( + SettableLeaderRetrievalService jobMasterLeaderRetrievalService = new SettableLeaderRetrievalService( "localhost", HighAvailabilityServices.DEFAULT_LEADER_ID); TestingFatalErrorHandler testingFatalErrorHandler = new TestingFatalErrorHandler(); http://git-wip-us.apache.org/repos/asf/flink/blob/bbb63531/flink-runtime/src/test/java/org/apache/flink/runtime/taskexecutor/TaskExecutorITCase.java ---------------------------------------------------------------------- diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/taskexecutor/TaskExecutorITCase.java b/flink-runtime/src/test/java/org/apache/flink/runtime/taskexecutor/TaskExecutorITCase.java index d693f47..fc8337f 100644 --- a/flink-runtime/src/test/java/org/apache/flink/runtime/taskexecutor/TaskExecutorITCase.java +++ b/flink-runtime/src/test/java/org/apache/flink/runtime/taskexecutor/TaskExecutorITCase.java @@ -35,7 +35,7 @@ import org.apache.flink.runtime.jobmaster.JobMasterGateway; import org.apache.flink.runtime.jobmaster.JobMasterId; import org.apache.flink.runtime.jobmaster.JobMasterRegistrationSuccess; import org.apache.flink.runtime.leaderelection.TestingLeaderElectionService; -import org.apache.flink.runtime.leaderelection.TestingLeaderRetrievalService; +import org.apache.flink.runtime.leaderretrieval.SettableLeaderRetrievalService; import org.apache.flink.runtime.messages.Acknowledge; import org.apache.flink.runtime.metrics.MetricRegistry; import org.apache.flink.runtime.metrics.NoOpMetricRegistry; @@ -100,7 +100,7 @@ public class TaskExecutorITCase extends TestLogger { final ResourceID taskManagerResourceId = new ResourceID("foobar"); final UUID rmLeaderId = UUID.randomUUID(); final TestingLeaderElectionService rmLeaderElectionService = new TestingLeaderElectionService(); - final TestingLeaderRetrievalService rmLeaderRetrievalService = new TestingLeaderRetrievalService(null, null); + final SettableLeaderRetrievalService rmLeaderRetrievalService = new SettableLeaderRetrievalService(null, null); final String rmAddress = "rm"; final String jmAddress = "jm"; final JobMasterId jobMasterId = JobMasterId.generate(); @@ -111,7 +111,7 @@ public class TaskExecutorITCase extends TestLogger { testingHAServices.setResourceManagerLeaderElectionService(rmLeaderElectionService); testingHAServices.setResourceManagerLeaderRetriever(rmLeaderRetrievalService); - testingHAServices.setJobMasterLeaderRetriever(jobId, new TestingLeaderRetrievalService(jmAddress, jobMasterId.toUUID())); + testingHAServices.setJobMasterLeaderRetriever(jobId, new SettableLeaderRetrievalService(jmAddress, jobMasterId.toUUID())); TestingRpcService rpcService = new TestingRpcService(); ResourceManagerConfiguration resourceManagerConfiguration = new ResourceManagerConfiguration( http://git-wip-us.apache.org/repos/asf/flink/blob/bbb63531/flink-runtime/src/test/java/org/apache/flink/runtime/taskexecutor/TaskExecutorTest.java ---------------------------------------------------------------------- diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/taskexecutor/TaskExecutorTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/taskexecutor/TaskExecutorTest.java index e972feb..7aae287 100644 --- a/flink-runtime/src/test/java/org/apache/flink/runtime/taskexecutor/TaskExecutorTest.java +++ b/flink-runtime/src/test/java/org/apache/flink/runtime/taskexecutor/TaskExecutorTest.java @@ -59,7 +59,7 @@ import org.apache.flink.runtime.jobmaster.JMTMRegistrationSuccess; import org.apache.flink.runtime.jobmaster.JobMasterGateway; import org.apache.flink.runtime.jobmaster.JobMasterId; import org.apache.flink.runtime.jobmaster.utils.TestingJobMasterGateway; -import org.apache.flink.runtime.leaderelection.TestingLeaderRetrievalService; +import org.apache.flink.runtime.leaderretrieval.SettableLeaderRetrievalService; import org.apache.flink.runtime.leaderretrieval.LeaderRetrievalListener; import org.apache.flink.runtime.leaderretrieval.LeaderRetrievalService; import org.apache.flink.runtime.messages.Acknowledge; @@ -165,9 +165,9 @@ public class TaskExecutorTest extends TestLogger { private TestingHighAvailabilityServices haServices; - private TestingLeaderRetrievalService resourceManagerLeaderRetriever; + private SettableLeaderRetrievalService resourceManagerLeaderRetriever; - private TestingLeaderRetrievalService jobManagerLeaderRetriever; + private SettableLeaderRetrievalService jobManagerLeaderRetriever; @Before public void setup() throws IOException { @@ -188,8 +188,8 @@ public class TaskExecutorTest extends TestLogger { testingFatalErrorHandler = new TestingFatalErrorHandler(); haServices = new TestingHighAvailabilityServices(); - resourceManagerLeaderRetriever = new TestingLeaderRetrievalService(); - jobManagerLeaderRetriever = new TestingLeaderRetrievalService(); + resourceManagerLeaderRetriever = new SettableLeaderRetrievalService(); + jobManagerLeaderRetriever = new SettableLeaderRetrievalService(); haServices.setResourceManagerLeaderRetriever(resourceManagerLeaderRetriever); haServices.setJobMasterLeaderRetriever(jobId, jobManagerLeaderRetriever); } http://git-wip-us.apache.org/repos/asf/flink/blob/bbb63531/flink-runtime/src/test/java/org/apache/flink/runtime/taskmanager/TaskManagerRegistrationTest.java ---------------------------------------------------------------------- diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/taskmanager/TaskManagerRegistrationTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/taskmanager/TaskManagerRegistrationTest.java index ceb92c4..6b65095 100644 --- a/flink-runtime/src/test/java/org/apache/flink/runtime/taskmanager/TaskManagerRegistrationTest.java +++ b/flink-runtime/src/test/java/org/apache/flink/runtime/taskmanager/TaskManagerRegistrationTest.java @@ -24,7 +24,6 @@ import akka.actor.InvalidActorNameException; import akka.actor.Terminated; import akka.testkit.JavaTestKit; import org.apache.flink.configuration.AkkaOptions; -import org.apache.flink.configuration.ConfigConstants; import org.apache.flink.configuration.Configuration; import org.apache.flink.configuration.TaskManagerOptions; import org.apache.flink.runtime.akka.AkkaUtils; @@ -38,7 +37,7 @@ import org.apache.flink.runtime.highavailability.nonha.embedded.EmbeddedHaServic import org.apache.flink.runtime.instance.ActorGateway; import org.apache.flink.runtime.instance.AkkaActorGateway; import org.apache.flink.runtime.instance.InstanceID; -import org.apache.flink.runtime.leaderelection.TestingLeaderRetrievalService; +import org.apache.flink.runtime.leaderretrieval.SettableLeaderRetrievalService; import org.apache.flink.runtime.leaderretrieval.StandaloneLeaderRetrievalService; import org.apache.flink.runtime.messages.JobManagerMessages; import org.apache.flink.runtime.messages.JobManagerMessages.LeaderSessionMessage; @@ -273,7 +272,7 @@ public class TaskManagerRegistrationTest extends TestLogger { highAvailabilityServices.setJobMasterLeaderRetriever( HighAvailabilityServices.DEFAULT_JOB_ID, // Give a non-existent job manager address to the task manager - new TestingLeaderRetrievalService( + new SettableLeaderRetrievalService( "foobar", HighAvailabilityServices.DEFAULT_LEADER_ID)); @@ -330,7 +329,7 @@ public class TaskManagerRegistrationTest extends TestLogger { highAvailabilityServices.setJobMasterLeaderRetriever( HighAvailabilityServices.DEFAULT_JOB_ID, - new TestingLeaderRetrievalService( + new SettableLeaderRetrievalService( jm.path(), HighAvailabilityServices.DEFAULT_LEADER_ID)); @@ -397,7 +396,7 @@ public class TaskManagerRegistrationTest extends TestLogger { highAvailabilityServices.setJobMasterLeaderRetriever( HighAvailabilityServices.DEFAULT_JOB_ID, - new TestingLeaderRetrievalService( + new SettableLeaderRetrievalService( jm.path(), HighAvailabilityServices.DEFAULT_LEADER_ID)); @@ -496,13 +495,13 @@ public class TaskManagerRegistrationTest extends TestLogger { Option.apply(JOB_MANAGER_NAME)); final ActorGateway fakeJM1Gateway = fakeJobManager1Gateway; - TestingLeaderRetrievalService testingLeaderRetrievalService = new TestingLeaderRetrievalService( + SettableLeaderRetrievalService settableLeaderRetrievalService = new SettableLeaderRetrievalService( fakeJM1Gateway.path(), HighAvailabilityServices.DEFAULT_LEADER_ID); highAvailabilityServices.setJobMasterLeaderRetriever( HighAvailabilityServices.DEFAULT_JOB_ID, - testingLeaderRetrievalService); + settableLeaderRetrievalService); // we make the test actor (the test kit) the JobManager to intercept // the messages http://git-wip-us.apache.org/repos/asf/flink/blob/bbb63531/flink-runtime/src/test/java/org/apache/flink/runtime/taskmanager/TaskManagerTest.java ---------------------------------------------------------------------- diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/taskmanager/TaskManagerTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/taskmanager/TaskManagerTest.java index 9d41bfb..93ec3ad 100644 --- a/flink-runtime/src/test/java/org/apache/flink/runtime/taskmanager/TaskManagerTest.java +++ b/flink-runtime/src/test/java/org/apache/flink/runtime/taskmanager/TaskManagerTest.java @@ -53,7 +53,7 @@ import org.apache.flink.runtime.jobgraph.IntermediateResultPartitionID; import org.apache.flink.runtime.jobgraph.JobVertexID; import org.apache.flink.runtime.jobgraph.tasks.AbstractInvokable; import org.apache.flink.runtime.jobmanager.Tasks; -import org.apache.flink.runtime.leaderelection.TestingLeaderRetrievalService; +import org.apache.flink.runtime.leaderretrieval.SettableLeaderRetrievalService; import org.apache.flink.runtime.leaderretrieval.StandaloneLeaderRetrievalService; import org.apache.flink.runtime.messages.Acknowledge; import org.apache.flink.runtime.messages.RegistrationMessages; @@ -1529,7 +1529,7 @@ public class TaskManagerTest extends TestLogger { public void testTerminationOnFatalError() { highAvailabilityServices.setJobMasterLeaderRetriever( HighAvailabilityServices.DEFAULT_JOB_ID, - new TestingLeaderRetrievalService()); + new SettableLeaderRetrievalService()); new JavaTestKit(system){{ http://git-wip-us.apache.org/repos/asf/flink/blob/bbb63531/flink-runtime/src/test/java/org/apache/flink/runtime/webmonitor/retriever/LeaderGatewayRetrieverTest.java ---------------------------------------------------------------------- diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/webmonitor/retriever/LeaderGatewayRetrieverTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/webmonitor/retriever/LeaderGatewayRetrieverTest.java index 7be06f3..9fffc48 100644 --- a/flink-runtime/src/test/java/org/apache/flink/runtime/webmonitor/retriever/LeaderGatewayRetrieverTest.java +++ b/flink-runtime/src/test/java/org/apache/flink/runtime/webmonitor/retriever/LeaderGatewayRetrieverTest.java @@ -20,7 +20,7 @@ package org.apache.flink.runtime.webmonitor.retriever; import org.apache.flink.api.java.tuple.Tuple2; import org.apache.flink.runtime.concurrent.FutureUtils; -import org.apache.flink.runtime.leaderelection.TestingLeaderRetrievalService; +import org.apache.flink.runtime.leaderretrieval.SettableLeaderRetrievalService; import org.apache.flink.runtime.rpc.RpcGateway; import org.apache.flink.util.FlinkException; import org.apache.flink.util.TestLogger; @@ -52,14 +52,14 @@ public class LeaderGatewayRetrieverTest extends TestLogger { RpcGateway rpcGateway = mock(RpcGateway.class); TestingLeaderGatewayRetriever leaderGatewayRetriever = new TestingLeaderGatewayRetriever(rpcGateway); - TestingLeaderRetrievalService testingLeaderRetrievalService = new TestingLeaderRetrievalService(); + SettableLeaderRetrievalService settableLeaderRetrievalService = new SettableLeaderRetrievalService(); - testingLeaderRetrievalService.start(leaderGatewayRetriever); + settableLeaderRetrievalService.start(leaderGatewayRetriever); CompletableFuture<RpcGateway> gatewayFuture = leaderGatewayRetriever.getFuture(); // this triggers the first gateway retrieval attempt - testingLeaderRetrievalService.notifyListener(address, leaderId); + settableLeaderRetrievalService.notifyListener(address, leaderId); // check that the first future has been failed try { http://git-wip-us.apache.org/repos/asf/flink/blob/bbb63531/flink-runtime/src/test/java/org/apache/flink/runtime/webmonitor/retriever/impl/AkkaJobManagerRetrieverTest.java ---------------------------------------------------------------------- diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/webmonitor/retriever/impl/AkkaJobManagerRetrieverTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/webmonitor/retriever/impl/AkkaJobManagerRetrieverTest.java index 5d01087..94473b9 100644 --- a/flink-runtime/src/test/java/org/apache/flink/runtime/webmonitor/retriever/impl/AkkaJobManagerRetrieverTest.java +++ b/flink-runtime/src/test/java/org/apache/flink/runtime/webmonitor/retriever/impl/AkkaJobManagerRetrieverTest.java @@ -23,7 +23,7 @@ import org.apache.flink.runtime.akka.AkkaUtils; import org.apache.flink.runtime.client.JobClientActorTest; import org.apache.flink.runtime.concurrent.FutureUtils; import org.apache.flink.runtime.jobmaster.JobManagerGateway; -import org.apache.flink.runtime.leaderelection.TestingLeaderRetrievalService; +import org.apache.flink.runtime.leaderretrieval.SettableLeaderRetrievalService; import org.apache.flink.runtime.testingUtils.TestingUtils; import org.apache.flink.util.TestLogger; @@ -70,7 +70,7 @@ public class AkkaJobManagerRetrieverTest extends TestLogger { @Test public void testAkkaJobManagerRetrieval() throws Exception { AkkaJobManagerRetriever akkaJobManagerRetriever = new AkkaJobManagerRetriever(actorSystem, timeout, 0, Time.milliseconds(0L)); - TestingLeaderRetrievalService testingLeaderRetrievalService = new TestingLeaderRetrievalService(); + SettableLeaderRetrievalService settableLeaderRetrievalService = new SettableLeaderRetrievalService(); CompletableFuture<JobManagerGateway> gatewayFuture = akkaJobManagerRetriever.getFuture(); final UUID leaderSessionId = UUID.randomUUID(); @@ -83,18 +83,18 @@ public class AkkaJobManagerRetrieverTest extends TestLogger { final String address = actorRef.path().toString(); - testingLeaderRetrievalService.start(akkaJobManagerRetriever); + settableLeaderRetrievalService.start(akkaJobManagerRetriever); // check that the gateway future has not been completed since there is no leader yet assertFalse(gatewayFuture.isDone()); - testingLeaderRetrievalService.notifyListener(address, leaderSessionId); + settableLeaderRetrievalService.notifyListener(address, leaderSessionId); JobManagerGateway jobManagerGateway = gatewayFuture.get(timeout.toMilliseconds(), TimeUnit.MILLISECONDS); assertEquals(address, jobManagerGateway.getAddress()); } finally { - testingLeaderRetrievalService.stop(); + settableLeaderRetrievalService.stop(); if (actorRef != null) { TestingUtils.stopActorGracefully(actorRef); http://git-wip-us.apache.org/repos/asf/flink/blob/bbb63531/flink-runtime/src/test/java/org/apache/flink/runtime/webmonitor/retriever/impl/RpcGatewayRetrieverTest.java ---------------------------------------------------------------------- diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/webmonitor/retriever/impl/RpcGatewayRetrieverTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/webmonitor/retriever/impl/RpcGatewayRetrieverTest.java index 5f59d59..f4d66d1 100644 --- a/flink-runtime/src/test/java/org/apache/flink/runtime/webmonitor/retriever/impl/RpcGatewayRetrieverTest.java +++ b/flink-runtime/src/test/java/org/apache/flink/runtime/webmonitor/retriever/impl/RpcGatewayRetrieverTest.java @@ -20,7 +20,7 @@ package org.apache.flink.runtime.webmonitor.retriever.impl; import org.apache.flink.api.common.time.Time; import org.apache.flink.runtime.highavailability.HighAvailabilityServices; -import org.apache.flink.runtime.leaderelection.TestingLeaderRetrievalService; +import org.apache.flink.runtime.leaderretrieval.SettableLeaderRetrievalService; import org.apache.flink.runtime.rpc.FencedRpcGateway; import org.apache.flink.runtime.rpc.RpcEndpoint; import org.apache.flink.runtime.rpc.RpcService; @@ -74,7 +74,7 @@ public class RpcGatewayRetrieverTest extends TestLogger { final UUID leaderSessionId = UUID.randomUUID(); RpcGatewayRetriever<UUID, DummyGateway> gatewayRetriever = new RpcGatewayRetriever<>(rpcService, DummyGateway.class, Function.identity(), 0, Time.milliseconds(0L)); - TestingLeaderRetrievalService testingLeaderRetrievalService = new TestingLeaderRetrievalService(); + SettableLeaderRetrievalService settableLeaderRetrievalService = new SettableLeaderRetrievalService(); DummyRpcEndpoint dummyRpcEndpoint = new DummyRpcEndpoint(rpcService, "dummyRpcEndpoint1", expectedValue); DummyRpcEndpoint dummyRpcEndpoint2 = new DummyRpcEndpoint(rpcService, "dummyRpcEndpoint2", expectedValue2); rpcService.registerGateway(dummyRpcEndpoint.getAddress(), dummyRpcEndpoint.getSelfGateway(DummyGateway.class)); @@ -84,13 +84,13 @@ public class RpcGatewayRetrieverTest extends TestLogger { dummyRpcEndpoint.start(); dummyRpcEndpoint2.start(); - testingLeaderRetrievalService.start(gatewayRetriever); + settableLeaderRetrievalService.start(gatewayRetriever); final CompletableFuture<DummyGateway> gatewayFuture = gatewayRetriever.getFuture(); assertFalse(gatewayFuture.isDone()); - testingLeaderRetrievalService.notifyListener(dummyRpcEndpoint.getAddress(), leaderSessionId); + settableLeaderRetrievalService.notifyListener(dummyRpcEndpoint.getAddress(), leaderSessionId); final DummyGateway dummyGateway = gatewayFuture.get(TIMEOUT.toMilliseconds(), TimeUnit.MILLISECONDS); @@ -98,7 +98,7 @@ public class RpcGatewayRetrieverTest extends TestLogger { assertEquals(expectedValue, dummyGateway.foobar(TIMEOUT).get(TIMEOUT.toMilliseconds(), TimeUnit.MILLISECONDS)); // elect a new leader - testingLeaderRetrievalService.notifyListener(dummyRpcEndpoint2.getAddress(), leaderSessionId); + settableLeaderRetrievalService.notifyListener(dummyRpcEndpoint2.getAddress(), leaderSessionId); final CompletableFuture<DummyGateway> gatewayFuture2 = gatewayRetriever.getFuture(); final DummyGateway dummyGateway2 = gatewayFuture2.get(TIMEOUT.toMilliseconds(), TimeUnit.MILLISECONDS); http://git-wip-us.apache.org/repos/asf/flink/blob/bbb63531/flink-yarn/src/test/java/org/apache/flink/yarn/UtilsTest.java ---------------------------------------------------------------------- diff --git a/flink-yarn/src/test/java/org/apache/flink/yarn/UtilsTest.java b/flink-yarn/src/test/java/org/apache/flink/yarn/UtilsTest.java index aea3a67..578e8e2 100644 --- a/flink-yarn/src/test/java/org/apache/flink/yarn/UtilsTest.java +++ b/flink-yarn/src/test/java/org/apache/flink/yarn/UtilsTest.java @@ -25,7 +25,7 @@ import org.apache.flink.runtime.clusterframework.messages.NotifyResourceStarted; import org.apache.flink.runtime.clusterframework.messages.RegisterResourceManager; import org.apache.flink.runtime.clusterframework.messages.RegisterResourceManagerSuccessful; import org.apache.flink.runtime.instance.AkkaActorGateway; -import org.apache.flink.runtime.leaderelection.TestingLeaderRetrievalService; +import org.apache.flink.runtime.leaderretrieval.SettableLeaderRetrievalService; import org.apache.flink.runtime.messages.Acknowledge; import org.apache.flink.runtime.testingUtils.TestingUtils; import org.apache.flink.util.TestLogger; @@ -98,7 +98,7 @@ public class UtilsTest extends TestLogger { Configuration flinkConfig = new Configuration(); YarnConfiguration yarnConfig = new YarnConfiguration(); - TestingLeaderRetrievalService leaderRetrievalService = new TestingLeaderRetrievalService( + SettableLeaderRetrievalService leaderRetrievalService = new SettableLeaderRetrievalService( null, null); String applicationMasterHostName = "localhost";
