tillrohrmann commented on a change in pull request #16801:
URL: https://github.com/apache/flink/pull/16801#discussion_r688576981
##########
File path:
flink-runtime/src/test/java/org/apache/flink/runtime/leaderelection/ZooKeeperLeaderElectionConnectionHandlingTest.java
##########
@@ -18,317 +18,228 @@
package org.apache.flink.runtime.leaderelection;
-import org.apache.flink.api.common.time.Deadline;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.configuration.HighAvailabilityOptions;
-import org.apache.flink.runtime.leaderretrieval.LeaderRetrievalDriver;
-import org.apache.flink.runtime.leaderretrieval.LeaderRetrievalEventHandler;
-import org.apache.flink.runtime.leaderretrieval.ZooKeeperLeaderRetrievalDriver;
-import org.apache.flink.runtime.rpc.DirectlyFailingFatalErrorHandler;
-import org.apache.flink.runtime.rpc.FatalErrorHandler;
-import org.apache.flink.runtime.testutils.CommonTestUtils;
+import org.apache.flink.core.testutils.OneShotLatch;
+import org.apache.flink.runtime.util.TestingFatalErrorHandlerResource;
import org.apache.flink.runtime.util.ZooKeeperUtils;
+import org.apache.flink.runtime.zookeeper.ZooKeeperResource;
import org.apache.flink.util.TestLogger;
+import org.apache.flink.util.function.BiConsumerWithException;
import
org.apache.flink.shaded.curator4.org.apache.curator.framework.CuratorFramework;
+import
org.apache.flink.shaded.curator4.org.apache.curator.framework.state.ConnectionState;
+import
org.apache.flink.shaded.curator4.org.apache.curator.framework.state.ConnectionStateListener;
-import org.apache.curator.test.TestingServer;
-import org.junit.After;
import org.junit.Before;
+import org.junit.Rule;
import org.junit.Test;
-import javax.annotation.Nullable;
-
-import java.io.ByteArrayOutputStream;
-import java.io.IOException;
-import java.io.ObjectOutputStream;
import java.time.Duration;
import java.util.UUID;
-import java.util.concurrent.ArrayBlockingQueue;
-import java.util.concurrent.BlockingQueue;
-import java.util.concurrent.CompletableFuture;
import java.util.concurrent.TimeUnit;
+import java.util.concurrent.TimeoutException;
-import static org.hamcrest.CoreMatchers.is;
-import static org.hamcrest.CoreMatchers.notNullValue;
-import static org.hamcrest.CoreMatchers.nullValue;
-import static org.junit.Assert.assertThat;
+import static org.junit.Assert.assertFalse;
-/** Tests for the error handling in case of a suspended connection to the
ZooKeeper instance. */
+/**
+ * Test behaviors of {@link ZooKeeperLeaderElectionDriver} when losing the
connection to ZooKeeper.
+ */
public class ZooKeeperLeaderElectionConnectionHandlingTest extends TestLogger {
- private TestingServer testingServer;
+ private static final String PATH = "/path";
- private Configuration config;
+ @Rule public final ZooKeeperResource zooKeeperResource = new
ZooKeeperResource();
- private CuratorFramework zooKeeperClient;
+ @Rule
+ public final TestingFatalErrorHandlerResource fatalErrorHandlerResource =
+ new TestingFatalErrorHandlerResource();
- private final FatalErrorHandler fatalErrorHandler =
DirectlyFailingFatalErrorHandler.INSTANCE;
+ private final Configuration configuration = new Configuration();
@Before
- public void before() throws Exception {
- testingServer = new TestingServer();
-
- config = new Configuration();
- config.setString(HighAvailabilityOptions.HA_MODE, "zookeeper");
- config.setString(
- HighAvailabilityOptions.HA_ZOOKEEPER_QUORUM,
testingServer.getConnectString());
-
- zooKeeperClient = ZooKeeperUtils.startCuratorFramework(config);
- zooKeeperClient.blockUntilConnected();
- }
-
- @After
- public void after() throws Exception {
- closeTestServer();
-
- if (zooKeeperClient != null) {
- zooKeeperClient.close();
- zooKeeperClient = null;
- }
+ public void setup() {
+ configuration.set(
+ HighAvailabilityOptions.HA_ZOOKEEPER_QUORUM,
zooKeeperResource.getConnectString());
}
@Test
- public void testConnectionSuspendedHandlingDuringInitialization() throws
Exception {
- final QueueLeaderElectionListener queueLeaderElectionListener =
- new QueueLeaderElectionListener(1);
-
- LeaderRetrievalDriver leaderRetrievalDriver = null;
- try {
- leaderRetrievalDriver =
-
ZooKeeperUtils.createLeaderRetrievalDriverFactory(zooKeeperClient)
- .createLeaderRetrievalDriver(
- queueLeaderElectionListener,
fatalErrorHandler);
-
- // do the testing
- final CompletableFuture<String> firstAddress =
- queueLeaderElectionListener.next(Duration.ofMillis(50));
- assertThat(
- "No results are expected, yet, since no leader was
elected.",
- firstAddress,
- is(nullValue()));
-
- closeTestServer();
-
- // QueueLeaderElectionListener will be notified with an empty
leader when ZK connection
- // is suspended
- final CompletableFuture<String> secondAddress =
queueLeaderElectionListener.next();
- assertThat("The next result must not be missing.", secondAddress,
is(notNullValue()));
- assertThat(
- "The next result is expected to be null.",
- secondAddress.get(),
- is(nullValue()));
- } finally {
- if (leaderRetrievalDriver != null) {
- leaderRetrievalDriver.close();
- }
- }
+ public void testLoseLeadershipOnConnectionSuspended() throws Exception {
+ runTestWithSuspendedZooKeeperConnection(
+ configuration,
+ (connectionStateListener, contender) -> {
+ connectionStateListener.awaitSuspendedConnection();
+ contender.awaitRevokeLeadership(Duration.ofSeconds(1L));
+ });
}
@Test
- public void testConnectionSuspendedHandling() throws Exception {
- final String retrievalPath = "/testConnectionSuspendedHandling";
- final String leaderAddress = "localhost";
-
- final QueueLeaderElectionListener queueLeaderElectionListener =
- new QueueLeaderElectionListener(1);
- ZooKeeperLeaderRetrievalDriver leaderRetrievalDriver = null;
- try {
- leaderRetrievalDriver =
- new ZooKeeperLeaderRetrievalDriver(
- zooKeeperClient,
- retrievalPath,
- queueLeaderElectionListener,
- fatalErrorHandler);
-
- writeLeaderInformationToZooKeeper(
- leaderRetrievalDriver.getConnectionInformationPath(),
- leaderAddress,
- UUID.randomUUID());
-
- // do the testing
- CompletableFuture<String> firstAddress =
queueLeaderElectionListener.next();
- assertThat(
- "The first result is expected to be the initially set
leader address.",
- firstAddress.get(),
- is(leaderAddress));
-
- closeTestServer();
-
- CompletableFuture<String> secondAddress =
queueLeaderElectionListener.next();
- assertThat("The next result must not be missing.", secondAddress,
is(notNullValue()));
- assertThat(
- "The next result is expected to be null.",
- secondAddress.get(),
- is(nullValue()));
- } finally {
- if (leaderRetrievalDriver != null) {
- leaderRetrievalDriver.close();
- }
- }
+ public void
testKeepLeadershipOnSuspendedConnectionIfTolerateSuspendedConnectionsIsEnabled()
+ throws Exception {
+
configuration.set(HighAvailabilityOptions.ZOOKEEPER_TOLERATE_SUSPENDED_CONNECTIONS,
true);
+ runTestWithSuspendedZooKeeperConnection(
+ configuration,
+ (connectionStateListener, contender) -> {
+ connectionStateListener.awaitSuspendedConnection();
+ connectionStateListener.awaitReconnectedConnection();
+ assertFalse(contender.hasRevokeLeadershipBeenTriggered());
+ });
}
@Test
- public void testSameLeaderAfterReconnectTriggersListenerNotification()
throws Exception {
- final String retrievalPath =
"/testSameLeaderAfterReconnectTriggersListenerNotification";
- final QueueLeaderElectionListener queueLeaderElectionListener =
- new QueueLeaderElectionListener(1);
- ZooKeeperLeaderRetrievalDriver leaderRetrievalDriver = null;
- try {
- leaderRetrievalDriver =
- new ZooKeeperLeaderRetrievalDriver(
- zooKeeperClient,
- retrievalPath,
- queueLeaderElectionListener,
- fatalErrorHandler);
-
- final String leaderAddress = "foobar";
- final UUID sessionId = UUID.randomUUID();
- writeLeaderInformationToZooKeeper(
- leaderRetrievalDriver.getConnectionInformationPath(),
leaderAddress, sessionId);
+ public void
testLoseLeadershipOnLostConnectionIfTolerateSuspendedConnectionsIsEnabled()
Review comment:
The problem is that the ZooKeeper testing server has a tick time of 3s,
so the session timeout will be negotiated to 6s (two times the tick time).
Configuring the tick time of the ZooKeeper testing server is not trivial.
Moreover, setting too aggressive a value might make this test unstable on CI.
That's why I accepted this rather long test duration.
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]