This is an automated email from the ASF dual-hosted git repository.
chesnay pushed a commit to branch release-1.11
in repository https://gitbox.apache.org/repos/asf/flink.git
The following commit(s) were added to refs/heads/release-1.11 by this push:
new b61e01a [FLINK-21497][coordination] Only complete leader future with valid leader
b61e01a is described below
commit b61e01ac1644151ba02a4833181adb1383b1a9da
Author: Chesnay Schepler <[email protected]>
AuthorDate: Thu Feb 25 14:27:46 2021 +0100
[FLINK-21497][coordination] Only complete leader future with valid leader
---
.../leaderretrieval/LeaderRetrievalListener.java | 3 ++
.../resourcemanager/JobLeaderIdService.java | 29 +++++++++--
.../resourcemanager/JobLeaderIdServiceTest.java | 56 ++++++++++++++++++++++
3 files changed, 83 insertions(+), 5 deletions(-)
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/leaderretrieval/LeaderRetrievalListener.java b/flink-runtime/src/main/java/org/apache/flink/runtime/leaderretrieval/LeaderRetrievalListener.java
index d485241..da89ecc 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/leaderretrieval/LeaderRetrievalListener.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/leaderretrieval/LeaderRetrievalListener.java
@@ -31,6 +31,9 @@ public interface LeaderRetrievalListener {
/**
* This method is called by the {@link LeaderRetrievalService} when a new leader is elected.
*
+ * <p>If both arguments are null then it signals that leadership was revoked without a new
+ * leader having been elected.
+ *
* @param leaderAddress The address of the new leader
* @param leaderSessionID The new leader session ID
*/
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/resourcemanager/JobLeaderIdService.java b/flink-runtime/src/main/java/org/apache/flink/runtime/resourcemanager/JobLeaderIdService.java
index 25a9889..8dec9d3 100755
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/resourcemanager/JobLeaderIdService.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/resourcemanager/JobLeaderIdService.java
@@ -267,10 +267,9 @@ public class JobLeaderIdService {
}
@Override
- public void notifyLeaderAddress(String leaderAddress, UUID leaderSessionId) {
+ public void notifyLeaderAddress(
+ @Nullable String leaderAddress, @Nullable UUID leaderSessionId) {
if (running) {
- LOG.debug("Found a new job leader {}@{}.", leaderSessionId, leaderAddress);
-
UUID previousJobLeaderId = null;
if (leaderIdFuture.isDone()) {
@@ -281,9 +280,29 @@ public class JobLeaderIdService {
handleError(e);
}
- leaderIdFuture = CompletableFuture.completedFuture(leaderSessionId);
+ if (leaderSessionId == null) {
+ // there was a leader, but we no longer have one
+ LOG.debug("Job {} no longer has a job leader.", jobId);
+ leaderIdFuture = new CompletableFuture<>();
+ } else {
+ // there was an active leader, but we now have a new leader
+ LOG.debug(
+ "Job {} has a new job leader {}@{}.",
+ jobId,
+ leaderSessionId,
+ leaderAddress);
+ leaderIdFuture = CompletableFuture.completedFuture(leaderSessionId);
+ }
} else {
- leaderIdFuture.complete(leaderSessionId);
+ if (leaderSessionId != null) {
+ // there was no active leader, but we now have a new leader
+ LOG.debug(
+ "Job {} has a new job leader {}@{}.",
+ jobId,
+ leaderSessionId,
+ leaderAddress);
+ leaderIdFuture.complete(leaderSessionId);
+ }
}
if (previousJobLeaderId != null && !previousJobLeaderId.equals(leaderSessionId)) {
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/resourcemanager/JobLeaderIdServiceTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/resourcemanager/JobLeaderIdServiceTest.java
index f06d7a4..02032bb 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/resourcemanager/JobLeaderIdServiceTest.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/resourcemanager/JobLeaderIdServiceTest.java
@@ -20,6 +20,7 @@ package org.apache.flink.runtime.resourcemanager;
import org.apache.flink.api.common.JobID;
import org.apache.flink.api.common.time.Time;
+import org.apache.flink.runtime.concurrent.ManuallyTriggeredScheduledExecutor;
import org.apache.flink.runtime.concurrent.ScheduledExecutor;
import org.apache.flink.runtime.highavailability.TestingHighAvailabilityServices;
import org.apache.flink.runtime.jobmaster.JobMasterId;
@@ -41,9 +42,11 @@ import java.util.concurrent.ScheduledFuture;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicReference;
+import static org.hamcrest.CoreMatchers.is;
import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertFalse;
import static org.junit.Assert.assertNotNull;
+import static org.junit.Assert.assertThat;
import static org.junit.Assert.assertTrue;
import static org.junit.Assert.fail;
import static org.mockito.Matchers.any;
@@ -274,4 +277,57 @@ public class JobLeaderIdServiceTest extends TestLogger {
// the new timeout should be valid
assertTrue(jobLeaderIdService.isValidTimeout(jobId, lastTimeoutId.get()));
}
+
+ /**
+ * Tests that the leaderId future is only completed once the service is notified about an actual
+ * leader being elected. Specifically, it tests that the future is not completed if the
+ * leadership was revoked without a new leader having been elected.
+ */
+ @Test(timeout = 10000)
+ public void testLeaderFutureWaitsForValidLeader() throws Exception {
+ final JobID jobId = new JobID();
+ TestingHighAvailabilityServices highAvailabilityServices =
+ new TestingHighAvailabilityServices();
+ SettableLeaderRetrievalService leaderRetrievalService =
+ new SettableLeaderRetrievalService(null, null);
+
+ highAvailabilityServices.setJobMasterLeaderRetriever(jobId, leaderRetrievalService);
+
+ JobLeaderIdService jobLeaderIdService =
+ new JobLeaderIdService(
+ highAvailabilityServices,
+ new ManuallyTriggeredScheduledExecutor(),
+ Time.milliseconds(5000L));
+
+ jobLeaderIdService.start(new NoOpJobLeaderIdActions());
+
+ jobLeaderIdService.addJob(jobId);
+
+ // elect some leader
+ leaderRetrievalService.notifyListener("foo", UUID.randomUUID());
+
+ // notify about leadership loss
+ leaderRetrievalService.notifyListener(null, null);
+
+ final CompletableFuture<JobMasterId> leaderIdFuture = jobLeaderIdService.getLeaderId(jobId);
+ // there is currently no leader, so this should not be completed
+ assertThat(leaderIdFuture.isDone(), is(false));
+
+ // elect a new leader
+ final UUID newLeaderId = UUID.randomUUID();
+ leaderRetrievalService.notifyListener("foo", newLeaderId);
+ assertThat(leaderIdFuture.get(), is(JobMasterId.fromUuidOrNull(newLeaderId)));
+ }
+
+ private static class NoOpJobLeaderIdActions implements JobLeaderIdActions {
+
+ @Override
+ public void jobLeaderLostLeadership(JobID jobId, JobMasterId oldJobMasterId) {}
+
+ @Override
+ public void notifyJobTimeout(JobID jobId, UUID timeoutId) {}
+
+ @Override
+ public void handleError(Throwable error) {}
+ }
}