XComp commented on code in PR #22661:
URL: https://github.com/apache/flink/pull/22661#discussion_r1242100774
##########
flink-runtime/src/test/java/org/apache/flink/runtime/leaderelection/TestingLeaderElectionDriver.java:
##########
@@ -19,147 +19,347 @@
package org.apache.flink.runtime.leaderelection;
import org.apache.flink.runtime.rpc.FatalErrorHandler;
+import org.apache.flink.util.Preconditions;
import org.apache.flink.util.function.ThrowingConsumer;
+import org.apache.flink.util.function.TriConsumer;
-import javax.annotation.Nullable;
-
+import java.util.Optional;
import java.util.UUID;
import java.util.concurrent.atomic.AtomicBoolean;
+import java.util.concurrent.atomic.AtomicReference;
+import java.util.concurrent.locks.ReentrantLock;
+import java.util.function.BiConsumer;
import java.util.function.Consumer;
+import java.util.function.Function;
/**
- * {@link LeaderElectionDriver} implementation which provides some convenience
functions for testing
- * purposes. Please use {@link #isLeader} and {@link #notLeader()} to manually
control the
- * leadership.
+ * {@code TestingLeaderElectionDriver} is a generic test implementation of
{@link
+ * MultipleComponentLeaderElectionDriver} which can be used in test cases.
*/
-public class TestingLeaderElectionDriver implements LeaderElectionDriver {
+public class TestingLeaderElectionDriver implements
MultipleComponentLeaderElectionDriver {
- private final Object lock = new Object();
+ private final Function<ReentrantLock, Boolean> hasLeadershipFunction;
+ private final TriConsumer<ReentrantLock, String, LeaderInformation>
+ publishLeaderInformationConsumer;
+ private final BiConsumer<ReentrantLock, String>
deleteLeaderInformationConsumer;
- private final AtomicBoolean isLeader = new AtomicBoolean(false);
- private final LeaderElectionEventHandler leaderElectionEventHandler;
- private final FatalErrorHandler fatalErrorHandler;
+ private final ThrowingConsumer<ReentrantLock, Exception> closeConsumer;
- private final ThrowingConsumer<LeaderElectionEventHandler, Exception>
closeRunnable;
- private final ThrowingConsumer<LeaderElectionEventHandler, Exception>
beforeLockCloseRunnable;
+ private final TriConsumer<ReentrantLock, Optional<Listener>, UUID>
grantConsumer;
+ private final BiConsumer<ReentrantLock, Optional<Listener>> revokeConsumer;
- private final Consumer<LeaderElectionEventHandler> beforeGrantRunnable;
+ private final ReentrantLock lock = new ReentrantLock();
- // Leader information on external storage
- private LeaderInformation leaderInformation = LeaderInformation.empty();
+ private Optional<Listener> listener = Optional.empty();
+ private Optional<FatalErrorHandler> fatalErrorHandler = Optional.empty();
- private TestingLeaderElectionDriver(
- LeaderElectionEventHandler leaderElectionEventHandler,
- FatalErrorHandler fatalErrorHandler,
- ThrowingConsumer<LeaderElectionEventHandler, Exception>
closeRunnable,
- ThrowingConsumer<LeaderElectionEventHandler, Exception>
beforeLockCloseRunnable,
- Consumer<LeaderElectionEventHandler> beforeGrantRunnable) {
- this.leaderElectionEventHandler = leaderElectionEventHandler;
- this.fatalErrorHandler = fatalErrorHandler;
- this.closeRunnable = closeRunnable;
- this.beforeLockCloseRunnable = beforeLockCloseRunnable;
- this.beforeGrantRunnable = beforeGrantRunnable;
+ public TestingLeaderElectionDriver(
+ Function<ReentrantLock, Boolean> hasLeadershipFunction,
+ TriConsumer<ReentrantLock, String, LeaderInformation>
publishLeaderInformationConsumer,
+ BiConsumer<ReentrantLock, String> deleteLeaderInformationConsumer,
+ ThrowingConsumer<ReentrantLock, Exception> closeConsumer,
+ TriConsumer<ReentrantLock, Optional<Listener>, UUID> grantConsumer,
+ BiConsumer<ReentrantLock, Optional<Listener>> revokeConsumer) {
+ this.hasLeadershipFunction = hasLeadershipFunction;
+ this.publishLeaderInformationConsumer =
publishLeaderInformationConsumer;
+ this.deleteLeaderInformationConsumer = deleteLeaderInformationConsumer;
+ this.closeConsumer = closeConsumer;
+ this.grantConsumer = grantConsumer;
+ this.revokeConsumer = revokeConsumer;
}
- @Override
- public void writeLeaderInformation(LeaderInformation leaderInformation) {
- this.leaderInformation = leaderInformation;
+ private void initialize(Listener listener, FatalErrorHandler
fatalErrorHandler) {
+ Preconditions.checkState(
+ !this.listener.isPresent(), "The driver should be only
initialized once.");
+
+ this.listener = Optional.of(listener);
+ this.fatalErrorHandler = Optional.of(fatalErrorHandler);
+ }
+
+ public void grantLeadership() {
Review Comment:
`grantLeadership` is more of a convenience method. Its concept was
copied over from `TestingLeaderElectionDriver` (3f2e4619). But, as mentioned in
a [previous
comment](https://github.com/apache/flink/pull/22661#discussion_r1242097169), we
could clean this up as part of FLINK-32381.
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]