XComp commented on code in PR #22661:
URL: https://github.com/apache/flink/pull/22661#discussion_r1242219813
##########
flink-runtime/src/test/java/org/apache/flink/runtime/leaderelection/TestingLeaderElectionDriver.java:
##########
@@ -19,147 +19,347 @@
package org.apache.flink.runtime.leaderelection;
import org.apache.flink.runtime.rpc.FatalErrorHandler;
+import org.apache.flink.util.Preconditions;
import org.apache.flink.util.function.ThrowingConsumer;
+import org.apache.flink.util.function.TriConsumer;
-import javax.annotation.Nullable;
-
+import java.util.Optional;
import java.util.UUID;
import java.util.concurrent.atomic.AtomicBoolean;
+import java.util.concurrent.atomic.AtomicReference;
+import java.util.concurrent.locks.ReentrantLock;
+import java.util.function.BiConsumer;
import java.util.function.Consumer;
+import java.util.function.Function;
/**
- * {@link LeaderElectionDriver} implementation which provides some convenience
functions for testing
- * purposes. Please use {@link #isLeader} and {@link #notLeader()} to manually
control the
- * leadership.
+ * {@code TestingLeaderElectionDriver} is a generic test implementation of
{@link
+ * MultipleComponentLeaderElectionDriver} which can be used in test cases.
*/
-public class TestingLeaderElectionDriver implements LeaderElectionDriver {
+public class TestingLeaderElectionDriver implements
MultipleComponentLeaderElectionDriver {
- private final Object lock = new Object();
+ private final Function<ReentrantLock, Boolean> hasLeadershipFunction;
+ private final TriConsumer<ReentrantLock, String, LeaderInformation>
+ publishLeaderInformationConsumer;
+ private final BiConsumer<ReentrantLock, String>
deleteLeaderInformationConsumer;
- private final AtomicBoolean isLeader = new AtomicBoolean(false);
- private final LeaderElectionEventHandler leaderElectionEventHandler;
- private final FatalErrorHandler fatalErrorHandler;
+ private final ThrowingConsumer<ReentrantLock, Exception> closeConsumer;
- private final ThrowingConsumer<LeaderElectionEventHandler, Exception>
closeRunnable;
- private final ThrowingConsumer<LeaderElectionEventHandler, Exception>
beforeLockCloseRunnable;
+ private final TriConsumer<ReentrantLock, Optional<Listener>, UUID>
grantConsumer;
+ private final BiConsumer<ReentrantLock, Optional<Listener>> revokeConsumer;
- private final Consumer<LeaderElectionEventHandler> beforeGrantRunnable;
+ private final ReentrantLock lock = new ReentrantLock();
- // Leader information on external storage
- private LeaderInformation leaderInformation = LeaderInformation.empty();
+ private Optional<Listener> listener = Optional.empty();
+ private Optional<FatalErrorHandler> fatalErrorHandler = Optional.empty();
- private TestingLeaderElectionDriver(
- LeaderElectionEventHandler leaderElectionEventHandler,
- FatalErrorHandler fatalErrorHandler,
- ThrowingConsumer<LeaderElectionEventHandler, Exception>
closeRunnable,
- ThrowingConsumer<LeaderElectionEventHandler, Exception>
beforeLockCloseRunnable,
- Consumer<LeaderElectionEventHandler> beforeGrantRunnable) {
- this.leaderElectionEventHandler = leaderElectionEventHandler;
- this.fatalErrorHandler = fatalErrorHandler;
- this.closeRunnable = closeRunnable;
- this.beforeLockCloseRunnable = beforeLockCloseRunnable;
- this.beforeGrantRunnable = beforeGrantRunnable;
+ public TestingLeaderElectionDriver(
+ Function<ReentrantLock, Boolean> hasLeadershipFunction,
+ TriConsumer<ReentrantLock, String, LeaderInformation>
publishLeaderInformationConsumer,
+ BiConsumer<ReentrantLock, String> deleteLeaderInformationConsumer,
+ ThrowingConsumer<ReentrantLock, Exception> closeConsumer,
+ TriConsumer<ReentrantLock, Optional<Listener>, UUID> grantConsumer,
+ BiConsumer<ReentrantLock, Optional<Listener>> revokeConsumer) {
+ this.hasLeadershipFunction = hasLeadershipFunction;
+ this.publishLeaderInformationConsumer =
publishLeaderInformationConsumer;
+ this.deleteLeaderInformationConsumer = deleteLeaderInformationConsumer;
+ this.closeConsumer = closeConsumer;
+ this.grantConsumer = grantConsumer;
+ this.revokeConsumer = revokeConsumer;
}
- @Override
- public void writeLeaderInformation(LeaderInformation leaderInformation) {
- this.leaderInformation = leaderInformation;
+ private void initialize(Listener listener, FatalErrorHandler
fatalErrorHandler) {
+ Preconditions.checkState(
+ !this.listener.isPresent(), "The driver should be only
initialized once.");
+
+ this.listener = Optional.of(listener);
+ this.fatalErrorHandler = Optional.of(fatalErrorHandler);
+ }
+
+ public void grantLeadership() {
+ grantLeadership(UUID.randomUUID());
+ }
+
+ public void grantLeadership(UUID leaderSessionID) {
+ grantConsumer.accept(lock, listener, leaderSessionID);
+ }
+
+ public void revokeLeadership() {
+ revokeConsumer.accept(lock, listener);
+ }
+
+ public void triggerErrorHandling(Throwable throwable) {
+ runInLockIfPresent(fatalErrorHandler, f -> f.onFatalError(throwable));
+ }
+
+ public void triggerLeaderInformationChangeEvent(
+ String contenderID, LeaderInformation leaderInformation) {
+ runInLockIfPresent(
+ listener,
+ listener ->
listener.notifyLeaderInformationChange(contenderID, leaderInformation));
+ }
+
+ public void triggerAllLeaderInformationChangeEvent(
+ LeaderInformationRegister newLeaderInformation) {
+ runInLockIfPresent(
+ listener,
+ listener ->
listener.notifyAllKnownLeaderInformation(newLeaderInformation));
+ }
+
+ private <T> void runInLockIfPresent(Optional<T> optional, Consumer<T>
callback) {
+ try {
+ lock.lock();
+ optional.ifPresent(callback);
+ } finally {
+ lock.unlock();
+ }
}
Review Comment:
fair point, the listener itself would be initialized in the constructor of
the driver and therefore isn't subject to concurrenct access. I moved the lock
block. :+1:
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]