This is an automated email from the ASF dual-hosted git repository.
mmerli pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/pulsar.git
The following commit(s) were added to refs/heads/master by this push:
new 2698946 PIP-45: Revalidate leader election after session is recovered
(#10457)
2698946 is described below
commit 269894682d4bd3bc65958bcaa98997fb06a6f606
Author: Matteo Merli <[email protected]>
AuthorDate: Mon May 3 21:53:30 2021 -0700
PIP-45: Revalidate leader election after session is recovered (#10457)
* PIP-45: Revalidate leader election after session is recovered
* Fixed TopicOwnerTest
* Addresses comments
---
.../pulsar/broker/service/TopicOwnerTest.java | 214 +--------------------
.../coordination/impl/LeaderElectionImpl.java | 86 +++++++--
.../apache/pulsar/metadata/LeaderElectionTest.java | 103 +++++++++-
.../org/apache/pulsar/metadata/ZKSessionTest.java | 55 ++++++
4 files changed, 227 insertions(+), 231 deletions(-)
diff --git
a/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/TopicOwnerTest.java
b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/TopicOwnerTest.java
index 62ec5f4..a7e58ce 100644
---
a/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/TopicOwnerTest.java
+++
b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/TopicOwnerTest.java
@@ -48,6 +48,8 @@ import org.apache.pulsar.common.naming.NamespaceBundle;
import org.apache.pulsar.common.naming.TopicName;
import org.apache.pulsar.common.policies.data.ClusterData;
import org.apache.pulsar.common.policies.data.TenantInfo;
+import org.apache.pulsar.metadata.api.extended.MetadataStoreExtended;
+import org.apache.pulsar.metadata.api.extended.SessionEvent;
import org.apache.pulsar.zookeeper.LocalBookkeeperEnsemble;
import org.apache.zookeeper.WatchedEvent;
import org.apache.zookeeper.Watcher;
@@ -59,6 +61,7 @@ import org.apache.zookeeper.proto.ReplyHeader;
import org.apache.zookeeper.server.ByteBufferInputStream;
import org.apache.zookeeper.server.Request;
import org.apache.zookeeper.server.ServerCnxn;
+import org.apache.zookeeper.server.SessionTracker;
import org.apache.zookeeper.server.ZooKeeperServer;
import org.mockito.stubbing.Answer;
import org.powermock.reflect.Whitebox;
@@ -163,14 +166,14 @@ public class TopicOwnerTest {
return leaderAuthorizedBroker;
}
- private CompletableFuture<Void> watchZookeeperReconnect(ZooKeeper
zooKeeper) throws Exception {
+ private CompletableFuture<Void>
watchMetadataStoreReconnect(MetadataStoreExtended store) {
CompletableFuture<Void> reconnectedFuture = new CompletableFuture<>();
- zooKeeper.exists("/", (WatchedEvent event) -> {
- Watcher.Event.KeeperState state = event.getState();
- if (state == Watcher.Event.KeeperState.SyncConnected) {
+ store.registerSessionListener(event -> {
+ if (event == SessionEvent.Reconnected || event ==
SessionEvent.SessionReestablished) {
reconnectedFuture.complete(null);
}
});
+
return reconnectedFuture;
}
@@ -239,103 +242,6 @@ public class TopicOwnerTest {
}
@Test
- public void
testAcquireOwnershipWithZookeeperDisconnectedBeforeOwnershipNodeCreated()
throws Exception {
- String topic1 = "persistent://my-tenant/my-ns/topic-1";
- NamespaceService leaderNamespaceService =
leaderPulsar.getNamespaceService();
- NamespaceBundle namespaceBundle =
leaderNamespaceService.getBundle(TopicName.get(topic1));
-
- final MutableObject<PulsarService> leaderAuthorizedBroker =
spyLeaderNamespaceServiceForAuthorizedBroker();
-
- PulsarService pulsar1 = pulsarServices[1];
- final ZooKeeper zooKeeper1 = pulsar1.getZkClient();
-
- final CompletableFuture<Void> reconnectedFuture =
watchZookeeperReconnect(zooKeeper1);
-
- String namespaceBundlePath = ServiceUnitZkUtils.path(namespaceBundle);
-
- spyZookeeperToDisconnectBeforePersist(zooKeeper1, request -> {
- if (request.type != ZooDefs.OpCode.create) {
- return false;
- }
-
- CreateRequest createRequest = new CreateRequest();
-
ByteBufferInputStream.byteBuffer2Record(request.request.duplicate(),
createRequest);
- return createRequest.getPath().contains(namespaceBundlePath);
- });
-
- leaderAuthorizedBroker.setValue(pulsar1);
-
- try {
- // Trigger ownership acquiring and zookeeper disconnecting before
ownership node created.
- //
- // Ignore its execution result since whether it is fail or not
depends on concrete implementation.
- pulsarAdmins[1].lookups().lookupTopic(topic1);
- } catch (Exception ex) {
- // Ignored intentionally.
- }
-
- reconnectedFuture.join();
-
- // We don't known whether previous lookup was successful or not, but
now all lookups should succeed.
- Assert.assertEquals(pulsarAdmins[0].lookups().lookupTopic(topic1),
pulsar1.getBrokerServiceUrl());
- Assert.assertEquals(pulsarAdmins[2].lookups().lookupTopic(topic1),
pulsar1.getBrokerServiceUrl());
- Assert.assertEquals(pulsarAdmins[3].lookups().lookupTopic(topic1),
pulsar1.getBrokerServiceUrl());
- Assert.assertEquals(pulsarAdmins[4].lookups().lookupTopic(topic1),
pulsar1.getBrokerServiceUrl());
-
- pulsar1.getBrokerService().getTopic(topic1, true).join();
-
- Assert.assertEquals(pulsarAdmins[1].lookups().lookupTopic(topic1),
pulsar1.getBrokerServiceUrl());
- }
-
- @Test
- public void
testAcquireOwnershipWithZookeeperDisconnectedAfterOwnershipNodeCreated() throws
Exception {
- String topic1 = "persistent://my-tenant/my-ns/topic-1";
- NamespaceService leaderNamespaceService =
leaderPulsar.getNamespaceService();
- NamespaceBundle namespaceBundle =
leaderNamespaceService.getBundle(TopicName.get(topic1));
-
- final MutableObject<PulsarService> leaderAuthorizedBroker =
spyLeaderNamespaceServiceForAuthorizedBroker();
-
- PulsarService pulsar1 = pulsarServices[1];
- final ZooKeeper zooKeeper1 = pulsar1.getZkClient();
-
- final CompletableFuture<Void> reconnectedFuture =
watchZookeeperReconnect(zooKeeper1);
-
- String namespaceBundlePath = ServiceUnitZkUtils.path(namespaceBundle);
-
- spyZookeeperToDisconnectAfterPersist(zooKeeper1, request -> {
- if (request.type != ZooDefs.OpCode.create) {
- return false;
- }
-
- CreateRequest createRequest = new CreateRequest();
-
ByteBufferInputStream.byteBuffer2Record(request.request.duplicate(),
createRequest);
- return createRequest.getPath().contains(namespaceBundlePath);
- });
-
- leaderAuthorizedBroker.setValue(pulsar1);
-
- try {
- // Trigger ownership acquiring and zookeeper disconnecting after
ownership node created.
- //
- // Ignore its execution result since whether it is fail or not
depends on concrete implementation.
- pulsarAdmins[1].lookups().lookupTopic(topic1);
- } catch (Exception ex) {
- // Ignored intentionally.
- }
-
- reconnectedFuture.join();
-
- Assert.assertEquals(pulsarAdmins[0].lookups().lookupTopic(topic1),
pulsar1.getBrokerServiceUrl());
- Assert.assertEquals(pulsarAdmins[2].lookups().lookupTopic(topic1),
pulsar1.getBrokerServiceUrl());
- Assert.assertEquals(pulsarAdmins[3].lookups().lookupTopic(topic1),
pulsar1.getBrokerServiceUrl());
- Assert.assertEquals(pulsarAdmins[4].lookups().lookupTopic(topic1),
pulsar1.getBrokerServiceUrl());
-
- pulsar1.getBrokerService().getTopic(topic1, true).join();
-
- Assert.assertEquals(pulsarAdmins[1].lookups().lookupTopic(topic1),
pulsar1.getBrokerServiceUrl());
- }
-
- @Test
public void testReestablishOwnershipAfterInvalidateCache() throws
Exception {
String topic1 = "persistent://my-tenant/my-ns/topic-1";
NamespaceService leaderNamespaceService =
leaderPulsar.getNamespaceService();
@@ -389,112 +295,6 @@ public class TopicOwnerTest {
}
@Test
- public void
testReleaseOwnershipWithZookeeperDisconnectedBeforeOwnershipNodeDeleted()
throws Exception {
- String topic1 = "persistent://my-tenant/my-ns/topic-1";
- NamespaceService leaderNamespaceService =
leaderPulsar.getNamespaceService();
- NamespaceBundle namespaceBundle =
leaderNamespaceService.getBundle(TopicName.get(topic1));
-
- final MutableObject<PulsarService> leaderAuthorizedBroker =
spyLeaderNamespaceServiceForAuthorizedBroker();
-
- PulsarService pulsar1 = pulsarServices[1];
- PulsarService pulsar2 = pulsarServices[2];
-
- leaderAuthorizedBroker.setValue(pulsar1);
- Assert.assertEquals(pulsarAdmins[0].lookups().lookupTopic(topic1),
pulsar1.getBrokerServiceUrl());
- Assert.assertEquals(pulsarAdmins[1].lookups().lookupTopic(topic1),
pulsar1.getBrokerServiceUrl());
- Assert.assertEquals(pulsarAdmins[2].lookups().lookupTopic(topic1),
pulsar1.getBrokerServiceUrl());
- Assert.assertEquals(pulsarAdmins[3].lookups().lookupTopic(topic1),
pulsar1.getBrokerServiceUrl());
- Assert.assertEquals(pulsarAdmins[4].lookups().lookupTopic(topic1),
pulsar1.getBrokerServiceUrl());
-
- ZooKeeper zooKeeper1 = pulsar1.getZkClient();
-
- CompletableFuture<Void> reconnectedFuture =
watchZookeeperReconnect(zooKeeper1);
-
- String namespaceBundlePath = ServiceUnitZkUtils.path(namespaceBundle);
-
- spyZookeeperToDisconnectBeforePersist(zooKeeper1, request -> {
- if (request.type != ZooDefs.OpCode.delete) {
- return false;
- }
- DeleteRequest deleteRequest = new DeleteRequest();
-
ByteBufferInputStream.byteBuffer2Record(request.request.duplicate(),
deleteRequest);
- return deleteRequest.getPath().contains(namespaceBundlePath);
- });
-
- try {
-
pulsarAdmins[1].namespaces().unloadNamespaceBundle(namespaceBundle.getNamespaceObject().toString(),
namespaceBundle.getBundleRange());
- } catch (Exception ex) {
- // Ignored since whether failing unloading when zk connection-loss
is an implementation detail.
- }
-
- reconnectedFuture.join();
-
- leaderAuthorizedBroker.setValue(pulsar2);
-
- // We don't known whether previous unload was successful or not, but
now all lookups should return same result.
- final String currentBrokerServiceUrl =
pulsarAdmins[0].lookups().lookupTopic(topic1);
- Assert.assertEquals(pulsarAdmins[1].lookups().lookupTopic(topic1),
currentBrokerServiceUrl);
- Assert.assertEquals(pulsarAdmins[2].lookups().lookupTopic(topic1),
currentBrokerServiceUrl);
- Assert.assertEquals(pulsarAdmins[3].lookups().lookupTopic(topic1),
currentBrokerServiceUrl);
- Assert.assertEquals(pulsarAdmins[4].lookups().lookupTopic(topic1),
currentBrokerServiceUrl);
-
- pulsarAdmins[0].topics().createNonPartitionedTopic(topic1);
- }
-
- @Test
- public void
testReleaseOwnershipWithZookeeperDisconnectedAfterOwnershipNodeDeleted() throws
Exception {
- String topic1 = "persistent://my-tenant/my-ns/topic-1";
- NamespaceService leaderNamespaceService =
leaderPulsar.getNamespaceService();
- NamespaceBundle namespaceBundle =
leaderNamespaceService.getBundle(TopicName.get(topic1));
-
- final MutableObject<PulsarService> leaderAuthorizedBroker =
spyLeaderNamespaceServiceForAuthorizedBroker();
-
- PulsarService pulsar1 = pulsarServices[1];
- PulsarService pulsar2 = pulsarServices[2];
-
- leaderAuthorizedBroker.setValue(pulsar1);
- Assert.assertEquals(pulsarAdmins[0].lookups().lookupTopic(topic1),
pulsar1.getBrokerServiceUrl());
- Assert.assertEquals(pulsarAdmins[1].lookups().lookupTopic(topic1),
pulsar1.getBrokerServiceUrl());
- Assert.assertEquals(pulsarAdmins[2].lookups().lookupTopic(topic1),
pulsar1.getBrokerServiceUrl());
- Assert.assertEquals(pulsarAdmins[3].lookups().lookupTopic(topic1),
pulsar1.getBrokerServiceUrl());
- Assert.assertEquals(pulsarAdmins[4].lookups().lookupTopic(topic1),
pulsar1.getBrokerServiceUrl());
-
- ZooKeeper zooKeeper1 = pulsar1.getZkClient();
-
- CompletableFuture<Void> reconnectedFuture =
watchZookeeperReconnect(zooKeeper1);
-
- String namespaceBundlePath = ServiceUnitZkUtils.path(namespaceBundle);
-
- spyZookeeperToDisconnectAfterPersist(zooKeeper1, request -> {
- if (request.type != ZooDefs.OpCode.delete) {
- return false;
- }
- DeleteRequest deleteRequest = new DeleteRequest();
-
ByteBufferInputStream.byteBuffer2Record(request.request.duplicate(),
deleteRequest);
- return deleteRequest.getPath().contains(namespaceBundlePath);
- });
-
- try {
-
pulsarAdmins[1].namespaces().unloadNamespaceBundle(namespaceBundle.getNamespaceObject().toString(),
namespaceBundle.getBundleRange());
- } catch (Exception ex) {
- // Ignored since whether failing unloading when zk connection-loss
is an implementation detail.
- }
-
- reconnectedFuture.join();
-
- leaderAuthorizedBroker.setValue(pulsar2);
-
- Assert.assertEquals(pulsarAdmins[0].lookups().lookupTopic(topic1),
pulsar2.getBrokerServiceUrl());
- Assert.assertEquals(pulsarAdmins[3].lookups().lookupTopic(topic1),
pulsar2.getBrokerServiceUrl());
- Assert.assertEquals(pulsarAdmins[4].lookups().lookupTopic(topic1),
pulsar2.getBrokerServiceUrl());
-
- pulsar2.getBrokerService().getTopic(topic1, true).join();
-
- Assert.assertEquals(pulsarAdmins[2].lookups().lookupTopic(topic1),
pulsar2.getBrokerServiceUrl());
- Assert.assertEquals(pulsarAdmins[1].lookups().lookupTopic(topic1),
pulsar2.getBrokerServiceUrl());
- }
-
- @Test
public void testConnectToInvalidateBundleCacheBroker() throws Exception {
Assert.assertEquals(pulsarAdmins[0].namespaces().getPolicies("my-tenant/my-ns").bundles.getNumBundles(),
16);
diff --git
a/pulsar-metadata/src/main/java/org/apache/pulsar/metadata/coordination/impl/LeaderElectionImpl.java
b/pulsar-metadata/src/main/java/org/apache/pulsar/metadata/coordination/impl/LeaderElectionImpl.java
index 708b72b..19b0230 100644
---
a/pulsar-metadata/src/main/java/org/apache/pulsar/metadata/coordination/impl/LeaderElectionImpl.java
+++
b/pulsar-metadata/src/main/java/org/apache/pulsar/metadata/coordination/impl/LeaderElectionImpl.java
@@ -34,6 +34,7 @@ import java.util.function.Consumer;
import lombok.extern.slf4j.Slf4j;
import org.apache.bookkeeper.common.concurrent.FutureUtils;
+import org.apache.pulsar.metadata.api.GetResult;
import org.apache.pulsar.metadata.api.MetadataCache;
import org.apache.pulsar.metadata.api.MetadataStoreException;
import
org.apache.pulsar.metadata.api.MetadataStoreException.AlreadyClosedException;
@@ -44,11 +45,12 @@ import
org.apache.pulsar.metadata.api.coordination.LeaderElection;
import org.apache.pulsar.metadata.api.coordination.LeaderElectionState;
import org.apache.pulsar.metadata.api.extended.CreateOption;
import org.apache.pulsar.metadata.api.extended.MetadataStoreExtended;
+import org.apache.pulsar.metadata.api.extended.SessionEvent;
import org.apache.pulsar.metadata.cache.impl.JSONMetadataSerdeSimpleType;
import org.apache.pulsar.metadata.cache.impl.MetadataSerde;
@Slf4j
-class LeaderElectionImpl<T> implements LeaderElection<T>,
Consumer<Notification> {
+class LeaderElectionImpl<T> implements LeaderElection<T> {
private final String path;
private final MetadataSerde<T> serde;
private final MetadataStoreExtended store;
@@ -61,7 +63,7 @@ class LeaderElectionImpl<T> implements LeaderElection<T>,
Consumer<Notification>
private final ScheduledExecutorService executor;
- private static enum InternalState {
+ private enum InternalState {
Init, ElectionInProgress, LeaderIsPresent, Closed
}
@@ -80,7 +82,8 @@ class LeaderElectionImpl<T> implements LeaderElection<T>,
Consumer<Notification>
this.stateChangesListener = stateChangesListener;
this.executor = Executors.newScheduledThreadPool(0, new
DefaultThreadFactory("leader-election-executor"));
- store.registerListener(this);
+ store.registerListener(this::handlePathNotification);
+ store.registerSessionListener(this::handleSessionNotification);
}
@Override
@@ -96,26 +99,58 @@ class LeaderElectionImpl<T> implements LeaderElection<T>,
Consumer<Notification>
private synchronized CompletableFuture<LeaderElectionState> elect() {
// First check if there's already a leader elected
internalState = InternalState.ElectionInProgress;
- return cache.get(path).thenCompose(optLock -> {
+ return store.get(path).thenCompose(optLock -> {
if (optLock.isPresent()) {
- synchronized (LeaderElectionImpl.this) {
- internalState = InternalState.LeaderIsPresent;
- if (leaderElectionState != LeaderElectionState.Following) {
- leaderElectionState = LeaderElectionState.Following;
- try {
- stateChangesListener.accept(leaderElectionState);
- } catch (Throwable t) {
- log.warn("Exception in state change listener", t);
- }
- }
- return
CompletableFuture.completedFuture(leaderElectionState);
- }
+ return handleExistingLeaderValue(optLock.get());
} else {
return tryToBecomeLeader();
}
});
}
+ private synchronized CompletableFuture<LeaderElectionState>
handleExistingLeaderValue(GetResult res) {
+ T existingValue;
+ try {
+ existingValue = serde.deserialize(res.getValue());
+ } catch (Throwable t) {
+ return FutureUtils.exception(t);
+ }
+
+ if (existingValue.equals(proposedValue.orElse(null))) {
+ // If the value is the same as our proposed value, it means this
instance was the leader at some
+ // point before. The existing value can either be for this same
session or for a previous one.
+ if (res.getStat().isCreatedBySelf()) {
+ // The value is still valid because it was created in the same
session
+ changeState(LeaderElectionState.Leading);
+ } else {
+ // Since the value was created in a different session, it
might be expiring. We need to delete it
+ // and try the election again.
+ return store.delete(path,
Optional.of(res.getStat().getVersion()))
+ .thenCompose(__ -> tryToBecomeLeader());
+ }
+ } else if (res.getStat().isCreatedBySelf()) {
+ // The existing value is different but was created from the same
session
+ return store.delete(path, Optional.of(res.getStat().getVersion()))
+ .thenCompose(__ -> tryToBecomeLeader());
+ }
+
+ // If the existing value is different, it means there's already
another leader
+ changeState(LeaderElectionState.Following);
+ return
CompletableFuture.completedFuture(LeaderElectionState.Following);
+ }
+
+ private synchronized void changeState(LeaderElectionState les) {
+ internalState = InternalState.LeaderIsPresent;
+ if (this.leaderElectionState != les) {
+ this.leaderElectionState = les;
+ try {
+ stateChangesListener.accept(leaderElectionState);
+ } catch (Throwable t) {
+ log.warn("Exception in state change listener", t);
+ }
+ }
+ }
+
private synchronized CompletableFuture<LeaderElectionState>
tryToBecomeLeader() {
byte[] payload;
try {
@@ -133,7 +168,7 @@ class LeaderElectionImpl<T> implements LeaderElection<T>,
Consumer<Notification>
cache.get(path)
.thenRun(() -> {
synchronized (LeaderElectionImpl.this)
{
- log.info("Acquired resource lock
on {}", path);
+ log.info("Acquired leadership on
{}", path);
internalState =
InternalState.LeaderIsPresent;
if (leaderElectionState !=
LeaderElectionState.Leading) {
leaderElectionState =
LeaderElectionState.Leading;
@@ -233,8 +268,19 @@ class LeaderElectionImpl<T> implements LeaderElection<T>,
Consumer<Notification>
return cache.getIfCached(path);
}
- @Override
- public void accept(Notification notification) {
+ private synchronized void handleSessionNotification(SessionEvent event) {
+ if (event == SessionEvent.SessionReestablished) {
+ if (leaderElectionState == LeaderElectionState.Leading) {
+ log.info("Revalidating leadership for {}", path);
+ }
+
+ elect().thenAccept(les -> {
+ log.info("Resynced leadership for {} - State: {}", path, les);
+ });
+ }
+ }
+
+ private void handlePathNotification(Notification notification) {
if (!path.equals(notification.getPath())) {
// Ignore notifications we don't care about
return;
@@ -249,7 +295,7 @@ class LeaderElectionImpl<T> implements LeaderElection<T>,
Consumer<Notification>
if (notification.getType() == NotificationType.Deleted) {
if (leaderElectionState == LeaderElectionState.Leading) {
// We've lost the leadership, switch to follower mode
- log.info("Leader released for {}", path);
+ log.warn("Leadership released for {}", path);
}
leaderElectionState = LeaderElectionState.NoLeader;
diff --git
a/pulsar-metadata/src/test/java/org/apache/pulsar/metadata/LeaderElectionTest.java
b/pulsar-metadata/src/test/java/org/apache/pulsar/metadata/LeaderElectionTest.java
index 1966be0..fac83ff 100644
---
a/pulsar-metadata/src/test/java/org/apache/pulsar/metadata/LeaderElectionTest.java
+++
b/pulsar-metadata/src/test/java/org/apache/pulsar/metadata/LeaderElectionTest.java
@@ -20,6 +20,7 @@ package org.apache.pulsar.metadata;
import static org.testng.Assert.assertEquals;
+import java.util.EnumSet;
import java.util.Optional;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.LinkedBlockingDeque;
@@ -27,11 +28,13 @@ import java.util.concurrent.TimeUnit;
import lombok.Cleanup;
+import org.apache.pulsar.common.util.ObjectMapperFactory;
import org.apache.pulsar.metadata.api.MetadataCache;
import org.apache.pulsar.metadata.api.MetadataStoreConfig;
import org.apache.pulsar.metadata.api.coordination.CoordinationService;
import org.apache.pulsar.metadata.api.coordination.LeaderElection;
import org.apache.pulsar.metadata.api.coordination.LeaderElectionState;
+import org.apache.pulsar.metadata.api.extended.CreateOption;
import org.apache.pulsar.metadata.api.extended.MetadataStoreExtended;
import org.apache.pulsar.metadata.coordination.impl.CoordinationServiceImpl;
import org.testng.annotations.Test;
@@ -72,11 +75,19 @@ public class LeaderElectionTest extends
BaseMetadataStoreTest {
@Test(dataProvider = "impl")
public void multipleMembers(String provider, String url) throws Exception {
+ if (provider.equals("Memory")) {
+ // There are no multiple session in local mem provider
+ return;
+ }
+
@Cleanup
- MetadataStoreExtended store = MetadataStoreExtended.create(url,
MetadataStoreConfig.builder().build());
+ MetadataStoreExtended store1 = MetadataStoreExtended.create(url,
MetadataStoreConfig.builder().build());
+ @Cleanup
+ MetadataStoreExtended store2 = MetadataStoreExtended.create(url,
MetadataStoreConfig.builder().build());
+
@Cleanup
- CoordinationService cs1 = new CoordinationServiceImpl(store);
+ CoordinationService cs1 = new CoordinationServiceImpl(store1);
BlockingQueue<LeaderElectionState> n1 = new LinkedBlockingDeque<>();
@@ -87,7 +98,7 @@ public class LeaderElectionTest extends BaseMetadataStoreTest
{
});
@Cleanup
- CoordinationService cs2 = new CoordinationServiceImpl(store);
+ CoordinationService cs2 = new CoordinationServiceImpl(store2);
BlockingQueue<LeaderElectionState> n2 = new LinkedBlockingDeque<>();
@@ -105,8 +116,8 @@ public class LeaderElectionTest extends
BaseMetadataStoreTest {
LeaderElectionState les2 = le2.elect("test-2").join();
assertEquals(les2, LeaderElectionState.Following);
- assertEquals(le2.getLeaderValueIfPresent(), Optional.of("test-1"));
assertEquals(le2.getLeaderValue().join(), Optional.of("test-1"));
+ assertEquals(le2.getLeaderValueIfPresent(), Optional.of("test-1"));
assertEquals(n2.poll(3, TimeUnit.SECONDS),
LeaderElectionState.Following);
le1.close();
@@ -172,4 +183,88 @@ public class LeaderElectionTest extends
BaseMetadataStoreTest {
assertEquals(cache.get("/my/leader-election-2").join(),
Optional.empty());
}
+
+ @Test(dataProvider = "impl")
+ public void revalidateLeaderWithinSameSession(String provider, String url)
throws Exception {
+ @Cleanup
+ MetadataStoreExtended store = MetadataStoreExtended.create(url,
MetadataStoreConfig.builder().build());
+
+ String path = newKey();
+
+ @Cleanup
+ CoordinationService cs = new CoordinationServiceImpl(store);
+
+ @Cleanup
+ LeaderElection<String> le = cs.getLeaderElection(String.class,
+ path, __ -> {
+ });
+
+ store.put(path,
ObjectMapperFactory.getThreadLocal().writeValueAsBytes("test-1"),
Optional.of(-1L),
+ EnumSet.of(CreateOption.Ephemeral)).join();
+
+ LeaderElectionState les = le.elect("test-2").join();
+ assertEquals(les, LeaderElectionState.Leading);
+ assertEquals(le.getLeaderValue().join(), Optional.of("test-2"));
+ assertEquals(le.getLeaderValueIfPresent(), Optional.of("test-2"));
+ }
+
+ @Test(dataProvider = "impl")
+ public void revalidateLeaderWithDifferentSessionsSameValue(String
provider, String url) throws Exception {
+ @Cleanup
+ MetadataStoreExtended store = MetadataStoreExtended.create(url,
MetadataStoreConfig.builder().build());
+
+ @Cleanup
+ MetadataStoreExtended store2 = MetadataStoreExtended.create(url,
MetadataStoreConfig.builder().build());
+
+ String path = newKey();
+
+ @Cleanup
+ CoordinationService cs = new CoordinationServiceImpl(store);
+
+ @Cleanup
+ LeaderElection<String> le = cs.getLeaderElection(String.class,
+ path, __ -> {
+ });
+
+ store2.put(path,
ObjectMapperFactory.getThreadLocal().writeValueAsBytes("test-1"),
Optional.of(-1L),
+ EnumSet.of(CreateOption.Ephemeral)).join();
+
+ LeaderElectionState les = le.elect("test-1").join();
+ assertEquals(les, LeaderElectionState.Leading);
+ assertEquals(le.getLeaderValue().join(), Optional.of("test-1"));
+ assertEquals(le.getLeaderValueIfPresent(), Optional.of("test-1"));
+ }
+
+
+ @Test(dataProvider = "impl")
+ public void revalidateLeaderWithDifferentSessionsDifferentValue(String
provider, String url) throws Exception {
+ if (provider.equals("Memory")) {
+ // There are no multiple sessions for the local memory provider
+ return;
+ }
+
+ @Cleanup
+ MetadataStoreExtended store = MetadataStoreExtended.create(url,
MetadataStoreConfig.builder().build());
+
+ @Cleanup
+ MetadataStoreExtended store2 = MetadataStoreExtended.create(url,
MetadataStoreConfig.builder().build());
+
+ String path = newKey();
+
+ @Cleanup
+ CoordinationService cs = new CoordinationServiceImpl(store);
+
+ @Cleanup
+ LeaderElection<String> le = cs.getLeaderElection(String.class,
+ path, __ -> {
+ });
+
+ store2.put(path,
ObjectMapperFactory.getThreadLocal().writeValueAsBytes("test-1"),
Optional.of(-1L),
+ EnumSet.of(CreateOption.Ephemeral)).join();
+
+ LeaderElectionState les = le.elect("test-2").join();
+ assertEquals(les, LeaderElectionState.Following);
+ assertEquals(le.getLeaderValue().join(), Optional.of("test-1"));
+ assertEquals(le.getLeaderValueIfPresent(), Optional.of("test-1"));
+ }
}
diff --git
a/pulsar-metadata/src/test/java/org/apache/pulsar/metadata/ZKSessionTest.java
b/pulsar-metadata/src/test/java/org/apache/pulsar/metadata/ZKSessionTest.java
index 89bd13a..f565a8f 100644
---
a/pulsar-metadata/src/test/java/org/apache/pulsar/metadata/ZKSessionTest.java
+++
b/pulsar-metadata/src/test/java/org/apache/pulsar/metadata/ZKSessionTest.java
@@ -26,11 +26,14 @@ import static org.testng.Assert.assertTrue;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.TimeUnit;
+import java.util.function.Consumer;
import lombok.Cleanup;
import org.apache.pulsar.metadata.api.MetadataStoreConfig;
import org.apache.pulsar.metadata.api.coordination.CoordinationService;
+import org.apache.pulsar.metadata.api.coordination.LeaderElection;
+import org.apache.pulsar.metadata.api.coordination.LeaderElectionState;
import org.apache.pulsar.metadata.api.coordination.LockManager;
import org.apache.pulsar.metadata.api.coordination.ResourceLock;
import org.apache.pulsar.metadata.api.extended.MetadataStoreExtended;
@@ -133,4 +136,56 @@ public class ZKSessionTest extends BaseMetadataStoreTest {
assertTrue(store.get(path).join().isPresent());
}
+
+ @Test
+ public void testReacquireLeadershipAfterSessionLost() throws Exception {
+ @Cleanup
+ MetadataStoreExtended store =
MetadataStoreExtended.create(zks.getConnectionString(),
+ MetadataStoreConfig.builder()
+ .sessionTimeoutMillis(2_000)
+ .build());
+
+ BlockingQueue<SessionEvent> sessionEvents = new
LinkedBlockingQueue<>();
+ store.registerSessionListener(sessionEvents::add);
+
+ BlockingQueue<LeaderElectionState> leaderElectionEvents = new
LinkedBlockingQueue<>();
+ String path = newKey();
+
+ @Cleanup
+ CoordinationService coordinationService = new
CoordinationServiceImpl(store);
+ @Cleanup
+ LeaderElection<String> le1 =
coordinationService.getLeaderElection(String.class, path,
+ leaderElectionEvents::add);
+
+ le1.elect("value-1").join();
+ assertEquals(le1.getState(), LeaderElectionState.Leading);
+
+ LeaderElectionState les = leaderElectionEvents.poll(5,
TimeUnit.SECONDS);
+ assertEquals(les, LeaderElectionState.Leading);
+
+ zks.expireSession(((ZKMetadataStore) store).getZkSessionId());
+
+ SessionEvent e = sessionEvents.poll(5, TimeUnit.SECONDS);
+ assertEquals(e, SessionEvent.ConnectionLost);
+
+ e = sessionEvents.poll(10, TimeUnit.SECONDS);
+ assertEquals(e, SessionEvent.SessionLost);
+
+ assertEquals(le1.getState(), LeaderElectionState.Leading);
+ les = leaderElectionEvents.poll();
+ assertNull(les);
+
+ e = sessionEvents.poll(10, TimeUnit.SECONDS);
+ assertEquals(e, SessionEvent.Reconnected);
+ e = sessionEvents.poll(10, TimeUnit.SECONDS);
+ assertEquals(e, SessionEvent.SessionReestablished);
+
+ Thread.sleep(2_000);
+
+ assertEquals(le1.getState(), LeaderElectionState.Leading);
+ les = leaderElectionEvents.poll();
+ assertNull(les);
+
+ assertTrue(store.get(path).join().isPresent());
+ }
}