XComp commented on code in PR #22661:
URL: https://github.com/apache/flink/pull/22661#discussion_r1245352257


##########
flink-runtime/src/test/java/org/apache/flink/runtime/leaderelection/TestingLeaderElectionDriver.java:
##########
@@ -19,147 +19,347 @@
 package org.apache.flink.runtime.leaderelection;
 
 import org.apache.flink.runtime.rpc.FatalErrorHandler;
+import org.apache.flink.util.Preconditions;
 import org.apache.flink.util.function.ThrowingConsumer;
+import org.apache.flink.util.function.TriConsumer;
 
-import javax.annotation.Nullable;
-
+import java.util.Optional;
 import java.util.UUID;
 import java.util.concurrent.atomic.AtomicBoolean;
+import java.util.concurrent.atomic.AtomicReference;
+import java.util.concurrent.locks.ReentrantLock;
+import java.util.function.BiConsumer;
 import java.util.function.Consumer;
+import java.util.function.Function;
 
 /**
- * {@link LeaderElectionDriver} implementation which provides some convenience 
functions for testing
- * purposes. Please use {@link #isLeader} and {@link #notLeader()} to manually 
control the
- * leadership.
+ * {@code TestingLeaderElectionDriver} is a generic test implementation of 
{@link
+ * MultipleComponentLeaderElectionDriver} which can be used in test cases.
  */
-public class TestingLeaderElectionDriver implements LeaderElectionDriver {
+public class TestingLeaderElectionDriver implements 
MultipleComponentLeaderElectionDriver {
 
-    private final Object lock = new Object();
+    private final Function<ReentrantLock, Boolean> hasLeadershipFunction;
+    private final TriConsumer<ReentrantLock, String, LeaderInformation>
+            publishLeaderInformationConsumer;
+    private final BiConsumer<ReentrantLock, String> 
deleteLeaderInformationConsumer;
 
-    private final AtomicBoolean isLeader = new AtomicBoolean(false);
-    private final LeaderElectionEventHandler leaderElectionEventHandler;
-    private final FatalErrorHandler fatalErrorHandler;
+    private final ThrowingConsumer<ReentrantLock, Exception> closeConsumer;
 
-    private final ThrowingConsumer<LeaderElectionEventHandler, Exception> 
closeRunnable;
-    private final ThrowingConsumer<LeaderElectionEventHandler, Exception> 
beforeLockCloseRunnable;
+    private final TriConsumer<ReentrantLock, Optional<Listener>, UUID> 
grantConsumer;
+    private final BiConsumer<ReentrantLock, Optional<Listener>> revokeConsumer;
 
-    private final Consumer<LeaderElectionEventHandler> beforeGrantRunnable;
+    private final ReentrantLock lock = new ReentrantLock();
 
-    // Leader information on external storage
-    private LeaderInformation leaderInformation = LeaderInformation.empty();
+    private Optional<Listener> listener = Optional.empty();
+    private Optional<FatalErrorHandler> fatalErrorHandler = Optional.empty();
 
-    private TestingLeaderElectionDriver(
-            LeaderElectionEventHandler leaderElectionEventHandler,
-            FatalErrorHandler fatalErrorHandler,
-            ThrowingConsumer<LeaderElectionEventHandler, Exception> 
closeRunnable,
-            ThrowingConsumer<LeaderElectionEventHandler, Exception> 
beforeLockCloseRunnable,
-            Consumer<LeaderElectionEventHandler> beforeGrantRunnable) {
-        this.leaderElectionEventHandler = leaderElectionEventHandler;
-        this.fatalErrorHandler = fatalErrorHandler;
-        this.closeRunnable = closeRunnable;
-        this.beforeLockCloseRunnable = beforeLockCloseRunnable;
-        this.beforeGrantRunnable = beforeGrantRunnable;
+    public TestingLeaderElectionDriver(
+            Function<ReentrantLock, Boolean> hasLeadershipFunction,
+            TriConsumer<ReentrantLock, String, LeaderInformation> 
publishLeaderInformationConsumer,
+            BiConsumer<ReentrantLock, String> deleteLeaderInformationConsumer,
+            ThrowingConsumer<ReentrantLock, Exception> closeConsumer,
+            TriConsumer<ReentrantLock, Optional<Listener>, UUID> grantConsumer,
+            BiConsumer<ReentrantLock, Optional<Listener>> revokeConsumer) {
+        this.hasLeadershipFunction = hasLeadershipFunction;
+        this.publishLeaderInformationConsumer = 
publishLeaderInformationConsumer;
+        this.deleteLeaderInformationConsumer = deleteLeaderInformationConsumer;
+        this.closeConsumer = closeConsumer;
+        this.grantConsumer = grantConsumer;
+        this.revokeConsumer = revokeConsumer;
     }
 
-    @Override
-    public void writeLeaderInformation(LeaderInformation leaderInformation) {
-        this.leaderInformation = leaderInformation;
+    private void initialize(Listener listener, FatalErrorHandler 
fatalErrorHandler) {
+        Preconditions.checkState(
+                !this.listener.isPresent(), "The driver should be only 
initialized once.");
+
+        this.listener = Optional.of(listener);
+        this.fatalErrorHandler = Optional.of(fatalErrorHandler);
+    }
+
+    public void grantLeadership() {

Review Comment:
   Convenience in a way that I wanted to mimick the feature set of the deleted 
`TestingLeaderElectionDriver` to reduce the changes in tests that used this 
class but would be refactored to use the new `TestingLeaderElectionDriver` 
implementation.
   
   Exposing the lock was necessary for 
`DefaultLeaderElectionService#testCloseGrantDeadlock` where we wanted to check 
that we don't end up in a deadlock. Previously, this was made available through 
the old TestingLeaderElectionDriver implementation and a custom consumer that 
was called before entering the lock (see 
[TestingLeaderElectionDriver:46](https://github.com/apache/flink/blob/master/flink-runtime/src/test/java/org/apache/flink/runtime/leaderelection/TestingLeaderElectionDriver.java#L46).
 I felt like this solution wasn't ideal either and that's why I tried to expose 
the lock in a more generic way.
   



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]

Reply via email to