This is an automated email from the ASF dual-hosted git repository. trohrmann pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/flink.git
commit 6b40ff1f384c5a2253c8393c3612d3384ae6bfc5 Author: Till Rohrmann <[email protected]> AuthorDate: Thu Apr 1 14:12:47 2021 +0200 [FLINK-21942] Extract JobLeaderIdService interface to make the ResourceManager better testable --- ...Service.java => DefaultJobLeaderIdService.java} | 50 +--- .../resourcemanager/JobLeaderIdActions.java | 2 +- .../resourcemanager/JobLeaderIdService.java | 324 ++------------------- .../ResourceManagerRuntimeServices.java | 2 +- ...est.java => DefaultJobLeaderIdServiceTest.java} | 18 +- .../ResourceManagerJobMasterTest.java | 2 +- .../ResourceManagerPartitionLifecycleTest.java | 2 +- .../ResourceManagerTaskExecutorTest.java | 2 +- .../resourcemanager/ResourceManagerTest.java | 4 +- .../utils/MockResourceManagerRuntimeServices.java | 3 +- 10 files changed, 52 insertions(+), 357 deletions(-) diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/resourcemanager/JobLeaderIdService.java b/flink-runtime/src/main/java/org/apache/flink/runtime/resourcemanager/DefaultJobLeaderIdService.java similarity index 90% copy from flink-runtime/src/main/java/org/apache/flink/runtime/resourcemanager/JobLeaderIdService.java copy to flink-runtime/src/main/java/org/apache/flink/runtime/resourcemanager/DefaultJobLeaderIdService.java index 5d89d10..b499d7f 100755 --- a/flink-runtime/src/main/java/org/apache/flink/runtime/resourcemanager/JobLeaderIdService.java +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/resourcemanager/DefaultJobLeaderIdService.java @@ -48,9 +48,9 @@ import java.util.concurrent.TimeUnit; * #getLeaderId(JobID)}. The future will only be completed with an exception in case the service * will be stopped. */ -public class JobLeaderIdService { +public class DefaultJobLeaderIdService implements JobLeaderIdService { - private static final Logger LOG = LoggerFactory.getLogger(JobLeaderIdService.class); + private static final Logger LOG = LoggerFactory.getLogger(DefaultJobLeaderIdService.class); /** High availability services to use by this service. */ private final HighAvailabilityServices highAvailabilityServices; @@ -65,7 +65,7 @@ public class JobLeaderIdService { /** Actions to call when the job leader changes. */ private JobLeaderIdActions jobLeaderIdActions; - public JobLeaderIdService( + public DefaultJobLeaderIdService( HighAvailabilityServices highAvailabilityServices, ScheduledExecutor scheduledExecutor, Time jobTimeout) { @@ -79,12 +79,7 @@ public class JobLeaderIdService { jobLeaderIdActions = null; } - /** - * Start the service with the given job leader actions. - * - * @param initialJobLeaderIdActions to use for job leader id actions - * @throws Exception which is thrown when clearing up old state - */ + @Override public void start(JobLeaderIdActions initialJobLeaderIdActions) throws Exception { if (isStarted()) { clear(); @@ -93,11 +88,7 @@ public class JobLeaderIdService { this.jobLeaderIdActions = Preconditions.checkNotNull(initialJobLeaderIdActions); } - /** - * Stop the service. - * - * @throws Exception which is thrown in case a retrieval service cannot be stopped properly - */ + @Override public void stop() throws Exception { clear(); @@ -113,11 +104,7 @@ public class JobLeaderIdService { return jobLeaderIdActions != null; } - /** - * Stop and clear the currently registered job leader id listeners. - * - * @throws Exception which is thrown in case a retrieval service cannot be stopped properly - */ + @Override public void clear() throws Exception { Exception exception = null; @@ -133,19 +120,14 @@ public class JobLeaderIdService { ExceptionUtils.rethrowException( exception, "Could not properly stop the " - + JobLeaderIdService.class.getSimpleName() + + DefaultJobLeaderIdService.class.getSimpleName() + '.'); } jobLeaderIdListeners.clear(); } - /** - * Add a job to be monitored to retrieve the job leader id. - * - * @param jobId identifying the job to monitor - * @throws Exception if the job could not be added to the service - */ + @Override public void addJob(JobID jobId) throws Exception { Preconditions.checkNotNull(jobLeaderIdActions); @@ -161,12 +143,7 @@ public class JobLeaderIdService { } } - /** - * Remove the given job from being monitored by the service. - * - * @param jobId identifying the job to remove from monitor - * @throws Exception if removing the job fails - */ + @Override public void removeJob(JobID jobId) throws Exception { LOG.debug("Remove job {} from job leader id monitoring.", jobId); @@ -177,16 +154,12 @@ public class JobLeaderIdService { } } - /** - * Check whether the given job is being monitored or not. - * - * @param jobId identifying the job - * @return True if the job is being monitored; otherwise false - */ + @Override public boolean containsJob(JobID jobId) { return jobLeaderIdListeners.containsKey(jobId); } + @Override public CompletableFuture<JobMasterId> getLeaderId(JobID jobId) throws Exception { if (!jobLeaderIdListeners.containsKey(jobId)) { addJob(jobId); @@ -197,6 +170,7 @@ public class JobLeaderIdService { return listener.getLeaderIdFuture().thenApply(JobMasterId::fromUuidOrNull); } + @Override public boolean isValidTimeout(JobID jobId, UUID timeoutId) { JobLeaderIdListener jobLeaderIdListener = jobLeaderIdListeners.get(jobId); diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/resourcemanager/JobLeaderIdActions.java b/flink-runtime/src/main/java/org/apache/flink/runtime/resourcemanager/JobLeaderIdActions.java index e0418af..2a2a248 100644 --- a/flink-runtime/src/main/java/org/apache/flink/runtime/resourcemanager/JobLeaderIdActions.java +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/resourcemanager/JobLeaderIdActions.java @@ -23,7 +23,7 @@ import org.apache.flink.runtime.jobmaster.JobMasterId; import java.util.UUID; -/** Interface for actions called by the {@link JobLeaderIdService}. */ +/** Interface for actions called by the {@link DefaultJobLeaderIdService}. */ public interface JobLeaderIdActions { /** diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/resourcemanager/JobLeaderIdService.java b/flink-runtime/src/main/java/org/apache/flink/runtime/resourcemanager/JobLeaderIdService.java old mode 100755 new mode 100644 index 5d89d10..1c926ae --- a/flink-runtime/src/main/java/org/apache/flink/runtime/resourcemanager/JobLeaderIdService.java +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/resourcemanager/JobLeaderIdService.java @@ -7,7 +7,7 @@ * "License"); you may not use this file except in compliance * with the License. You may obtain a copy of the License at * - * http://www.apache.org/licenses/LICENSE-2.0 + * http://www.apache.org/licenses/LICENSE-2.0 * * Unless required by applicable law or agreed to in writing, software * distributed under the License is distributed on an "AS IS" BASIS, @@ -19,28 +19,10 @@ package org.apache.flink.runtime.resourcemanager; import org.apache.flink.api.common.JobID; -import org.apache.flink.api.common.time.Time; -import org.apache.flink.runtime.concurrent.ScheduledExecutor; -import org.apache.flink.runtime.highavailability.HighAvailabilityServices; import org.apache.flink.runtime.jobmaster.JobMasterId; -import org.apache.flink.runtime.leaderretrieval.LeaderRetrievalListener; -import org.apache.flink.runtime.leaderretrieval.LeaderRetrievalService; -import org.apache.flink.util.ExceptionUtils; -import org.apache.flink.util.Preconditions; -import org.slf4j.Logger; -import org.slf4j.LoggerFactory; - -import javax.annotation.Nullable; - -import java.util.HashMap; -import java.util.Map; -import java.util.Objects; import java.util.UUID; import java.util.concurrent.CompletableFuture; -import java.util.concurrent.CompletionException; -import java.util.concurrent.ScheduledFuture; -import java.util.concurrent.TimeUnit; /** * Service which retrieves for a registered job the current job leader id (the leader id of the job @@ -48,36 +30,7 @@ import java.util.concurrent.TimeUnit; * #getLeaderId(JobID)}. The future will only be completed with an exception in case the service * will be stopped. */ -public class JobLeaderIdService { - - private static final Logger LOG = LoggerFactory.getLogger(JobLeaderIdService.class); - - /** High availability services to use by this service. */ - private final HighAvailabilityServices highAvailabilityServices; - - private final ScheduledExecutor scheduledExecutor; - - private final Time jobTimeout; - - /** Map of currently monitored jobs. */ - private final Map<JobID, JobLeaderIdListener> jobLeaderIdListeners; - - /** Actions to call when the job leader changes. */ - private JobLeaderIdActions jobLeaderIdActions; - - public JobLeaderIdService( - HighAvailabilityServices highAvailabilityServices, - ScheduledExecutor scheduledExecutor, - Time jobTimeout) { - this.highAvailabilityServices = - Preconditions.checkNotNull(highAvailabilityServices, "highAvailabilityServices"); - this.scheduledExecutor = Preconditions.checkNotNull(scheduledExecutor, "scheduledExecutor"); - this.jobTimeout = Preconditions.checkNotNull(jobTimeout, "jobTimeout"); - - jobLeaderIdListeners = new HashMap<>(4); - - jobLeaderIdActions = null; - } +public interface JobLeaderIdService { /** * Start the service with the given job leader actions. @@ -85,60 +38,21 @@ public class JobLeaderIdService { * @param initialJobLeaderIdActions to use for job leader id actions * @throws Exception which is thrown when clearing up old state */ - public void start(JobLeaderIdActions initialJobLeaderIdActions) throws Exception { - if (isStarted()) { - clear(); - } - - this.jobLeaderIdActions = Preconditions.checkNotNull(initialJobLeaderIdActions); - } + void start(JobLeaderIdActions initialJobLeaderIdActions) throws Exception; /** * Stop the service. * * @throws Exception which is thrown in case a retrieval service cannot be stopped properly */ - public void stop() throws Exception { - clear(); - - this.jobLeaderIdActions = null; - } - - /** - * Checks whether the service has been started. - * - * @return True if the service has been started; otherwise false - */ - public boolean isStarted() { - return jobLeaderIdActions != null; - } + void stop() throws Exception; /** * Stop and clear the currently registered job leader id listeners. * * @throws Exception which is thrown in case a retrieval service cannot be stopped properly */ - public void clear() throws Exception { - Exception exception = null; - - for (JobLeaderIdListener listener : jobLeaderIdListeners.values()) { - try { - listener.stop(); - } catch (Exception e) { - exception = ExceptionUtils.firstOrSuppressed(e, exception); - } - } - - if (exception != null) { - ExceptionUtils.rethrowException( - exception, - "Could not properly stop the " - + JobLeaderIdService.class.getSimpleName() - + '.'); - } - - jobLeaderIdListeners.clear(); - } + void clear() throws Exception; /** * Add a job to be monitored to retrieve the job leader id. @@ -146,20 +60,7 @@ public class JobLeaderIdService { * @param jobId identifying the job to monitor * @throws Exception if the job could not be added to the service */ - public void addJob(JobID jobId) throws Exception { - Preconditions.checkNotNull(jobLeaderIdActions); - - LOG.debug("Add job {} to job leader id monitoring.", jobId); - - if (!jobLeaderIdListeners.containsKey(jobId)) { - LeaderRetrievalService leaderRetrievalService = - highAvailabilityServices.getJobManagerLeaderRetriever(jobId); - - JobLeaderIdListener jobIdListener = - new JobLeaderIdListener(jobId, jobLeaderIdActions, leaderRetrievalService); - jobLeaderIdListeners.put(jobId, jobIdListener); - } - } + void addJob(JobID jobId) throws Exception; /** * Remove the given job from being monitored by the service. @@ -167,15 +68,7 @@ public class JobLeaderIdService { * @param jobId identifying the job to remove from monitor * @throws Exception if removing the job fails */ - public void removeJob(JobID jobId) throws Exception { - LOG.debug("Remove job {} from job leader id monitoring.", jobId); - - JobLeaderIdListener listener = jobLeaderIdListeners.remove(jobId); - - if (listener != null) { - listener.stop(); - } - } + void removeJob(JobID jobId) throws Exception; /** * Check whether the given job is being monitored or not. @@ -183,196 +76,23 @@ public class JobLeaderIdService { * @param jobId identifying the job * @return True if the job is being monitored; otherwise false */ - public boolean containsJob(JobID jobId) { - return jobLeaderIdListeners.containsKey(jobId); - } - - public CompletableFuture<JobMasterId> getLeaderId(JobID jobId) throws Exception { - if (!jobLeaderIdListeners.containsKey(jobId)) { - addJob(jobId); - } - - JobLeaderIdListener listener = jobLeaderIdListeners.get(jobId); - - return listener.getLeaderIdFuture().thenApply(JobMasterId::fromUuidOrNull); - } - - public boolean isValidTimeout(JobID jobId, UUID timeoutId) { - JobLeaderIdListener jobLeaderIdListener = jobLeaderIdListeners.get(jobId); - - if (null != jobLeaderIdListener) { - return Objects.equals(timeoutId, jobLeaderIdListener.getTimeoutId()); - } else { - return false; - } - } - - // -------------------------------------------------------------------------------- - // Static utility classes - // -------------------------------------------------------------------------------- + boolean containsJob(JobID jobId); /** - * Listener which stores the current leader id and exposes them as a future value when - * requested. The returned future will always be completed properly except when stopping the - * listener. + * Get the leader's {@link JobMasterId} future for the given job. + * + * @param jobId jobId specifying for which job to retrieve the {@link JobMasterId} + * @return Future with the current leader's {@link JobMasterId} + * @throws Exception if retrieving the {@link JobMasterId} cannot be started */ - private final class JobLeaderIdListener implements LeaderRetrievalListener { - private final Object timeoutLock = new Object(); - private final JobID jobId; - private final JobLeaderIdActions listenerJobLeaderIdActions; - private final LeaderRetrievalService leaderRetrievalService; - - private volatile CompletableFuture<UUID> leaderIdFuture; - private volatile boolean running = true; - - /** Null if no timeout has been scheduled; otherwise non null. */ - @Nullable private volatile ScheduledFuture<?> timeoutFuture; - - /** Null if no timeout has been scheduled; otherwise non null. */ - @Nullable private volatile UUID timeoutId; - - private JobLeaderIdListener( - JobID jobId, - JobLeaderIdActions listenerJobLeaderIdActions, - LeaderRetrievalService leaderRetrievalService) - throws Exception { - this.jobId = Preconditions.checkNotNull(jobId); - this.listenerJobLeaderIdActions = - Preconditions.checkNotNull(listenerJobLeaderIdActions); - this.leaderRetrievalService = Preconditions.checkNotNull(leaderRetrievalService); - - leaderIdFuture = new CompletableFuture<>(); - - activateTimeout(); - - // start the leader service we're listening to - leaderRetrievalService.start(this); - } - - public CompletableFuture<UUID> getLeaderIdFuture() { - return leaderIdFuture; - } - - @Nullable - public UUID getTimeoutId() { - return timeoutId; - } - - public void stop() throws Exception { - running = false; - leaderRetrievalService.stop(); - cancelTimeout(); - leaderIdFuture.completeExceptionally( - new Exception("Job leader id service has been stopped.")); - } + CompletableFuture<JobMasterId> getLeaderId(JobID jobId) throws Exception; - @Override - public void notifyLeaderAddress( - @Nullable String leaderAddress, @Nullable UUID leaderSessionId) { - if (running) { - UUID previousJobLeaderId = null; - - if (leaderIdFuture.isDone()) { - try { - previousJobLeaderId = leaderIdFuture.getNow(null); - } catch (CompletionException e) { - // this should never happen since we complete this future always properly - handleError(e); - } - - if (leaderSessionId == null) { - // there was a leader, but we no longer have one - LOG.debug("Job {} no longer has a job leader.", jobId); - leaderIdFuture = new CompletableFuture<>(); - } else { - // there was an active leader, but we now have a new leader - LOG.debug( - "Job {} has a new job leader {}@{}.", - jobId, - leaderSessionId, - leaderAddress); - leaderIdFuture = CompletableFuture.completedFuture(leaderSessionId); - } - } else { - if (leaderSessionId != null) { - // there was no active leader, but we now have a new leader - LOG.debug( - "Job {} has a new job leader {}@{}.", - jobId, - leaderSessionId, - leaderAddress); - leaderIdFuture.complete(leaderSessionId); - } - } - - if (previousJobLeaderId != null && !previousJobLeaderId.equals(leaderSessionId)) { - // we had a previous job leader, so notify about his lost leadership - listenerJobLeaderIdActions.jobLeaderLostLeadership( - jobId, new JobMasterId(previousJobLeaderId)); - - if (null == leaderSessionId) { - // No current leader active ==> Set a timeout for the job - activateTimeout(); - - // check if we got stopped asynchronously - if (!running) { - cancelTimeout(); - } - } - } else if (null != leaderSessionId) { - // Cancel timeout because we've found an active leader for it - cancelTimeout(); - } - } else { - LOG.debug( - "A leader id change {}@{} has been detected after the listener has been stopped.", - leaderSessionId, - leaderAddress); - } - } - - @Override - public void handleError(Exception exception) { - if (running) { - listenerJobLeaderIdActions.handleError(exception); - } else { - LOG.debug( - "An error occurred in the {} after the listener has been stopped.", - JobLeaderIdListener.class.getSimpleName(), - exception); - } - } - - private void activateTimeout() { - synchronized (timeoutLock) { - cancelTimeout(); - - final UUID newTimeoutId = UUID.randomUUID(); - - timeoutId = newTimeoutId; - timeoutFuture = - scheduledExecutor.schedule( - new Runnable() { - @Override - public void run() { - listenerJobLeaderIdActions.notifyJobTimeout( - jobId, newTimeoutId); - } - }, - jobTimeout.toMilliseconds(), - TimeUnit.MILLISECONDS); - } - } - - private void cancelTimeout() { - synchronized (timeoutLock) { - if (timeoutFuture != null) { - timeoutFuture.cancel(true); - } - - timeoutFuture = null; - timeoutId = null; - } - } - } + /** + * Checks whether the given timeoutId for the given jobId is valid or not. + * + * @param jobId jobId identifying the job for which the timeout should be checked + * @param timeoutId timeoutId specifying the timeout which should be checked for its validity + * @return {@code true} if the timeout is valid; otherwise {@code false} + */ + boolean isValidTimeout(JobID jobId, UUID timeoutId); } diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/resourcemanager/ResourceManagerRuntimeServices.java b/flink-runtime/src/main/java/org/apache/flink/runtime/resourcemanager/ResourceManagerRuntimeServices.java index 00129f2..1ce6d83 100644 --- a/flink-runtime/src/main/java/org/apache/flink/runtime/resourcemanager/ResourceManagerRuntimeServices.java +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/resourcemanager/ResourceManagerRuntimeServices.java @@ -70,7 +70,7 @@ public class ResourceManagerRuntimeServices { createSlotManager(configuration, scheduledExecutor, slotManagerMetricGroup); final JobLeaderIdService jobLeaderIdService = - new JobLeaderIdService( + new DefaultJobLeaderIdService( highAvailabilityServices, scheduledExecutor, configuration.getJobTimeout()); return new ResourceManagerRuntimeServices(slotManager, jobLeaderIdService); diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/resourcemanager/JobLeaderIdServiceTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/resourcemanager/DefaultJobLeaderIdServiceTest.java similarity index 95% rename from flink-runtime/src/test/java/org/apache/flink/runtime/resourcemanager/JobLeaderIdServiceTest.java rename to flink-runtime/src/test/java/org/apache/flink/runtime/resourcemanager/DefaultJobLeaderIdServiceTest.java index 97f04da..7f3cd0a 100644 --- a/flink-runtime/src/test/java/org/apache/flink/runtime/resourcemanager/JobLeaderIdServiceTest.java +++ b/flink-runtime/src/test/java/org/apache/flink/runtime/resourcemanager/DefaultJobLeaderIdServiceTest.java @@ -58,8 +58,8 @@ import static org.mockito.Mockito.mock; import static org.mockito.Mockito.times; import static org.mockito.Mockito.verify; -/** Tests for the {@link JobLeaderIdService}. */ -public class JobLeaderIdServiceTest extends TestLogger { +/** Tests for the {@link DefaultJobLeaderIdService}. */ +public class DefaultJobLeaderIdServiceTest extends TestLogger { /** Tests adding a job and finding out its leader id. */ @Test(timeout = 10000) @@ -79,7 +79,7 @@ public class JobLeaderIdServiceTest extends TestLogger { JobLeaderIdActions jobLeaderIdActions = mock(JobLeaderIdActions.class); JobLeaderIdService jobLeaderIdService = - new JobLeaderIdService(highAvailabilityServices, scheduledExecutor, timeout); + new DefaultJobLeaderIdService(highAvailabilityServices, scheduledExecutor, timeout); jobLeaderIdService.start(jobLeaderIdActions); @@ -111,7 +111,7 @@ public class JobLeaderIdServiceTest extends TestLogger { JobLeaderIdActions jobLeaderIdActions = mock(JobLeaderIdActions.class); JobLeaderIdService jobLeaderIdService = - new JobLeaderIdService(highAvailabilityServices, scheduledExecutor, timeout); + new DefaultJobLeaderIdService(highAvailabilityServices, scheduledExecutor, timeout); jobLeaderIdService.start(jobLeaderIdActions); @@ -152,7 +152,7 @@ public class JobLeaderIdServiceTest extends TestLogger { JobLeaderIdActions jobLeaderIdActions = mock(JobLeaderIdActions.class); JobLeaderIdService jobLeaderIdService = - new JobLeaderIdService(highAvailabilityServices, scheduledExecutor, timeout); + new DefaultJobLeaderIdService(highAvailabilityServices, scheduledExecutor, timeout); jobLeaderIdService.start(jobLeaderIdActions); @@ -227,7 +227,7 @@ public class JobLeaderIdServiceTest extends TestLogger { .notifyJobTimeout(eq(jobId), any(UUID.class)); JobLeaderIdService jobLeaderIdService = - new JobLeaderIdService(highAvailabilityServices, scheduledExecutor, timeout); + new DefaultJobLeaderIdService(highAvailabilityServices, scheduledExecutor, timeout); jobLeaderIdService.start(jobLeaderIdActions); @@ -295,7 +295,7 @@ public class JobLeaderIdServiceTest extends TestLogger { highAvailabilityServices.setJobMasterLeaderRetriever(jobId, leaderRetrievalService); JobLeaderIdService jobLeaderIdService = - new JobLeaderIdService( + new DefaultJobLeaderIdService( highAvailabilityServices, new ManuallyTriggeredScheduledExecutor(), Time.milliseconds(5000L)); @@ -332,8 +332,8 @@ public class JobLeaderIdServiceTest extends TestLogger { ScheduledExecutor scheduledExecutor = mock(ScheduledExecutor.class); Time timeout = Time.milliseconds(5000L); JobLeaderIdActions jobLeaderIdActions = mock(JobLeaderIdActions.class); - JobLeaderIdService jobLeaderIdService = - new JobLeaderIdService(highAvailabilityServices, scheduledExecutor, timeout); + DefaultJobLeaderIdService jobLeaderIdService = + new DefaultJobLeaderIdService(highAvailabilityServices, scheduledExecutor, timeout); assertFalse(jobLeaderIdService.isStarted()); diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/resourcemanager/ResourceManagerJobMasterTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/resourcemanager/ResourceManagerJobMasterTest.java index 1639feb..1c9eb17 100644 --- a/flink-runtime/src/test/java/org/apache/flink/runtime/resourcemanager/ResourceManagerJobMasterTest.java +++ b/flink-runtime/src/test/java/org/apache/flink/runtime/resourcemanager/ResourceManagerJobMasterTest.java @@ -135,7 +135,7 @@ public class ResourceManagerJobMasterTest extends TestLogger { HeartbeatServices heartbeatServices = new HeartbeatServices(1000L, 1000L); JobLeaderIdService jobLeaderIdService = - new JobLeaderIdService( + new DefaultJobLeaderIdService( haServices, rpcService.getScheduledExecutor(), Time.minutes(5L)); final SlotManager slotManager = diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/resourcemanager/ResourceManagerPartitionLifecycleTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/resourcemanager/ResourceManagerPartitionLifecycleTest.java index f950a60..3853b1c 100644 --- a/flink-runtime/src/test/java/org/apache/flink/runtime/resourcemanager/ResourceManagerPartitionLifecycleTest.java +++ b/flink-runtime/src/test/java/org/apache/flink/runtime/resourcemanager/ResourceManagerPartitionLifecycleTest.java @@ -232,7 +232,7 @@ public class ResourceManagerPartitionLifecycleTest extends TestLogger { .setScheduledExecutor(rpcService.getScheduledExecutor()) .build(); final JobLeaderIdService jobLeaderIdService = - new JobLeaderIdService( + new DefaultJobLeaderIdService( highAvailabilityServices, rpcService.getScheduledExecutor(), TestingUtils.infiniteTime()); diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/resourcemanager/ResourceManagerTaskExecutorTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/resourcemanager/ResourceManagerTaskExecutorTest.java index 9720395..9223bbe 100644 --- a/flink-runtime/src/test/java/org/apache/flink/runtime/resourcemanager/ResourceManagerTaskExecutorTest.java +++ b/flink-runtime/src/test/java/org/apache/flink/runtime/resourcemanager/ResourceManagerTaskExecutorTest.java @@ -161,7 +161,7 @@ public class ResourceManagerTaskExecutorTest extends TestLogger { .build(); JobLeaderIdService jobLeaderIdService = - new JobLeaderIdService( + new DefaultJobLeaderIdService( highAvailabilityServices, rpcService.getScheduledExecutor(), Time.minutes(5L)); diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/resourcemanager/ResourceManagerTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/resourcemanager/ResourceManagerTest.java index 9cfb526..b6971ef 100644 --- a/flink-runtime/src/test/java/org/apache/flink/runtime/resourcemanager/ResourceManagerTest.java +++ b/flink-runtime/src/test/java/org/apache/flink/runtime/resourcemanager/ResourceManagerTest.java @@ -322,7 +322,7 @@ public class ResourceManagerTest extends TestLogger { rpcService.registerGateway(jobMasterGateway.getAddress(), jobMasterGateway); final JobLeaderIdService jobLeaderIdService = - new JobLeaderIdService( + new DefaultJobLeaderIdService( highAvailabilityServices, rpcService.getScheduledExecutor(), TestingUtils.infiniteTime()); @@ -372,7 +372,7 @@ public class ResourceManagerTest extends TestLogger { private TestingResourceManager createAndStartResourceManager( HeartbeatServices heartbeatServices) throws Exception { final JobLeaderIdService jobLeaderIdService = - new JobLeaderIdService( + new DefaultJobLeaderIdService( highAvailabilityServices, rpcService.getScheduledExecutor(), TestingUtils.infiniteTime()); diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/resourcemanager/utils/MockResourceManagerRuntimeServices.java b/flink-runtime/src/test/java/org/apache/flink/runtime/resourcemanager/utils/MockResourceManagerRuntimeServices.java index b72217c..fd8920b 100755 --- a/flink-runtime/src/test/java/org/apache/flink/runtime/resourcemanager/utils/MockResourceManagerRuntimeServices.java +++ b/flink-runtime/src/test/java/org/apache/flink/runtime/resourcemanager/utils/MockResourceManagerRuntimeServices.java @@ -24,6 +24,7 @@ import org.apache.flink.runtime.heartbeat.HeartbeatServices; import org.apache.flink.runtime.heartbeat.TestingHeartbeatServices; import org.apache.flink.runtime.highavailability.TestingHighAvailabilityServices; import org.apache.flink.runtime.leaderelection.TestingLeaderElectionService; +import org.apache.flink.runtime.resourcemanager.DefaultJobLeaderIdService; import org.apache.flink.runtime.resourcemanager.JobLeaderIdService; import org.apache.flink.runtime.resourcemanager.slotmanager.SlotManager; import org.apache.flink.runtime.resourcemanager.slotmanager.SlotManagerBuilder; @@ -70,7 +71,7 @@ public class MockResourceManagerRuntimeServices { highAvailabilityServices.setResourceManagerLeaderElectionService(rmLeaderElectionService); heartbeatServices = new TestingHeartbeatServices(); jobLeaderIdService = - new JobLeaderIdService( + new DefaultJobLeaderIdService( highAvailabilityServices, rpcService.getScheduledExecutor(), Time.minutes(5L));
