This is an automated email from the ASF dual-hosted git repository.

chesnay pushed a commit to branch release-1.11
in repository https://gitbox.apache.org/repos/asf/flink.git


The following commit(s) were added to refs/heads/release-1.11 by this push:
     new b61e01a  [FLINK-21497][coordination] Only complete leader future with valid leader
b61e01a is described below

commit b61e01ac1644151ba02a4833181adb1383b1a9da
Author: Chesnay Schepler <[email protected]>
AuthorDate: Thu Feb 25 14:27:46 2021 +0100

    [FLINK-21497][coordination] Only complete leader future with valid leader
---
 .../leaderretrieval/LeaderRetrievalListener.java   |  3 ++
 .../resourcemanager/JobLeaderIdService.java        | 29 +++++++++--
 .../resourcemanager/JobLeaderIdServiceTest.java    | 56 ++++++++++++++++++++++
 3 files changed, 83 insertions(+), 5 deletions(-)

diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/leaderretrieval/LeaderRetrievalListener.java b/flink-runtime/src/main/java/org/apache/flink/runtime/leaderretrieval/LeaderRetrievalListener.java
index d485241..da89ecc 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/leaderretrieval/LeaderRetrievalListener.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/leaderretrieval/LeaderRetrievalListener.java
@@ -31,6 +31,9 @@ public interface LeaderRetrievalListener {
     /**
      * This method is called by the {@link LeaderRetrievalService} when a new leader is elected.
      *
+     * <p>If both arguments are null then it signals that leadership was revoked without a new
+     * leader having been elected.
+     *
      * @param leaderAddress The address of the new leader
      * @param leaderSessionID The new leader session ID
      */
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/resourcemanager/JobLeaderIdService.java b/flink-runtime/src/main/java/org/apache/flink/runtime/resourcemanager/JobLeaderIdService.java
index 25a9889..8dec9d3 100755
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/resourcemanager/JobLeaderIdService.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/resourcemanager/JobLeaderIdService.java
@@ -267,10 +267,9 @@ public class JobLeaderIdService {
         }
 
         @Override
-        public void notifyLeaderAddress(String leaderAddress, UUID leaderSessionId) {
+        public void notifyLeaderAddress(
+                @Nullable String leaderAddress, @Nullable UUID leaderSessionId) {
             if (running) {
-                LOG.debug("Found a new job leader {}@{}.", leaderSessionId, leaderAddress);
-
                 UUID previousJobLeaderId = null;
 
                 if (leaderIdFuture.isDone()) {
@@ -281,9 +280,29 @@ public class JobLeaderIdService {
                         handleError(e);
                     }
 
-                    leaderIdFuture = CompletableFuture.completedFuture(leaderSessionId);
+                    if (leaderSessionId == null) {
+                        // there was a leader, but we no longer have one
+                        LOG.debug("Job {} no longer has a job leader.", jobId);
+                        leaderIdFuture = new CompletableFuture<>();
+                    } else {
+                        // there was an active leader, but we now have a new leader
+                        LOG.debug(
+                                "Job {} has a new job leader {}@{}.",
+                                jobId,
+                                leaderSessionId,
+                                leaderAddress);
+                        leaderIdFuture = CompletableFuture.completedFuture(leaderSessionId);
+                    }
                 } else {
-                    leaderIdFuture.complete(leaderSessionId);
+                    if (leaderSessionId != null) {
+                        // there was no active leader, but we now have a new leader
+                        LOG.debug(
+                                "Job {} has a new job leader {}@{}.",
+                                jobId,
+                                leaderSessionId,
+                                leaderAddress);
+                        leaderIdFuture.complete(leaderSessionId);
+                    }
                 }
 
                 if (previousJobLeaderId != null && !previousJobLeaderId.equals(leaderSessionId)) {
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/resourcemanager/JobLeaderIdServiceTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/resourcemanager/JobLeaderIdServiceTest.java
index f06d7a4..02032bb 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/resourcemanager/JobLeaderIdServiceTest.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/resourcemanager/JobLeaderIdServiceTest.java
@@ -20,6 +20,7 @@ package org.apache.flink.runtime.resourcemanager;
 
 import org.apache.flink.api.common.JobID;
 import org.apache.flink.api.common.time.Time;
+import org.apache.flink.runtime.concurrent.ManuallyTriggeredScheduledExecutor;
 import org.apache.flink.runtime.concurrent.ScheduledExecutor;
 import org.apache.flink.runtime.highavailability.TestingHighAvailabilityServices;
 import org.apache.flink.runtime.jobmaster.JobMasterId;
@@ -41,9 +42,11 @@ import java.util.concurrent.ScheduledFuture;
 import java.util.concurrent.TimeUnit;
 import java.util.concurrent.atomic.AtomicReference;
 
+import static org.hamcrest.CoreMatchers.is;
 import static org.junit.Assert.assertEquals;
 import static org.junit.Assert.assertFalse;
 import static org.junit.Assert.assertNotNull;
+import static org.junit.Assert.assertThat;
 import static org.junit.Assert.assertTrue;
 import static org.junit.Assert.fail;
 import static org.mockito.Matchers.any;
@@ -274,4 +277,57 @@ public class JobLeaderIdServiceTest extends TestLogger {
         // the new timeout should be valid
         assertTrue(jobLeaderIdService.isValidTimeout(jobId, lastTimeoutId.get()));
     }
+
+    /**
+     * Tests that the leaderId future is only completed once the service is notified about an actual
+     * leader being elected. Specifically, it tests that the future is not completed if the
+     * leadership was revoked without a new leader having been elected.
+     */
+    @Test(timeout = 10000)
+    public void testLeaderFutureWaitsForValidLeader() throws Exception {
+        final JobID jobId = new JobID();
+        TestingHighAvailabilityServices highAvailabilityServices =
+                new TestingHighAvailabilityServices();
+        SettableLeaderRetrievalService leaderRetrievalService =
+                new SettableLeaderRetrievalService(null, null);
+
+        highAvailabilityServices.setJobMasterLeaderRetriever(jobId, leaderRetrievalService);
+
+        JobLeaderIdService jobLeaderIdService =
+                new JobLeaderIdService(
+                        highAvailabilityServices,
+                        new ManuallyTriggeredScheduledExecutor(),
+                        Time.milliseconds(5000L));
+
+        jobLeaderIdService.start(new NoOpJobLeaderIdActions());
+
+        jobLeaderIdService.addJob(jobId);
+
+        // elect some leader
+        leaderRetrievalService.notifyListener("foo", UUID.randomUUID());
+
+        // notify about leadership loss
+        leaderRetrievalService.notifyListener(null, null);
+
+        final CompletableFuture<JobMasterId> leaderIdFuture = jobLeaderIdService.getLeaderId(jobId);
+        // there is currently no leader, so this should not be completed
+        assertThat(leaderIdFuture.isDone(), is(false));
+
+        // elect a new leader
+        final UUID newLeaderId = UUID.randomUUID();
+        leaderRetrievalService.notifyListener("foo", newLeaderId);
+        assertThat(leaderIdFuture.get(), is(JobMasterId.fromUuidOrNull(newLeaderId)));
+    }
+
+    private static class NoOpJobLeaderIdActions implements JobLeaderIdActions {
+
+        @Override
+        public void jobLeaderLostLeadership(JobID jobId, JobMasterId oldJobMasterId) {}
+
+        @Override
+        public void notifyJobTimeout(JobID jobId, UUID timeoutId) {}
+
+        @Override
+        public void handleError(Throwable error) {}
+    }
 }

Reply via email to