XComp commented on code in PR #22661:
URL: https://github.com/apache/flink/pull/22661#discussion_r1242097169
##########
flink-runtime/src/test/java/org/apache/flink/runtime/leaderelection/TestingLeaderElectionDriver.java:
##########
@@ -19,147 +19,347 @@
package org.apache.flink.runtime.leaderelection;
import org.apache.flink.runtime.rpc.FatalErrorHandler;
+import org.apache.flink.util.Preconditions;
import org.apache.flink.util.function.ThrowingConsumer;
+import org.apache.flink.util.function.TriConsumer;
-import javax.annotation.Nullable;
-
+import java.util.Optional;
import java.util.UUID;
import java.util.concurrent.atomic.AtomicBoolean;
+import java.util.concurrent.atomic.AtomicReference;
+import java.util.concurrent.locks.ReentrantLock;
+import java.util.function.BiConsumer;
import java.util.function.Consumer;
+import java.util.function.Function;
/**
- * {@link LeaderElectionDriver} implementation which provides some convenience
functions for testing
- * purposes. Please use {@link #isLeader} and {@link #notLeader()} to manually
control the
- * leadership.
+ * {@code TestingLeaderElectionDriver} is a generic test implementation of
{@link
+ * MultipleComponentLeaderElectionDriver} which can be used in test cases.
*/
-public class TestingLeaderElectionDriver implements LeaderElectionDriver {
+public class TestingLeaderElectionDriver implements
MultipleComponentLeaderElectionDriver {
- private final Object lock = new Object();
+ private final Function<ReentrantLock, Boolean> hasLeadershipFunction;
+ private final TriConsumer<ReentrantLock, String, LeaderInformation>
+ publishLeaderInformationConsumer;
+ private final BiConsumer<ReentrantLock, String>
deleteLeaderInformationConsumer;
- private final AtomicBoolean isLeader = new AtomicBoolean(false);
- private final LeaderElectionEventHandler leaderElectionEventHandler;
- private final FatalErrorHandler fatalErrorHandler;
+ private final ThrowingConsumer<ReentrantLock, Exception> closeConsumer;
- private final ThrowingConsumer<LeaderElectionEventHandler, Exception>
closeRunnable;
- private final ThrowingConsumer<LeaderElectionEventHandler, Exception>
beforeLockCloseRunnable;
+ private final TriConsumer<ReentrantLock, Optional<Listener>, UUID>
grantConsumer;
+ private final BiConsumer<ReentrantLock, Optional<Listener>> revokeConsumer;
- private final Consumer<LeaderElectionEventHandler> beforeGrantRunnable;
+ private final ReentrantLock lock = new ReentrantLock();
- // Leader information on external storage
- private LeaderInformation leaderInformation = LeaderInformation.empty();
+ private Optional<Listener> listener = Optional.empty();
+ private Optional<FatalErrorHandler> fatalErrorHandler = Optional.empty();
- private TestingLeaderElectionDriver(
- LeaderElectionEventHandler leaderElectionEventHandler,
- FatalErrorHandler fatalErrorHandler,
- ThrowingConsumer<LeaderElectionEventHandler, Exception>
closeRunnable,
- ThrowingConsumer<LeaderElectionEventHandler, Exception>
beforeLockCloseRunnable,
- Consumer<LeaderElectionEventHandler> beforeGrantRunnable) {
- this.leaderElectionEventHandler = leaderElectionEventHandler;
- this.fatalErrorHandler = fatalErrorHandler;
- this.closeRunnable = closeRunnable;
- this.beforeLockCloseRunnable = beforeLockCloseRunnable;
- this.beforeGrantRunnable = beforeGrantRunnable;
+ public TestingLeaderElectionDriver(
+ Function<ReentrantLock, Boolean> hasLeadershipFunction,
+ TriConsumer<ReentrantLock, String, LeaderInformation>
publishLeaderInformationConsumer,
+ BiConsumer<ReentrantLock, String> deleteLeaderInformationConsumer,
+ ThrowingConsumer<ReentrantLock, Exception> closeConsumer,
+ TriConsumer<ReentrantLock, Optional<Listener>, UUID> grantConsumer,
+ BiConsumer<ReentrantLock, Optional<Listener>> revokeConsumer) {
+ this.hasLeadershipFunction = hasLeadershipFunction;
+ this.publishLeaderInformationConsumer =
publishLeaderInformationConsumer;
+ this.deleteLeaderInformationConsumer = deleteLeaderInformationConsumer;
+ this.closeConsumer = closeConsumer;
+ this.grantConsumer = grantConsumer;
+ this.revokeConsumer = revokeConsumer;
}
- @Override
- public void writeLeaderInformation(LeaderInformation leaderInformation) {
- this.leaderInformation = leaderInformation;
+ private void initialize(Listener listener, FatalErrorHandler
fatalErrorHandler) {
+ Preconditions.checkState(
+ !this.listener.isPresent(), "The driver should be only
initialized once.");
+
+ this.listener = Optional.of(listener);
+ this.fatalErrorHandler = Optional.of(fatalErrorHandler);
+ }
+
+ public void grantLeadership() {
+ grantLeadership(UUID.randomUUID());
+ }
+
+ public void grantLeadership(UUID leaderSessionID) {
+ grantConsumer.accept(lock, listener, leaderSessionID);
+ }
+
+ public void revokeLeadership() {
+ revokeConsumer.accept(lock, listener);
+ }
+
+ public void triggerErrorHandling(Throwable throwable) {
+ runInLockIfPresent(fatalErrorHandler, f -> f.onFatalError(throwable));
+ }
+
+ public void triggerLeaderInformationChangeEvent(
+ String contenderID, LeaderInformation leaderInformation) {
+ runInLockIfPresent(
+ listener,
+ listener ->
listener.notifyLeaderInformationChange(contenderID, leaderInformation));
+ }
+
+ public void triggerAllLeaderInformationChangeEvent(
+ LeaderInformationRegister newLeaderInformation) {
+ runInLockIfPresent(
+ listener,
+ listener ->
listener.notifyAllKnownLeaderInformation(newLeaderInformation));
+ }
+
+ private <T> void runInLockIfPresent(Optional<T> optional, Consumer<T>
callback) {
+ try {
+ lock.lock();
+ optional.ifPresent(callback);
+ } finally {
+ lock.unlock();
+ }
}
@Override
public boolean hasLeadership() {
- return isLeader.get();
+ return hasLeadershipFunction.apply(lock);
}
@Override
- public void close() throws Exception {
- beforeLockCloseRunnable.accept(leaderElectionEventHandler);
- synchronized (lock) {
- closeRunnable.accept(leaderElectionEventHandler);
- }
+ public void publishLeaderInformation(String contenderID, LeaderInformation
leaderInformation) {
+ publishLeaderInformationConsumer.accept(lock, contenderID,
leaderInformation);
}
- public LeaderInformation getLeaderInformation() {
- return leaderInformation;
+ @Override
+ public void deleteLeaderInformation(String contenderID) {
+ deleteLeaderInformationConsumer.accept(lock, contenderID);
}
- public void isLeader(UUID newSessionID) {
- synchronized (lock) {
- isLeader.set(true);
- beforeGrantRunnable.accept(leaderElectionEventHandler);
- leaderElectionEventHandler.onGrantLeadership(newSessionID);
- }
+ @Override
+ public void close() throws Exception {
+ closeConsumer.accept(lock);
}
- public void isLeader() {
- isLeader(UUID.randomUUID());
+ public static Builder newNoOpBuilder() {
+ return new Builder();
}
- public void notLeader() {
- synchronized (lock) {
- isLeader.set(false);
- leaderElectionEventHandler.onRevokeLeadership();
- }
+ public static Builder newBuilder() {
+ return newBuilder(new AtomicBoolean(), new AtomicReference<>(), new
AtomicBoolean());
}
- public void leaderInformationChanged(LeaderInformation newLeader) {
- leaderInformation = newLeader;
- leaderElectionEventHandler.onLeaderInformationChange(newLeader);
+ /**
+ * Returns a {@code Builder} that comes with a basic default
implementation of the {@link
+ * MultipleComponentLeaderElectionDriver} contract using the passed
parameters for information
+ * storage.
+ *
+ * @param hasLeadership saves the current leadership state of the instance
that is created from
+ * the {@code Builder}.
+ * @param storedLeaderInformation saves the leader information that would
be otherwise stored in
+ * some external storage.
+ * @param isClosed saves the running state of the driver.
+ */
+ public static Builder newBuilder(
+ AtomicBoolean hasLeadership,
+ AtomicReference<LeaderInformationRegister> storedLeaderInformation,
+ AtomicBoolean isClosed) {
+ Preconditions.checkState(
+ !hasLeadership.get(), "Initial state check for hasLeadership
failed.");
+ Preconditions.checkState(
+ storedLeaderInformation.get() == null
+ || !storedLeaderInformation
+ .get()
+ .getRegisteredContenderIDs()
+ .iterator()
+ .hasNext(),
+ "Initial state check for storedLeaderInformation failed.");
+ Preconditions.checkState(!isClosed.get(), "Initial state check for
isClosed failed.");
+ return newNoOpBuilder()
+ .setHasLeadershipFunction(
+ lock -> {
+ try {
+ lock.lock();
+ return hasLeadership.get();
+ } finally {
+ lock.unlock();
+ }
+ })
+ .setPublishLeaderInformationConsumer(
+ (lock, contenderID, leaderInformation) -> {
+ try {
+ lock.lock();
+ if (hasLeadership.get()) {
+ storedLeaderInformation.getAndUpdate(
+ oldData -> {
+ if (oldData != null) {
+ if
(leaderInformation.isEmpty()) {
+ return
LeaderInformationRegister.clear(
+ oldData,
contenderID);
+ } else {
+ return
LeaderInformationRegister.merge(
+ oldData,
+ contenderID,
+
leaderInformation);
+ }
+ } else if
(!leaderInformation.isEmpty()) {
+ return
LeaderInformationRegister.of(
+ contenderID,
leaderInformation);
+ }
+
+ return new
LeaderInformationRegister();
+ });
+ }
+ } finally {
+ lock.unlock();
+ }
+ })
+ .setDeleteLeaderInformationConsumer(
+ (lock, contenderID) -> {
+ try {
+ lock.lock();
+ if (hasLeadership.get()) {
+ storedLeaderInformation.getAndUpdate(
+ oldData ->
+ oldData != null
+ ?
LeaderInformationRegister.clear(
+ oldData,
contenderID)
+ : new
LeaderInformationRegister());
+ }
+ } finally {
+ lock.unlock();
+ }
+ })
+ .setGrantConsumer(
+ (lock, listener, leaderSessionID) -> {
+ try {
+ lock.lock();
+ Preconditions.checkState(
+ !hasLeadership.get(),
+ "Granting the leadership shouldn't
happen repeatedly.");
+
+ hasLeadership.set(true);
+ listener.ifPresent(l ->
l.isLeader(leaderSessionID));
+ } finally {
+ lock.unlock();
+ }
+ })
+ .setRevokeConsumer(
+ (lock, listener) -> {
+ try {
+ lock.lock();
+ Preconditions.checkState(
+ hasLeadership.get(),
+ "Revoking the leadership shouldn't
happen repeatedly.");
+ hasLeadership.set(false);
+ listener.ifPresent(Listener::notLeader);
+ } finally {
+ lock.unlock();
+ }
+ })
+ .setCloseConsumer(
+ lock -> {
+ try {
+ lock.lock();
+ isClosed.set(true);
+ } finally {
+ lock.unlock();
+ }
+ });
}
- public void onFatalError(Throwable throwable) {
- fatalErrorHandler.onFatalError(throwable);
+ /**
+ * {@code Factory} implements {@link
MultipleComponentLeaderElectionDriverFactory} for the
+ * {@code TestingLeaderElectionDriver}.
+ */
+ public static class Factory implements
MultipleComponentLeaderElectionDriverFactory {
+
+ private final TestingLeaderElectionDriver driver;
+
+ public static Factory createFactoryWithNoOpDriver() {
+ return new
Factory(TestingLeaderElectionDriver.newNoOpBuilder().build());
+ }
+
+ public static Factory defaultDriverFactory(
+ AtomicBoolean hasLeadership,
+ AtomicReference<LeaderInformationRegister>
storedLeaderInformation,
+ AtomicBoolean isClosed) {
+ return new Factory(
+ TestingLeaderElectionDriver.newBuilder(
+ hasLeadership, storedLeaderInformation,
isClosed)
+ .build());
+ }
+
+ public Factory(TestingLeaderElectionDriver driver) {
+ this.driver = driver;
+ }
+
+ @Override
+ public MultipleComponentLeaderElectionDriver create(
+ Listener leaderElectionListener, FatalErrorHandler
fatalErrorHandler)
+ throws Exception {
+ driver.initialize(leaderElectionListener, fatalErrorHandler);
Review Comment:
You're right - I followed the approach of
`TestingMultipleComponentLeaderElectionDriver` (which is deleted in 8928e572).
The legacy testing implementation (see 3f2e4619) was closer to what you're
suggesting: there, the driver instance is not passed into the factory instance
through the constructor but is just collected as part of the
`createLeaderElectionDriver` call.
Anyway, neither approach is optimal design-wise, because the interface itself
is not properly designed (we're planning to fix that in FLINK-32381). I would
leave it as it is for now and fix the issue as part of FLINK-32381. What do
you think?
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]