This is an automated email from the ASF dual-hosted git repository.

trohrmann pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/flink.git

commit 6b40ff1f384c5a2253c8393c3612d3384ae6bfc5
Author: Till Rohrmann <[email protected]>
AuthorDate: Thu Apr 1 14:12:47 2021 +0200

    [FLINK-21942] Extract JobLeaderIdService interface to make the ResourceManager easier to test
---
 ...Service.java => DefaultJobLeaderIdService.java} |  50 +---
 .../resourcemanager/JobLeaderIdActions.java        |   2 +-
 .../resourcemanager/JobLeaderIdService.java        | 324 ++-------------------
 .../ResourceManagerRuntimeServices.java            |   2 +-
 ...est.java => DefaultJobLeaderIdServiceTest.java} |  18 +-
 .../ResourceManagerJobMasterTest.java              |   2 +-
 .../ResourceManagerPartitionLifecycleTest.java     |   2 +-
 .../ResourceManagerTaskExecutorTest.java           |   2 +-
 .../resourcemanager/ResourceManagerTest.java       |   4 +-
 .../utils/MockResourceManagerRuntimeServices.java  |   3 +-
 10 files changed, 52 insertions(+), 357 deletions(-)

diff --git 
a/flink-runtime/src/main/java/org/apache/flink/runtime/resourcemanager/JobLeaderIdService.java
 
b/flink-runtime/src/main/java/org/apache/flink/runtime/resourcemanager/DefaultJobLeaderIdService.java
similarity index 90%
copy from 
flink-runtime/src/main/java/org/apache/flink/runtime/resourcemanager/JobLeaderIdService.java
copy to 
flink-runtime/src/main/java/org/apache/flink/runtime/resourcemanager/DefaultJobLeaderIdService.java
index 5d89d10..b499d7f 100755
--- 
a/flink-runtime/src/main/java/org/apache/flink/runtime/resourcemanager/JobLeaderIdService.java
+++ 
b/flink-runtime/src/main/java/org/apache/flink/runtime/resourcemanager/DefaultJobLeaderIdService.java
@@ -48,9 +48,9 @@ import java.util.concurrent.TimeUnit;
  * #getLeaderId(JobID)}. The future will only be completed with an exception 
in case the service
  * will be stopped.
  */
