zentol commented on code in PR #22661:
URL: https://github.com/apache/flink/pull/22661#discussion_r1241975824
##########
flink-runtime/src/test/java/org/apache/flink/runtime/leaderelection/ZooKeeperLeaderElectionTest.java:
##########
@@ -162,20 +162,22 @@ void testZooKeeperReelection() throws Exception {
DefaultLeaderRetrievalService leaderRetrievalService = null;
TestingListener listener = new TestingListener();
-
Review Comment:
nit: revert
##########
flink-runtime/src/test/java/org/apache/flink/runtime/leaderelection/ZooKeeperLeaderElectionConnectionHandlingTest.java:
##########
@@ -135,8 +135,8 @@ private void runTestWithZooKeeperConnectionProblem(
configuration,
testingFatalErrorHandlerResource.getTestingFatalErrorHandler());
CuratorFramework client = curatorFrameworkWrapper.asCuratorFramework();
- LeaderElectionDriverFactory leaderElectionDriverFactory =
- new ZooKeeperLeaderElectionDriverFactory(client, PATH);
+ MultipleComponentLeaderElectionDriverFactory
leaderElectionDriverFactory =
+ new
ZooKeeperMultipleComponentLeaderElectionDriverFactory(client);
Review Comment:
`PATH` is now unused
##########
flink-runtime/src/test/java/org/apache/flink/runtime/leaderelection/TestingLeaderElectionDriver.java:
##########
@@ -19,147 +19,347 @@
package org.apache.flink.runtime.leaderelection;
import org.apache.flink.runtime.rpc.FatalErrorHandler;
+import org.apache.flink.util.Preconditions;
import org.apache.flink.util.function.ThrowingConsumer;
+import org.apache.flink.util.function.TriConsumer;
-import javax.annotation.Nullable;
-
+import java.util.Optional;
import java.util.UUID;
import java.util.concurrent.atomic.AtomicBoolean;
+import java.util.concurrent.atomic.AtomicReference;
+import java.util.concurrent.locks.ReentrantLock;
+import java.util.function.BiConsumer;
import java.util.function.Consumer;
+import java.util.function.Function;
/**
- * {@link LeaderElectionDriver} implementation which provides some convenience
functions for testing
- * purposes. Please use {@link #isLeader} and {@link #notLeader()} to manually
control the
- * leadership.
+ * {@code TestingLeaderElectionDriver} is a generic test implementation of
{@link
+ * MultipleComponentLeaderElectionDriver} which can be used in test cases.
*/
-public class TestingLeaderElectionDriver implements LeaderElectionDriver {
+public class TestingLeaderElectionDriver implements
MultipleComponentLeaderElectionDriver {
- private final Object lock = new Object();
+ private final Function<ReentrantLock, Boolean> hasLeadershipFunction;
+ private final TriConsumer<ReentrantLock, String, LeaderInformation>
+ publishLeaderInformationConsumer;
+ private final BiConsumer<ReentrantLock, String>
deleteLeaderInformationConsumer;
- private final AtomicBoolean isLeader = new AtomicBoolean(false);
- private final LeaderElectionEventHandler leaderElectionEventHandler;
- private final FatalErrorHandler fatalErrorHandler;
+ private final ThrowingConsumer<ReentrantLock, Exception> closeConsumer;
- private final ThrowingConsumer<LeaderElectionEventHandler, Exception>
closeRunnable;
- private final ThrowingConsumer<LeaderElectionEventHandler, Exception>
beforeLockCloseRunnable;
+ private final TriConsumer<ReentrantLock, Optional<Listener>, UUID>
grantConsumer;
+ private final BiConsumer<ReentrantLock, Optional<Listener>> revokeConsumer;
- private final Consumer<LeaderElectionEventHandler> beforeGrantRunnable;
+ private final ReentrantLock lock = new ReentrantLock();
- // Leader information on external storage
- private LeaderInformation leaderInformation = LeaderInformation.empty();
+ private Optional<Listener> listener = Optional.empty();
+ private Optional<FatalErrorHandler> fatalErrorHandler = Optional.empty();
- private TestingLeaderElectionDriver(
- LeaderElectionEventHandler leaderElectionEventHandler,
- FatalErrorHandler fatalErrorHandler,
- ThrowingConsumer<LeaderElectionEventHandler, Exception>
closeRunnable,
- ThrowingConsumer<LeaderElectionEventHandler, Exception>
beforeLockCloseRunnable,
- Consumer<LeaderElectionEventHandler> beforeGrantRunnable) {
- this.leaderElectionEventHandler = leaderElectionEventHandler;
- this.fatalErrorHandler = fatalErrorHandler;
- this.closeRunnable = closeRunnable;
- this.beforeLockCloseRunnable = beforeLockCloseRunnable;
- this.beforeGrantRunnable = beforeGrantRunnable;
+ public TestingLeaderElectionDriver(
+ Function<ReentrantLock, Boolean> hasLeadershipFunction,
+ TriConsumer<ReentrantLock, String, LeaderInformation>
publishLeaderInformationConsumer,
+ BiConsumer<ReentrantLock, String> deleteLeaderInformationConsumer,
+ ThrowingConsumer<ReentrantLock, Exception> closeConsumer,
+ TriConsumer<ReentrantLock, Optional<Listener>, UUID> grantConsumer,
+ BiConsumer<ReentrantLock, Optional<Listener>> revokeConsumer) {
+ this.hasLeadershipFunction = hasLeadershipFunction;
+ this.publishLeaderInformationConsumer =
publishLeaderInformationConsumer;
+ this.deleteLeaderInformationConsumer = deleteLeaderInformationConsumer;
+ this.closeConsumer = closeConsumer;
+ this.grantConsumer = grantConsumer;
+ this.revokeConsumer = revokeConsumer;
}
- @Override
- public void writeLeaderInformation(LeaderInformation leaderInformation) {
- this.leaderInformation = leaderInformation;
+ private void initialize(Listener listener, FatalErrorHandler
fatalErrorHandler) {
+ Preconditions.checkState(
+ !this.listener.isPresent(), "The driver should be only
initialized once.");
+
+ this.listener = Optional.of(listener);
+ this.fatalErrorHandler = Optional.of(fatalErrorHandler);
+ }
+
+ public void grantLeadership() {
+ grantLeadership(UUID.randomUUID());
+ }
+
+ public void grantLeadership(UUID leaderSessionID) {
+ grantConsumer.accept(lock, listener, leaderSessionID);
+ }
+
+ public void revokeLeadership() {
+ revokeConsumer.accept(lock, listener);
+ }
+
+ public void triggerErrorHandling(Throwable throwable) {
+ runInLockIfPresent(fatalErrorHandler, f -> f.onFatalError(throwable));
+ }
+
+ public void triggerLeaderInformationChangeEvent(
+ String contenderID, LeaderInformation leaderInformation) {
+ runInLockIfPresent(
+ listener,
+ listener ->
listener.notifyLeaderInformationChange(contenderID, leaderInformation));
+ }
+
+ public void triggerAllLeaderInformationChangeEvent(
+ LeaderInformationRegister newLeaderInformation) {
+ runInLockIfPresent(
+ listener,
+ listener ->
listener.notifyAllKnownLeaderInformation(newLeaderInformation));
+ }
+
+ private <T> void runInLockIfPresent(Optional<T> optional, Consumer<T>
callback) {
+ try {
+ lock.lock();
+ optional.ifPresent(callback);
+ } finally {
+ lock.unlock();
+ }
}
@Override
public boolean hasLeadership() {
- return isLeader.get();
+ return hasLeadershipFunction.apply(lock);
}
@Override
- public void close() throws Exception {
- beforeLockCloseRunnable.accept(leaderElectionEventHandler);
- synchronized (lock) {
- closeRunnable.accept(leaderElectionEventHandler);
- }
+ public void publishLeaderInformation(String contenderID, LeaderInformation
leaderInformation) {
+ publishLeaderInformationConsumer.accept(lock, contenderID,
leaderInformation);
}
- public LeaderInformation getLeaderInformation() {
- return leaderInformation;
+ @Override
+ public void deleteLeaderInformation(String contenderID) {
+ deleteLeaderInformationConsumer.accept(lock, contenderID);
}
- public void isLeader(UUID newSessionID) {
- synchronized (lock) {
- isLeader.set(true);
- beforeGrantRunnable.accept(leaderElectionEventHandler);
- leaderElectionEventHandler.onGrantLeadership(newSessionID);
- }
+ @Override
+ public void close() throws Exception {
+ closeConsumer.accept(lock);
}
- public void isLeader() {
- isLeader(UUID.randomUUID());
+ public static Builder newNoOpBuilder() {
+ return new Builder();
}
- public void notLeader() {
- synchronized (lock) {
- isLeader.set(false);
- leaderElectionEventHandler.onRevokeLeadership();
- }
+ public static Builder newBuilder() {
+ return newBuilder(new AtomicBoolean(), new AtomicReference<>(), new
AtomicBoolean());
}
- public void leaderInformationChanged(LeaderInformation newLeader) {
- leaderInformation = newLeader;
- leaderElectionEventHandler.onLeaderInformationChange(newLeader);
+ /**
+ * Returns a {@code Builder} that comes with a basic default
implementation of the {@link
+ * MultipleComponentLeaderElectionDriver} contract using the passed
parameters for information
+ * storage.
+ *
+ * @param hasLeadership saves the current leadership state of the instance
that is created from
+ * the {@code Builder}.
+ * @param storedLeaderInformation saves the leader information that would
be otherwise stored in
+ * some external storage.
+ * @param isClosed saves the running state of the driver.
+ */
+ public static Builder newBuilder(
+ AtomicBoolean hasLeadership,
+ AtomicReference<LeaderInformationRegister> storedLeaderInformation,
+ AtomicBoolean isClosed) {
+ Preconditions.checkState(
+ !hasLeadership.get(), "Initial state check for hasLeadership
failed.");
+ Preconditions.checkState(
+ storedLeaderInformation.get() == null
+ || !storedLeaderInformation
+ .get()
+ .getRegisteredContenderIDs()
+ .iterator()
+ .hasNext(),
+ "Initial state check for storedLeaderInformation failed.");
+ Preconditions.checkState(!isClosed.get(), "Initial state check for
isClosed failed.");
+ return newNoOpBuilder()
+ .setHasLeadershipFunction(
+ lock -> {
+ try {
+ lock.lock();
+ return hasLeadership.get();
+ } finally {
+ lock.unlock();
+ }
+ })
+ .setPublishLeaderInformationConsumer(
+ (lock, contenderID, leaderInformation) -> {
+ try {
+ lock.lock();
+ if (hasLeadership.get()) {
+ storedLeaderInformation.getAndUpdate(
+ oldData -> {
+ if (oldData != null) {
+ if
(leaderInformation.isEmpty()) {
+ return
LeaderInformationRegister.clear(
+ oldData,
contenderID);
+ } else {
+ return
LeaderInformationRegister.merge(
+ oldData,
+ contenderID,
+
leaderInformation);
+ }
+ } else if
(!leaderInformation.isEmpty()) {
+ return
LeaderInformationRegister.of(
+ contenderID,
leaderInformation);
+ }
+
+ return new
LeaderInformationRegister();
+ });
+ }
+ } finally {
+ lock.unlock();
+ }
+ })
+ .setDeleteLeaderInformationConsumer(
+ (lock, contenderID) -> {
+ try {
+ lock.lock();
+ if (hasLeadership.get()) {
+ storedLeaderInformation.getAndUpdate(
+ oldData ->
+ oldData != null
+ ?
LeaderInformationRegister.clear(
+ oldData,
contenderID)
+ : new
LeaderInformationRegister());
+ }
+ } finally {
+ lock.unlock();
+ }
+ })
+ .setGrantConsumer(
+ (lock, listener, leaderSessionID) -> {
+ try {
+ lock.lock();
+ Preconditions.checkState(
+ !hasLeadership.get(),
+ "Granting the leadership shouldn't
happen repeatedly.");
+
+ hasLeadership.set(true);
+ listener.ifPresent(l ->
l.isLeader(leaderSessionID));
+ } finally {
+ lock.unlock();
+ }
+ })
+ .setRevokeConsumer(
+ (lock, listener) -> {
+ try {
+ lock.lock();
+ Preconditions.checkState(
+ hasLeadership.get(),
+ "Revoking the leadership shouldn't
happen repeatedly.");
+ hasLeadership.set(false);
+ listener.ifPresent(Listener::notLeader);
+ } finally {
+ lock.unlock();
+ }
+ })
+ .setCloseConsumer(
+ lock -> {
+ try {
+ lock.lock();
+ isClosed.set(true);
+ } finally {
+ lock.unlock();
+ }
+ });
}
- public void onFatalError(Throwable throwable) {
- fatalErrorHandler.onFatalError(throwable);
+ /**
+ * {@code Factory} implements {@link
MultipleComponentLeaderElectionDriverFactory} for the
+ * {@code TestingLeaderElectionDriver}.
+ */
+ public static class Factory implements
MultipleComponentLeaderElectionDriverFactory {
+
+ private final TestingLeaderElectionDriver driver;
+
+ public static Factory createFactoryWithNoOpDriver() {
+ return new
Factory(TestingLeaderElectionDriver.newNoOpBuilder().build());
+ }
+
+ public static Factory defaultDriverFactory(
+ AtomicBoolean hasLeadership,
+ AtomicReference<LeaderInformationRegister>
storedLeaderInformation,
+ AtomicBoolean isClosed) {
+ return new Factory(
+ TestingLeaderElectionDriver.newBuilder(
+ hasLeadership, storedLeaderInformation,
isClosed)
+ .build());
+ }
+
+ public Factory(TestingLeaderElectionDriver driver) {
+ this.driver = driver;
+ }
+
+ @Override
+ public MultipleComponentLeaderElectionDriver create(
+ Listener leaderElectionListener, FatalErrorHandler
fatalErrorHandler)
+ throws Exception {
+ driver.initialize(leaderElectionListener, fatalErrorHandler);
Review Comment:
It seems sketchy to provide a factory that must only ever be called once.
Wondering if the factory shouldnt store the driver builder, and finalize it in
the create call.
##########
flink-runtime/src/test/java/org/apache/flink/runtime/leaderelection/TestingLeaderElectionDriver.java:
##########
@@ -19,147 +19,347 @@
package org.apache.flink.runtime.leaderelection;
import org.apache.flink.runtime.rpc.FatalErrorHandler;
+import org.apache.flink.util.Preconditions;
import org.apache.flink.util.function.ThrowingConsumer;
+import org.apache.flink.util.function.TriConsumer;
-import javax.annotation.Nullable;
-
+import java.util.Optional;
import java.util.UUID;
import java.util.concurrent.atomic.AtomicBoolean;
+import java.util.concurrent.atomic.AtomicReference;
+import java.util.concurrent.locks.ReentrantLock;
+import java.util.function.BiConsumer;
import java.util.function.Consumer;
+import java.util.function.Function;
/**
- * {@link LeaderElectionDriver} implementation which provides some convenience
functions for testing
- * purposes. Please use {@link #isLeader} and {@link #notLeader()} to manually
control the
- * leadership.
+ * {@code TestingLeaderElectionDriver} is a generic test implementation of
{@link
+ * MultipleComponentLeaderElectionDriver} which can be used in test cases.
*/
-public class TestingLeaderElectionDriver implements LeaderElectionDriver {
+public class TestingLeaderElectionDriver implements
MultipleComponentLeaderElectionDriver {
- private final Object lock = new Object();
+ private final Function<ReentrantLock, Boolean> hasLeadershipFunction;
+ private final TriConsumer<ReentrantLock, String, LeaderInformation>
+ publishLeaderInformationConsumer;
+ private final BiConsumer<ReentrantLock, String>
deleteLeaderInformationConsumer;
- private final AtomicBoolean isLeader = new AtomicBoolean(false);
- private final LeaderElectionEventHandler leaderElectionEventHandler;
- private final FatalErrorHandler fatalErrorHandler;
+ private final ThrowingConsumer<ReentrantLock, Exception> closeConsumer;
- private final ThrowingConsumer<LeaderElectionEventHandler, Exception>
closeRunnable;
- private final ThrowingConsumer<LeaderElectionEventHandler, Exception>
beforeLockCloseRunnable;
+ private final TriConsumer<ReentrantLock, Optional<Listener>, UUID>
grantConsumer;
+ private final BiConsumer<ReentrantLock, Optional<Listener>> revokeConsumer;
- private final Consumer<LeaderElectionEventHandler> beforeGrantRunnable;
+ private final ReentrantLock lock = new ReentrantLock();
- // Leader information on external storage
- private LeaderInformation leaderInformation = LeaderInformation.empty();
+ private Optional<Listener> listener = Optional.empty();
+ private Optional<FatalErrorHandler> fatalErrorHandler = Optional.empty();
- private TestingLeaderElectionDriver(
- LeaderElectionEventHandler leaderElectionEventHandler,
- FatalErrorHandler fatalErrorHandler,
- ThrowingConsumer<LeaderElectionEventHandler, Exception>
closeRunnable,
- ThrowingConsumer<LeaderElectionEventHandler, Exception>
beforeLockCloseRunnable,
- Consumer<LeaderElectionEventHandler> beforeGrantRunnable) {
- this.leaderElectionEventHandler = leaderElectionEventHandler;
- this.fatalErrorHandler = fatalErrorHandler;
- this.closeRunnable = closeRunnable;
- this.beforeLockCloseRunnable = beforeLockCloseRunnable;
- this.beforeGrantRunnable = beforeGrantRunnable;
+ public TestingLeaderElectionDriver(
+ Function<ReentrantLock, Boolean> hasLeadershipFunction,
+ TriConsumer<ReentrantLock, String, LeaderInformation>
publishLeaderInformationConsumer,
+ BiConsumer<ReentrantLock, String> deleteLeaderInformationConsumer,
+ ThrowingConsumer<ReentrantLock, Exception> closeConsumer,
+ TriConsumer<ReentrantLock, Optional<Listener>, UUID> grantConsumer,
+ BiConsumer<ReentrantLock, Optional<Listener>> revokeConsumer) {
+ this.hasLeadershipFunction = hasLeadershipFunction;
+ this.publishLeaderInformationConsumer =
publishLeaderInformationConsumer;
+ this.deleteLeaderInformationConsumer = deleteLeaderInformationConsumer;
+ this.closeConsumer = closeConsumer;
+ this.grantConsumer = grantConsumer;
+ this.revokeConsumer = revokeConsumer;
}
- @Override
- public void writeLeaderInformation(LeaderInformation leaderInformation) {
- this.leaderInformation = leaderInformation;
+ private void initialize(Listener listener, FatalErrorHandler
fatalErrorHandler) {
+ Preconditions.checkState(
+ !this.listener.isPresent(), "The driver should be only
initialized once.");
+
+ this.listener = Optional.of(listener);
+ this.fatalErrorHandler = Optional.of(fatalErrorHandler);
+ }
+
+ public void grantLeadership() {
+ grantLeadership(UUID.randomUUID());
+ }
+
+ public void grantLeadership(UUID leaderSessionID) {
+ grantConsumer.accept(lock, listener, leaderSessionID);
+ }
+
+ public void revokeLeadership() {
+ revokeConsumer.accept(lock, listener);
+ }
+
+ public void triggerErrorHandling(Throwable throwable) {
+ runInLockIfPresent(fatalErrorHandler, f -> f.onFatalError(throwable));
+ }
+
+ public void triggerLeaderInformationChangeEvent(
+ String contenderID, LeaderInformation leaderInformation) {
+ runInLockIfPresent(
+ listener,
+ listener ->
listener.notifyLeaderInformationChange(contenderID, leaderInformation));
+ }
+
+ public void triggerAllLeaderInformationChangeEvent(
+ LeaderInformationRegister newLeaderInformation) {
+ runInLockIfPresent(
+ listener,
+ listener ->
listener.notifyAllKnownLeaderInformation(newLeaderInformation));
+ }
+
+ private <T> void runInLockIfPresent(Optional<T> optional, Consumer<T>
callback) {
+ try {
+ lock.lock();
+ optional.ifPresent(callback);
+ } finally {
+ lock.unlock();
+ }
}
@Override
public boolean hasLeadership() {
- return isLeader.get();
+ return hasLeadershipFunction.apply(lock);
}
@Override
- public void close() throws Exception {
- beforeLockCloseRunnable.accept(leaderElectionEventHandler);
- synchronized (lock) {
- closeRunnable.accept(leaderElectionEventHandler);
- }
+ public void publishLeaderInformation(String contenderID, LeaderInformation
leaderInformation) {
+ publishLeaderInformationConsumer.accept(lock, contenderID,
leaderInformation);
}
- public LeaderInformation getLeaderInformation() {
- return leaderInformation;
+ @Override
+ public void deleteLeaderInformation(String contenderID) {
+ deleteLeaderInformationConsumer.accept(lock, contenderID);
}
- public void isLeader(UUID newSessionID) {
- synchronized (lock) {
- isLeader.set(true);
- beforeGrantRunnable.accept(leaderElectionEventHandler);
- leaderElectionEventHandler.onGrantLeadership(newSessionID);
- }
+ @Override
+ public void close() throws Exception {
+ closeConsumer.accept(lock);
}
- public void isLeader() {
- isLeader(UUID.randomUUID());
+ public static Builder newNoOpBuilder() {
+ return new Builder();
}
- public void notLeader() {
- synchronized (lock) {
- isLeader.set(false);
- leaderElectionEventHandler.onRevokeLeadership();
- }
+ public static Builder newBuilder() {
+ return newBuilder(new AtomicBoolean(), new AtomicReference<>(), new
AtomicBoolean());
}
- public void leaderInformationChanged(LeaderInformation newLeader) {
- leaderInformation = newLeader;
- leaderElectionEventHandler.onLeaderInformationChange(newLeader);
+ /**
+ * Returns a {@code Builder} that comes with a basic default
implementation of the {@link
+ * MultipleComponentLeaderElectionDriver} contract using the passed
parameters for information
+ * storage.
+ *
+ * @param hasLeadership saves the current leadership state of the instance
that is created from
+ * the {@code Builder}.
+ * @param storedLeaderInformation saves the leader information that would
be otherwise stored in
+ * some external storage.
+ * @param isClosed saves the running state of the driver.
+ */
+ public static Builder newBuilder(
+ AtomicBoolean hasLeadership,
+ AtomicReference<LeaderInformationRegister> storedLeaderInformation,
+ AtomicBoolean isClosed) {
+ Preconditions.checkState(
+ !hasLeadership.get(), "Initial state check for hasLeadership
failed.");
+ Preconditions.checkState(
+ storedLeaderInformation.get() == null
+ || !storedLeaderInformation
+ .get()
+ .getRegisteredContenderIDs()
+ .iterator()
+ .hasNext(),
+ "Initial state check for storedLeaderInformation failed.");
+ Preconditions.checkState(!isClosed.get(), "Initial state check for
isClosed failed.");
+ return newNoOpBuilder()
+ .setHasLeadershipFunction(
+ lock -> {
+ try {
+ lock.lock();
+ return hasLeadership.get();
+ } finally {
+ lock.unlock();
+ }
+ })
+ .setPublishLeaderInformationConsumer(
+ (lock, contenderID, leaderInformation) -> {
+ try {
+ lock.lock();
+ if (hasLeadership.get()) {
+ storedLeaderInformation.getAndUpdate(
+ oldData -> {
+ if (oldData != null) {
+ if
(leaderInformation.isEmpty()) {
+ return
LeaderInformationRegister.clear(
Review Comment:
Is the production coding having the same internal logic, except that the
locking works differently? Wondering if we could share this somehow.
##########
flink-runtime/src/test/java/org/apache/flink/runtime/leaderelection/DefaultLeaderElectionServiceTest.java:
##########
@@ -237,66 +253,11 @@ void
testDelayedGrantCallAfterContenderBeingDeregisteredAgain() throws Exception
};
}
- /**
- * Test to cover the issue described in FLINK-31814. This test could be
removed after
- * FLINK-31814 is resolved.
- */
- @Test
- void testOnRevokeCallWhileClosingService() throws Exception {
- final TestingLeaderElectionDriver.TestingLeaderElectionDriverFactory
driverFactory =
- new
TestingLeaderElectionDriver.TestingLeaderElectionDriverFactory(
- LeaderElectionEventHandler::onRevokeLeadership);
-
- try (final DefaultLeaderElectionService testInstance =
- new DefaultLeaderElectionService(driverFactory)) {
- testInstance.startLeaderElectionBackend();
-
- final TestingLeaderElectionDriver driver =
driverFactory.getCurrentLeaderDriver();
- assertThat(driver).isNotNull();
-
- driver.isLeader();
-
- final LeaderElection leaderElection =
-
testInstance.createLeaderElection(createRandomContenderID());
- final TestingContender contender =
- new TestingContender("unused-address", leaderElection);
- contender.startLeaderElection();
-
- contender.waitForLeader();
-
- leaderElection.close();
-
- contender.throwErrorIfPresent();
- }
- }
-
- @Test
- void testStopWhileHavingLeadership() throws Exception {
Review Comment:
Why is this test being removed?
##########
flink-runtime/src/test/java/org/apache/flink/runtime/leaderelection/DefaultLeaderElectionServiceTest.java:
##########
@@ -237,66 +253,11 @@ void
testDelayedGrantCallAfterContenderBeingDeregisteredAgain() throws Exception
};
}
- /**
- * Test to cover the issue described in FLINK-31814. This test could be
removed after
- * FLINK-31814 is resolved.
Review Comment:
We aren't resolving FLINK-31814 in this ticket; why are we removing this
test?
##########
flink-runtime/src/test/java/org/apache/flink/runtime/leaderelection/ZooKeeperMultipleComponentLeaderElectionDriverTest.java:
##########
@@ -372,7 +372,7 @@ boolean hasLeadership() {
return leaderElectionDriver.hasLeadership();
}
- CompletableFuture<Void> getLeadershipFuture() {
+ CompletableFuture<UUID> getLeadershipFuture() {
Review Comment:
nit: Seems unnecessary since we don't actually use the UUID?
##########
flink-runtime/src/test/java/org/apache/flink/runtime/leaderelection/ZooKeeperLeaderElectionTest.java:
##########
@@ -263,16 +265,19 @@ void testZooKeeperReelectionWithReplacement() throws
Exception {
DefaultLeaderRetrievalService leaderRetrievalService = null;
TestingListener listener = new TestingListener();
-
Review Comment:
same as above
##########
flink-runtime/src/test/java/org/apache/flink/runtime/leaderelection/TestingLeaderElectionDriver.java:
##########
@@ -19,147 +19,347 @@
package org.apache.flink.runtime.leaderelection;
import org.apache.flink.runtime.rpc.FatalErrorHandler;
+import org.apache.flink.util.Preconditions;
import org.apache.flink.util.function.ThrowingConsumer;
+import org.apache.flink.util.function.TriConsumer;
-import javax.annotation.Nullable;
-
+import java.util.Optional;
import java.util.UUID;
import java.util.concurrent.atomic.AtomicBoolean;
+import java.util.concurrent.atomic.AtomicReference;
+import java.util.concurrent.locks.ReentrantLock;
+import java.util.function.BiConsumer;
import java.util.function.Consumer;
+import java.util.function.Function;
/**
- * {@link LeaderElectionDriver} implementation which provides some convenience
functions for testing
- * purposes. Please use {@link #isLeader} and {@link #notLeader()} to manually
control the
- * leadership.
+ * {@code TestingLeaderElectionDriver} is a generic test implementation of
{@link
+ * MultipleComponentLeaderElectionDriver} which can be used in test cases.
*/
-public class TestingLeaderElectionDriver implements LeaderElectionDriver {
+public class TestingLeaderElectionDriver implements
MultipleComponentLeaderElectionDriver {
- private final Object lock = new Object();
+ private final Function<ReentrantLock, Boolean> hasLeadershipFunction;
+ private final TriConsumer<ReentrantLock, String, LeaderInformation>
+ publishLeaderInformationConsumer;
+ private final BiConsumer<ReentrantLock, String>
deleteLeaderInformationConsumer;
- private final AtomicBoolean isLeader = new AtomicBoolean(false);
- private final LeaderElectionEventHandler leaderElectionEventHandler;
- private final FatalErrorHandler fatalErrorHandler;
+ private final ThrowingConsumer<ReentrantLock, Exception> closeConsumer;
- private final ThrowingConsumer<LeaderElectionEventHandler, Exception>
closeRunnable;
- private final ThrowingConsumer<LeaderElectionEventHandler, Exception>
beforeLockCloseRunnable;
+ private final TriConsumer<ReentrantLock, Optional<Listener>, UUID>
grantConsumer;
+ private final BiConsumer<ReentrantLock, Optional<Listener>> revokeConsumer;
- private final Consumer<LeaderElectionEventHandler> beforeGrantRunnable;
+ private final ReentrantLock lock = new ReentrantLock();
- // Leader information on external storage
- private LeaderInformation leaderInformation = LeaderInformation.empty();
+ private Optional<Listener> listener = Optional.empty();
+ private Optional<FatalErrorHandler> fatalErrorHandler = Optional.empty();
- private TestingLeaderElectionDriver(
- LeaderElectionEventHandler leaderElectionEventHandler,
- FatalErrorHandler fatalErrorHandler,
- ThrowingConsumer<LeaderElectionEventHandler, Exception>
closeRunnable,
- ThrowingConsumer<LeaderElectionEventHandler, Exception>
beforeLockCloseRunnable,
- Consumer<LeaderElectionEventHandler> beforeGrantRunnable) {
- this.leaderElectionEventHandler = leaderElectionEventHandler;
- this.fatalErrorHandler = fatalErrorHandler;
- this.closeRunnable = closeRunnable;
- this.beforeLockCloseRunnable = beforeLockCloseRunnable;
- this.beforeGrantRunnable = beforeGrantRunnable;
+ public TestingLeaderElectionDriver(
+ Function<ReentrantLock, Boolean> hasLeadershipFunction,
+ TriConsumer<ReentrantLock, String, LeaderInformation>
publishLeaderInformationConsumer,
+ BiConsumer<ReentrantLock, String> deleteLeaderInformationConsumer,
+ ThrowingConsumer<ReentrantLock, Exception> closeConsumer,
+ TriConsumer<ReentrantLock, Optional<Listener>, UUID> grantConsumer,
+ BiConsumer<ReentrantLock, Optional<Listener>> revokeConsumer) {
+ this.hasLeadershipFunction = hasLeadershipFunction;
+ this.publishLeaderInformationConsumer =
publishLeaderInformationConsumer;
+ this.deleteLeaderInformationConsumer = deleteLeaderInformationConsumer;
+ this.closeConsumer = closeConsumer;
+ this.grantConsumer = grantConsumer;
+ this.revokeConsumer = revokeConsumer;
}
- @Override
- public void writeLeaderInformation(LeaderInformation leaderInformation) {
- this.leaderInformation = leaderInformation;
+ private void initialize(Listener listener, FatalErrorHandler
fatalErrorHandler) {
+ Preconditions.checkState(
+ !this.listener.isPresent(), "The driver should be only
initialized once.");
+
+ this.listener = Optional.of(listener);
+ this.fatalErrorHandler = Optional.of(fatalErrorHandler);
+ }
+
+ public void grantLeadership() {
+ grantLeadership(UUID.randomUUID());
+ }
+
+ public void grantLeadership(UUID leaderSessionID) {
+ grantConsumer.accept(lock, listener, leaderSessionID);
+ }
+
+ public void revokeLeadership() {
+ revokeConsumer.accept(lock, listener);
+ }
+
+ public void triggerErrorHandling(Throwable throwable) {
+ runInLockIfPresent(fatalErrorHandler, f -> f.onFatalError(throwable));
+ }
+
+ public void triggerLeaderInformationChangeEvent(
+ String contenderID, LeaderInformation leaderInformation) {
+ runInLockIfPresent(
+ listener,
+ listener ->
listener.notifyLeaderInformationChange(contenderID, leaderInformation));
+ }
+
+ public void triggerAllLeaderInformationChangeEvent(
+ LeaderInformationRegister newLeaderInformation) {
+ runInLockIfPresent(
+ listener,
+ listener ->
listener.notifyAllKnownLeaderInformation(newLeaderInformation));
+ }
+
+ private <T> void runInLockIfPresent(Optional<T> optional, Consumer<T>
callback) {
+ try {
+ lock.lock();
+ optional.ifPresent(callback);
+ } finally {
+ lock.unlock();
+ }
}
Review Comment:
Are we locking here for the ifPresent check or for the execution of the
callback? If it is the latter then let's move the locking into `ifPresent` to
make this clear.
##########
flink-runtime/src/test/java/org/apache/flink/runtime/leaderelection/DefaultLeaderElectionServiceTest.java:
##########
@@ -666,66 +667,38 @@ void testErrorIsIgnoredAfterLeaderElectionBeingClosed()
throws Exception {
leaderElection.close();
-
testingLeaderElectionDriver.onFatalError(testException);
+
testingLeaderElectionDriver.triggerErrorHandling(testException);
assertThat(testingContender.getError()).isNull();
});
}
};
}
- /**
- * Tests that we can shut down the DefaultLeaderElectionService if the
used LeaderElectionDriver
- * holds an internal lock. See FLINK-20008 for more details.
- */
- @Test
- void testServiceShutDownWithSynchronizedDriver() throws Exception {
Review Comment:
was this a duplicate?
##########
flink-runtime/src/test/java/org/apache/flink/runtime/leaderelection/TestingLeaderElectionDriver.java:
##########
@@ -19,147 +19,347 @@
package org.apache.flink.runtime.leaderelection;
import org.apache.flink.runtime.rpc.FatalErrorHandler;
+import org.apache.flink.util.Preconditions;
import org.apache.flink.util.function.ThrowingConsumer;
+import org.apache.flink.util.function.TriConsumer;
-import javax.annotation.Nullable;
-
+import java.util.Optional;
import java.util.UUID;
import java.util.concurrent.atomic.AtomicBoolean;
+import java.util.concurrent.atomic.AtomicReference;
+import java.util.concurrent.locks.ReentrantLock;
+import java.util.function.BiConsumer;
import java.util.function.Consumer;
+import java.util.function.Function;
/**
- * {@link LeaderElectionDriver} implementation which provides some convenience
functions for testing
- * purposes. Please use {@link #isLeader} and {@link #notLeader()} to manually
control the
- * leadership.
+ * {@code TestingLeaderElectionDriver} is a generic test implementation of
{@link
+ * MultipleComponentLeaderElectionDriver} which can be used in test cases.
*/
-public class TestingLeaderElectionDriver implements LeaderElectionDriver {
+public class TestingLeaderElectionDriver implements
MultipleComponentLeaderElectionDriver {
- private final Object lock = new Object();
+ private final Function<ReentrantLock, Boolean> hasLeadershipFunction;
+ private final TriConsumer<ReentrantLock, String, LeaderInformation>
+ publishLeaderInformationConsumer;
+ private final BiConsumer<ReentrantLock, String>
deleteLeaderInformationConsumer;
- private final AtomicBoolean isLeader = new AtomicBoolean(false);
- private final LeaderElectionEventHandler leaderElectionEventHandler;
- private final FatalErrorHandler fatalErrorHandler;
+ private final ThrowingConsumer<ReentrantLock, Exception> closeConsumer;
- private final ThrowingConsumer<LeaderElectionEventHandler, Exception>
closeRunnable;
- private final ThrowingConsumer<LeaderElectionEventHandler, Exception>
beforeLockCloseRunnable;
+ private final TriConsumer<ReentrantLock, Optional<Listener>, UUID>
grantConsumer;
+ private final BiConsumer<ReentrantLock, Optional<Listener>> revokeConsumer;
- private final Consumer<LeaderElectionEventHandler> beforeGrantRunnable;
+ private final ReentrantLock lock = new ReentrantLock();
- // Leader information on external storage
- private LeaderInformation leaderInformation = LeaderInformation.empty();
+ private Optional<Listener> listener = Optional.empty();
+ private Optional<FatalErrorHandler> fatalErrorHandler = Optional.empty();
- private TestingLeaderElectionDriver(
- LeaderElectionEventHandler leaderElectionEventHandler,
- FatalErrorHandler fatalErrorHandler,
- ThrowingConsumer<LeaderElectionEventHandler, Exception>
closeRunnable,
- ThrowingConsumer<LeaderElectionEventHandler, Exception>
beforeLockCloseRunnable,
- Consumer<LeaderElectionEventHandler> beforeGrantRunnable) {
- this.leaderElectionEventHandler = leaderElectionEventHandler;
- this.fatalErrorHandler = fatalErrorHandler;
- this.closeRunnable = closeRunnable;
- this.beforeLockCloseRunnable = beforeLockCloseRunnable;
- this.beforeGrantRunnable = beforeGrantRunnable;
+ public TestingLeaderElectionDriver(
+ Function<ReentrantLock, Boolean> hasLeadershipFunction,
+ TriConsumer<ReentrantLock, String, LeaderInformation>
publishLeaderInformationConsumer,
+ BiConsumer<ReentrantLock, String> deleteLeaderInformationConsumer,
+ ThrowingConsumer<ReentrantLock, Exception> closeConsumer,
+ TriConsumer<ReentrantLock, Optional<Listener>, UUID> grantConsumer,
+ BiConsumer<ReentrantLock, Optional<Listener>> revokeConsumer) {
+ this.hasLeadershipFunction = hasLeadershipFunction;
+ this.publishLeaderInformationConsumer =
publishLeaderInformationConsumer;
+ this.deleteLeaderInformationConsumer = deleteLeaderInformationConsumer;
+ this.closeConsumer = closeConsumer;
+ this.grantConsumer = grantConsumer;
+ this.revokeConsumer = revokeConsumer;
}
- @Override
- public void writeLeaderInformation(LeaderInformation leaderInformation) {
- this.leaderInformation = leaderInformation;
+ private void initialize(Listener listener, FatalErrorHandler
fatalErrorHandler) {
+ Preconditions.checkState(
+ !this.listener.isPresent(), "The driver should be only
initialized once.");
+
+ this.listener = Optional.of(listener);
+ this.fatalErrorHandler = Optional.of(fatalErrorHandler);
+ }
+
+ public void grantLeadership() {
+ grantLeadership(UUID.randomUUID());
+ }
+
+ public void grantLeadership(UUID leaderSessionID) {
+ grantConsumer.accept(lock, listener, leaderSessionID);
+ }
+
+ public void revokeLeadership() {
+ revokeConsumer.accept(lock, listener);
+ }
+
+ public void triggerErrorHandling(Throwable throwable) {
+ runInLockIfPresent(fatalErrorHandler, f -> f.onFatalError(throwable));
+ }
+
+ public void triggerLeaderInformationChangeEvent(
+ String contenderID, LeaderInformation leaderInformation) {
+ runInLockIfPresent(
+ listener,
+ listener ->
listener.notifyLeaderInformationChange(contenderID, leaderInformation));
+ }
+
+ public void triggerAllLeaderInformationChangeEvent(
+ LeaderInformationRegister newLeaderInformation) {
+ runInLockIfPresent(
+ listener,
+ listener ->
listener.notifyAllKnownLeaderInformation(newLeaderInformation));
+ }
+
+ private <T> void runInLockIfPresent(Optional<T> optional, Consumer<T>
callback) {
+ try {
+ lock.lock();
+ optional.ifPresent(callback);
+ } finally {
+ lock.unlock();
+ }
}
@Override
public boolean hasLeadership() {
- return isLeader.get();
+ return hasLeadershipFunction.apply(lock);
}
@Override
- public void close() throws Exception {
- beforeLockCloseRunnable.accept(leaderElectionEventHandler);
- synchronized (lock) {
- closeRunnable.accept(leaderElectionEventHandler);
- }
+ public void publishLeaderInformation(String contenderID, LeaderInformation
leaderInformation) {
+ publishLeaderInformationConsumer.accept(lock, contenderID,
leaderInformation);
}
- public LeaderInformation getLeaderInformation() {
- return leaderInformation;
+ @Override
+ public void deleteLeaderInformation(String contenderID) {
+ deleteLeaderInformationConsumer.accept(lock, contenderID);
}
- public void isLeader(UUID newSessionID) {
- synchronized (lock) {
- isLeader.set(true);
- beforeGrantRunnable.accept(leaderElectionEventHandler);
- leaderElectionEventHandler.onGrantLeadership(newSessionID);
- }
+ @Override
+ public void close() throws Exception {
+ closeConsumer.accept(lock);
}
- public void isLeader() {
- isLeader(UUID.randomUUID());
+ public static Builder newNoOpBuilder() {
+ return new Builder();
}
- public void notLeader() {
- synchronized (lock) {
- isLeader.set(false);
- leaderElectionEventHandler.onRevokeLeadership();
- }
+ public static Builder newBuilder() {
+ return newBuilder(new AtomicBoolean(), new AtomicReference<>(), new
AtomicBoolean());
}
- public void leaderInformationChanged(LeaderInformation newLeader) {
- leaderInformation = newLeader;
- leaderElectionEventHandler.onLeaderInformationChange(newLeader);
+ /**
+ * Returns a {@code Builder} that comes with a basic default
implementation of the {@link
+ * MultipleComponentLeaderElectionDriver} contract using the passed
parameters for information
+ * storage.
+ *
+ * @param hasLeadership saves the current leadership state of the instance
that is created from
+ * the {@code Builder}.
+ * @param storedLeaderInformation saves the leader information that would
be otherwise stored in
+ * some external storage.
+ * @param isClosed saves the running state of the driver.
+ */
+ public static Builder newBuilder(
+ AtomicBoolean hasLeadership,
+ AtomicReference<LeaderInformationRegister> storedLeaderInformation,
+ AtomicBoolean isClosed) {
+ Preconditions.checkState(
+ !hasLeadership.get(), "Initial state check for hasLeadership
failed.");
+ Preconditions.checkState(
+ storedLeaderInformation.get() == null
+ || !storedLeaderInformation
+ .get()
+ .getRegisteredContenderIDs()
+ .iterator()
+ .hasNext(),
+ "Initial state check for storedLeaderInformation failed.");
+ Preconditions.checkState(!isClosed.get(), "Initial state check for
isClosed failed.");
+ return newNoOpBuilder()
+ .setHasLeadershipFunction(
+ lock -> {
+ try {
+ lock.lock();
+ return hasLeadership.get();
+ } finally {
+ lock.unlock();
+ }
+ })
+ .setPublishLeaderInformationConsumer(
+ (lock, contenderID, leaderInformation) -> {
+ try {
+ lock.lock();
+ if (hasLeadership.get()) {
+ storedLeaderInformation.getAndUpdate(
+ oldData -> {
+ if (oldData != null) {
+ if
(leaderInformation.isEmpty()) {
+ return
LeaderInformationRegister.clear(
+ oldData,
contenderID);
+ } else {
+ return
LeaderInformationRegister.merge(
+ oldData,
+ contenderID,
+
leaderInformation);
+ }
+ } else if
(!leaderInformation.isEmpty()) {
+ return
LeaderInformationRegister.of(
+ contenderID,
leaderInformation);
+ }
+
+ return new
LeaderInformationRegister();
+ });
+ }
+ } finally {
+ lock.unlock();
+ }
+ })
+ .setDeleteLeaderInformationConsumer(
+ (lock, contenderID) -> {
+ try {
+ lock.lock();
+ if (hasLeadership.get()) {
+ storedLeaderInformation.getAndUpdate(
+ oldData ->
+ oldData != null
+ ?
LeaderInformationRegister.clear(
Review Comment:
same as above
##########
flink-runtime/src/test/java/org/apache/flink/runtime/leaderelection/TestingLeaderElectionDriver.java:
##########
@@ -19,147 +19,347 @@
package org.apache.flink.runtime.leaderelection;
import org.apache.flink.runtime.rpc.FatalErrorHandler;
+import org.apache.flink.util.Preconditions;
import org.apache.flink.util.function.ThrowingConsumer;
+import org.apache.flink.util.function.TriConsumer;
-import javax.annotation.Nullable;
-
+import java.util.Optional;
import java.util.UUID;
import java.util.concurrent.atomic.AtomicBoolean;
+import java.util.concurrent.atomic.AtomicReference;
+import java.util.concurrent.locks.ReentrantLock;
+import java.util.function.BiConsumer;
import java.util.function.Consumer;
+import java.util.function.Function;
/**
- * {@link LeaderElectionDriver} implementation which provides some convenience
functions for testing
- * purposes. Please use {@link #isLeader} and {@link #notLeader()} to manually
control the
- * leadership.
+ * {@code TestingLeaderElectionDriver} is a generic test implementation of
{@link
+ * MultipleComponentLeaderElectionDriver} which can be used in test cases.
*/
-public class TestingLeaderElectionDriver implements LeaderElectionDriver {
+public class TestingLeaderElectionDriver implements
MultipleComponentLeaderElectionDriver {
- private final Object lock = new Object();
+ private final Function<ReentrantLock, Boolean> hasLeadershipFunction;
+ private final TriConsumer<ReentrantLock, String, LeaderInformation>
+ publishLeaderInformationConsumer;
+ private final BiConsumer<ReentrantLock, String>
deleteLeaderInformationConsumer;
- private final AtomicBoolean isLeader = new AtomicBoolean(false);
- private final LeaderElectionEventHandler leaderElectionEventHandler;
- private final FatalErrorHandler fatalErrorHandler;
+ private final ThrowingConsumer<ReentrantLock, Exception> closeConsumer;
- private final ThrowingConsumer<LeaderElectionEventHandler, Exception>
closeRunnable;
- private final ThrowingConsumer<LeaderElectionEventHandler, Exception>
beforeLockCloseRunnable;
+ private final TriConsumer<ReentrantLock, Optional<Listener>, UUID>
grantConsumer;
+ private final BiConsumer<ReentrantLock, Optional<Listener>> revokeConsumer;
- private final Consumer<LeaderElectionEventHandler> beforeGrantRunnable;
+ private final ReentrantLock lock = new ReentrantLock();
- // Leader information on external storage
- private LeaderInformation leaderInformation = LeaderInformation.empty();
+ private Optional<Listener> listener = Optional.empty();
+ private Optional<FatalErrorHandler> fatalErrorHandler = Optional.empty();
- private TestingLeaderElectionDriver(
- LeaderElectionEventHandler leaderElectionEventHandler,
- FatalErrorHandler fatalErrorHandler,
- ThrowingConsumer<LeaderElectionEventHandler, Exception>
closeRunnable,
- ThrowingConsumer<LeaderElectionEventHandler, Exception>
beforeLockCloseRunnable,
- Consumer<LeaderElectionEventHandler> beforeGrantRunnable) {
- this.leaderElectionEventHandler = leaderElectionEventHandler;
- this.fatalErrorHandler = fatalErrorHandler;
- this.closeRunnable = closeRunnable;
- this.beforeLockCloseRunnable = beforeLockCloseRunnable;
- this.beforeGrantRunnable = beforeGrantRunnable;
+ public TestingLeaderElectionDriver(
+ Function<ReentrantLock, Boolean> hasLeadershipFunction,
+ TriConsumer<ReentrantLock, String, LeaderInformation>
publishLeaderInformationConsumer,
+ BiConsumer<ReentrantLock, String> deleteLeaderInformationConsumer,
+ ThrowingConsumer<ReentrantLock, Exception> closeConsumer,
+ TriConsumer<ReentrantLock, Optional<Listener>, UUID> grantConsumer,
+ BiConsumer<ReentrantLock, Optional<Listener>> revokeConsumer) {
+ this.hasLeadershipFunction = hasLeadershipFunction;
+ this.publishLeaderInformationConsumer =
publishLeaderInformationConsumer;
+ this.deleteLeaderInformationConsumer = deleteLeaderInformationConsumer;
+ this.closeConsumer = closeConsumer;
+ this.grantConsumer = grantConsumer;
+ this.revokeConsumer = revokeConsumer;
}
- @Override
- public void writeLeaderInformation(LeaderInformation leaderInformation) {
- this.leaderInformation = leaderInformation;
+ private void initialize(Listener listener, FatalErrorHandler
fatalErrorHandler) {
+ Preconditions.checkState(
+ !this.listener.isPresent(), "The driver should be only
initialized once.");
+
+ this.listener = Optional.of(listener);
+ this.fatalErrorHandler = Optional.of(fatalErrorHandler);
+ }
+
+ public void grantLeadership() {
Review Comment:
missing `@Override`? Or are these utility methods for tests? (Although then
it seems a bit odd to allow a consumer to be provided; why wouldn't I call
things directly...)
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]