zentol commented on code in PR #22380:
URL: https://github.com/apache/flink/pull/22380#discussion_r1168724716


##########
flink-runtime/src/test/java/org/apache/flink/runtime/leaderelection/TestingLeaderElectionDriver.java:
##########
@@ -70,22 +77,37 @@ public LeaderInformation getLeaderInformation() {
     }
 
     public void isLeader() {
+        isLeader(FutureUtils.completedVoidFuture());
+    }
+
+    public void isLeader(CompletableFuture<Void> grantLeadershipFuture) {
         synchronized (lock) {
             isLeader.set(true);
-            leaderElectionEventHandler.onGrantLeadership(UUID.randomUUID());
+            grantLeadershipFuture.thenRun(
+                    () -> leaderElectionEventHandler.onGrantLeadership(UUID.randomUUID()));

Review Comment:
   Why is it not a problem that onGrantLeadership is no longer called under the 
lock? Could this now result in concurrent onGrant/onRevoke calls being made?
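Here is a minimal sketch of one way to keep the old guarantee. It assumes the test driver only needs to serialize its two callbacks. The deferred action re-acquires the driver's lock, so a grant released by the future cannot overlap a revoke from `notLeader()`. `EventHandler` and `LockedTestingDriver` are hypothetical, simplified stand-ins, not the classes from this PR. `thenRun` typically executes the action on the thread that completes the future. If the future is already complete, it runs right away on the calling thread.

```java
import java.util.UUID;
import java.util.concurrent.CompletableFuture;

/** Hypothetical stand-in for LeaderElectionEventHandler (only the two callbacks used here). */
interface EventHandler {
    void onGrantLeadership(UUID leaderSessionId);

    void onRevokeLeadership();
}

/** Hypothetical, simplified testing driver that serializes grant and revoke callbacks. */
class LockedTestingDriver {
    private final Object lock = new Object();
    private final EventHandler handler;
    private boolean leader;

    LockedTestingDriver(EventHandler handler) {
        this.handler = handler;
    }

    void isLeader() {
        isLeader(CompletableFuture.completedFuture(null));
    }

    void isLeader(CompletableFuture<Void> grantLeadershipFuture) {
        synchronized (lock) {
            leader = true;
            // The deferred callback takes the same lock again. Java monitors are reentrant,
            // so this also works if the future is already complete and the callback runs
            // immediately on this thread while the lock is still held.
            grantLeadershipFuture.thenRun(
                    () -> {
                        synchronized (lock) {
                            handler.onGrantLeadership(UUID.randomUUID());
                        }
                    });
        }
    }

    void notLeader() {
        synchronized (lock) {
            leader = false;
            handler.onRevokeLeadership();
        }
    }

    boolean isLeaderFlagSet() {
        synchronized (lock) {
            return leader;
        }
    }
}
```

The lock only prevents the callbacks from running concurrently. A grant whose future completes late can still arrive after a revoke, so the driver would need extra bookkeeping if that ordering matters.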



##########
flink-runtime/src/test/java/org/apache/flink/runtime/leaderelection/DefaultLeaderElectionServiceTest.java:
##########
@@ -69,6 +71,118 @@ void testOnGrantAndRevokeLeadership() throws Exception {
         };
     }
 
+    @Test
+    void testDelayedGrantCallAfterContenderRegistration() throws Exception {
+        final TestingLeaderElectionDriver.TestingLeaderElectionDriverFactory driverFactory =
+                new TestingLeaderElectionDriver.TestingLeaderElectionDriverFactory();
+        try (final DefaultLeaderElectionService testInstance =
+                new DefaultLeaderElectionService(driverFactory)) {
+            testInstance.startLeaderElectionBackend();
+
+            final TestingLeaderElectionDriver driver = driverFactory.getCurrentLeaderDriver();
+            assertThat(driver).isNotNull();
+
+            final CompletableFuture<Void> grantLeadershipFuture = new CompletableFuture<>();
+            driver.isLeader(grantLeadershipFuture);
+
+            final TestingContender contender = new TestingContender("unused-address", testInstance);
+            testInstance.start(contender);
+
+            assertThat(testInstance.getLeaderSessionID())
+                    .as("Leadership grant was not forwarded to the contender, yet.")
+                    .isNull();
+
+            grantLeadershipFuture.complete(null);
+
+            contender.waitForLeader();
+
+            testInstance.stop();
+        }
+    }
+
+    /**
+     * Test to cover the issue described in FLINK-31814. This test could be removed after
+     * FLINK-31814 is resolved.
+     */
+    @Test
+    void testOnRevokeCallWhileClosingService() throws Exception {
+        final TestingLeaderElectionDriver.TestingLeaderElectionDriverFactory driverFactory =
+                new TestingLeaderElectionDriver.TestingLeaderElectionDriverFactory(
+                        LeaderElectionEventHandler::onRevokeLeadership);
+
+        try (final DefaultLeaderElectionService testInstance =
+                new DefaultLeaderElectionService(driverFactory)) {
+            testInstance.startLeaderElectionBackend();
+
+            final TestingLeaderElectionDriver driver = driverFactory.getCurrentLeaderDriver();
+            assertThat(driver).isNotNull();
+
+            driver.isLeader();
+
+            final TestingContender contender = new TestingContender("unused-address", testInstance);
+            testInstance.start(contender);
+
+            contender.waitForLeader();
+
+            testInstance.stop();
+        }
+    }
+
+    /**
+     * This issue can happen when the shutdown of the contender takes too long and the leadership is
+     * re-acquired in the meantime (see FLINK-29234).

Review Comment:
   I don't understand how this comment relates to the test.



##########
flink-runtime/src/main/java/org/apache/flink/runtime/leaderelection/DefaultLeaderElectionService.java:
##########
@@ -37,35 +37,58 @@
  * Default implementation for leader election service. Composed with different {@link
  * LeaderElectionDriver}, we could perform a leader election for the contender, and then persist the
  * leader information to various storage.
+ *
+ * <p>{@code DefaultLeaderElectionService} handles a single {@link LeaderContender}.
  */
 public class DefaultLeaderElectionService
-        implements LeaderElectionService, LeaderElectionEventHandler {
+        implements LeaderElectionService, LeaderElectionEventHandler, AutoCloseable {
 
     private static final Logger LOG = LoggerFactory.getLogger(DefaultLeaderElectionService.class);
 
     private final Object lock = new Object();
 
     private final LeaderElectionDriverFactory leaderElectionDriverFactory;
 
-    /** The leader contender which applies for leadership. */
+    /**
+     * {@code leaderContender} being {@code null} indicates that no {@link LeaderContender} is
+     * registered that participates in the leader election, yet. See {@link #start(LeaderContender)}
+     * and {@link #stop()} for lifecycle management.
+     *
+     * <p>{@code @Nullable} isn't used here to avoid having multiple warnings spread over this class
+     * in a supporting IDE.
+     */
     @GuardedBy("lock")
-    // @Nullable is commented-out to avoid having multiple warnings spread over this class
-    // this.running=true ensures that leaderContender != null
-    private volatile LeaderContender leaderContender;
+    private LeaderContender leaderContender;
 
+    /**
+     * Saves the session ID which was issued by the {@link LeaderElectionDriver} iff the leadership

Review Comment:
   ```suggestion
        * Saves the session ID which was issued by the {@link LeaderElectionDriver} if the leadership
   ```
   Not looking forward to more PRs trying to "fix a typo".



##########
flink-runtime/src/test/java/org/apache/flink/runtime/leaderelection/DefaultLeaderElectionServiceTest.java:
##########
@@ -69,6 +71,118 @@ void testOnGrantAndRevokeLeadership() throws Exception {
         };
     }
 
+    @Test
+    void testDelayedGrantCallAfterContenderRegistration() throws Exception {
+        final TestingLeaderElectionDriver.TestingLeaderElectionDriverFactory driverFactory =
+                new TestingLeaderElectionDriver.TestingLeaderElectionDriverFactory();
+        try (final DefaultLeaderElectionService testInstance =
+                new DefaultLeaderElectionService(driverFactory)) {
+            testInstance.startLeaderElectionBackend();
+
+            final TestingLeaderElectionDriver driver = driverFactory.getCurrentLeaderDriver();
+            assertThat(driver).isNotNull();
+
+            final CompletableFuture<Void> grantLeadershipFuture = new CompletableFuture<>();
+            driver.isLeader(grantLeadershipFuture);
+
+            final TestingContender contender = new TestingContender("unused-address", testInstance);
+            testInstance.start(contender);
+
+            assertThat(testInstance.getLeaderSessionID())
+                    .as("Leadership grant was not forwarded to the contender, yet.")
+                    .isNull();
+
+            grantLeadershipFuture.complete(null);
+
+            contender.waitForLeader();
+
+            testInstance.stop();
+        }
+    }
+
+    /**
+     * Test to cover the issue described in FLINK-31814. This test could be removed after
+     * FLINK-31814 is resolved.
+     */
+    @Test
+    void testOnRevokeCallWhileClosingService() throws Exception {
+        final TestingLeaderElectionDriver.TestingLeaderElectionDriverFactory driverFactory =
+                new TestingLeaderElectionDriver.TestingLeaderElectionDriverFactory(
+                        LeaderElectionEventHandler::onRevokeLeadership);
+
+        try (final DefaultLeaderElectionService testInstance =
+                new DefaultLeaderElectionService(driverFactory)) {
+            testInstance.startLeaderElectionBackend();
+
+            final TestingLeaderElectionDriver driver = driverFactory.getCurrentLeaderDriver();
+            assertThat(driver).isNotNull();
+
+            driver.isLeader();
+
+            final TestingContender contender = new TestingContender("unused-address", testInstance);
+            testInstance.start(contender);
+
+            contender.waitForLeader();
+
+            testInstance.stop();
+        }
+    }
+
+    /**
+     * This issue can happen when the shutdown of the contender takes too long and the leadership is
+     * re-acquired in the meantime (see FLINK-29234).
+     */
+    @Test
+    void testStopWhileHavingLeadership() throws Exception {
+        final TestingLeaderElectionDriver.TestingLeaderElectionDriverFactory driverFactory =
+                new TestingLeaderElectionDriver.TestingLeaderElectionDriverFactory();
+
+        try (final DefaultLeaderElectionService testInstance =
+                new DefaultLeaderElectionService(driverFactory)) {
+            testInstance.startLeaderElectionBackend();

Review Comment:
   Why isn't this just part of the constructor?
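For comparison, here is a minimal sketch of what moving the backend start into the constructor could look like. `LeaderDriver`, `LeaderDriverFactory`, `GrantHandler` and `EagerElectionService` are hypothetical, simplified stand-ins rather than Flink's classes. One trade-off is worth weighing: the factory receives `this` as the event handler, so the reference escapes before the constructor has returned. A driver that calls back immediately, or from another thread, can therefore observe a partially constructed service.

```java
import java.util.Objects;
import java.util.UUID;

/** Hypothetical stand-in for the event handler the driver calls back into. */
interface GrantHandler {
    void onGrantLeadership(UUID leaderSessionId);
}

/** Hypothetical stand-in for LeaderElectionDriver. */
interface LeaderDriver extends AutoCloseable {
    @Override
    void close();
}

/** Hypothetical stand-in for LeaderElectionDriverFactory. */
interface LeaderDriverFactory {
    LeaderDriver create(GrantHandler handler);
}

/** Sketch: the backend connection is established eagerly, so no separate start call exists. */
class EagerElectionService implements GrantHandler, AutoCloseable {
    // Field initializers run before the constructor body, so the lock exists
    // even if the driver calls back while it is still being created.
    private final Object lock = new Object();
    private final LeaderDriver driver;
    private UUID issuedLeaderSessionId;

    EagerElectionService(LeaderDriverFactory factory) {
        // 'this' escapes here, before construction has finished; 'driver' is still null
        // if the factory triggers a callback synchronously.
        this.driver = Objects.requireNonNull(factory.create(this));
    }

    @Override
    public void onGrantLeadership(UUID leaderSessionId) {
        synchronized (lock) {
            issuedLeaderSessionId = leaderSessionId;
        }
    }

    UUID getIssuedLeaderSessionId() {
        synchronized (lock) {
            return issuedLeaderSessionId;
        }
    }

    @Override
    public void close() {
        driver.close();
    }
}
```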



##########
flink-runtime/src/test/java/org/apache/flink/runtime/leaderelection/TestingLeaderContender.java:
##########
@@ -0,0 +1,89 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.leaderelection;
+
+import java.util.UUID;
+import java.util.function.Consumer;
+
+/**
+ * {@code TestingLeaderContender} is a more generic implementation in comparison to {@link
+ * TestingContender}.
+ */
+public class TestingLeaderContender implements LeaderContender {
+
+    private final Consumer<UUID> grantLeadershipConsumer;
+    private final Runnable revokeLeadershipRunnable;
+    private final Consumer<Exception> handleErrorConsumer;
+
+    public TestingLeaderContender(
+            Consumer<UUID> grantLeadershipConsumer,
+            Runnable revokeLeadershipRunnable,
+            Consumer<Exception> handleErrorConsumer) {
+        this.grantLeadershipConsumer = grantLeadershipConsumer;
+        this.revokeLeadershipRunnable = revokeLeadershipRunnable;
+        this.handleErrorConsumer = handleErrorConsumer;
+    }
+
+    @Override
+    public void grantLeadership(UUID leaderSessionID) {
+        grantLeadershipConsumer.accept(leaderSessionID);
+    }
+
+    @Override
+    public void revokeLeadership() {
+        revokeLeadershipRunnable.run();
+    }
+
+    @Override
+    public void handleError(Exception exception) {
+        handleErrorConsumer.accept(exception);
+    }
+
+    public static Builder newBuilder() {
+        return new Builder();
+    }
+
+    /** {@code Builder} for {@code TestingLeaderContender}. */
+    public static class Builder {

Review Comment:
   Add a private constructor to enforce a single instantiation path.
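Here is a minimal sketch of the suggestion. It assumes the builder exposes one setter per callback and defaults each callback to a no-op. The `Contender` interface, `SketchedTestingLeaderContender` and the setter names are hypothetical, because the hunk above is cut off before the builder's body. With a private `Builder` constructor, `newBuilder()` becomes the only way to obtain a builder. The enclosing class can still call that constructor because Java gives an outer class access to the private members of its nested classes.

```java
import java.util.UUID;
import java.util.function.Consumer;

/** Hypothetical, trimmed-down stand-in for LeaderContender. */
interface Contender {
    void grantLeadership(UUID leaderSessionID);

    void revokeLeadership();

    void handleError(Exception exception);
}

class SketchedTestingLeaderContender implements Contender {

    private final Consumer<UUID> grantLeadershipConsumer;
    private final Runnable revokeLeadershipRunnable;
    private final Consumer<Exception> handleErrorConsumer;

    public SketchedTestingLeaderContender(
            Consumer<UUID> grantLeadershipConsumer,
            Runnable revokeLeadershipRunnable,
            Consumer<Exception> handleErrorConsumer) {
        this.grantLeadershipConsumer = grantLeadershipConsumer;
        this.revokeLeadershipRunnable = revokeLeadershipRunnable;
        this.handleErrorConsumer = handleErrorConsumer;
    }

    @Override
    public void grantLeadership(UUID leaderSessionID) {
        grantLeadershipConsumer.accept(leaderSessionID);
    }

    @Override
    public void revokeLeadership() {
        revokeLeadershipRunnable.run();
    }

    @Override
    public void handleError(Exception exception) {
        handleErrorConsumer.accept(exception);
    }

    public static Builder newBuilder() {
        return new Builder();
    }

    /** Builder whose only entry point is newBuilder(). */
    public static class Builder {
        // Assumed defaults: every callback is a no-op unless overridden.
        private Consumer<UUID> grantLeadershipConsumer = ignored -> {};
        private Runnable revokeLeadershipRunnable = () -> {};
        private Consumer<Exception> handleErrorConsumer = ignored -> {};

        // Private: callers outside the enclosing class cannot write 'new Builder()'.
        private Builder() {}

        public Builder setGrantLeadershipConsumer(Consumer<UUID> grantLeadershipConsumer) {
            this.grantLeadershipConsumer = grantLeadershipConsumer;
            return this;
        }

        public Builder setRevokeLeadershipRunnable(Runnable revokeLeadershipRunnable) {
            this.revokeLeadershipRunnable = revokeLeadershipRunnable;
            return this;
        }

        public Builder setHandleErrorConsumer(Consumer<Exception> handleErrorConsumer) {
            this.handleErrorConsumer = handleErrorConsumer;
            return this;
        }

        public SketchedTestingLeaderContender build() {
            return new SketchedTestingLeaderContender(
                    grantLeadershipConsumer, revokeLeadershipRunnable, handleErrorConsumer);
        }
    }
}
```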



##########
flink-runtime/src/test/java/org/apache/flink/runtime/leaderelection/DefaultLeaderElectionServiceTest.java:
##########
@@ -69,6 +71,118 @@ void testOnGrantAndRevokeLeadership() throws Exception {
         };
     }
 
+    @Test
+    void testDelayedGrantCallAfterContenderRegistration() throws Exception {
+        final TestingLeaderElectionDriver.TestingLeaderElectionDriverFactory driverFactory =
+                new TestingLeaderElectionDriver.TestingLeaderElectionDriverFactory();
+        try (final DefaultLeaderElectionService testInstance =
+                new DefaultLeaderElectionService(driverFactory)) {
+            testInstance.startLeaderElectionBackend();
+
+            final TestingLeaderElectionDriver driver = driverFactory.getCurrentLeaderDriver();
+            assertThat(driver).isNotNull();
+
+            final CompletableFuture<Void> grantLeadershipFuture = new CompletableFuture<>();
+            driver.isLeader(grantLeadershipFuture);
+
+            final TestingContender contender = new TestingContender("unused-address", testInstance);
+            testInstance.start(contender);
+
+            assertThat(testInstance.getLeaderSessionID())
+                    .as("Leadership grant was not forwarded to the contender, yet.")
+                    .isNull();
+
+            grantLeadershipFuture.complete(null);
+
+            contender.waitForLeader();
+
+            testInstance.stop();
+        }
+    }
+
+    /**
+     * Test to cover the issue described in FLINK-31814. This test could be removed after
+     * FLINK-31814 is resolved.
+     */
+    @Test
+    void testOnRevokeCallWhileClosingService() throws Exception {

Review Comment:
   Is the description in FLINK-31814 based on your WIP branch? Asking since there is neither a `DefaultLeaderElection.close()` nor a `DefaultLeaderElectionService.close` in the current master...



##########
flink-runtime/src/main/java/org/apache/flink/runtime/leaderelection/DefaultLeaderElectionService.java:
##########
@@ -77,22 +100,48 @@ public DefaultLeaderElectionService(LeaderElectionDriverFactory leaderElectionDr
         this.leaderElectionDriver = null;
 
         this.confirmedLeaderInformation = LeaderInformation.empty();
+    }
 
-        this.running = false;
+    /**
+     * Starts the leader election process. This method has to be called before registering a {@link
+     * LeaderContender}.
+     */
+    public void startLeaderElectionBackend() throws Exception {
+        synchronized (lock) {
+            Preconditions.checkState(
+                    leaderContender == null,
+                    "No LeaderContender should have been registered, yet.");
+
+            leaderElectionDriver =
+                    leaderElectionDriverFactory.createLeaderElectionDriver(
+                            this, new LeaderElectionFatalErrorHandler());
+
+            LOG.info("Instantiating DefaultLeaderElectionService with {}.", leaderElectionDriver);
+        }
     }
 
     @Override
     public final void start(LeaderContender contender) throws Exception {
         checkNotNull(contender, "Contender must not be null.");
-        Preconditions.checkState(leaderContender == null, "Contender was already set.");
 
         synchronized (lock) {
-            running = true;
+            Preconditions.checkState(
+                    leaderContender == null,
+                    "Only one LeaderContender is allowed to be registered to this service.");
+            Preconditions.checkState(
+                    leaderElectionDriver != null,
+                    "The DefaultLeaderElectionService should have established a connection to the backend before it's started.");
+
             leaderContender = contender;
-            leaderElectionDriver =
-                    leaderElectionDriverFactory.createLeaderElectionDriver(
-                            this, new LeaderElectionFatalErrorHandler());
-            LOG.info("Starting DefaultLeaderElectionService with {}.", leaderElectionDriver);
+
+            LOG.info(
+                    "LeaderContender {} has been registered for {}.",
+                    contender.getDescription(),
+                    leaderElectionDriver);
+
+            if (hasLeadership()) {
+                notifyLeaderContenderOfLeadership();

Review Comment:
   Wondering if this should really happen in `start()` :thinking: That means it's a different thread than usual, right? Usually it'd be some HA backend thread, but now it's the main thread?
   
   Why isn't this calling `onGrantLeadership`?
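To make the threading question concrete: once `start()` can forward the grant, the thread that calls the contender depends on the order of events. The sketch below assumes a heavily simplified service. `ThreadRecordingService` is hypothetical and only mirrors the branching in the diff. If leadership was granted before registration, the contender is notified on the thread that calls `start()`. Otherwise it is notified on the thread that delivers `onGrantLeadership`, which in production would be an HA backend thread.

```java
import java.util.UUID;
import java.util.function.Consumer;

/** Hypothetical, simplified service mirroring the start()/onGrantLeadership branching in the diff. */
class ThreadRecordingService {
    private final Object lock = new Object();
    private UUID issuedLeaderSessionId;
    private Consumer<UUID> contender;

    /** Name of the thread that last notified the contender; recorded for illustration only. */
    volatile String notifyingThread;

    void start(Consumer<UUID> contender) {
        synchronized (lock) {
            this.contender = contender;
            // Leadership was granted before the contender registered:
            // the contender is notified on the caller's thread.
            if (issuedLeaderSessionId != null) {
                notifyContender();
            }
        }
    }

    /** In production this would be invoked by the HA backend on one of its own threads. */
    void onGrantLeadership(UUID leaderSessionId) {
        synchronized (lock) {
            issuedLeaderSessionId = leaderSessionId;
            if (contender != null) {
                notifyContender();
            }
        }
    }

    private void notifyContender() {
        notifyingThread = Thread.currentThread().getName();
        contender.accept(issuedLeaderSessionId);
    }
}
```

The sketch only shows that the calling thread changes with the order of registration and grant. Whether running the callback on the caller's thread is acceptable is the open question above.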



##########
flink-runtime/src/main/java/org/apache/flink/runtime/leaderelection/DefaultLeaderElectionService.java:
##########
@@ -37,35 +37,58 @@
  * Default implementation for leader election service. Composed with different {@link
  * LeaderElectionDriver}, we could perform a leader election for the contender, and then persist the
  * leader information to various storage.
+ *
+ * <p>{@code DefaultLeaderElectionService} handles a single {@link LeaderContender}.
  */
 public class DefaultLeaderElectionService
-        implements LeaderElectionService, LeaderElectionEventHandler {
+        implements LeaderElectionService, LeaderElectionEventHandler, AutoCloseable {
 
     private static final Logger LOG = LoggerFactory.getLogger(DefaultLeaderElectionService.class);
 
     private final Object lock = new Object();
 
     private final LeaderElectionDriverFactory leaderElectionDriverFactory;
 
-    /** The leader contender which applies for leadership. */
+    /**
+     * {@code leaderContender} being {@code null} indicates that no {@link LeaderContender} is
+     * registered that participates in the leader election, yet. See {@link #start(LeaderContender)}
+     * and {@link #stop()} for lifecycle management.
+     *
+     * <p>{@code @Nullable} isn't used here to avoid having multiple warnings spread over this class
+     * in a supporting IDE.
+     */
     @GuardedBy("lock")
-    // @Nullable is commented-out to avoid having multiple warnings spread over this class
-    // this.running=true ensures that leaderContender != null
-    private volatile LeaderContender leaderContender;
+    private LeaderContender leaderContender;
 
+    /**
+     * Saves the session ID which was issued by the {@link LeaderElectionDriver} iff the leadership
+     * is acquired by this service. {@code issuedLeaderSessionID} being {@code null} indicates that
+     * this service isn't the leader right now (i.e. {@code
+     * leaderElectionDriver.hasLeadership(UUID)} would return {@code false} for any session ID.
+     */
     @GuardedBy("lock")
     @Nullable
-    private volatile UUID issuedLeaderSessionID;
+    private UUID issuedLeaderSessionID;
 
+    /**
+     * Saves the leader information for a registered {@link LeaderContender} after this contender
+     * confirmed the leadership.
+     */
     @GuardedBy("lock")
-    private volatile LeaderInformation confirmedLeaderInformation;
+    private LeaderInformation confirmedLeaderInformation;
 
+    /**
+     * {@code leaderElectionDriver} being {@code null} indicates that the connection to the
+     * LeaderElection backend isn't established, yet. See {@link #startLeaderElectionBackend()} and
+     * {@link #close()} for lifecycle management. The lifecycle of the driver should have been
+     * established before registering a {@link LeaderContender} and stopped after the contender has
+     * been removed.
+     *
+     * <p>{@code @Nullable} isn't used here to avoid having multiple warnings spread over this class
+     * in a supporting IDE.
+     */
     @GuardedBy("lock")
-    private volatile boolean running;
-
-    // @Nullable is commented-out to avoid having multiple warnings spread over this class
-    // this.running=true ensures that leaderContender != null
-    private LeaderElectionDriver leaderElectionDriver;
+    private volatile LeaderElectionDriver leaderElectionDriver;

Review Comment:
   Why is this one now volatile when everything else isn't?
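Some background for this question. Under the Java Memory Model, a field that is only read and written while holding `lock` is already safely published by the `synchronized` blocks, so adding `volatile` to it buys nothing. `volatile` only matters for accesses that bypass the lock. The minimal sketch below shows the one case where the distinction matters. The class and method names are hypothetical.

```java
/** Hypothetical illustration of when volatile is needed next to a lock. */
class VisibilitySketch {
    private final Object lock = new Object();

    // Only accessed while holding 'lock': the monitor already provides mutual exclusion
    // and visibility, so volatile would be redundant.
    private String guardedState = "created";

    // Written under the lock, but also read WITHOUT it in peekDriver(). Only such a
    // lock-free read needs volatile to be guaranteed to see the latest write.
    private volatile String driver;

    void startBackend(String newDriver) {
        synchronized (lock) {
            guardedState = "started";
            driver = newDriver;
        }
    }

    String readGuardedState() {
        synchronized (lock) {
            return guardedState;
        }
    }

    String peekDriver() {
        return driver;
    }
}
```

So if every access to `leaderElectionDriver` in the PR happens inside `synchronized (lock)`, the `volatile` looks unnecessary. If some path reads it without the lock, that path is what the modifier protects. A test can only exercise both paths. It cannot prove the absence of a visibility bug.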



##########
flink-runtime/src/main/java/org/apache/flink/runtime/leaderelection/DefaultLeaderElectionService.java:
##########
@@ -145,96 +222,122 @@ public void confirmLeadership(UUID leaderSessionID, String leaderAddress) {
                 } else {
                     LOG.warn(
                             "The leader session ID {} was confirmed even though the "
-                                    + "corresponding JobManager was not elected as the leader.",
+                                    + "corresponding service was not elected as the leader or has been stopped already.",
                             leaderSessionID);
                 }
             }
         }
     }
 
+    @GuardedBy("lock")
+    private boolean hasLeadership() {
+        return leaderElectionDriver.hasLeadership() && issuedLeaderSessionID != null;
+    }
+
     @Override
     public boolean hasLeadership(@Nonnull UUID leaderSessionId) {
         synchronized (lock) {
-            if (running) {
-                return leaderElectionDriver.hasLeadership()
-                        && leaderSessionId.equals(issuedLeaderSessionID);
+            if (leaderElectionDriver != null) {
+                if (leaderContender != null) {
+                    return hasLeadership() && leaderSessionId.equals(issuedLeaderSessionID);
+                } else {
+                    LOG.debug(
+                            "hasLeadership is called after the LeaderContender was removed, returning false.");
+                    return false;
+                }
             } else {
-                LOG.debug("hasLeadership is called after the service is stopped, returning false.");
+                LOG.debug("hasLeadership is called after the service is closed, returning false.");
                 return false;
             }
         }
     }
 
-    /**
-     * Returns the current leader session ID or null, if the contender is not the leader.
-     *
-     * @return The last leader session ID or null, if the contender is not the leader
-     */
+    /** Returns the current leader session ID or {@code null}, if the session wasn't confirmed. */
     @VisibleForTesting
     @Nullable
     public UUID getLeaderSessionID() {
-        return confirmedLeaderInformation.getLeaderSessionID();
-    }
-
-    @GuardedBy("lock")
-    private void confirmLeaderInformation(UUID leaderSessionID, String leaderAddress) {
-        confirmedLeaderInformation = LeaderInformation.known(leaderSessionID, leaderAddress);
-        leaderElectionDriver.writeLeaderInformation(confirmedLeaderInformation);
+        synchronized (lock) {
+            return confirmedLeaderInformation.getLeaderSessionID();
+        }
     }
 
     @Override
     public void onGrantLeadership(UUID newLeaderSessionId) {
+        Preconditions.checkNotNull(newLeaderSessionId);
+
         synchronized (lock) {
-            if (running) {
-                issuedLeaderSessionID = newLeaderSessionId;
-                confirmedLeaderInformation = LeaderInformation.empty();
+            Preconditions.checkState(
+                    issuedLeaderSessionID == null,
+                    "The leadership should have been granted while not having the leadership acquired.");
 
-                LOG.debug(
-                        "Grant leadership to contender {} with session ID {}.",
-                        leaderContender.getDescription(),
-                        issuedLeaderSessionID);
+            issuedLeaderSessionID = newLeaderSessionId;
 
-                leaderContender.grantLeadership(issuedLeaderSessionID);
+            if (leaderContender != null) {
+                notifyLeaderContenderOfLeadership();
             } else {
                 LOG.debug(
-                        "Ignoring the grant leadership notification since the {} has already been closed.",
+                        "The grant leadership notification is not forwarded because the DefaultLeaderElectionService ({}) has no contender registered.",
                         leaderElectionDriver);
             }
         }
     }
 
+    @GuardedBy("lock")
+    private void notifyLeaderContenderOfLeadership() {
+        Preconditions.checkState(
+                confirmedLeaderInformation.isEmpty(),
+                "The leadership should have been granted while not having the leadership acquired.");
+
+        LOG.debug(
+                "Granting leadership to contender {} with session ID {}.",
+                leaderContender.getDescription(),
+                issuedLeaderSessionID);
+
+        leaderContender.grantLeadership(issuedLeaderSessionID);
+    }
+
     @Override
     public void onRevokeLeadership() {
         synchronized (lock) {
-            if (running) {
-                handleLeadershipLoss();
+            // TODO: FLINK-31814 covers adding this Precondition
+            // Preconditions.checkState(issuedLeaderSessionID != null,"The leadership should have
+            // been revoked while having the leadership acquired.");
+
+            final UUID previousSessionID = issuedLeaderSessionID;
+            issuedLeaderSessionID = null;
+
+            if (leaderContender != null) {

Review Comment:
   Is this branching also related to FLINK-31814?



##########
flink-runtime/src/test/java/org/apache/flink/runtime/jobmaster/JobMasterServiceLeadershipRunnerTest.java:
##########
@@ -772,7 +773,11 @@ void testJobMasterServiceLeadershipRunnerCloseWhenElectionServiceGrantLeaderShip
             closeAsyncCalledTrigger.await();
 
             final CheckedThread grantLeadershipThread =
-                    createCheckedThread(currentLeaderDriver::isLeader);
+                    createCheckedThread(
+                            () -> {
+                                currentLeaderDriver.notLeader();
+                                currentLeaderDriver.isLeader();

Review Comment:
   ?



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
