zentol commented on code in PR #22661:
URL: https://github.com/apache/flink/pull/22661#discussion_r1245083242
##########
flink-runtime/src/test/java/org/apache/flink/runtime/leaderelection/TestingLeaderElectionDriver.java:
##########
@@ -19,147 +19,347 @@
package org.apache.flink.runtime.leaderelection;
import org.apache.flink.runtime.rpc.FatalErrorHandler;
+import org.apache.flink.util.Preconditions;
import org.apache.flink.util.function.ThrowingConsumer;
+import org.apache.flink.util.function.TriConsumer;
-import javax.annotation.Nullable;
-
+import java.util.Optional;
import java.util.UUID;
import java.util.concurrent.atomic.AtomicBoolean;
+import java.util.concurrent.atomic.AtomicReference;
+import java.util.concurrent.locks.ReentrantLock;
+import java.util.function.BiConsumer;
import java.util.function.Consumer;
+import java.util.function.Function;
/**
- * {@link LeaderElectionDriver} implementation which provides some convenience
functions for testing
- * purposes. Please use {@link #isLeader} and {@link #notLeader()} to manually
control the
- * leadership.
+ * {@code TestingLeaderElectionDriver} is a generic test implementation of
{@link
+ * MultipleComponentLeaderElectionDriver} which can be used in test cases.
*/
-public class TestingLeaderElectionDriver implements LeaderElectionDriver {
+public class TestingLeaderElectionDriver implements
MultipleComponentLeaderElectionDriver {
- private final Object lock = new Object();
+ private final Function<ReentrantLock, Boolean> hasLeadershipFunction;
+ private final TriConsumer<ReentrantLock, String, LeaderInformation>
+ publishLeaderInformationConsumer;
+ private final BiConsumer<ReentrantLock, String>
deleteLeaderInformationConsumer;
- private final AtomicBoolean isLeader = new AtomicBoolean(false);
- private final LeaderElectionEventHandler leaderElectionEventHandler;
- private final FatalErrorHandler fatalErrorHandler;
+ private final ThrowingConsumer<ReentrantLock, Exception> closeConsumer;
- private final ThrowingConsumer<LeaderElectionEventHandler, Exception>
closeRunnable;
- private final ThrowingConsumer<LeaderElectionEventHandler, Exception>
beforeLockCloseRunnable;
+ private final TriConsumer<ReentrantLock, Optional<Listener>, UUID>
grantConsumer;
+ private final BiConsumer<ReentrantLock, Optional<Listener>> revokeConsumer;
- private final Consumer<LeaderElectionEventHandler> beforeGrantRunnable;
+ private final ReentrantLock lock = new ReentrantLock();
- // Leader information on external storage
- private LeaderInformation leaderInformation = LeaderInformation.empty();
+ private Optional<Listener> listener = Optional.empty();
+ private Optional<FatalErrorHandler> fatalErrorHandler = Optional.empty();
- private TestingLeaderElectionDriver(
- LeaderElectionEventHandler leaderElectionEventHandler,
- FatalErrorHandler fatalErrorHandler,
- ThrowingConsumer<LeaderElectionEventHandler, Exception>
closeRunnable,
- ThrowingConsumer<LeaderElectionEventHandler, Exception>
beforeLockCloseRunnable,
- Consumer<LeaderElectionEventHandler> beforeGrantRunnable) {
- this.leaderElectionEventHandler = leaderElectionEventHandler;
- this.fatalErrorHandler = fatalErrorHandler;
- this.closeRunnable = closeRunnable;
- this.beforeLockCloseRunnable = beforeLockCloseRunnable;
- this.beforeGrantRunnable = beforeGrantRunnable;
+ public TestingLeaderElectionDriver(
+ Function<ReentrantLock, Boolean> hasLeadershipFunction,
+ TriConsumer<ReentrantLock, String, LeaderInformation>
publishLeaderInformationConsumer,
+ BiConsumer<ReentrantLock, String> deleteLeaderInformationConsumer,
+ ThrowingConsumer<ReentrantLock, Exception> closeConsumer,
+ TriConsumer<ReentrantLock, Optional<Listener>, UUID> grantConsumer,
+ BiConsumer<ReentrantLock, Optional<Listener>> revokeConsumer) {
+ this.hasLeadershipFunction = hasLeadershipFunction;
+ this.publishLeaderInformationConsumer =
publishLeaderInformationConsumer;
+ this.deleteLeaderInformationConsumer = deleteLeaderInformationConsumer;
+ this.closeConsumer = closeConsumer;
+ this.grantConsumer = grantConsumer;
+ this.revokeConsumer = revokeConsumer;
}
- @Override
- public void writeLeaderInformation(LeaderInformation leaderInformation) {
- this.leaderInformation = leaderInformation;
+ private void initialize(Listener listener, FatalErrorHandler
fatalErrorHandler) {
+ Preconditions.checkState(
+ !this.listener.isPresent(), "The driver should be only
initialized once.");
+
+ this.listener = Optional.of(listener);
+ this.fatalErrorHandler = Optional.of(fatalErrorHandler);
+ }
+
+ public void grantLeadership() {
Review Comment:
> The grantLeadership is more like a convenience method.
What is convenient about it, though?
We have to supply the implementation and call the method ourselves, so what
are we really getting?
The only benefit I see is getting access to the lock, but I wonder whether that
should really be exposed.
> We could clean this up as part of
[FLINK-32381](https://issues.apache.org/jira/browse/FLINK-32381).
How is this related to the error handling?
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]