This is an automated email from the ASF dual-hosted git repository.

mmerli pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/pulsar.git


The following commit(s) were added to refs/heads/master by this push:
     new 2698946  PIP-45: Revalidate leader election after session is recovered (#10457)
2698946 is described below

commit 269894682d4bd3bc65958bcaa98997fb06a6f606
Author: Matteo Merli <[email protected]>
AuthorDate: Mon May 3 21:53:30 2021 -0700

    PIP-45: Revalidate leader election after session is recovered (#10457)
    
    * PIP-45: Revalidate leader election after session is recovered
    
    * Fixed TopicOwnerTest
    
    * Addresses comments
---
 .../pulsar/broker/service/TopicOwnerTest.java      | 214 +--------------------
 .../coordination/impl/LeaderElectionImpl.java      |  86 +++++++--
 .../apache/pulsar/metadata/LeaderElectionTest.java | 103 +++++++++-
 .../org/apache/pulsar/metadata/ZKSessionTest.java  |  55 ++++++
 4 files changed, 227 insertions(+), 231 deletions(-)

diff --git 
a/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/TopicOwnerTest.java
 
b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/TopicOwnerTest.java
index 62ec5f4..a7e58ce 100644
--- 
a/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/TopicOwnerTest.java
+++ 
b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/TopicOwnerTest.java
@@ -48,6 +48,8 @@ import org.apache.pulsar.common.naming.NamespaceBundle;
 import org.apache.pulsar.common.naming.TopicName;
 import org.apache.pulsar.common.policies.data.ClusterData;
 import org.apache.pulsar.common.policies.data.TenantInfo;
+import org.apache.pulsar.metadata.api.extended.MetadataStoreExtended;
+import org.apache.pulsar.metadata.api.extended.SessionEvent;
 import org.apache.pulsar.zookeeper.LocalBookkeeperEnsemble;
 import org.apache.zookeeper.WatchedEvent;
 import org.apache.zookeeper.Watcher;
@@ -59,6 +61,7 @@ import org.apache.zookeeper.proto.ReplyHeader;
 import org.apache.zookeeper.server.ByteBufferInputStream;
 import org.apache.zookeeper.server.Request;
 import org.apache.zookeeper.server.ServerCnxn;
+import org.apache.zookeeper.server.SessionTracker;
 import org.apache.zookeeper.server.ZooKeeperServer;
 import org.mockito.stubbing.Answer;
 import org.powermock.reflect.Whitebox;
@@ -163,14 +166,14 @@ public class TopicOwnerTest {
         return leaderAuthorizedBroker;
     }
 
-    private CompletableFuture<Void> watchZookeeperReconnect(ZooKeeper 
zooKeeper) throws Exception {
+    private CompletableFuture<Void> 
watchMetadataStoreReconnect(MetadataStoreExtended store) {
         CompletableFuture<Void> reconnectedFuture = new CompletableFuture<>();
-        zooKeeper.exists("/", (WatchedEvent event) -> {
-            Watcher.Event.KeeperState state = event.getState();
-            if (state == Watcher.Event.KeeperState.SyncConnected) {
+        store.registerSessionListener(event -> {
+            if (event == SessionEvent.Reconnected || event == 
SessionEvent.SessionReestablished) {
                 reconnectedFuture.complete(null);
             }
         });
+
         return reconnectedFuture;
     }
 
@@ -239,103 +242,6 @@ public class TopicOwnerTest {
     }
 
     @Test
-    public void 
testAcquireOwnershipWithZookeeperDisconnectedBeforeOwnershipNodeCreated() 
throws Exception {
-        String topic1 = "persistent://my-tenant/my-ns/topic-1";
-        NamespaceService leaderNamespaceService = 
leaderPulsar.getNamespaceService();
-        NamespaceBundle namespaceBundle = 
leaderNamespaceService.getBundle(TopicName.get(topic1));
-
-        final MutableObject<PulsarService> leaderAuthorizedBroker = 
spyLeaderNamespaceServiceForAuthorizedBroker();
-
-        PulsarService pulsar1 = pulsarServices[1];
-        final ZooKeeper zooKeeper1 = pulsar1.getZkClient();
-
-        final CompletableFuture<Void> reconnectedFuture = 
watchZookeeperReconnect(zooKeeper1);
-
-        String namespaceBundlePath = ServiceUnitZkUtils.path(namespaceBundle);
-
-        spyZookeeperToDisconnectBeforePersist(zooKeeper1, request -> {
-            if (request.type != ZooDefs.OpCode.create) {
-                return false;
-            }
-
-            CreateRequest createRequest = new CreateRequest();
-            
ByteBufferInputStream.byteBuffer2Record(request.request.duplicate(), 
createRequest);
-            return createRequest.getPath().contains(namespaceBundlePath);
-        });
-
-        leaderAuthorizedBroker.setValue(pulsar1);
-
-        try {
-            // Trigger ownership acquiring and zookeeper disconnecting before 
ownership node created.
-            //
-            // Ignore its execution result since whether it is fail or not 
depends on concrete implementation.
-            pulsarAdmins[1].lookups().lookupTopic(topic1);
-        } catch (Exception ex) {
-            // Ignored intentionally.
-        }
-
-        reconnectedFuture.join();
-
-        // We don't known whether previous lookup was successful or not, but 
now all lookups should succeed.
-        Assert.assertEquals(pulsarAdmins[0].lookups().lookupTopic(topic1), 
pulsar1.getBrokerServiceUrl());
-        Assert.assertEquals(pulsarAdmins[2].lookups().lookupTopic(topic1), 
pulsar1.getBrokerServiceUrl());
-        Assert.assertEquals(pulsarAdmins[3].lookups().lookupTopic(topic1), 
pulsar1.getBrokerServiceUrl());
-        Assert.assertEquals(pulsarAdmins[4].lookups().lookupTopic(topic1), 
pulsar1.getBrokerServiceUrl());
-
-        pulsar1.getBrokerService().getTopic(topic1, true).join();
-
-        Assert.assertEquals(pulsarAdmins[1].lookups().lookupTopic(topic1), 
pulsar1.getBrokerServiceUrl());
-    }
-
-    @Test
-    public void 
testAcquireOwnershipWithZookeeperDisconnectedAfterOwnershipNodeCreated() throws 
Exception {
-        String topic1 = "persistent://my-tenant/my-ns/topic-1";
-        NamespaceService leaderNamespaceService = 
leaderPulsar.getNamespaceService();
-        NamespaceBundle namespaceBundle = 
leaderNamespaceService.getBundle(TopicName.get(topic1));
-
-        final MutableObject<PulsarService> leaderAuthorizedBroker = 
spyLeaderNamespaceServiceForAuthorizedBroker();
-
-        PulsarService pulsar1 = pulsarServices[1];
-        final ZooKeeper zooKeeper1 = pulsar1.getZkClient();
-
-        final CompletableFuture<Void> reconnectedFuture = 
watchZookeeperReconnect(zooKeeper1);
-
-        String namespaceBundlePath = ServiceUnitZkUtils.path(namespaceBundle);
-
-        spyZookeeperToDisconnectAfterPersist(zooKeeper1, request -> {
-            if (request.type != ZooDefs.OpCode.create) {
-                return false;
-            }
-
-            CreateRequest createRequest = new CreateRequest();
-            
ByteBufferInputStream.byteBuffer2Record(request.request.duplicate(), 
createRequest);
-            return createRequest.getPath().contains(namespaceBundlePath);
-        });
-
-        leaderAuthorizedBroker.setValue(pulsar1);
-
-        try {
-            // Trigger ownership acquiring and zookeeper disconnecting after 
ownership node created.
-            //
-            // Ignore its execution result since whether it is fail or not 
depends on concrete implementation.
-            pulsarAdmins[1].lookups().lookupTopic(topic1);
-        } catch (Exception ex) {
-            // Ignored intentionally.
-        }
-
-        reconnectedFuture.join();
-
-        Assert.assertEquals(pulsarAdmins[0].lookups().lookupTopic(topic1), 
pulsar1.getBrokerServiceUrl());
-        Assert.assertEquals(pulsarAdmins[2].lookups().lookupTopic(topic1), 
pulsar1.getBrokerServiceUrl());
-        Assert.assertEquals(pulsarAdmins[3].lookups().lookupTopic(topic1), 
pulsar1.getBrokerServiceUrl());
-        Assert.assertEquals(pulsarAdmins[4].lookups().lookupTopic(topic1), 
pulsar1.getBrokerServiceUrl());
-
-        pulsar1.getBrokerService().getTopic(topic1, true).join();
-
-        Assert.assertEquals(pulsarAdmins[1].lookups().lookupTopic(topic1), 
pulsar1.getBrokerServiceUrl());
-    }
-
-    @Test
     public void testReestablishOwnershipAfterInvalidateCache() throws 
Exception {
         String topic1 = "persistent://my-tenant/my-ns/topic-1";
         NamespaceService leaderNamespaceService = 
leaderPulsar.getNamespaceService();
@@ -389,112 +295,6 @@ public class TopicOwnerTest {
     }
 
     @Test
-    public void 
testReleaseOwnershipWithZookeeperDisconnectedBeforeOwnershipNodeDeleted() 
throws Exception {
-        String topic1 = "persistent://my-tenant/my-ns/topic-1";
-        NamespaceService leaderNamespaceService = 
leaderPulsar.getNamespaceService();
-        NamespaceBundle namespaceBundle = 
leaderNamespaceService.getBundle(TopicName.get(topic1));
-
-        final MutableObject<PulsarService> leaderAuthorizedBroker = 
spyLeaderNamespaceServiceForAuthorizedBroker();
-
-        PulsarService pulsar1 = pulsarServices[1];
-        PulsarService pulsar2 = pulsarServices[2];
-
-        leaderAuthorizedBroker.setValue(pulsar1);
-        Assert.assertEquals(pulsarAdmins[0].lookups().lookupTopic(topic1), 
pulsar1.getBrokerServiceUrl());
-        Assert.assertEquals(pulsarAdmins[1].lookups().lookupTopic(topic1), 
pulsar1.getBrokerServiceUrl());
-        Assert.assertEquals(pulsarAdmins[2].lookups().lookupTopic(topic1), 
pulsar1.getBrokerServiceUrl());
-        Assert.assertEquals(pulsarAdmins[3].lookups().lookupTopic(topic1), 
pulsar1.getBrokerServiceUrl());
-        Assert.assertEquals(pulsarAdmins[4].lookups().lookupTopic(topic1), 
pulsar1.getBrokerServiceUrl());
-
-        ZooKeeper zooKeeper1 = pulsar1.getZkClient();
-
-        CompletableFuture<Void> reconnectedFuture = 
watchZookeeperReconnect(zooKeeper1);
-
-        String namespaceBundlePath = ServiceUnitZkUtils.path(namespaceBundle);
-
-        spyZookeeperToDisconnectBeforePersist(zooKeeper1, request -> {
-            if (request.type != ZooDefs.OpCode.delete) {
-                return false;
-            }
-            DeleteRequest deleteRequest = new DeleteRequest();
-            
ByteBufferInputStream.byteBuffer2Record(request.request.duplicate(), 
deleteRequest);
-            return deleteRequest.getPath().contains(namespaceBundlePath);
-        });
-
-        try {
-            
pulsarAdmins[1].namespaces().unloadNamespaceBundle(namespaceBundle.getNamespaceObject().toString(),
 namespaceBundle.getBundleRange());
-        } catch (Exception ex) {
-            // Ignored since whether failing unloading when zk connection-loss 
is an implementation detail.
-        }
-
-        reconnectedFuture.join();
-
-        leaderAuthorizedBroker.setValue(pulsar2);
-
-        // We don't known whether previous unload was successful or not, but 
now all lookups should return same result.
-        final String currentBrokerServiceUrl = 
pulsarAdmins[0].lookups().lookupTopic(topic1);
-        Assert.assertEquals(pulsarAdmins[1].lookups().lookupTopic(topic1), 
currentBrokerServiceUrl);
-        Assert.assertEquals(pulsarAdmins[2].lookups().lookupTopic(topic1), 
currentBrokerServiceUrl);
-        Assert.assertEquals(pulsarAdmins[3].lookups().lookupTopic(topic1), 
currentBrokerServiceUrl);
-        Assert.assertEquals(pulsarAdmins[4].lookups().lookupTopic(topic1), 
currentBrokerServiceUrl);
-
-        pulsarAdmins[0].topics().createNonPartitionedTopic(topic1);
-    }
-
-    @Test
-    public void 
testReleaseOwnershipWithZookeeperDisconnectedAfterOwnershipNodeDeleted() throws 
Exception {
-        String topic1 = "persistent://my-tenant/my-ns/topic-1";
-        NamespaceService leaderNamespaceService = 
leaderPulsar.getNamespaceService();
-        NamespaceBundle namespaceBundle = 
leaderNamespaceService.getBundle(TopicName.get(topic1));
-
-        final MutableObject<PulsarService> leaderAuthorizedBroker = 
spyLeaderNamespaceServiceForAuthorizedBroker();
-
-        PulsarService pulsar1 = pulsarServices[1];
-        PulsarService pulsar2 = pulsarServices[2];
-
-        leaderAuthorizedBroker.setValue(pulsar1);
-        Assert.assertEquals(pulsarAdmins[0].lookups().lookupTopic(topic1), 
pulsar1.getBrokerServiceUrl());
-        Assert.assertEquals(pulsarAdmins[1].lookups().lookupTopic(topic1), 
pulsar1.getBrokerServiceUrl());
-        Assert.assertEquals(pulsarAdmins[2].lookups().lookupTopic(topic1), 
pulsar1.getBrokerServiceUrl());
-        Assert.assertEquals(pulsarAdmins[3].lookups().lookupTopic(topic1), 
pulsar1.getBrokerServiceUrl());
-        Assert.assertEquals(pulsarAdmins[4].lookups().lookupTopic(topic1), 
pulsar1.getBrokerServiceUrl());
-
-        ZooKeeper zooKeeper1 = pulsar1.getZkClient();
-
-        CompletableFuture<Void> reconnectedFuture = 
watchZookeeperReconnect(zooKeeper1);
-
-        String namespaceBundlePath = ServiceUnitZkUtils.path(namespaceBundle);
-
-        spyZookeeperToDisconnectAfterPersist(zooKeeper1, request -> {
-            if (request.type != ZooDefs.OpCode.delete) {
-                return false;
-            }
-            DeleteRequest deleteRequest = new DeleteRequest();
-            
ByteBufferInputStream.byteBuffer2Record(request.request.duplicate(), 
deleteRequest);
-            return deleteRequest.getPath().contains(namespaceBundlePath);
-        });
-
-        try {
-            
pulsarAdmins[1].namespaces().unloadNamespaceBundle(namespaceBundle.getNamespaceObject().toString(),
 namespaceBundle.getBundleRange());
-        } catch (Exception ex) {
-            // Ignored since whether failing unloading when zk connection-loss 
is an implementation detail.
-        }
-
-        reconnectedFuture.join();
-
-        leaderAuthorizedBroker.setValue(pulsar2);
-
-        Assert.assertEquals(pulsarAdmins[0].lookups().lookupTopic(topic1), 
pulsar2.getBrokerServiceUrl());
-        Assert.assertEquals(pulsarAdmins[3].lookups().lookupTopic(topic1), 
pulsar2.getBrokerServiceUrl());
-        Assert.assertEquals(pulsarAdmins[4].lookups().lookupTopic(topic1), 
pulsar2.getBrokerServiceUrl());
-
-        pulsar2.getBrokerService().getTopic(topic1, true).join();
-
-        Assert.assertEquals(pulsarAdmins[2].lookups().lookupTopic(topic1), 
pulsar2.getBrokerServiceUrl());
-        Assert.assertEquals(pulsarAdmins[1].lookups().lookupTopic(topic1), 
pulsar2.getBrokerServiceUrl());
-    }
-
-    @Test
     public void testConnectToInvalidateBundleCacheBroker() throws Exception {
         
Assert.assertEquals(pulsarAdmins[0].namespaces().getPolicies("my-tenant/my-ns").bundles.getNumBundles(),
 16);
 
diff --git 
a/pulsar-metadata/src/main/java/org/apache/pulsar/metadata/coordination/impl/LeaderElectionImpl.java
 
b/pulsar-metadata/src/main/java/org/apache/pulsar/metadata/coordination/impl/LeaderElectionImpl.java
index 708b72b..19b0230 100644
--- 
a/pulsar-metadata/src/main/java/org/apache/pulsar/metadata/coordination/impl/LeaderElectionImpl.java
+++ 
b/pulsar-metadata/src/main/java/org/apache/pulsar/metadata/coordination/impl/LeaderElectionImpl.java
@@ -34,6 +34,7 @@ import java.util.function.Consumer;
 import lombok.extern.slf4j.Slf4j;
 
 import org.apache.bookkeeper.common.concurrent.FutureUtils;
+import org.apache.pulsar.metadata.api.GetResult;
 import org.apache.pulsar.metadata.api.MetadataCache;
 import org.apache.pulsar.metadata.api.MetadataStoreException;
 import 
org.apache.pulsar.metadata.api.MetadataStoreException.AlreadyClosedException;
@@ -44,11 +45,12 @@ import 
org.apache.pulsar.metadata.api.coordination.LeaderElection;
 import org.apache.pulsar.metadata.api.coordination.LeaderElectionState;
 import org.apache.pulsar.metadata.api.extended.CreateOption;
 import org.apache.pulsar.metadata.api.extended.MetadataStoreExtended;
+import org.apache.pulsar.metadata.api.extended.SessionEvent;
 import org.apache.pulsar.metadata.cache.impl.JSONMetadataSerdeSimpleType;
 import org.apache.pulsar.metadata.cache.impl.MetadataSerde;
 
 @Slf4j
-class LeaderElectionImpl<T> implements LeaderElection<T>, 
Consumer<Notification> {
+class LeaderElectionImpl<T> implements LeaderElection<T> {
     private final String path;
     private final MetadataSerde<T> serde;
     private final MetadataStoreExtended store;
@@ -61,7 +63,7 @@ class LeaderElectionImpl<T> implements LeaderElection<T>, 
Consumer<Notification>
 
     private final ScheduledExecutorService executor;
 
-    private static enum InternalState {
+    private enum InternalState {
         Init, ElectionInProgress, LeaderIsPresent, Closed
     }
 
@@ -80,7 +82,8 @@ class LeaderElectionImpl<T> implements LeaderElection<T>, 
Consumer<Notification>
         this.stateChangesListener = stateChangesListener;
         this.executor = Executors.newScheduledThreadPool(0, new 
DefaultThreadFactory("leader-election-executor"));
 
-        store.registerListener(this);
+        store.registerListener(this::handlePathNotification);
+        store.registerSessionListener(this::handleSessionNotification);
     }
 
     @Override
@@ -96,26 +99,58 @@ class LeaderElectionImpl<T> implements LeaderElection<T>, 
Consumer<Notification>
     private synchronized CompletableFuture<LeaderElectionState> elect() {
         // First check if there's already a leader elected
         internalState = InternalState.ElectionInProgress;
-        return cache.get(path).thenCompose(optLock -> {
+        return store.get(path).thenCompose(optLock -> {
             if (optLock.isPresent()) {
-                synchronized (LeaderElectionImpl.this) {
-                    internalState = InternalState.LeaderIsPresent;
-                    if (leaderElectionState != LeaderElectionState.Following) {
-                        leaderElectionState = LeaderElectionState.Following;
-                        try {
-                            stateChangesListener.accept(leaderElectionState);
-                        } catch (Throwable t) {
-                            log.warn("Exception in state change listener", t);
-                        }
-                    }
-                    return 
CompletableFuture.completedFuture(leaderElectionState);
-                }
+                return handleExistingLeaderValue(optLock.get());
             } else {
                 return tryToBecomeLeader();
             }
         });
     }
 
+    /**
+     * Reconciles this instance's state with a leader value that already exists at {@code path}.
+     *
+     * <ul>
+     * <li>Same value as our proposal, created by this session: we are still the leader.</li>
+     * <li>Same value, created by a different session: the node may be about to expire, so it is
+     * conditionally deleted (by version) and the election is retried.</li>
+     * <li>Different value, created by this session: stale entry from us, deleted and retried.</li>
+     * <li>Different value, created by another session: someone else leads, we follow.</li>
+     * </ul>
+     *
+     * @param res the stored leader entry (value and stat)
+     * @return the resulting election state; fails if the stored value cannot be deserialized
+     */
+    private synchronized CompletableFuture<LeaderElectionState> handleExistingLeaderValue(GetResult res) {
+        T existingValue;
+        try {
+            existingValue = serde.deserialize(res.getValue());
+        } catch (Throwable t) {
+            return FutureUtils.exception(t);
+        }
+
+        if (existingValue.equals(proposedValue.orElse(null))) {
+            // If the value is the same as our proposed value, it means this instance was the leader at some
+            // point before. The existing value can either be for this same session or for a previous one.
+            if (res.getStat().isCreatedBySelf()) {
+                // The value is still valid because it was created in the same session. Return right away:
+                // falling through would immediately demote us to Following below.
+                changeState(LeaderElectionState.Leading);
+                return CompletableFuture.completedFuture(LeaderElectionState.Leading);
+            } else {
+                // Since the value was created in a different session, it might be expiring. We need to delete it
+                // and try the election again.
+                return store.delete(path, Optional.of(res.getStat().getVersion()))
+                        .thenCompose(__ -> tryToBecomeLeader());
+            }
+        } else if (res.getStat().isCreatedBySelf()) {
+            // The existing value is different but was created from the same session
+            return store.delete(path, Optional.of(res.getStat().getVersion()))
+                    .thenCompose(__ -> tryToBecomeLeader());
+        }
+
+        // If the existing value is different, it means there's already another leader
+        changeState(LeaderElectionState.Following);
+        return CompletableFuture.completedFuture(LeaderElectionState.Following);
+    }
+
+    private synchronized void changeState(LeaderElectionState les) {
+        internalState = InternalState.LeaderIsPresent;
+        if (this.leaderElectionState != les) {
+            this.leaderElectionState = les;
+            try {
+                stateChangesListener.accept(leaderElectionState);
+            } catch (Throwable t) {
+                log.warn("Exception in state change listener", t);
+            }
+        }
+    }
+
     private synchronized CompletableFuture<LeaderElectionState> 
tryToBecomeLeader() {
         byte[] payload;
         try {
@@ -133,7 +168,7 @@ class LeaderElectionImpl<T> implements LeaderElection<T>, 
Consumer<Notification>
                             cache.get(path)
                                     .thenRun(() -> {
                                         synchronized (LeaderElectionImpl.this) 
{
-                                            log.info("Acquired resource lock 
on {}", path);
+                                            log.info("Acquired leadership on 
{}", path);
                                             internalState = 
InternalState.LeaderIsPresent;
                                             if (leaderElectionState != 
LeaderElectionState.Leading) {
                                                 leaderElectionState = 
LeaderElectionState.Leading;
@@ -233,8 +268,19 @@ class LeaderElectionImpl<T> implements LeaderElection<T>, 
Consumer<Notification>
         return cache.getIfCached(path);
     }
 
-    @Override
-    public void accept(Notification notification) {
+    /**
+     * Re-runs the election when the metadata store reports that its session was re-established,
+     * so the leadership state is checked against what is actually stored instead of being assumed.
+     *
+     * @param event the session event delivered by the metadata store
+     */
+    private synchronized void handleSessionNotification(SessionEvent event) {
+        if (event != SessionEvent.SessionReestablished) {
+            return;
+        }
+
+        if (internalState == InternalState.Closed || !proposedValue.isPresent()) {
+            // Nothing to revalidate: the election was closed, or this instance never proposed a value.
+            return;
+        }
+
+        if (leaderElectionState == LeaderElectionState.Leading) {
+            log.info("Revalidating leadership for {}", path);
+        }
+
+        elect().whenComplete((les, ex) -> {
+            if (ex != null) {
+                // Surface the failure instead of dropping it; the election state may be stale until the next event.
+                log.warn("Failed to revalidate leadership for {}", path, ex);
+            } else {
+                log.info("Resynced leadership for {} - State: {}", path, les);
+            }
+        });
+    }
+
+    private void handlePathNotification(Notification notification) {
         if (!path.equals(notification.getPath())) {
             // Ignore notifications we don't care about
             return;
@@ -249,7 +295,7 @@ class LeaderElectionImpl<T> implements LeaderElection<T>, 
Consumer<Notification>
             if (notification.getType() == NotificationType.Deleted) {
                 if (leaderElectionState == LeaderElectionState.Leading) {
                     // We've lost the leadership, switch to follower mode
-                    log.info("Leader released for {}", path);
+                    log.warn("Leadership released for {}", path);
                 }
 
                 leaderElectionState = LeaderElectionState.NoLeader;
diff --git 
a/pulsar-metadata/src/test/java/org/apache/pulsar/metadata/LeaderElectionTest.java
 
b/pulsar-metadata/src/test/java/org/apache/pulsar/metadata/LeaderElectionTest.java
index 1966be0..fac83ff 100644
--- 
a/pulsar-metadata/src/test/java/org/apache/pulsar/metadata/LeaderElectionTest.java
+++ 
b/pulsar-metadata/src/test/java/org/apache/pulsar/metadata/LeaderElectionTest.java
@@ -20,6 +20,7 @@ package org.apache.pulsar.metadata;
 
 import static org.testng.Assert.assertEquals;
 
+import java.util.EnumSet;
 import java.util.Optional;
 import java.util.concurrent.BlockingQueue;
 import java.util.concurrent.LinkedBlockingDeque;
@@ -27,11 +28,13 @@ import java.util.concurrent.TimeUnit;
 
 import lombok.Cleanup;
 
+import org.apache.pulsar.common.util.ObjectMapperFactory;
 import org.apache.pulsar.metadata.api.MetadataCache;
 import org.apache.pulsar.metadata.api.MetadataStoreConfig;
 import org.apache.pulsar.metadata.api.coordination.CoordinationService;
 import org.apache.pulsar.metadata.api.coordination.LeaderElection;
 import org.apache.pulsar.metadata.api.coordination.LeaderElectionState;
+import org.apache.pulsar.metadata.api.extended.CreateOption;
 import org.apache.pulsar.metadata.api.extended.MetadataStoreExtended;
 import org.apache.pulsar.metadata.coordination.impl.CoordinationServiceImpl;
 import org.testng.annotations.Test;
@@ -72,11 +75,19 @@ public class LeaderElectionTest extends 
BaseMetadataStoreTest {
 
     @Test(dataProvider = "impl")
     public void multipleMembers(String provider, String url) throws Exception {
+        if (provider.equals("Memory")) {
+            // There are no multiple session in local mem provider
+            return;
+        }
+
         @Cleanup
-        MetadataStoreExtended store = MetadataStoreExtended.create(url, 
MetadataStoreConfig.builder().build());
+        MetadataStoreExtended store1 = MetadataStoreExtended.create(url, 
MetadataStoreConfig.builder().build());
+        @Cleanup
+        MetadataStoreExtended store2 = MetadataStoreExtended.create(url, 
MetadataStoreConfig.builder().build());
+
 
         @Cleanup
-        CoordinationService cs1 = new CoordinationServiceImpl(store);
+        CoordinationService cs1 = new CoordinationServiceImpl(store1);
 
         BlockingQueue<LeaderElectionState> n1 = new LinkedBlockingDeque<>();
 
@@ -87,7 +98,7 @@ public class LeaderElectionTest extends BaseMetadataStoreTest 
{
                 });
 
         @Cleanup
-        CoordinationService cs2 = new CoordinationServiceImpl(store);
+        CoordinationService cs2 = new CoordinationServiceImpl(store2);
 
         BlockingQueue<LeaderElectionState> n2 = new LinkedBlockingDeque<>();
 
@@ -105,8 +116,8 @@ public class LeaderElectionTest extends 
BaseMetadataStoreTest {
 
         LeaderElectionState les2 = le2.elect("test-2").join();
         assertEquals(les2, LeaderElectionState.Following);
-        assertEquals(le2.getLeaderValueIfPresent(), Optional.of("test-1"));
         assertEquals(le2.getLeaderValue().join(), Optional.of("test-1"));
+        assertEquals(le2.getLeaderValueIfPresent(), Optional.of("test-1"));
         assertEquals(n2.poll(3, TimeUnit.SECONDS), 
LeaderElectionState.Following);
 
         le1.close();
@@ -172,4 +183,88 @@ public class LeaderElectionTest extends 
BaseMetadataStoreTest {
         assertEquals(cache.get("/my/leader-election-2").join(), 
Optional.empty());
     }
 
+
+    @Test(dataProvider = "impl")
+    public void revalidateLeaderWithinSameSession(String provider, String url) 
throws Exception {
+        @Cleanup
+        MetadataStoreExtended store = MetadataStoreExtended.create(url, 
MetadataStoreConfig.builder().build());
+
+        String path = newKey();
+
+        @Cleanup
+        CoordinationService cs = new CoordinationServiceImpl(store);
+
+        @Cleanup
+        LeaderElection<String> le = cs.getLeaderElection(String.class,
+                path, __ -> {
+                });
+
+        store.put(path, 
ObjectMapperFactory.getThreadLocal().writeValueAsBytes("test-1"), 
Optional.of(-1L),
+                EnumSet.of(CreateOption.Ephemeral)).join();
+
+        LeaderElectionState les = le.elect("test-2").join();
+        assertEquals(les, LeaderElectionState.Leading);
+        assertEquals(le.getLeaderValue().join(), Optional.of("test-2"));
+        assertEquals(le.getLeaderValueIfPresent(), Optional.of("test-2"));
+    }
+
+    @Test(dataProvider = "impl")
+    public void revalidateLeaderWithDifferentSessionsSameValue(String 
provider, String url) throws Exception {
+        @Cleanup
+        MetadataStoreExtended store = MetadataStoreExtended.create(url, 
MetadataStoreConfig.builder().build());
+
+        @Cleanup
+        MetadataStoreExtended store2 = MetadataStoreExtended.create(url, 
MetadataStoreConfig.builder().build());
+
+        String path = newKey();
+
+        @Cleanup
+        CoordinationService cs = new CoordinationServiceImpl(store);
+
+        @Cleanup
+        LeaderElection<String> le = cs.getLeaderElection(String.class,
+                path, __ -> {
+                });
+
+        store2.put(path, 
ObjectMapperFactory.getThreadLocal().writeValueAsBytes("test-1"), 
Optional.of(-1L),
+                EnumSet.of(CreateOption.Ephemeral)).join();
+
+        LeaderElectionState les = le.elect("test-1").join();
+        assertEquals(les, LeaderElectionState.Leading);
+        assertEquals(le.getLeaderValue().join(), Optional.of("test-1"));
+        assertEquals(le.getLeaderValueIfPresent(), Optional.of("test-1"));
+    }
+
+
+    @Test(dataProvider = "impl")
+    public void revalidateLeaderWithDifferentSessionsDifferentValue(String 
provider, String url) throws Exception {
+        if (provider.equals("Memory")) {
+            // There are no multiple sessions for the local memory provider
+            return;
+        }
+
+        @Cleanup
+        MetadataStoreExtended store = MetadataStoreExtended.create(url, 
MetadataStoreConfig.builder().build());
+
+        @Cleanup
+        MetadataStoreExtended store2 = MetadataStoreExtended.create(url, 
MetadataStoreConfig.builder().build());
+
+        String path = newKey();
+
+        @Cleanup
+        CoordinationService cs = new CoordinationServiceImpl(store);
+
+        @Cleanup
+        LeaderElection<String> le = cs.getLeaderElection(String.class,
+                path, __ -> {
+                });
+
+        store2.put(path, 
ObjectMapperFactory.getThreadLocal().writeValueAsBytes("test-1"), 
Optional.of(-1L),
+                EnumSet.of(CreateOption.Ephemeral)).join();
+
+        LeaderElectionState les = le.elect("test-2").join();
+        assertEquals(les, LeaderElectionState.Following);
+        assertEquals(le.getLeaderValue().join(), Optional.of("test-1"));
+        assertEquals(le.getLeaderValueIfPresent(), Optional.of("test-1"));
+    }
 }
diff --git 
a/pulsar-metadata/src/test/java/org/apache/pulsar/metadata/ZKSessionTest.java 
b/pulsar-metadata/src/test/java/org/apache/pulsar/metadata/ZKSessionTest.java
index 89bd13a..f565a8f 100644
--- 
a/pulsar-metadata/src/test/java/org/apache/pulsar/metadata/ZKSessionTest.java
+++ 
b/pulsar-metadata/src/test/java/org/apache/pulsar/metadata/ZKSessionTest.java
@@ -26,11 +26,14 @@ import static org.testng.Assert.assertTrue;
 import java.util.concurrent.BlockingQueue;
 import java.util.concurrent.LinkedBlockingQueue;
 import java.util.concurrent.TimeUnit;
+import java.util.function.Consumer;
 
 import lombok.Cleanup;
 
 import org.apache.pulsar.metadata.api.MetadataStoreConfig;
 import org.apache.pulsar.metadata.api.coordination.CoordinationService;
+import org.apache.pulsar.metadata.api.coordination.LeaderElection;
+import org.apache.pulsar.metadata.api.coordination.LeaderElectionState;
 import org.apache.pulsar.metadata.api.coordination.LockManager;
 import org.apache.pulsar.metadata.api.coordination.ResourceLock;
 import org.apache.pulsar.metadata.api.extended.MetadataStoreExtended;
@@ -133,4 +136,56 @@ public class ZKSessionTest extends BaseMetadataStoreTest {
 
         assertTrue(store.get(path).join().isPresent());
     }
+
+    @Test
+    public void testReacquireLeadershipAfterSessionLost() throws Exception {
+        @Cleanup
+        MetadataStoreExtended store = 
MetadataStoreExtended.create(zks.getConnectionString(),
+                MetadataStoreConfig.builder()
+                        .sessionTimeoutMillis(2_000)
+                        .build());
+
+        BlockingQueue<SessionEvent> sessionEvents = new 
LinkedBlockingQueue<>();
+        store.registerSessionListener(sessionEvents::add);
+
+        BlockingQueue<LeaderElectionState> leaderElectionEvents = new 
LinkedBlockingQueue<>();
+        String path = newKey();
+
+        @Cleanup
+        CoordinationService coordinationService = new 
CoordinationServiceImpl(store);
+        @Cleanup
+        LeaderElection<String> le1 = 
coordinationService.getLeaderElection(String.class, path,
+                leaderElectionEvents::add);
+
+        le1.elect("value-1").join();
+        assertEquals(le1.getState(), LeaderElectionState.Leading);
+
+        LeaderElectionState les = leaderElectionEvents.poll(5, 
TimeUnit.SECONDS);
+        assertEquals(les, LeaderElectionState.Leading);
+
+        zks.expireSession(((ZKMetadataStore) store).getZkSessionId());
+
+        SessionEvent e = sessionEvents.poll(5, TimeUnit.SECONDS);
+        assertEquals(e, SessionEvent.ConnectionLost);
+
+        e = sessionEvents.poll(10, TimeUnit.SECONDS);
+        assertEquals(e, SessionEvent.SessionLost);
+
+        assertEquals(le1.getState(), LeaderElectionState.Leading);
+        les = leaderElectionEvents.poll();
+        assertNull(les);
+
+        e = sessionEvents.poll(10, TimeUnit.SECONDS);
+        assertEquals(e, SessionEvent.Reconnected);
+        e = sessionEvents.poll(10, TimeUnit.SECONDS);
+        assertEquals(e, SessionEvent.SessionReestablished);
+
+        Thread.sleep(2_000);
+
+        assertEquals(le1.getState(), LeaderElectionState.Leading);
+        les = leaderElectionEvents.poll();
+        assertNull(les);
+
+        assertTrue(store.get(path).join().isPresent());
+    }
 }

Reply via email to