zentol commented on code in PR #22661:
URL: https://github.com/apache/flink/pull/22661#discussion_r1245083242


##########
flink-runtime/src/test/java/org/apache/flink/runtime/leaderelection/TestingLeaderElectionDriver.java:
##########
@@ -19,147 +19,347 @@
 package org.apache.flink.runtime.leaderelection;
 
 import org.apache.flink.runtime.rpc.FatalErrorHandler;
+import org.apache.flink.util.Preconditions;
 import org.apache.flink.util.function.ThrowingConsumer;
+import org.apache.flink.util.function.TriConsumer;
 
-import javax.annotation.Nullable;
-
+import java.util.Optional;
 import java.util.UUID;
 import java.util.concurrent.atomic.AtomicBoolean;
+import java.util.concurrent.atomic.AtomicReference;
+import java.util.concurrent.locks.ReentrantLock;
+import java.util.function.BiConsumer;
 import java.util.function.Consumer;
+import java.util.function.Function;
 
 /**
- * {@link LeaderElectionDriver} implementation which provides some convenience 
functions for testing
- * purposes. Please use {@link #isLeader} and {@link #notLeader()} to manually 
control the
- * leadership.
+ * {@code TestingLeaderElectionDriver} is a generic test implementation of 
{@link
+ * MultipleComponentLeaderElectionDriver} which can be used in test cases.
  */
-public class TestingLeaderElectionDriver implements LeaderElectionDriver {
+public class TestingLeaderElectionDriver implements 
MultipleComponentLeaderElectionDriver {
 
-    private final Object lock = new Object();
+    private final Function<ReentrantLock, Boolean> hasLeadershipFunction;
+    private final TriConsumer<ReentrantLock, String, LeaderInformation>
+            publishLeaderInformationConsumer;
+    private final BiConsumer<ReentrantLock, String> 
deleteLeaderInformationConsumer;
 
-    private final AtomicBoolean isLeader = new AtomicBoolean(false);
-    private final LeaderElectionEventHandler leaderElectionEventHandler;
-    private final FatalErrorHandler fatalErrorHandler;
+    private final ThrowingConsumer<ReentrantLock, Exception> closeConsumer;
 
-    private final ThrowingConsumer<LeaderElectionEventHandler, Exception> 
closeRunnable;
-    private final ThrowingConsumer<LeaderElectionEventHandler, Exception> 
beforeLockCloseRunnable;
+    private final TriConsumer<ReentrantLock, Optional<Listener>, UUID> 
grantConsumer;
+    private final BiConsumer<ReentrantLock, Optional<Listener>> revokeConsumer;
 
-    private final Consumer<LeaderElectionEventHandler> beforeGrantRunnable;
+    private final ReentrantLock lock = new ReentrantLock();
 
-    // Leader information on external storage
-    private LeaderInformation leaderInformation = LeaderInformation.empty();
+    private Optional<Listener> listener = Optional.empty();
+    private Optional<FatalErrorHandler> fatalErrorHandler = Optional.empty();
 
-    private TestingLeaderElectionDriver(
-            LeaderElectionEventHandler leaderElectionEventHandler,
-            FatalErrorHandler fatalErrorHandler,
-            ThrowingConsumer<LeaderElectionEventHandler, Exception> 
closeRunnable,
-            ThrowingConsumer<LeaderElectionEventHandler, Exception> 
beforeLockCloseRunnable,
-            Consumer<LeaderElectionEventHandler> beforeGrantRunnable) {
-        this.leaderElectionEventHandler = leaderElectionEventHandler;
-        this.fatalErrorHandler = fatalErrorHandler;
-        this.closeRunnable = closeRunnable;
-        this.beforeLockCloseRunnable = beforeLockCloseRunnable;
-        this.beforeGrantRunnable = beforeGrantRunnable;
+    public TestingLeaderElectionDriver(
+            Function<ReentrantLock, Boolean> hasLeadershipFunction,
+            TriConsumer<ReentrantLock, String, LeaderInformation> 
publishLeaderInformationConsumer,
+            BiConsumer<ReentrantLock, String> deleteLeaderInformationConsumer,
+            ThrowingConsumer<ReentrantLock, Exception> closeConsumer,
+            TriConsumer<ReentrantLock, Optional<Listener>, UUID> grantConsumer,
+            BiConsumer<ReentrantLock, Optional<Listener>> revokeConsumer) {
+        this.hasLeadershipFunction = hasLeadershipFunction;
+        this.publishLeaderInformationConsumer = 
publishLeaderInformationConsumer;
+        this.deleteLeaderInformationConsumer = deleteLeaderInformationConsumer;
+        this.closeConsumer = closeConsumer;
+        this.grantConsumer = grantConsumer;
+        this.revokeConsumer = revokeConsumer;
     }
 
-    @Override
-    public void writeLeaderInformation(LeaderInformation leaderInformation) {
-        this.leaderInformation = leaderInformation;
+    private void initialize(Listener listener, FatalErrorHandler 
fatalErrorHandler) {
+        Preconditions.checkState(
+                !this.listener.isPresent(), "The driver should be only 
initialized once.");
+
+        this.listener = Optional.of(listener);
+        this.fatalErrorHandler = Optional.of(fatalErrorHandler);
+    }
+
+    public void grantLeadership() {

Review Comment:
   > The grantLeadership is more like a convenience method.
   
   What is convenient about it though?
   We have to supply the implementation and call the method ourselves; so what 
are we really getting?
   The only benefit I see is getting access to the lock, but I wonder if that 
should really be exposed?
   
   > We could clean this up as part of 
[FLINK-32381](https://issues.apache.org/jira/browse/FLINK-32381).
   
   How is this related to the error handling?



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]

Reply via email to