-public class JobLeaderIdService {
+public class DefaultJobLeaderIdService implements JobLeaderIdService {
 
-    private static final Logger LOG = 
LoggerFactory.getLogger(JobLeaderIdService.class);
+    private static final Logger LOG = 
LoggerFactory.getLogger(DefaultJobLeaderIdService.class);
 
     /** High availability services to use by this service. */
     private final HighAvailabilityServices highAvailabilityServices;
@@ -65,7 +65,7 @@ public class JobLeaderIdService {
     /** Actions to call when the job leader changes. */
     private JobLeaderIdActions jobLeaderIdActions;
 
-    public JobLeaderIdService(
+    public DefaultJobLeaderIdService(
             HighAvailabilityServices highAvailabilityServices,
             ScheduledExecutor scheduledExecutor,
             Time jobTimeout) {
@@ -79,12 +79,7 @@ public class JobLeaderIdService {
         jobLeaderIdActions = null;
     }
 
-    /**
-     * Start the service with the given job leader actions.
-     *
-     * @param initialJobLeaderIdActions to use for job leader id actions
-     * @throws Exception which is thrown when clearing up old state
-     */
+    @Override
     public void start(JobLeaderIdActions initialJobLeaderIdActions) throws 
Exception {
         if (isStarted()) {
             clear();
@@ -93,11 +88,7 @@ public class JobLeaderIdService {
         this.jobLeaderIdActions = 
Preconditions.checkNotNull(initialJobLeaderIdActions);
     }
 
-    /**
-     * Stop the service.
-     *
-     * @throws Exception which is thrown in case a retrieval service cannot be 
stopped properly
-     */
+    @Override
     public void stop() throws Exception {
         clear();
 
@@ -113,11 +104,7 @@ public class JobLeaderIdService {
         return jobLeaderIdActions != null;
     }
 
-    /**
-     * Stop and clear the currently registered job leader id listeners.
-     *
-     * @throws Exception which is thrown in case a retrieval service cannot be 
stopped properly
-     */
+    @Override
     public void clear() throws Exception {
         Exception exception = null;
 
@@ -133,19 +120,14 @@ public class JobLeaderIdService {
             ExceptionUtils.rethrowException(
                     exception,
                     "Could not properly stop the "
-                            + JobLeaderIdService.class.getSimpleName()
+                            + DefaultJobLeaderIdService.class.getSimpleName()
                             + '.');
         }
 
         jobLeaderIdListeners.clear();
     }
 
-    /**
-     * Add a job to be monitored to retrieve the job leader id.
-     *
-     * @param jobId identifying the job to monitor
-     * @throws Exception if the job could not be added to the service
-     */
+    @Override
     public void addJob(JobID jobId) throws Exception {
         Preconditions.checkNotNull(jobLeaderIdActions);
 
@@ -161,12 +143,7 @@ public class JobLeaderIdService {
         }
     }
 
-    /**
-     * Remove the given job from being monitored by the service.
-     *
-     * @param jobId identifying the job to remove from monitor
-     * @throws Exception if removing the job fails
-     */
+    @Override
     public void removeJob(JobID jobId) throws Exception {
         LOG.debug("Remove job {} from job leader id monitoring.", jobId);
 
@@ -177,16 +154,12 @@ public class JobLeaderIdService {
         }
     }
 
-    /**
-     * Check whether the given job is being monitored or not.
-     *
-     * @param jobId identifying the job
-     * @return True if the job is being monitored; otherwise false
-     */
+    @Override
     public boolean containsJob(JobID jobId) {
         return jobLeaderIdListeners.containsKey(jobId);
     }
 
+    @Override
     public CompletableFuture<JobMasterId> getLeaderId(JobID jobId) throws 
Exception {
         if (!jobLeaderIdListeners.containsKey(jobId)) {
             addJob(jobId);
@@ -197,6 +170,7 @@ public class JobLeaderIdService {
         return 
listener.getLeaderIdFuture().thenApply(JobMasterId::fromUuidOrNull);
     }
 
+    @Override
     public boolean isValidTimeout(JobID jobId, UUID timeoutId) {
         JobLeaderIdListener jobLeaderIdListener = 
jobLeaderIdListeners.get(jobId);
 
diff --git 
a/flink-runtime/src/main/java/org/apache/flink/runtime/resourcemanager/JobLeaderIdActions.java
 
b/flink-runtime/src/main/java/org/apache/flink/runtime/resourcemanager/JobLeaderIdActions.java
index e0418af..2a2a248 100644
--- 
a/flink-runtime/src/main/java/org/apache/flink/runtime/resourcemanager/JobLeaderIdActions.java
+++ 
b/flink-runtime/src/main/java/org/apache/flink/runtime/resourcemanager/JobLeaderIdActions.java
@@ -23,7 +23,7 @@ import org.apache.flink.runtime.jobmaster.JobMasterId;
 
 import java.util.UUID;
 
-/** Interface for actions called by the {@link JobLeaderIdService}. */
+/** Interface for actions called by the {@link DefaultJobLeaderIdService}. */
 public interface JobLeaderIdActions {
 
     /**
diff --git 
a/flink-runtime/src/main/java/org/apache/flink/runtime/resourcemanager/JobLeaderIdService.java
 
b/flink-runtime/src/main/java/org/apache/flink/runtime/resourcemanager/JobLeaderIdService.java
old mode 100755
new mode 100644
index 5d89d10..1c926ae
--- 
a/flink-runtime/src/main/java/org/apache/flink/runtime/resourcemanager/JobLeaderIdService.java
+++ 
b/flink-runtime/src/main/java/org/apache/flink/runtime/resourcemanager/JobLeaderIdService.java
@@ -7,7 +7,7 @@
  * "License"); you may not use this file except in compliance
  * with the License.  You may obtain a copy of the License at
  *
- *     http://www.apache.org/licenses/LICENSE-2.0
+ *    http://www.apache.org/licenses/LICENSE-2.0
  *
  * Unless required by applicable law or agreed to in writing, software
  * distributed under the License is distributed on an "AS IS" BASIS,
@@ -19,28 +19,10 @@
 package org.apache.flink.runtime.resourcemanager;
 
 import org.apache.flink.api.common.JobID;
-import org.apache.flink.api.common.time.Time;
-import org.apache.flink.runtime.concurrent.ScheduledExecutor;
-import org.apache.flink.runtime.highavailability.HighAvailabilityServices;
 import org.apache.flink.runtime.jobmaster.JobMasterId;
-import org.apache.flink.runtime.leaderretrieval.LeaderRetrievalListener;
-import org.apache.flink.runtime.leaderretrieval.LeaderRetrievalService;
-import org.apache.flink.util.ExceptionUtils;
-import org.apache.flink.util.Preconditions;
 
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-import javax.annotation.Nullable;
-
-import java.util.HashMap;
-import java.util.Map;
-import java.util.Objects;
 import java.util.UUID;
 import java.util.concurrent.CompletableFuture;
-import java.util.concurrent.CompletionException;
-import java.util.concurrent.ScheduledFuture;
-import java.util.concurrent.TimeUnit;
 
 /**
  * Service which retrieves for a registered job the current job leader id (the 
leader id of the job
@@ -48,36 +30,7 @@ import java.util.concurrent.TimeUnit;
  * #getLeaderId(JobID)}. The future will only be completed with an exception 
in case the service
  * will be stopped.
  */
-public class JobLeaderIdService {
-
-    private static final Logger LOG = 
LoggerFactory.getLogger(JobLeaderIdService.class);
-
-    /** High availability services to use by this service. */
-    private final HighAvailabilityServices highAvailabilityServices;
-
-    private final ScheduledExecutor scheduledExecutor;
-
-    private final Time jobTimeout;
-
-    /** Map of currently monitored jobs. */
-    private final Map<JobID, JobLeaderIdListener> jobLeaderIdListeners;
-
-    /** Actions to call when the job leader changes. */
-    private JobLeaderIdActions jobLeaderIdActions;
-
-    public JobLeaderIdService(
-            HighAvailabilityServices highAvailabilityServices,
-            ScheduledExecutor scheduledExecutor,
-            Time jobTimeout) {
-        this.highAvailabilityServices =
-                Preconditions.checkNotNull(highAvailabilityServices, 
"highAvailabilityServices");
-        this.scheduledExecutor = Preconditions.checkNotNull(scheduledExecutor, 
"scheduledExecutor");
-        this.jobTimeout = Preconditions.checkNotNull(jobTimeout, "jobTimeout");
-
-        jobLeaderIdListeners = new HashMap<>(4);
-
-        jobLeaderIdActions = null;
-    }
+public interface JobLeaderIdService {
 
     /**
      * Start the service with the given job leader actions.
@@ -85,60 +38,21 @@ public class JobLeaderIdService {
      * @param initialJobLeaderIdActions to use for job leader id actions
      * @throws Exception which is thrown when clearing up old state
      */
-    public void start(JobLeaderIdActions initialJobLeaderIdActions) throws 
Exception {
-        if (isStarted()) {
-            clear();
-        }
-
-        this.jobLeaderIdActions = 
Preconditions.checkNotNull(initialJobLeaderIdActions);
-    }
+    void start(JobLeaderIdActions initialJobLeaderIdActions) throws Exception;
 
     /**
      * Stop the service.
      *
      * @throws Exception which is thrown in case a retrieval service cannot be 
stopped properly
      */
-    public void stop() throws Exception {
-        clear();
-
-        this.jobLeaderIdActions = null;
-    }
-
-    /**
-     * Checks whether the service has been started.
-     *
-     * @return True if the service has been started; otherwise false
-     */
-    public boolean isStarted() {
-        return jobLeaderIdActions != null;
-    }
+    void stop() throws Exception;
 
     /**
      * Stop and clear the currently registered job leader id listeners.
      *
      * @throws Exception which is thrown in case a retrieval service cannot be 
stopped properly
      */
-    public void clear() throws Exception {
-        Exception exception = null;
-
-        for (JobLeaderIdListener listener : jobLeaderIdListeners.values()) {
-            try {
-                listener.stop();
-            } catch (Exception e) {
-                exception = ExceptionUtils.firstOrSuppressed(e, exception);
-            }
-        }
-
-        if (exception != null) {
-            ExceptionUtils.rethrowException(
-                    exception,
-                    "Could not properly stop the "
-                            + JobLeaderIdService.class.getSimpleName()
-                            + '.');
-        }
-
-        jobLeaderIdListeners.clear();
-    }
+    void clear() throws Exception;
 
     /**
      * Add a job to be monitored to retrieve the job leader id.
@@ -146,20 +60,7 @@ public class JobLeaderIdService {
      * @param jobId identifying the job to monitor
      * @throws Exception if the job could not be added to the service
      */
-    public void addJob(JobID jobId) throws Exception {
-        Preconditions.checkNotNull(jobLeaderIdActions);
-
-        LOG.debug("Add job {} to job leader id monitoring.", jobId);
-
-        if (!jobLeaderIdListeners.containsKey(jobId)) {
-            LeaderRetrievalService leaderRetrievalService =
-                    
highAvailabilityServices.getJobManagerLeaderRetriever(jobId);
-
-            JobLeaderIdListener jobIdListener =
-                    new JobLeaderIdListener(jobId, jobLeaderIdActions, 
leaderRetrievalService);
-            jobLeaderIdListeners.put(jobId, jobIdListener);
-        }
-    }
+    void addJob(JobID jobId) throws Exception;
 
     /**
      * Remove the given job from being monitored by the service.
@@ -167,15 +68,7 @@ public class JobLeaderIdService {
      * @param jobId identifying the job to remove from monitor
      * @throws Exception if removing the job fails
      */
-    public void removeJob(JobID jobId) throws Exception {
-        LOG.debug("Remove job {} from job leader id monitoring.", jobId);
-
-        JobLeaderIdListener listener = jobLeaderIdListeners.remove(jobId);
-
-        if (listener != null) {
-            listener.stop();
-        }
-    }
+    void removeJob(JobID jobId) throws Exception;
 
     /**
      * Check whether the given job is being monitored or not.
@@ -183,196 +76,23 @@ public class JobLeaderIdService {
      * @param jobId identifying the job
      * @return True if the job is being monitored; otherwise false
      */
-    public boolean containsJob(JobID jobId) {
-        return jobLeaderIdListeners.containsKey(jobId);
-    }
-
-    public CompletableFuture<JobMasterId> getLeaderId(JobID jobId) throws 
Exception {
-        if (!jobLeaderIdListeners.containsKey(jobId)) {
-            addJob(jobId);
-        }
-
-        JobLeaderIdListener listener = jobLeaderIdListeners.get(jobId);
-
-        return 
listener.getLeaderIdFuture().thenApply(JobMasterId::fromUuidOrNull);
-    }
-
-    public boolean isValidTimeout(JobID jobId, UUID timeoutId) {
-        JobLeaderIdListener jobLeaderIdListener = 
jobLeaderIdListeners.get(jobId);
-
-        if (null != jobLeaderIdListener) {
-            return Objects.equals(timeoutId, 
jobLeaderIdListener.getTimeoutId());
-        } else {
-            return false;
-        }
-    }
-
-    // 
--------------------------------------------------------------------------------
-    // Static utility classes
-    // 
--------------------------------------------------------------------------------
+    boolean containsJob(JobID jobId);
 
     /**
-     * Listener which stores the current leader id and exposes them as a 
future value when
-     * requested. The returned future will always be completed properly except 
when stopping the
-     * listener.
+     * Get the leader's {@link JobMasterId} future for the given job.
+     *
+     * @param jobId jobId specifying for which job to retrieve the {@link 
JobMasterId}
+     * @return Future with the current leader's {@link JobMasterId}
+     * @throws Exception if retrieving the {@link JobMasterId} cannot be 
started
      */
-    private final class JobLeaderIdListener implements LeaderRetrievalListener 
{
-        private final Object timeoutLock = new Object();
-        private final JobID jobId;
-        private final JobLeaderIdActions listenerJobLeaderIdActions;
-        private final LeaderRetrievalService leaderRetrievalService;
-
-        private volatile CompletableFuture<UUID> leaderIdFuture;
-        private volatile boolean running = true;
-
-        /** Null if no timeout has been scheduled; otherwise non null. */
-        @Nullable private volatile ScheduledFuture<?> timeoutFuture;
-
-        /** Null if no timeout has been scheduled; otherwise non null. */
-        @Nullable private volatile UUID timeoutId;
-
-        private JobLeaderIdListener(
-                JobID jobId,
-                JobLeaderIdActions listenerJobLeaderIdActions,
-                LeaderRetrievalService leaderRetrievalService)
-                throws Exception {
-            this.jobId = Preconditions.checkNotNull(jobId);
-            this.listenerJobLeaderIdActions =
-                    Preconditions.checkNotNull(listenerJobLeaderIdActions);
-            this.leaderRetrievalService = 
Preconditions.checkNotNull(leaderRetrievalService);
-
-            leaderIdFuture = new CompletableFuture<>();
-
-            activateTimeout();
-
-            // start the leader service we're listening to
-            leaderRetrievalService.start(this);
-        }
-
-        public CompletableFuture<UUID> getLeaderIdFuture() {
-            return leaderIdFuture;
-        }
-
-        @Nullable
-        public UUID getTimeoutId() {
-            return timeoutId;
-        }
-
-        public void stop() throws Exception {
-            running = false;
-            leaderRetrievalService.stop();
-            cancelTimeout();
-            leaderIdFuture.completeExceptionally(
-                    new Exception("Job leader id service has been stopped."));
-        }
+    CompletableFuture<JobMasterId> getLeaderId(JobID jobId) throws Exception;
 
-        @Override
-        public void notifyLeaderAddress(
-                @Nullable String leaderAddress, @Nullable UUID 
leaderSessionId) {
-            if (running) {
-                UUID previousJobLeaderId = null;
-
-                if (leaderIdFuture.isDone()) {
-                    try {
-                        previousJobLeaderId = leaderIdFuture.getNow(null);
-                    } catch (CompletionException e) {
-                        // this should never happen since we complete this 
future always properly
-                        handleError(e);
-                    }
-
-                    if (leaderSessionId == null) {
-                        // there was a leader, but we no longer have one
-                        LOG.debug("Job {} no longer has a job leader.", jobId);
-                        leaderIdFuture = new CompletableFuture<>();
-                    } else {
-                        // there was an active leader, but we now have a new 
leader
-                        LOG.debug(
-                                "Job {} has a new job leader {}@{}.",
-                                jobId,
-                                leaderSessionId,
-                                leaderAddress);
-                        leaderIdFuture = 
CompletableFuture.completedFuture(leaderSessionId);
-                    }
-                } else {
-                    if (leaderSessionId != null) {
-                        // there was no active leader, but we now have a new 
leader
-                        LOG.debug(
-                                "Job {} has a new job leader {}@{}.",
-                                jobId,
-                                leaderSessionId,
-                                leaderAddress);
-                        leaderIdFuture.complete(leaderSessionId);
-                    }
-                }
-
-                if (previousJobLeaderId != null && 
!previousJobLeaderId.equals(leaderSessionId)) {
-                    // we had a previous job leader, so notify about his lost 
leadership
-                    listenerJobLeaderIdActions.jobLeaderLostLeadership(
-                            jobId, new JobMasterId(previousJobLeaderId));
-
-                    if (null == leaderSessionId) {
-                        // No current leader active ==> Set a timeout for the 
job
-                        activateTimeout();
-
-                        // check if we got stopped asynchronously
-                        if (!running) {
-                            cancelTimeout();
-                        }
-                    }
-                } else if (null != leaderSessionId) {
-                    // Cancel timeout because we've found an active leader for 
it
-                    cancelTimeout();
-                }
-            } else {
-                LOG.debug(
-                        "A leader id change {}@{} has been detected after the 
listener has been stopped.",
-                        leaderSessionId,
-                        leaderAddress);
-            }
-        }
-
-        @Override
-        public void handleError(Exception exception) {
-            if (running) {
-                listenerJobLeaderIdActions.handleError(exception);
-            } else {
-                LOG.debug(
-                        "An error occurred in the {} after the listener has 
been stopped.",
-                        JobLeaderIdListener.class.getSimpleName(),
-                        exception);
-            }
-        }
-
-        private void activateTimeout() {
-            synchronized (timeoutLock) {
-                cancelTimeout();
-
-                final UUID newTimeoutId = UUID.randomUUID();
-
-                timeoutId = newTimeoutId;
-                timeoutFuture =
-                        scheduledExecutor.schedule(
-                                new Runnable() {
-                                    @Override
-                                    public void run() {
-                                        
listenerJobLeaderIdActions.notifyJobTimeout(
-                                                jobId, newTimeoutId);
-                                    }
-                                },
-                                jobTimeout.toMilliseconds(),
-                                TimeUnit.MILLISECONDS);
-            }
-        }
-
-        private void cancelTimeout() {
-            synchronized (timeoutLock) {
-                if (timeoutFuture != null) {
-                    timeoutFuture.cancel(true);
-                }
-
-                timeoutFuture = null;
-                timeoutId = null;
-            }
-        }
-    }
+    /**
+     * Checks whether the given timeoutId for the given jobId is valid or not.
+     *
+     * @param jobId jobId identifying the job for which the timeout should be 
checked
+     * @param timeoutId timeoutId specifying the timeout which should be 
checked for its validity
+     * @return {@code true} if the timeout is valid; otherwise {@code false}
+     */
+    boolean isValidTimeout(JobID jobId, UUID timeoutId);
 }
diff --git 
a/flink-runtime/src/main/java/org/apache/flink/runtime/resourcemanager/ResourceManagerRuntimeServices.java
 
b/flink-runtime/src/main/java/org/apache/flink/runtime/resourcemanager/ResourceManagerRuntimeServices.java
index 00129f2..1ce6d83 100644
--- 
a/flink-runtime/src/main/java/org/apache/flink/runtime/resourcemanager/ResourceManagerRuntimeServices.java
+++ 
b/flink-runtime/src/main/java/org/apache/flink/runtime/resourcemanager/ResourceManagerRuntimeServices.java
@@ -70,7 +70,7 @@ public class ResourceManagerRuntimeServices {
                 createSlotManager(configuration, scheduledExecutor, 
slotManagerMetricGroup);
 
         final JobLeaderIdService jobLeaderIdService =
-                new JobLeaderIdService(
+                new DefaultJobLeaderIdService(
                         highAvailabilityServices, scheduledExecutor, 
configuration.getJobTimeout());
 
         return new ResourceManagerRuntimeServices(slotManager, 
jobLeaderIdService);
diff --git 
a/flink-runtime/src/test/java/org/apache/flink/runtime/resourcemanager/JobLeaderIdServiceTest.java
 
b/flink-runtime/src/test/java/org/apache/flink/runtime/resourcemanager/DefaultJobLeaderIdServiceTest.java
similarity index 95%
rename from 
flink-runtime/src/test/java/org/apache/flink/runtime/resourcemanager/JobLeaderIdServiceTest.java
rename to 
flink-runtime/src/test/java/org/apache/flink/runtime/resourcemanager/DefaultJobLeaderIdServiceTest.java
index 97f04da..7f3cd0a 100644
--- 
a/flink-runtime/src/test/java/org/apache/flink/runtime/resourcemanager/JobLeaderIdServiceTest.java
+++ 
b/flink-runtime/src/test/java/org/apache/flink/runtime/resourcemanager/DefaultJobLeaderIdServiceTest.java
@@ -58,8 +58,8 @@ import static org.mockito.Mockito.mock;
 import static org.mockito.Mockito.times;
 import static org.mockito.Mockito.verify;
 
-/** Tests for the {@link JobLeaderIdService}. */
-public class JobLeaderIdServiceTest extends TestLogger {
+/** Tests for the {@link DefaultJobLeaderIdService}. */
+public class DefaultJobLeaderIdServiceTest extends TestLogger {
 
     /** Tests adding a job and finding out its leader id. */
     @Test(timeout = 10000)
@@ -79,7 +79,7 @@ public class JobLeaderIdServiceTest extends TestLogger {
         JobLeaderIdActions jobLeaderIdActions = mock(JobLeaderIdActions.class);
 
         JobLeaderIdService jobLeaderIdService =
-                new JobLeaderIdService(highAvailabilityServices, 
scheduledExecutor, timeout);
+                new DefaultJobLeaderIdService(highAvailabilityServices, 
scheduledExecutor, timeout);
 
         jobLeaderIdService.start(jobLeaderIdActions);
 
@@ -111,7 +111,7 @@ public class JobLeaderIdServiceTest extends TestLogger {
         JobLeaderIdActions jobLeaderIdActions = mock(JobLeaderIdActions.class);
 
         JobLeaderIdService jobLeaderIdService =
-                new JobLeaderIdService(highAvailabilityServices, 
scheduledExecutor, timeout);
+                new DefaultJobLeaderIdService(highAvailabilityServices, 
scheduledExecutor, timeout);
 
         jobLeaderIdService.start(jobLeaderIdActions);
 
@@ -152,7 +152,7 @@ public class JobLeaderIdServiceTest extends TestLogger {
         JobLeaderIdActions jobLeaderIdActions = mock(JobLeaderIdActions.class);
 
         JobLeaderIdService jobLeaderIdService =
-                new JobLeaderIdService(highAvailabilityServices, 
scheduledExecutor, timeout);
+                new DefaultJobLeaderIdService(highAvailabilityServices, 
scheduledExecutor, timeout);
 
         jobLeaderIdService.start(jobLeaderIdActions);
 
@@ -227,7 +227,7 @@ public class JobLeaderIdServiceTest extends TestLogger {
                 .notifyJobTimeout(eq(jobId), any(UUID.class));
 
         JobLeaderIdService jobLeaderIdService =
-                new JobLeaderIdService(highAvailabilityServices, 
scheduledExecutor, timeout);
+                new DefaultJobLeaderIdService(highAvailabilityServices, 
scheduledExecutor, timeout);
 
         jobLeaderIdService.start(jobLeaderIdActions);
 
@@ -295,7 +295,7 @@ public class JobLeaderIdServiceTest extends TestLogger {
         highAvailabilityServices.setJobMasterLeaderRetriever(jobId, 
leaderRetrievalService);
 
         JobLeaderIdService jobLeaderIdService =
-                new JobLeaderIdService(
+                new DefaultJobLeaderIdService(
                         highAvailabilityServices,
                         new ManuallyTriggeredScheduledExecutor(),
                         Time.milliseconds(5000L));
@@ -332,8 +332,8 @@ public class JobLeaderIdServiceTest extends TestLogger {
         ScheduledExecutor scheduledExecutor = mock(ScheduledExecutor.class);
         Time timeout = Time.milliseconds(5000L);
         JobLeaderIdActions jobLeaderIdActions = mock(JobLeaderIdActions.class);
-        JobLeaderIdService jobLeaderIdService =
-                new JobLeaderIdService(highAvailabilityServices, 
scheduledExecutor, timeout);
+        DefaultJobLeaderIdService jobLeaderIdService =
+                new DefaultJobLeaderIdService(highAvailabilityServices, 
scheduledExecutor, timeout);
 
         assertFalse(jobLeaderIdService.isStarted());
 
diff --git 
a/flink-runtime/src/test/java/org/apache/flink/runtime/resourcemanager/ResourceManagerJobMasterTest.java
 
b/flink-runtime/src/test/java/org/apache/flink/runtime/resourcemanager/ResourceManagerJobMasterTest.java
index 1639feb..1c9eb17 100644
--- 
a/flink-runtime/src/test/java/org/apache/flink/runtime/resourcemanager/ResourceManagerJobMasterTest.java
+++ 
b/flink-runtime/src/test/java/org/apache/flink/runtime/resourcemanager/ResourceManagerJobMasterTest.java
@@ -135,7 +135,7 @@ public class ResourceManagerJobMasterTest extends 
TestLogger {
         HeartbeatServices heartbeatServices = new HeartbeatServices(1000L, 
1000L);
 
         JobLeaderIdService jobLeaderIdService =
-                new JobLeaderIdService(
+                new DefaultJobLeaderIdService(
                         haServices, rpcService.getScheduledExecutor(), 
Time.minutes(5L));
 
         final SlotManager slotManager =
diff --git 
a/flink-runtime/src/test/java/org/apache/flink/runtime/resourcemanager/ResourceManagerPartitionLifecycleTest.java
 
b/flink-runtime/src/test/java/org/apache/flink/runtime/resourcemanager/ResourceManagerPartitionLifecycleTest.java
index f950a60..3853b1c 100644
--- 
a/flink-runtime/src/test/java/org/apache/flink/runtime/resourcemanager/ResourceManagerPartitionLifecycleTest.java
+++ 
b/flink-runtime/src/test/java/org/apache/flink/runtime/resourcemanager/ResourceManagerPartitionLifecycleTest.java
@@ -232,7 +232,7 @@ public class ResourceManagerPartitionLifecycleTest extends 
TestLogger {
                         
.setScheduledExecutor(rpcService.getScheduledExecutor())
                         .build();
         final JobLeaderIdService jobLeaderIdService =
-                new JobLeaderIdService(
+                new DefaultJobLeaderIdService(
                         highAvailabilityServices,
                         rpcService.getScheduledExecutor(),
                         TestingUtils.infiniteTime());
diff --git 
a/flink-runtime/src/test/java/org/apache/flink/runtime/resourcemanager/ResourceManagerTaskExecutorTest.java
 
b/flink-runtime/src/test/java/org/apache/flink/runtime/resourcemanager/ResourceManagerTaskExecutorTest.java
index 9720395..9223bbe 100644
--- 
a/flink-runtime/src/test/java/org/apache/flink/runtime/resourcemanager/ResourceManagerTaskExecutorTest.java
+++ 
b/flink-runtime/src/test/java/org/apache/flink/runtime/resourcemanager/ResourceManagerTaskExecutorTest.java
@@ -161,7 +161,7 @@ public class ResourceManagerTaskExecutorTest extends 
TestLogger {
                         .build();
 
         JobLeaderIdService jobLeaderIdService =
-                new JobLeaderIdService(
+                new DefaultJobLeaderIdService(
                         highAvailabilityServices,
                         rpcService.getScheduledExecutor(),
                         Time.minutes(5L));
diff --git 
a/flink-runtime/src/test/java/org/apache/flink/runtime/resourcemanager/ResourceManagerTest.java
 
b/flink-runtime/src/test/java/org/apache/flink/runtime/resourcemanager/ResourceManagerTest.java
index 9cfb526..b6971ef 100644
--- 
a/flink-runtime/src/test/java/org/apache/flink/runtime/resourcemanager/ResourceManagerTest.java
+++ 
b/flink-runtime/src/test/java/org/apache/flink/runtime/resourcemanager/ResourceManagerTest.java
@@ -322,7 +322,7 @@ public class ResourceManagerTest extends TestLogger {
         rpcService.registerGateway(jobMasterGateway.getAddress(), 
jobMasterGateway);
 
         final JobLeaderIdService jobLeaderIdService =
-                new JobLeaderIdService(
+                new DefaultJobLeaderIdService(
                         highAvailabilityServices,
                         rpcService.getScheduledExecutor(),
                         TestingUtils.infiniteTime());
@@ -372,7 +372,7 @@ public class ResourceManagerTest extends TestLogger {
     private TestingResourceManager createAndStartResourceManager(
             HeartbeatServices heartbeatServices) throws Exception {
         final JobLeaderIdService jobLeaderIdService =
-                new JobLeaderIdService(
+                new DefaultJobLeaderIdService(
                         highAvailabilityServices,
                         rpcService.getScheduledExecutor(),
                         TestingUtils.infiniteTime());
diff --git 
a/flink-runtime/src/test/java/org/apache/flink/runtime/resourcemanager/utils/MockResourceManagerRuntimeServices.java
 
b/flink-runtime/src/test/java/org/apache/flink/runtime/resourcemanager/utils/MockResourceManagerRuntimeServices.java
index b72217c..fd8920b 100755
--- 
a/flink-runtime/src/test/java/org/apache/flink/runtime/resourcemanager/utils/MockResourceManagerRuntimeServices.java
+++ 
b/flink-runtime/src/test/java/org/apache/flink/runtime/resourcemanager/utils/MockResourceManagerRuntimeServices.java
@@ -24,6 +24,7 @@ import org.apache.flink.runtime.heartbeat.HeartbeatServices;
 import org.apache.flink.runtime.heartbeat.TestingHeartbeatServices;
 import 
org.apache.flink.runtime.highavailability.TestingHighAvailabilityServices;
 import org.apache.flink.runtime.leaderelection.TestingLeaderElectionService;
+import org.apache.flink.runtime.resourcemanager.DefaultJobLeaderIdService;
 import org.apache.flink.runtime.resourcemanager.JobLeaderIdService;
 import org.apache.flink.runtime.resourcemanager.slotmanager.SlotManager;
 import org.apache.flink.runtime.resourcemanager.slotmanager.SlotManagerBuilder;
@@ -70,7 +71,7 @@ public class MockResourceManagerRuntimeServices {
         
highAvailabilityServices.setResourceManagerLeaderElectionService(rmLeaderElectionService);
         heartbeatServices = new TestingHeartbeatServices();
         jobLeaderIdService =
-                new JobLeaderIdService(
+                new DefaultJobLeaderIdService(
                         highAvailabilityServices,
                         rpcService.getScheduledExecutor(),
                         Time.minutes(5L));

Reply via email to